注記
前提条件: 次のステップを実行する前に、Amazon Managed Streaming for Apache Kafka (Amazon MSK) または Apache Kafka クラスターを起動しておく必要があります。使用するプロデューサーとコンシューマーは、Java 8 以上で実行する必要があります。
SerDe ライブラリは、データのシリアル化と非シリアル化のためのフレームワークを提供します。
データを生成するアプリケーション (総称してシリアライザ) 用の、オープンソースのシリアライザをインストールします。シリアライザは、シリアル化、圧縮、および Schema Registry とのやり取りを処理します。シリアライザは、Schema Registry 対応の送信先 (Amazon MSK など) に書き込まれるレコードから、スキーマを自動的に抽出します。同様に、データを利用するアプリケーションには、オープンソースのデシリアライザをインストールします。
プロデューサとコンシューマにライブラリをインストールするには
プロデューサとコンシューマ両方の pom.xml ファイルの中で、以下のコードにより依存関係を追加します。
<dependency> <groupId>software.amazon.glue</groupId> <artifactId>schema-registry-serde</artifactId> <version>1.1.5</version> </dependency>
または、AWS Glue Schema Registry GitHub リポジトリ
からクローンを作成することもできます。 次の必須プロパティを使用してプロデューサをセットアップします。
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
既存のスキーマがない場合は、自動登録を有効にします (次のステップ)。適用できるスキーマがある場合は、「my-schema」をそのスキーマ名に置き換えます。また、スキーマの自動登録が無効になっている場合は、「registry-name」を指定する必要もあります。スキーマが「default-registry」の下に作成されている場合は、レジストリ名を省略できます。
(オプション) これらのオプションのプロデューサープロパティのいずれかを設定します。プロパティの詳細については、ReadMe ファイル
を参照してください。 props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false" props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams) props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry" props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours) props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
自動登録では、スキーマのバージョンがデフォルトのレジストリ (default-registry) に登録されます。
SCHEMA_NAME
が前のステップで指定されていない場合、トピック名はSCHEMA_NAME
として推定されます。互換モードの詳細については、「スキーマのバージョニングと互換性」を参照してください。
以下の必須プロパティを使用してコンシューマを設定します。
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an AWS リージョン props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
(オプション) これらのオプションのコンシューマプロパティを設定します。プロパティの詳細については、ReadMe ファイル
を参照してください。 properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario