A look at Kafka's group coordinator

Preface

This article looks at Kafka's group coordinator. Kafka 0.9.0 introduced the new consumer, whose configuration uses bootstrap.servers instead of the zookeeper.connect setting required by earlier consumers. The main goal is to gradually reduce the clients' dependence on ZooKeeper and hide it behind the brokers.

group coordinator

Replacing zookeeper.connect with bootstrap.servers involves two changes:

  • A GroupCoordinator role was added on the server (broker) side.
  • Consumer offsets, previously stored in ZooKeeper under /consumers/<group.id>/offsets/<topic>/<partitionid> (ZooKeeper's write performance is limited), moved to a special internal topic, __consumer_offsets.
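To make the second change concrete, the sketch below contrasts where the same committed offset lives before and after the move. The helper names `zk_offset_path` and `offsets_topic_record` are hypothetical, and the tuple key is a simplification: the broker actually serializes offset keys and values in its own versioned binary format.

```python
def zk_offset_path(group_id: str, topic: str, partition: int) -> str:
    """Old consumer: each committed offset is a znode written to ZooKeeper."""
    return f"/consumers/{group_id}/offsets/{topic}/{partition}"


def offsets_topic_record(group_id: str, topic: str, partition: int, offset: int):
    """New consumer: each commit becomes a key-value record appended to
    the __consumer_offsets topic (simplified tuple representation)."""
    key = (group_id, topic, partition)
    return ("__consumer_offsets", key, offset)


print(zk_offset_path("my-group", "orders", 3))
# /consumers/my-group/offsets/orders/3
print(offsets_topic_record("my-group", "orders", 3, 42))
# ('__consumer_offsets', ('my-group', 'orders', 3), 42)
```

The practical difference is that a commit turns from a ZooKeeper znode write into an ordinary append to a Kafka log, which brokers handle efficiently.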

Since version 0.8.2, Kafka has supported storing consumer offsets in an internal Kafka topic; since version 0.9.0, offsets are stored in this internal topic by default.

Here, "coordinator" refers to the group coordinator running on a broker, which manages the members of a consumer group. Each KafkaServer has one GroupCoordinator instance that can manage many consumer groups; it is mainly responsible for offset management and consumer rebalancing.
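Because every broker runs a GroupCoordinator, a client first has to find out which broker coordinates its group. The sketch below assumes the scheme Kafka uses for this: the group id is hashed to a partition of __consumer_offsets (`abs(groupId.hashCode) % offsets.topic.num.partitions`, 50 partitions by default), and the leader of that partition acts as the group's coordinator. The function names and the `partition_leaders` map are hypothetical; Java's `String.hashCode` is reproduced so that the partition numbers match what a JVM would compute.

```python
# Assumption: a group maps to partition abs(groupId.hashCode) % N of
# __consumer_offsets, with N = offsets.topic.num.partitions (default 50).
# The leader of that partition is the group's coordinator.

def java_string_hash(s: str) -> int:
    """Java's String.hashCode(): 32-bit signed, over UTF-16 code units."""
    data = s.encode("utf-16-be")
    h = 0
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        h = (31 * h + unit) & 0xFFFFFFFF
    # Reinterpret the unsigned 32-bit value as a signed Java int
    return h - (1 << 32) if h >= (1 << 31) else h


def kafka_abs(n: int) -> int:
    """Like Kafka's Utils.abs: Integer.MIN_VALUE maps to 0 instead of overflowing."""
    return 0 if n == -(1 << 31) else abs(n)


def offsets_partition_for(group_id: str, num_partitions: int = 50) -> int:
    return kafka_abs(java_string_hash(group_id)) % num_partitions


def coordinator_for(group_id: str, partition_leaders: dict) -> int:
    """partition_leaders (hypothetical): offsets-topic partition -> leader broker id."""
    partition = offsets_partition_for(group_id, len(partition_leaders))
    return partition_leaders[partition]


# Example: 3 brokers (ids 0..2) leading the 50 offsets-topic partitions round-robin
leaders = {p: p % 3 for p in range(50)}
print(offsets_partition_for("hello"))     # 22
print(coordinator_for("hello", leaders))  # 1
```

A side effect of this design is that all offset commits for one group land in a single partition, so one broker holds both the group's membership state and its offsets.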

Rebalance triggers

Partitions are reassigned among the consumers of a group under the following conditions:

  • Condition 1: a new consumer joins the group
  • Condition 2: an existing consumer dies
  • Condition 3: the coordinator dies and the cluster elects a new coordinator
  • Condition 4: new partitions are added to a subscribed topic
  • Condition 5: a consumer calls unsubscribe() to unsubscribe from a topic
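Most of these triggers change either the set of group members or the set of partitions, after which the assignment is recomputed. The sketch below models this with a simplified range-style assignor for a single topic. `range_assign` is a hypothetical function, not Kafka's own `RangeAssignor` class, and it recomputes the whole assignment from scratch on every change.

```python
def range_assign(members, partitions):
    """Simplified range-style assignment for a single topic.

    Members are sorted; each gets a contiguous block of partitions, and the
    first len(partitions) % len(members) members get one extra partition.
    """
    members = sorted(members)
    partitions = list(partitions)
    assignment = {m: [] for m in members}
    if not members:
        return assignment
    per_member, extra = divmod(len(partitions), len(members))
    start = 0
    for i, member in enumerate(members):
        count = per_member + (1 if i < extra else 0)
        assignment[member] = partitions[start:start + count]
        start += count
    return assignment


# Replay a few of the rebalance triggers listed above
print(range_assign(["c1", "c2"], range(4)))
# {'c1': [0, 1], 'c2': [2, 3]}
print(range_assign(["c1", "c2", "c3"], range(4)))  # condition 1: c3 joins
# {'c1': [0, 1], 'c2': [2], 'c3': [3]}
print(range_assign(["c1", "c2", "c3"], range(6)))  # condition 4: 2 new partitions
# {'c1': [0, 1], 'c2': [2, 3], 'c3': [4, 5]}
print(range_assign(["c1", "c3"], range(6)))        # condition 2: c2 dies
# {'c1': [0, 1, 2], 'c3': [3, 4, 5]}
```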

__consumer_offsets

A consumer commits offsets by sending an OffsetCommitRequest to a designated broker (the offset manager). The request contains a set of partitions and the consumed position (offset) in each of them. The offset manager appends a key-value message to the internal topic __consumer_offsets, where the key is composed of consumerGroup-topic-partition and the value is the offset.
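The commit path can be modeled as a plain append-only log. In this sketch, `OffsetsLog` and `handle_offset_commit` are hypothetical stand-ins for the __consumer_offsets topic and the broker's request handling. The key is a (group, topic, partition) tuple rather than a concatenated string, so names containing hyphens stay unambiguous. Note that a second commit for the same partition does not overwrite the first; it is simply appended.

```python
from dataclasses import dataclass, field


@dataclass
class OffsetsLog:
    """Toy stand-in for the __consumer_offsets topic: an append-only list."""
    records: list = field(default_factory=list)

    def append(self, key, value) -> int:
        self.records.append((key, value))
        return len(self.records) - 1  # offset of the new record in this log


def handle_offset_commit(log: OffsetsLog, group: str, commits: dict) -> None:
    """Hypothetical handling of an OffsetCommitRequest.

    commits maps (topic, partition) -> committed offset; each entry is
    appended as one record keyed by (group, topic, partition).
    """
    for (topic, partition), offset in commits.items():
        log.append((group, topic, partition), offset)


log = OffsetsLog()
handle_offset_commit(log, "g1", {("orders", 0): 10, ("orders", 1): 7})
handle_offset_commit(log, "g1", {("orders", 0): 15})  # appended, not overwritten
for record in log.records:
    print(record)
# (('g1', 'orders', 0), 10)
# (('g1', 'orders', 1), 7)
# (('g1', 'orders', 0), 15)
```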

The offset manager also keeps the most recent offset for each key in an in-memory cache, so that it can answer OffsetFetchRequests for a given key quickly without scanning the whole offsets topic log. If the offset manager fails for some reason, another broker becomes the offset manager and rebuilds the offset cache by scanning the offsets topic.
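The cache and its recovery path can be sketched as follows. `OffsetManager` is a hypothetical class, and the shared Python list stands in for the replicated offsets topic, which both the failed broker and its replacement can read. Replaying the log in order leaves the newest offset for each key in the cache, because later records overwrite earlier ones.

```python
class OffsetManager:
    """Sketch of an offset manager: durable log plus in-memory cache.

    `log` is a list of ((group, topic, partition), offset) records standing
    in for the replicated __consumer_offsets topic.
    """

    def __init__(self, log: list):
        self.log = log
        self.cache = {}

    def commit(self, key, offset: int) -> None:
        self.log.append((key, offset))  # durable record in the offsets topic
        self.cache[key] = offset        # latest value kept in memory

    def fetch(self, key):
        """Answer an OffsetFetchRequest from memory; None if nothing is cached."""
        return self.cache.get(key)

    def load_cache(self) -> None:
        """Rebuild the cache by replaying the whole log in order."""
        self.cache = {}
        for key, offset in self.log:
            self.cache[key] = offset  # later records overwrite earlier ones


shared_log = []
old_manager = OffsetManager(shared_log)
old_manager.commit(("g1", "orders", 0), 10)
old_manager.commit(("g1", "orders", 0), 15)

# The old manager fails; a new broker takes over and scans the log
new_manager = OffsetManager(shared_log)
print(new_manager.fetch(("g1", "orders", 0)))  # None: cache not loaded yet
new_manager.load_cache()
print(new_manager.fetch(("g1", "orders", 0)))  # 15
```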

Cleaning up the offsets log

Configuration

log.cleaner.enable=true

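The __consumer_offsets topic is a compacted topic (cleanup.policy=compact). With the log cleaner enabled, Kafka periodically removes older records that have been superseded by a newer record with the same key, so only the latest committed offset for each group-topic-partition is retained.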
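The effect of compaction on the offsets topic can be illustrated with a small function that keeps only the last record for each key. This is a simplification under stated assumptions: `compact` is a hypothetical helper, a value of `None` plays the role of a tombstone, and the sketch drops tombstones immediately, whereas the real log cleaner keeps them for a while (governed by `delete.retention.ms`) and never cleans the active segment.

```python
def compact(records):
    """Keep only the latest record for each key, preserving log order.

    records: list of (key, value) pairs; value None acts as a tombstone.
    Simplification: tombstones are dropped immediately here.
    """
    last_index = {}
    for i, (key, _) in enumerate(records):
        last_index[key] = i  # remember the position of the newest record per key
    return [
        (key, value)
        for i, (key, value) in enumerate(records)
        if last_index[key] == i and value is not None
    ]


records = [
    (("g1", "orders", 0), 10),
    (("g1", "orders", 1), 7),
    (("g1", "orders", 0), 15),
    (("g2", "orders", 0), 3),
    (("g2", "orders", 0), None),  # tombstone for this key
]
print(compact(records))
# [(('g1', 'orders', 1), 7), (('g1', 'orders', 0), 15)]
```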

