December 01, 2019

Kafka Part 6: Assign and Seek

Assign

When we work with consumer groups, partitions are assigned to consumers automatically and are rebalanced automatically when consumers are added to or removed from the group.

ALSO READ: Kafka Consumer Group, Partition Rebalance, Heartbeat

Sometimes we need a single consumer that always reads data from all the partitions of a topic, or from one specific partition of a topic. In this case there is no need for consumer groups or rebalancing: we simply assign the consumer the specific partitions (of one or more topics) it should read, consume messages, and commit offsets occasionally.

When we already know which partitions the consumer should read from, we don’t need to subscribe to a topic; instead, we can assign the partitions to the consumer directly. A consumer can either subscribe to topics (and be part of a consumer group) or assign itself partitions, but not both at the same time.
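A minimal sketch of assigning one specific partition, assuming the kafka-clients library (2.x/3.x API) is on the classpath. MockConsumer, a test double that ships with kafka-clients, stands in for a real KafkaConsumer so the sketch runs without a broker; the class name and the topic name demo-topic are hypothetical. It also shows that calling subscribe() after assign() on the same consumer is rejected with an IllegalStateException.

```java
import java.util.List;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper class for illustration; "demo-topic" is an assumed topic name.
final class AssignSpecificPartition {

    // Assigns exactly one partition to the consumer: no group, no rebalancing.
    static Set<TopicPartition> assignOne(Consumer<?, ?> consumer, String topic, int partition) {
        consumer.assign(List.of(new TopicPartition(topic, partition)));
        return consumer.assignment();
    }

    // Returns true if subscribe() is rejected because partitions were already assigned manually.
    static boolean subscribeRejectedAfterAssign(Consumer<?, ?> consumer, String topic) {
        try {
            consumer.subscribe(List.of(topic));
            return false;
        } catch (IllegalStateException e) {
            // assign() and subscribe() are mutually exclusive on the same consumer instance
            return true;
        }
    }

    public static void main(String[] args) {
        // MockConsumer (shipped in kafka-clients) stands in for a real KafkaConsumer, so no broker is needed
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        System.out.println("assigned: " + assignOne(consumer, "demo-topic", 0));
        System.out.println("subscribe rejected: " + subscribeRejectedAfterAssign(consumer, "demo-topic"));
    }
}
```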

NOTE: If someone adds new partitions to the topic, a consumer that uses assign() is not notified. We need to handle this ourselves, either by calling consumer.partitionsFor() periodically or simply by bouncing (restarting) the application whenever partitions are added.
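A sketch of the periodic check described in the note, under the same assumptions (kafka-clients on the classpath, MockConsumer in place of a broker, hypothetical class and topic names). refresh() compares consumer.partitionsFor() with the current assignment() and re-assigns when partitions have been added; how often to call it is left to the application.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper for illustration.
final class PartitionRefresher {

    // Re-reads the topic's partitions and, if new ones appeared, adds them to the manual assignment.
    // Returns the newly discovered partitions (empty if nothing changed).
    static Set<TopicPartition> refresh(Consumer<?, ?> consumer, String topic) {
        Set<TopicPartition> current = new HashSet<>(consumer.assignment());
        Set<TopicPartition> added = new HashSet<>();
        List<PartitionInfo> infos = consumer.partitionsFor(topic);
        if (infos != null) {
            for (PartitionInfo info : infos) {
                TopicPartition tp = new TopicPartition(info.topic(), info.partition());
                if (!current.contains(tp)) {
                    added.add(tp);
                }
            }
        }
        if (!added.isEmpty()) {
            // assign() replaces the whole assignment, so pass the existing partitions plus the new ones
            Set<TopicPartition> updated = new HashSet<>(current);
            updated.addAll(added);
            consumer.assign(updated);
        }
        return added;
    }

    public static void main(String[] args) {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        Node[] noNodes = new Node[0];
        consumer.updatePartitions("demo-topic",
                List.of(new PartitionInfo("demo-topic", 0, null, noNodes, noNodes)));
        System.out.println("first check: " + refresh(consumer, "demo-topic"));

        // Simulate someone adding a second partition to the topic
        consumer.updatePartitions("demo-topic", List.of(
                new PartitionInfo("demo-topic", 0, null, noNodes, noNodes),
                new PartitionInfo("demo-topic", 1, null, noNodes, noNodes)));
        System.out.println("after partition added: " + refresh(consumer, "demo-topic"));
    }
}
```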

For example:
1) We start by asking the cluster which partitions are available in the topic, by calling consumer.partitionsFor(). If we only want to consume a specific partition, we can skip this step.
2) Once we know which partitions we want, we call assign() with that list.

GIT URL: ConsumerDemoAssignSeek.testAssign()
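The two steps above can be sketched as follows, again assuming kafka-clients on the classpath and using MockConsumer with a hypothetical three-partition demo-topic in place of a real cluster. This is an illustration, not the code behind the GIT URL.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper for illustration.
final class AssignAllPartitions {

    // Step 1: ask the cluster which partitions the topic has.
    // Step 2: assign all of them to this consumer (no group, no rebalancing).
    static List<TopicPartition> assignAll(Consumer<?, ?> consumer, String topic) {
        List<TopicPartition> partitions = new ArrayList<>();
        List<PartitionInfo> infos = consumer.partitionsFor(topic);
        if (infos != null) {
            for (PartitionInfo info : infos) {
                partitions.add(new TopicPartition(topic, info.partition()));
            }
        }
        consumer.assign(partitions);
        return partitions;
    }

    public static void main(String[] args) {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        Node[] noNodes = new Node[0];
        List<PartitionInfo> infos = new ArrayList<>();
        for (int p = 0; p < 3; p++) {
            infos.add(new PartitionInfo("demo-topic", p, null, noNodes, noNodes));
        }
        consumer.updatePartitions("demo-topic", infos);
        System.out.println("assigned: " + assignAll(consumer, "demo-topic"));
    }
}
```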


Seek: Consuming Records with Specific Offsets

The poll() method starts consuming messages from the last committed offset in each partition (or, if there is no committed offset, from the position chosen by the auto.offset.reset setting) and then continues through the partition in sequence. In practice, we sometimes want to start reading at a different offset; in such cases we use the seek() method.

seekToBeginning(Collection<TopicPartition> partitions) and seekToEnd(Collection<TopicPartition> partitions): if we want to read all messages from the beginning of the partition, or skip all the way to the end of the partition and consume only new messages, we can use these methods.
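A sketch of both methods on a single partition, assuming kafka-clients on the classpath, with MockConsumer standing in for a broker and assumed beginning and end offsets of 0 and 42; the class name is hypothetical. Both methods are lazy: they only take effect on the next poll() or position() call, which is why the helpers call position() to read back the resolved offset.

```java
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper for illustration.
final class SeekBeginningEnd {

    // Re-read everything still retained in the partition.
    static long rewindToStart(Consumer<?, ?> consumer, TopicPartition tp) {
        consumer.seekToBeginning(List.of(tp));
        // seekToBeginning() is lazy; position() forces the new offset to be resolved
        return consumer.position(tp);
    }

    // Skip everything already in the partition and only consume new messages.
    static long skipToEnd(Consumer<?, ?> consumer, TopicPartition tp) {
        consumer.seekToEnd(List.of(tp));
        return consumer.position(tp);
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("demo-topic", 0);
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        consumer.assign(List.of(tp));
        // Pretend the partition currently holds offsets 0..41
        consumer.updateBeginningOffsets(Map.of(tp, 0L));
        consumer.updateEndOffsets(Map.of(tp, 42L));
        System.out.println("beginning: " + rewindToStart(consumer, tp));
        System.out.println("end: " + skipToEnd(consumer, tp));
    }
}
```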

The Kafka API also lets us seek to a specific offset with seek(TopicPartition, long). With this we can go back a few messages or skip ahead a few messages. Reading from a specific offset is also what makes it possible to store offsets in a system other than Kafka.
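A minimal sketch of going back a few messages with seek(TopicPartition, long), assuming kafka-clients on the classpath and MockConsumer in place of a broker; the helper name rewind() is hypothetical. It clamps the target at the earliest available offset reported by beginningOffsets(). Note that it moves back n offsets, which equals n messages only when offsets are contiguous (compaction and transaction markers can leave gaps).

```java
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper for illustration.
final class RewindBy {

    // Moves the position back by n offsets, but never before the earliest offset still available.
    static long rewind(Consumer<?, ?> consumer, TopicPartition tp, long n) {
        long current = consumer.position(tp);
        long earliest = consumer.beginningOffsets(List.of(tp)).get(tp);
        long target = Math.max(earliest, current - n);
        consumer.seek(tp, target);   // the next poll() starts from target
        return target;
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("demo-topic", 0);
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        consumer.assign(List.of(tp));
        consumer.updateBeginningOffsets(Map.of(tp, 0L));
        consumer.seek(tp, 10L);                          // pretend we have consumed up to offset 10
        System.out.println(rewind(consumer, tp, 3));     // prints 7
        System.out.println(rewind(consumer, tp, 100));   // clamped, prints 0
    }
}
```

Skipping ahead works the same way with consumer.seek(tp, consumer.position(tp) + n).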

If the offsets are stored in a database rather than in Kafka, how will our consumer know where to start reading when it is assigned a partition? In that case we use seek(): when the consumer starts, or when new partitions are assigned, it looks up the offset in the database and calls seek() to move to that position.
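A sketch of the database approach, assuming kafka-clients on the classpath. OffsetStore and InMemoryOffsetStore are hypothetical stand-ins for the real database, and the seek() happens in a ConsumerRebalanceListener so it runs every time partitions are assigned. In a real application the offset would normally be written in the same database transaction as the processed results, so the two cannot drift apart.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

// Hypothetical abstraction over the database that holds our offsets.
interface OffsetStore {
    Long load(TopicPartition tp);              // null if nothing has been stored yet
    void save(TopicPartition tp, long offset);
}

// In-memory stand-in for the database, for illustration only.
final class InMemoryOffsetStore implements OffsetStore {
    private final Map<TopicPartition, Long> offsets = new HashMap<>();
    @Override public Long load(TopicPartition tp) { return offsets.get(tp); }
    @Override public void save(TopicPartition tp, long offset) { offsets.put(tp, offset); }
}

// Seeks to the externally stored offset whenever partitions are assigned.
final class SeekFromStoreListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;
    private final OffsetStore store;

    SeekFromStoreListener(Consumer<?, ?> consumer, OffsetStore store) {
        this.consumer = consumer;
        this.store = store;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Record how far we got before the partitions move to another consumer
        for (TopicPartition tp : partitions) {
            store.save(tp, consumer.position(tp));
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            Long offset = store.load(tp);
            if (offset != null) {
                consumer.seek(tp, offset);
            }
            // If nothing is stored, the consumer falls back to the committed offset or auto.offset.reset
        }
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("demo-topic", 0);
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        OffsetStore store = new InMemoryOffsetStore();
        store.save(tp, 17L);

        // Here we assign and invoke the callback by hand instead of relying on a group rebalance
        SeekFromStoreListener listener = new SeekFromStoreListener(consumer, store);
        consumer.assign(List.of(tp));
        listener.onPartitionsAssigned(List.of(tp));
        System.out.println("position: " + consumer.position(tp));   // prints 17
    }
}
```

With a real consumer the listener is registered through consumer.subscribe(List.of("demo-topic"), listener), and the group coordinator triggers the callbacks; the main method calls onPartitionsAssigned() by hand only to keep the sketch independent of the mock's rebalance behaviour.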

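NOTE: When the consumer first starts, after we subscribe to topics, we call poll() so that the consumer joins the consumer group and gets partitions assigned, and then we immediately seek() to the correct offset in each partition we are assigned. A single short poll() is not guaranteed to complete the group join, so a more reliable place for the seek() is ConsumerRebalanceListener.onPartitionsAssigned(). seek() only updates the position we consume from, so the next poll() fetches the right messages. For the same reason, an invalid offset passed to seek() (for example, one that no longer exists) is only detected by the next poll(): if auto.offset.reset is set to none, poll() throws an exception (OffsetOutOfRangeException for an out-of-range offset); otherwise the consumer resets the position according to that policy.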

Example of seek and assign:

GIT URL: ConsumerDemoAssignSeek.java
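A compact sketch that puts assign and seek together, assuming kafka-clients on the classpath and using MockConsumer instead of a broker; AssignSeekDemo and demo-topic are hypothetical names, and this is not the code behind the GIT URL above. Because assign() involves no group join, seek() can be called right after it, without an initial poll().

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper for illustration.
final class AssignSeekDemo {

    // Assigns a single partition, seeks to startOffset and returns the values from one poll().
    static List<String> readFrom(Consumer<String, String> consumer, TopicPartition tp,
                                 long startOffset, Duration timeout) {
        consumer.assign(List.of(tp));      // no subscribe(): no group join, no rebalancing
        consumer.seek(tp, startOffset);    // allowed immediately, the partition is already assigned
        List<String> values = new ArrayList<>();
        ConsumerRecords<String, String> records = consumer.poll(timeout);
        for (ConsumerRecord<String, String> record : records) {
            values.add(record.value());
        }
        return values;                     // a real consumer would keep polling in a loop
    }

    public static void main(String[] args) {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        // The mock only accepts records for assigned partitions, so add them when poll() runs
        consumer.schedulePollTask(() -> {
            for (long offset = 0; offset < 5; offset++) {
                consumer.addRecord(new ConsumerRecord<>("demo-topic", 0, offset, "key-" + offset, "value-" + offset));
            }
        });
        List<String> values = readFrom(consumer, new TopicPartition("demo-topic", 0), 2L, Duration.ofMillis(100));
        System.out.println(values);
    }
}
```

schedulePollTask() runs its task at the start of the next poll(), after assign() and seek() have already happened, which is why the records are added there rather than up front.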


-K Himaanshu Shuklaa..
