Starting with version 1.1 of Spring Kafka, @KafkaListener methods can be configured to receive a batch of consumer records from the consumer poll operation.

The following example shows how to set up a batch listener using Spring Kafka, Spring Boot, and Maven.

If you want to learn more about Spring Kafka, head on over to the Spring Kafka tutorials page.

General Project Setup

Tools used:

  • Spring Kafka 1.2
  • Spring Boot 1.5
  • Maven 3.5

The general project and Sender configuration are identical to a previous Spring Boot Kafka example. As such we won’t go into detail on how these are set up.

Configuring a Batch Listener and Batch Size

Batch receiving of messages is enabled by setting the batchListener property. This is done by calling the setBatchListener() method on the listener container factory (ConcurrentKafkaListenerContainerFactory in this example) with a value of true, as shown below.

By default, the number of records received in each batch is dynamically calculated. By setting the MAX_POLL_RECORDS_CONFIG property (the max.poll.records consumer setting) in the consumer configuration, we can set an upper limit for the batch size. A batch can still contain fewer records than this limit. For this example, we define a maximum of 10 messages to be returned per poll.

package com.codenotfound.kafka.consumer;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class ReceiverConfig {

  @Value("${kafka.bootstrap-servers}")
  private String bootstrapServers;

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
    // maximum records per poll
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");

    return props;
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // enable batch listening
    factory.setBatchListener(true);

    return factory;
  }

  @Bean
  public Receiver receiver() {
    return new Receiver();
  }
}
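
To make the "upper limit" behaviour concrete, here is a minimal plain-Java sketch. It is only an illustration and not how the Kafka client is implemented: the PollCapSketch class and its poll() method are hypothetical names, and a simple in-memory queue stands in for the records waiting on the broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java illustration only; hypothetical class, not part of Kafka or Spring Kafka.
class PollCapSketch {

  // Returns at most maxPollRecords of the records that are currently available.
  static List<String> poll(Deque<String> available, int maxPollRecords) {
    List<String> batch = new ArrayList<>();
    while (batch.size() < maxPollRecords && !available.isEmpty()) {
      batch.add(available.poll());
    }
    return batch;
  }

  public static void main(String[] args) {
    Deque<String> available = new ArrayDeque<>();
    for (int i = 0; i < 12; i++) {
      available.add("message " + i);
    }
    // with a cap of 10, twelve waiting records come back as a batch of 10 and a batch of 2
    System.out.println(poll(available, 10).size());
    System.out.println(poll(available, 10).size());
  }
}
```

The cap limits how large a batch can get. It does not make the listener wait until 10 records are available, so smaller batches are normal.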

The receive() method of the Receiver listener POJO needs to be updated to receive a List of payloads (in this example these are simple String objects). Alternatively, the method can be declared to take a list of Message<?> or ConsumerRecord<?, ?> objects.

For logging purposes, we also add the partition and offset headers of each message to the receive() method. Each header is passed in as a list, and its entries map to the received messages by index: the entry at position i belongs to the payload at position i.

The CountDownLatch value that is used in the unit test case is increased so that we can send out a batch of 20 messages.

package com.codenotfound.kafka.consumer;

import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

public class Receiver {

  private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

  public static final int COUNT = 20;

  private CountDownLatch latch = new CountDownLatch(COUNT);

  public CountDownLatch getLatch() {
    return latch;
  }

  @KafkaListener(id = "batch-listener", topics = "${kafka.topic.batch}")
  public void receive(List<String> data,
      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
      @Header(KafkaHeaders.OFFSET) List<Long> offsets) {

    LOGGER.info("start of batch receive");
    for (int i = 0; i < data.size(); i++) {
      LOGGER.info("received message='{}' with partition-offset='{}'", data.get(i),
          partitions.get(i) + "-" + offsets.get(i));
      // handle message

      latch.countDown();
    }
    LOGGER.info("end of batch receive");
  }
}
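
The index-based mapping between payloads and headers used in receive() can be tried out without a broker. The following is a minimal sketch in plain Java, assuming three lists of equal size; BatchIndexSketch and describe() are hypothetical names introduced for illustration and are not part of Spring Kafka.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Spring Kafka: pairs each payload with the
// partition and offset found at the same index in the header lists.
class BatchIndexSketch {

  static List<String> describe(List<String> data, List<Integer> partitions,
      List<Long> offsets) {
    if (data.size() != partitions.size() || data.size() != offsets.size()) {
      throw new IllegalArgumentException("payload and header lists must have the same size");
    }
    List<String> lines = new ArrayList<>();
    for (int i = 0; i < data.size(); i++) {
      // entry i of each header list belongs to payload i
      lines.add(data.get(i) + " @ " + partitions.get(i) + "-" + offsets.get(i));
    }
    return lines;
  }

  public static void main(String[] args) {
    // sample values chosen for illustration
    List<String> lines = describe(Arrays.asList("message 17", "message 19"),
        Arrays.asList(0, 0), Arrays.asList(8L, 9L));
    lines.forEach(System.out::println);
  }
}
```

The size check is an addition of this sketch. It makes a mismatch between the lists fail with a clear message instead of an IndexOutOfBoundsException part-way through the loop.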

Testing the Batch Listener

The SpringKafkaApplicationTest test case starts an embedded Kafka and ZooKeeper server using a JUnit ClassRule.

Using @Before we wait until all the partitions are assigned to our Receiver by looping over the available listener containers (if we don’t do this, the messages will already be sent before the listeners are assigned to the topic).

The testReceive() method uses a for loop to send out as many messages as were configured on the CountDownLatch in the Receiver. The result is that our listener starts receiving batches of messages from the Kafka broker partitions (2 partitions are created by default on the embedded broker).

package com.codenotfound.kafka;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.TimeUnit;

import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.test.context.junit4.SpringRunner;

import com.codenotfound.kafka.consumer.Receiver;
import com.codenotfound.kafka.producer.Sender;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringKafkaApplicationTest {

  private static final String BATCH_TOPIC = "batch.t";

  @Autowired
  private Sender sender;

  @Autowired
  private Receiver receiver;

  @Autowired
  private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

  @ClassRule
  public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, BATCH_TOPIC);

  @Before
  public void setUp() throws Exception {
    // wait until the partitions are assigned
    for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
        .getListenerContainers()) {
      ContainerTestUtils.waitForAssignment(messageListenerContainer,
          embeddedKafka.getPartitionsPerTopic());
    }
  }

  @Test
  public void testReceive() throws Exception {
    // send as many messages as the latch in the Receiver expects
    int numberOfMessages = Receiver.COUNT;
    for (int i = 0; i < numberOfMessages; i++) {
      sender.send(BATCH_TOPIC, "message " + i);
    }

    receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
    assertThat(receiver.getLatch().getCount()).isEqualTo(0);
  }
}
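
The latch-based wait in the test can be looked at in isolation. Below is a minimal plain-Java sketch, assuming a background thread that plays the role of the listener; LatchSketch and awaitAll() are hypothetical names and not part of the sample project. It relies on the fact that CountDownLatch.await(timeout, unit) returns true when the count reaches zero in time and false when the timeout elapses first.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical class for illustration; it is not part of the sample project.
class LatchSketch {

  // Counts the latch down once per delivered message on a background thread and
  // reports whether all expected messages arrived before the timeout.
  static boolean awaitAll(int expected, int delivered, long timeoutMillis) {
    CountDownLatch latch = new CountDownLatch(expected);
    Thread listener = new Thread(() -> {
      for (int i = 0; i < delivered; i++) {
        latch.countDown();
      }
    });
    listener.start();
    try {
      // true if the count reached zero, false if the timeout elapsed first
      return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(awaitAll(20, 20, 10000)); // all 20 messages delivered
    System.out.println(awaitAll(20, 19, 100));   // one message missing
  }
}
```

Because await() already reports success or failure, a test could assert on its return value directly instead of checking getCount() afterwards. Either approach fails the test when messages are missing.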

Run the test case by entering the following Maven command at the command prompt:

mvn test

The result should be 20 messages that are sent to and received from the 'batch.t' topic, as shown below:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.5.4.RELEASE)

16:46:18.654 [main] INFO  c.c.kafka.SpringKafkaApplicationTest - Starting SpringKafkaApplicationTest on cnf-pc with PID 3636 (started by CodeNotFound in c:\codenotfound\spring-kafka\spring-kafka-batch-listener)
16:46:18.655 [main] INFO  c.c.kafka.SpringKafkaApplicationTest - No active profile set, falling back to default profiles: default
16:46:19.344 [main] INFO  c.c.kafka.SpringKafkaApplicationTest - Started SpringKafkaApplicationTest in 0.995 seconds (JVM running for 5.206)
16:46:20.774 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 0' to topic='batch.t'
16:46:20.821 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 1' to topic='batch.t'
16:46:20.821 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 2' to topic='batch.t'
16:46:20.822 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 3' to topic='batch.t'
16:46:20.822 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 4' to topic='batch.t'
16:46:20.822 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 5' to topic='batch.t'
16:46:20.823 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 6' to topic='batch.t'
16:46:20.824 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 7' to topic='batch.t'
16:46:20.824 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 8' to topic='batch.t'
16:46:20.824 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 9' to topic='batch.t'
16:46:20.824 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 10' to topic='batch.t'
16:46:20.825 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 11' to topic='batch.t'
16:46:20.828 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 12' to topic='batch.t'
16:46:20.829 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 13' to topic='batch.t'
16:46:20.829 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 14' to topic='batch.t'
16:46:20.829 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 15' to topic='batch.t'
16:46:20.830 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 16' to topic='batch.t'
16:46:20.830 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 17' to topic='batch.t'
16:46:20.830 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 18' to topic='batch.t'
16:46:20.831 [main] INFO  c.codenotfound.kafka.producer.Sender - sending data='message 19' to topic='batch.t'
16:46:20.855 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - start of batch receive
16:46:20.856 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 1' with partition-offset='0-0'
16:46:20.856 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 3' with partition-offset='0-1'
16:46:20.856 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 5' with partition-offset='0-2'
16:46:20.856 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 7' with partition-offset='0-3'
16:46:20.856 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 9' with partition-offset='0-4'
16:46:20.856 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 11' with partition-offset='0-5'
16:46:20.856 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 13' with partition-offset='0-6'
16:46:20.856 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 15' with partition-offset='0-7'
16:46:20.856 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - end of batch receive
16:46:20.861 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - start of batch receive
16:46:20.861 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 0' with partition-offset='1-0'
16:46:20.861 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 2' with partition-offset='1-1'
16:46:20.861 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 4' with partition-offset='1-2'
16:46:20.861 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 6' with partition-offset='1-3'
16:46:20.861 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 8' with partition-offset='1-4'
16:46:20.861 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 10' with partition-offset='1-5'
16:46:20.861 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 12' with partition-offset='1-6'
16:46:20.861 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 14' with partition-offset='1-7'
16:46:20.861 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 16' with partition-offset='1-8'
16:46:20.861 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 18' with partition-offset='1-9'
16:46:20.862 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - end of batch receive
16:46:20.862 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - start of batch receive
16:46:20.862 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 17' with partition-offset='0-8'
16:46:20.862 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - received message='message 19' with partition-offset='0-9'
16:46:20.862 [batch-listener-0-C-1] INFO  c.c.kafka.consumer.Receiver - end of batch receive
16:46:22.857 [main] ERROR o.a.zookeeper.server.ZooKeeperServer - ZKShutdownHandler is not registered, so ZooKeeper server won't take any action on ERROR or SHUTDOWN server state changes
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 9.168 sec - in com.codenotfound.kafka.SpringKafkaApplicationTest

Results :

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 12.360 s
[INFO] Finished at: 2017-07-14T16:46:23+02:00
[INFO] Final Memory: 27M/218M
[INFO] ------------------------------------------------------------------------

If you would like to run the above code sample you can get the full source code here.

This concludes setting up a Spring Kafka batch listener on a Kafka topic.

If you liked this tutorial or have any questions, leave a comment below.
