Crash Course In Java Brain Surgery

insinuation and speculations: My thoughts about Java, HTML5, software development and IT in general

Who is tweeting about hashtag KSQL?

Written by Viktor Gamov
TL;DR

Another day, another post. This time it’s another playbook for my http://DataSciCon.tech talk «Who’s tweeting about #datascicon» on November 30th, 2018 [1]. The full source code is published in the confluentinc/demo-scene repository [2].

Table 1. Revision history

Version   Date         Comments
v1.1      12/02/2018   Small fixes in code, screenshots, and images
v1.0      11/28/2018   Initial revision

Prerequisites

  • Docker

  • Docker Compose

  • Get the example from GitHub

    • The steps below use a sparse checkout, so only the directory with the source code relevant to this post is checked out.

      mkdir ~/temp/demo-scene
      cd ~/temp/demo-scene
      git init .
      git remote add origin -f https://github.com/confluentinc/demo-scene/
      git config core.sparsecheckout true
      echo "twitter-streams/*" >> .git/info/sparse-checkout
      git pull --depth=2 origin master
      cd twitter-streams
      ls -lh

      You should see something like this:

      Figure 1. The output of ls -lh twitter-streams

Demo Playbook

Download dependencies

This command downloads the monitoring-interceptors dependency and installs it into a local folder:
make

Starting the containers

./scripts/00-start.sh

KSQL Part

Let’s start with a few simple commands.

Connect to the remote KSQL server with the KSQL CLI
ksql http://ksql-demo:9098
Taking a quick look around
> SET 'auto.offset.reset' = 'earliest'; (1)

> show topics;
> print 'twitter_json_01' from beginning;

# ask people to tweet so everyone can see the messages flowing in
> select * from twitter_json_01 limit 2;
1 Make queries start from the earliest offset, so tweets already in the topic are included

Make a structured stream

Create Structured Stream
CREATE STREAM twitter_raw ( \
   CreatedAt bigint,Id bigint, Text VARCHAR, SOURCE VARCHAR, Truncated VARCHAR, InReplyToStatusId VARCHAR, InReplyToUserId VARCHAR, InReplyToScreenName VARCHAR, GeoLocation VARCHAR, Place VARCHAR, Favorited VARCHAR, Retweeted VARCHAR, FavoriteCount VARCHAR, User VARCHAR, Retweet VARCHAR, Contributors VARCHAR, RetweetCount VARCHAR, RetweetedByMe VARCHAR, CurrentUserRetweetId VARCHAR, PossiblySensitive VARCHAR, Lang VARCHAR, WithheldInCountries VARCHAR, HashtagEntities VARCHAR, UserMentionEntities VARCHAR, MediaEntities VARCHAR, SymbolEntities VARCHAR, URLEntities VARCHAR) \
WITH (KAFKA_TOPIC='twitter_json_01',VALUE_FORMAT='JSON');

select * from twitter_raw limit 2;

All tweets from DataSciCon
CREATE STREAM twitter_datascicon AS \
    SELECT TIMESTAMPTOSTRING(CreatedAt, 'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt,\
    EXTRACTJSONFIELD(user,'$.Name') AS user_Name,\
    EXTRACTJSONFIELD(user,'$.ScreenName') AS user_ScreenName,\
    EXTRACTJSONFIELD(user,'$.Location') AS user_Location,\
    EXTRACTJSONFIELD(user,'$.Description') AS  user_Description,\
    Text, hashtagentities, lang\
    FROM twitter_raw WHERE LCASE(hashtagentities) LIKE '%datascicon%';
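As far as I know, `TIMESTAMPTOSTRING` takes its pattern in the syntax of Java's `java.time.format.DateTimeFormatter`, so a few lines of Java are enough to preview how a `CreatedAt` value will be rendered. This is a minimal sketch, not KSQL's implementation. It assumes UTC, while KSQL formats with the server's default time zone unless told otherwise, so your output may be shifted by your local offset.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

final class TimestampToStringSketch {
    // Same pattern string as in the CREATE STREAM statements above
    static final String PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";

    // Formats epoch milliseconds (such as a tweet's CreatedAt) in the given time zone
    static String format(long epochMillis, ZoneId zone) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(PATTERN).withZone(zone);
        return formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // Assumption: UTC; a KSQL server may use a different default zone
        System.out.println(format(1543536000123L, ZoneId.of("UTC")));
    }
}
```

Running `main` prints `2018-11-30 00:00:00.123`, a timestamp taken at midnight UTC on the day of the talk.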
All tweets from DataSciCon about Kafka and KSQL
CREATE STREAM twitter_datascicon_kafka_ksql AS\
    SELECT TIMESTAMPTOSTRING(CreatedAt, 'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt,\
    EXTRACTJSONFIELD(user,'$.Name') AS user_Name,\
    EXTRACTJSONFIELD(user,'$.ScreenName') AS user_ScreenName,\
    EXTRACTJSONFIELD(user,'$.Location') AS user_Location,\
    EXTRACTJSONFIELD(user,'$.Description') AS  user_Description,\
    Text,hashtagentities,lang\
    FROM twitter_raw WHERE LCASE(hashtagentities) LIKE '%datascicon%' AND (LCASE(hashtagentities) LIKE '%ksql%' OR LCASE(hashtagentities) LIKE '%apachekafka%');
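Both streams filter on the raw `HashtagEntities` text, and `LCASE(...) LIKE '%...%'` amounts to a case-insensitive substring test. The sketch below mirrors the two `WHERE` clauses in plain Java to make the boolean logic explicit. The sample value in `main` is hypothetical, and a `null` column is treated as "no match", which is how a SQL `NULL` behaves in a `WHERE` clause.

```java
import java.util.Locale;

final class HashtagFilterSketch {
    // Mirrors: LCASE(hashtagentities) LIKE '%datascicon%'
    static boolean isDataSciCon(String hashtagEntities) {
        return hashtagEntities != null
            && hashtagEntities.toLowerCase(Locale.ROOT).contains("datascicon");
    }

    // Mirrors: ... LIKE '%datascicon%' AND (... LIKE '%ksql%' OR ... LIKE '%apachekafka%')
    static boolean isDataSciConAboutKafkaOrKsql(String hashtagEntities) {
        if (!isDataSciCon(hashtagEntities)) {
            return false;
        }
        String lowered = hashtagEntities.toLowerCase(Locale.ROOT);
        return lowered.contains("ksql") || lowered.contains("apachekafka");
    }

    public static void main(String[] args) {
        // Hypothetical HashtagEntities value; the real column holds the connector's JSON text
        String sample = "[{\"Text\":\"DataSciCon\"},{\"Text\":\"KSQL\"}]";
        System.out.println(isDataSciCon(sample));                 // true
        System.out.println(isDataSciConAboutKafkaOrKsql(sample)); // true
    }
}
```

Because the match runs against the whole serialized value rather than individual hashtags, a tag such as `#datasciconfan` would also pass the filter; for a live demo that is usually acceptable.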
All tweets, with user fields extracted
CREATE STREAM twitter AS \
    SELECT TIMESTAMPTOSTRING(CreatedAt, 'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt,\
    EXTRACTJSONFIELD(user,'$.Name') AS user_Name,\
    EXTRACTJSONFIELD(user,'$.ScreenName') AS user_ScreenName,\
    EXTRACTJSONFIELD(user,'$.Location') AS user_Location,\
    EXTRACTJSONFIELD(user,'$.Description') AS user_Description,\
    Text, hashtagentities, lang\
    FROM twitter_raw;
SELECT USER_NAME, TEXT FROM TWITTER WHERE TEXT LIKE '%ksql%';
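Unlike the DataSciCon streams above, this query does not wrap `TEXT` in `LCASE`. Assuming KSQL's `LIKE` compares characters case-sensitively, a tweet that says "KSQL" in capitals will not match `'%ksql%'`. The sketch below models a `LIKE` pattern that uses only the `%` wildcard by translating it into a regular expression; it illustrates the semantics and is not KSQL's own matcher (the `_` wildcard is deliberately left out).

```java
import java.util.Locale;
import java.util.regex.Pattern;

final class LikeSketch {
    // Returns true if value matches a LIKE pattern whose only wildcard is '%'.
    // Every other character is compared literally and case-sensitively.
    static boolean like(String value, String likePattern) {
        if (value == null) {
            return false; // a NULL column never satisfies a WHERE clause
        }
        // "%ksql%" splits into ["", "ksql", ""]; the -1 limit keeps empty edge pieces
        String[] literals = likePattern.split("%", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < literals.length; i++) {
            if (i > 0) {
                regex.append(".*"); // each '%' becomes "any run of characters"
            }
            regex.append(Pattern.quote(literals[i]));
        }
        // DOTALL lets ".*" span line breaks inside multi-line tweets
        return Pattern.compile(regex.toString(), Pattern.DOTALL).matcher(value).matches();
    }

    public static void main(String[] args) {
        String tweet = "Streaming tweets with KSQL at #DataSciCon";
        System.out.println(like(tweet, "%ksql%"));                          // false
        System.out.println(like(tweet.toLowerCase(Locale.ROOT), "%ksql%")); // true
    }
}
```

To catch every spelling, lower-case the column first, as the streams above do: `WHERE LCASE(TEXT) LIKE '%ksql%'`.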
CREATE TABLE user_tweet_count AS \
  SELECT user_screenname, count(*) AS  tweet_count \
  FROM twitter WINDOW TUMBLING (SIZE 1 HOUR) \
  GROUP BY user_screenname ;
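`WINDOW TUMBLING (SIZE 1 HOUR)` assigns each tweet to exactly one fixed, non-overlapping hour, and `count(*) ... GROUP BY user_screenname` keeps one counter per user per hour. The sketch below does the same bookkeeping in memory under simplifying assumptions: windows aligned to the Unix epoch (which, as I understand it, is how Kafka Streams aligns tumbling windows), non-negative timestamps, and no handling of late or out-of-order records. The `Tweet` class is hypothetical, and its timestamp stands in for the record's `ROWTIME`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TumblingWindowCountSketch {
    static final long ONE_HOUR_MS = 60L * 60L * 1000L;

    // Hypothetical minimal tweet: screen name plus a timestamp standing in for ROWTIME
    static final class Tweet {
        final String screenName;
        final long timestampMs;

        Tweet(String screenName, long timestampMs) {
            this.screenName = screenName;
            this.timestampMs = timestampMs;
        }
    }

    // Start of the epoch-aligned window that contains timestampMs (assumes timestampMs >= 0)
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // In-memory analogue of count(*) ... WINDOW TUMBLING (SIZE ...) GROUP BY user_screenname:
    // window start -> (screen name -> number of tweets in that window)
    static Map<Long, Map<String, Long>> countPerUserPerWindow(List<Tweet> tweets, long sizeMs) {
        Map<Long, Map<String, Long>> counts = new TreeMap<>();
        for (Tweet tweet : tweets) {
            counts.computeIfAbsent(windowStart(tweet.timestampMs, sizeMs), start -> new TreeMap<>())
                  .merge(tweet.screenName, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Tweet> tweets = Arrays.asList(
            new Tweet("alice", 0L),
            new Tweet("alice", ONE_HOUR_MS - 1), // last millisecond of the first hour
            new Tweet("alice", ONE_HOUR_MS),     // first millisecond of the second hour
            new Tweet("bob", 1_000L));
        System.out.println(countPerUserPerWindow(tweets, ONE_HOUR_MS));
    }
}
```

For the sample in `main` this prints `{0={alice=2, bob=1}, 3600000={alice=1}}`. A real KSQL table also emits updated counts as records arrive and keeps windows only for a retention period; the sketch ignores both.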

# ---------------------

CREATE TABLE USER_TWEET_COUNT_DISPLAY AS\
    SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS WINDOW_START,\
    USER_SCREENNAME,\
    TWEET_COUNT FROM user_tweet_count;

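Users with more than five tweets in an hour window (a threshold, not a top-5 ranking, despite the table name)
create table top_5_datascicon as \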
    SELECT WINDOW_START, USER_SCREENNAME, TWEET_COUNT \
    FROM USER_TWEET_COUNT_DISPLAY \
    WHERE TWEET_COUNT> 5;

select USER_SCREENNAME, TWEET_COUNT from top_5_datascicon;
SHOW STREAMS;
DESCRIBE twitter;
DESCRIBE extended twitter;
EXPLAIN twitter;

Appendix

How to test the kafka-connect-twitter connector with a local Confluent Platform installation

Prerequisites

  • Confluent Platform Enterprise 5.x [3]

    • download it

    • unzip it to any folder

    • add its bin folder to the PATH variable

      export CONFLUENT_PLATFORM_VERSION=5.0.1
      export CONFLUENT_HOME=~/projects/confluent/confluent-ent/$CONFLUENT_PLATFORM_VERSION
      export PATH=$CONFLUENT_HOME/bin:$PATH
      alias cnfl="confluent" (1)
      1 A handy alias that saves a few keystrokes

Install and Run the connector

confluent-hub install jcustenborder/kafka-connect-twitter:0.2.32        (1)
confluent start connect                                                 (2)
confluent config twitter_source -d connect_twitter.json                 (3)
confluent status connectors                                             (4)
confluent consume twitter_json_01 --max-messages=5 --from-beginning     (5)
1 Install the connector from Confluent Hub
2 Start the Connect runtime
3 Push the connector config
4 Display the status of the connectors
5 Grab a few messages from the source topic
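The connector status is also available directly from the Kafka Connect REST API. The sketch below, assuming Java 11+ for `java.net.http.HttpClient`, calls `GET /connectors/{name}/status` for the `twitter_source` connector. It assumes the Connect worker listens on the default port 8083 on localhost and that the connector name contains only URL-safe characters.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class ConnectorStatusSketch {
    // Assumption: the worker started by `confluent start connect` uses the default REST port
    static final String CONNECT_URL = "http://localhost:8083";

    // Kafka Connect REST endpoint that reports the state of a connector and its tasks.
    // Assumes connectorName contains only URL-safe characters (true for twitter_source).
    static URI statusUri(String baseUrl, String connectorName) {
        return URI.create(baseUrl + "/connectors/" + connectorName + "/status");
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(statusUri(CONNECT_URL, "twitter_source"))
            .GET()
            .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // The body is JSON with the connector state and one entry per task
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

A healthy connector reports a `state` of `RUNNING` for the connector itself and for each of its tasks.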

Find Java on Debian

export JAVA_HOME=$(readlink -f /usr/bin/java | sed "s:bin/java::")
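The one-liner resolves the `/usr/bin/java` symlink chain (Debian routes it through `/etc/alternatives`) and strips `bin/java` from the result. If you would rather ask the JVM itself, the `java.home` system property holds the runtime's installation directory; on Java 8 it points at the `jre` subdirectory, which is typically also what the `readlink` approach yields there. The sketch below prints both, assuming a Debian-style `/usr/bin/java` link exists; `stripBinJava` is a hypothetical helper, and unlike the `sed` expression it only removes a trailing `bin/java`.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;

final class JavaHomeSketch {
    // Close to sed "s:bin/java::", but only a trailing bin/java is removed
    static String stripBinJava(String resolvedJavaPath) {
        String suffix = "bin/java";
        return resolvedJavaPath.endsWith(suffix)
            ? resolvedJavaPath.substring(0, resolvedJavaPath.length() - suffix.length())
            : resolvedJavaPath;
    }

    public static void main(String[] args) throws IOException {
        // toRealPath() follows every symlink, like readlink -f (throws if /usr/bin/java is missing)
        Path real = Paths.get("/usr/bin/java").toRealPath();
        System.out.println("readlink-style: " + stripBinJava(real.toString()));
        // The running JVM's own view of its installation directory
        System.out.println("java.home:      " + System.getProperty("java.home"));
    }
}
```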

