选择您的 Cookie 首选项

我们使用必要 Cookie 和类似工具提供我们的网站和服务。我们使用性能 Cookie 收集匿名统计数据,以便我们可以了解客户如何使用我们的网站并进行改进。必要 Cookie 无法停用,但您可以单击“自定义”或“拒绝”来拒绝性能 Cookie。

如果您同意,AWS 和经批准的第三方还将使用 Cookie 提供有用的网站功能、记住您的首选项并显示相关内容,包括相关广告。要接受或拒绝所有非必要 Cookie,请单击“接受”或“拒绝”。要做出更详细的选择,请单击“自定义”。

安装 SerDe 库

聚焦模式
安装 SerDe 库 - AWS Glue
注意

先决条件:完成以下步骤之前,您需要先运行 Amazon Managed Streaming for Apache Kafka(Amazon MSK)或 Apache Kafka 集群。您的创建器和使用器需要在 Java 8 或更高版本上运行。

Serde 库提供用于序列化和反序列化数据的框架。

您将为生成数据的应用程序(统称为“序列化程序”)安装开源序列化程序。序列化程序处理序列化、压缩以及与架构注册表的交互。序列化程序自动从正在写入架构注册表兼容目标(如 Amazon MSK)的记录中提取架构。同样,您将在使用数据的应用程序上安装开源反序列化程序。

在创建器和使用器上安装库:

  1. 在创建器和使用器的 pom.xml 文件中,通过下面的代码添加此依赖关系:

    <dependency> <groupId>software.amazon.glue</groupId> <artifactId>schema-registry-serde</artifactId> <version>1.1.5</version> </dependency>

    或者,您可以克隆 AWS Glue 架构注册表

  2. 使用这些必填属性设置您的创建器:

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"

    如果没有现有架构,则需要打开自动注册(下一步)。如果您确实有要应用的架构,则将“my-schema”替换为您的架构名称。此外,如果架构自动注册处于关闭状态,则必须提供“registry-name”。如果架构在“default-registry”下创建,则可以省略注册表名称。

  3. (可选)设置这些可选的创建器属性。有关详细属性说明,请参阅自述文件

    props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false" props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams) props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry" props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours) props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed

    自动注册会在默认注册表(“default-registry”)下注册架构版本。如果上一步未指定 SCHEMA_NAME,则主题名称被推断为 SCHEMA_NAME

    有关兼容性模式的更多信息,请参阅架构版本控制和兼容性

  4. 使用这些必填属性设置您的使用器:

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an AWS 区域 props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
  5. (可选)设置这些可选的使用器属性。有关详细属性说明,请参阅自述文件

    properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario

下一主题:

创建注册表

上一主题:

开始使用
隐私网站条款Cookie 首选项
© 2025, Amazon Web Services, Inc. 或其附属公司。保留所有权利。