Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Instalación de bibliotecas SerDe
nota
Requisitos previos: antes de completar los siguientes pasos, deberá tener un clúster de Amazon Managed Streaming for Apache Kafka (Amazon MSK) o Apache Kafka en ejecución. Sus productores y consumidores deben ejecutarse en Java 8 o superior.
Las bibliotecas SerDe proporcionan un marco para serializar y deserializar datos.
Instalará el serializador de código abierto para sus aplicaciones que producen datos (colectivamente los “serializadores”). El serializador controla la serialización, la compresión y la interacción con Schema Registry. El serializador extrae automáticamente el esquema de un registro que se está escribiendo en un destino compatible con Schema Registry, como Amazon MSK. Del mismo modo, instalará el deserializador de código abierto en sus aplicaciones que consumen datos.
Para instalar las bibliotecas en productores y consumidores:
Dentro de los archivos pom.xml de los productores y consumidores, agregue esta dependencia a través del siguiente código:
<dependency> <groupId>software.amazon.glue</groupId> <artifactId>schema-registry-serde</artifactId> <version>1.1.5</version> </dependency>
También puede clonar el repositorio Github de AWS Glue Schema Registry
. Configure sus productores con estas propiedades requeridas:
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
Si no hay esquemas existentes, debe activarse el registro automático (siguiente paso). Si tiene un esquema que desea aplicar, reemplace “my-schema (mi esquema)” con su nombre de esquema. También debe proporcionarse el “registry-name (nombre del registro)” si el registro automático del esquema está desactivado. Si el esquema se crea como “default-registry (registro predeterminado)”, entonces el nombre del registro se puede omitir.
(Opcional) configure cualquiera de estas propiedades de productor opcionales. Para ver descripciones detalladas de propiedades, consulte el archivo ReadMe (Léame)
. props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false" props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams) props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry" props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours) props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
El registro automático registra la versión del esquema en el registro predeterminado (“default-registry”). Si no se especifica un
SCHEMA_NAME
en el paso anterior, entonces el nombre del tema se infiere como elSCHEMA_NAME
.Consulte Compatibilidad y control de versiones de esquemas para obtener más información sobre los modos de compatibilidad.
Configure sus consumidores con estas propiedades obligatorias:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an Región de AWS props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
(Opcional) configure estas propiedades de consumidores opcionales. Para ver descripciones detalladas de propiedades, consulte el archivo ReadMe (Léame)
. properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario