Schema Registry

Kafka 的 Confluent Schema 注册表。(Confluent Schema Registry for Kafka)

Github stars Tracking Chart

Schema Registry

Confluent Schema Registry 为您的元数据提供了一个服务层。它提供了一个 RESTful 接口,用于存储和检索 Avro®、JSON Schema 和 Protobuf 模式。它根据指定的主题名称策略存储所有模式的版本历史,提供多种兼容性设置,并允许根据配置的兼容性设置和对这些模式类型的扩展支持来演化模式。它提供了插入 Apache Kafka® 客户端的序列化器,这些序列化器可以处理以任何支持的格式发送的 Kafka 消息的模式存储和检索。

文档

这里有几个 Confluent 文档中的 Schema Registry 页面链接:

快速启动 API 使用实例

以下假设你有 Kafka 和一个使用默认设置运行的 Schema Registry 实例。这些例子以及更多的例子也可以在 docs.confluent.io 上的 API 使用实例 中找到。

# Register a new version of a schema under the subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-key/versions
  {"id":1}

# Register a new version of a schema under the subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
     http://localhost:8081/subjects/Kafka-value/versions
  {"id":1}

# List all subjects
$ curl -X GET http://localhost:8081/subjects
  ["Kafka-value","Kafka-key"]

# List all schema versions registered under the subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions
  [1]

# Fetch a schema by globally unique id 1
$ curl -X GET http://localhost:8081/schemas/ids/1
  {"schema":"\"string\""}

# Fetch version 1 of the schema registered under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1
  {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Fetch the most recently registered schema under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/latest
  {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Delete version 3 of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/3
  3

# Delete all versions of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value
  [1, 2, 3, 4, 5]

# Check whether a schema has been registered under subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-key
  {"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}

# Test compatibility of a schema with the latest schema under subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
  {"is_compatible":true}

# Get top level config
$ curl -X GET http://localhost:8081/config
  {"compatibilityLevel":"BACKWARD"}

# Update compatibility requirements globally
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "NONE"}' \
    http://localhost:8081/config
  {"compatibility":"NONE"}

# Update compatibility requirements under the subject "Kafka-value"
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "BACKWARD"}' \
    http://localhost:8081/config/Kafka-value
  {"compatibility":"BACKWARD"}

安装

作为 Confluent 平台 的一部分,您可以下载预制版本的模式注册表。要从源代码安装,请按照开发部分的说明进行安装。

部署

schema 注册表的 REST 接口包括一个内置的 Jetty 服务器。包装脚本 bin/schema-registry-start 和 bin/schema-registry-stop 是启动和停止服务的推荐方法。

开发

要建立一个开发版本,你可能需要一个开发版本的 commonrest-utils。安装这些之后,你可以用 Maven 构建 Schema Registry。

这个项目使用 Google Java 代码风格 来保持代码的干净和一致。

构建

mvn compile

运行单元和集成测试

mvn test

在本地 Kafka 集群中运行一个 Schema Registry 实例(使用 Kafka 的默认配置)。

mvn exec:java -pl :kafka-schema-registry -Dexec.args="config/schema-registry.properties"

创建一个打包的版本,可以选择跳过测试。

mvn package [-DskipTests]

这将在 package/target/kafka-schema-registry-package-$VERSION-package 中产生一个准备生产的版本,其目录布局与打包的二进制版本类似。

你也可以使用 standalone profile 制作一个独立的胖 JAR。

mvn package -P standalone [-DskipTests]

这将生成 pack/target/kafka-schema-registry-package-$VERSION-standalone.jar,其中也包括所有的依赖关系。

OpenAPI 规格

OpenAPI(原名 Swagger)规范是在编译阶段使用 swagger-maven-plugin 自动构建的。

贡献

感谢您的帮助,让我们的 Schema Registry 变得更好。

许可证

本项目采用 Confluent Community License 授权,除了客户端库采用 Apache 2.0 授权。详细的许可协议请参见每个子文件夹中的 LICENSE 文件。


Overview

Name With Ownerconfluentinc/schema-registry
Primary LanguageJava
Program languageJava (Language Count: 4)
PlatformDocker, Linux, Microsoft Azure
License:Other
Release Count15484
Last Release Namev7.8.0-202 (Posted on )
First Release Namev1.0 (Posted on )
Created At2014-12-09 22:38:11
Pushed At2024-04-19 23:32:25
Last Commit At2024-04-19 19:00:27
Stargazers Count2.1k
Watchers Count369
Fork Count1.1k
Commits Count13.1k
Has Issues Enabled
Issues Count1097
Issue Open Count284
Pull Requests Count1669
Pull Requests Open Count29
Pull Requests Close Count282
Has Wiki Enabled
Is Archived
Is Fork
Is Locked
Is Mirror
Is Private

Schema Registry

Schema Registry provides a serving layer for your metadata. It provides a
RESTful interface for storing and retrieving Avro schemas. It stores a versioned
history of all schemas, provides multiple compatibility settings and allows
evolution of schemas according to the configured compatibility setting. It
provides serializers that plug into Kafka clients that handle schema storage and
retrieval for Kafka messages that are sent in the Avro format.

Quickstart

The following assumes you have Kafka and an instance of the Schema Registry running using the default settings.

# Register a new version of a schema under the subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-key/versions
  {"id":1}

# Register a new version of a schema under the subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
     http://localhost:8081/subjects/Kafka-value/versions
  {"id":1}

# List all subjects
$ curl -X GET http://localhost:8081/subjects
  ["Kafka-value","Kafka-key"]

# List all schema versions registered under the subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions
  [1]

# Fetch a schema by globally unique id 1
$ curl -X GET http://localhost:8081/schemas/ids/1
  {"schema":"\"string\""}

# Fetch version 1 of the schema registered under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1
  {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Fetch the most recently registered schema under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/latest
  {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Delete version 3 of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/3
  3

# Delete all versions of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value
  [1, 2, 3, 4, 5]

# Check whether a schema has been registered under subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-key
  {"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}

# Test compatibility of a schema with the latest schema under subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
  {"is_compatible":true}

# Get top level config
$ curl -X GET http://localhost:8081/config
  {"compatibilityLevel":"BACKWARD"}

# Update compatibility requirements globally
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "NONE"}' \
    http://localhost:8081/config
  {"compatibility":"NONE"}

# Update compatibility requirements under the subject "Kafka-value"
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "BACKWARD"}' \
    http://localhost:8081/config/Kafka-value
  {"compatibility":"BACKWARD"}

Installation

You can download prebuilt versions of the schema registry as part of the
Confluent Platform. To install from source,
follow the instructions in the Development section.

Deployment

The REST interface to schema registry includes a built-in Jetty server. The
wrapper scripts bin/schema-registry-start and bin/schema-registry-stop
are the recommended method of starting and stopping the service.

Development

To build a development version, you may need a development versions of
common and
rest-utils. After
installing these, you can build the Schema Registry
with Maven.

This project uses the Google Java code style
to keep code clean and consistent.

OpenAPI Spec

OpenAPI (formerly known as Swagger) specifications are built automatically using swagger-maven-plugin
on compile phase.

Contribute

License

The project is licensed under the Confluent Community License, except for client libs,
which is under the Apache 2.0 license.
See LICENSE file in each subfolder for detailed license agreement.

To the top