December 01, 2019

Kafka Part 10: Implement Exactly Once Processing in Kafka

Let's say we are designing a system using Apache Kafka which will send some kind of messages from one system to another. While designing it, we need to consider the questions below:
  • How do we guarantee all messages are processed?
  • How do we avoid/handle duplicate messages?
A timeout could occur while publishing messages to Kafka. Our consumer process could run out of memory or crash while writing to a downstream database. The broker could run out of disk space, or a network partition may form between ZooKeeper instances.

How we handle these failures determines the processing guarantee of the system as a whole.

By using the Consumer API we can pull data from Kafka, and while creating the consumer, a client may specify a consumer group.

The consumer group identifies a collection of consumers that coordinate to read data from a set of topic partitions. The partitions of any topics subscribed to by consumers in a consumer group are guaranteed to be assigned to at most one individual consumer in that group at any time.

The messages from each topic partition are delivered to the assigned consumer strictly in the order they are stored in the log.

A consumer needs to save the offset of the next message it will read in each topic partition it is assigned to. FYI, consumers can store their offsets anywhere they want, but by default (and for all Kafka Streams applications) they are stored back in Kafka itself, in an internal topic called __consumer_offsets.

To do this, consumers either enable automatic periodic committing of offsets back to Kafka by setting the configuration flag 'enable.auto.commit' to true, or they make an explicit call to commit the offsets.
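As a minimal sketch of the explicit approach (the broker address, group id and topic name are placeholders, and process(record) stands in for whatever the application actually does with each record), the consumer disables auto commit and calls commitSync() itself after handling a batch:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group-id");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");      // set to "true" (the default) for automatic periodic commits instead

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("my-topic"));

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
    process(record);                           // application-specific processing (placeholder)
}
consumer.commitSync();                         // explicitly commit offsets once the batch has been handled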

The message processing guarantees fall under the following categories:
1). No guarantee: A system that provides no guarantee means any given message could be processed once, multiple times or not at all.

This happens in Kafka when we use a consumer with enable.auto.commit set to true (which is the default), and for each batch of messages we asynchronously process the records and save the results to a database.

When auto commit is enabled, the consumer will save offsets back to Apache Kafka periodically at the start of subsequent poll calls. The frequency of these commits is set by the configuration parameter 'auto.commit.interval.ms'.

There is a chance that the application crashes after we have saved the messages to our database but before the progress (the offsets) is saved. In that scenario we will end up reprocessing those messages on the next run and saving them to the database twice.

What if the progress is saved prior to the results being saved to the database? If the program crashes, these messages will not be reprocessed in the next run, meaning we have data loss.

2). At most once: This guarantees that a message will be processed either once or not at all. This guarantee is often known as 'best effort' semantics.
  • A producer uses a ‘fire-and-forget’ approach, sending a message to Kafka with no retries and ignoring any response from the broker. This approach is useful where progress is a higher priority than completeness.
  • A producer saves its progress reading from a source system first, then writes data into Apache Kafka. If the producer crashes before the second step, the data will never be delivered to Kafka.
  • A consumer receives a batch of messages from Apache Kafka and writes it to a database. The consumer application has 'enable.auto.commit' set to false and commits the offsets back to Kafka before writing to the database. If the consumer fails after saving the offsets back to Kafka, but before writing the data to the database, it will skip these records next time it runs and data will be lost (see the sketch below).
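A minimal sketch of that last case, reusing the consumer configured earlier with enable.auto.commit=false (saveToDatabase(records) is a placeholder for the application's write):

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
consumer.commitSync();        // offsets are saved back to Kafka first...
saveToDatabase(records);      // ...so a crash here means these records are skipped on restart (at most once)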
3). At least once: This means we will definitely receive and process every message, but we may process some messages multiple times in the case of a failure. In Kafka this typically means processing the messages first and committing the offsets only afterwards, as in the sketch below.
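Flipping the order of the two steps in the previous sketch gives at-least-once behaviour (saveToDatabase(records) is again a placeholder):

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
saveToDatabase(records);      // results are saved first...
consumer.commitSync();        // ...so a crash before this line means the records are processed again (duplicates)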

4). Effectively once: Also known as exactly once, this promises that consumers will process every message exactly once. We can achieve this with idempotent writes and transactions.

Idempotent Writes
A producer may be unable to determine whether a previous publish call succeeded, in which case it will push the same batch of messages again.

Sadly, in earlier versions of Apache Kafka, the broker had no means of determining whether the second batch was a retry of the previous one.

But from Kafka 0.11 onwards, producers can opt in to idempotent writes (disabled by default) by setting the configuration flag enable.idempotence to true.

This will cause the client to request a producer id (pid) from a broker. This pid helps the Kafka cluster uniquely identify the producer.

The producer sends the pid along with a sequence number with each batch of records. The sequence number logically increases by one for each record sent by the same producer.

With the sequence number of the first record in the batch along with the batch record count, the broker can easily figure out all the sequence numbers for a batch.

When idempotence is enabled and the broker receives a new batch of records, it checks whether the sequence numbers provided are ones it has already committed. If they are, the batch is treated as a retry and ignored, and a ‘duplicate’ acknowledgement is sent back to the client.

Before Kafka 1.0.0, with idempotent writes the broker could only deal with one in-flight batch at a time per producer, in order to guarantee the ordering of messages from the same producer.

But from Kafka 1.0.0 onwards, idempotent writes with up to 5 concurrent requests from the same producer (set via 'max.in.flight.requests.per.connection=5') are supported, which means you can have up to 5 in-flight requests and still be sure they will be written to the log in the correct order.
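As a minimal configuration sketch (the broker address, topic name, key and value are placeholders), an idempotent producer looks like this:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");                   // opt in to idempotent writes
props.put("acks", "all");                                  // required when idempotence is enabled
props.put("max.in.flight.requests.per.connection", "5");   // up to 5 in-flight batches, ordering still guaranteed

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "some-key", "some-value"));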

Atomic Transactions
Transactions give us the ability to atomically update data in multiple topic partitions. All the records included in a transaction will be successfully saved, or none of them will be.

The transactions are enabled through producer configuration by providing the 'transactional.id'. But to do this we need to first enable idempotent writes (enable.idempotence=true).

Kafka will use this 'transaction id' as part of its algorithm to deduplicate any message this producer sends, ensuring idempotency.

If the producer accidentally sends the same message to Kafka more than once, 'enable.idempotence' and 'transactional.id' enable Kafka to notice it. We need to make sure the transactional id is distinct for each producer, yet consistent across restarts.

Also, the producer then needs to register itself with the Kafka cluster by calling initTransactions.

The transactional id is used to identify the same producer across process restarts. When reconnecting with the same transactional id, a producer will be assigned the same pid and an epoch number associated with that pid will be incremented. Kafka will then guarantee that any pending transactions from previous sessions for that pid will either be committed or aborted before the producer can send any new data. Any attempt by an old zombie instance of the producer with an older epoch number to perform operations will now fail.

Effectively-once production of data to Kafka with a Transaction-Aware Producer
Let's create a Kafka Producer:

Properties producerProperties = new Properties();
producerProperties.put("bootstrap.servers", "localhost:9092");
producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProperties.put("enable.idempotence", "true");
producerProperties.put("transactional.id", "prod-1");

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);
producer.initTransactions();   // register the transactional id with the cluster

producer.beginTransaction();
// sourceOffsetRecords and outputRecords are collections of ProducerRecord<String, String>
// built by the application; everything sent here is written atomically as one transaction.
sourceOffsetRecords.forEach(producer::send);
outputRecords.forEach(producer::send);
producer.commitTransaction();


Effectively-once consumption of data from Kafka with a Transaction-Aware Consumer
Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers", "localhost:9092");
consumerProperties.put("group.id", "my-group-id");
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("enable.auto.commit", "false");
consumerProperties.put("isolation.level", "read_committed");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(singleton(TOPIC_NAME));   // TOPIC_NAME is the topic this application reads from

The above consumer reads from the topic named TOPIC_NAME. We have set isolation.level to read_committed, which ensures that we don't read any transactional messages before the transaction completes. The default value of isolation.level is read_uncommitted.

Since we used read_committed, it means that no messages that were written to the input topic in the same transaction will be read by this consumer until they are all written.
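Combining the transaction-aware producer and consumer gives the classic read-process-write loop, where the consumer's offsets are committed as part of the producer's transaction so the output records and the progress are saved atomically. A minimal sketch, assuming the producer and consumer configured above and a hypothetical transform(record) method that builds the output ProducerRecord:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    if (records.isEmpty()) continue;

    producer.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        producer.send(transform(record));                 // transform(..) is an assumed application method
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
    }
    producer.sendOffsetsToTransaction(offsets, "my-group-id");  // consumer progress joins the transaction
    producer.commitTransaction();                         // outputs and offsets become visible together
}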

For Kafka clients reading from Kafka there is another option to achieve effectively-once semantics by performing the idempotent operation then saving the progress. An idempotent operation is one where if you perform it multiple times it has the same effect as if it were performed once. If the application fails after performing the operation but before saving progress, it won’t matter that the next time the application is restarted the operation is run again.
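For example, here is a hedged sketch of such an idempotent write, assuming a PostgreSQL-style upsert into a hypothetical 'events' table keyed by the record key and an existing JDBC connection; rerunning it for the same record leaves the table in the same state:

// Hypothetical idempotent write: running it twice for the same record has the same effect as running it once.
String upsert = "INSERT INTO events (event_key, payload) VALUES (?, ?) " +
                "ON CONFLICT (event_key) DO UPDATE SET payload = EXCLUDED.payload";
try (PreparedStatement stmt = connection.prepareStatement(upsert)) {
    stmt.setString(1, record.key());
    stmt.setString(2, record.value());
    stmt.executeUpdate();
}
consumer.commitSync();   // save progress only after the idempotent operation has completed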

-K Himaanshu Shuklaa..
