This is a playbook for the workshop delivered at SpringOne Platform 2019 in Austin, TX. The code of the example application is available on GitHub.

Let’s start building the app

As always, we’ll begin by generating a project starter. In this starter, you should enable "Spring for Apache Kafka", "Spring for Apache Kafka Streams", "Cloud Stream" and "Spring Web Starter."

Next, update your pom.xml to add support for Schema Registry and Avro. Confluent provides Avro serializers that integrate with Schema Registry.

<project>
    <dependencies>
        <!-- other dependencies -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>   (1)
            <version>5.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>   (2)
            <version>1.8.2</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>  (3)
            <version>5.3.0</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-streams-avro-serde</artifactId>
            <version>5.3.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <repositories>
        <!-- other maven repositories in the project -->
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url> (4)
        </repository>
    </repositories>
    <build>
        <plugins>
            <!-- other maven plugins in the project -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>src/main/resources/avro</sourceDirectory> (5)
                            <outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
                            <stringType>String</stringType>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
(1) Confluent Schema Registry client
(2) Avro dependency
(3) Avro SerDes
(4) Confluent Maven repository
(5) Source directory where you put your Avro schema files; the generated Java POJOs are written to the outputDirectory

The architecture of a Spring Boot application

Your application will include the following components:

  • user.avsc: an Avro file where we define a schema for our domain model.

  • SpringAvroApplication.java: the starting point of your application. This class also includes configuration for the new topic that your application is using.

  • Producer.java: a component that encapsulates the Kafka producer.

  • Consumer.java: a listener of messages from the Kafka topic.

  • KafkaController.java: a RESTful controller that accepts HTTP requests and publishes a message to the Kafka topic.

Creating a user Avro file

{
  "namespace": "io.confluent.developer",    (1)
  "type": "record",
  "name": "User",
  "fields": [   (2)
    {
      "name": "name",
      "type": {
        "type": "string",
        "avro.java.string": "String"
      }
    },
    {
      "name": "age",
      "type": "int"
    }
  ]
}
(1) The avro-maven-plugin generates the User POJO in the io.confluent.developer package.
(2) The POJO has name and age properties.
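To make the schema concrete, here’s a minimal sketch of how a User record is laid out in Avro’s binary encoding: fields are written in schema order, a string is a zigzag-varint length followed by its UTF-8 bytes, and an int is a zigzag varint. AvroUserEncodingSketch is a hypothetical, stdlib-only helper for illustration; in the application, the generated User class and the Confluent serializer do this work for you.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of Avro binary encoding for the User schema.
final class AvroUserEncodingSketch {

  // Avro zigzag-encodes ints so that small negative numbers stay short.
  static int zigZag(int n) {
    return (n << 1) ^ (n >> 31);
  }

  // Writes a zigzag-encoded int as a little-endian base-128 varint.
  static void writeInt(ByteArrayOutputStream out, int n) {
    int v = zigZag(n);
    while ((v & ~0x7F) != 0) {
      out.write((v & 0x7F) | 0x80); // low 7 bits plus a continuation bit
      v >>>= 7;
    }
    out.write(v);
  }

  // Avro strings: the byte length as an int, then the UTF-8 bytes.
  static void writeString(ByteArrayOutputStream out, String s) {
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    writeInt(out, bytes.length);
    out.write(bytes, 0, bytes.length);
  }

  // A record is just its fields concatenated in schema order: name, then age.
  static byte[] encodeUser(String name, int age) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeString(out, name);
    writeInt(out, age);
    return out.toByteArray();
  }
}
```

For example, encodeUser("vik", 33) produces the five bytes 06 76 69 6B 42. No field names or types travel with the data, which is why a reader needs the schema to decode it.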

Creating a Spring Boot application class

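import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class SpringAvroApplication {
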
  @Value("${topic.name}") (1)
  private String topicName;

  @Value("${topic.partitions-num}")
  private Integer partitions;

  @Value("${topic.replication-factor}")
  private short replicationFactor;

  @Bean
  NewTopic newTopic() { (2)
    return new NewTopic(topicName, partitions, replicationFactor);
  }

  public static void main(String[] args) {
    SpringApplication.run(SpringAvroApplication.class, args);
  }
}
(1) These topic parameters are injected by Spring from the application.yaml file.
(2) Spring Boot’s auto-configured KafkaAdmin creates a Kafka topic for each NewTopic bean, based on the provided configuration, if the topic doesn’t already exist. As an application developer, you’re responsible for creating your topic instead of relying on broker-side automatic topic creation (auto.create.topics.enable), which should be disabled in production environments.

Creating a producer component

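import io.confluent.developer.User;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@CommonsLog(topic = "Producer Logger")
public class Producer {

  @Value("${topic.name}") (1)
  private String topicName;

  private final KafkaTemplate<String, User> kafkaTemplate;

  @Autowired
  public Producer(KafkaTemplate<String, User> kafkaTemplate) { (2)
    this.kafkaTemplate = kafkaTemplate;
  }

  void sendMessage(User user) {
    this.kafkaTemplate.send(this.topicName, user.getName(), user);    (3)
    log.info(String.format("Produced user -> %s", user));
  }
}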
(1) The topic name is injected from application.yaml.
(2) Spring Boot auto-configures the KafkaTemplate from the spring.kafka properties in application.yaml and injects it through the constructor.
(3) Each message is sent to the topic with the user’s name as the key.
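Using the name as the key matters because records with the same key land in the same partition, where Kafka preserves their order. As an illustration, here’s a stdlib-only re-implementation of the hashing the Java client’s default partitioner applies to keyed records: murmur2 over the serialized key bytes, masked to a non-negative int, modulo the partition count. It assumes the key is serialized with StringSerializer (UTF-8), as in the configuration shown later; KeyPartitionSketch is a hypothetical name, and the real producer does all of this internally.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical re-implementation of key-based partition selection.
final class KeyPartitionSketch {

  // Murmur2 hash as used by the Kafka Java client for keyed records.
  static int murmur2(byte[] data) {
    int length = data.length;
    int seed = 0x9747b28c;
    final int m = 0x5bd1e995;
    final int r = 24;

    int h = seed ^ length;
    int length4 = length / 4;

    // Mix the input four bytes at a time (little-endian).
    for (int i = 0; i < length4; i++) {
      int i4 = i * 4;
      int k = (data[i4] & 0xff)
          + ((data[i4 + 1] & 0xff) << 8)
          + ((data[i4 + 2] & 0xff) << 16)
          + ((data[i4 + 3] & 0xff) << 24);
      k *= m;
      k ^= k >>> r;
      k *= m;
      h *= m;
      h ^= k;
    }

    // Mix in the remaining 1 to 3 bytes; the cases intentionally fall through.
    switch (length % 4) {
      case 3:
        h ^= (data[(length & ~3) + 2] & 0xff) << 16;
      case 2:
        h ^= (data[(length & ~3) + 1] & 0xff) << 8;
      case 1:
        h ^= data[length & ~3] & 0xff;
        h *= m;
    }

    h ^= h >>> 13;
    h *= m;
    h ^= h >>> 15;
    return h;
  }

  // Partition for a String key serialized as UTF-8, as StringSerializer does by default.
  static int partitionFor(String key, int numPartitions) {
    byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
  }
}
```

Whatever the exact number, partitionFor("vik", 6) returns the same partition every time, so all of vik’s messages go to one partition. Changing the partition count changes the mapping, which is one reason to size the topic up front.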

Spring instantiates all these components during application startup, after which the application is ready to receive messages through the REST endpoint. The application listens on HTTP port 9080, which is set by server.port in the application.yaml configuration file and can be changed there.

Creating a consumer component

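import io.confluent.developer.User;
import lombok.extern.apachecommons.CommonsLog;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@CommonsLog(topic = "Consumer Logger")
public class Consumer {

  @KafkaListener(topics = "${topic.name}", groupId = "group_id")  (1)
  public void consume(ConsumerRecord<String, User> record) {  (2)
    log.info(String.format("Consumed message -> %s", record.value()));
  }
}
(1) The topic name is resolved from application.yaml through the ${topic.name} placeholder instead of being hardcoded as "users".
(2) Because this method is annotated with @KafkaListener, the spring-kafka framework instantiates a consumer in the group_id consumer group and invokes consume() for every record it receives.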
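If you run more than one instance of this application, all of them join the group_id consumer group, and Kafka divides the topic’s partitions among them so that each record is handled by only one instance. With the Java client’s default strategy in the Kafka versions of this era (RangeAssignor), each consumer receives a contiguous range of a topic’s partitions. The sketch below approximates that split for a single topic; RangeAssignmentSketch and the consumer IDs are hypothetical, and in a real group the assignment is computed by the group leader.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical approximation of range assignment for one topic.
final class RangeAssignmentSketch {

  // Splits partitions 0..partitions-1 into contiguous ranges.
  // Consumers are sorted first (Kafka sorts by member ID); the first
  // (partitions % consumers) consumers each receive one extra partition.
  static Map<String, List<Integer>> assign(List<String> consumerIds, int partitions) {
    List<String> sorted = new ArrayList<>(consumerIds);
    Collections.sort(sorted);
    int perConsumer = partitions / sorted.size();
    int withExtra = partitions % sorted.size();

    Map<String, List<Integer>> assignment = new LinkedHashMap<>();
    for (int i = 0; i < sorted.size(); i++) {
      int start = perConsumer * i + Math.min(i, withExtra);
      int count = perConsumer + (i < withExtra ? 1 : 0);
      List<Integer> owned = new ArrayList<>();
      for (int p = start; p < start + count; p++) {
        owned.add(p);
      }
      // Consumers beyond the partition count end up with an empty list (idle).
      assignment.put(sorted.get(i), owned);
    }
    return assignment;
  }
}
```

With the six partitions used in the Confluent Cloud configuration, two instances would each own three partitions, and a seventh instance would sit idle.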

Creating the KafkaController component

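import io.confluent.developer.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController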
@RequestMapping(value = "/user")  (1)
public class KafkaController {

  private final Producer producer;

  @Autowired
  KafkaController(Producer producer) {  (2)
    this.producer = producer;
  }

  @PostMapping(value = "/publish")
  public void sendMessageToKafkaTopic(@RequestParam("name") String name, @RequestParam("age") Integer age) {
    this.producer.sendMessage(new User(name, age));  (3)
  }
}
(1) KafkaController is mapped to the /user HTTP endpoint.
(2) Spring injects the producer component.
(3) When a new request comes to the /user/publish endpoint, the producer sends it to Kafka.
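When you call this endpoint with curl -d, the body is sent as application/x-www-form-urlencoded, and Spring binds the name and age fields to the @RequestParam arguments, converting age to an Integer. The following stdlib-only sketch approximates that binding step to show what the controller receives; FormParamsSketch is a hypothetical helper, and in the application Spring’s own request handling and type conversion do this work.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical approximation of form-parameter binding.
final class FormParamsSketch {

  // Splits an application/x-www-form-urlencoded body into decoded key/value pairs.
  static Map<String, String> parse(String body) {
    Map<String, String> params = new LinkedHashMap<>();
    for (String pair : body.split("&")) {
      if (pair.isEmpty()) {
        continue;
      }
      int eq = pair.indexOf('=');
      String key = eq >= 0 ? pair.substring(0, eq) : pair;
      String value = eq >= 0 ? pair.substring(eq + 1) : "";
      params.put(URLDecoder.decode(key, StandardCharsets.UTF_8),
          URLDecoder.decode(value, StandardCharsets.UTF_8));
    }
    return params;
  }

  // Mirrors converting @RequestParam("age") to Integer: non-numeric input throws.
  static Integer age(Map<String, String> params) {
    return Integer.valueOf(params.get("age"));
  }
}
```

A value such as age=much_younger_than_vik_gotogym cannot be converted, so in this sketch the conversion throws NumberFormatException before any User object would be built.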

Running the example

Prerequisites

  • In this guide, I assume that you have the Java Development Kit (JDK) installed. If you don’t, I highly recommend using SDKMAN! to install it.

  • You’ll also need Confluent Platform 5.3 or newer installed locally. If you don’t already have it, follow the Confluent Platform Quick Start. Be sure to install the Confluent CLI as well (see step 4 in this section of the quick start).

Start Kafka and Schema Registry

confluent local start schema-registry

The Confluent CLI provides a local mode for managing your local Confluent Platform installation, and it starts each component in the correct order: ZooKeeper, then Kafka, then Schema Registry.


Building and running your Spring Boot application

In the examples directory, run ./mvnw clean install -DskipTests=true to compile and produce a runnable JAR. After that, you can run the following command:

java -jar target/kafka-avro-0.0.1-SNAPSHOT.jar

Testing the producer/consumer REST service

For simplicity, I like to use the curl command, but you can use any REST client (like Postman or the REST client in IntelliJ IDEA):

curl -X POST -d 'name=vik&age=33' http://localhost:9080/user/publish

2019-06-06 22:52:59.485  INFO 28910 --- [nio-9080-exec-1] Producer Logger                          : Produced user -> {"name": "vik", "age": 33}
2019-06-06 22:52:59.559  INFO 28910 --- [ntainer#0-0-C-1] Consumer Logger                          : Consumed message -> {"name": "vik", "age": 33}

Running the application using Confluent Cloud

To use this demo application with Confluent Cloud, you are going to need the endpoint of your managed Schema Registry and an API key/secret. Both can be easily retrieved from the Confluent Cloud UI once you select an environment.

At least one Kafka cluster must be created to access your managed Schema Registry. Once you select the Schema Registry option, you can retrieve the endpoint and create a new API key and secret.

An example Confluent Cloud configuration can be found in application-cloud.yaml:

topic:
  name: users
  partitions-num: 6
  replication-factor: 3
server:
  port: 9080
spring:
  kafka:
    bootstrap-servers:
      - mybootstrap.confluent.cloud:9092  (1)
    properties:
      # CCloud broker connection parameters
      ssl.endpoint.identification.algorithm: https
      sasl.mechanism: PLAIN
      request.timeout.ms: 20000
      retry.backoff.ms: 500
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="ccloud_key" password="ccloud_secret";  (2)
      security.protocol: SASL_SSL

      # CCloud Schema Registry Connection parameter
      schema.registry.url: https://schema-registry.aws.confluent.cloud  (3)
      basic.auth.credentials.source: USER_INFO    (4)
      schema.registry.basic.auth.user.info: sr_ccloud_key:sr_ccloud_secret (5)
    consumer:
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    template:
      default-topic:
logging:
  level:
    root: info
(1) Cloud bootstrap server
(2) Broker key and secret
(3) Confluent Cloud Schema Registry URL
(4) Schema Registry authentication configuration
(5) Cloud Schema Registry key and secret
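Spring Boot hands the entries under spring.kafka.properties to the Kafka clients as plain client properties, and the Schema Registry entries reach the Avro serializer and deserializer the same way. As a rough, stdlib-only illustration of what those settings amount to, the sketch below builds the SASL_SSL connection properties from an API key and secret, and derives the HTTP Basic Authorization header that USER_INFO credentials turn into when the serializer talks to Schema Registry. CloudClientConfigSketch and its method names are hypothetical, and the key and secret values are placeholders.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Properties;

// Hypothetical illustration of the Confluent Cloud connection settings.
final class CloudClientConfigSketch {

  // Builds the broker connection properties listed under spring.kafka.properties.
  static Properties brokerProperties(String bootstrap, String apiKey, String apiSecret) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", bootstrap);
    props.setProperty("security.protocol", "SASL_SSL");
    props.setProperty("sasl.mechanism", "PLAIN");
    props.setProperty("ssl.endpoint.identification.algorithm", "https");
    // Assumes the key and secret contain no double quotes.
    props.setProperty("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";");
    return props;
  }

  // With basic.auth.credentials.source=USER_INFO, the "key:secret" user info
  // is sent to Schema Registry as an HTTP Basic Authorization header.
  static String basicAuthHeader(String userInfo) {
    return "Basic " + Base64.getEncoder()
        .encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
  }
}
```

This is also why the Schema Registry user info must be the key and the secret joined by a colon: the registry decodes that pair from the header.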

To run this application in cloud mode, activate the cloud Spring profile. In this case, Spring Boot picks up the application-cloud.yaml configuration file, which contains the connection settings for Confluent Cloud.

java -jar -Dspring.profiles.active=cloud target/kafka-avro-0.0.1-SNAPSHOT.jar

Processing with Kafka Streams and Spring Cloud Stream

Let’s create a new application in io.confluent.developer.spring.streams.

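import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Bean;

@EnableBinding(Bindings.class)  (1)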
@SpringBootApplication
public class KafkaStreamsApp {

  // topic parameters injection

  public static void main(String[] args) {
    final SpringApplication application = new SpringApplication(KafkaStreamsApp.class);
    application.setWebApplicationType(WebApplicationType.NONE); (2)
    application.run(args);
  }

  @Bean
  NewTopic filteredTopic() {    (3)
    return new NewTopic(topicName, partitions, replicationFactor);
  }
}
(1) With the @EnableBinding annotation, we activate the Spring Cloud Stream integration.
(2) We don’t need a web/REST interface in this app.
(3) Create the topic for the filtered stream.

The Bindings interface defines the input and output channels for the Kafka Streams application.

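import io.confluent.developer.User;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;

interface Bindings {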

  String USERS_FILTERED = "users-filtered";
  String USERS = "users";

  @Input(USERS)     (1)
  KStream<String, User> usersI();

  @Output(USERS_FILTERED)   (2)
  KStream<String, User> filteredUsers();
}
(1) The @Input annotation defines the input stream.
(2) The @Output annotation defines the output stream.

The UserProcessor component implements the processing logic.

import io.confluent.developer.User;
import lombok.extern.apachecommons.CommonsLog;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component  (1)
@CommonsLog
class UserProcessor {

  @StreamListener (2)
  @SendTo(Bindings.USERS_FILTERED) (3)
  KStream<String, User> processUsers(@Input(Bindings.USERS) KStream<String, User> inputStream) { (4)
    return inputStream
        .filter((key, user) -> user.getAge() < 40)  (5)
        .mapValues(user -> new User(user.getName().toUpperCase(), user.getAge())) (6)
        .peek((key, user) -> log.info("New entry in filtered stream => Key = " + key + " Value = " + user)); (7)
  }
}
(1) A Spring-managed component.
(2) Spring Cloud Stream uses this annotation to identify the methods it manages.
(3) The resulting KStream<String, User> is sent to the users-filtered topic.
(4) Through the @Input annotation, the Spring framework injects the instantiated input stream as a method parameter.
(5) The filter method receives a predicate that decides whether a message is passed downstream; here, only users younger than 40 pass.
(6) The mapValues method transforms each value while keeping its key. In this case, we change the user’s name to upper case.
(7) The peek method performs an action on each record without modifying the stream, which makes it very useful for logging and debugging.
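The per-record logic of processUsers can be tried out without a broker. The sketch below applies the same filter, value mapping, and peek steps to a finite java.util.stream of key/value pairs; the nested User class is a hypothetical stand-in for the Avro-generated one, UserFilterSketch is a hypothetical name, and printing replaces the logger.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, broker-free illustration of the processUsers logic.
final class UserFilterSketch {

  // Stand-in for the Avro-generated User class.
  static final class User {
    private final String name;
    private final int age;

    User(String name, int age) {
      this.name = name;
      this.age = age;
    }

    String getName() { return name; }

    int getAge() { return age; }

    @Override
    public String toString() {
      return "{\"name\": \"" + name + "\", \"age\": " + age + "}";
    }
  }

  // Keep users under 40, upper-case their names (keys unchanged, like mapValues),
  // and print each surviving entry, like peek.
  static List<Map.Entry<String, User>> process(List<Map.Entry<String, User>> input) {
    return input.stream()
        .filter(e -> e.getValue().getAge() < 40)
        .map(e -> Map.entry(e.getKey(),
            new User(e.getValue().getName().toUpperCase(), e.getValue().getAge())))
        .peek(e -> System.out.println(
            "New entry in filtered stream => Key = " + e.getKey() + " Value = " + e.getValue()))
        .collect(Collectors.toList());
  }
}
```

The difference is scale and time: a KStream applies these functions continuously to an unbounded stream of records, while this sketch processes a fixed list once.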

How to Run Apache Kafka with Spring Boot on Pivotal Application Service (PAS)

This tutorial describes how to set up a sample Spring Boot application in Pivotal Application Service (PAS), which consumes and produces events to an Apache Kafka® cluster running in Pivotal Container Service (PKS). With this tutorial, you can set up your PAS and PKS configurations so that they work with Kafka.

For a tutorial on how to set up a Kafka cluster in PKS, please see How to Deploy Confluent Platform on Pivotal Container Service (PKS) with Confluent Operator. If you’d like more background on working with Kafka from Spring Boot, you can also check out How to Work with Apache Kafka in your Spring Boot Application.

Methodology

Starting with the requirements, this tutorial will then go through the specific tasks required to connect PAS applications to Kafka. The sample Spring Boot app is pre-configured to make the setup steps as streamlined as possible.

You’ll review the configuration settings that streamline the deployment, so you know what to change for your environment. Afterward, the tutorial will run through some ways to verify that your PAS app communicates with Kafka in your PKS setup.

Requirements

  1. Run a Kafka cluster in Enterprise PKS. To set up Kafka in PKS via Confluent Operator and expose external endpoints, you can refer to part 1.

  2. Access the springboot-kafka-avro repo.

  3. Install the Cloud Foundry (cf) CLI.

  4. Your PAS environment username, password, and fully qualified domain name (FQDN). At the time of this writing, you can obtain a PAS environment if you sign up for a free Pivotal Web Services account.

Cloud Foundry (cf) CLI prerequisites

If this is your first time deploying an application to PAS, you’ll need to do the following in order to perform the later steps. If you have already set up your PAS environment, or are familiar with PAS, feel free to adjust accordingly. Performing the following steps will create a ~/.cf/config.json file if you don’t have one already.

  1. Log in with cf l -a <my-env> -u <my-username> -p <my-password> --skip-ssl-validation, then exit and execute the commands below.

    1. Substitute <my-*> with settings that are appropriate for your PAS environment. For example, based on my Pivotal Web Services account setup, I used api.run.pivotal.io for <my-env>.

  2. Create and target an org and a space:

cf create-org confluent
cf target -o confluent
cf create-space dev
cf target -s dev

The commands in step 2 are optional, depending on how you like to keep things organized. In any case, at this point you should have a ~/.cf/config.json file and can proceed to set up the sample PAS app with Kafka in PKS.

For more details on the cf CLI, see the documentation.

Deploy a Sample Spring Boot Microservice App with Kafka to Pivotal Application Service (PAS)

Run all command-line tasks in a terminal unless explicitly stated otherwise.

  1. Clone springboot-kafka-avro and enter the directory.

    1. For example: git clone https://github.com/confluentinc/springboot-kafka-avro && cd springboot-kafka-avro.

  2. Create a Pivotal user-provided service instance (USPI) with the following command:

    cf create-user-provided-service cp -p kafka.json
    kafka.json
    {
      "brokers": "kafka.supergloo.com:9092",
      "jaasconfig": "org.apache.kafka.common.security.plain.PlainLoginModule required username='test' password='test123';",
      "sr": "http://schemaregistry.supergloo.com:8081"
    }

    This USPI delivers dynamic configuration values to our sample application upon startup. USPI is an example of the aforementioned PAS-specific requirements. The username and password values of test and test123 used above were the defaults used in the Helm Chart during Confluent Platform installation. These settings might depend on your environment, so adjust accordingly.

    The brokers and sr settings correspond to the brokers and sr variables in the src/main/resources/application-paas.yaml file.
  3. Push the sample Spring Boot microservice app to PAS with:

mvn clean package -DskipTests=true && cf push --no-start

Note the --no-start option: the USPI created earlier has not been bound yet, so attempting to start the application now would fail.

Pay attention to the routes value in the cf push output, which you’ll need in later steps. In my case, the route was spring-kafka-avro-fluent-hyrax.cfapps.io, but yours will be different.

  4. Next, as you probably already guessed, perform the binding: cf bind-service spring-kafka-avro cp. This command binds the cp service to the spring-kafka-avro app that you deployed earlier. You can confirm the binding in the Pivotal console under your cp service settings.

  5. Run cf start spring-kafka-avro. After about 30 seconds, the state of spring-kafka-avro should be running.

Verification

  1. Determine the external URL of your newly deployed app with cf apps. Look at the urls column. As previously noted, mine is spring-kafka-avro-fluent-hyrax.cfapps.io.

  2. The sample app code shows one available REST endpoint in KafkaController.java. You can post to this endpoint with different age and name parameters such as:

curl -X POST -d 'name=vik&age=33' spring-kafka-avro-fluent-hyrax.cfapps.io/user/publish

Or, change up the name and age values:

curl -X POST -d 'name=todd&age=22' spring-kafka-avro-fluent-hyrax.cfapps.io/user/publish

Or, to flex your Schema Registry integration, notice what happens when you attempt to send values that are not appropriate for the user schema (see src/main/avro/user.avsc):

curl -X POST -d 'name=todd&age=much_younger_than_vik_gotogym' spring-kafka-avro-fluent-hyrax.cfapps.io/user/publish

  3. List the topics created by the sample app with:

bin/kafka-topics --list --command-config kafka.properties --bootstrap-server kafka.supergloo.com:9092

  4. You can consume the users topic with the following command:

kafka-avro-console-consumer \
--bootstrap-server kafka.supergloo.com:9092 \
--consumer.config kafka.properties \
--topic users \
--from-beginning \
--property schema.registry.url=http://schemaregistry.supergloo.com:8081
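The console consumer can turn these bytes back into readable records because the Confluent Avro serializer doesn’t write bare Avro: each value starts with a magic byte (0), followed by the 4-byte, big-endian ID under which the schema is registered in Schema Registry, followed by the Avro-encoded payload. The consumer reads the ID and fetches the matching schema from the registry. Here’s a minimal stdlib sketch of that framing; WireFormatSketch and the schema ID used with it are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration of the Confluent serializer's message framing.
final class WireFormatSketch {
  static final byte MAGIC_BYTE = 0x0;

  // Prefixes an Avro-encoded payload with the magic byte and a 4-byte schema ID.
  static byte[] frame(int schemaId, byte[] avroPayload) {
    return ByteBuffer.allocate(1 + 4 + avroPayload.length)
        .put(MAGIC_BYTE)
        .putInt(schemaId) // ByteBuffer writes big-endian by default
        .put(avroPayload)
        .array();
  }

  // Reads the schema ID that a consumer would look up in Schema Registry.
  static int schemaId(byte[] message) {
    ByteBuffer buf = ByteBuffer.wrap(message);
    if (buf.get() != MAGIC_BYTE) {
      throw new IllegalArgumentException("Unknown magic byte");
    }
    return buf.getInt();
  }

  // Returns the Avro payload that follows the 5-byte header.
  static byte[] payload(byte[] message) {
    return Arrays.copyOfRange(message, 5, message.length);
  }
}
```

Because only a small ID travels with each message, the schema itself is stored once in the registry rather than in every record.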

Noteworthy configuration and source code

Now that you’ve verified your app is up and running and communicating with Kafka (and Schema Registry), let’s examine the configuration and source code by breaking down the setup steps above.

How does your PAS app know which Kafka cluster to use and how to authorize? How does the app know which Schema Registry to use?

First, look at the manifest.yaml file for the env stanza setting of SPRING_PROFILES_ACTIVE: paas. This forces Spring Boot to reference src/main/resources/application-paas.yaml for environment configuration settings. In application-paas.yaml, the values for brokers, sr, and jaasconfig appear to be dynamically set, e.g., ${vcap.services.cp.credentials.brokers}. So if you’re thinking there must be string interpolation happening somehow, I say loudly, “You are correct!” (That was my poor attempt at a Phil Hartman impersonation, by the way.) The interpolation magic happens on app startup via the USPI that we created and used to bind our app in step 2 above.
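Spring Boot’s Cloud Foundry support is what makes those placeholders resolvable: it reads the VCAP_SERVICES environment variable that PAS injects for bound services and exposes each service’s credentials as vcap.services.<service-name>.credentials.<key> properties. The following simplified sketch imitates the two steps for the cp service, flattening the credentials and then resolving ${...} placeholders. VcapPlaceholderSketch is hypothetical; in the app, Spring Boot’s environment post-processing and property resolution do the real work.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified imitation of VCAP_SERVICES flattening and placeholder resolution.
final class VcapPlaceholderSketch {

  private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

  // Flattens one service's credentials into vcap.services.<name>.credentials.<key>.
  static Map<String, String> flatten(String serviceName, Map<String, String> credentials) {
    Map<String, String> props = new LinkedHashMap<>();
    credentials.forEach((key, value) ->
        props.put("vcap.services." + serviceName + ".credentials." + key, value));
    return props;
  }

  // Replaces each ${...} placeholder with its value; unknown keys are left untouched.
  static String resolve(String template, Map<String, String> props) {
    Matcher m = PLACEHOLDER.matcher(template);
    StringBuilder sb = new StringBuilder();
    while (m.find()) {
      String value = props.getOrDefault(m.group(1), m.group(0));
      m.appendReplacement(sb, Matcher.quoteReplacement(value));
    }
    m.appendTail(sb);
    return sb.toString();
  }
}
```

Fed the credentials from kafka.json, resolve("${vcap.services.cp.credentials.brokers}", ...) yields kafka.supergloo.com:9092, which is how the same application-paas.yaml works in any space where a cp service is bound.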

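But why does your POST attempt fail when you send an age value that isn’t a number? Nothing in the Java code appears to check for it explicitly.

The age request parameter in KafkaController is declared as an Integer, matching the int type of the age field in the Avro schema, so Spring rejects a non-numeric value before a User record is ever created. Schema Registry comes into play once a valid record is serialized, through the schema.registry.url property setting in application-paas.yaml. For more information on Schema Registry, check out How to Use Schema Registry and Avro in Spring Boot Applications.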

Tutorial completed ✅

This tutorial covered how to deploy a Spring Boot microservice app to PAS that produces to and consumes from a Kafka cluster running in Pivotal PKS.