Who is tweeting about hashtag KSQL?
Written by Viktor Gamov
TL;DR
Another day, another post.
This time it’s another playbook, for my http://DataSciCon.tech talk «Who’s tweeting about #datascicon» on November 30th, 2018 [1].
The full source code is published in the confluentinc/demo-scene repository [2].
Version | Date | Comments |
---|---|---|
v1.1 | 12/02/2018 | Small fixes in code, screenshots, images |
v1.0 | 11/28/2018 | Initial revision |
Prerequisites
- Docker
- Docker Compose
- Get example from GitHub
  - If you follow the steps below, you check out only the directory that contains the source code relevant to this post.

mkdir -p ~/temp/demo-scene
cd ~/temp/demo-scene
git init .
git remote add origin -f https://github.com/confluentinc/demo-scene/
git config core.sparsecheckout true
echo "twitter-streams/*" >> .git/info/sparse-checkout
git pull --depth=2 origin master
cd twitter-streams
ls -lh

and you should see something like this:
Figure 1. The output of ls -lh twitter-streams
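Before running the playbook, it can help to confirm that the required tools are on your PATH. The following is a minimal sketch, not part of the original repository: `check_tool` is a hypothetical helper, and it assumes the standalone `docker-compose` binary rather than the `docker compose` plugin.

```shell
#!/bin/sh
# Report whether a command-line tool is available on PATH.
# Prints "<name>: found" or "<name>: MISSING" and returns 0 or 1 accordingly.
check_tool() {
  if command -v "$1" >/dev/null 2>&1; then
    echo "$1: found"
  else
    echo "$1: MISSING"
    return 1
  fi
}

# Tools this playbook relies on; git and make are used in the steps that follow.
for tool in docker docker-compose git make; do
  check_tool "$tool" || true
done
```

Any line reporting MISSING points at a tool to install before continuing.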
Demo Playbook
Download dependencies
This command downloads the monitoring interceptors and installs them into a local folder
make
KSQL Part
Let’s start with a few simple commands.
Connect to the remote KSQL server with the KSQL CLI
ksql http://ksql-demo:9098
Taking a quick look around
SET 'auto.offset.reset' = 'earliest'; (1)
> show topics;
> print 'twitter_json_01' from beginning;
# ask people to tweet to see how messages are flowing
> select * from twitter_json_01 limit 2;
1 | Configure KSQL consumer offsets |
Make a structured stream
Create Structured Stream
CREATE STREAM twitter_raw ( \
CreatedAt bigint, Id bigint, Text VARCHAR, SOURCE VARCHAR, Truncated VARCHAR, \
InReplyToStatusId VARCHAR, InReplyToUserId VARCHAR, InReplyToScreenName VARCHAR, \
GeoLocation VARCHAR, Place VARCHAR, Favorited VARCHAR, Retweeted VARCHAR, \
FavoriteCount VARCHAR, User VARCHAR, Retweet VARCHAR, Contributors VARCHAR, \
RetweetCount VARCHAR, RetweetedByMe VARCHAR, CurrentUserRetweetId VARCHAR, \
PossiblySensitive VARCHAR, Lang VARCHAR, WithheldInCountries VARCHAR, \
HashtagEntities VARCHAR, UserMentionEntities VARCHAR, MediaEntities VARCHAR, \
SymbolEntities VARCHAR, URLEntities VARCHAR) \
WITH (KAFKA_TOPIC='twitter_json_01',VALUE_FORMAT='JSON');
select * from twitter_raw limit 2;
All tweets from DataSciCon
CREATE STREAM twitter_datascicon AS \
SELECT TIMESTAMPTOSTRING(CreatedAt, 'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt,\
EXTRACTJSONFIELD(user,'$.Name') AS user_Name,\
EXTRACTJSONFIELD(user,'$.ScreenName') AS user_ScreenName,\
EXTRACTJSONFIELD(user,'$.Location') AS user_Location,\
EXTRACTJSONFIELD(user,'$.Description') AS user_Description,\
Text, hashtagentities, lang\
FROM twitter_raw WHERE LCASE(hashtagentities) LIKE '%datascicon%';
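The WHERE clause above lowercases the whole HashtagEntities string and then looks for a substring, which makes the filter case-insensitive. The shell sketch below mimics that logic outside KSQL; `has_hashtag` is a hypothetical helper and the sample values are invented for illustration, shaped loosely like the connector's JSON.

```shell
#!/bin/sh
# Mimic LCASE(hashtagentities) LIKE '%datascicon%':
# lowercase the input, then test for the substring anywhere in it.
has_hashtag() {
  lowered=$(printf '%s' "$1" | tr '[:upper:]' '[:lower:]')
  case "$lowered" in
    *"$2"*) return 0 ;;
    *)      return 1 ;;
  esac
}

# Hypothetical HashtagEntities values, for illustration only.
has_hashtag '[{"Text":"DataSciCon"}]' datascicon && echo "match"
has_hashtag '[{"Text":"KSQL"}]' datascicon || echo "no match"
```

Because this is a plain substring match over the serialized field, it also matches longer tags that contain the word, such as a hypothetical #DataSciCon2018.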
All tweets from DataSciCon about Kafka and KSQL
CREATE STREAM twitter_datascicon_kafka_ksql AS\
SELECT TIMESTAMPTOSTRING(CreatedAt, 'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt,\
EXTRACTJSONFIELD(user,'$.Name') AS user_Name,\
EXTRACTJSONFIELD(user,'$.ScreenName') AS user_ScreenName,\
EXTRACTJSONFIELD(user,'$.Location') AS user_Location,\
EXTRACTJSONFIELD(user,'$.Description') AS user_Description,\
Text,hashtagentities,lang\
FROM twitter_raw \
WHERE LCASE(hashtagentities) LIKE '%datascicon%' \
AND (LCASE(hashtagentities) LIKE '%ksql%' OR LCASE(hashtagentities) LIKE '%apachekafka%');
CREATE STREAM twitter AS \
SELECT TIMESTAMPTOSTRING(CreatedAt, 'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt,\
EXTRACTJSONFIELD(user,'$.Name') AS user_Name, \
EXTRACTJSONFIELD(user,'$.ScreenName') AS user_ScreenName, \
EXTRACTJSONFIELD(user,'$.Location') AS user_Location, \
EXTRACTJSONFIELD(user,'$.Description') AS user_Description, \
Text,hashtagentities,lang \
FROM twitter_raw ;
SELECT USER_NAME, TEXT FROM TWITTER WHERE TEXT LIKE '%ksql%';
CREATE TABLE user_tweet_count AS \
SELECT user_screenname, count(*) AS tweet_count \
FROM twitter WINDOW TUMBLING (SIZE 1 HOUR) \
GROUP BY user_screenname ;
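WINDOW TUMBLING (SIZE 1 HOUR) assigns every record to a fixed, non-overlapping one-hour bucket aligned to the epoch, and the aggregate counts records per key within each bucket. A rough sketch of the same arithmetic in shell and awk follows; `tumbling_counts`, the input format, and the sample screen names are assumptions made for illustration, not anything KSQL itself exposes.

```shell
#!/bin/sh
# Count tweets per user per one-hour tumbling window.
# Input:  lines of "<epoch_millis> <screen_name>" (hypothetical sample format).
# Output: lines of "<window_start_millis> <screen_name> <count>", sorted.
tumbling_counts() {
  awk '{
    # Start of the hour this record falls in (3600000 ms = 1 hour).
    window = sprintf("%.0f", $1 - ($1 % 3600000))
    counts[window " " $2]++
  }
  END {
    for (key in counts) print key, counts[key]
  }' | LC_ALL=C sort
}

# Two tweets from alice in one hour, one in the next, and one from bob.
printf '%s\n' \
  '1543572000000 alice' \
  '1543572900000 alice' \
  '1543575600000 alice' \
  '1543573000000 bob' | tumbling_counts
```

Here the first two alice records share a window start of 1543572000000, while the third lands in the next window, so alice gets one row per window rather than a single running total.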
# ---------------------
CREATE TABLE USER_TWEET_COUNT_DISPLAY AS\
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS WINDOW_START,\
USER_SCREENNAME,\
TWEET_COUNT FROM user_tweet_count;
create table top_5_datascicon as \
SELECT WINDOW_START, USER_SCREENNAME, TWEET_COUNT \
FROM USER_TWEET_COUNT_DISPLAY \
WHERE TWEET_COUNT > 5;
select USER_SCREENNAME, TWEET_COUNT from top_5_datascicon;
SHOW STREAMS;
DESCRIBE twitter;
DESCRIBE extended twitter;
EXPLAIN twitter;
Appendix
How to test the kafka-connect-twitter connector with a local CP installation
Prerequisites
- Confluent Platform Enterprise 5.x [3]
  - download it
  - unzip it to any folder
  - add the folder to the PATH variable

export CONFLUENT_PLATFORM_VERSION=5.0.1
export CONFLUENT_HOME=~/projects/confluent/confluent-ent/$CONFLUENT_PLATFORM_VERSION
export PATH=$CONFLUENT_HOME/bin:$PATH
alias cnfl="confluent" (1)
1 | A neat little alias that saves a few keystrokes |
Install and Run the connector
confluent-hub install jcustenborder/kafka-connect-twitter:0.2.32 (1)
confluent start connect (2)
confluent config twitter_source -d connect_twitter.json (3)
confluent status connectors (4)
confluent consume twitter_json_01 --max-messages=5 --from-beginning (5)
1 | Install the connector from Confluent Hub |
2 | Start Connect Runtime |
3 | Push the connector config |
4 | Display status of the connector |
5 | Grab a few messages from the source topic |
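Besides the CLI status command, a Kafka Connect worker also reports connector status over its REST API at /connectors/<name>/status. The sketch below pulls the connector-level state out of such a response; `connector_state` is a hypothetical helper, the localhost:8083 address is an assumed default rather than something stated in this post, and the sample JSON is invented for illustration.

```shell
#!/bin/sh
# Extract the connector-level state (e.g. RUNNING, FAILED) from the JSON
# returned by GET /connectors/<name>/status, read from stdin.
# Assumes "state" is the first key of the "connector" object.
connector_state() {
  state=$(tr -d '[:space:]' | sed -n 's/.*"connector":{"state":"\([A-Z_]*\)".*/\1/p')
  printf '%s\n' "$state"
}

# Usage against a local worker (assumed URL, adjust to your setup):
#   curl -s http://localhost:8083/connectors/twitter_source/status | connector_state

# Hypothetical sample response, for illustration only.
echo '{"name":"twitter_source","connector":{"state":"RUNNING","worker_id":"127.0.0.1:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"127.0.0.1:8083"}]}' | connector_state
```

The sed pattern is deliberately simple and depends on the key order in the response; a JSON-aware tool would be a more robust choice for anything beyond a quick check.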