Apache Kafka is a fault-tolerant publish-subscribe streaming platform that lets you process streams of records as they occur. If you haven’t installed Kafka yet, see our Kafka Quickstart Tutorial to get up and running quickly.
In this post we will walk through creating a simple Kafka consumer in Java.
Kafka Consumer Code
The example below shows how to create a Kafka consumer object and use it to consume messages from the my-topic topic.
package net.bigdatums.kafka.producer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class PrintStringConsumerExample {
    public static void main(String[] args) {
        //consumer properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");

        //using auto commit
        props.put("enable.auto.commit", "true");

        //string keys and values
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //kafka consumer object
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        //subscribe to topic
        consumer.subscribe(Arrays.asList("my-topic"));

        //infinite poll loop
        while (true) {
            //poll for new records, waiting up to 100 ms
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
        }
    }
}
In this example we are setting common consumer properties such as group.id, which is the ID of the Kafka consumer group this consumer will join, and enable.auto.commit, which turns on automatic offset commits in the background.
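With enable.auto.commit set to true, offsets are committed for you in the background, which can re-deliver or drop records if the consumer dies between commits. For more control you can disable auto-commit and commit after processing each batch. The sketch below is a minimal variation of the example above illustrating manual commits with commitSync(); it is not from the original post, and the class name is made up.

package net.bigdatums.kafka.producer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ManualCommitConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        //turn off auto commit so we control when offsets are committed
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("my-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
            }
            //commit the offsets of the records we just processed;
            //blocks until the commit succeeds or throws on failure
            consumer.commitSync();
        }
    }
}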
This consumer runs an infinite poll loop; each call to consumer.poll(100) blocks for up to 100 milliseconds waiting for new records (the argument is a timeout in milliseconds, not a record count). The consumer then prints each record’s offset, key, and value.
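One thing the example above glosses over is shutdown: the infinite loop never closes the consumer, so it leaves the consumer group uncleanly when the process is killed. A common pattern is to call consumer.wakeup() from a shutdown hook, which makes the blocking poll() throw a WakeupException. The sketch below is a minimal illustration of that pattern and is not from the original post; the class name is made up.

package net.bigdatums.kafka.producer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Arrays;
import java.util.Properties;

public class GracefulShutdownConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("my-topic"));

        final Thread mainThread = Thread.currentThread();

        //on JVM shutdown (e.g. Ctrl+C), interrupt the blocking poll()
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                consumer.wakeup();
                try {
                    mainThread.join(); //wait for the poll loop to finish closing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s\n",
                            record.offset(), record.key(), record.value());
            }
        } catch (WakeupException e) {
            //expected on shutdown; fall through to close()
        } finally {
            consumer.close(); //leaves the consumer group cleanly
        }
    }
}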
Running the Kafka Consumer Example
In order to run this example, we need a Zookeeper server and a Kafka server running. Our Kafka Quickstart Tutorial shows how to get both up and running.
Once we have Zookeeper and Kafka running, we can create the my-topic topic:
# creating my-topic topic
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
We are now ready to execute our code from the command line or directly within an IDE and start consuming Kafka messages. Below is an example of running this code on the command line using our bigdatums-kafka-1.0-SNAPSHOT.jar file.
java -cp bigdatums-kafka-1.0-SNAPSHOT.jar net.bigdatums.kafka.producer.PrintStringConsumerExample
Now that our consumer is running, we will test it by sending some sample messages to Kafka. We can do this in a separate terminal window with the kafka-console-producer, a program included with Kafka that takes input from the command line (STDIN) and sends it as messages to Kafka.
$KAFKA_HOME/bin/kafka-console-producer.sh \
> --broker-list localhost:9092 \
> --topic my-topic \
> --property "parse.key=true" \
> --property "key.separator=:"
key1:value1
key2:value2
key3:value3
As these messages are sent to Kafka, our consumer will print the following lines:
offset = 0, key = key1, value = value1
offset = 1, key = key2, value = value2
offset = 2, key = key3, value = value3
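The console producer is the quickest way to test, but you can also send the same keyed test messages from Java. The sketch below is a minimal producer for this, assuming the same broker and topic as above; it is not from the original post, and the class name is hypothetical.

package net.bigdatums.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PrintStringProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        //string keys and values, mirroring the consumer's deserializers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        //send the same keyed messages we typed into the console producer
        for (int i = 1; i <= 3; i++) {
            producer.send(new ProducerRecord<String, String>("my-topic", "key" + i, "value" + i));
        }

        //flush any buffered messages and release resources
        producer.close();
    }
}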