nota
Pré-requisitos: antes de concluir as etapas a seguir, é necessário ter um cluster do Amazon Managed Streaming for Apache Kafka (Amazon MSK) ou do Apache Kafka em execução. Seus produtores e consumidores precisam estar em execução em Java 8 ou superior.
As bibliotecas SerDe fornecem um framework para serialização e desserialização de dados.
Você instalará o serializador de código aberto para suas aplicações que produzem dados (coletivamente os “serializadores”). O serializador manipula serialização, compactação e interação com o registro de esquemas. O serializador extrai automaticamente o esquema de um registro sendo gravado em um destino compatível com o registro de esquemas, como o Amazon MSK. Da mesma forma, você instalará o desserializador de código aberto em suas aplicações que consomem dados.
Para instalar as bibliotecas em produtores e consumidores:
Dentro dos arquivos pom.xml, tanto dos produtores quanto dos consumidores, adicione essa dependência com o código abaixo:
<dependency> <groupId>software.amazon.glue</groupId> <artifactId>schema-registry-serde</artifactId> <version>1.1.5</version> </dependency>
Como alternativa, você pode clonar o repositório do registro de esquemas do AWS Glue do GitHub
. Configure seus produtores com estas propriedades obrigatórias:
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"
Se não houver esquemas existentes, o registro automático precisará ser ativado (próxima etapa). Se você tiver um esquema que gostaria de aplicar, substitua “my-schema” (meu esquema) pelo nome do seu esquema. Além disso, o “registry-name” (nome do registro) deve ser fornecido se o registro automático do esquema estiver desativado. Se o esquema é criado sob o “default-registry” (registro padrão), o nome do registro pode ser omitido.
(Opcional) defina qualquer uma destas propriedades opcionais do produtor. Para obter descrições detalhadas das propriedades, consulte o arquivo ReadMe
. props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false" props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams) props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry" props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours) props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed
O registro automático registra a versão do esquema no registro padrão (“default-registry”). Se um
SCHEMA_NAME
não for especificado na etapa anterior, então o nome do tópico será inferido comoSCHEMA_NAME
.Consulte Versionamento e compatibilidade de esquema, para obter mais informações sobre os modos de compatibilidade.
Configure seus consumidores com estas propriedades obrigatórias:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an Região da AWS props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
(Opcional) defina essas propriedades opcionais do consumidor. Para obter descrições detalhadas das propriedades, consulte o arquivo ReadMe
. properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario