Desenvolva consumidores com KCL Java - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Desenvolva consumidores com KCL Java

Pré-requisitos

Antes de começar a usar o KCL 3.x, verifique se você tem o seguinte:

  • Java Development Kit (8JDK) ou posterior

  • AWS SDK for Java 2. x

  • Maven ou Gradle para gerenciamento de dependências

KCLcoleta métricas de CPU utilização, como a CPU utilização do host de computação em que os trabalhadores estão executando, para equilibrar a carga e alcançar um nível uniforme de utilização de recursos entre os trabalhadores. Para permitir KCL a coleta de métricas de CPU utilização dos trabalhadores, você deve atender aos seguintes pré-requisitos:

Amazon Elastic Compute Cloud(AmazonEC2)

  • Seu sistema operacional deve ser Linux OS.

  • Você deve habilitar IMDSv2em sua EC2 instância.

Amazon Elastic Container Service (AmazonECS) na Amazon EC2

Amazon ECS em AWS Fargate

  • Você deve habilitar a versão 4 do endpoint de metadados de tarefas Fargate. Se você usa a plataforma Fargate versão 1.4.0 ou posterior, isso é ativado por padrão.

  • Plataforma Fargate versão 1.4.0 ou posterior.

Amazon Elastic Kubernetes Service (Amazon) na Amazon EKS EC2

  • Seu sistema operacional deve ser Linux OS.

Amazon EKS em AWS Fargate

  • Plataforma Fargate 1.3.0 ou posterior.

Importante

Se KCL não puder coletar métricas de CPU utilização dos trabalhadores, KCL voltará a usar a produtividade por trabalhador para atribuir arrendamentos e equilibrar a carga entre os trabalhadores da frota. Para obter mais informações, consulte Como KCL atribui arrendamentos aos trabalhadores e equilibra a carga.

Instalar e adicionar dependências

Se você estiver usando o Maven, adicione a seguinte dependência ao seu pom.xml arquivo. Certifique-se de ter substituído 3.x.x pela versão mais recenteKCL.

<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>

Se você estiver usando o Gradle, adicione o seguinte ao seu build.gradle arquivo. Certifique-se de ter substituído 3.x.x pela versão mais recenteKCL.

implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'

Você pode verificar a versão mais recente do KCL no Repositório Central do Maven.

Implementar o consumidor

Um aplicativo para KCL consumidores consiste nos seguintes componentes principais:

RecordProcessor

RecordProcessor é o componente principal em que reside sua lógica de negócios para processar registros de stream de dados do Kinesis. Ele define como seu aplicativo processa os dados que recebe do stream do Kinesis.

Principais responsabilidades:

  • Inicializar o processamento de um fragmento

  • Processe lotes de registros do stream do Kinesis

  • Encerrar o processamento de um fragmento (por exemplo, quando o fragmento é dividido ou mesclado, ou quando a concessão é entregue a outro host)

  • Controle o controle para acompanhar o progresso

Veja a seguir um exemplo de implementação:

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.*; import software.amazon.kinesis.processor.ShardRecordProcessor; public class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; @Override public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()) ); // Checkpoint periodically processRecordsInput.checkpointer().checkpoint(); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting.", t); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } }

Veja a seguir uma explicação detalhada de cada método usado no exemplo:

inicializar () InitializationInput initializationInput

  • Objetivo: configurar todos os recursos ou estados necessários para processar registros.

  • Quando é chamado: Uma vez, quando KCL atribui um fragmento a esse processador de registro.

  • Principais pontos:

    • initializationInput.shardId(): o ID do fragmento que esse processador manipulará.

    • initializationInput.extendedSequenceNumber(): O número de sequência a partir do qual iniciar o processamento.

processRecords(ProcessRecordsInput processRecordsInput)

  • Objetivo: processar os registros recebidos e, opcionalmente, verificar o progresso do ponto.

  • Quando é chamado: Repetidamente, desde que o processador de registros mantenha o contrato de arrendamento do fragmento.

  • Principais pontos:

    • processRecordsInput.records(): Lista de registros a serem processados.

    • processRecordsInput.checkpointer(): Usado para verificar o progresso.

    • Certifique-se de ter tratado todas as exceções durante o processamento para KCL evitar falhas.

    • Esse método deve ser idempotente, pois o mesmo registro pode ser processado mais de uma vez em alguns cenários, como dados que não foram verificados antes de falhas ou reinicializações inesperadas do trabalhador.

    • Sempre limpe todos os dados armazenados em buffer antes do ponto de verificação para garantir a consistência dos dados.

leaseLost(LeaseLostInput leaseLostInput)

  • Objetivo: limpar todos os recursos específicos para processar esse fragmento.

  • Quando é chamado: quando outro Scheduler assume a concessão desse fragmento.

  • Principais pontos:

    • O checkpoint não é permitido neste método.

shardEnded(ShardEndedInput shardEndedInput)

  • Objetivo: Concluir o processamento desse fragmento e ponto de verificação.

  • Quando é chamado: quando o fragmento é dividido ou mesclado, indicando que todos os dados desse fragmento foram processados.

  • Principais pontos:

    • shardEndedInput.checkpointer(): Usado para realizar a verificação final.

    • A verificação nesse método é obrigatória para concluir o processamento.

    • Deixar de liberar os dados e o ponto de verificação aqui pode resultar na perda de dados ou no processamento duplicado quando o fragmento for reaberto.

shutdownRequested(ShutdownRequestedInput shutdownRequestedInput)

  • Objetivo: Verificar e limpar os recursos quando KCL estiver desligado.

  • Quando é chamado: Quando KCL está sendo encerrado, por exemplo, quando o aplicativo está sendo encerrado).

  • Principais pontos:

    • shutdownRequestedInput.checkpointer(): Usado para realizar o checkpoint antes do desligamento.

    • Certifique-se de ter implementado o ponto de verificação no método para que o progresso seja salvo antes que o aplicativo pare.

    • A falha na liberação dos dados e do ponto de verificação aqui pode resultar na perda de dados ou no reprocessamento de registros quando o aplicativo for reiniciado.

Importante

KCLO 3.x garante menos reprocessamento de dados quando o contrato é entregue de um trabalhador para outro por meio de um ponto de verificação antes que o funcionário anterior seja encerrado. Se você não implementar a lógica de ponto de verificação no shutdownRequested() método, não verá esse benefício. Certifique-se de ter implementado uma lógica de ponto de verificação dentro do shutdownRequested() método.

RecordProcessorFactory

RecordProcessorFactory é responsável pela criação de novas RecordProcessor instâncias. KCLusa essa fábrica para criar um novo RecordProcessor para cada fragmento que o aplicativo precisa processar.

Principais responsabilidades:

  • Crie novas RecordProcessor instâncias sob demanda

  • Certifique-se de que cada um RecordProcessor esteja inicializado corretamente

Veja a seguir um exemplo de implementação:

import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } }

Neste exemplo, a fábrica cria um novo SampleRecordProcessor cada vez que shardRecordProcessor () é chamado. Você pode estender isso para incluir qualquer lógica de inicialização necessária.

Scheduler

O Scheduler é um componente de alto nível que coordena todas as atividades do KCL aplicativo. É responsável pela orquestração geral do processamento de dados.

Principais responsabilidades:

  • Gerencie o ciclo de vida do RecordProcessors

  • Gerencie o gerenciamento de arrendamento de fragmentos

  • Ponto de verificação coordenado

  • Equilibre a carga de processamento de fragmentos entre vários funcionários do seu aplicativo

  • Gerencie sinais de desligamento e encerramento de aplicativos sem problemas

Normalmente, o agendador é criado e iniciado no aplicativo principal. Você pode verificar o exemplo de implementação do Scheduler na seção a seguir, Main Consumer Application.

Aplicação principal do consumidor

O aplicativo principal do consumidor une todos os componentes. É responsável por configurar o KCL consumidor, criar os clientes necessários, configurar o Agendador e gerenciar o ciclo de vida do aplicativo.

Principais responsabilidades:

  • Configurar clientes AWS de serviço (Kinesis, DynamoDB,) CloudWatch

  • Configurar o KCL aplicativo

  • Crie e inicie o Scheduler

  • Gerenciar o desligamento do aplicativo

Veja a seguir um exemplo de implementação:

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import java.util.UUID; public class SampleConsumer { private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; public SampleConsumer(String streamName, Region region) { this.streamName = streamName; this.region = region; this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } public void run() { DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoDbAsyncClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); } public static void main(String[] args) { String streamName = "your-stream-name"; // replace with your stream name Region region = Region.US_EAST_1; // replace with your region new SampleConsumer(streamName, region).run(); } }

KCLcria um consumidor Enhanced Fan-out (EFO) com taxa de transferência dedicada por padrão. Para obter mais informações sobre o Enhanced Fan-out, consulte. Desenvolva consumidores de distribuição aprimorados com taxa de transferência dedicada Se você tiver menos de 2 consumidores ou não precisar de atrasos de propagação de leitura abaixo de 200 ms, defina a seguinte configuração no objeto do agendador para usar consumidores de taxa de transferência compartilhada:

configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))

O código a seguir é um exemplo de criação de um objeto agendador que usa consumidores de taxa de transferência compartilhada:

Importações:

import software.amazon.kinesis.retrieval.polling.PollingConfig;

Código:

Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) );/