

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Desarrollar consumidores con KCL
<a name="develop-kcl-consumers"></a>

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones de consumo que procesen datos de los flujos de datos de Kinesis.

KCL está disponible en varios lenguajes. En este tema se describe cómo desarrollar consumidores de KCL en lenguajes Java y otros distintos a Java.
+ Para ver la referencia de Javadoc de la Kinesis Client Library, consulte [Javadoc de Amazon Kinesis Client Library](https://javadoc.io/doc/software.amazon.kinesis/amazon-kinesis-client/latest/index.html).
+ Para descargar KCL para Java desde GitHub, consulte la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) para Java.
+ Para localizar la KCL para Java en Apache Maven, consulte el [Repositorio central de KCL Maven](https://central.sonatype.com/artifact/software.amazon.kinesis/amazon-kinesis-client).

**Topics**
+ [Desarrollar consumidores con KCL en Java](develop-kcl-consumers-java.md)
+ [Desarrollar consumidores con KCL en lenguajes distintos de Java](develop-kcl-consumers-non-java.md)

# Desarrollar consumidores con KCL en Java
<a name="develop-kcl-consumers-java"></a>

## Requisitos previos
<a name="develop-kcl-consumers-java-prerequisites"></a>

Antes de comenzar con KCL 3.x, asegúrese de que dispone de lo siguiente:
+ Java Development Kit (JDK) 8 o posterior,
+ AWS SDK para Java 2.x
+ Maven o Gradle para la administración de dependencias.

KCL recopila métricas de uso de la CPU, como el uso de la CPU del host de cómputo en el que se ejecutan los procesos de trabajo para equilibrar la carga y lograr un nivel de uso de recursos uniforme entre ellos. Para permitir que KCL recopile métricas de uso de CPU desde los procesos de trabajo, debe cumplir los siguientes requisitos previos:

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ El sistema operativo debe ser Linux.
+ Debe habilitarlo [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)en su instancia EC2.

 **Amazon Elastic Container Service (Amazon ECS) en Amazon EC2**
+ El sistema operativo debe ser Linux.
+ Debe habilitar la [versión 4 del punto de conexión de metadatos de tareas de ECS](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html). 
+ La versión del agente de contenedor de Amazon ECS debe ser 1.39.0 o posterior.

 **Amazon ECS en AWS Fargate**
+ Debe habilitar la [versión 4 del punto de conexión de metadatos de tareas de Fargate](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html). Si utiliza la versión 1.4.0 o una posterior de la plataforma Fargate, se habilitará de forma predeterminada. 
+ Versión de la plataforma de Fargate 1.4.0 o posterior.

 **Amazon Elastic Kubernetes Service (Amazon EKS) en Amazon EC2** 
+ El sistema operativo debe ser Linux.

 **Amazon EKS en AWS Fargate**
+ Plataforma de Fargate 1.3.0 o posterior.

**importante**  
Si KCL no puede recopilar las métricas de uso de la CPU de los procesos de trabajo, volverá a utilizar el rendimiento por proceso de trabajo para asignar los arrendamientos y equilibrar la carga entre los procesos de trabajo de la flota. Para obtener más información, consulte [Cómo KCL asigna los arrendamientos a los procesos de trabajo y equilibra la carga](kcl-dynamoDB.md#kcl-assign-leases).

## Instale y agrege dependencias
<a name="develop-kcl-consumers-java-installation"></a>

Si está utilizando Maven, agregue la siguiente dependencia a su archivo `pom.xml`. Asegúrese de reemplazar la versión 3.x.x por la versión más reciente de KCL. 

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

Si está utilizando Gradle, agregue lo siguiente a su archivo `build.gradle`. Asegúrese de reemplazar la versión 3.x.x por la versión más reciente de KCL. 

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

Puede buscar la versión más reciente del KCL en el [Repositorio central de Maven](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client).

## Implementar el consumidor
<a name="develop-kcl-consumers-java-implemetation"></a>

Una aplicación de consumidor de KCL consta de los siguientes componentes clave:

**Topics**
+ [RecordProcessor](#implementation-recordprocessor)
+ [RecordProcessorFactory](#implementation-recordprocessorfactory)
+ [Programador](#implementation-scheduler)
+ [Aplicación de consumo principal](#implementation-main)

### RecordProcessor
<a name="implementation-recordprocessor"></a>

RecordProcessor es el componente principal en el que reside la lógica empresarial para procesar los registros de transmisión de datos de Kinesis. Define la forma en que procesa la aplicación los datos que recibe del flujo de Kinesis.

Responsabilidades principales:
+ Inicializar el procesamiento de una partición
+ Procesar lotes de registros del flujo de Kinesis
+ Cerrar el procesamiento de una partición (por ejemplo, cuando la partición se divide o fusiona, o cuando el arrendamiento se transfiere a otro host)
+ Controlar el registro de puntos de control para realizar un seguimiento del progreso

A continuación, se muestra un ejemplo de implementación:

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.*;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class SampleRecordProcessor implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());
            processRecordsInput.records().forEach(r -> 
                log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())
            );
            
            // Checkpoint periodically
            processRecordsInput.checkpointer().checkpoint();
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}
```

Ahora se explicará con detalle cada método utilizado en el ejemplo:

**inicializar (InitializationInput) InitializationInput**
+ Objetivo: configurar los recursos o estados necesarios para procesar los registros.
+ Cuándo se llama: una vez, cuando KCL asigna una partición a este procesador de registros.
+ Puntos clave:
  + `initializationInput.shardId()`: el ID de la partición que gestionará este procesador.
  + `initializationInput.extendedSequenceNumber()`: el número de secuencia desde el que se iniciará el procesamiento.

**ProcessRecords () ProcessRecordsInput processRecordsInput**
+ Objetivo: procesar los registros entrantes y, de manera opcional, comprobar el progreso del punto de control.
+ Cuando se llama: repetidamente, siempre y cuando el procesador de registros sea el propietario del arrendamiento de la partición.
+ Puntos clave:
  + `processRecordsInput.records()`: lista de registros que se van a procesar.
  + `processRecordsInput.checkpointer()`: se utiliza para comprobar el progreso.
  + Asegúrese de haber gestionado cualquier excepción durante el procesamiento para evitar que la KCL falle.
  + Este método debe ser idempotente, ya que el mismo registro puede procesarse más de una vez en algunos escenarios, por ejemplo, cuando los datos no se han registrado en puntos de control antes de que el proceso de trabajo fallara o reinicie de forma inesperada.
  + Vacíe siempre los datos almacenados en el búfer antes de registrar los puntos de control para garantizar la coherencia de datos.

**Arrendamiento perdido () LeaseLostInput leaseLostInput**
+ Objetivo: limpiar cualquier recurso específico para procesar esta partición.
+ Cuándo se llama: cuando otro programador se hace cargo del arrendamiento de esta partición.
+ Puntos clave:
  + El registro de puntos de control no está permitido en este método.

**Fragmentado () ShardEndedInput shardEndedInput**
+ Objetivo: finalizar el procesamiento de esta partición y este punto de control.
+ Cuándo se llama: cuando la partición se divide o se fusiona, lo que indica que se han procesado todos los datos de esta partición.
+ Puntos clave:
  + `shardEndedInput.checkpointer()`: se utiliza para realizar el registro final de puntos de control.
  + El registro de puntos de control de este método es obligatorio para completar el procesamiento.
  + Si no se vacían los datos y puntos de control, es posible que se pierdan los datos o se duplique el procesamiento cuando se vuelva a abrir la partición.

**Cierre solicitado () ShutdownRequestedInput shutdownRequestedInput**
+ Objetivo: registrar un punto de control y limpiar los recursos cuando KCL se está cerrando.
+ Cuándo se llama: cuando KCL se cierra (por ejemplo, cuando la aplicación se cierra).
+ Puntos clave:
  + `shutdownRequestedInput.checkpointer()`: se utiliza para realizar el registro de puntos de control antes del cierre.
  + Asegúrese de haber implementado el registro de puntos de control en el método para guardar el progreso antes de que la aplicación se detenga.
  + Si no se vacían los datos y puntos de control, se podrían perder los datos o volver a procesar los registros cuando se reinicie la aplicación.

**importante**  
KCL 3.x garantiza un menor reprocesamiento de datos cuando el arrendamiento se transfiere de un proceso de trabajo a otro mediante puntos de control antes de que el proceso de trabajo anterior se cierre. Si no implementa la lógica de registro de puntos de control en el método `shutdownRequested()`, no verá este beneficio. Asegúrese de haber implementado una lógica de registro de puntos de control dentro del método `shutdownRequested()`.

### RecordProcessorFactory
<a name="implementation-recordprocessorfactory"></a>

RecordProcessorFactory es responsable de crear nuevas RecordProcessor instancias. KCL usa esta fábrica para crear una nueva RecordProcessor para cada fragmento que la aplicación necesite procesar.

Responsabilidades principales:
+ Cree nuevas RecordProcessor instancias bajo demanda
+ Asegúrese de que cada una RecordProcessor esté inicializada correctamente

A continuación, se muestra un ejemplo de implementación:

```
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new SampleRecordProcessor();
    }
}
```

En este ejemplo, la fábrica crea una nueva SampleRecordProcessor cada vez que se llama a shardRecordProcessor (). Puede ampliarlo para incluir cualquier lógica de inicialización necesaria.

### Programador
<a name="implementation-scheduler"></a>

El programador es un componente de alto nivel que coordina todas las actividades de la aplicación KCL. Es responsable de la orquestación general del procesamiento de datos.

Responsabilidades principales:
+ Gestione el ciclo de vida de RecordProcessors
+ Gestionar la administración de los arrendamientos de particiones
+ Coordinar el registro de puntos de control
+ Equilibrar la carga de procesamiento de la partición entre varios procesos de trabajo de su aplicación
+ Gestionar correctamente las señales de cierre y cierre de las aplicaciones

Por lo general, el programar se crea en la aplicación principal y se inicia en ella. Puede consultar el ejemplo de implementación del programador en la siguiente sección, Aplicación de consumo principal 

### Aplicación de consumo principal
<a name="implementation-main"></a>

La aplicación de consumo principal une todos los componentes. Es responsable de configurar el consumo de KCL, crear los clientes necesarios, configurar el programador y administrar el ciclo de vida de la aplicación.

Responsabilidades principales:
+ Configurar clientes de AWS servicio (Kinesis, DynamoDB,) CloudWatch
+ Configurar la aplicación de KCL
+ Crear e iniciar el programador
+ Gestionar el apagado de aplicaciones

A continuación, se muestra un ejemplo de implementación:

```
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import java.util.UUID;

public class SampleConsumer {
    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    public SampleConsumer(String streamName, Region region) {
        this.streamName = streamName;
        this.region = region;
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    public void run() {
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamName, 
            streamName, 
            kinesisClient, 
            dynamoDbAsyncClient,
            cloudWatchClient, 
            UUID.randomUUID().toString(), 
            new SampleRecordProcessorFactory()
        );

        Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

    public static void main(String[] args) {
        String streamName = "your-stream-name"; // replace with your stream name
        Region region = Region.US_EAST_1; // replace with your region
        new SampleConsumer(streamName, region).run();
    }
}
```

 Por defecto, KCL crea un consumidor de distribución ramificada mejorada (EFO) con un rendimiento dedicado. Para obtener más información sobre la distribución ramificada mejorada, consulte [Desarrollo de consumidores de distribución ramificada mejorada con rendimiento dedicado](enhanced-consumers.md). Si tiene menos de 2 consumidores o no necesita retrasos de propagación de la lectura inferiores a 200 ms, debe establecer la siguiente configuración en el objeto del programador para utilizar consumidores de rendimiento compartido:

```
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
```

El siguiente código es un ejemplo de cómo crear un objeto del programador que utiliza consumidores de rendimiento compartido:

**Importaciones:**

```
import software.amazon.kinesis.retrieval.polling.PollingConfig;
```

**Código**:

```
Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );/
```

# Desarrollar consumidores con KCL en lenguajes distintos de Java
<a name="develop-kcl-consumers-non-java"></a>

En esta sección se describe la implementación de los consumidores que utilizan Kinesis Client Library (KCL) en Python, Node.js, .NET y Ruby.

KCL es una biblioteca de Java. El soporte para lenguajes distintos de Java se proporciona mediante una interfaz multilingüe llamada `MultiLangDaemon`. Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza con un lenguaje de KCL distinto de Java. Por tanto, si instala KCL para lenguajes distintos de Java y escribe completamente su aplicación de consumo en lenguajes distintos de Java, seguirá necesitando tener Java instalado en su sistema debido al `MultiLangDaemon`. Además, `MultiLangDaemon` tiene algunos ajustes predeterminados que podría tener que personalizar para su caso de uso (por ejemplo, la región de AWS a la que se conecta). Para obtener más información `MultiLangDaemon` sobre él GitHub, consulte el [ MultiLangDaemon proyecto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Si bien los conceptos principales siguen siendo los mismos en todos los lenguajes, existen algunas consideraciones e implementaciones específicas de cada uno. Para conocer los conceptos básicos sobre el desarrollo de los consumidores de KCL, consulte [Desarrollar consumidores con KCL en Java](develop-kcl-consumers-java.md). Para obtener información más detallada sobre cómo desarrollar consumidores de KCL en Python, Node.js, .NET y Ruby y las últimas actualizaciones, consulte los siguientes GitHub repositorios:
+ Python: [amazon-kinesis-client-python](https://github.com/awslabs/amazon-kinesis-client-python)
+ Node.js: [amazon-kinesis-client-nodejs](https://github.com/awslabs/amazon-kinesis-client-nodejs)
+ .NET: [amazon-kinesis-client-net](https://github.com/awslabs/amazon-kinesis-client-net)
+ Ruby: [amazon-kinesis-client-ruby](https://github.com/awslabs/amazon-kinesis-client-ruby)

**importante**  
No utilice las siguientes versiones de la biblioteca KCL que no sean de Java si utiliza JDK 8. Estas versiones contienen una dependencia (logback) que es incompatible con JDK 8.  
KCL Python 3.0.2 y 2.2.0
KCL Node.js 2.3.0
KCL .NET 3.1.0
KCL Ruby 2.2.0
Recomendamos utilizar versiones publicadas antes o después de estas versiones afectadas cuando trabaje con JDK 8.