注意
先决条件:完成以下步骤之前,您需要先运行 Amazon Managed Streaming for Apache Kafka(Amazon MSK)或 Apache Kafka 集群。您的创建器和使用器需要在 Java 8 或更高版本上运行。
Serde 库提供用于序列化和反序列化数据的框架。
您将为生成数据的应用程序(统称为“序列化程序”)安装开源序列化程序。序列化程序处理序列化、压缩以及与架构注册表的交互。序列化程序自动从正在写入架构注册表兼容目标(如 Amazon MSK)的记录中提取架构。同样,您将在使用数据的应用程序上安装开源反序列化程序。
在创建器和使用器上安装库:
在创建器和使用器的 pom.xml 文件中,通过下面的代码添加此依赖关系:
<dependency> <groupId>software.amazon.glue</groupId> <artifactId>schema-registry-serde</artifactId> <version>1.1.5</version> </dependency>
或者,您可以克隆 AWS Glue 架构注册表
。 使用这些必填属性设置您的创建器:
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
如果没有现有架构,则需要打开自动注册(下一步)。如果您确实有要应用的架构,则将“my-schema”替换为您的架构名称。此外,如果架构自动注册处于关闭状态,则必须提供“registry-name”。如果架构在“default-registry”下创建,则可以省略注册表名称。
(可选)设置这些可选的创建器属性。有关详细属性说明,请参阅自述文件
。 props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false" props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams) props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry" props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours) props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
自动注册会在默认注册表(“default-registry”)下注册架构版本。如果上一步未指定
SCHEMA_NAME
,则主题名称被推断为SCHEMA_NAME
。有关兼容性模式的更多信息,请参阅架构版本控制和兼容性。
使用这些必填属性设置您的使用器:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an AWS 区域 props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
(可选)设置这些可选的使用器属性。有关详细属性说明,请参阅自述文件
。 properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario