
Kafka - Spring Boot


In this blog post I'm going to explain how to integrate Kafka with Spring Boot. We use Spring Boot configuration to send a Kafka message in String format and consume it. Let's begin.
(The complete example can be found here.)

Starting up Kafka

First of all, we need to run a Kafka cluster. For this I'm using the Landoop Docker image.
Here is the docker command to run the Landoop container.
docker container run --rm -it \
-p 2181:2181 -p 3030:3030 -p 8081:8081 \
-p 8082:8082 -p 8083:8083 -p 9092:9092 \
-e ADV_HOST= \


Generate the application

I'm using the IntelliJ IDEA IDE to generate the Spring Boot application. I have selected the web, lombok and kafka dependencies.
Let's rename application.properties to application.yml to use the YAML format.
Here are the configuration values in my application.yml file.
server:
  port: 9000

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      bootstrap-servers: localhost:9092
      group-id: test-id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
It is pretty simple. I have specified the URL of the running Kafka instance in bootstrap-servers. For the key and value serializers I use the built-in StringSerializer. To deserialize the message I use the StringDeserializer provided by Kafka.
  • bootstrap-servers - the Kafka server instance.
  • group-id - the consumer group id that will be used by consumers.
  • auto-offset-reset - consumers will start reading from the earliest available message when there is no existing offset for that consumer group.
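
To make the auto-offset-reset rule concrete, here is a minimal plain-Java sketch of how a consumer could pick its starting position. It is a simplified model and not Kafka's actual code: the class name, the method name and the two-value policy enum are made up for illustration, and cases such as an out-of-range committed offset are ignored.

```java
import java.util.OptionalLong;

// Simplified model of the auto-offset-reset decision; names are hypothetical.
final class OffsetResetSketch {

    enum ResetPolicy { EARLIEST, LATEST }

    // committed: last committed offset of the group for this partition, if any.
    // logStart: first offset still available; logEnd: offset the next message will get.
    static long startingOffset(OptionalLong committed, ResetPolicy policy,
                               long logStart, long logEnd) {
        if (committed.isPresent()) {
            // An existing offset always wins; the reset policy is not consulted.
            return committed.getAsLong();
        }
        return policy == ResetPolicy.EARLIEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // New group, earliest: start from the first available message.
        System.out.println(startingOffset(OptionalLong.empty(), ResetPolicy.EARLIEST, 0L, 42L));
        // New group, latest: only messages produced from now on are read.
        System.out.println(startingOffset(OptionalLong.empty(), ResetPolicy.LATEST, 0L, 42L));
        // Known group: resume from the committed offset.
        System.out.println(startingOffset(OptionalLong.of(17L), ResetPolicy.EARLIEST, 0L, 42L));
    }
}
```

With earliest, a brand new group such as test-id also receives the messages that were already in the topic before it first connected.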

Kafka Configuration

We have already configured the basic properties. In addition to that, we are going to create our topic.
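@Configuration
public class KafkaConfiguration {

    public static final String TOPIC_NAME = "kafka-spring";

    // Registered as a bean so that the topic is created at startup if it does not exist yet.
    @Bean
    public NewTopic topic() {
        // Arguments: topic name, number of partitions, replication factor.
        return new NewTopic(TOPIC_NAME, 3, (short) 1);
    }
}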

Spring Boot auto-configures a KafkaAdmin bean, which uses Kafka's AdminClient to create a topic for every NewTopic bean in the context. Here we have given kafka-spring as the topic name, 3 as the number of partitions and 1 as the replication factor.
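
The replication factor of 1 goes hand in hand with a single-broker development setup: Kafka cannot place more replicas of a partition than there are brokers. Below is a small plain-Java sketch of that constraint, assuming the Docker container runs a single broker. TopicSpecSketch and fitsCluster are hypothetical names used only for illustration; they are not part of Kafka or Spring.

```java
// Hypothetical value class mirroring the three NewTopic arguments used above.
final class TopicSpecSketch {

    final String name;
    final int partitions;
    final short replicationFactor;

    TopicSpecSketch(String name, int partitions, short replicationFactor) {
        this.name = name;
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
    }

    // Each replica of a partition lives on a different broker,
    // so the replication factor must not exceed the broker count.
    boolean fitsCluster(int brokerCount) {
        return partitions >= 1
                && replicationFactor >= 1
                && replicationFactor <= brokerCount;
    }

    public static void main(String[] args) {
        TopicSpecSketch devTopic = new TopicSpecSketch("kafka-spring", 3, (short) 1);
        TopicSpecSketch replicated = new TopicSpecSketch("kafka-spring", 3, (short) 3);

        System.out.println("3 partitions, RF 1 on 1 broker: " + devTopic.fitsCluster(1));
        System.out.println("3 partitions, RF 3 on 1 broker: " + replicated.fitsCluster(1));
    }
}
```

The number of partitions is not limited in the same way; several partitions can live on the same broker.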

Produce Messages

Spring provides the easy-to-use KafkaTemplate to send messages to Kafka. We need to provide the topic name and our message.

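// TOPIC_NAME is statically imported from KafkaConfiguration.
@Component
public class KafkaMessageProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageProducer.class);

    // Key and value types match the String serializers configured in application.yml.
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaMessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        LOGGER.info(String.format(":: Produce Message :: %s", message));
        kafkaTemplate.send(TOPIC_NAME, message);
    }
}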
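
On the wire the message is just bytes. With the String serializer and deserializer configured earlier, the text is encoded as UTF-8 by default on the way out and decoded again on the consumer side. The following plain-Java sketch imitates that round trip without any Kafka dependency; StringSerdeSketch is a hypothetical stand-in, not the real StringSerializer class.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in that mimics the default behaviour of
// StringSerializer and StringDeserializer (UTF-8, null passes through).
final class StringSerdeSketch {

    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String message = "Hello Kafka";
        byte[] payload = serialize(message);      // what the producer hands to the broker
        String received = deserialize(payload);   // what the listener method receives

        System.out.println("payload size in bytes: " + payload.length);
        System.out.println("round trip intact: " + message.equals(received));
    }
}
```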


Consume Messages

With Spring's @KafkaListener annotation, we can easily consume messages by specifying the topic name and group id.

// TOPIC_NAME is statically imported from KafkaConfiguration.
@Component
public class KafkaMessageConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageConsumer.class);

    @KafkaListener(topics = TOPIC_NAME, groupId = "test-id")
    public void consume(String message) {
        LOGGER.info(String.format(":: Consume Message :: %s", message));
    }
}


Test it

Now we are all set. Let's create a simple endpoint to send a few messages.

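@RestController
public class MessageController {

    private final MessageService messageService;

    public MessageController(MessageService messageService) {
        this.messageService = messageService;
    }

    @PostMapping("/send")
    public void sendMessage(@RequestBody Message message) {
        // Assumption: MessageService (not shown in this post) exposes send(Message).
        messageService.send(message);
    }
}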


curl -X POST \
  http://localhost:9000/send \
  -H 'Content-Type: application/json' \
  -d '{ "text": "Hello Kafka" }'
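
If you would rather trigger the endpoint from Java than from curl, the same request can be sketched with the JDK's built-in HttpClient (Java 11 or later). SendMessageClient and buildRequest are names made up for this example, and the JSON is assembled by hand, which is only acceptable for plain text without quotes or backslashes.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper that mirrors the curl command above.
final class SendMessageClient {

    static HttpRequest buildRequest(String text) {
        // Naive JSON construction; use a JSON library for arbitrary input.
        String json = "{ \"text\": \"" + text + "\" }";
        return HttpRequest.newBuilder(URI.create("http://localhost:9000/send"))
                .timeout(Duration.ofSeconds(5))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(2))
                .build();
        try {
            HttpResponse<Void> response = client.send(
                    buildRequest("Hello Kafka"), HttpResponse.BodyHandlers.discarding());
            System.out.println("HTTP status: " + response.statusCode());
        } catch (IOException e) {
            // Typically means the Spring Boot application is not running on port 9000.
            System.out.println("Could not reach the application: " + e.getMessage());
        }
    }
}
```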

Check the console. You should see something like this (the run captured below sent "Hello World" as the text).

2019-05-01 22:09:28.062  INFO 6045 --- [nio-9000-exec-4] c.s.k.message.KafkaMessageProducer       : :: Produce Message :: Hello World
2019-05-01 22:09:28.069  INFO 6045 --- [ntainer#0-0-C-1] c.s.k.message.KafkaMessageConsumer       : :: Consume Message :: Hello World

Navigate to the Landoop web UI and select Topics, where you can see our topic.


