本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
注意
必要條件:在完成下列步驟前,您必須擁有執行中的 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 或 Apache Kafka 叢集。您的生產者和消費者需要在 Java 8 或更高版本上執行。
SerDe 程式庫提供序列化和還原序列化資料的架構。
您將為產生資料的應用程式安裝開源序列化程式 (統稱為「序列化程式」)。序列化程式會處理序列化、壓縮以及與結構描述登錄檔的互動。序列化程式會自動從寫入結構描述登錄檔相容目的地的記錄擷取結構描述,例如 Amazon MSK。同樣地,您將在使用資料的應用程式上安裝開源還原序列化程式。
若要在生產者和消費者上安裝程式庫:
在生產者和消費者的 pom.xml 檔案中,透過下面的程式碼新增此相依性:
<dependency> <groupId>software.amazon.glue</groupId> <artifactId>schema-registry-serde</artifactId> <version>1.1.5</version> </dependency>
或者,您也可以複製 AWS Glue 結構描述登錄檔 Github 儲存庫
。 使用這些必要屬性設定您的生產者:
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
如果沒有現有的結構描述,則需要開啟自動註冊 (下一個步驟)。如果您確實有一個想套用的結構描述,那麼請用您的結構描述名稱替換「my-schema」。如果結構描述自動註冊關閉,則也必須提供「registry-name」。如果結構描述是在「default-registry」下建立的,則登錄檔名稱可以省略。
(選用) 設定這些選用生產者屬性中的任何一個。如需詳細的屬性說明,請參閱讀我檔案
。 props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false" props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams) props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry" props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours) props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
自動註冊會在預設登錄檔 (「default-registry」) 下註冊結構描述版本。如果在上一個步驟中未指定
SCHEMA_NAME
,則主題名稱會被推斷為SCHEMA_NAME
。如需相容性模式的詳細資訊,請參閱結構描述版本控制和相容性。
使用下列必要屬性設定您的消費者:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an AWS 區域 props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
(選用) 設定這些選用的消費者屬性。如需詳細的屬性說明,請參閱讀我檔案
。 properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario