Monitoring Kafka consumer offset lag

  kafka

Introduction

This article mainly discusses the monitoring of kafka consumer offset lag.

Approaches

  • Using official class libraries

ConsumerOffsetChecker
ConsumerGroupCommand

  • Using the official JMX

ConsumerOffsetChecker

In version 0.8.2.2, it is as follows
kafka_2.10-0.8.2.2-sources.jar!/kafka/tools/ConsumerOffsetChecker.scala

/**
 * Command-line entry point that prints, for one consumer group, a table with the
 * columns Group, Topic, Pid, Offset, logSize, Lag and Owner, optionally followed by
 * a "BROKER INFO" section.
 *
 * State is kept in three private, object-level maps shared between `main` and the
 * helper methods. Sections marked `//...` are elided in this excerpt.
 */
object ConsumerOffsetChecker extends Logging {

  // Int key (printed as the broker id by printBrokerInfo) -> optional SimpleConsumer.
  // Where entries are added is not visible in this excerpt.
  private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
  // Offset recorded per topic-partition; filled in by main from the offset fetch
  // response, or from ZooKeeper when the response carries no offset.
  private val offsetMap: mutable.Map[TopicAndPartition, Long] = mutable.Map()
  // Topic -> partition ids; reassigned in main and looked up by processTopic.
  private var topicPidMap: immutable.Map[String, Seq[Int]] = immutable.Map()

  /**
   * Body elided in this excerpt. From the signature: takes a ZooKeeper client and an
   * Int `bid` (presumably a broker id -- confirm) and returns an optional SimpleConsumer.
   */
  private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = {
    //...
  }

  /**
   * Body elided in this excerpt. Invoked by processTopic once per partition id of a
   * topic; presumably emits one row of the output table -- confirm against the full source.
   */
  private def processPartition(zkClient: ZkClient,
                               group: String, topic: String, pid: Int) {
    //...
  }

  /**
   * Calls processPartition for every partition id registered for `topic` in
   * topicPidMap, in ascending partition-id order. A topic that is absent from the
   * map is silently skipped.
   */
  private def processTopic(zkClient: ZkClient, group: String, topic: String) {
    topicPidMap.get(topic) match {
      case Some(pids) =>
        pids.sorted.foreach {
          pid => processPartition(zkClient, group, topic, pid)
        }
      case None => // ignore
    }
  }

  /**
   * Prints a "BROKER INFO" header followed by one "id -> host:port" line for each
   * consumerMap entry that holds a consumer; entries holding None are skipped.
   */
  private def printBrokerInfo() {
    println("BROKER INFO")
    for ((bid, consumerOpt) <- consumerMap)
      consumerOpt match {
        case Some(consumer) =>
          println("%s -> %s:%d".format(bid, consumer.host, consumer.port))
        case None => // ignore
      }
  }

  /**
   * Tool entry point.
   *
   * The beginning of the method is elided. Names used below but not declared in the
   * visible code (zkClient, zkConnect, topics, group, groupDirs, options, the channel
   * timeouts and the `channel` tested in `finally`) are presumably set up there.
   *
   * Visible flow: connect to ZooKeeper, resolve the topic list and its partitions,
   * fetch the group's offsets through an offset-manager channel (falling back to
   * ZooKeeper for partitions without an offset), print the table, optionally print
   * broker info, then release consumers, the ZooKeeper client and the channel.
   * Any Throwable is caught and reported as a single "Exiting due to" line.
   */
  def main(args: Array[String]) {
    //...
    try {
      // A fresh ZkClient is created on every invocation of the tool.
      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)

      // Use the explicitly supplied comma-separated topics if present; otherwise take
      // the children of the group's "/owners" path in ZooKeeper.
      val topicList = topics match {
        case Some(x) => x.split(",").view.toList
        case None => ZkUtils.getChildren(zkClient, groupDirs.consumerGroupDir +  "/owners").toList
      }

      topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq:_*)
      // Flatten topic -> partition ids into one TopicAndPartition per partition.
      val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
      // NOTE(review): this `val` is scoped to the try block, so the `channel` checked in
      // `finally` must be a separate outer declaration from the elided section -- confirm.
      val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)

      debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
      channel.send(OffsetFetchRequest(group, topicPartitions))
      val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
      debug("Received offset fetch response %s.".format(offsetFetchResponse))

      // Record one offset per partition: from the response when it has one, otherwise
      // from ZooKeeper; partitions that returned an error are only reported.
      offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
        if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
          val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
          // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool)
          // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
          try {
            val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
            offsetMap.put(topicAndPartition, offset)
          } catch {
            // The partition's offset node is missing: store -1 if the topic's offset
            // directory exists, otherwise propagate the exception.
            case z: ZkNoNodeException =>
              if(ZkUtils.pathExists(zkClient,topicDirs.consumerOffsetDir))
                offsetMap.put(topicAndPartition,-1)
              else
                throw z
          }
        }
        else if (offsetAndMetadata.error == ErrorMapping.NoError)
          offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
        else {
          println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
        }
      }
      channel.disconnect()

      // Table header, then one processTopic call per topic in sorted order.
      println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner"))
      topicList.sorted.foreach {
        topic => processTopic(zkClient, group, topic)
      }

      if (options.has("broker-info"))
        printBrokerInfo()

      // Consumers are closed here on the success path and again in `finally` below.
      for ((_, consumerOpt) <- consumerMap)
        consumerOpt match {
          case Some(consumer) => consumer.close()
          case None => // ignore
        }
    }
    catch {
      // Every failure, of any Throwable type, is reduced to a one-line message.
      case t: Throwable =>
        println("Exiting due to: %s.".format(t.getMessage))
    }
    finally {
      // Cleanup that runs on both the success and the failure path.
      for (consumerOpt <- consumerMap.values) {
        consumerOpt match {
          case Some(consumer) => consumer.close()
          case None => // ignore
        }
      }
      if (zkClient != null)
        zkClient.close()

      if (channel != null)
        channel.disconnect()
    }
  }
}

