
Stream Processing Like You Have Never Seen Before

Written by Viktor Gamov
TL;DR
This is the playbook for the «Stream Processing like you have never seen before» talk. Full source code is available.
Table 1. Revisions history

Version  Date        Comments
v1.1     11/05/2019  Updated version, presented at NYC Cloud Native meetup
v1.0     09/05/2019  Initial revision, presented at DC Spring Meetup

Spring Kafka Application

Getting started

Go to https://start.spring.io and generate a project using «Spring for Apache Kafka», «Spring for Apache Kafka Streams», «Lombok», and «Cloud Streams».
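
The generated build.gradle should already contain the starter dependencies for the selections above. Roughly (a sketch only; exact contents and versions depend on the Spring Boot version you pick):

build.gradle (generated excerpt)
dependencies {
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
}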

Add dependencies

build.gradle
buildscript {
  repositories {
    jcenter()
  }
  dependencies {
    classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.15.1'
  }
}

repositories {
    jcenter()

    maven {
        url 'http://packages.confluent.io/maven'
    }
}

dependencies {
    implementation 'org.apache.avro:avro:1.8.2'
    implementation 'org.apache.kafka:kafka-streams:2.2.0'
    implementation 'io.confluent:kafka-streams-avro-serde:5.2.0'
}

apply plugin: 'com.commercehub.gradle.plugin.avro'
apply plugin: 'idea'

Add Avro schemas

Place the Avro schema files in src/main/avro, where the Gradle Avro plugin expects them.

movie.avsc
{
  "namespace": "io.confluent.demo",
  "type": "record",
  "name": "Movie",
  "fields": [
    {
      "name": "movie_id",
      "type": "long"
    },
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "release_year",
      "type": "int"
    },
    {
      "name": "country",
      "type": "string"
    },
    {
      "name": "genres",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "actors",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "directors",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "composers",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "screenwriters",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "cinematographer",
      "type": "string"
    },
    {
      "name": "production_companies",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}

Add ratings schema

rating.avsc
{
  "namespace": "io.confluent.demo",
  "type": "record",
  "name": "Rating",
  "fields": [
    {
      "name": "movie_id",
      "type": "long"
    },
    {
      "name": "rating",
      "type": "double"
    }
  ]
}
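
After a build, the Avro Gradle plugin turns these schemas into Java classes in the io.confluent.demo package. A minimal sketch of the generated Rating class in use (assuming Avro's standard builder API; the accessor names match the getMovieId()/getRating() calls used later in the Producer):

// hypothetical snippet, not part of the playbook code
Rating rating = Rating.newBuilder()
    .setMovieId(362L)
    .setRating(9.6)
    .build();
// rating.getMovieId() -> 362, rating.getRating() -> 9.6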

Add Parser

KafkaApplication.java
class Parser {

	// splits a "|"-delimited field (e.g. "Comedy|Crime") into a list of strings
	private static List<String> parseArray(String text) {
		return Collections.list(new StringTokenizer(text, "|")).stream()
			.map(token -> (String) token)
			.collect(Collectors.toList());
	}

	// parses one "::"-delimited line of movies.dat into a Movie record
	public static Movie parseMovie(String text) {
		String[] tokens = text.split("\\:\\:");
		String id = tokens[0];
		String title = tokens[1];
		String releaseYear = tokens[2];
		String country = tokens[4];
		String genres = tokens[7];
		String actors = tokens[8];
		String directors = tokens[9];
		String composers = tokens[10];
		String screenwriters = tokens[11];
		String cinematographer = tokens[12];
		String productionCompanies = "";
		if (tokens.length > 13) {
			productionCompanies = tokens[13];
		}

		Movie movie = new Movie();
		movie.setMovieId(Long.parseLong(id));
		movie.setTitle(title);
		movie.setReleaseYear(Integer.parseInt(releaseYear));
		movie.setCountry(country);
		movie.setGenres(Parser.parseArray(genres));
		movie.setActors(Parser.parseArray(actors));
		movie.setDirectors(Parser.parseArray(directors));
		movie.setComposers(Parser.parseArray(composers));
		movie.setScreenwriters(Parser.parseArray(screenwriters));
		movie.setCinematographer(cinematographer);
		movie.setProductionCompanies(Parser.parseArray(productionCompanies));

		return movie;
	}
}
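
A quick sketch of how the parser is driven. The line below is hypothetical (the real input comes from movies.dat), but the field positions match the token indices parseMovie reads:

// hypothetical "::"-delimited line with "|"-delimited lists; unused positions
// are left as placeholders
String line = "128::The Big Lebowski::1998::-::USA::-::-::Comedy|Crime"
    + "::Jeff Bridges|John Goodman::Joel Coen|Ethan Coen::Carter Burwell"
    + "::Ethan Coen|Joel Coen::Roger Deakins::Working Title Films";

Movie movie = Parser.parseMovie(line);
// movie.getTitle()  -> "The Big Lebowski"
// movie.getGenres() -> [Comedy, Crime]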

Add Producer class

KafkaApplication.java
@Log4j2
@Component
@RequiredArgsConstructor
class Producer {

  // package-visible so the NewTopic beans in KafkaApplication can reference them
  static final String MOVIES_TOPIC = "movies";
  static final String RATINGS_TOPIC = "ratings";

  private final KafkaTemplate kafkaTemplate;

  @Value(value = "classpath:movies.dat")
  private Resource moviesFile;

  @EventListener(ApplicationStartedEvent.class)
  public void process() throws InterruptedException {
    try (Stream<String> stream = Files.lines(Paths.get(moviesFile.getURI()))) {
      stream.forEach(s -> {
        Movie movie = Parser.parseMovie(s);
        log.info("sending " + movie.getMovieId() + " for movie " + movie.toString() + " to " + MOVIES_TOPIC);
        kafkaTemplate.send(MOVIES_TOPIC, movie.getMovieId(), movie);
      });
    } catch (IOException e) {
      e.printStackTrace();
    }
    // after loading the movies, keep emitting a random rating (between 5 and 10)
    // for a random movie id every second
    Random ran = new Random();
    while (true) {
      int movieId = ran.nextInt(920) + 1;
      int rating = 5 + ran.nextInt(6);
      Rating rat = new Rating((long) movieId, (double) rating);
      log.info(rat.toString());
      Thread.sleep(1_000);
      this.kafkaTemplate.send(RATINGS_TOPIC, rat.getMovieId(), rat);
    }
  }
}
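
The producer needs serializer and Schema Registry settings that the playbook doesn't show. A minimal application.properties sketch, assuming Kafka and the Confluent Schema Registry run locally on their default ports:

application.properties
# assumed local defaults; adjust for your environment
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.properties.schema.registry.url=http://localhost:8081
# keys are movie ids (long), values are the Avro-generated records
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer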

Create topics

KafkaApplication.java
@SpringBootApplication
public class KafkaApplication {

  public static void main(String[] args) {
    SpringApplication.run(KafkaApplication.class, args);
  }

  // one partition and replication factor 1 are enough for a local demo
  @Bean
  NewTopic moviesTopic() {
    return new NewTopic(Producer.MOVIES_TOPIC, 1, (short) 1);
  }

  @Bean
  NewTopic ratingsTopic() {
    return new NewTopic(Producer.RATINGS_TOPIC, 1, (short) 1);
  }
}

Test locally

terminal
# stop any previous local Confluent Platform run and wipe its data
confluent local destroy

# start ZooKeeper, Kafka and Schema Registry locally
confluent local start schema-registry

# the application creates the topics on startup; verify they are there
kafka-topics --list --bootstrap-server localhost:9092

# watch the Avro records the producer writes
confluent local consume movies -- --value-format avro --from-beginning

confluent local consume ratings -- --value-format avro

Spring Cloud Stream Application

Bindings

Bindings.java
interface Bindings {

  String RATINGS = "ratings";
  String AVG_RATINGS = "avg-ratings";
  String MOVIES = "movies";
  String AVG_TABLE = "avg-table";
  String RATED_MOVIES = "rated-movies";

  // name of the state store backing the HTTP endpoint
  String RATED_MOVIES_STORE = "rated-movies-store";

  @Input(RATINGS)
  KStream<Long, Rating> ratingsIn();

  @Output(AVG_RATINGS)
  KStream<Long, Double> averageRatingsOut();

  @Input(MOVIES)
  KTable<Long, Movie> moviesIn();

  @Input(AVG_TABLE)
  KTable<Long, Double> averageRatingsIn();

  @Output(RATED_MOVIES)
  KStream<Long, RatedMovie> ratedMoviesOut();
}
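
These bindings still need to be mapped to topics, and the Kafka Streams binder needs serde and application-id settings; the application class also has to carry @EnableBinding(Bindings.class). A sketch of the properties, assuming binding names map straight onto the topic names and that the avg-table binding re-reads the topic the averager writes to:

application.properties
spring.cloud.stream.bindings.movies.destination=movies
spring.cloud.stream.bindings.ratings.destination=ratings
spring.cloud.stream.bindings.avg-ratings.destination=avg-ratings
# the table-side binding consumes the averager's output topic
spring.cloud.stream.bindings.avg-table.destination=avg-ratings
spring.cloud.stream.bindings.rated-movies.destination=rated-movies
# passed straight through to the Kafka Streams configuration (values are assumptions)
spring.cloud.stream.kafka.streams.binder.configuration.application.id=rated-movies-app
spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url=http://localhost:8081
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde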

Processors

RatingsAverager.java
@Component
class RatingsAverager {

  @StreamListener
  @SendTo(Bindings.AVG_RATINGS)
  KStream<Long, Double> averageRatingsFor(@Input(Bindings.RATINGS) KStream<Long, Rating> ratings) {
    // group the raw rating values by movie id
    KGroupedStream<Long, Double> ratingsGrouped =
        ratings
            .mapValues(Rating::getRating)
            .groupByKey();
    // how many ratings each movie has received so far
    KTable<Long, Long> count = ratingsGrouped.count();
    // running sum of the ratings for each movie
    KTable<Long, Double> sum =
        ratingsGrouped.reduce(Double::sum, Materialized.with(Serdes.Long(), Serdes.Double()));
    // average = sum / count, recomputed whenever a new rating arrives
    KTable<Long, Double> average =
        sum.join(count, (total, n) -> total / n, Materialized.with(Serdes.Long(), Serdes.Double()));
    return average.toStream();
  }
}
MovieProcessor.java
@Component
class MovieProcessor {

  @StreamListener
  @SendTo(Bindings.RATED_MOVIES)
  KStream<Long, RatedMovie> rateMoviesFor(@Input(Bindings.AVG_TABLE) KTable<Long, Double> ratings,
                                          @Input(Bindings.MOVIES) KTable<Long, Movie> movies) {

    ValueJoiner<Movie, Double, RatedMovie> joiner = (movie, rating) ->
        new RatedMovie(movie.getMovieId(), movie.getReleaseYear(), movie.getTitle(), rating);

    // join the movies table with the running averages and materialize the result,
    // so the same store (RATED_MOVIES_STORE) can later be queried over HTTP
    return movies
        .join(ratings, joiner, Materialized
            .<Long, RatedMovie, KeyValueStore<Bytes, byte[]>>as(Bindings.RATED_MOVIES_STORE)
            .withKeySerde(Serdes.Long())
            .withValueSerde(new JsonSerde<>(RatedMovie.class)))
        .toStream();
  }
}
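
The RATED_MOVIES_STORE comment in Bindings refers to an HTTP endpoint that isn't shown in this playbook. A possible sketch of it, using the binder's InteractiveQueryService to look up a movie's current rating by id (the controller name and request path are assumptions):

RatedMovieController.java
@RestController
@RequiredArgsConstructor
class RatedMovieController {

  private final InteractiveQueryService interactiveQueryService;

  // e.g. GET /movies/128 returns the latest RatedMovie for that id,
  // read from the store materialized by MovieProcessor
  @GetMapping("/movies/{id}")
  RatedMovie ratedMovie(@PathVariable Long id) {
    ReadOnlyKeyValueStore<Long, RatedMovie> store =
        interactiveQueryService.getQueryableStore(Bindings.RATED_MOVIES_STORE,
            QueryableStoreTypes.<Long, RatedMovie>keyValueStore());
    return store.get(id);
  }
}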