CSharpClient-for-Kafka

A .NET implementation of the Apache Kafka protocol that provides basic functionality through Producer/Consumer classes. The project also offers a balanced consumer implementation.

This project will be archived in read-only mode, as no new development or maintenance is expected. See the Note section below for alternative libraries.

Note

This library was created for Kafka 0.8, with the intention of having a native library built from scratch.
Because the Kafka protocol is updated frequently with new features (which is expected until it reaches version 1.0), maintaining a library built from scratch does not seem beneficial.
The right approach for now (as suggested by Confluent) is to use a C# wrapper around the librdkafka C library, which is what the confluent-kafka-dotnet client does.

So, if you are using Kafka 0.9 or higher, please move to using the confluent-kafka-dotnet client library.

Join the chat at https://gitter.im/Microsoft/Kafkanet
The project is a fork from ExactTarget's Kafka-net Client.

Build CSharpClient-for-Kafka

  • Clone CSharpClient-for-Kafka through git clone https://github.com/Microsoft/CSharpClient-for-Kafka.git
  • Open src\KafkaNETLibraryAndConsole.sln in Visual Studio
  • Build Solution

Run Unit Tests

  • Open Test Window in Visual Studio: Test>Windows>Test Explorer
  • Run all

Using Console

  • Set up a local Kafka and ZooKeeper

Console Options

    topic                           Dump topic metadata, such as earliest/latest offset, replicas, and ISR.
    consumesimple                   Consume data in a single thread.
    consumegroup                    Monitor latest offset gap and speed of consumer group.
    consumegroupmonitor             Monitor latest offset gap and speed of consumer group.
    producesimple                   Produce data in a single thread.
    produceperftest                 Produce data in multiple threads.
    eventserverperftest             HTTP POST data to the event server in multiple threads.
    producemonitor                  Monitor latest offset.
    test                            Run some ad hoc test cases.
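
The "offset gap" in the monitor options is commonly understood as the difference between a partition's latest offset and the offset the consumer group has reached, and "speed" as how fast that consumer offset advances. The sketch below shows only that arithmetic. The `LagMath` class and its method names are hypothetical illustrations, not part of the console tool or the library, and the tool may compute these values differently.

```csharp
using System;

// Hypothetical helper for illustration; not part of CSharpClient-for-Kafka.
public static class LagMath
{
    // Gap between the newest offset in a partition and the group's position.
    // Clamped at zero in case the two offsets were sampled at different times.
    public static long OffsetGap(long latestOffset, long consumerOffset)
    {
        return Math.Max(0L, latestOffset - consumerOffset);
    }

    // Messages consumed per second between two samples of the group's offset.
    public static double MessagesPerSecond(long previousOffset, long currentOffset, TimeSpan elapsed)
    {
        if (elapsed <= TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(elapsed), "Elapsed time must be positive.");
        }
        return (currentOffset - previousOffset) / elapsed.TotalSeconds;
    }
}
```

For example, with a latest offset of 1500 and a consumer offset of 1200, `LagMath.OffsetGap(1500, 1200)` gives a gap of 300 messages.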

Using the library

Producer

The Producer can send a single message or an entire batch to Kafka. A batch can contain messages for multiple topics at once.

Producer Usage

var brokerConfig = new BrokerConfiguration()
{
    BrokerId = this.brokerId,
    Host = this.kafkaServerName,
    Port = this.kafkaPort
};
var config = new ProducerConfiguration(new List<BrokerConfiguration> { brokerConfig });
kafkaProducer = new Producer(config);
// Construct your batch or a single message object here (ConstructBatch stands in for your own code)
var batch = ConstructBatch();
kafkaProducer.Send(batch);

Simple Consumer

The simple Consumer gives you full control over retrieving data. You can instantiate a Consumer directly by providing a ConsumerConfiguration and then calling Fetch.
CSharpClient-for-Kafka also has a higher-level wrapper around Consumer that allows consumer reuse and other benefits.

Consumer Usage

// create the Consumer higher level manager
var managerConfig = new KafkaSimpleManagerConfiguration()
{
    FetchSize = FetchSize,
    BufferSize = BufferSize,
    Zookeeper = m_zookeeper
};
m_consumerManager = new KafkaSimpleManager<int, Kafka.Client.Messages.Message>(managerConfig);
// Get all available partitions for a topic through the manager
var allPartitions = m_consumerManager.GetTopicPartitionsFromZK(m_topic);
// Refresh metadata, then grab a consumer for the desired partition (partitionId is the partition you want to read)
m_consumerManager.RefreshMetadata(0, m_consumerId, 0, m_topic, true);
var partitionConsumer = m_consumerManager.GetConsumer(m_topic, partitionId);

Balanced Consumer

The balanced consumer manages partition assignment for each instance in the same consumer group. Rebalances are triggered by ZooKeeper changes.

Balanced Consumer Usage

// Create a balanced consumer on one consumer machine for consumerGroupId.
// All machines consuming for this group are balanced together.
ConsumerConfiguration config = new ConsumerConfiguration
{
    AutoCommit = false,
    GroupId = consumerGroupId,
    ConsumerId = uniqueConsumerId,
    MaxFetchBufferLength = m_BufferMaxNoOfMessages,
    FetchSize = fetchSize,
    AutoOffsetReset = OffsetRequest.LargestTime,
    NumberOfTries = 20,
    ZooKeeper = new ZooKeeperConfiguration(zookeeperString, 30000, 30000, 2000)
};
var balancedConsumer = new ZookeeperConsumerConnector(config, true, m_ConsumerRebalanceHandler, m_ZKDisconnectHandler, m_ZKExpireHandler);
// Grab streams for the desired topics
var streams = balancedConsumer.CreateMessageStreams(m_TopicMap, new DefaultDecoder());
var kafkaMessageStream = streams[m_Topic][0];
// Start consuming the stream
foreach (Message message in kafkaMessageStream.GetCancellable(cancellationTokenSource.Token))
{
    // process each message here
}
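
To illustrate what "balanced" means for a consumer group, the sketch below spreads a topic's partitions across the sorted consumer IDs in contiguous ranges, similar in spirit to Kafka's range assignment strategy. It is a standalone illustration under that assumption, not the library's actual rebalance code. `RangeAssignment.Assign` and its parameters are hypothetical names, and consumer IDs are assumed to be unique.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration; not taken from CSharpClient-for-Kafka.
public static class RangeAssignment
{
    // Returns, for each consumer ID, the partition IDs it would own.
    public static Dictionary<string, List<int>> Assign(IEnumerable<string> consumerIds, int partitionCount)
    {
        // Sort so that every instance computes the same result independently.
        var consumers = consumerIds.OrderBy(id => id, StringComparer.Ordinal).ToList();
        var result = consumers.ToDictionary(id => id, _ => new List<int>());
        if (consumers.Count == 0)
        {
            return result;
        }

        int perConsumer = partitionCount / consumers.Count;
        int extra = partitionCount % consumers.Count; // the first 'extra' consumers get one more

        int next = 0;
        for (int i = 0; i < consumers.Count; i++)
        {
            int take = perConsumer + (i < extra ? 1 : 0);
            for (int p = 0; p < take; p++)
            {
                result[consumers[i]].Add(next++);
            }
        }
        return result;
    }
}
```

With consumers `c1` and `c2` and five partitions, `c1` would own partitions 0, 1, 2 and `c2` would own 3, 4. If a third consumer `c3` joins, recomputing gives `c1` 0, 1; `c2` 2, 3; and `c3` 4, which is the kind of reshuffle a rebalance performs.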

Contribute

Contributions to CSharpClient-for-Kafka are welcome. Here is how you can contribute to CSharpClient-for-Kafka:

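Overview

Name and owner: microsoft/CSharpClient-for-Kafka
Primary language: C#
Languages: C# (language count: 1)
Platform: Windows
License: Other
Releases: 0
Created: 2016-02-18 22:41:46
Last pushed: 2018-10-11 21:14:08
Last commit: 2018-10-11 14:14:08
Stars: 463
Watchers: 99
Forks: 95
Commits: 96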
Issues: 45
Open issues: 33
Pull requests: 31
Open pull requests: 10
Closed pull requests: 5