Stream Processing Like You Have Never Seen Before
Written by  Viktor Gamov -
Table of Contents
TL;DR
This is the playbook for the «Stream Processing Like You Have Never Seen Before» talk.
Full source code is available
|
Version | Date | Comments |
---|---|---|
v1.1 |
11/05/2019 |
Updated version, presented at NYC Cloud Native meetup |
v1.0 |
09/05/2019 |
Initial revision, presented at DC Spring Meetup |
Spring Kafka Application
Getting started
Go to https://start.spring.io and generate a project using the «Spring for Apache Kafka», «Spring for Apache Kafka Streams», «Lombok», and «Cloud Stream» dependencies.
Add dependencies
build.gradle
buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        // Generates Java classes from the Avro schemas in src/main/avro
        classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.15.1'
    }
}

repositories {
    jcenter()
    maven {
        // Confluent artifacts (kafka-streams-avro-serde) are not on Maven Central.
        // Must be HTTPS: plain-HTTP repositories are insecure and are rejected
        // by Gradle 7+ unless explicitly allowed.
        url 'https://packages.confluent.io/maven'
    }
}

dependencies {
    implementation 'org.apache.avro:avro:1.8.2'
    implementation 'org.apache.kafka:kafka-streams:2.2.0'
    implementation 'io.confluent:kafka-streams-avro-serde:5.2.0'
}

apply plugin: 'com.commercehub.gradle.plugin.avro'
apply plugin: 'idea'
Add Avro schemas
Place avro schema files in src/main/avro
movie.avsc
{
"namespace": "io.confluent.demo",
"type": "record",
"name": "Movie",
"fields": [
{
"name": "movie_id",
"type": "long"
},
{
"name": "title",
"type": "string"
},
{
"name": "release_year",
"type": "int"
},
{
"name": "country",
"type": "string"
},
{
"name": "genres",
"type": {
"type": "array",
"items": "string"
}
},
{
"name": "actors",
"type": {
"type": "array",
"items": "string"
}
},
{
"name": "directors",
"type": {
"type": "array",
"items": "string"
}
},
{
"name": "composers",
"type": {
"type": "array",
"items": "string"
}
},
{
"name": "screenwriters",
"type": {
"type": "array",
"items": "string"
}
},
{
"name": "cinematographer",
"type": "string"
},
{
"name": "production_companies",
"type": {
"type": "array",
"items": "string"
}
}
]
}
Add ratings schema
rating.avsc
{
"namespace": "io.confluent.demo",
"type": "record",
"name": "Rating",
"fields": [
{
"name": "movie_id",
"type": "long"
},
{
"name": "rating",
"type": "double"
}
]
}
Add Parser
KafkaApplication.java
/**
 * Parses lines of the movies.dat data file into Avro {@link Movie} records.
 * A line is "::"-separated; list-valued fields within it are "|"-separated.
 */
class Parser {

    /** Utility class — not instantiable. */
    private Parser() {
    }

    /**
     * Splits a "|"-delimited field into its elements.
     * StringTokenizer skips empty tokens, so an empty input yields an empty list.
     */
    private static List<String> parseArray(String text) {
        return Collections.list(new StringTokenizer(text, "|")).stream()
            .map(token -> (String) token)
            .collect(Collectors.toList());
    }

    /**
     * Parses one "::"-separated line of movies.dat into a {@link Movie}.
     * Fields at indices 3, 5 and 6 are present in the file but unused here.
     *
     * @param text one raw line of the data file
     * @return the populated Movie record
     * @throws IllegalArgumentException if the line has fewer than 13 fields
     * @throws NumberFormatException    if the id or release year is not numeric
     */
    public static Movie parseMovie(String text) {
        String[] tokens = text.split("\\:\\:");
        // cinematographer lives at index 12, so at least 13 fields are required;
        // fail with context instead of an opaque ArrayIndexOutOfBoundsException.
        if (tokens.length < 13) {
            throw new IllegalArgumentException(
                "Malformed movie line, expected at least 13 '::'-separated fields: " + text);
        }
        String id = tokens[0];
        String title = tokens[1];
        String releaseYear = tokens[2];
        String country = tokens[4];
        String genres = tokens[7];
        String actors = tokens[8];
        String directors = tokens[9];
        String composers = tokens[10];
        String screenwriters = tokens[11];
        String cinematographer = tokens[12];
        // production companies is an optional trailing field
        String productionCompanies = "";
        if (tokens.length > 13) {
            productionCompanies = tokens[13];
        }
        Movie movie = new Movie();
        movie.setMovieId(Long.parseLong(id));
        movie.setTitle(title);
        movie.setReleaseYear(Integer.parseInt(releaseYear));
        movie.setCountry(country);
        movie.setGenres(Parser.parseArray(genres));
        movie.setActors(Parser.parseArray(actors));
        movie.setDirectors(Parser.parseArray(directors));
        movie.setComposers(Parser.parseArray(composers));
        movie.setScreenwriters(Parser.parseArray(screenwriters));
        movie.setCinematographer(cinematographer);
        movie.setProductionCompanies(Parser.parseArray(productionCompanies));
        return movie;
    }
}
Add Producer class
KafkaApplication.java
@Log4j2
@Component
@RequiredArgsConstructor
class Producer {
private static final String MOVIES_TOPIC = "movies";
private static final String RATINGS_TOPIC = "ratings";
private final KafkaTemplate kafkaTemplate;
@Value(value = "classpath:movies.dat")
private Resource moviesFile;
@EventListener(ApplicationStartedEvent.class)
public void process() throws InterruptedException {
try (Stream<String> stream = Files.lines(Paths.get(moviesFile.getURI()))) {
stream.forEach(s -> {
Movie movie = Parser.parseMovie(s);
log.info("sending " + movie.getMovieId() + " for movie " + movie.toString() + " to " + MOVIES_TOPIC);
kafkaTemplate.send(MOVIES_TOPIC, movie.getMovieId(), movie);
});
} catch (IOException e) {
e.printStackTrace();
}
Random ran = new Random();
while (true) {
int movieId = ran.nextInt(920) + 1;
int rating = 5 + ran.nextInt(6);
Rating rat = new Rating((long) movieId, (double) rating);
log.info(rat.toString());
Thread.sleep(1_000);
this.kafkaTemplate.send(RATINGS_TOPIC, rat.getMovieId(), rat);
}
}
}
Create topics
KafkaApplication.java
@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }

    /** Declares the "ratings" topic: 1 partition, replication factor 1. */
    @Bean
    NewTopic ratingsTopic() {
        // Bug fix: this bean previously created the movies topic.
        return new NewTopic(Producer.RATINGS_TOPIC, 1, (short) 1);
    }

    /** Declares the "movies" topic: 1 partition, replication factor 1. */
    @Bean
    NewTopic moviesTopic() {
        // Bug fix: this bean previously created the ratings topic.
        return new NewTopic(Producer.MOVIES_TOPIC, 1, (short) 1);
    }
}
Spring Cloud Streams Application
Bindings
Bindings.java
/**
 * Spring Cloud Stream channel bindings for the movie-rating topology:
 * inbound ratings and movies, the computed average-ratings stream, and
 * the final rated-movies output.
 */
interface Bindings {
// Channel/topic names shared by the binding annotations below.
String RATINGS = "ratings";
String AVG_RATINGS = "avg-ratings";
String MOVIES = "movies";
String AVG_TABLE = "avg-table";
String RATED_MOVIES = "rated-movies";
//
// Name of the queryable state store backing the HTTP endpoint.
String RATED_MOVIES_STORE = "rated-movies-store";
// Raw ratings, keyed by movie id.
@Input(RATINGS)
KStream<Long, Rating> ratingsIn();
// Computed average rating per movie id.
@Output(AVG_RATINGS)
KStream<Long, Double> averageRatingsOut();
// Movie catalog as a changelog table, keyed by movie id.
@Input(MOVIES)
KTable<Long, Movie> moviesIn();
// Average ratings re-consumed as a table for the join.
@Input(AVG_TABLE)
KTable<Long, Double> averageRatingsIn();
// Movies enriched with their current average rating.
@Output(RATED_MOVIES)
KStream<Long, RatedMovie> ratedMoviesOut();
}
Processors
RatingsAverager.java
@Component
class RatingsAverager {

    /**
     * Maintains a running average rating per movie id and emits the updated
     * average on the avg-ratings output whenever either the running sum or
     * the running count changes.
     */
    @StreamListener
    @SendTo(Bindings.AVG_RATINGS)
    KStream<Long, Double> averageRatingsFor(@Input(Bindings.RATINGS) KStream<Long, Rating> ratings) {
        // Strip the Rating envelope down to its numeric value, grouped by movie id.
        KGroupedStream<Long, Double> grouped = ratings
            .mapValues(Rating::getRating)
            .groupByKey();

        // Running number of ratings seen per movie.
        KTable<Long, Long> ratingCounts = grouped.count();

        // Running sum of rating values per movie.
        KTable<Long, Double> ratingSums =
            grouped.reduce(Double::sum, Materialized.with(Serdes.Long(), Serdes.Double()));

        // average = sum / count, recomputed whenever either table updates.
        KTable<Long, Double> averages = ratingSums.join(
            ratingCounts,
            (sum, n) -> sum / n,
            Materialized.with(Serdes.Long(), Serdes.Double()));

        return averages.toStream();
    }
}
MovieProcessor.java
@Component
class MovieProcessor {

    /**
     * Joins each movie with its current average rating and emits the enriched
     * {@link RatedMovie} to the rated-movies output, while materializing the
     * join result into the queryable {@code rated-movies-store} state store
     * that backs the HTTP endpoint.
     */
    @StreamListener
    @SendTo(Bindings.RATED_MOVIES)
    KStream<Long, RatedMovie> rateMoviesFor(@Input(Bindings.AVG_TABLE) KTable<Long, Double> ratings,
                                            @Input(Bindings.MOVIES) KTable<Long, Movie> movies) {
        ValueJoiner<Movie, Double, RatedMovie> joiner = (movie, rating) ->
            new RatedMovie(movie.getMovieId(), movie.getReleaseYear(), movie.getTitle(), rating);
        // Perform the join exactly once: the same materialized result both
        // populates the queryable store and feeds the output stream.
        // (Previously the join ran twice — the materialized result was
        // discarded and a second join created a redundant state store.)
        return movies
            .join(ratings, joiner, Materialized
                .<Long, RatedMovie, KeyValueStore<Bytes, byte[]>>as(Bindings.RATED_MOVIES_STORE)
                .withKeySerde(Serdes.Long())
                .withValueSerde(new JsonSerde<>(RatedMovie.class)))
            .toStream();
    }
}