December 01, 2019

Kafka Part 3: Kafka Producer, Callbacks and Keys

What is the role of Kafka producer?
The primary role of a Kafka producer is to take the producer properties and a record as inputs and write the record to an appropriate Kafka broker. Producers serialize, partition, compress and load-balance data across brokers based on partitions.

The workflow of a producer involves five important steps:
  1. Serialize
  2. Partition
  3. Compress
  4. Accumulate records
  5. Group by broker and send
Kafka producer delivery semantics
There are three delivery semantics:
  • At most once: A message is delivered at most once. It is acceptable to lose a message rather than deliver it twice. Applications adopting at-most-once semantics can easily achieve higher throughput and low latency.
  • At least once: It is acceptable to deliver a message more than once, but no message should be lost. The producer makes sure all messages are delivered, even though this may result in duplicates. This is the most commonly preferred semantic. Applications adopting at-least-once semantics may have moderate throughput and moderate latency.
  • Exactly once: A message is delivered exactly once and no message is lost. This is the most difficult delivery semantic to achieve. Applications adopting exactly-once semantics may have lower throughput and higher latency compared to the other two.
Acknowledgments
The acks setting is a client (producer) configuration. This configuration denotes the number of brokers that must receive the record before we consider the write as successful. Value of acks can be 0, 1 or all.
  • When 'acks=0', the producer does not wait for a response/acknowledgment from the broker. It considers the write successful the moment the record is sent out. In this case, no guarantee can be made that the record was received by the broker. The retries config has no effect, as there is no way to know whether a failure occurred. With acks=0 you get at-most-once delivery semantics.
  • With 'acks=1', the producer considers the write successful when the leader receives the record. The leader broker responds as soon as it receives the record; it does not wait for replication to finish. This means that if the leader goes down before replication is complete, the record is lost. With acks=1 you get at-least-once delivery semantics (though a record can still be lost in the leader-failure case just described).
  • When 'acks=all' (or -1), the producer considers the write successful only when all of the in-sync replicas have received the record. The leader broker sends back a response once all in-sync replicas have the record themselves. This mode, used in conjunction with the min.insync.replicas config, offers high durability: that config defines the minimum number of replicas that must acknowledge a write for the write to be considered successful. Note that acks=all by itself still gives at-least-once delivery; exactly-once additionally requires the idempotent producer described below (and transactions for guarantees across partitions).
Requiring acknowledgments increases latency, since the producer has to wait for the acknowledgment before considering the write complete.
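As a rough sketch (assuming a local broker on 127.0.0.1:9092 and String keys/values; this is just the properties setup used again in the full demo later in this post), acks is simply another producer property:

Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// "0" = don't wait, "1" = wait for the leader only, "all" (or "-1") = wait for all in-sync replicas
properties.put("acks", "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);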

Retries and Timeouts
If the producer does not receive an acknowledgment, it waits for a time equal to 'request.timeout.ms' and then retries the request. However, one issue with retries is that the order of requests may change.

If the ordering of messages doesn't matter, you are good to go. Otherwise, this issue can be resolved by setting a suitable value for 'max.in.flight.requests.per.connection'. This config determines the maximum number of unacknowledged requests that can be in flight on a connection before blocking. Setting this value to 1 means that only one request is sent at a time, thus preserving ordering.
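For illustration only (the values below are assumptions, not recommendations), these properties would be added to the same properties object as above:

// wait up to 30 seconds for an acknowledgment before retrying
properties.put("request.timeout.ms", "30000");
// retry (practically) forever instead of dropping the record
properties.put("retries", Integer.toString(Integer.MAX_VALUE));
// only one unacknowledged request at a time, so retries cannot reorder messages
properties.put("max.in.flight.requests.per.connection", "1");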

Idempotency/ Duplicate message detection
In the real world, the broker may send the acknowledgment but the producer may not receive it (for example, due to a network failure). In such a case the producer retries the request even though the data was already committed on the broker.

This results in duplicated data. To tackle this situation, you need to make your producer idempotent.

To make the producer idempotent, we need to set the config enable.idempotence=true.

From Kafka 0.11 onwards, to avoid message duplication, each producer is assigned a Producer Id (PID) and includes its PID every time it sends messages to the broker. Also, each message gets an increasing sequence number (SqNo).

On the broker side, a sequence is tracked for each topic partition: the broker keeps track of the largest PID-SqNo combination on a per-partition basis. When a message with a sequence number lower than or equal to the last recorded one is received, it is discarded as a duplicate.

Idempotency comes with the following configurations:
acks = all
retries = Integer.MAX_VALUE
max.in.flight.requests.per.connection = 1 (0.11 <= Kafka < 1.1) OR 5 (Kafka >= 1.1)

We cannot have conflicting values of acks and max.in.flight.requests.per.connection while using idempotency, e.g. we cannot set enable.idempotence=true and acks=0 for the producer, since idempotence expects acks=all.
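A minimal sketch of an idempotent producer configuration (the explicit values for acks, retries and max.in.flight are shown only for clarity; conflicting explicit values are rejected when the producer is constructed):

// turn on the idempotent producer (Kafka >= 0.11)
properties.put("enable.idempotence", "true");
// these must be compatible with idempotence
properties.put("acks", "all");
properties.put("retries", Integer.toString(Integer.MAX_VALUE));
properties.put("max.in.flight.requests.per.connection", "5");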

Minimum In-Sync Replica
min.insync.replicas is a broker/topic config that denotes the minimum number of in-sync replicas required to exist for the broker to allow acks=all requests. That is, requests with acks=all won't be processed and will receive an error response if the number of in-sync replicas falls below the configured minimum.

It's a misconception that min.insync.replicas denotes how many replicas need to receive the record before the leader responds to the producer. In fact, the config is only the minimum number of in-sync replicas required to exist for the request to be processed; e.g. if there are three in-sync replicas and min.insync.replicas=2, the leader will still respond only when all three replicas have the record.

When there are not enough in-sync replicas to satisfy the min.insync.replicas property, the broker returns an error (surfaced to the producer as a NotEnoughReplicasException) instead of an acknowledgment.
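For example, min.insync.replicas can be set per topic at creation time (the topic name and replication factor below are assumptions, shown in the same CLI style used later in this post):

kafka-topics --zookeeper 127.0.0.1:2181 --topic durable_topic --create --partitions 3 --replication-factor 3 --config min.insync.replicas=2

With replication factor 3 and min.insync.replicas=2, acks=all writes keep working when one replica is down, but fail with the not-enough-replicas error once two replicas are down.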

How do you define a Partitioning Key?
Within the producer, the role of a partitioning key is to indicate the destination partition of the message. By default, a hashing-based partitioner is used to determine the partition ID given the key. Alternatively, users can plug in a customized Partitioner.

The producer decides which partition of the topic the record gets written to. By default the murmur2 algorithm is used for partitioning: it computes a hash of the key, and the partition is chosen from that hash (modulo the number of partitions). If no key is passed, partitions are chosen in a round-robin fashion.
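To illustrate a customized partitioner (MyPartitioner is hypothetical, not part of the course project), here is a sketch that mimics the default murmur2 behaviour; it would be registered via the partitioner.class producer property:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // same idea as the default partitioner: murmur2 hash of the key bytes, modulo partition count
        // (null keys would need special handling, omitted in this sketch)
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}

// registration:
properties.put("partitioner.class", MyPartitioner.class.getName());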

In the Producer, when does QueueFullException occur?
QueueFullException typically occurs when the Producer attempts to send messages at a pace that the Broker cannot handle. Since the Producer doesn’t block, users will need to add enough brokers to collaboratively handle the increased load.

Explain the role of the Kafka Producer API.
The role of Kafka’s Producer API is to wrap the two producers – kafka.producer.SyncProducer and the kafka.producer.async.AsyncProducer. The goal is to expose all the producer functionality through a single API to the client.

Kafka Producer Demo
Let us create a Spring Starter Maven project.
  • Name: KafkaCourse
  • Group id: com.khs.kafka
  • Java Version: 1.8
  • Artifact: kafka-course
  • Version: 1.0
  • Package: com.khs.kafka
  • Dependencies: spring-boot-starter-web, kafka-clients, slf4j-simple (GIT URL for pom.xml)
  • GIT URL for the project: Kafka Course
Let's create a new package com.khs.kafka.producer and create a class with name ProducerDemo inside that package.

To write our own producer, we follow three steps:
1. Create Producer properties
2. Create the producer
3. Create a producer record and Send data

GIT URL: Producer Demo
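The linked class is not reproduced here, but a minimal sketch following the three steps above (assuming a local broker on 127.0.0.1:9092 and the topic first_topic; the actual class in the repo may differ slightly) looks roughly like this:

package com.khs.kafka.producer;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
    public static void main(String[] args) {
        // 1. Create producer properties
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 3. Create a producer record and send data
        ProducerRecord<String, String> record = new ProducerRecord<>("first_topic", "Hello World");
        producer.send(record);

        // send() is asynchronous, so flush and close before exiting (see below)
        producer.flush();
        producer.close();
    }
}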



Apache Kafka offers various producer properties that are used when creating a producer. We will set the required properties: bootstrap.servers, key.serializer and value.serializer.
  • bootstrap.servers: A list of host:port pairs used for establishing the initial connection to the Kafka cluster. The producer uses the bootstrap servers only for this initial connection. The value has the form host1:port1,host2:port2,...
  • key.serializer: The serializer class for the key, which must implement the 'org.apache.kafka.common.serialization.Serializer' interface.
  • value.serializer: The serializer class for the value, which must implement the 'org.apache.kafka.common.serialization.Serializer' interface.
Check all the producer properties under Producer Configs in the Kafka documentation.

Kafka relies on serializers and deserializers so that producers and consumers know how to communicate and understand the messages. Serializers prepare a message for transmission from the producer to the broker: before transmitting, the producer converts the message into a byte array. Similarly, the consumer uses deserializers to convert the byte array back into an object. Kafka ships serializers and deserializers only for a few data types, such as String, Long, Double, Integer and Bytes.

Instead of the string literals bootstrap.servers, key.serializer and value.serializer we can use the constants ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG and ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG respectively.
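For example, the properties block from the sketch above can equivalently be written with the constants (requires the imports org.apache.kafka.clients.producer.ProducerConfig and org.apache.kafka.common.serialization.StringSerializer):

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());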

Producer Record
In order to send data to Kafka, the user needs to create a ProducerRecord, because everything the producer sends is wrapped in a producer record. In the producer record, the producer specifies the topic name as well as the message that is to be delivered to Kafka (optionally along with a key and a target partition).

ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, MESSAGE);

Steps to test the producer:
1). Start the Zookeeper from command line: zookeeper-server-start.bat D:\Softwares\kafka_2.12-2.3.0\config\zookeeper.properties
2). Start the Kafka from command line: kafka-server-start.bat D:\Softwares\kafka_2.12-2.3.0\config\server.properties
3). Run the consumer from command line: kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic
(Since from the producer we are sending data for 'first_topic' we need to mention the same while running the consumer)
4). Run ProducerDemo.java. You will notice 'Hello World' will be printed on the consumer console.

Is producer.flush() a must?
The data produced by a producer is sent asynchronously. Therefore, two additional calls, flush() and close(), are needed (as in the ProducerDemo above). flush() forces all buffered data to be sent, and close() stops the producer. If these are not called before the program exits, buffered data may never be sent to Kafka, and the consumer will not be able to read it.

The new Producer API provides a flush() call that clients can optionally invoke. If we use it, the number of bytes produced between two flush() calls is a key factor for good performance. Microbenchmarking shows that around 4 MB between flushes gives good performance (with events of 1 KB size).

Thumb rule to set batch size when using flush()
batch.size = (total bytes between flush() calls) / (partition count)
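For example (the partition count here is an assumed number), if we flush roughly every 4 MB, as in the microbenchmark above, and the topic has 8 partitions, the rule gives batch.size = 4194304 / 8 = 524288 bytes, i.e. batch.size=524288.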

Difference between flush() and poll()
poll() polls the producer for events and calls the corresponding callbacks (if registered). Calling poll() just after a send() does not make the producer synchronous, as it's unlikely that the message just sent has already reached the broker and that a delivery report has already been sent back to the client. (Note: poll() is found in librdkafka-based clients; the Java producer only exposes flush().)

Instead flush() will block until the previously sent messages have been delivered (or errored), effectively making the producer synchronous.

Kafka Producer Callbacks

Kafka provides callback methods with which we can check whether the data was produced successfully, where it was produced, its offset and partition value, etc.

To use callbacks, we implement a callback function that handles the request completion asynchronously; that's why its return type is void. It is supplied in the block where the producer sends data to Kafka. The callback method used by the producer is onCompletion(), which takes two arguments:
Metadata of the record (RecordMetadata): information about the partition the record was written to and its offset; it is non-null when the send succeeded.
Exception: non-null when the send failed. Exactly one of the two arguments is non-null.

Let's create ProducerCallbackDemo class inside com.khs.kafka.producer.

GIT URL: Producer Callback Demo
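Again the repo class is not reproduced here, but the send-with-callback part looks roughly like this sketch (producer and record created exactly as in the ProducerDemo sketch above; the repo version may differ):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            // the send succeeded: the metadata tells us where the record ended up
            System.out.println("Topic: " + recordMetadata.topic()
                    + ", Partition: " + recordMetadata.partition()
                    + ", Offset: " + recordMetadata.offset());
        } else {
            // the send failed
            e.printStackTrace();
        }
    }
});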



Steps to test the producer:
1). Start the Zookeeper from command line: zookeeper-server-start.bat D:\Softwares\kafka_2.12-2.3.0\config\zookeeper.properties
2). Start the Kafka from command line: kafka-server-start.bat D:\Softwares\kafka_2.12-2.3.0\config\server.properties
3). Run the consumer from command line: kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic
(Since from the producer we are sending data for 'first_topic' we need to mention the same while running the consumer)
4). Run ProducerCallBackDemo.java. You will notice 'Hello World' will be printed on the consumer console and log from the callback method is printed on producer console.

Kafka Producer using Keys

FYI, if no key is sent, the data from a producer is distributed to partitions in round-robin order. Besides just sending messages, producers can add a key that goes along with each message. All messages that carry the same key will go to the same partition.

GIT URL: Kafka Producer using Keys
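A rough sketch of what ProducerDemoKeys does (the key format id_<n> and message count are assumptions; the repo class may differ): the producer is created as in ProducerDemo, then keyed records are sent in a loop.

for (int i = 1; i <= 9; i++) {
    String key = "id_" + i;
    // the three-argument constructor takes (topic, key, value);
    // records with the same key always land on the same partition
    ProducerRecord<String, String> record =
            new ProducerRecord<>("my_topic", key, "Hello World " + i);
    producer.send(record);
}
producer.flush();
producer.close();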

Steps to test the producer:
1). Start the Zookeeper from command line: zookeeper-server-start.bat D:\Softwares\kafka_2.12-2.3.0\config\zookeeper.properties
2). Start the Kafka from command line: kafka-server-start.bat D:\Softwares\kafka_2.12-2.3.0\config\server.properties
3). Open another command prompt from where we will create a new topic with name 'my_topic', this topic will have 3 partitions. Execute the command, 'kafka-topics --zookeeper 127.0.0.1:2181 --topic my_topic --create --partitions 3 --replication-factor 1'
4). To verify if the topic is created with 3 partitions, execute: 'kafka-topics --zookeeper 127.0.0.1:2181 --topic my_topic --describe'
5). Run the consumer from command line: kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic my_topic
(Since from the producer we are sending data for 'my_topic' we need to mention the same while running the consumer)
6). Run ProducerDemoKeys.java. You will notice the 'Hello World' messages printed on the consumer console. On the producer console, you can see that messages 1, 2 and 3 go to the same partition, 4-6 to another and 7-9 to yet another, because of their keys.

Is it possible to get the message offset after producing?
With a plain fire-and-forget producer you cannot, as in most queue systems: its role is to send the messages and let the broker do the rest of the work, such as handling metadata like ids and offsets. However, the Java producer's send() returns a Future<RecordMetadata>, and the callback receives a RecordMetadata, from which the offset and partition can be read, as shown in the callback demo above. A consumer of the message can always get the offset from the Kafka broker.

So this completes our Kafka Producer.
Happy Coding!!

-K Himaanshu Shuklaa..
