Pages

December 01, 2019

Kafka Part 4: Consumers

We have learned how to create Kafka Producer in the previous part of Kafka series. Now we will create Kafka consumer.

Reading data from Kafka is a bit different than reading data from any other messaging systems. Applications that need to read data from Kafka use a KafkaConsumer to subscribe to Kafka topics and receive messages from these topics.

In this blog post, we will discuss about the interview questions related to kafka Consumers and we will also create our own consumer.

Following steps need to be performed to create a simple Kafka consumer:
  1. Create Logger
  2. Create consumer properties.
  3. Create a consumer.
  4. Subscribe the consumer to a specific topic.
  5. Poll for some new data
Consumer Properties
Like Kafka Producer, Apache Kafka offers various different properties for creating a consumer as well. Here are the key properties:
  • key.deserializer: It is a Deserializer class for the key, which is used to implement the 'org.apache.kafka.common.serialization.Deserializer' interface.
  • value.deserializer: A Deserializer class for value which implements the 'org.apache.kafka.common.serialization.Desrializer' interface.
  • bootstrap.servers: It is a list of host/port pairs which is used to establish an initial connection with the Kafka cluster. It does not contain a full set of servers that a client requires. Only the servers which are required for bootstrapping are required.
  • group.id: It is a unique string which identifies the consumer of a consumer group. This property is needed when a consumer uses either Kafka based offset management strategy or group management functionality via subscribing to a topic.
  • auto.offset.reset: This property is required when no initial offset is present or if the current offset does not exist anymore on the server. There are the following values used to reset the offset values:
    • earliest: This offset variable automatically reset the value to its earliest offset.
    • latest: This offset variable reset the offset value to its latest offset.
    • none: If no previous offset is found for the previous group, it throws an exception to the consumer.
    • anything else: It throws an exception to the consumer.
Creating and subscribing the consumer
  • To create the object of KafkaConsumer: KafkaConsumer consumer=new KafkaConsumer<>(properties);
  • To read the message from a topic, we need to connect the consumer to the specified topic. A consumer can be subscribed through various subscribe API's. We can use Collections.singleton if there is only one topic or Arrays.asList() to subscribe either to one or multiple topics. 
  • We need to specify the topics name directly or through a string variable to read the messages. There can be multiple topics also separated by the comma. In our case it is consumer.subscribe(Arrays.asList(TOPIC_NAME));
Polling for new data
  • The consumer reads data from Kafka through the poll() method. This method returns the data fetched from the current partition's offset.
  • The time duration is specified till which it waits for the data, else returns an empty ConsumerRecord to the consumer.
  • We can get the key, partitions, record offset and value once we get the 'record'.
Java Consumer
Let us create a new package com.khs.kafka.consumer, inside this package create a ConsumerDemo class.

GIT URL: ConsumerDemo.java


Steps to test the consumer:
1). Start the Zookeeper from command line: zookeeper-server-start.bat D:\Softwares\kafka_2.12-2.3.0\config\zookeeper.properties
2). Start the Kafka from command line: kafka-server-start.bat D:\Softwares\kafka_2.12-2.3.0\config\server.properties
3). Run ProducerDemo.java (which we created in previous tutorial). You will notice 'Hello World' will be printed on the consumer console.
4). Now run the ConsumerDemo. You will notice below log in the console:
20:17:07.134 [main] INFO com.khs.kafka.consumer.ConsumerDemo - Key:null
Value:Hello World!
Partition:0
Offsets:1

So this completes our Kafka Consumer.
Happy Coding!!

-K Himaanshu Shuklaa..

No comments:

Post a Comment