Availability-Monitor-for-Kafka

A tool to monitor Kafka availability and latency and report these metrics to SQL Server, JMX, CSV, and SLF4J endpoints.


Availability Monitor for Kafka lets you monitor the end-to-end availability and latency of sending data to and reading data from Kafka.
Producer availability is defined as (number of successful message sends across all topics and partitions) / (number of message send attempts).
Consumer availability is defined as (number of successful tail data reads across all topics and partitions) / (number of read attempts).
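
For example, if 1,000 canary sends are attempted across all topics and partitions during a reporting interval and 990 are acknowledged, producer availability for that interval is 990/1000 = 0.99.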

The tool measures Producer availability and latency by sending canary messages to all partitions. The message prefix is "CanaryMessage_" and can be changed in the producerProperties.json resource file.
The tool measures Consumer availability and latency by reading data from the tail of all partitions.
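
As an illustration of the producer-side probe, here is a minimal sketch using the standard Kafka Java client. It is not the tool's actual code: the class name, broker list, and topic name are placeholders, and the real tool reads its settings from the resource files described below.

    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.PartitionInfo;

    // Illustrative sketch only; CanaryProbeSketch, "broker1:9092" and "myTopic" are placeholders.
    public class CanaryProbeSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092"); // placeholder broker list
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            int attempts = 0;
            int successes = 0;
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                List<PartitionInfo> partitions = producer.partitionsFor("myTopic"); // placeholder topic
                for (PartitionInfo p : partitions) {
                    String payload = "CanaryMessage_" + System.currentTimeMillis(); // prefix described above
                    long start = System.currentTimeMillis();
                    attempts++;
                    try {
                        // Send one canary message to this specific partition and wait for the ack.
                        producer.send(new ProducerRecord<>(p.topic(), p.partition(), null, payload)).get();
                        successes++;
                        System.out.printf("partition %d acked in %d ms%n",
                                p.partition(), System.currentTimeMillis() - start);
                    } catch (Exception e) {
                        System.err.println("send failed for partition " + p.partition() + ": " + e);
                    }
                }
            }
            // Producer availability for this probe = successful sends / send attempts.
            System.out.printf("producer availability: %.3f%n",
                    attempts == 0 ? 0.0 : (double) successes / attempts);
        }
    }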

Usage

  1. Clone the repository locally

    	git clone https://github.com/Microsoft/Availability-Monitor-for-Kafka.git
    
  2. Change the resource file KafkaAvailability/src/main/resources/metadatamanagerProperties.json and add your ZooKeeper host IPs and ports:

    	"zooKeeperHosts": "1.1.1.1:2181,2.2.2.2:2181",
    

Also modify the following files to set up SQL/CSV/SLF4J/JMX reporting:

  • KafkaAvailability/src/main/resources/log4j.properties
  • KafkaAvailability/src/main/resources/appProperties.json
  • KafkaAvailability/src/main/resources/reporterProperties.json

The frequency of sending canary messages, reading tail data, and reporting is controlled by the command-line argument "sleep", given in milliseconds. The default value is 300000 milliseconds (i.e. 5 minutes).

  3. If using the precompiled jar, inject the modified resource file(s) into the jar:
    	jar uf KafkaAvailability-1.0-SNAPSHOT-jar-with-dependencies.jar metadatamanagerProperties.json
    

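If several resource files were modified, they can all be injected with one command, for example:

    	jar uf KafkaAvailability-1.0-SNAPSHOT-jar-with-dependencies.jar metadatamanagerProperties.json appProperties.json reporterProperties.json
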
If assembling the jar from source and you want to report metrics to MS SQL Server, you must first install the JDBC driver into your local Maven repository:
Visit the MSDN site for SQL Server and download the latest version of the JDBC driver for your operating system. Unzip the package, open a command prompt, and switch into the expanded directory where the jar file is located. Execute the following command, modifying the jar file name and version as necessary:

    	mvn install:install-file -Dfile=sqljdbc4.jar -Dpackaging=jar -DgroupId=com.microsoft.sqlserver -DartifactId=sqljdbc4 -Dversion=4.0

If you are not interested in SQL Server reporting, remove the dependency on com.microsoft.sqlserver from the pom file before assembling the jar.
Finally, assemble the jar using Maven:

    	mvn clean compile assembly:single
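
The assembled jar is typically written to the target directory (e.g. target/KafkaAvailability-1.0-SNAPSHOT-jar-with-dependencies.jar); this is the artifact used in the next step.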

  4. Run the assembled jar like this:
    	java.exe -jar KafkaAvailability-1.0-SNAPSHOT-jar-with-dependencies.jar -c ClusterName
    
  5. If you are logging to SQL Server, create tables in your db with the following names:
	[dbo].[Consumer.Availability]
	[dbo].[Consumer.Latency]
	[dbo].[Consumer.Topic.Latency]
	[dbo].[Consumer.Partition.Latency]
	[dbo].[Producer.Availability]
	[dbo].[Producer.Latency]
	[dbo].[Producer.Topic.Latency]
	[dbo].[Producer.Partition.Latency]

The availability and latency tables have the following schemas:

	CREATE TABLE [dbo].[Consumer.Availability](
		[Environment] [varchar](100) NOT NULL,
		[Timestamp] [datetime] NOT NULL,
		[Tag] [varchar](100) NOT NULL,
		[Availability] [float] NOT NULL
	)
	CREATE TABLE [dbo].[Consumer.Latency](
		[Environment] [varchar](100) NOT NULL,
		[Timestamp] [datetime] NOT NULL,
		[Tag] [varchar](100) NOT NULL,
		[count] [float] NOT NULL,
		[max] [float] NOT NULL,
		[mean] [float] NOT NULL,
		[min] [float] NOT NULL,
		[stddev] [float] NOT NULL,
		[median] [float] NOT NULL,
		[75perc] [float] NOT NULL,
		[95perc] [float] NOT NULL,
		[98perc] [float] NOT NULL,
		[99perc] [float] NOT NULL,
		[999perc] [float] NOT NULL
	)
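
The Producer tables are not reproduced here; presumably they mirror the Consumer schemas with the Producer prefix, for example:

	CREATE TABLE [dbo].[Producer.Availability](
		[Environment] [varchar](100) NOT NULL,
		[Timestamp] [datetime] NOT NULL,
		[Tag] [varchar](100) NOT NULL,
		[Availability] [float] NOT NULL
	)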

Errors can be logged using log4j (configure log4j.properties under resources) with this schema:

CREATE TABLE [dbo].[Errors](
	[USER_ID] [varchar](100) NOT NULL,
	[DATED] [datetime] NOT NULL,
	[LOGGER] [varchar](100) NOT NULL,
	[LEVEL] [varchar](100) NOT NULL,
	[MESSAGE] [varchar](max) NOT NULL
)
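
A minimal log4j.properties sketch for writing into that table, assuming the log4j 1.x JDBCAppender is used (the connection details and the %X{user} pattern are placeholders, not the project's actual configuration):

    log4j.rootLogger=ERROR, DB
    log4j.appender.DB=org.apache.log4j.jdbc.JDBCAppender
    log4j.appender.DB.driver=com.microsoft.sqlserver.jdbc.SQLServerDriver
    log4j.appender.DB.URL=jdbc:sqlserver://yourserver:1433;databaseName=yourdb
    log4j.appender.DB.user=youruser
    log4j.appender.DB.password=yourpassword
    log4j.appender.DB.sql=INSERT INTO [dbo].[Errors] (USER_ID, DATED, LOGGER, LEVEL, MESSAGE) VALUES ('%X{user}', '%d{yyyy-MM-dd HH:mm:ss}', '%c', '%p', '%m')
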
  6. Google Guice is used to dynamically manage the available reporters. At runtime, Guice finds all methods annotated with @ProvidesIntoMap that return a ScheduledReporter object.

Each reporter is named with the @StringMapKey annotation. All of these reporters are injected into a map whose keys are the reporter names and whose values are the ScheduledReporter objects.

    @ProvidesIntoMap
    @StringMapKey("consoleReporter")
    public ScheduledReporter consoleReporter() {

        return ConsoleReporter
                .forRegistry(metricRegistry)
                .convertRatesTo(TimeUnit.SECONDS)
                .convertDurationsTo(TimeUnit.SECONDS)
                .build();
    }

To make use of the injected map, annotate a constructor with the @Inject annotation and declare a Map<String, ScheduledReporter> parameter.

The reporters actually used are configured by the "reportersCommaSeparatedList" field of the reporterProperties.json file.

    @Inject
    public ScheduledReporterCollector(AppProperties appProperties, ReporterProperties reporterProperties,
                                      MetricRegistry metricRegistry,
                                      Map<String, ScheduledReporter> allReporters) throws Exception {
        ...
    }
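
For example, to enable only the consoleReporter defined above, the reportersCommaSeparatedList entry in reporterProperties.json might look like this (other fields omitted):

    	"reportersCommaSeparatedList": "consoleReporter"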

This package is also designed to support a plugin pattern for reporters: reporters can live in separate Java packages, and reflection is used to find and scan all Guice Module classes.

The benefit of doing so is to allow changing details of reporters without changing the main package.

To make use of this feature, define a new module class in the reporter package and annotate the reporter provider methods with @ProvidesIntoMap and @StringMapKey as in the example above.
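
A hypothetical plugin module might look like the following sketch, assuming the Dropwizard Metrics CsvReporter; the class name, map key, and output directory are illustrative and not part of the project.

    import java.io.File;
    import java.util.concurrent.TimeUnit;

    import com.codahale.metrics.CsvReporter;
    import com.codahale.metrics.MetricRegistry;
    import com.codahale.metrics.ScheduledReporter;
    import com.google.inject.AbstractModule;
    import com.google.inject.multibindings.ProvidesIntoMap;
    import com.google.inject.multibindings.StringMapKey;

    // Hypothetical plugin module; CsvReporterModule, "csvReporter" and the output
    // directory are illustrative placeholders.
    public class CsvReporterModule extends AbstractModule {

        @Override
        protected void configure() {
            // No explicit bindings needed here; the provider method below is
            // collected into the Map<String, ScheduledReporter> via multibindings.
        }

        @ProvidesIntoMap
        @StringMapKey("csvReporter")
        public ScheduledReporter csvReporter(MetricRegistry metricRegistry) {
            return CsvReporter
                    .forRegistry(metricRegistry)
                    .convertRatesTo(TimeUnit.SECONDS)
                    .convertDurationsTo(TimeUnit.SECONDS)
                    .build(new File("/tmp/kafka-monitor-metrics")); // directory must exist
        }
    }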

Runtime variables can be passed to the reporter package through Guice, as in the example below.

       @Override
       protected void configure() {
           bind(String.class).annotatedWith(Names.named(ENVIRONMENT_NAME_CONSTANT_NAME)).toInstance(appProperties.environmentName);
           bind(String.class).annotatedWith(Names.named(STATSD_ENDPOINT_CONSTANT_NAME)).toInstance(reporterProperties.statsdEndpoint);
           bind(Integer.class).annotatedWith(Names.named(STATSD_PORT_CONSTANT_NAME)).toInstance(reporterProperties.statsdPort);
           bind(String.class).annotatedWith(Names.named(METRICS_NAMESPACE_CONSTANT_NAME)).toInstance(reporterProperties.metricsNamespace);
       }

Access these variables in reporter package through injection.

    @ProvidesIntoMap
    @StringMapKey("mdmStatsdReporter")
    public ScheduledReporter mdmStatsdReporter(MetricRegistry metricRegistry, Statsd statsd,
                                               @Named("environmentName") String environmentName,
                                               @Named("metricsNamespace") String namespace) {

        return ReporterFactory.createMDMStatsdReporterWithStatd(metricRegistry, statsd, environmentName, namespace);
    }

Community

  • The Availability-Monitor-for-Kafka project welcomes contributions. To contribute, follow the instructions in CONTRIBUTING.md.

  • There are several options for asking your questions of the Availability-Monitor-for-Kafka community.

License

Availability Monitor for Kafka is licensed under the MIT license. See LICENSE file for full license information.
