Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Installation de SerDe bibliothèques
Note
Conditions préalables : Avant d'effectuer les étapes suivantes, vous devez avoir un cluster Amazon Managed Streaming for Apache Kafka (AmazonMSK) ou Apache Kafka en cours d'exécution. Vos applications producteur et consommateur doivent utiliser Java 8 ou une version supérieure.
Les SerDe bibliothèques fournissent un cadre pour la sérialisation et la désérialisation des données.
Vous allez installer le sérialiseur open source pour vos applications produisant des données (collectivement les « sérialiseurs »). Le sérialiseur gère la sérialisation, la compression et l'interaction avec le registre de schémas. Le sérialiseur extrait automatiquement le schéma d'un enregistrement en cours d'écriture vers une destination compatible avec le registre des schémas, telle qu'Amazon. MSK De même, vous allez installer le désérialiseur open source sur vos applications consommant des données.
Pour installer les bibliothèques sur les applications producteur et consommateur :
Dans les fichiers pom.xml des applications producteur et consommateur, ajoutez cette dépendance via le code ci-dessous :
<dependency> <groupId>software.amazon.glue</groupId> <artifactId>schema-registry-serde</artifactId> <version>1.1.5</version> </dependency>
Vous pouvez également cloner le référentiel Github du registre de schémas AWS Glue
. Configurez vos applications producteur avec les propriétés requises suivantes :
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
S'il n'existe aucun schéma existant, l'enregistrement automatique doit être activé (étape suivante). Si vous avez un schéma que vous souhaitez appliquer, remplacez « my-schema » par le nom de votre schéma. La valeur « registry-name » doit également être fournie si l'enregistrement automatique du schéma est désactivé. Si le schéma est créé sous la valeur « default-registry », le nom du registre peut être omis.
(Facultatif) Définissez l'une de ces propriétés de producteur facultatives. Pour une description détaillée des propriétés, consultez le ReadMe fichier
. props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false" props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams) props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry" props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours) props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
L'enregistrement automatique enregistre la version du schéma sous le registre par défaut (« default-registry »). Si une valeur
SCHEMA_NAME
n'est pas spécifiée à l'étape précédente, le nom de la rubrique est alors déduit comme étantSCHEMA_NAME
.Pour de plus amples informations sur les modes de compatibilité, veuillez consulter Gestion des versions et compatibilité des schémas.
Configurez vos applications consommateur avec les propriétés requises suivantes :
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an Région AWS props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
(Facultatif) Définissez ces propriétés d'application consommateur facultatives. Pour une description détaillée des propriétés, consultez le ReadMe fichier
. properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario