Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
catatan
Prasyarat: Sebelum menyelesaikan langkah-langkah berikut, Anda harus menjalankan cluster ( Amazon Managed Streaming for Apache Kafka AmazonMSK) atau Apache Kafka. Produsen dan konsumen Anda harus berjalan pada Java 8 atau yang lebih tinggi.
SerDe Pustaka menyediakan kerangka kerja untuk serialisasi dan deserialisasi data.
Anda akan menginstal serializer sumber terbuka untuk aplikasi Anda yang menghasilkan data (secara kolektif “"-serializer"). Serializer menangani serialisasi, kompresi, dan interaksi dengan Registri Skema. Serializer secara otomatis mengekstrak skema dari catatan yang ditulis ke tujuan yang kompatibel dengan Schema Registry, seperti Amazon. MSK Demikian juga, Anda akan menginstal deserializer sumber terbuka pada aplikasi Anda yang mengkonsumsi data.
Untuk menginstal perpustakaan pada produsen dan konsumen:
Di dalam file pom.xml baik dari produsen dan konsumen, tambahkan dependensi ini melalui kode di bawah ini:
<dependency> <groupId>software.amazon.glue</groupId> <artifactId>schema-registry-serde</artifactId> <version>1.1.5</version> </dependency>
Atau, Anda dapat mengkloning Repositori Github Registri Skema AWS Glue
. Siapkan produsen Anda dengan properti yang diperlukan ini:
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
Jika tidak ada skema yang ada, maka pendaftaran otomatis harus Anda aktifkan (langkah berikutnya). Jika Anda memiliki skema yang ingin Anda terapkan, maka ganti "my-schema" dengan nama skema Anda. Selain itu, "registry-name" harus disediakan jika skema pendaftaran otomatis dimatikan. Jika skema dibuat menggunakan "default-registry", maka nama registri dapat dihilangkan.
(Opsional) Mengatur salah satu properti produsen opsional ini. Untuk deskripsi properti terperinci, lihat ReadMe file
. props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false" props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams) props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry" props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours) props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
Pendaftaran otomatis mendaftarkan versi skema dengan menggunakan registri default ("default-registry"). Jika
SCHEMA_NAME
tidak ditentukan dalam langkah sebelumnya, maka nama topik disimpulkan sebagaiSCHEMA_NAME
.Lihat Versi skema dan kompatibilitas untuk informasi lebih lanjut tentang mode kompatibilitas.
Siapkan konsumen Anda dengan properti yang diperlukan berikut ini:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an Wilayah AWS props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
(Opsional) Mengatur properti konsumen opsional ini. Untuk deskripsi properti terperinci, lihat ReadMe file
. properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario