A look at the Kafka client's chunkQueue and MaxLag values



The previous article discussed the difference between ConsumerFetcherManager's MaxLag value and ConsumerOffsetChecker's lag value. However, the MaxLag value was not fully explained there, so let's go further here and see how to make ConsumerFetcherManager's MaxLag show a meaningful value.


kafka_2.10-! /kafka/server/AbstractFetcherThread.scala

override def doWork() {
    inLock(partitionMapLock) {
      if (partitionMap.isEmpty)
        partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
      partitionMap.foreach {
        case((topicAndPartition, offset)) =>
          fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                           offset, fetchSize)
      }
    }

    val fetchRequest =
    if (!fetchRequest.requestInfo.isEmpty)
      processFetchRequest(fetchRequest)
  }

Note how the fetchRequest is built here. In partitionMap, the key is a TopicAndPartition and the value is the largest offset already fetched locally. On every fetch, the fetchRequest is constructed from that locally-held maximum offset together with fetchSize.


kafka_2.10-! /kafka/api/FetchRequest.scala

def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
    requestMap.put(TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize))
    this
  }

It can be seen that the offset and fetchSize here determine the starting position and the amount of data the fetcher pulls from the broker.
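A minimal toy model (not the real Kafka classes; the names here are illustrative) of how the fetcher pairs each partition's next offset with fetchSize to form a request:

```scala
// Toy stand-ins for TopicAndPartition / PartitionFetchInfo
case class TopicPartition(topic: String, partition: Int)
case class FetchInfo(offset: Long, fetchSize: Int)

class ToyFetchRequestBuilder(fetchSize: Int) {
  private val requestMap = scala.collection.mutable.Map[TopicPartition, FetchInfo]()

  // mirrors addFetch: record where to start reading and how many bytes to pull
  def addFetch(topic: String, partition: Int, offset: Long): this.type = {
    requestMap.put(TopicPartition(topic, partition), FetchInfo(offset, fetchSize))
    this
  }

  def build(): Map[TopicPartition, FetchInfo] = requestMap.toMap
}

val builder = new ToyFetchRequestBuilder(fetchSize = 1024 * 1024)
builder.addFetch("mtopic", 0, 353L).addFetch("mtopic", 1, 258L)
val request =
```

Each entry says "start at this partition's local maximum offset and pull up to fetchSize bytes", which is exactly the pair the real builder stores per partition.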


kafka_2.10-! /kafka/consumer/ConsumerFetcherThread.scala

class ConsumerFetcherThread(name: String,
                            val config: ConsumerConfig,
                            sourceBroker: Broker,
                            partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                            val consumerFetcherManager: ConsumerFetcherManager)
        extends AbstractFetcherThread(name = name, 
                                      clientId = config.clientId,
                                      sourceBroker = sourceBroker,
                                      socketTimeout = config.socketTimeoutMs,
                                      socketBufferSize = config.socketReceiveBufferBytes,
                                      fetchSize = config.fetchMessageMaxBytes,
                                      fetcherBrokerId = Request.OrdinaryConsumerId,
                                      maxWait = config.fetchWaitMaxMs,
                                      minBytes = config.fetchMinBytes,
                                      isInterruptible = true) {

The fetchSize used here comes from config.fetchMessageMaxBytes.

kafka_2.10-! /kafka/consumer/ConsumerConfig.scala

class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
  /** the number of bytes of messages to attempt to fetch */
  val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
}

object ConsumerConfig extends Config {
  val RefreshMetadataBackoffMs = 200
  val SocketTimeout = 30 * 1000
  val SocketBufferSize = 64*1024
  val FetchSize = 1024 * 1024
  val MaxFetchSize = 10*FetchSize
  val NumConsumerFetchers = 1
  val DefaultFetcherBackoffMs = 1000
  val AutoCommit = true
  val AutoCommitInterval = 60 * 1000
  val MaxQueuedChunks = 2
  val MaxRebalanceRetries = 4
  val AutoOffsetReset = OffsetRequest.LargestTimeString
  val ConsumerTimeoutMs = -1
  val MinFetchBytes = 1
  val MaxFetchWaitMs = 100
  val MirrorTopicsWhitelist = ""
  val MirrorTopicsBlacklist = ""
  val MirrorConsumerNumThreads = 1
  val OffsetsChannelBackoffMs = 1000
  val OffsetsChannelSocketTimeoutMs = 10000
  val OffsetsCommitMaxRetries = 5
  val OffsetsStorage = "zookeeper"

  val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
  val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
  val ExcludeInternalTopics = true
  val DefaultPartitionAssignmentStrategy = "range" /* select between "range", and "roundrobin" */
  val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
  val DefaultClientId = ""
}

This fetchSize defaults to 1024 * 1024, i.e. 1048576: each fetch pulls at most 1048576 bytes per partition. Note that it is a byte limit, not a message count.
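To make the byte-vs-message distinction concrete, here is a small calculation; the 200-byte message size is an assumption for illustration only:

```scala
// fetch.message.max.bytes is a byte limit per partition per fetch,
// not a message count.
val fetchMessageMaxBytes = 1024 * 1024  // the default, 1048576 bytes
val assumedMessageSize = 200            // assumed average message size (bytes)

// With ~200-byte messages, one fetch can return at most this many messages:
val maxMessagesPerFetch = fetchMessageMaxBytes / assumedMessageSize
```

So the number of messages per fetch varies with message size, while the byte budget stays fixed at 1048576.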


kafka_2.10-! /kafka/server/AbstractFetcherThread.scala

private def processFetchRequest(fetchRequest: FetchRequest) {
    val partitionsWithError = new mutable.HashSet[TopicAndPartition]
    var response: FetchResponse = null
    try {
      trace("Issuing to broker %d of fetch request %s".format(, fetchRequest))
      response = simpleConsumer.fetch(fetchRequest)
    } catch {
      case t: Throwable =>
        if (isRunning.get) {
          warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
          partitionMapLock synchronized {
            partitionsWithError ++= partitionMap.keys
          }
        }
    }

    if (response != null) {
      // process fetched data
      inLock(partitionMapLock) { {
          case(topicAndPartition, partitionData) =>
            val (topic, partitionId) = topicAndPartition.asTuple
            val currentOffset = partitionMap.get(topicAndPartition)
            // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
            if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
              partitionData.error match {
                case ErrorMapping.NoError =>
                  try {
                    val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
                    val validBytes = messages.validBytes
                    val newOffset = messages.shallowIterator.toSeq.lastOption match {
                      case Some(m: MessageAndOffset) => m.nextOffset
                      case None => currentOffset.get
                    }
                    partitionMap.put(topicAndPartition, newOffset)
                    fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset

                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                    processPartitionData(topicAndPartition, currentOffset.get, partitionData)
                  } catch {
                    case ime: InvalidMessageException =>
                      // we log the error and continue. This ensures two things
                      // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                      // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                      //    should get fixed in the subsequent fetches
                      logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
                    case e: Throwable =>
                      throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                                               .format(topic, partitionId, currentOffset.get), e)
                  }
                case ErrorMapping.OffsetOutOfRangeCode =>
                  try {
                    val newOffset = handleOffsetOutOfRange(topicAndPartition)
                    partitionMap.put(topicAndPartition, newOffset)
                    error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                      .format(currentOffset.get, topic, partitionId, newOffset))
                  } catch {
                    case e: Throwable =>
                      error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId,, e)
                      partitionsWithError += topicAndPartition
                  }
                case _ =>
                  if (isRunning.get) {
                    error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId,,
                    partitionsWithError += topicAndPartition
                  }
              }
            }
        }
      }
    }

    if (partitionsWithError.size > 0) {
      debug("handling partitions with error for %s".format(partitionsWithError))
      handlePartitionsWithErrors(partitionsWithError)
    }
  }


kafka_2.10-! /kafka/consumer/ConsumerFetcherThread.scala

// process fetched data
  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
    val pti = partitionMap(topicAndPartition)
    if (pti.getFetchOffset != fetchOffset)
      throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
                                .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
    pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
  }


kafka_2.10-! /kafka/consumer/PartitionTopicInfo.scala

/**
   * Enqueue a message set for processing.
   */
  def enqueue(messages: ByteBufferMessageSet) {
    val size = messages.validBytes
    if(size > 0) {
      val next = messages.shallowIterator.toSeq.last.nextOffset
      trace("Updating fetch offset = " + fetchedOffset.get + " to " + next)
      chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
      debug("updated fetch offset of (%s) to %d".format(this, next))
    } else if(messages.sizeInBytes > 0) {
      chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
    }
  }

If the data is empty, it will not be put into the queue.
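The branching above can be restated as a tiny predicate (a toy restatement, not the real method): a chunk is queued when it has valid bytes, and also when validBytes == 0 but sizeInBytes > 0 (e.g. a partial message larger than fetchSize), so the consumer side can surface that condition; only a truly empty response is dropped.

```scala
// Toy restatement of enqueue's decision logic
def shouldEnqueue(validBytes: Int, sizeInBytes: Int): Boolean =
  validBytes > 0 || sizeInBytes > 0

val enqueueFull    = shouldEnqueue(validBytes = 4096, sizeInBytes = 4096) // normal chunk
val enqueuePartial = shouldEnqueue(validBytes = 0,    sizeInBytes = 512)  // partial/oversized message
val enqueueEmpty   = shouldEnqueue(validBytes = 0,    sizeInBytes = 0)    // nothing fetched
```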

ChunkQueue size

kafka_2.10-! /kafka/consumer/ZookeeperConsumerConnector.scala

def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
      : Map[String,List[KafkaStream[K,V]]] = {
    debug("entering consume ")
    if (topicCountMap == null)
      throw new RuntimeException("topicCountMap is null")

    val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)

    val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic

    // make a list of (queue,stream) pairs, one pair for each threadId
    val queuesAndStreams = => => {
        val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
        val stream = new KafkaStream[K,V](
          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
        (queue, stream)
      })
    ).flatten.toList

    val dirs = new ZKGroupDirs(config.groupId)
    registerConsumerInZK(dirs, consumerIdString, topicCount)
    reinitializeConsumer(topicCount, queuesAndStreams)

    loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
  }

The queue is created here with capacity config.queuedMaxMessages.

/** max number of message chunks buffered for consumption, each chunk can be up to fetch.message.max.bytes*/
  val queuedMaxMessages = props.getInt("queued.max.message.chunks", MaxQueuedChunks)
  val MaxQueuedChunks = 2

By default the queue can hold at most 2 FetchedDataChunk entries, and each FetchedDataChunk carries at most fetchSize bytes of messages, i.e. 1024*1024. In other words, each consumer thread's chunkQueue buffers at most 2*1024*1024 bytes of messages by default.

When this limit is reached, enqueue blocks, which throttles the fetch speed of the whole fetcher.
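This throttling can be demonstrated with a plain bounded LinkedBlockingQueue, the same class chunkQueue uses; offer() with a timeout stands in for put() so the sketch does not actually block:

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// chunkQueue is bounded: once it holds queued.max.message.chunks chunks,
// put() blocks the fetcher thread until the consumer drains a chunk.
val queuedMaxChunks = 2
val chunkQueue = new LinkedBlockingQueue[String](queuedMaxChunks)

chunkQueue.put("chunk-1")
chunkQueue.put("chunk-2") // queue is now full

// A third chunk is refused; a real put() would block here instead.
val accepted = chunkQueue.offer("chunk-3", 50, TimeUnit.MILLISECONDS)

chunkQueue.take() // consumer drains one chunk...
val acceptedAfterDrain = chunkQueue.offer("chunk-3") // ...and the fetcher proceeds
```

This back-pressure is exactly why a slow consumer eventually stalls the fetcher thread rather than exhausting memory.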

MaxLag of ConsumerFetcherManager

To make this metric show a meaningful value, reduce fetch.message.max.bytes, for example:


Then only about 10 messages are pulled per fetch. Assume the current lag is as follows:

Group  Topic                Pid Offset          logSize         Lag             Owner
mgroup mtopic              0   353             8727            8374            demo-1514550322182-6d67873d-0
mgroup mtopic              1   258             8702            8444            demo-1514550322182-6d67873d-1
mgroup mtopic              2   307             8615            8308            demo-1514550322182-6d67873d-2

After pulling once

                    val newOffset = messages.shallowIterator.toSeq.lastOption match {
                      case Some(m: MessageAndOffset) => m.nextOffset
                      case None => currentOffset.get
                    }
                    partitionMap.put(topicAndPartition, newOffset)
                    fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset

Here nextOffset = offset + 1, that is, the largest offset pulled back plus one = 259; hw is 8702, so the lag value is 8702 - 259 = 8443.
To reproduce this, the consumer thread was made to pull one message and then throw an exception to exit.
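The arithmetic for partition 1 of the table above, spelled out:

```scala
// Reproducing the lag computation for partition 1
val currentOffset = 258L           // offset before the pull (from the table)
val hw = 8702L                     // high watermark (logSize) reported by the broker
val newOffset = currentOffset + 1L // nextOffset of the single message pulled

// This is exactly fetcherLagStats' formula: lag = hw - newOffset
val lag = hw - newOffset
```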


In a production environment, the following parameters should be configured according to message size and available memory, otherwise an OOM is easily triggered:

  • queued.max.message.chunks, default 2, controls the capacity of chunkQueue.
  • fetch.message.max.bytes, default 1024*1024, controls the maximum number of bytes per chunk.
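A quick worst-case sizing of the memory these two parameters pin per consumer process; the thread count of 3 is an assumption for illustration (e.g. one thread per partition):

```scala
// Worst-case bytes buffered in chunk queues by one consumer process:
// threads * queued.max.message.chunks * fetch.message.max.bytes
val consumerThreads = 3                  // assumed: one thread per partition
val queuedMaxChunks = 2                  // queued.max.message.chunks default
val fetchMessageMaxBytes = 1024 * 1024   // fetch.message.max.bytes default

val worstCaseBytes = consumerThreads.toLong * queuedMaxChunks * fetchMessageMaxBytes
val worstCaseMiB = worstCaseBytes / (1024 * 1024)
```

With the defaults this is modest, but large fetch.message.max.bytes values multiply quickly across threads and processes.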

In addition, regarding ConsumerFetcherManager's MaxLag: only when the two parameters above are set reasonably can it help with monitoring. The smaller chunkQueue is, the better MaxLag reflects the consumer's actual consumption lag; otherwise it only reflects how far behind the client fetch thread's pulling is. But setting it too small forces frequent fetches, which hurts consumption throughput, so tune it to your situation. In practice, if these parameters are rarely changed, the lag value from ConsumerOffsetChecker is the accurate way to monitor consumption lag.