The drawback is that this class is designed to be invoked from the command line, and every invocation creates a new ZkClient. That makes it unsuitable for monitoring as-is; it needs to be modified so that the ZkClient is extracted and reused across calls.

ConsumerGroupCommand

Versions after 0.8.2.2 use ConsumerGroupCommand instead of ConsumerOffsetChecker.
kafka_2.11-0.10.2.1-sources.jar!/kafka/admin/ConsumerGroupCommand.scala

/**
 * Command-line entry point that lists consumer groups, describes one consumer group,
 * or deletes consumer group info. Other members of the object are elided (`//...`)
 * in this excerpt.
 */
object ConsumerGroupCommand extends Logging {
  //...
  /**
   * Parses the arguments, requires exactly one of the list / describe / delete
   * actions, picks a ZooKeeper-based or Java-consumer-based group service, runs the
   * action, and always closes the service afterwards.
   *
   * Any Throwable raised while running the action is caught and reported through
   * printError rather than propagated.
   */
  def main(args: Array[String]) {
    val opts = new ConsumerGroupCommandOptions(args)

    if (args.length == 0)
      CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.")

    // should have exactly one action
    val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
    if (actions != 1)
      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete")

    opts.checkArgs()

    // The service implementation depends on opts.useOldConsumer; each variant only
    // sees its own kind of consumer, which the note printed to stderr points out.
    val consumerGroupService = {
      if (opts.useOldConsumer) {
        System.err.println("Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).\n")
        new ZkConsumerGroupService(opts)
      } else {
        System.err.println("Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).\n")
        new KafkaConsumerGroupService(opts)
      }
    }

    try {
      if (opts.options.has(opts.listOpt))
        consumerGroupService.listGroups().foreach(println(_))
      else if (opts.options.has(opts.describeOpt)) {
        val (state, assignments) = consumerGroupService.describeGroup()
        val groupId = opts.options.valuesOf(opts.groupOpt).asScala.head
        // Note: the `assignments` bound in the Some(...) pattern shadows the outer Option.
        assignments match {
          case None =>
            // applies to both old and new consumer
            printError(s"The consumer group '$groupId' does not exist.")
          case Some(assignments) =>
            if (opts.useOldConsumer)
              printAssignment(assignments, false)
            else
              // Dispatch on the group state string: "Dead" is reported as an error,
              // "Empty" and the rebalancing states print a notice to stderr before
              // the assignment, "Stable" prints the assignment only.
              state match {
                case Some("Dead") =>
                  printError(s"Consumer group '$groupId' does not exist.")
                case Some("Empty") =>
                  System.err.println(s"Consumer group '$groupId' has no active members.")
                  printAssignment(assignments, true)
                case Some("PreparingRebalance") | Some("AwaitingSync") =>
                  System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.")
                  printAssignment(assignments, true)
                case Some("Stable") =>
                  printAssignment(assignments, true)
                case other =>
                  // the control should never reach here
                  // (this exception is thrown inside the try, so the catch below reports it)
                  throw new KafkaException(s"Expected a valid consumer group state, but found '${other.getOrElse("NONE")}'.")
              }
        }
      }
      else if (opts.options.has(opts.deleteOpt)) {
        // Deletion is only implemented by the ZooKeeper-based service; for any other
        // service the IllegalStateException is caught and reported by the catch below.
        consumerGroupService match {
          case service: ZkConsumerGroupService => service.deleteGroups()
          case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService.")
        }
      }
    } catch {
      case e: Throwable =>
        printError(s"Executing consumer group command failed due to ${e.getMessage}", Some(e))
    } finally {
      // Runs on both success and failure.
      consumerGroupService.close()
    }
  }
}

It, too, is designed to be run from the command line.

JMX

Kafka itself publishes these metrics over JMX, so you don't need to connect and fetch the data yourself the way ConsumerOffsetChecker does. For example:

            // Walk every MBean whose name matches the pattern and read all of its attributes.
            // `mBeanServer` is declared outside this fragment.
            // NOTE(review): the pattern selects the kafka.producer domain; for consumer lag
            // the consumer-side domain is presumably what is wanted -- confirm.
            ObjectName oName = new ObjectName("kafka.producer:*");
            // The second argument (the query expression) is null.
            Set<ObjectName> metricsBeans = mBeanServer.queryNames(oName, null);
            for (ObjectName mBeanName : metricsBeans) {
                MBeanInfo metricsBean = mBeanServer.getMBeanInfo(mBeanName);
                    MBeanAttributeInfo[] metricsAttrs = metricsBean.getAttributes();
                    for (MBeanAttributeInfo metricsAttr : metricsAttrs) {
                           // Read the current value of this attribute from the MBean.
                           Object value = mBeanServer.getAttribute(mBeanName, metricsAttr.getName());
                          // Process the value here, e.g. forward it to a metrics backend.
                    }
            }

Summary

You can adapt ConsumerOffsetChecker or ConsumerGroupCommand yourself and report the results to statsd or Prometheus. That said, using JMX is the most convenient option.

doc