Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Desarrolle consumidores con Java KCL
Requisitos previos
Antes de empezar a utilizar la KCL versión 3.x, asegúrese de disponer de lo siguiente:
-
Java Development Kit (8JDK) o posterior
-
AWS SDK for Java 2.x
-
Maven o Gradle para la gestión de dependencias
KCLrecopila métricas de CPU utilización, como la CPU utilización del host de cómputo en el que están trabajando los trabajadores, para equilibrar la carga y lograr un nivel uniforme de utilización de los recursos entre los trabajadores. KCLPara poder recopilar las métricas de CPU utilización de los trabajadores, debe cumplir los siguientes requisitos previos:
Amazon Elastic Compute Cloud(AmazonEC2)
-
Su sistema operativo debe ser Linux.
-
Debe habilitarlo IMDSv2en su EC2 instancia.
Amazon Elastic Container Service (AmazonECS) en Amazon EC2
-
Su sistema operativo debe ser Linux.
-
Debe habilitar la versión 4 del punto final de metadatos de ECS tareas.
-
La versión ECS del agente de contenedores de Amazon debe ser 1.39.0 o posterior.
Amazon ECS en AWS Fargate
-
Debe habilitar la versión 4 del punto final de metadatos de tareas de Fargate. Si usa la versión 1.4.0 o posterior de la plataforma Fargate, está habilitada de forma predeterminada.
-
Plataforma Fargate versión 1.4.0 o posterior.
Amazon Elastic Kubernetes Service (Amazon) en Amazon EKS EC2
-
Su sistema operativo debe ser Linux.
Amazon EKS en AWS Fargate
-
Plataforma Fargate 1.3.0 o posterior.
importante
Si KCL no puede recopilar las métricas de CPU utilización de los trabajadores, KCL recurrirá a utilizar el rendimiento por trabajador para asignar los arrendamientos y equilibrar la carga entre los trabajadores de la flota. Para obtener más información, consulte ¿Cómo se asignan los arrendamientos a los trabajadores y se equilibra la carga KCL.
Instale y añada dependencias
Si usas Maven, agrega la siguiente dependencia a tu pom.xml
archivo. Asegúrese de reemplazar la versión 3.x.x por la última versión. KCL
<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>
Si usas Gradle, agrega lo siguiente a tu archivo. build.gradle
Asegúrate de reemplazar la versión 3.x.x por la última versión. KCL
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
Puede buscar la última versión de KCL en el repositorio central de Maven
Implementar el consumidor
Una aplicación KCL para consumidores consta de los siguientes componentes clave:
Componentes principales
RecordProcessor
RecordProcessor es el componente principal en el que reside la lógica empresarial para procesar los registros de transmisión de datos de Kinesis. Define el modo en que la aplicación procesa los datos que recibe de la transmisión de Kinesis.
Responsabilidades clave:
-
Inicialice el procesamiento de un fragmento
-
Procese lotes de registros de la transmisión de Kinesis
-
Cierre el procesamiento de un fragmento (por ejemplo, cuando el fragmento se divide o se fusiona, o cuando la concesión se transfiere a otro host)
-
Controle los puntos de control para realizar un seguimiento del progreso
A continuación se muestra un ejemplo de implementación:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.*; import software.amazon.kinesis.processor.ShardRecordProcessor; public class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; @Override public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()) ); // Checkpoint periodically processRecordsInput.checkpointer().checkpoint(); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting.", t); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } }
La siguiente es una explicación detallada de cada método utilizado en el ejemplo:
inicializar () InitializationInput initializationInput
-
Propósito: configurar los recursos o estados necesarios para procesar los registros.
-
Cuándo se llama: una vez, cuando se KCL asigna un fragmento a este procesador de registros.
-
Puntos clave:
-
initializationInput.shardId()
: El ID del fragmento que gestionará este procesador. -
initializationInput.extendedSequenceNumber()
: El número de secuencia desde el que se empezará a procesar.
-
processRecords(ProcessRecordsInput processRecordsInput)
-
Finalidad: procesar los registros entrantes y, opcionalmente, comprobar el progreso de los puntos.
-
Cuando se llama: repetidamente, siempre y cuando el procesador de registros sea el propietario del arrendamiento del fragmento.
-
Puntos clave:
-
processRecordsInput.records()
: Lista de registros que se van a procesar. -
processRecordsInput.checkpointer()
: Se utiliza para comprobar el progreso. -
Asegúrese de haber gestionado todas las excepciones durante el procesamiento para evitar que se KCL produzcan errores.
-
Este método debe ser ideal, ya que el mismo registro puede procesarse más de una vez en algunos escenarios, como los datos que no han sido objeto de controles antes de que un trabajador se bloquee o se reinicie inesperadamente.
-
Vacíe siempre los datos almacenados en el búfer antes de realizar los controles para garantizar la coherencia de los datos.
-
leaseLost(LeaseLostInput leaseLostInput)
-
Propósito: Limpiar todos los recursos específicos para procesar este fragmento.
-
Cuándo se llama: cuando otro programador se hace cargo del arrendamiento de este fragmento.
-
Puntos clave:
-
Los puntos de control no están permitidos en este método.
-
shardEnded(ShardEndedInput shardEndedInput)
-
Finalidad: Finalizar el procesamiento de este fragmento y punto de control.
-
Cuándo se llama: cuando el fragmento se divide o se fusiona, lo que indica que se han procesado todos los datos de este fragmento.
-
Puntos clave:
-
shardEndedInput.checkpointer()
: Se utiliza para realizar el punto de control final. -
Los puntos de control utilizados en este método son obligatorios para completar el procesamiento.
-
Si no se vacían los datos y se comprueba aquí, es posible que se pierdan los datos o se duplique el procesamiento cuando se vuelva a abrir el fragmento.
-
shutdownRequested(ShutdownRequestedInput shutdownRequestedInput)
-
Propósito: Controlar y limpiar los recursos cuando se está apagando. KCL
-
Cuándo se llama: Cuándo KCL se cierra (por ejemplo, cuando la aplicación se cierra).
-
Puntos clave:
-
shutdownRequestedInput.checkpointer()
: Se utiliza para realizar controles antes de apagarla. -
Asegúrese de haber implementado los puntos de control en el método para guardar el progreso antes de que la aplicación se detenga.
-
Si no se vacían los datos y el punto de control aquí, se podrían perder los datos o volver a procesar los registros cuando se reinicie la aplicación.
-
importante
KCL3.x garantiza un menor reprocesamiento de los datos cuando el contrato de arrendamiento se transfiere de un trabajador a otro mediante un control antes de cerrar al trabajador anterior. Si no implementa la lógica de puntos de control en el shutdownRequested()
método, no verá este beneficio. Asegúrese de haber implementado una lógica de puntos de control dentro del shutdownRequested()
método.
RecordProcessorFactory
RecordProcessorFactory es responsable de crear nuevas RecordProcessor instancias. KCLusa esta fábrica para crear una nueva RecordProcessor para cada fragmento que la aplicación necesite procesar.
Responsabilidades clave:
-
Cree nuevas RecordProcessor instancias bajo demanda
-
Asegúrese de que cada una RecordProcessor esté inicializada correctamente
El siguiente es un ejemplo de implementación:
import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } }
En este ejemplo, la fábrica crea una nueva SampleRecordProcessor cada vez que se llama a shardRecordProcessor (). Puede ampliarlo para incluir cualquier lógica de inicialización necesaria.
Programador
El programador es un componente de alto nivel que coordina todas las actividades de la KCL aplicación. Es responsable de la organización general del procesamiento de datos.
Responsabilidades clave:
-
Gestione el ciclo de vida de RecordProcessors
-
Gestione la gestión de arrendamientos de fragmentos
-
Coordine los puntos de control
-
Equilibre la carga de procesamiento de fragmentos entre varios trabajadores de su aplicación
-
Gestione correctamente las señales de cierre y cierre de las aplicaciones
El programador normalmente se crea e inicia en la aplicación principal. Puede consultar el ejemplo de implementación de Scheduler en la siguiente sección, Aplicación principal para el consumidor.
Aplicación principal para el consumidor
La aplicación principal para el consumidor une todos los componentes. Es responsable de configurar el KCL consumidor, crear los clientes necesarios, configurar el programador y gestionar el ciclo de vida de la aplicación.
Responsabilidades clave:
-
Configurar clientes de AWS servicio (Kinesis, DynamoDB,) CloudWatch
-
Configure la aplicación KCL
-
Cree e inicie el Scheduler
-
Gestione el cierre de la aplicación
El siguiente es un ejemplo de implementación:
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import java.util.UUID; public class SampleConsumer { private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; public SampleConsumer(String streamName, Region region) { this.streamName = streamName; this.region = region; this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } public void run() { DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoDbAsyncClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); } public static void main(String[] args) { String streamName = "your-stream-name"; // replace with your stream name Region region = Region.US_EAST_1; // replace with your region new SampleConsumer(streamName, region).run(); } }
KCLcrea un consumidor Fan-Out (EFO) mejorado con un rendimiento dedicado de forma predeterminada. Para obtener más información sobre la salida de ventilación mejorada, consulte. Desarrolle consumidores con una distribución mejorada con un rendimiento dedicado Si tiene menos de 2 consumidores o no necesita retrasos de propagación de la lectura inferiores a 200 ms, debe establecer la siguiente configuración en el objeto programador para utilizar consumidores de rendimiento compartido:
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
El siguiente código es un ejemplo de cómo crear un objeto planificador que utiliza consumidores de rendimiento compartido:
Importaciones:
import software.amazon.kinesis.retrieval.polling.PollingConfig;
Código:
Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) );/