clickhouse_sinker
clickhouse_sinker 是一个 sinker 程序,可将 kafka 消息传输到 ClickHouse 中。
特性
- 易于使用和部署,您无需编写任何硬代码,只需关心配置文件
- 自定义解析器支持。
- 支持多个 sinker 任务,每个 sinker 任务并行运行。
- 支持多重 kafka 和 ClickHouse 集群。
- 批量插入(通过配置 bufferSize 和 flushInterval)。
- 循环写入(当某些节点崩溃时,它将重试将数据写入另一个健康的节点)
- 使用本机 ClickHouse 客户端-服务器(client-server)TCP 协议,具有比 HTTP 更高的性能。
安装并运行
通过二进制文件(建议)
从发行版下载二进制文件,根据您的环境选择可执行的二进制文件,修改conf文件,然后运行./clickhouse_sinker -conf conf
通过源代码
- 安装 Golang
- Go Get
go get -u github.com/housepower/clickhouse_sinker
- 构建及运行
make build ## 修改配置文件,设置配置目录,然后运行它 ./dist/clickhouse_sinker -conf conf
示例
- look at the integration test.
- there is a simple tutorial in Chinese which created by user @taiyang-li.
支持解析器(parsers)
- Json
- Csv
支持的数据类型
- UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64
- Float32, Float64
- String
- FixedString
- DateTime(UInt32), Date(UInt16)
- Array(UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64)
- Array(Float32, Float64)
- Array(String)
- Array(FixedString)
- Array(DateTime(UInt32), Date(UInt16))
- Nullable
- ElasticDateTime => Int64 (2019-12-16T12:10:30Z => 1576498230)
配置
请参阅配置示例
自定义指标解析器(Custom metric parser)
您只需要自己实现解析器接口
type Parser interface { Parse(bs []byte) model.Metric }
请参阅 json 解析器
调试
echo '{"date": "2019-07-11T12:10:30Z", "level": "info", "message": "msg4"}' | kafkacat -b 127.0.0.1:9093 -P -t logstash clickhouse-client -q 'select * from default.logstash' 2019-12-16 info msg4 2019-07-11 info msg4 2015-05-11 info msg4 2019-07-11 info msg4