December 01, 2019

Kafka Part 3: Kafka Producer, Callbacks and Keys

Let us create a Spring Starter Maven project.
  • Name: KafkaCourse
  • Group id: com.khs.kafka
  • Java Version: 1.8
  • Artifact: kafka-course
  • Version: 1.0
  • Package: com.khs.kafka
  • Dependencies: spring-boot-starter-web, kafka-clients, slf4j-simple (GIT URL for pom.xml)
  • GIT URL for the project: Kafka Course

Kafka Producer Demo

Let's create a new package com.khs.kafka.producer and create a class named ProducerDemo inside it.

To write our own producer, we follow three steps:
1. Create Producer properties
2. Create the producer
3. Create a producer record and send data

GIT URL: Producer Demo
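The class itself appears as a screenshot in the original post. As a stand-in, here is a minimal sketch of what ProducerDemo might look like, following the three steps above (the topic name 'first_topic' and the message 'Hello World' match the test steps further down):

package com.khs.kafka.producer;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {

    public static void main(String[] args) {
        // 1. Create Producer properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 3. Create a producer record and send data (send() is asynchronous)
        ProducerRecord<String, String> record = new ProducerRecord<>("first_topic", "Hello World");
        producer.send(record);

        // Push the buffered record out and release resources
        producer.flush();
        producer.close();
    }
}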



Apache Kafka offers various properties for creating a producer. We will set the required ones: bootstrap.servers, key.serializer and value.serializer.
  • bootstrap.servers: A list of host:port pairs used for establishing the initial connection to the Kafka cluster. The producer uses these servers only for the initial connection. The value takes the form host1:port1,host2:port2,...
  • key.serializer: The serializer class for the key, which must implement the 'org.apache.kafka.common.serialization.Serializer' interface.
  • value.serializer: The serializer class for the value, which must also implement the 'org.apache.kafka.common.serialization.Serializer' interface.
Check all the producer properties under Producer Configs in the Kafka documentation.

Kafka depends on serializers and deserializers so that the producer and consumer both know how to communicate and understand messages. To prepare a message for transmission from the producer to the broker, the producer uses a serializer to convert the message into a byte array. Similarly, the consumer uses a deserializer to convert the byte array back into an object. Out of the box, Kafka offers serializers and deserializers for only a few data types, such as String, Long, Double, Integer and Bytes.
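For any other type, you can implement the interface yourself. Below is a minimal sketch of a custom serializer; the class name is hypothetical, for illustration only (since Kafka 2.3, configure() and close() have default implementations, so only serialize() must be provided):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical example: upper-cases strings before they leave the producer
public class UpperCaseSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        // Convert the object (here a String) into the byte array Kafka transmits
        return data == null ? null : data.toUpperCase().getBytes(StandardCharsets.UTF_8);
    }
}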

Instead of the literal strings bootstrap.servers, key.serializer and value.serializer, we can use the constants ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG and ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG respectively.
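For example, the three properties from the ProducerDemo sketch can be rewritten with these constants (the lines below drop into the same main method; using StringSerializer.class.getName() also avoids typos in the fully qualified class name):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// Same configuration as before, but typo-proof
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());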

Producer Record
In order to send data to Kafka, the user needs to create a ProducerRecord, because every message a producer sends is wrapped in one. In the producer record, the producer specifies the topic name as well as the message to be delivered to Kafka.

ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, MESSAGE);

Steps to test the producer:
1). Start Zookeeper from the command line: zookeeper-server-start.bat D:\Softwares\kafka_2.12-2.3.0\config\zookeeper.properties
2). Start Kafka from the command line: kafka-server-start.bat D:\Softwares\kafka_2.12-2.3.0\config\server.properties
3). Run the consumer from the command line: kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic
(Since the producer sends data to 'first_topic', we need to mention the same topic while running the consumer.)
4). Run ProducerDemo.java. You will notice 'Hello World' printed on the consumer console.

Is producer.flush() a must?
The data produced by a producer is sent asynchronously. Therefore, two additional calls, flush() and close(), are needed (as seen in the ProducerDemo sketch above). flush() forces all buffered data to be sent, and close() stops the producer. If these methods are never invoked, the data may never be sent to Kafka, and the consumer will not be able to read it.

The new Producer API provides a flush() call that clients can optionally invoke. When using it, the number of bytes sent between two flush() calls is a key factor for good performance. Microbenchmarking shows that flushing around every 4 MB gives good performance (measured with 1 KB events).

Rule of thumb for setting the batch size when using flush():
batch.size = (total bytes between flush() calls) / (partition count)
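For example, if the producer pushes roughly 4 MB between two flush() calls and the topic has 4 partitions, batch.size works out to about 4 MB / 4 = 1 MB per partition.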

Difference between flush() and poll()
poll() polls the producer for events and invokes the corresponding callbacks (if registered); this applies to clients such as librdkafka, since the Java producer has no poll() method. Calling poll() just after send() does not make the producer synchronous, as it is unlikely that the message just sent has already reached the broker and that a delivery report has already been sent back to the client.

flush(), on the other hand, blocks until the previously sent messages have been delivered (or have errored), effectively making the producer synchronous.
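In the Java client, the same trade-off shows up with the Future returned by send(). A minimal sketch, assuming the producer and record from the ProducerDemo sketch above:

// Waiting on the Future makes this one send synchronous.
// (get() throws InterruptedException/ExecutionException, so declare or catch them.)
RecordMetadata metadata = producer.send(record).get();

// flush() instead blocks until ALL records buffered so far have been delivered or errored.
producer.flush();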

Kafka Producer Callbacks

Kafka provides callback methods through which we can check whether the data was produced correctly, where it was produced, its offset, its partition value, and so on.

To receive callbacks, we need to implement a callback function, which handles the request completion asynchronously; that is why its return type is void. It is passed in the call where the producer sends data to Kafka. The callback method used by the producer is onCompletion(), and it takes two arguments:
Metadata of the record: the partition and offset information for the record that was sent.
Exception: null if the record was produced successfully; non-null if an error occurred.

Let's create a ProducerCallbackDemo class inside com.khs.kafka.producer.

GIT URL: Producer Callback Demo
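As before, the class is shown as a screenshot in the original post; here is a minimal sketch of what ProducerCallbackDemo might look like. It differs from ProducerDemo only in the send() call, which now registers an onCompletion() callback (the logger comes from the slf4j-simple dependency in the pom):

package com.khs.kafka.producer;

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerCallbackDemo {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerCallbackDemo.class);

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>("first_topic", "Hello World");

        // The callback runs asynchronously when the send completes, successfully or not
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    LOGGER.info("Topic: {}, Partition: {}, Offset: {}",
                            metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    LOGGER.error("Error while producing", exception);
                }
            }
        });

        producer.flush();
        producer.close();
    }
}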



Steps to test the producer:
1). Start Zookeeper from the command line: zookeeper-server-start.bat D:\Softwares\kafka_2.12-2.3.0\config\zookeeper.properties
2). Start Kafka from the command line: kafka-server-start.bat D:\Softwares\kafka_2.12-2.3.0\config\server.properties
3). Run the consumer from the command line: kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic
(Since the producer sends data to 'first_topic', we need to mention the same topic while running the consumer.)
4). Run ProducerCallbackDemo.java. You will notice 'Hello World' printed on the consumer console, and the log from the callback method printed on the producer console.

Kafka Producer using Keys

FYI, if no key is sent, the data from a producer is distributed across partitions in round-robin order. Besides just sending messages, a producer can attach a key that goes along with each message. All messages that carry the same key will go to the same partition.

GIT URL: Kafka Producer using Keys
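Here is a minimal sketch of what ProducerDemoKeys might look like. The key scheme ('id_' + i) is an assumption for illustration; the point is that records sharing a key always land on the same partition:

package com.khs.kafka.producer;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemoKeys {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 1; i <= 9; i++) {
            String key = "id_" + i; // the key (not the value) decides the partition
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("my_topic", key, "Hello World " + i);
            producer.send(record);
        }

        producer.flush();
        producer.close();
    }
}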

Steps to test the producer:
1). Start Zookeeper from the command line: zookeeper-server-start.bat D:\Softwares\kafka_2.12-2.3.0\config\zookeeper.properties
2). Start Kafka from the command line: kafka-server-start.bat D:\Softwares\kafka_2.12-2.3.0\config\server.properties
3). Open another command prompt and create a new topic named 'my_topic' with 3 partitions. Execute the command: 'kafka-topics --zookeeper 127.0.0.1:2181 --topic my_topic --create --partitions 3 --replication-factor 1'
4). To verify that the topic was created with 3 partitions, execute: 'kafka-topics --zookeeper 127.0.0.1:2181 --topic my_topic --describe'
5). Run the consumer from the command line: kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic my_topic
(Since the producer sends data to 'my_topic', we need to mention the same topic while running the consumer.)
6). Run ProducerDemoKeys.java. You will notice the 'Hello World' messages printed on the consumer console. On the producer console you can see that messages 1-3 go to one partition, 4-6 to another, and 7-9 to yet another, because of the keys attached to them.

So this completes our Kafka Producer.
Happy Coding!!

-K Himaanshu Shuklaa..
