Crash Course In Java Brain Surgery

insinuation and speculations: My thoughts about Java, HTML5, software development and IT in general

Streaming Movies Ratings with Kafka Streams and KSQL

Written by  Viktor Gamov -
kafka summit

The sole purpose of this blog post is to draft a playbook for my presentation «Crossing the streams: Rethinking Stream processing with Kafka Streams and KSQL» [1] that I recently did on Kafka Summit 2018 in San Francisco. A full source code published in confluetninc/demo-scene repository [2]

Table 1. Revisions history
Version Date Comments



use CP 5.1.0, updated Control Center screenshots



Fixed links and minor grammar



Initial revision

Disclaimer: Another goal is to exercise some ideas around the visual representation of posts in this blog. And third and the last goal is to brush up my technical writing skills! Since I moved to DevX [3] from Professional Services where I did write a truckload of customer engagement reports.


  • Confluent Platform Enterprise 5.x [4]

    • download it

    • unizip to any folder

    • add folder to PATH variable

      export CONFLUENT_HOME=~/projects/confluent/confluent-ent/$CONFLUENT_PLATFORM_VERSION
      export PATH=$CONFLUENT_HOME/bin:$PATH
      alias cnfl="confluent" (1)
      1 a neat little alias that can save few symbols to type
  • Get example from GitHub

    • If you will follow steps below you should checkout only directory that has source code relevant to this post.

      mkdir ~/temp/demo-scene
      cd ~/temp/demo-scene
      git init .
      git remote add origin -f
      git config core.sparsecheckout true
      echo "streams-movie-demo/*" >> .git/info/sparse-checkout
      git pull --depth=2 origin master
      cd streams-movie-demo
      ls -lh

      and you should see something like this

      ls lh streams movie demo
      Figure 1. The output of ls -lh streams-movie-demo

Demo playbook

Let’s start Confluent platform, create topics and populate with some data

Start Confluent Platform

cnfl destroy    (1)
cnfl start      (2)
echo "auto.offset.reset=earliest" >> $CONFLUENT_HOME/etc/ksql/    (3)
1 Make sure that there is no leftovers data
2 Start confluent platform. May take up to minute to start all components.
3 This will allow KSQL queries to read all data
cnfl start
Figure 2. In the end you should be able to see the following output

Populate Movies and Rating Information

cd data/
cat movies.dat  | kafka-console-producer --broker-list localhost:9092 --topic raw-movies    (1)
cat ratings.dat | kafka-console-producer --broker-list localhost:9092 --topic raw-ratings   (2)
1 Source topic for movies - raw-movies
2 Source topic for ratings - raw-ratings
cat raw movies
Figure 3. Output of cat command
cat raw ratings
The warnings like WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {raw-movies=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) and WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {raw-ratings=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) are absolutely normal. Topics raw-movies and raw-ratings not created when we started producing messages to it. And because by default Apache Kafka allows automatic topic creation ( The parameter auto.create.topics.enable in configuration [5] topics created.

Create the destination topics

# enable compaction for this topics
$CONFLUENT_HOME/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --config cleanup.policy=compact --topic movies
$CONFLUENT_HOME/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --config cleanup.policy=compact --topic rating-sums
$CONFLUENT_HOME/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --config cleanup.policy=compact --topic rating-counts
$CONFLUENT_HOME/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --config cleanup.policy=compact --topic rating-averages
$CONFLUENT_HOME/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --config cleanup.policy=compact --topic rated-movies
created topics
Figure 4. Output should match the following screenshot

Run Streaming Movie Demo

  • Start Kafka Streams application

    ./gradlew streams:run

Generate test load

  • Start raw rating generator

    ./gradlew loader:streamWithRawRatingStreamer
    Or if you have groovy installed
    ./gradlew loader:build  (1)
    groovy -cp "./loader/build/libs/loader.jar"  \
    loader/src/main/groovy/RawRatingStreamer.groovy "localhost:9092"   (2)
    1 build loader.jar first. This is «fat jar» that has all required dependencies (Kafka client libraries, serializers, etc)
    2 run raw ratings generator script
    I recommend to run the raw rating generator in a separate terminal window so you can interrupt it with Ctrl+C

Consume result with KSQL UI in Control Center

  • load page from http://localhost:9021

  • open KSQL panel and switch to «Query Editor.»

        (MOVIE_ID BIGINT,\
         TITLE VARCHAR,\
         RATING DOUBLE)\
    WITH (KAFKA_TOPIC='rated-movies',VALUE_FORMAT='AVRO', KEY = 'movie_id'); (1)
    select TITLE, RATING  from RATED_MOVIES where MOVIE_ID=362; (2)
    1 Create a table with Movie ratings
    2 Find rating for Lethal Weapon (movie_id=362)
c3 ksql tables
Figure 5. RATED_MOVIES table is registered in KSQL
c3 ksql query
Figure 6. Continuously query RATED_MOVIES table for the rating of «Lethal Weapon» movie
c3 consumer lag
Figure 7. Monitoring how many messages left to process

Wrapping this up!

  • kill Gradle task with Ctrl+C

  • stop Confluent Platform and purge all data

    cnfl destroy