Apache Kafka is a fault-tolerant publish-subscribe streaming platform that lets you process streams of records as they occur. If you haven’t installed Kafka yet, see our Kafka Quickstart Tutorial to get up and running quickly.

In this post we will talk about creating a simple Kafka consumer in Java.

Kafka Consumer Code

The example below shows creating a Kafka consumer object and using it to consume messages from the my-topic topic.

package net.bigdatums.kafka.producer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class PrintStringConsumerExample {

    public static void main(String[] args) {

        //consumer properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        
        //using auto commit
        props.put("enable.auto.commit", "true");

        //string inputs and outputs
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //kafka consumer object
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        //subscribe to topic
        consumer.subscribe(Arrays.asList("my-topic"));

        //infinite poll loop
        while (true) {
            //poll() blocks for up to 100 ms waiting for new records
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
        }

    }

}

In this example we are setting common properties such as group.id, the ID of the consumer group this consumer joins, and enable.auto.commit, which tells the consumer to commit offsets automatically in the background.
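
A few other consumer properties are often tuned alongside these. The values below are illustrative, not requirements of the example:

//optional tuning (illustrative values, not part of the example above)
props.put("auto.commit.interval.ms", "1000"); //how often offsets are auto-committed
props.put("auto.offset.reset", "earliest");   //where to start when the group has no committed offset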

This consumer runs an infinite loop. On each pass, consumer.poll(100) waits up to 100 milliseconds for new records to arrive (the argument is a timeout in milliseconds, not a maximum record count) and returns whatever records are available. The consumer then prints each record’s offset, key, and value.
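
If you want to control exactly when offsets are committed, you can set enable.auto.commit to "false" and commit manually after processing each batch. A minimal sketch of that variant, reusing the consumer above:

//manual commit variant: assumes enable.auto.commit was set to "false"
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());

    //commit the offsets of the batch we just processed
    consumer.commitSync();
}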

Running the Kafka Consumer Example

In order to run this example, we need a Zookeeper server and a Kafka broker running. If you need help setting these up, see our Kafka Quickstart Tutorial.

Once we have Zookeeper and Kafka running we can create the my-topic topic:

# creating my-topic topic
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic 
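
The topic can also be created programmatically with the AdminClient from the Kafka clients library. A minimal sketch (the class name CreateTopicExample is ours, and error handling is omitted):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicExample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        //create my-topic with 1 partition and a replication factor of 1
        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(Collections.singleton(new NewTopic("my-topic", 1, (short) 1)))
                 .all().get();
        }
    }
}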

We are now ready to execute our code from the command line or directly within an IDE and start consuming Kafka messages. Below is an example of running this code on the command line from our bigdatums-kafka-1.0-SNAPSHOT.jar JAR file.

java -cp bigdatums-kafka-1.0-SNAPSHOT.jar net.bigdatums.kafka.producer.PrintStringConsumerExample

Now that our consumer is running, we can test it by sending some sample messages to Kafka. We can do this in a separate terminal window with kafka-console-producer, a program included with Kafka that takes input from the command line (STDIN) and sends it to Kafka as messages.

$KAFKA_HOME/bin/kafka-console-producer.sh \
>   --broker-list localhost:9092 \
>   --topic my-topic \
>   --property "parse.key=true" \
>   --property "key.separator=:"
key1:value1
key2:value2
key3:value3

As these messages are sent to Kafka, our consumer will print the following lines:

offset = 0, key = key1, value = value1
offset = 1, key = key2, value = value2
offset = 2, key = key3, value = value3
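
If you would rather send the test messages from Java instead of the console producer, a minimal producer sketch follows. The class name SampleKeyedProducer is ours; the serializer classes mirror the deserializers used by the consumer:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SampleKeyedProducer {

    public static void main(String[] args) {
        //producer properties (same broker address as the consumer)
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //send the same three keyed messages as the console producer session above
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= 3; i++) {
                producer.send(new ProducerRecord<>("my-topic", "key" + i, "value" + i));
            }
        }
    }
}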
