Kafka Offset Monitor

A little app to monitor the progress of kafka consumers and their lag wrt the queue.

Unfortunately, the original repo, https://github.com/quantifind/KafkaOffsetMonitor, now returns "404" on GitHub. Please switch to one of its forks or look for an alternative.

Main metrics

Overview

  • Name and owner: quantifind/KafkaOffsetMonitor
  • Primary language: Java
  • Languages: Scala (number of languages: 10)
  • License: Other

Owner activity

  • Created: 2013-09-19 13:17:22
  • Last push: 2020-10-13 23:24:55
  • Last commit: 2013-10-25 04:18:26
  • Releases: 41

User engagement

  • Stars: 2.7k
  • Watchers: 235
  • Forks: 709
  • Commits: 716
  • Issues: 490
  • Open issues: 215
  • Pull requests: 46
  • Open pull requests: 11
  • Closed pull requests: 23

Kafka Offset Monitor

This is an app to monitor your kafka consumers and their position (offset) in the queue.

You can see the current consumer groups and, for each group, the topics it is consuming and the group's position in each topic queue. This is useful for understanding how quickly you are consuming from a queue and how fast the queue is growing. It helps with debugging kafka producers and consumers, or just with getting an idea of what is going on in your system.

The app keeps a history of queue position and consumer lag, so you can get an overview of what has happened over the last few days.
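
The lag mentioned above is the distance between the end of a partition's log and the offset the group has committed. A minimal Scala sketch of that arithmetic follows; the names PartitionPosition, logSize, committedOffset and LagExample are hypothetical and are not taken from the KafkaOffsetMonitor code base.

```scala
// Hypothetical model for illustration only; not the project's own classes.
final case class PartitionPosition(
    topic: String,
    partition: Int,
    logSize: Long,        // offset of the end of the partition's log
    committedOffset: Long // last offset committed by the consumer group
) {
  // Messages written to the partition but not yet consumed by the group.
  // Clamped at zero because the two values may be sampled at slightly
  // different moments (a choice made in this sketch).
  def lag: Long = math.max(0L, logSize - committedOffset)
}

object LagExample {
  // Total lag of a group on one topic: the sum over that topic's partitions.
  def topicLag(positions: Seq[PartitionPosition], topic: String): Long =
    positions.filter(_.topic == topic).map(_.lag).sum
}
```

A lag that keeps growing between two refreshes suggests the producers are writing faster than the group is consuming.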

Here are a few screenshots:

List of Consumer Groups

[Screenshot: Consumer Groups]

List of Topics for a Group

[Screenshot: Topic List]

History of Topic position

[Screenshot: Position Graph]

Offset Types

Kafka is flexible about how offsets are managed: a consumer can choose any storage and format to persist its offsets. KafkaOffsetMonitor currently supports the following popular storage formats:

  • zookeeper built-in high-level consumer (based on Zookeeper)
  • kafka built-in offset management API (based on Kafka internal topic)
  • Storm Kafka Spout (based on Zookeeper by default)

Each runtime instance of KafkaOffsetMonitor can only support a single type of storage format.

Running It

If you do not want to build it manually, just download the current jar.

This is a small web app. You can run it locally or on a server, as long as you have access to the ZooKeeper nodes controlling kafka.

java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
     --offsetStorage kafka \
     --zk zk-server1,zk-server2 \
     --port 8080 \
     --refresh 10.seconds \
     --retain 2.days

The arguments are:

  • offsetStorage: valid options are "zookeeper", "kafka" or "storm". Anything else falls back to "zookeeper".
  • zk: the ZooKeeper hosts
  • port: the port on which the app will be available
  • refresh: how often the app should refresh and store a point in the DB
  • retain: how long points should be kept in the DB
  • dbName: where to store the history (default "offsetapp")
  • kafkaOffsetForceFromStart: only applies to the "kafka" format. Forces KafkaOffsetMonitor to scan the commit messages from the start (see notes below)
  • stormZKOffsetBase: only applies to the "storm" format. Changes the offset storage base in ZooKeeper; defaults to "/stormconsumers" (see notes below)
  • pluginsArgs: additional arguments used by extensions (see below)
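
To get a feel for how refresh and retain interact, the following Scala sketch estimates how many history points are kept and shows one way old points could be pruned. HistoryPoint, RetentionExample and their methods are hypothetical names used for illustration; the app's actual storage layout is not described in this README.

```scala
import scala.concurrent.duration._

// Hypothetical history point; the real DB schema is not shown in this README.
final case class HistoryPoint(timestampMillis: Long, offset: Long, logSize: Long)

object RetentionExample {
  // Rough number of points kept per tracked partition for the given settings.
  def pointsRetained(refresh: FiniteDuration, retain: FiniteDuration): Long =
    retain.toMillis / refresh.toMillis

  // Keep only the points that are no older than `retain` at time `nowMillis`.
  def prune(points: Seq[HistoryPoint], retain: FiniteDuration, nowMillis: Long): Seq[HistoryPoint] =
    points.filter(p => nowMillis - p.timestampMillis <= retain.toMillis)
}
```

With the values from the command above (refresh of 10 seconds, retain of 2 days), pointsRetained gives 17280 points per tracked partition, which is worth keeping in mind when choosing a shorter refresh interval.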

Special Notes on Kafka Format

With the Kafka built-in offset management API, offsets are saved in an internal topic, "__consumer_offsets", as "commit" messages. Because there is no place to query existing consumers directly, KafkaOffsetMonitor needs to "discover" consumers by examining those "commit" messages. If consumers are active, KafkaOffsetMonitor can simply listen for new "commit" messages, and active consumers should be "discovered" after a short while. If you also want to see consumers that are under little load, use the kafkaOffsetForceFromStart flag to scan all "commit" messages from the start.
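
The discovery step described above amounts to folding a stream of commit messages into the latest known offset per group, topic and partition. The Scala sketch below illustrates the idea on an in-memory sequence; Commit and DiscoveryExample are hypothetical names, and the sketch does not read from or decode the real "__consumer_offsets" topic.

```scala
// Hypothetical, already-decoded commit message; the real wire format differs.
final case class Commit(group: String, topic: String, partition: Int, offset: Long)

object DiscoveryExample {
  type Key = (String, String, Int) // (group, topic, partition)

  // Later commits for the same key overwrite earlier ones, so the result
  // holds the most recently seen offset for every discovered consumer.
  def latestOffsets(commits: Seq[Commit]): Map[Key, Long] =
    commits.foldLeft(Map.empty[Key, Long]) { (acc, c) =>
      acc.updated((c.group, c.topic, c.partition), c.offset)
    }

  // A group is "discovered" as soon as at least one of its commits is seen.
  def discoveredGroups(commits: Seq[Commit]): Set[String] =
    commits.map(_.group).toSet
}
```

A group that has not committed since the monitor started listening never appears in such a fold, which is the situation scanning from the start is meant to address.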

Special Notes on Storm Storage

By default, Storm Kafka Spout stores offsets in ZooKeeper in a directory specified via "SpoutConfig". At the same time, Kafka also stores its metadata in ZooKeeper. In order to monitor Storm Kafka Spout offsets, KafkaOffsetMonitor requires that:

  • Spout and Kafka use the same zookeeper cluster
  • Spout stores the offsets under a sub-directory of Kafka's meta-data directory

This sub-directory can be configured via "stormZKOffsetBase". The default value is "/stormconsumers".

Writing and using plugins

Kafka Offset Monitor allows you to plug in additional offset info reporters in case you want this information to be logged or stored somewhere. To write your own plugin, all you need to do is implement the OffsetInfoReporter trait:

trait OffsetInfoReporter {
  def report(info: IndexedSeq[OffsetInfo])
  def cleanupOldData() = {}
}

The implementation must also have a constructor that takes a String as its only parameter; this parameter will be set to the value of the pluginsArgs argument. It's up to you how you use this argument to configure your plugin.
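
As an illustration of these two requirements, here is a minimal Scala sketch of a reporter that prints one line per offset. It is self-contained, so it declares stand-ins for OffsetInfoReporter and OffsetInfo; in a real plugin both would come from the Kafka Offset Monitor artifact, and the OffsetInfo fields shown here are assumptions. LineOffsetInfoReporter, the "key=value,key=value" parsing and the "prefix" setting are hypothetical choices of this sketch.

```scala
// Stand-ins so the sketch compiles on its own. The OffsetInfo fields are
// assumptions; check the real class before relying on them.
final case class OffsetInfo(group: String, topic: String, partition: Int, offset: Long, logSize: Long)

trait OffsetInfoReporter {
  def report(info: IndexedSeq[OffsetInfo]): Unit
  def cleanupOldData(): Unit = {}
}

// Hypothetical plugin. The single String constructor parameter receives
// the value passed on the command line via --pluginsArgs.
class LineOffsetInfoReporter(pluginsArgs: String) extends OffsetInfoReporter {

  // Parse "key=value,key=value"; this format is this sketch's own choice.
  val settings: Map[String, String] =
    pluginsArgs.split(",").iterator
      .map(_.trim)
      .filter(_.contains("="))
      .map { kv =>
        val Array(key, value) = kv.split("=", 2)
        key -> value
      }
      .toMap

  private val prefix = settings.getOrElse("prefix", "kafka")

  // One line per (group, topic, partition) with its offset and lag.
  def format(i: OffsetInfo): String =
    s"$prefix.${i.group}.${i.topic}.${i.partition} offset=${i.offset} lag=${i.logSize - i.offset}"

  override def report(info: IndexedSeq[OffsetInfo]): Unit =
    info.foreach(i => println(format(i)))
}
```

Keeping the formatting in a separate method makes the reporter easy to check without capturing standard output.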

When building a plugin, you may find it difficult to set up the dependency on the Kafka Offset Monitor classes, because the artifacts are not currently published to public repos. As long as this is the case, you will need to use a local maven repo and publish the Kafka Offset Monitor artifact yourself with: sbt publishM2

Assuming you have a custom implementation of OffsetInfoReporter in a jar file, running it is as simple as adding the jar to the classpath when running the app:

java -cp KafkaOffsetMonitor-assembly-0.3.0.jar:kafka-offset-monitor-another-db-reporter.jar \
     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
     --zk zk-server1,zk-server2 \
     --port 8080 \
     --refresh 10.seconds \
     --retain 2.days \
     --pluginsArgs anotherDbHost=host1,anotherDbPort=555

For a complete working example, see kafka-offset-monitor-graphite, a plugin that reports offset information to Graphite.

Contributing

The KafkaOffsetMonitor is released under the Apache License and we welcome any contributions within this license. Any pull request is welcome and will be reviewed and merged as quickly as possible.

Because this open source tool is released by Quantifind as a company, you will have to sign the following simple contributor agreement if you want to submit a pull request: