December 01, 2019

Kafka Part 5: Consumer Group, Partition Rebalance, Heartbeat

What is a Consumer Group?
A consumer group is one of Kafka's core concepts. Every Kafka consumer group consists of one or more consumers that jointly consume a set of subscribed topics.

Let's say we have an application which reads messages from a Kafka topic, performs some validations and calculations, and writes the results to another data store.

In this case our application will create a consumer object, subscribe to the appropriate topic, and start receiving messages, validating them and writing the results.

This may work well for a while, but what happens when the rate at which producers write messages to the topic exceeds the rate at which our application can validate and process them?

Well it might create trouble for us. If we are dependent on a single consumer for reading and processing the data, our application may fall farther and farther behind, because it might be unable to keep up with the rate of incoming messages.

To resolve this we can use multiple consumers. Just as multiple producers can write to the same topic, multiple consumers can read from the same topic and split the data between them.

All Kafka consumers are typically part of a consumer group. When multiple consumers are subscribed to a topic and belong to the same consumer group, each consumer in the group will receive messages from a different subset of the partitions in the topic.

In the previous post we created a simple Kafka consumer in Java. If we execute ConsumerDemo a second time, it will not poll any data from Kafka (unless new data is produced).

We had set the consumer group to 'my-fourth-application'; once the group has started and committed its offsets, it will not fetch the same data again even if we use 'earliest' as auto.offset.reset, because that setting only applies when the group has no committed offsets.
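
To recap, a minimal sketch of such a consumer might look like the following (the topic name 'first_topic' and the bootstrap server address are assumptions here; the group id and offset reset come from the discussion above):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // The group id decides which consumer group this consumer joins.
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-fourth-application");
        // 'earliest' only applies when the group has no committed offsets yet.
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("first_topic")); // topic name is an assumption

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition=" + record.partition()
                        + ", offset=" + record.offset() + ", value=" + record.value());
            }
        }
    }
}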

Let's understand this by taking an example:

Assume our topic has three partitions. Now suppose we create a new consumer which is the only consumer in the group, and we use it to subscribe to the topic.

When only one consumer instance is running, it will get all messages from all three partitions.

If we add another consumer to the same group, one of the consumers will get messages from two partitions and the second consumer will get messages from the third partition. For example, messages from partitions 0 and 1 go to the first consumer and messages from partition 2 go to the second consumer.

If we add one more consumer, each consumer will get messages from one partition.

If we add a fourth consumer, it will remain idle, since the number of partitions is less than the number of consumers. The idle consumer will get no messages at all.

In this way we can scale consumption from a Kafka topic by adding more consumers to a consumer group. Kafka consumers often perform high-latency operations (such as writing to a database). In such cases, a single consumer might slow down the consumption rate. Adding more consumers that share the load, with each consumer owning just a subset of the partitions and messages, is our main method of scaling. That's why we usually create topics with a large number of partitions: it allows us to add more consumers when the load increases.
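
A rough way to see this partition split in action is to run the same consumer logic, with the same group.id, in several threads (or simply in several terminals). A sketch, assuming a three-partition topic named 'first_topic' and the hypothetical buildProps() helper below:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerGroupDemo {
    // Hypothetical helper mirroring the ConsumerDemo configuration above.
    static Properties buildProps() {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-fourth-application"); // same group => partitions are split
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Three consumers in the same group: with a three-partition topic,
        // each consumer should end up owning one partition.
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            final int id = i;
            pool.execute(() -> {
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(buildProps())) {
                    consumer.subscribe(Collections.singletonList("first_topic")); // topic name is an assumption
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.println("consumer-" + id + " -> partition " + record.partition()
                                    + ", offset " + record.offset());
                        }
                    }
                }
            });
        }
    }
}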

In some cases, we have multiple applications that need to read data from the same topic. In such scenarios, we want each application to get all of the messages, rather than just a subset. To make sure an application gets all the messages in a topic, give the application its own consumer group.
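
For example (the group names below are just placeholders), two applications reading the same topic would each use a different group.id, and each group independently receives every message:

import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class GroupPerApplication {
    // Hypothetical helper: same connection settings, but a different group per application.
    static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return props;
    }

    public static void main(String[] args) {
        // Each application gets its own consumer group, so each one receives
        // all messages of the topic, independently of the other.
        Properties billingAppProps = consumerProps("billing-app");     // placeholder group name
        Properties analyticsAppProps = consumerProps("analytics-app"); // placeholder group name
    }
}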

Partition Rebalance
  • Consumers in a consumer group share ownership of the partitions in the topics they subscribe to. When we add a new consumer to the group, it starts consuming messages from partitions previously consumed by another consumer. When a consumer shuts down or crashes, it leaves the group, and the partitions it used to consume will be consumed by one of the remaining consumers. This is a reassignment of partitions.
  • Reassignment of partitions to consumers also happens when the topics the consumer group is consuming are modified (we can add new partitions at any point in time).
  • Moving partition ownership from one consumer to another is called a rebalance (see the listener sketch after this list).
  • Rebalancing is important because it provides the consumer group with high availability and scalability (allowing us to easily and safely add or remove consumers). However, during a rebalance, consumers can't consume messages, so a rebalance is basically a short window of unavailability of the entire consumer group.
  • In addition, when partitions are moved from one consumer to another, the consumer loses its current state. If it was caching any data, it will need to refresh its caches, slowing down the application until the consumer sets up its state again.
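
If we want to see rebalances as they happen (or commit offsets / rebuild caches around them), the consumer API lets us pass a ConsumerRebalanceListener when subscribing. A minimal sketch, with the topic name as an assumption:

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Collections;

public class RebalanceListenerDemo {
    public static void subscribeWithListener(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("first_topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Called before the rebalance: a good place to commit offsets
                // or flush state tied to the partitions being taken away.
                System.out.println("Revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Called after the rebalance: these are the partitions this
                // consumer now owns (e.g. rebuild caches here).
                System.out.println("Assigned: " + partitions);
            }
        });
    }
}
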
Heartbeats
  • Consumers maintain their membership in a consumer group, and their ownership of the partitions assigned to them, by sending heartbeats to the Kafka broker designated as the group coordinator (this broker can be different for different consumer groups).
  • As long as the consumer is sending heartbeats at regular intervals, it is assumed to be alive and processing messages from its partitions. Heartbeats are sent when the consumer polls (retrieves records) and also when it commits records it has consumed.
  • A consumer is considered dead when it stops sending heartbeats for long enough. In this case its session times out and the group coordinator triggers a rebalance.
  • If a consumer crashes and stops processing messages, it will take the group coordinator a few seconds without heartbeats to decide it is dead and trigger the rebalance. During those seconds, no messages will be processed from the partitions owned by the dead consumer.
  • When closing a consumer cleanly, the consumer notifies the group coordinator that it is leaving, and the group coordinator triggers a rebalance immediately, reducing the gap in processing (see the shutdown sketch after this list).
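
One common way to leave the group cleanly, and get the immediate rebalance mentioned above, is to call wakeup() on the consumer from a shutdown hook and close() it when the poll loop exits. A rough sketch (the surrounding setup is assumed to be the same as before):

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;

public class GracefulShutdownDemo {
    public static void runLoop(KafkaConsumer<String, String> consumer) {
        // wakeup() is the one thread-safe consumer method; it makes the next
        // (or current) poll() throw a WakeupException.
        final Thread pollThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                pollThread.join(); // wait for the poll loop to finish closing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> System.out.println(record.value()));
            }
        } catch (WakeupException e) {
            // Expected on shutdown; nothing else to do.
        } finally {
            // close() tells the group coordinator this consumer is leaving,
            // so the rebalance happens immediately instead of waiting for
            // the session timeout.
            consumer.close();
        }
    }
}
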
session.timeout.ms

  • It is used to detect consumer failures when using Kafka’s group management facility. 
  • The consumer sends periodic heartbeats to indicate its liveness to the broker. 
  • If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance.
  • By default it is 10 seconds.

heartbeat.interval.ms

  • Since Kafka 0.10.1.0, heartbeats are sent from a separate background thread, different from the thread where poll() runs.
  • heartbeat.interval.ms is the expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities.
  • Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when consumers join or leave the group. The value must be set lower than session.timeout.ms, and typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.
  • The default value is 3 seconds.

max.poll.interval.ms

  • It was introduced in Kafka 0.10.1.0.
  • It is the maximum amount of time allowed between two poll() calls before the consumer is declared dead.
  • The default value is 5 minutes, except for Kafka Streams, which increases it to Integer.MAX_VALUE. (A configuration sketch for these three settings follows this list.)
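
All three settings are ordinary consumer properties. A sketch that sets the defaults mentioned above explicitly (the values are only illustrative):

import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class TimeoutConfigDemo {
    public static Properties timeoutProps() {
        Properties props = new Properties();
        // How long the coordinator waits without heartbeats before declaring the consumer dead.
        props.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");    // 10 seconds
        // How often the background thread sends heartbeats; keep it no higher than 1/3 of session.timeout.ms.
        props.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");  // 3 seconds
        // Maximum gap allowed between two poll() calls before the consumer is considered dead.
        props.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // 5 minutes
        return props;
    }
}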

-K Himaanshu Shuklaa..
