

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Uso de Kinesis Client Library
<a name="kcl"></a>

## ¿Qué es Kinesis Client Library?
<a name="kcl-library-what-is"></a>

Kinesis Client Library (KCL) es una biblioteca de software Java independiente diseñada para facilitar el proceso de consumo y procesamiento de datos de Amazon Kinesis Data Streams. KCL gestiona numerosas tareas complejas que están asociadas a la computación distribuida, lo que le permite a los desarrolladores implementar su lógica empresarial para procesar datos. Administra actividades como equilibrar la carga entre varios procesos de trabajo, responder a los fallos de los procesos de trabajo, integrar puntos de control de los registros procesados y responder a los cambios en la cantidad de particiones en el flujo.

KCL se actualiza con frecuencia para incorporar versiones más recientes de las bibliotecas subyacentes, mejoras de seguridad y correcciones de errores. Recomendamos utilizar la versión más reciente de KCL para evitar los problemas ya conocidos y beneficiarse de las mejoras actuales. Para encontrar la versión más reciente de KCL, consulte [KCL GitHub](https://github.com/awslabs/amazon-kinesis-client). 

**importante**  
Recomendamos utilizar la versión más reciente de KCL para evitar errores y problemas ya conocidos. Si utiliza KCL 2.6.0 o una versión anterior, actualice a KCL 2.6.1, o a una versión posterior, para evitar bloquear el procesamiento de las particiones cuando varía la capacidad del flujo (caso poco común). 
KCL es una biblioteca de Java. Support para lenguajes distintos de Java se proporciona mediante un daemon basado en Java llamado. MultiLangDaemon MultiLangDaemoninteractúa con la aplicación KCL a través de STDIN y STDOUT. Para obtener más información sobre el encendido, consulte. MultiLangDaemon GitHub [Desarrollar consumidores con KCL en lenguajes distintos de Java](develop-kcl-consumers-non-java.md)
No utilice las AWS SDK para Java versiones 2.27.19 a 2.27.23 con KCL 3.x. Estas versiones cuentan con un problema que provoca un error de excepción relacionado con el uso de DynamoDB por parte de KCL. Le recomendamos que utilice la AWS SDK para Java versión 2.28.0 o posterior para evitar este problema. 

## Características y ventajas clave de KCL
<a name="kcl-benefits"></a>

A continuación se indican las características claves y los beneficios relacionados de KCL:
+ **Escalabilidad**: KCL permite que las aplicaciones se escalen de forma dinámica al distribuir la carga de procesamiento entre varios procesos de trabajo. Puede escalar su aplicación hacia dentro o hacia fuera, de forma manual o mediante escalado automático, sin preocuparse por la redistribución de la carga.
+ **Equilibrador de carga**: KCL equilibra automáticamente la carga de procesamiento entre los procesos de trabajo disponibles, lo que resulta en una distribución uniforme del trabajo entre ellos.
+ **Puntos de control**: KCL administra el registro de los puntos de control de los registros procesados, lo que permite a las aplicaciones reanudar el procesamiento desde la última posición en la que se procesaron correctamente.
+ **Tolerancia a errores**: KCL proporciona mecanismos de tolerancia a errores integrados, lo que garantiza que el procesamiento de datos continúe incluso si los procesos de trabajo fallan. KCL también proporciona servicios de entrega. at-least-once
+ **Gestión de los cambios a nivel de flujo**: KCL se adapta a las divisiones y fusiones de particiones que pueden producirse debido a cambios en el volumen de datos. Mantiene el orden asegurándose de que las particiones secundarias se procesen solo después de que la partición principal se haya completado y registrado su punto de control.
+ **Supervisión**: KCL se integra con Amazon CloudWatch para la supervisión a nivel de consumidor.
+ **Soporte multilingüe**: KCL es compatible de forma nativa con Java y permite utilizar varios lenguajes de programación distintos de Java. MultiLangDaemon

# Conceptos de KCL
<a name="kcl-concepts"></a>

En esta sección se explican los conceptos básicos y las interacciones de Kinesis Client Library (KCL). Estos conceptos son esenciales para desarrollar y administrar las aplicaciones de consumo de KCL.
+ **Aplicación de consumo de KCL**: una aplicación creada a medida y diseñada para leer y procesar registros de flujos de datos de Kinesis mediante Kinesis Client Library.
+ **Proceso de trabajo**: las aplicaciones de consumo de KCL suelen estar distribuidas, y uno o más procesos de trabajo se ejecutan simultáneamente. KCL coordina los procesos de trabajo para que consuman los datos del flujo de manera distribuida y equilibra la carga de manera uniforme entre varios procesos de trabajo.
+ **Programador**: clase de alto nivel que utiliza un proceso de trabajo de KCL para empezar a procesar datos. Cada proceso de trabajo de KCL tiene un programador. El programador inicializa y supervisa diversas tareas, como la sincronización de la información sobre las particiones de los flujos de datos de Kinesis, el seguimiento de las asignaciones de las particiones entre los procesos de trabajo y el procesamiento de los datos desde el flujo según las particiones asignadas al proceso de trabajo. El programador puede adoptar diversas configuraciones que afectan al comportamiento de este, como el nombre del flujo que se va a procesar y las credenciales de AWS , El programador inicia la entrega de los registros de datos del flujo a los procesadores de registros.
+ **Procesador de registros**: define la lógica de la forma en que su aplicación de consumo de KCL procesa los datos que obtiene de los flujos de datos. Debe implementar su propia lógica de procesamiento de datos personalizada en el procesador de registros. Un proceso de trabajo de KCL crea una instancia de un programador. Luego, el programador crea una instancia de un procesador de registros por cada partición que mantiene un arrendamiento. Un proceso de trabajo puede ejecutar varios procesadores de registros.
+ **Arrendamiento**: define la asignación entre un proceso de trabajo y una partición. Las aplicaciones de consumo de KCL utilizan los arrendamientos para dividir el procesamiento de registro de datos entre varios procesos de trabajo. Cada partición está vinculada a un solo proceso de trabajo mediante un arrendamiento en un momento dado y cada proceso de trabajo puede mantener uno o más arrendamientos simultáneamente. Cuando un proceso de trabajo deja de mantener un arrendamiento debido a que se detuvo o ocurrió un error, KCL asigna a otro para que se quede con el arrendamiento. Para obtener más información sobre el arrendamiento, consulte la [Documentación de Github: Lease Lifecycle](https://github.com/awslabs/amazon-kinesis-client/blob/master/docs/lease-lifecycle.md#lease-lifecycle).
+ **Tabla de arrendamiento**: es una tabla exclusiva de Amazon DynamoDB que se utiliza para realizar un seguimiento de todos los arrendamientos de la aplicación de consumo de KCL. Cada aplicación de consumo de KCL crea su propia tabla de arrendamiento. La tabla de arrendamiento se utiliza para mantener el estado de todos los procesos de trabajo y coordinar el procesamiento de los datos. Para obtener más información, consulte [Tablas de metadatos de DynamoDB y equilibrio de carga en KCL](kcl-dynamoDB.md).
+ **Registro de puntos de control**: es el proceso de almacenar de forma persistente en una partición la posición del último registro procesado correctamente. KCL administra el registro de puntos de control para garantizar que el procesamiento se pueda reanudar desde la última posición del punto de control en caso de que un proceso de trabajo falle o se reinicie la aplicación. Los puntos de control se almacenan en la tabla de arrendamiento de DynamoDB como parte de los metadatos del arrendamiento. Esto permite a los procesos de trabajo continuar procesando desde donde se detuvo el proceso de trabajo anterior.

# Tablas de metadatos de DynamoDB y equilibrio de carga en KCL
<a name="kcl-dynamoDB"></a>

KCL administra los metadatos, como los arrendamientos y las métricas de uso de la CPU de los procesos de trabajo. KCL rastrea estos metadatos mediante tablas de DynamoDB. Para cada aplicación de Amazon Kinesis Data Streams, KCL crea tres tablas de DynamoDB para administrar los metadatos: la tabla de arrendamiento, la tabla de métricas de procesos de trabajo y la de estado de coordinadores.

**nota**  
KCL 3.x presentó dos nuevas tablas de metadatos: las tablas de *métricas de procesos de trabajo* y de *estado de coordinadores*.

**importante**  
 Debe agregar los permisos adecuados para que las aplicaciones de KCL puedan crear y administrar tablas de metadatos en DynamoDB. Para obtener más información, consulte [Permisos de IAM necesarios para las aplicaciones de consumo de KCL](kcl-iam-permissions.md).  
La aplicación de consumo KCL no elimina automáticamente estas tres tablas de metadatos de DynamoDB. Asegúrese de eliminar estas tablas de metadatos creadas por la aplicación de consumo de KCL al retirar la aplicación de consumo para evitar costos innecesarios.

## Tabla de arrendamiento
<a name="kcl-leasetable"></a>

Una tabla de arrendamiento es una tabla exclusiva de Amazon DynamoDB que se utiliza para realizar un seguimiento de las particiones que los programadores de la aplicación de consumo de KCL están arrendando y procesando. Cada aplicación de consumo de KCL crea su propia tabla de arrendamiento. La KCL, por defecto, utiliza el nombre de la aplicación de consumo para el nombre de la tabla de arrendamiento. Puede definir un nombre de tabla personalizado con la configuración. KCL también crea un [índice secundario global](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html) en la tabla de arrendamiento con la clave de partición de leaseOwner para detectar los arrendamientos de manera eficiente. El índice secundario global refleja el atributo leaseKey de la tabla de arrendamientos base. Si la tabla de arrendamiento de la aplicación de consumo de KCL no existe cuando se inicia la aplicación, uno de los procesos de trabajo creará la tabla de arrendamiento para su aplicación.

Puede consultar la tabla con la [consola de Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) mientras se ejecuta la aplicación de consumo.

**importante**  
El nombre de cada aplicación de consumo de KCL debe ser único para evitar que se duplique el nombre de la tabla de arrendamiento. 
Se le realizará el cobro de los costos de su cuenta asociados a la tabla de DynamoDB, además de los costos propios asociados a Kinesis Data Streams. 

Cada fila de la tabla de arrendamiento representa una partición que procesan los programadores de su aplicación de consumo. Algunos de los campos claves son los siguientes:
+ **leaseKey**: este es el ID de partición para el procesamiento de un solo flujo. Para el procesamiento de varios flujos con KCL, se estructura como `account-id:StreamName:streamCreationTimestamp:ShardId`. leaseKey es la clave de partición de la tabla de arrendamiento. Para obtener más información sobre el procesamiento de varios flujos, consulte [Procesamiento de varios flujos con KCL](kcl-multi-stream.md).
+ **checkpoint:** el número secuencial de punto de comprobación más reciente del fragmento. 
+ **checkpointSubSequenceNúmero:** cuando se utiliza la función de agregación de la biblioteca de productores de Kinesis, se trata de una extensión del **punto de control** que rastrea los registros de los usuarios individuales dentro del registro de Kinesis.
+ **leaseCounter:** se utiliza para comprobar si un proceso de trabajo está procesando el arrendamiento de forma activa. leaseCounter aumenta si la propiedad del arrendamiento se transfiere a otro proceso de trabajo.
+ **leaseOwner:** el proceso de trabajo actual que mantiene este arrendamiento.
+ **ownerSwitchesSincePunto de control:** cuántas veces en este contrato de arrendamiento se han cambiado de trabajadores desde el último punto de control.
+ **parentShardId:** ID del padre de este fragmento. Se asegura de que la partición origen esté completamente procesada antes de que comience el procesamiento en las particiones secundarias, lo que mantiene el orden de procesamiento de registros correcto.
+ **childShardId:** Lista de fragmentos secundarios IDs resultantes de la división o fusión de este fragmento. Se utiliza para rastrear el linaje de la partición y administrar el orden de procesamiento durante las operaciones de repartición.
+ **startingHashKey:** El límite inferior del rango de claves hash de este fragmento.
+ **endingHashKey:** El límite superior del rango de claves de hash de este fragmento.

Si utiliza el procesamiento de varios flujos con KCL, verá los dos siguientes campos adicionales en la tabla de arrendamiento. Para obtener más información, consulte [Procesamiento de varios flujos con KCL](kcl-multi-stream.md).
+ **shardID:** ID de la partición.
+ **streamName:** identificador del flujo de datos en el siguiente formato: `account-id:StreamName:streamCreationTimestamp`.

## Tabla de métricas de procesos de trabajo
<a name="kcl-worker-metrics-table"></a>

La tabla de métricas de procesos de trabajo es una tabla única de Amazon DynamoDB para cada aplicación de KCL que se utiliza para registrar las métricas de uso de la CPU en cada proceso de trabajo. KCL utilizará estas métricas para realizar asignaciones de arrendamiento eficientes a fin de lograr un uso equilibrado de los recursos entre los procesos de trabajo. Por defecto, KCL utiliza `KCLApplicationName-WorkerMetricStats` para el nombre de la tabla de métricas de procesos de trabajo.

## Tabla de estado de coordinadores
<a name="kcl-coordinator-state-table"></a>

Una tabla de estado de coordinadores es una tabla de Amazon DynamoDB exclusiva para cada aplicación de KCL y se utiliza para almacenar información de estado interna de los procesos de trabajo. Por ejemplo, la tabla de estado de coordinadores almacena datos relacionados con la elección del líder o los metadatos asociados a la migración local de KCL 2.x a KCL 3.x. Por defecto, KCL utiliza `KCLApplicationName-CoordinatorState` como el nombre de la tabla de estado de coordinadores.

## Modo de capacidad de DynamoDB para las tablas de metadatos creadas por KCL
<a name="kcl-capacity-mode"></a>

Por defecto, Kinesis Client Library (KCL) crea tablas de metadatos de DynamoDB, como la tabla de arrendamiento, la tabla de métricas de procesos de trabajo y la tabla de estado de coordinadores, mediante el [modo de capacidad bajo demanda](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/on-demand-capacity-mode.html). Este modo escala la capacidad de lectura y escritura de forma automática para adaptarse al tráfico sin necesidad de planificar la capacidad. Recomendamos mantener el modo de capacidad como bajo demanda para un funcionamiento más eficiente de estas tablas de metadatos.

Si decide cambiar la tabla de arrendamiento al [modo de capacidad aprovisionada](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html), siga estas prácticas recomendadas:
+ Analice los patrones de uso:
  + Supervisa los patrones y usos de lectura y escritura de tu aplicación (RCU, WCU) mediante las métricas de Amazon. CloudWatch 
  + Conozca los requisitos de rendimiento máximo y medio.
+ Calcule la capacidad necesaria:
  + Calcule las unidades de capacidad de lectura (RCUs) y las unidades de capacidad de escritura (WCUs) en función de su análisis.
  + Contemple factores como la cantidad de particiones, la frecuencia de los puntos de control y la cantidad de procesos de trabajo.
+ Implemente el escalado automático:
  + Utilice el [escalado automático de DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html#ddb-autoscaling) para ajustar automáticamente la capacidad aprovisionada y establecer los límites de capacidad mínima y máxima adecuados. 
  + El escalado automático de DynamoDB ayudará a evitar que la tabla de metadatos de KCL alcance el límite de capacidad y se vea limitada.
+ Supervise y optimice periódicamente:
  + Supervise continuamente CloudWatch las métricas para`ThrottledRequests`.
  + Ajuste la capacidad a medida que su carga de trabajo cambie con el tiempo.

Si se encuentra con `ProvisionedThroughputExceededException` en las tablas de metadatos de DynamoDB de su aplicación de consumo de KCL, deberá aumentar la capacidad de rendimiento aprovisionada de la tabla de DynamoDB. Si establece un nivel determinado de unidades de capacidad de lectura (RCU) y unidades de capacidad de escritura (WCU) al crear la aplicación de consumo por primera vez, puede que no sea suficiente a medida que aumente su uso. Por ejemplo, si su aplicación de consumo de KCL realiza registros de puntos de control frecuentes u opera en un flujo con muchas particiones, es posible que necesite más unidades de capacidad. Para obtener información sobre el rendimiento aprovisionado en DynamoDB, consulte [Capacidad de rendimiento de DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/capacity-mode.html) y [actualización de una tabla](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html#WorkingWithTables.Basics.UpdateTable) en la Guía para desarrolladores de Amazon DynamoDB.

## Cómo KCL asigna los arrendamientos a los procesos de trabajo y equilibra la carga
<a name="kcl-assign-leases"></a>

KCL recopila y supervisa continuamente las métricas de uso de la CPU de los hosts de procesamiento que ejecutan los procesos de trabajo para garantizar una distribución uniforme de la carga de trabajo. Estas métricas de uso de la CPU se almacenan en la tabla de métricas de procesos de trabajo de DynamoDB. Si KCL detecta que algunos procesos de trabajo muestran índices de uso de la CPU más altos que otros, reasignará los arrendamientos entre ellos para reducir la alta carga de trabajo. El objetivo es equilibrar la carga de trabajo de manera más uniforme en toda la flota de aplicaciones de consumo, evitando que un solo proceso de trabajo se sobrecargue. A medida que KCL distribuye la utilización de la CPU entre la flota de aplicaciones de consumo, puede ajustar la capacidad de su flota de aplicaciones de consumo al elegir el número correcto de procesos de trabajo o al utilizar el escalado automático para administrar de manera eficiente la capacidad de cómputo y lograr un menor costo.

**importante**  
KCL puede recopilar las métricas de uso de la CPU de los procesos de trabajo únicamente si se cumplen ciertos requisitos previos. Para obtener más información, consulte [Requisitos previos](develop-kcl-consumers-java.md#develop-kcl-consumers-java-prerequisites). Si KCL no puede recopilar las métricas de uso de la CPU de los procesos de trabajo, volverá a utilizar el rendimiento por proceso de trabajo para asignar los arrendamientos y equilibrar la carga entre los procesos de trabajo de la flota. KCL supervisará el rendimiento que recibe cada proceso de trabajo en un momento dado y reasignará los arrendamientos para asegurarse de que cada uno obtenga un nivel de rendimiento total similar en los arrendamientos asignados.

# Desarrollar consumidores con KCL
<a name="develop-kcl-consumers"></a>

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones de consumo que procesen datos de los flujos de datos de Kinesis.

KCL está disponible en varios lenguajes. En este tema se describe cómo desarrollar consumidores de KCL en lenguajes Java y otros distintos a Java.
+ Para ver la referencia de Javadoc de la Kinesis Client Library, consulte [Javadoc de Amazon Kinesis Client Library](https://javadoc.io/doc/software.amazon.kinesis/amazon-kinesis-client/latest/index.html).
+ Para descargar KCL para Java desde GitHub, consulte la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) para Java.
+ Para localizar la KCL para Java en Apache Maven, consulte el [Repositorio central de KCL Maven](https://central.sonatype.com/artifact/software.amazon.kinesis/amazon-kinesis-client).

**Topics**
+ [Desarrollar consumidores con KCL en Java](develop-kcl-consumers-java.md)
+ [Desarrollar consumidores con KCL en lenguajes distintos de Java](develop-kcl-consumers-non-java.md)

# Desarrollar consumidores con KCL en Java
<a name="develop-kcl-consumers-java"></a>

## Requisitos previos
<a name="develop-kcl-consumers-java-prerequisites"></a>

Antes de comenzar con KCL 3.x, asegúrese de que dispone de lo siguiente:
+ Java Development Kit (JDK) 8 o posterior,
+ AWS SDK para Java 2.x
+ Maven o Gradle para la administración de dependencias.

KCL recopila métricas de uso de la CPU, como el uso de la CPU del host de cómputo en el que se ejecutan los procesos de trabajo para equilibrar la carga y lograr un nivel de uso de recursos uniforme entre ellos. Para permitir que KCL recopile métricas de uso de CPU desde los procesos de trabajo, debe cumplir los siguientes requisitos previos:

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ El sistema operativo debe ser Linux.
+ Debe habilitarlo [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)en su instancia EC2.

 **Amazon Elastic Container Service (Amazon ECS) en Amazon EC2**
+ El sistema operativo debe ser Linux.
+ Debe habilitar la [versión 4 del punto de conexión de metadatos de tareas de ECS](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html). 
+ La versión del agente de contenedor de Amazon ECS debe ser 1.39.0 o posterior.

 **Amazon ECS en AWS Fargate**
+ Debe habilitar la [versión 4 del punto de conexión de metadatos de tareas de Fargate](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html). Si utiliza la versión 1.4.0 o una posterior de la plataforma Fargate, se habilitará de forma predeterminada. 
+ Versión de la plataforma de Fargate 1.4.0 o posterior.

 **Amazon Elastic Kubernetes Service (Amazon EKS) en Amazon EC2** 
+ El sistema operativo debe ser Linux.

 **Amazon EKS en AWS Fargate**
+ Plataforma de Fargate 1.3.0 o posterior.

**importante**  
Si KCL no puede recopilar las métricas de uso de la CPU de los procesos de trabajo, volverá a utilizar el rendimiento por proceso de trabajo para asignar los arrendamientos y equilibrar la carga entre los procesos de trabajo de la flota. Para obtener más información, consulte [Cómo KCL asigna los arrendamientos a los procesos de trabajo y equilibra la carga](kcl-dynamoDB.md#kcl-assign-leases).

## Instale y agrege dependencias
<a name="develop-kcl-consumers-java-installation"></a>

Si está utilizando Maven, agregue la siguiente dependencia a su archivo `pom.xml`. Asegúrese de reemplazar la versión 3.x.x por la versión más reciente de KCL. 

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

Si está utilizando Gradle, agregue lo siguiente a su archivo `build.gradle`. Asegúrese de reemplazar la versión 3.x.x por la versión más reciente de KCL. 

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

Puede buscar la versión más reciente del KCL en el [Repositorio central de Maven](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client).

## Implementar el consumidor
<a name="develop-kcl-consumers-java-implemetation"></a>

Una aplicación de consumidor de KCL consta de los siguientes componentes clave:

**Topics**
+ [RecordProcessor](#implementation-recordprocessor)
+ [RecordProcessorFactory](#implementation-recordprocessorfactory)
+ [Programador](#implementation-scheduler)
+ [Aplicación de consumo principal](#implementation-main)

### RecordProcessor
<a name="implementation-recordprocessor"></a>

RecordProcessor es el componente principal en el que reside la lógica empresarial para procesar los registros de transmisión de datos de Kinesis. Define la forma en que procesa la aplicación los datos que recibe del flujo de Kinesis.

Responsabilidades principales:
+ Inicializar el procesamiento de una partición
+ Procesar lotes de registros del flujo de Kinesis
+ Cerrar el procesamiento de una partición (por ejemplo, cuando la partición se divide o fusiona, o cuando el arrendamiento se transfiere a otro host)
+ Controlar el registro de puntos de control para realizar un seguimiento del progreso

A continuación, se muestra un ejemplo de implementación:

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.*;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class SampleRecordProcessor implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());
            processRecordsInput.records().forEach(r -> 
                log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())
            );
            
            // Checkpoint periodically
            processRecordsInput.checkpointer().checkpoint();
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}
```

Ahora se explicará con detalle cada método utilizado en el ejemplo:

**inicializar (InitializationInput) InitializationInput**
+ Objetivo: configurar los recursos o estados necesarios para procesar los registros.
+ Cuándo se llama: una vez, cuando KCL asigna una partición a este procesador de registros.
+ Puntos clave:
  + `initializationInput.shardId()`: el ID de la partición que gestionará este procesador.
  + `initializationInput.extendedSequenceNumber()`: el número de secuencia desde el que se iniciará el procesamiento.

**ProcessRecords () ProcessRecordsInput processRecordsInput**
+ Objetivo: procesar los registros entrantes y, de manera opcional, comprobar el progreso del punto de control.
+ Cuando se llama: repetidamente, siempre y cuando el procesador de registros sea el propietario del arrendamiento de la partición.
+ Puntos clave:
  + `processRecordsInput.records()`: lista de registros que se van a procesar.
  + `processRecordsInput.checkpointer()`: se utiliza para comprobar el progreso.
  + Asegúrese de haber gestionado cualquier excepción durante el procesamiento para evitar que la KCL falle.
  + Este método debe ser idempotente, ya que el mismo registro puede procesarse más de una vez en algunos escenarios, por ejemplo, cuando los datos no se han registrado en puntos de control antes de que el proceso de trabajo fallara o reinicie de forma inesperada.
  + Vacíe siempre los datos almacenados en el búfer antes de registrar los puntos de control para garantizar la coherencia de datos.

**Arrendamiento perdido () LeaseLostInput leaseLostInput**
+ Objetivo: limpiar cualquier recurso específico para procesar esta partición.
+ Cuándo se llama: cuando otro programador se hace cargo del arrendamiento de esta partición.
+ Puntos clave:
  + El registro de puntos de control no está permitido en este método.

**Fragmentado () ShardEndedInput shardEndedInput**
+ Objetivo: finalizar el procesamiento de esta partición y este punto de control.
+ Cuándo se llama: cuando la partición se divide o se fusiona, lo que indica que se han procesado todos los datos de esta partición.
+ Puntos clave:
  + `shardEndedInput.checkpointer()`: se utiliza para realizar el registro final de puntos de control.
  + El registro de puntos de control de este método es obligatorio para completar el procesamiento.
  + Si no se vacían los datos y puntos de control, es posible que se pierdan los datos o se duplique el procesamiento cuando se vuelva a abrir la partición.

**Cierre solicitado () ShutdownRequestedInput shutdownRequestedInput**
+ Objetivo: registrar un punto de control y limpiar los recursos cuando KCL se está cerrando.
+ Cuándo se llama: cuando KCL se cierra (por ejemplo, cuando la aplicación se cierra).
+ Puntos clave:
  + `shutdownRequestedInput.checkpointer()`: se utiliza para realizar el registro de puntos de control antes del cierre.
  + Asegúrese de haber implementado el registro de puntos de control en el método para guardar el progreso antes de que la aplicación se detenga.
  + Si no se vacían los datos y puntos de control, se podrían perder los datos o volver a procesar los registros cuando se reinicie la aplicación.

**importante**  
KCL 3.x garantiza un menor reprocesamiento de datos cuando el arrendamiento se transfiere de un proceso de trabajo a otro mediante puntos de control antes de que el proceso de trabajo anterior se cierre. Si no implementa la lógica de registro de puntos de control en el método `shutdownRequested()`, no verá este beneficio. Asegúrese de haber implementado una lógica de registro de puntos de control dentro del método `shutdownRequested()`.

### RecordProcessorFactory
<a name="implementation-recordprocessorfactory"></a>

RecordProcessorFactory es responsable de crear nuevas RecordProcessor instancias. KCL usa esta fábrica para crear una nueva RecordProcessor para cada fragmento que la aplicación necesite procesar.

Responsabilidades principales:
+ Cree nuevas RecordProcessor instancias bajo demanda
+ Asegúrese de que cada una RecordProcessor esté inicializada correctamente

A continuación, se muestra un ejemplo de implementación:

```
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new SampleRecordProcessor();
    }
}
```

En este ejemplo, la fábrica crea una nueva SampleRecordProcessor cada vez que se llama a shardRecordProcessor (). Puede ampliarlo para incluir cualquier lógica de inicialización necesaria.

### Programador
<a name="implementation-scheduler"></a>

El programador es un componente de alto nivel que coordina todas las actividades de la aplicación KCL. Es responsable de la orquestación general del procesamiento de datos.

Responsabilidades principales:
+ Gestione el ciclo de vida de RecordProcessors
+ Gestionar la administración de los arrendamientos de particiones
+ Coordinar el registro de puntos de control
+ Equilibrar la carga de procesamiento de la partición entre varios procesos de trabajo de su aplicación
+ Gestionar correctamente las señales de cierre y cierre de las aplicaciones

Por lo general, el programar se crea en la aplicación principal y se inicia en ella. Puede consultar el ejemplo de implementación del programador en la siguiente sección, Aplicación de consumo principal 

### Aplicación de consumo principal
<a name="implementation-main"></a>

La aplicación de consumo principal une todos los componentes. Es responsable de configurar el consumo de KCL, crear los clientes necesarios, configurar el programador y administrar el ciclo de vida de la aplicación.

Responsabilidades principales:
+ Configurar clientes de AWS servicio (Kinesis, DynamoDB,) CloudWatch
+ Configurar la aplicación de KCL
+ Crear e iniciar el programador
+ Gestionar el apagado de aplicaciones

A continuación, se muestra un ejemplo de implementación:

```
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import java.util.UUID;

public class SampleConsumer {
    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    public SampleConsumer(String streamName, Region region) {
        this.streamName = streamName;
        this.region = region;
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    public void run() {
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamName, 
            streamName, 
            kinesisClient, 
            dynamoDbAsyncClient,
            cloudWatchClient, 
            UUID.randomUUID().toString(), 
            new SampleRecordProcessorFactory()
        );

        Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

    public static void main(String[] args) {
        String streamName = "your-stream-name"; // replace with your stream name
        Region region = Region.US_EAST_1; // replace with your region
        new SampleConsumer(streamName, region).run();
    }
}
```

 Por defecto, KCL crea un consumidor de distribución ramificada mejorada (EFO) con un rendimiento dedicado. Para obtener más información sobre la distribución ramificada mejorada, consulte [Desarrollo de consumidores de distribución ramificada mejorada con rendimiento dedicado](enhanced-consumers.md). Si tiene menos de 2 consumidores o no necesita retrasos de propagación de la lectura inferiores a 200 ms, debe establecer la siguiente configuración en el objeto del programador para utilizar consumidores de rendimiento compartido:

```
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
```

El siguiente código es un ejemplo de cómo crear un objeto del programador que utiliza consumidores de rendimiento compartido:

**Importaciones:**

```
import software.amazon.kinesis.retrieval.polling.PollingConfig;
```

**Código**:

```
Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );/
```

# Desarrollar consumidores con KCL en lenguajes distintos de Java
<a name="develop-kcl-consumers-non-java"></a>

En esta sección se describe la implementación de los consumidores que utilizan Kinesis Client Library (KCL) en Python, Node.js, .NET y Ruby.

KCL es una biblioteca de Java. El soporte para lenguajes distintos de Java se proporciona mediante una interfaz multilingüe llamada `MultiLangDaemon`. Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza con un lenguaje de KCL distinto de Java. Por tanto, si instala KCL para lenguajes distintos de Java y escribe completamente su aplicación de consumo en lenguajes distintos de Java, seguirá necesitando tener Java instalado en su sistema debido al `MultiLangDaemon`. Además, `MultiLangDaemon` tiene algunos ajustes predeterminados que podría tener que personalizar para su caso de uso (por ejemplo, la región de AWS a la que se conecta). Para obtener más información `MultiLangDaemon` sobre él GitHub, consulte el [ MultiLangDaemon proyecto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Si bien los conceptos principales siguen siendo los mismos en todos los lenguajes, existen algunas consideraciones e implementaciones específicas de cada uno. Para conocer los conceptos básicos sobre el desarrollo de los consumidores de KCL, consulte [Desarrollar consumidores con KCL en Java](develop-kcl-consumers-java.md). Para obtener información más detallada sobre cómo desarrollar consumidores de KCL en Python, Node.js, .NET y Ruby y las últimas actualizaciones, consulte los siguientes GitHub repositorios:
+ Python: [amazon-kinesis-client-python](https://github.com/awslabs/amazon-kinesis-client-python)
+ Node.js: [amazon-kinesis-client-nodejs](https://github.com/awslabs/amazon-kinesis-client-nodejs)
+ .NET: [amazon-kinesis-client-net](https://github.com/awslabs/amazon-kinesis-client-net)
+ Ruby: [amazon-kinesis-client-ruby](https://github.com/awslabs/amazon-kinesis-client-ruby)

**importante**  
No utilice las siguientes versiones de la biblioteca KCL que no sean de Java si utiliza JDK 8. Estas versiones contienen una dependencia (logback) que es incompatible con JDK 8.  
KCL Python 3.0.2 y 2.2.0
KCL Node.js 2.3.0
KCL .NET 3.1.0
KCL Ruby 2.2.0
Recomendamos utilizar versiones publicadas antes o después de estas versiones afectadas cuando trabaje con JDK 8.

# Procesamiento de varios flujos con KCL
<a name="kcl-multi-stream"></a>

En esta sección se describen los cambios necesarios en KCL, que permiten crear aplicaciones de consumo de KCL que pueden procesar más de un flujo de datos al mismo tiempo.
**importante**  
El procesamiento de varios flujos solo es compatible con KCL 2.3 o versiones posteriores.
Los consumidores de KCL que estén escritos en lenguajes distintos de Java y que funcionen con `multilangdaemon` *no* son compatibles con el procesamiento de varios flujos.
El procesamiento de varios flujos *no* es compatible con ninguna versión de KCL 1.x.
+ **MultistreamTracker interfaz**
  + Para crear una aplicación de consumo que pueda procesar múltiples transmisiones al mismo tiempo, debe implementar una nueva interfaz llamada [MultistreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java). Esta interfaz incluye el método `streamConfigList` que devuelve la lista de flujos de datos y sus configuraciones para que los procese la aplicación de consumo de KCL. Tenga en cuenta que los flujos de datos que se procesan pueden cambiar durante el tiempo de ejecución de la aplicación de consumo. KCL llama a `streamConfigList` periódicamente para obtener información sobre los cambios en los flujos de datos que se van a procesar.
  + `streamConfigList`Rellena la [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23)lista.

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```
  + Los campos `StreamIdentifier` y `InitialPositionInStreamExtended` son obligatorios, aunque `consumerArn` es opcional. Debe proporcionar `consumerArn` únicamente si utiliza KCL para implementar una aplicación de consumo con distribución ramificada mejorada.
  + Para obtener más información al respecto`StreamIdentifier`, consulte [https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java \$1L129.](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129) Para crear un `StreamIdentifier`, le recomendamos que cree una instancia multiflujo a partir de `streamArn` y la `streamCreationEpoch` que esté disponible en KCL 2.5.0 o versiones posteriores. En las versiones KCL 2.3 y 2.4, que no son compatibles con `streamArm`, cree una instancia multiflujo con el formato `account-id:StreamName:streamCreationTimestamp`. Este formato quedará obsoleto y dejará de ser compatible a partir de la próxima versión principal.
  +  MultistreamTracker también incluye una estrategia para eliminar las concesiones de transmisiones antiguas de la tabla de arrendamientos () formerStreamsLeasesDeletionStrategy. Tenga en cuenta que la estrategia NO SE PUEDE cambiar durante el tiempo de ejecución de la aplicación de consumo. Para obtener más información, consulte [https://github.com/awslabs/amazon-kinesis-clientb/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy/blob/0c5042dadf794fe988438436252a5a8fe70b6b0](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java) .java.
+   [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java)es una clase que abarca toda la aplicación y que se puede utilizar para especificar todos los ajustes de configuración de KCL que se utilizarán al crear la aplicación de consumo de KCL para la versión 2.x o posterior de KCL. `ConfigsBuilder`la clase ahora es compatible con la interfaz. `MultistreamTracker` Puede inicializar ConfigsBuilder cualquiera de las dos con el nombre del flujo de datos desde el que se van a consumir los registros: 

  ```
  /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```  

O bien, puede inicializarlo ConfigsBuilder `MultiStreamTracker` si desea implementar una aplicación de consumo de KCL que procese varios flujos al mismo tiempo.

```
* Constructor to initialize ConfigsBuilder with MultiStreamTracker
     * @param multiStreamTracker
     * @param applicationName
     * @param kinesisClient
     * @param dynamoDBClient
     * @param cloudWatchClient
     * @param workerIdentifier
     * @param shardRecordProcessorFactory
     */
    public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
            @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
            @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
            @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
        this.appStreamTracker = Either.left(multiStreamTracker);
        this.applicationName = applicationName;
        this.kinesisClient = kinesisClient;
        this.dynamoDBClient = dynamoDBClient;
        this.cloudWatchClient = cloudWatchClient;
        this.workerIdentifier = workerIdentifier;
        this.shardRecordProcessorFactory = shardRecordProcessorFactory;
    }
```
+ Con la compatibilidad con varios flujos implementada para la aplicación de consumo de KCL, cada fila de la tabla de arrendamiento de la aplicación ahora contiene el ID de la partición y el nombre del flujo de los varios flujos de datos que procesa esta aplicación.
+ Cuando se implementa la compatibilidad con varios flujos para la aplicación de consumo de KCL, leaseKey adopta la siguiente estructura: `account-id:StreamName:streamCreationTimestamp:ShardId`. Por ejemplo, `111111111:multiStreamTest-1:12345:shardId-000000000336`.

**importante**  
Cuando la aplicación de consumo de KCL existente está configurada para procesar solo un flujo de datos, `leaseKey` (que es la clave de partición de la tabla de arrendamiento) es el ID de la partición. Si vuelve a configurar una aplicación de consumo de KCL existente para procesar varios flujos de datos, se rompe la tabla de arrendamiento, ya que la estructura de `leaseKey` debe ser la siguiente: `account-id:StreamName:StreamCreationTimestamp:ShardId` para admitir varios flujos.

# Utilice el registro de AWS Glue esquemas con KCL
<a name="kcl-glue-schema"></a>

Puede integrar Kinesis Data Streams con AWS Glue el registro de esquemas. El registro de AWS Glue esquemas le permite descubrir, controlar y desarrollar esquemas de forma centralizada, al tiempo que garantiza que los datos generados se validen continuamente mediante un esquema registrado. Un esquema define la estructura y el formato de un registro de datos. Un esquema es una especificación versionada para publicación, consumo o almacenamiento de confianza de datos. El registro AWS Glue de esquemas le permite mejorar la end-to-end calidad y el gobierno de los datos en sus aplicaciones de streaming. Para obtener más información, consulte [Registro de esquemas de AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Una de las formas de configurar esta integración es a través de KCL para Java.

**importante**  
AWS Glue La integración del registro de esquemas para Kinesis Data Streams solo se admite en KCL 2.3 o versiones posteriores.
AWS Glue La integración del registro de esquemas para Kinesis Data Streams no *es* compatible con los consumidores de KCL escritos en lenguajes distintos de Java que se ejecutan con. `multilangdaemon`
AWS Glue La integración del registro de esquemas para Kinesis Data Streams no *es* compatible con ninguna versión de KCL 1.x.

Para obtener instrucciones detalladas sobre cómo configurar la integración de Kinesis Data Streams con el registro de AWS Glue esquemas mediante KCL, consulte la sección «Interacción con los datos mediante las KPL/KCL bibliotecas» en [Caso de uso: integración de Amazon Kinesis Data Streams](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) con el registro de esquemas. AWS Glue 

# Permisos de IAM necesarios para las aplicaciones de consumo de KCL
<a name="kcl-iam-permissions"></a>

 Debe agregar los siguientes permisos al rol o usuario de IAM asociado a su aplicación de consumo de KCL. 

 Prácticas recomendadas de seguridad para AWS dictar el uso de permisos detallados para controlar el acceso a los distintos recursos. AWS Identity and Access Management (IAM) le permite administrar los usuarios y los permisos de acceso de los usuarios. AWS Una política de IAM enumera de forma explícita las acciones que se pueden realizar y los recursos a los que se pueden aplicar dichas acciones.

En la siguiente tabla se muestran los permisos de IAM mínimos que generalmente se requieren para las aplicaciones de consumo de KCL:


**Permisos de IAM mínimos para las aplicaciones de consumo de KCL**  

| Servicio | Acciones | Recursos () ARNs | Finalidad | 
| --- | --- | --- | --- | 
| Amazon Kinesis Data Streams |  `DescribeStream` `DescribeStreamSummary` `RegisterStreamConsumer`  |  El flujo de datos de Kinesis desde la que la aplicación de KCL procesará los datos.`arn:aws:kinesis:region:account:stream/StreamName`  |  Antes de intentar leer registros, el consumidor comprueba si la secuencia de datos existe y si está activa, y si los fragmentos se encuentran en la secuencia de datos. Registra los consumidores en una partición.  | 
| Amazon Kinesis Data Streams |  `GetRecords` `GetShardIterator` `ListShards`  | El flujo de datos de Kinesis desde la que la aplicación de KCL procesará los datos.`arn:aws:kinesis:region:account:stream/StreamName` |  Lee registros de un fragmento.  | 
| Amazon Kinesis Data Streams |  `SubscribeToShard` `DescribeStreamConsumer` |  El flujo de datos de Kinesis desde la que la aplicación de KCL procesará los datos. Agregue esta acción solo si utiliza consumidores con distribución ramificada mejorada (EFO). `arn:aws:kinesis:region:account:stream/StreamName/consumer/*`  |  Se suscribe a una sección destinada a los consumidores con distribución ramificada mejorada (EFO).  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `UpdateTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  Tabla de arrendamiento (tabla de metadatos en DynamoDB creada por KCL). `arn:aws:dynamodb:region:account:table/KCLApplicationName`  |  Estas acciones son necesarias para que KCL administre la tabla de arrendamiento creada en DynamoDB.  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  Tabla de métricas de procesos de trabajo y de estado de coordinadores (tablas de metadatos en DynamoDB) creadas por KCL. `arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats` `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`  |  Estas acciones son obligatorias para que KCL administre las tablas de metadatos para las métricas de los procesos de trabajo y del estado de coordinadores en DynamoDB.  | 
| Amazon DynamoDB | `Query` |  Un índice secundario global en la tabla de arrendamiento. `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`  |  Esta acción es obligatoria para que KCL lea el índice secundario global de la tabla de arrendamiento creada en DynamoDB.  | 
| Amazon CloudWatch | `PutMetricData` |  \$1  |  Sube métricas CloudWatch que sean útiles para monitorear la aplicación. El asterisco (\$1) se utiliza porque no hay ningún recurso específico CloudWatch en el que se invoque la `PutMetricData` acción.   | 

**nota**  
Sustituya «región», «cuenta» y «StreamNamenombre» por su propio Cuenta de AWS número Región de AWS, KCLApplication nombre de la ARNs transmisión de datos de Kinesis y nombre de la aplicación de KCL, respectivamente. KCL 3.x crea dos tablas de metadatos más en DynamoDB. Para obtener más información sobre las tablas de metadatos de DynamoDB creadas por KCL, consulte [Tablas de metadatos de DynamoDB y equilibrio de carga en KCL](kcl-dynamoDB.md). Si utiliza configuraciones para personalizar los nombres de las tablas de metadatos creadas por KCL, utilice los nombres de tabla especificados en lugar del nombre de la aplicación de KCL. 

A continuación, se muestra un ejemplo de documento de política para una aplicación de consumo de KCL. 

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME/consumer/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:UpdateTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-WorkerMetricStats",
    "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-CoordinatorState"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:Query"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME/index/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Antes de utilizar la política de ejemplo, revise los siguientes elementos:
+ Sustituya REGION por su Región de AWS (por ejemplo, us-east-1).
+ Sustituya ACCOUNT\$1ID por su ID. Cuenta de AWS 
+ sustituya STREAM\$1NAME por el nombre de su flujo de datos de Kinesis,
+ sustituya CONSUMER\$1NAME por el nombre de su consumidor. Suele ser el nombre de su aplicación al utilizar KCL,
+ sustituya KCL\$1APPLICATION\$1NAME por el nombre de la aplicación de KCL.

# Configuraciones de KCL
<a name="kcl-configuration"></a>

Defina propiedades de configuración para personalizar la funcionalidad de Kinesis Client Library y cumplir requisitos específicos. En la siguiente tabla se describen las propiedades y clases de configuración.

**importante**  
En KCL 3.x, el algoritmo de equilibrio de carga tiene como objetivo lograr un uso uniforme de la CPU entre los procesos de trabajo, y no un número igual de arrendamientos por proceso de trabajo. Si establece `maxLeasesForWorker` demasiado bajo, podría limitar la capacidad de KCL de equilibrar la carga de trabajo de forma eficaz. Si utiliza la configuración `maxLeasesForWorker`, tenga presente aumentar su valor para permitir la mejor distribución de carga posible.


**En esta tabla se muestran las propiedades de configuración de KCL**  

| Propiedad de configuración | Clase de configuración | Description (Descripción) | Predeterminado | 
| --- | --- | --- | --- | 
| applicationName | ConfigsBuilder | El nombre de la aplicación de KCL. Se utiliza de forma predeterminada para tableName y consumerName. | No aplicable | 
| tableName | ConfigsBuilder |  Permite sustituir el nombre de la tabla que se utiliza para la tabla de asignaciones de Amazon DynamoDB.  | No aplicable | 
| streamName | ConfigsBuilder |  El nombre de la secuencia cuyos registros procesa de esta aplicación.  | No aplicable | 
| workerIdentifier | ConfigsBuilder |  Identificador único que representa esta instancia del procesador de aplicaciones. Deben ser único.  | No aplicable | 
| failoverTimeMillis | LeaseManagementConfig |  El número de milisegundos que deben transcurrir antes de que se pueda considerar que se ha producido un error en el propietario de una asignación. En el caso de las aplicaciones que tienen una gran cantidad de particiones, se pueden establecer en un número mayor para reducir la cantidad de IOPS de DynamoDB necesarios para el seguimiento de los arrendamientos.  | 10 000 (10 segundos) | 
| shardSyncIntervalMillis | LeaseManagementConfig |  El tiempo entre llamadas de sincronización del fragmento.  | 60 000 (60 segundos) | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig |  Cuando se establece, se eliminan las asignaciones tan pronto como se inicia el procesamiento del fragmento secundario.  | TRUE | 
| ignoreUnexpectedChildShards | LeaseManagementConfig |  Cuando se establece, los fragmentos secundarios que tienen un fragmento abierto se pasan por alto. Esto es principalmente para DynamoDB Streams.  | FALSO | 
| maxLeasesForWorker | LeaseManagementConfig |  La cantidad máxima de arrendamientos que debe aceptar un solo proceso de trabajo. Si se establece un valor demasiado bajo, podría causar que se pierdan datos si los procesos de trabajo no pueden procesar todas las particiones y que la asignación del arrendamiento entre los procesos de trabajo no sea óptima. Tenga en cuenta el número total de particiones, el número de procesos de trabajo y la capacidad de procesamiento de los procesos de trabajo al configurarlo.  | Sin límite | 
| maxLeaseRenewalThreads | LeaseManagementConfig |  Controla el tamaño del grupo de subprocesos del renovador de asignaciones. Cuanto más grande sea el número de asignaciones que puede tomar la aplicación, más grande debe ser este grupo.  | 20 | 
| billingMode | LeaseManagementConfig |  Determina el modo de capacidad de la tabla de arrendamiento creada en DynamoDB. Existen dos opciones: el modo bajo demanda (PAY\$1PER\$1REQUEST) y el modo aprovisionado. Recomendamos utilizar la configuración predeterminada del modo bajo demanda, ya que escala automáticamente para adaptarse a la carga de trabajo sin necesidad de planificar la capacidad.  | PAY\$1PER\$1REQUEST (modo bajo demanda) | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | Capacidad de lectura de DynamoDB que se utiliza cuando Kinesis Client Library debe crear una tabla de arrendamiento de DynamoDB con el modo de capacidad aprovisionada. Puede ignorar esta configuración si utiliza el modo de capacidad bajo demanda predeterminado en la configuración billingMode. | 10 | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | Capacidad de lectura de DynamoDB que se utiliza cuando Kinesis Client Library debe crear una tabla de arrendamiento de DynamoDB. Puede ignorar esta configuración si utiliza el modo de capacidad bajo demanda predeterminado en la configuración billingMode. | 10 | 
| initialPositionInStreamExtended | LeaseManagementConfig |  La posición inicial de la secuencia en la que debería comenzar la aplicación. Esto solo se utiliza durante la creación inicial de la asignación.  |  InitialPositionInStream.TRIM\$1HORIZON  | 
| reBalanceThresholdPercentage | LeaseManagementConfig |  Un valor porcentual que determina cuándo el algoritmo de equilibrio de carga debería reasignar las particiones entre los procesos de trabajo. Es una nueva configuración presentada en KCL 3.x.  | 10 | 
| dampeningPercentage | LeaseManagementConfig |  Un valor porcentual que se utiliza para amortiguar la cantidad de carga que se moverá desde el proceso de trabajo sobrecargado en una sola operación de rebalanceo. Es una nueva configuración presentada en KCL 3.x.  | 60 | 
| allowThroughputOvershoot | LeaseManagementConfig |  Determina si aún es necesario contratar un arrendamiento adicional al proceso de trabajo sobrecargado, incluso si esto provoca que el monto total del arrendamiento solicitado supere el monto de rendimiento deseado. Es una nueva configuración presentada en KCL 3.x.  | TRUE | 
| disableWorkerMetrics | LeaseManagementConfig |  Determina si KCL debe ignorar las métricas de recursos de los procesos de trabajos (como el uso de la CPU) al reasignar los arrendamientos y equilibrar la carga. Configúrelo en TRUE si quiere evitar que KCL equilibre la carga según el uso de la CPU. Es una nueva configuración presentada en KCL 3.x.  | FALSO | 
| maxThroughputPerHostKBps | LeaseManagementConfig |  Cantidad de rendimiento máximo que se debe asignar a un proceso de trabajo durante la asignación del arrendamiento. Es una nueva configuración presentada en KCL 3.x.  | Sin límite | 
| isGracefulLeaseHandoffEnabled | LeaseManagementConfig |  Controla el comportamiento de la transferencia del arrendamiento entre los procesos de trabajo. Si se establece en true, KCL intentará transferir los arrendamientos sin problemas, dejando que el fragmento RecordProcessor tenga tiempo suficiente para completar el procesamiento antes de entregar el arrendamiento a otro trabajador. Esto puede ayudar a garantizar la integridad de los datos y realizar transiciones fluidas, pero puede aumentar el tiempo de transferencia. Si se establece en falso, el contrato de arrendamiento se transferirá inmediatamente sin esperar a que se cierre correctamente. RecordProcessor Esto puede llevar a que las transferencias sean más rápidas, pero se corre el riesgo de que el procesamiento quede incompleto. Nota: Los puntos de control deben implementarse dentro del método shutdownRequested () del RecordProcessor para poder beneficiarse de la elegante función de traspaso de arrendamientos. Es una nueva configuración presentada en KCL 3.x.  | TRUE | 
| gracefulLeaseHandoffTimeoutMillis | LeaseManagementConfig |  Especifica el tiempo mínimo (en milisegundos) que se debe esperar a que el fragmento actual se cierre correctamente antes de RecordProcessor transferir forzosamente el arrendamiento al siguiente propietario. Si el método processRecords suele ejecutarse durante más tiempo que el valor predeterminado, tenga en mente aumentar esta configuración. Esto garantiza que RecordProcessor tenga tiempo suficiente para completar su procesamiento antes de que se produzca la transferencia del arrendamiento. Es una nueva configuración presentada en KCL 3.x.  | 30 000 (30 segundos) | 
| maxRecords | PollingConfig |  Permite establecer el número máximo de registros que devuelve Kinesis.  | 10 000 | 
| retryGetRecordsInSeconds | PollingConfig |  Configura el retraso entre los GetRecords intentos en caso de error.  | Ninguno | 
| maxGetRecordsThreadPool | PollingConfig |  El tamaño del grupo de subprocesos utilizado para GetRecords.  | Ninguno | 
| idleTimeBetweenReadsInMillis | PollingConfig |  Determina cuánto tiempo espera KCL entre GetRecords llamadas para sondear los datos de los flujos de datos. La unidad es de milisegundos.  | 1500 | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig |  Cuando se establece, se llama al procesador de registros incluso cuando no se proporciona ningún registro de Kinesis.  | FALSO | 
| parentShardPollIntervalMillis | CoordinatorConfig |  Determina la frecuencia con que debería sondear un procesador de registros para ver si el fragmento principal se ha completado. La unidad es milisegundos.  | 10 000 (10 segundos) | 
| skipShardSyncAtWorkerInitializationIfLeaseExist | CoordinatorConfig |  Deshabilita la sincronización de los datos de los fragmentos si la tabla de asignaciones todavía contiene entradas.  |  FALSO  | 
| shardPrioritization | CoordinatorConfig |  La priorización de fragmentos que se va a utilizar.  |  NoOpShardPrioritization  | 
| ClientVersionConfig | CoordinatorConfig |  Determina en qué modo de compatibilidad de versiones de KCL se ejecutará la aplicación. Esta configuración es solo para la migración desde versiones anteriores de KCL. Al migrar a la versión 3.x, debe establecer esta configuración en `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X`. Puede eliminar esta configuración cuando complete la migración.  | CLIENT\$1VERSION\$1CONFIG\$13X | 
| taskBackoffTimeMillis | LifecycleConfig |  El tiempo que se debe esperar para reintentar la tareas de KCL con errores. La unidad es de milisegundos.  | 500 (0,5 segundos) | 
| logWarningForTaskAfterMillis | LifecycleConfig |  Tiempo que se debe esperar antes de registrar una advertencia si una tarea no ha finalizado.  | Ninguno | 
| listShardsBackoffTimeInMillis | RetrievalConfig | El número de milisegundos que se debe esperar entre llamadas a ListShards cuando se producen errores. La unidad es de milisegundos. | 1500 (1,5 segundos) | 
| maxListShardsRetryAttempts | RetrievalConfig | El número máximo de veces que se reintenta ListShards antes de desistir. | 50 | 
| metricsBufferTimeMillis | MetricsConfig |  Especifica el tiempo máximo (en milisegundos) para almacenar en búfer las métricas antes de publicarlas. CloudWatch  | 10 000 (10 segundos) | 
| metricsMaxQueueSize | MetricsConfig |  Especifica el número máximo de métricas que se deben almacenar en búfer antes de publicarlas CloudWatch.  | 10 000 | 
| metricsLevel | MetricsConfig |  Especifica el nivel de granularidad de CloudWatch las métricas que se van a habilitar y publicar.  Valores posibles: NONE, SUMMARY, DETAILED.  |  MetricsLevel.DETALLADO  | 
| metricsEnabledDimensions | MetricsConfig |  Controla las dimensiones permitidas para las CloudWatch métricas.  | Todas las dimensiones | 

**Configuraciones discontinuadas en KCL 3.x**

Las siguientes propiedades de configuración están discontinuadas en KCL 3.x:


**En la tabla se muestran las propiedades de configuración discontinuadas de KCL 3.x**  

| Propiedad de configuración | Clase de configuración | Description (Descripción) | 
| --- | --- | --- | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig |  El número máximo de asignaciones del que debería intentar apropiarse una aplicación al mismo tiempo. KCL 3.x ignorará esta configuración y reasignará los arrendamientos según la utilización de los recursos por parte de los procesos de trabajo.  | 
| enablePriorityLeaseAssignment | LeaseManagementConfig |  Controla si los trabajadores deben dar prioridad a los arrendamientos muy vencidos (los arrendamientos no se renuevan hasta triplicar el tiempo de conmutación por error) y a los nuevos arrendamientos de particiones, independiente del número de arrendamientos de destino, pero respetando los límites máximos de arrendamiento. KCL 3.x ignorará esta configuración y siempre distribuirá los arrendamientos vencidos entre los procesos de trabajo.  | 

**importante**  
Debe seguir teniendo las propiedades de configuración interrumpidas durante la migración de versiones anteriores de KCL a KCL 3.x. Durante la migración, el proceso de trabajo de KCL comenzará primero con el modo compatible con KCL 2.x y pasará al modo de funcionalidad de KCL 3.x cuando detecte que todos los procesos de trabajo de KCL de la aplicación están preparados para ejecutar KCL 3.x. Estas configuraciones discontinuadas son necesarias mientras los procesos de trabajo de KCL utilizan el modo compatible con KCL 2.x.

# Política del ciclo de vida de la versión de KCL
<a name="kcl-version-lifecycle-policy"></a>

En este tema se describe la política de ciclo de vida de las versiones para la biblioteca de clientes de Amazon Kinesis (KCL). AWS publica periódicamente nuevas versiones de KCL para incorporar nuevas funciones y mejoras, correcciones de errores, parches de seguridad y actualizaciones de dependencias. Le recomendamos que utilice las versiones de KCL para mantenerse al día con las últimas funciones, actualizaciones de seguridad y dependencias subyacentes. up-to-date **No** recomendamos seguir utilizando una versión de KCL no compatible.

El ciclo de vida de las principales versiones de KCL consta de las siguientes tres fases:
+ **Disponibilidad general (GA)**: durante esta fase, la versión principal es totalmente compatible. AWS proporciona versiones periódicas de versiones secundarias y de parches que incluyen compatibilidad con nuevas funciones o actualizaciones de API para Kinesis Data Streams, así como correcciones de errores y de seguridad.
+ **Modo de mantenimiento**: AWS limita las publicaciones de las versiones de parches para abordar únicamente las correcciones de errores críticos y los problemas de seguridad. La versión principal no recibirá actualizaciones de las nuevas funciones ni APIs de Kinesis Data Streams.
+ **E nd-of-support**: La versión principal ya no recibirá actualizaciones ni versiones. Las versiones publicadas anteriormente seguirán estando disponibles a través de los administradores de paquetes públicos y el código permanecerá activado GitHub. El uso de una versión disponible end-of-support queda a discreción del usuario. Recomendamos que actualice a la versión principal más reciente.


| Versión principal | Fase actual | Fecha de publicación | Fecha del modo de mantenimiento | End-of-support fecha | 
| --- | --- | --- | --- | --- | 
| KCL 1.x | Modo de mantenimiento | 19/12/2013 | 2025-04-17 | 2026-01-30 | 
| KCL 2.x | Disponibilidad general | 2018-08-02 | -- | -- | 
| KCL 3.x | Disponibilidad general | 2024-11-06 | -- | -- | 

# Migre desde versiones anteriores de KCL
<a name="kcl-migration-previous-versions"></a>

En este tema se explica cómo migrar desde versiones anteriores de Kinesis Client Library (KCL). 

## ¿Qué novedades incluye KCL 3.0?
<a name="kcl-migration-new-3-0"></a>

Kinesis Client Library (KCL) 3.0 presenta numerosas mejoras importantes en comparación con las versiones anteriores:
+  Reduce los costos de procesamiento de las aplicaciones de consumo al redistribuir automáticamente el trabajo de los procesos de trabajo sobreutilizados a los procesos de trabajo infrautilizados de la flota de aplicaciones de consumo. Este nuevo algoritmo de equilibrio de carga garantiza una distribución uniforme del uso de la CPU entre los procesos de trabajo y elimina la necesidad de aprovisionar en exceso a los procesos de trabajo.
+  Reduce el costo de DynamoDB asociado a KCL al optimizar las operaciones de lectura en la tabla de arrendamiento.
+ Minimiza el reprocesamiento de los datos cuando los arrendamientos se reasignan a otro proceso de trabajo, ya que permite que el proceso de trabajo actual complete el registro de puntos de control de los registros que ha procesado.
+  Se utiliza AWS SDK for Java 2.x para mejorar el rendimiento y las funciones de seguridad, lo que elimina por completo la dependencia de la versión 1.x. AWS SDK para Java 

Para obtener más información, consulte las [notas de KCL 3.0](https://github.com/awslabs/amazon-kinesis-client/blob/master/CHANGELOG.md).

**Topics**
+ [¿Qué novedades incluye KCL 3.0?](#kcl-migration-new-3-0)
+ [Migre de KCL 2.x a KCL 3.x](kcl-migration-from-2-3.md)
+ [Restauración de la versión de KCL anterior](kcl-migration-rollback.md)
+ [Avance a KCL 3.x después de una restauración](kcl-migration-rollforward.md)
+ [Prácticas recomendadas para la tabla de arrendamiento con el modo de capacidad aprovisionada](kcl-migration-lease-table.md)
+ [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md)

# Migre de KCL 2.x a KCL 3.x
<a name="kcl-migration-from-2-3"></a>

En este tema se proporcionan step-by-step instrucciones para migrar a su consumidor de KCL 2.x a KCL 3.x. KCL 3.x permite la migración local de los consumidores de KCL 2.x. Puede seguir consumiendo los datos de su flujo de datos de Kinesis y, al mismo tiempo, migrar a sus procesos de trabajo de forma continua.

**importante**  
KCL 3.x mantiene las mismas interfaces y métodos que KCL 2.x. Por lo tanto, no es necesario actualizar el código de procesamiento de registros durante la migración. Sin embargo, debe establecer la configuración adecuada y comprobar los pasos necesarios para la migración. Recomendamos encarecidamente que siga los siguientes pasos de migración para que la experiencia de migración sea fluida.

## Paso 1: requisitos previos
<a name="kcl-migration-from-2-3-prerequisites"></a>

Antes de comenzar con KCL 3.x, asegúrese de que dispone de lo siguiente:
+ Java Development Kit (JDK) 8 o posterior,
+ AWS SDK para Java 2.x
+ Maven o Gradle para la administración de dependencias.

**importante**  
No utilice las AWS SDK para Java versiones 2.27.19 a 2.27.23 con KCL 3.x. Estas versiones cuentan con un problema que provoca un error de excepción relacionado con el uso de DynamoDB por parte de KCL. Le recomendamos que utilice la AWS SDK para Java versión 2.28.0 o posterior para evitar este problema. 

## Paso 2: agregar dependencias
<a name="kcl-migration-from-2-3-dependencies"></a>

Si está utilizando Maven, agregue la siguiente dependencia a su archivo `pom.xml`. Asegúrese de reemplazar la versión 3.x.x por la versión más reciente de KCL. 

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

Si está utilizando Gradle, agregue lo siguiente a su archivo `build.gradle`. Asegúrese de reemplazar la versión 3.x.x por la versión más reciente de KCL. 

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

Puede buscar la versión más reciente del KCL en el [Repositorio central de Maven](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client).

## Paso 3: configurar la configuración relacionada con la migración
<a name="kcl-migration-from-2-3-configuration"></a>

Para migrar de KCL 2.x a KCL 3.x, debe establecer el siguiente parámetro de configuración:
+ CoordinatorConfig. clientVersionConfig: Esta configuración determina en qué modo de compatibilidad de versiones de KCL se ejecutará la aplicación. Al migrar de KCL 2.x a 3.x, debe establecer esta configuración en `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X`. Para establecer esta configuración, agregue la siguiente línea al crear el objeto del programador:

```
configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X)
```

El siguiente es un ejemplo de cómo configurar `CoordinatorConfig.clientVersionConfig` para la migración de KCL 2.x a 3.x. Puede ajustar otras configuraciones según sea necesario según los requisitos específicos:

```
Scheduler scheduler = new Scheduler(
    configsBuilder.checkpointConfig(),
    configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X),
    configsBuilder.leaseManagementConfig(),
    configsBuilder.lifecycleConfig(),
    configsBuilder.metricsConfig(),
    configsBuilder.processorConfig(),
    configsBuilder.retrievalConfig()
);
```

Es importante que todos los procesos de trabajo de la aplicación de consumo utilicen el mismo algoritmo de equilibrio de carga en un momento dado, ya que KCL 2.x y 3.x utilizan diferentes algoritmos de equilibrio de carga. Si los procesos de trabajo utilizan diferentes algoritmos de equilibrio de carga, es posible que la distribución de la carga no sea óptima, ya que los dos algoritmos funcionan de forma independiente.

Esta configuración de compatibilidad con KCL 2.x permite que la aplicación KCL 3.x se ejecute en un modo compatible con KCL 2.x y utilice el algoritmo de equilibrio de carga para KCL 2.x hasta que todos los procesos de trabajo de la aplicación de consumo se hayan actualizado a KCL 3.x. Cuando se complete la migración, KCL cambiará automáticamente al modo de funcionalidad completa de KCL 3.x y empezará a utilizar un nuevo algoritmo de equilibrio de carga de KCL 3.x para todos los procesos de trabajo en ejecución.

**importante**  
Si no va a utilizar `ConfigsBuilder`, y va a crear un objeto `LeaseManagementConfig` para establecer las configuraciones, debe agregar otro parámetro denominado `applicationName` en la versión 3.x o posterior de KCL. Para obtener más información, consulte [Error de compilación con el LeaseManagementConfig constructor](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compiliation-error-leasemanagementconfig). Recomendamos utilizar `ConfigsBuilder` para establecer las configuraciones de KCL. `ConfigsBuilder` ofrece una forma más flexible y fácil de mantener para configurar su aplicación KCL.

## Paso 4: seguir las prácticas recomendadas para la implementación del método shutdownRequested()
<a name="kcl-migration-from-2-3-best-practice"></a>

KCL 3.x presenta una característica denominada *transferencia ágil de arrendamientos* para minimizar el reprocesamiento de datos cuando se entrega un arrendamiento a otro proceso de trabajo como parte del proceso de reasignación del arrendamiento. Ello se logra mediante el registro de puntos de control en el último número de secuencia procesado en la tabla de arrendamiento antes de la transferencia del arrendamiento. Para garantizar que el traspaso del arrendamiento se realiza correcto, debe asegurarse de invocar el objeto `checkpointer` según el método `shutdownRequested` en su clase `RecordProcessor`. Si no está invocando el objeto `checkpointer` dentro del método `shutdownRequested`, puede implementarlo como se ilustra en el siguiente ejemplo. 

**importante**  
El siguiente ejemplo de implementación es un requisito mínimo para que la transferencia del arrendamiento se realice sin contratiempos. Si es necesario, puede ampliarlo para incluir lógica adicional relacionada con los registros de puntos de control. Si va a realizar un procesamiento asíncrono, asegúrese de que todos los registros enviados a la cadena descendente se hayan procesado antes de utilizar el registro de puntos de control. 
Si bien un traspaso eficiente del arrendamiento reduce considerablemente la probabilidad de que se reprocesen los datos durante las transferencias de arrendamiento, no elimina por completo esta posibilidad. Para preservar la integridad y la coherencia de los datos, diseñe sus aplicaciones de consumo intermedias de manera que sean idempotentes. Esto significa que deberían poder gestionar el posible procesamiento de registros duplicados sin efectos adversos en el sistema en general.

```
/**
 * Invoked when either Scheduler has been requested to gracefully shutdown
 * or lease ownership is being transferred gracefully so the current owner
 * gets one last chance to checkpoint.
 *
 * Checkpoints and logs the data a final time.
 *
 * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
 *                               before the shutdown is completed.
 */
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
    try {
       // Ensure that all delivered records are processed 
       // and has been successfully flushed to the downstream before calling 
       // checkpoint
       // If you are performing any asynchronous processing or flushing to
       // downstream, you must wait for its completion before invoking
       // the below checkpoint method.
        log.info("Scheduler is shutting down, checkpointing.");
        shutdownRequestedInput.checkpointer().checkpoint();
    } catch (ShutdownException | InvalidStateException e) {
        log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
    } 
}
```

## Paso 5: revisar los requisitos previos de la KCL 3.x para recopilar las métricas de los procesos de trabajo.
<a name="kcl-migration-from-2-3-worker-metrics"></a>

KCL 3.x recopila métricas de uso de la CPU, como el uso de la CPU por parte de los procesos de trabajo, para equilibrar la carga entre los procesos de trabajo de manera uniforme. Los procesos de trabajo de aplicaciones de consumo se pueden ejecutar en Amazon EC2, Amazon ECS, Amazon EKS o AWS Fargate. KCL 3.x puede recopilar métricas de uso de la CPU de los procesos de trabajo únicamente cuando se cumplan los siguientes requisitos previos:

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ El sistema operativo debe ser Linux.
+ Debe habilitarlo [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)en su instancia de EC2.

 **Amazon Elastic Container Service (Amazon ECS) en Amazon EC2**
+ El sistema operativo debe ser Linux.
+ Debe habilitar la [versión 4 del punto de conexión de metadatos de tareas de ECS](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html). 
+ La versión del agente de contenedor de Amazon ECS debe ser 1.39.0 o posterior.

 **Amazon ECS en AWS Fargate**
+ Debe habilitar la [versión 4 del punto de conexión de metadatos de tareas de Fargate](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html). Si utiliza la versión 1.4.0 o una posterior de la plataforma Fargate, se habilitará de forma predeterminada. 
+ Versión de la plataforma de Fargate 1.4.0 o posterior.

 **Amazon Elastic Kubernetes Service (Amazon EKS) en Amazon EC2** 
+ El sistema operativo debe ser Linux.

 **Amazon EKS en AWS Fargate**
+ Plataforma de Fargate 1.3.0 o posterior.

**importante**  
Si KCL 3.x no puede recopilar las métricas de uso de la CPU de los procesos de trabajo, ya que no se cumplen los requisitos previos, reequilibrará la carga según el nivel de rendimiento por arrendamiento. Este mecanismo de reequilibrio alternativo garantizará que todos los procesos de trabajo obtengan niveles de rendimiento total similares en los arrendamientos asignados a cada proceso de trabajo. Para obtener más información, consulte [Cómo KCL asigna los arrendamientos a los procesos de trabajo y equilibra la carga](kcl-dynamoDB.md#kcl-assign-leases).

## Paso 6: actualizar los permisos de IAM para KCL 3.x
<a name="kcl-migration-from-2-3-IAM-permissions"></a>

Debe agregar los siguientes permisos al rol o la política de IAM asociada a su aplicación de consumo de KCL 3.x. Esto implica actualizar la política de IAM existente que utiliza la aplicación KCL. Para obtener más información, consulte [Permisos de IAM necesarios para las aplicaciones de consumo de KCL](kcl-iam-permissions.md).

**importante**  
Es posible que sus aplicaciones de KCL existentes no tengan las siguientes acciones y recursos de IAM agregados en la política de IAM porque no eran necesarios en KCL 2.x. Recuerde agregarlos antes de ejecutar su aplicación de KCL 3.x:  
Acciones: `UpdateTable`  
Recursos (ARNs): `arn:aws:dynamodb:region:account:table/KCLApplicationName`
Acciones: `Query`  
Recursos (ARNs): `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`
Acciones:`CreateTable`, `DescribeTable``Scan`,`GetItem`,`PutItem`,`UpdateItem`, `DeleteItem`  
Recursos (ARNs):`arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats`, `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`
Sustituya «región», «cuenta» y «KCLApplicationnombre» por su propio Región de AWS Cuenta de AWS número y nombre de la aplicación de KCL, respectivamente. ARNs Si utiliza configuraciones para personalizar los nombres de las tablas de metadatos creadas por KCL, utilice los nombres de tabla especificados en lugar del nombre de la aplicación de KCL.

## Paso 7: implementar el código KCL 3.x para sus procesos de trabajo
<a name="kcl-migration-from-2-3-IAM-deploy"></a>

Tras haber establecido la configuración necesaria para la migración y haber completado todas las listas de verificación anteriores, podrá crear e implementar el código para sus procesos de trabajo.

**nota**  
Si ve un error de compilación en el `LeaseManagementConfig` constructor, consulte Error de [compilación con el LeaseManagementConfig constructor para obtener información sobre la](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compilation-error-leasemanagementconfig) solución de problemas.

## Paso 8: completar la migración
<a name="kcl-migration-from-2-3-finish"></a>

Durante la implementación del código KCL 3.x, KCL sigue utilizando el algoritmo de asignación de arrendamientos de KCL 2.x. Cuando haya implementado correctamente el código KCL 3.x en todos sus procesos de trabajo, KCL lo detectará automáticamente y pasará al nuevo algoritmo de asignación de arrendamientos según la utilización de los recursos de los procesos de trabajo. Para obtener más información sobre el nuevo algoritmo de asignación de arrendamiento, consulte [Cómo KCL asigna los arrendamientos a los procesos de trabajo y equilibra la carga](kcl-dynamoDB.md#kcl-assign-leases).

Durante la implementación, puede supervisar el proceso de migración con las siguientes métricas emitidas a CloudWatch. Puede supervisar las métricas de la operación `Migration`. Todas las métricas son per-KCL-application métricas y se configuran en el nivel de `SUMMARY` métrica. Si la estadística `Sum` de la métrica `CurrentState:3xWorker` coincide con el número total de procesos de trabajo de su aplicación de KCL, indica que la migración a KCL 3.x se ha completado correctamente.

**importante**  
 KCL tarda al menos 10 minutos en cambiar al nuevo algoritmo de asignación de arrendatarios una vez que todos los procesos de trabajo estén preparados para ejecutarlo.


**CloudWatch métricas para el proceso de migración a KCL**  

| Métricas | Description (Descripción) | 
| --- | --- | 
| CurrentState:3xWorker |  La cantidad de procesos de trabajo de KCL se migró correctamente a KCL 3.x y se ejecutó el nuevo algoritmo de asignación de arrendamientos. Si el recuento `Sum` de esta métrica coincide con el número total de sus procesos de trabajo, indica que la migración a KCL 3.x se ha completado correctamente. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| CurrentState:2xCompatibleWorker |  El número de procesos de trabajo de KCL que utilizan el modo compatible con KCL 2.x durante el proceso de migración. Un valor distinto de cero en esta métrica indica que la migración aún está en curso. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| Fault |  El número de excepciones que se encontraron durante el proceso de migración. La mayoría de estas excepciones son errores transitorios, y KCL 3.x volverá a intentar completar la migración automáticamente. Si observa un valor de métrica persistente `Fault`, revise los registros del periodo de migración para seguir solucionando problemas. Si el problema persiste, ponte en contacto con Soporte. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| GsiStatusReady |  El estado de la creación del índice secundario global (GSI) en la tabla de arrendamiento. Esta métrica indica si se ha creado el GSI de la tabla de arrendamiento, un requisito previo para ejecutar KCL 3.x. El valor es 0 o 1, donde 1 indica que la creación se ha realizado correctamente. Durante un estado de reversión, esta métrica no se emitirá. Cuando vuelva a realizar la actualización, podrá reanudar la supervisión de esta métrica. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| workerMetricsReady |  Emisión de métricas del estado del proceso de trabajo por parte de todos los procesos de trabajo. Las métricas indican si todos los procesos de trabajo están emitiendo métricas como el uso de la CPU. El valor es 0 o 1, donde 1 indica que todos los procesos de trabajo están emitiendo correctamente las métricas y están preparados para el nuevo algoritmo de asignación de arrendamientos. Durante un estado de reversión, esta métrica no se emitirá. Cuando vuelva a realizar la actualización, podrá reanudar la supervisión de esta métrica. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/streams/latest/dev/kcl-migration-from-2-3.html)  | 

El KCL ofrece la capacidad de reversión al modo compatible con la versión 2.x durante la migración. Si la migración a KCL 3.x se realiza correctamente, recomendamos eliminar la configuración `CoordinatorConfig.clientVersionConfig` de `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` si la reversión ya no es necesaria. Al eliminar esta configuración, se detiene la emisión de métricas relacionadas con la migración desde la aplicación KCL.

**nota**  
Recomendamos supervisar el rendimiento y la estabilidad de la aplicación durante un periodo durante la migración y una vez finalizada la migración. Si observa algún problema, puede hacer que los procesos de trabajo pasen a utilizar una funcionalidad compatible con KCL 2.x mediante la [herramienta de migración de KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

# Restauración de la versión de KCL anterior
<a name="kcl-migration-rollback"></a>

Este tema explica los pasos para revertir al consumidor a la versión anterior. Para poder revertir, existe un proceso de dos pasos: 

1. Ejecución de la [herramienta de migración de KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Nueva implementación del código de la versión de KCL anterior (opcional).

## Paso 1: ejecución de la herramienta de migración de KCL
<a name="kcl-migration-rollback-tool"></a>

Cuando necesite restaurar la versión anterior de KCL, debe ejecutar la herramienta de migración de KCL. La herramienta de migración de KCL realiza dos tareas importantes:
+ Elimina una tabla de metadatos llamada tabla de métricas de procesos de trabajo y el índice secundario global de la tabla de arrendamiento en DynamoDB. Estos dos artefactos los crea KCL 3.x, pero no son necesarios al restaurar la versión anterior.
+ Hace que todos los trabajadores funcionen en un modo compatible con KCL 2.x y comiencen a utilizar el algoritmo de equilibrio de carga utilizado en las versiones anteriores de KCL. Si tiene problemas con el nuevo algoritmo de equilibrio de carga en KCL 3.x, esto mitigará el problema inmediatamente.

**importante**  
La tabla de estados del coordinador en DynamoDB debe existir y no debe eliminarse durante el proceso de migración, restauración y avance. 

**nota**  
Es importante que todos los procesos de trabajo de la aplicación de consumo utilicen el mismo algoritmo de equilibrio de carga en un momento dado. La herramienta de migración de KCL se asegura de que todos los procesos de trabajo de la aplicación de consumo KCL 3.x cambien al modo compatible con KCL 2.x, de modo que todos los procesos de trabajo ejecuten el mismo algoritmo de equilibrio de carga durante el despliegue progresivo de su versión anterior de KCL.

[Puede descargar la [herramienta de migración de KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) en el directorio de scripts del repositorio de KCL. GitHub](https://github.com/awslabs/amazon-kinesis-client/tree/master) Puede ejecutar el script desde cualquiera de sus procesos de trabajo o desde cualquier host que tenga los permisos necesarios para escribir en la tabla de estado de coordinadores, eliminar la tabla de métricas de los procesos de trabajo y actualizar la tabla de arrendamiento. Puede consultar [Permisos de IAM necesarios para las aplicaciones de consumo de KCL](kcl-iam-permissions.md) para conocer los permisos de IAM necesarios para ejecutar el script. Debe ejecutar el script solo una vez por aplicación de KCL. Puede ejecutar esta herramienta de migración de KCL con el siguiente comando: 

```
python3 ./KclMigrationTool.py --region <region> --mode rollback [--application_name <applicationName>] [--lease_table_name <leaseTableName>] [--coordinator_state_table_name <coordinatorStateTableName>] [--worker_metrics_table_name <workerMetricsTableName>]
```

**Parámetros**
+ --region: reemplace `<region>` por su. Región de AWS
+ --application\$1name: este parámetro es necesario si utiliza nombres predeterminados para las tablas de metadatos de DynamoDB (tabla de arrendamiento, tabla de estado de coordinadores y tabla de métricas de procesos de trabajo). Si ha especificado nombres personalizados para estas tablas, puede omitir este parámetro. Reemplace `<applicationName>` por el nombre de la aplicación KCL. La herramienta utiliza este nombre para derivar los nombres de tabla predeterminados si no se proporcionan nombres personalizados.
+ --lease\$1table\$1name (opcional): este parámetro es necesario cuando se ha establecido un nombre personalizado para la tabla de arrendamiento en la configuración de KCL. Si utiliza el nombre de tabla predeterminado, puede omitir este parámetro. Reemplace `leaseTableName` por el nombre de tabla personalizado que especificó para la tabla de arrendamiento.
+ --coordinator\$1state\$1table\$1name (opcional): este parámetro es necesario cuando se ha establecido un nombre personalizado para la tabla de estado de coordinadores en la configuración de KCL. Si utiliza el nombre de tabla predeterminado, puede omitir este parámetro. Reemplace `<coordinatorStateTableName>` por el nombre de tabla personalizado que especificó para la tabla de estado de coordinadores. 
+ --worker\$1metrics\$1table\$1name (opcional): este parámetro es necesario cuando se ha establecido un nombre personalizado para la tabla de métricas de procesos de trabajo en la configuración de KCL. Si utiliza el nombre de tabla predeterminado, puede omitir este parámetro. Reemplace `<workerMetricsTableName>` por el nombre de tabla personalizado que especificó para la tabla de métricas de procesos de trabajo. 

## Paso 2: nueva implementación del código con la versión de KCL anterior (opcional)
<a name="kcl-migration-rollback-redeploy"></a>

 Tras ejecutar la herramienta de migración de KCL para realizar una recuperación, verá uno de los siguientes mensajes:
+ **Mensaje 1:** “Se completó la reversión. Su aplicación KCL ejecutaba el modo compatible con KCL 2.x. Si no observa la migración de cualquier regresión, revierta sus archivos binarios de aplicación anteriores al implementar el código con su versión de KCL anterior”.
  + **Acción requerida:** Esto significa que sus trabajadores estaban trabajando en el modo compatible con KCL 2.x. Si el problema persiste, vuelva a implementar el código con la versión de KCL anterior en los procesos de trabajo.
+ **Mensaje 2:** “Se completó la reversión. Su aplicación KCL ejecutaba el modo de funcionalidad KCL 3.x. No es necesario volver a los archivos binarios de la aplicación anterior, a menos que no vea ninguna solución al problema en 5 minutos. Si sigue teniendo problemas, regrese a sus archivos binarios de aplicación anteriores al implementar el código con la versión de KCL anterior”.
  + **Acción necesaria:** Esto significa que sus trabajadores estaban trabajando en el modo KCL 3.x y que la herramienta de migración de KCL cambió a todos los trabajadores al modo compatible con KCL 2.x. Si el problema se resuelve, no es necesario volver a implementar el código con la versión anterior de KCL. Si el problema persiste, vuelva a implementar el código con la versión de KCL anterior en los procesos de trabajo.

 

# Avance a KCL 3.x después de una restauración
<a name="kcl-migration-rollforward"></a>

En este tema se explican los pasos para volver a KCL 3.x después de una reversión. Cuando necesite avanzar, debe realizar un proceso de dos pasos: 

1. Ejecución de la [herramienta de migración de KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py). 

1. Implemente el código con KCL 3.x.

## Paso 1: ejecución de la herramienta de migración de KCL
<a name="kcl-migration-rollback-tool"></a>

Ejecución de la herramienta de migración de KCL. La herramienta de migración de KCL con el siguiente comando para avanzar a KCL 3.x:

```
python3 ./KclMigrationTool.py --region <region> --mode rollforward [--application_name <applicationName>] [--coordinator_state_table_name <coordinatorStateTableName>]
```

**Parámetros**
+ --región: sustituya por la suya. `<region>` Región de AWS
+ --application\$1name: este parámetro es obligatorio si utiliza nombres predeterminados para la tabla de estado de coordinadores. Si ha especificado nombres personalizados para la tabla de estados de coordinador, puede omitir este parámetro. Reemplace `<applicationName>` por el nombre de la aplicación KCL. La herramienta utiliza este nombre para derivar los nombres de tabla predeterminados si no se proporcionan nombres personalizados.
+ --coordinator\$1state\$1table\$1name (opcional): este parámetro es necesario cuando se ha establecido un nombre personalizado para la tabla de estado de coordinadores en la configuración de KCL. Si utiliza el nombre de tabla predeterminado, puede omitir este parámetro. Reemplace `<coordinatorStateTableName>` por el nombre de tabla personalizado que especificó para la tabla de estado de coordinadores. 

Después de ejecutar la herramienta de migración en modo de avance, KCL crea los siguientes recursos de DynamoDB necesarios para KCL 3.x:
+ Un índice secundario global en la tabla de arrendamientos
+ Una tabla de métricas de proceso de trabajo

## Paso 2: implementación del código con KCL 3.x
<a name="kcl-migration-rollback-redeploy"></a>

Después de ejecutar la herramienta de migración de KCL para una restauración, implemente el código con KCL 3.x en los procesos de trabajo. Consulte [Paso 8: completar la migración](kcl-migration-from-2-3.md#kcl-migration-from-2-3-finish) para completar la migración.

# Prácticas recomendadas para la tabla de arrendamiento con el modo de capacidad aprovisionada
<a name="kcl-migration-lease-table"></a>

Si la tabla de arrendamiento de su aplicación KCL se cambió al modo de capacidad aprovisionada, KCL 3.x creará un índice secundario global en la tabla de arrendamientos con el modo de facturación aprovisionado y las mismas unidades de capacidad de lectura (RCU) y unidades de capacidad de escritura (WCU) que la tabla de arrendamiento base. Cuando se cree el índice secundario global, como recomendación, debe supervisar el uso del índice secundario global en la consola de DynamoDB y ajustar las unidades de capacidad si es necesario. Para obtener una guía más detallada sobre cómo cambiar el modo de capacidad de las tablas de metadatos de DynamoDB creadas por KCL, consulte [Modo de capacidad de DynamoDB para las tablas de metadatos creadas por KCL](kcl-dynamoDB.md#kcl-capacity-mode). 

**nota**  
Por defecto, KCL crea tablas de metadatos, como la tabla de arrendamiento, la tabla de métricas de procesos de trabajo y la tabla de estado de coordinadores, y el índice secundario global en la tabla de arrendamiento mediante el modo de capacidad bajo demanda. Recomendamos utilizar el modo de capacidad bajo demanda para ajustar automáticamente la capacidad en función de los cambios de uso. 

# Migración de KCL 1.x a KCL 3.x
<a name="kcl-migration-1-3"></a>

En este tema se explican las instrucciones para migrar a un consumidor de KCL 1.x a KCL 3.x. KCL 1.x utiliza clases e interfaces diferentes en comparación con KCL 2.x y KCL 3.x. Debe migrar primero el procesador de registros, el generador de procesadores de registros y las clases de procesos de trabajo al formato compatible con KCL 2.x y 3.x y, a continuación, seguir los pasos de migración de KCL 2.x a KCL 3.x. Puede actualizar directamente de KCL 1.x a KCL 3.x.
+ **Paso 1: migrar el procesador de grabación**

  Consulte la sección [Migración del procesador de registros](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) de la página [Migración de consumidores de KCL 1.x a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Paso 2: Migrar la fábrica de procesadores de grabación**

  Consulte la sección [Migración del generador de procesadores de registros](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-factory-migration) de la página [Migración de consumidores de KCL 1.x a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Paso 3: Migrar al trabajador**

  Consulte la sección [Migración del proceso de trabajo](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#worker-migration) de la página [Migración de consumidores de KCL 1.x a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Paso 4: migración de la configuración de KCL 1.x**

  Consulte la sección [Configuración del cliente de Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration) en la página [Migración de consumidores de KCL 1.x a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Paso 5: revisión de la eliminación del tiempo de inactividad y eliminación de la configuración del cliente**

  Consulte las secciones [Eliminación del tiempo de inactividad](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#idle-time-removal) y [Eliminación de la configuración del cliente](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration-removals) en la página [Migración de consumidores de KCL 1.x a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Paso 6: Siga step-by-step las instrucciones de la guía de migración de KCL 2.x a KCL 3.x**

  Siga las instrucciones de la página [Migre de KCL 2.x a KCL 3.x](kcl-migration-from-2-3.md) para completar la migración. Si necesita revertir la versión anterior de KCL o actualizar a KCL 3.x después de una reversión, consulte [Restauración de la versión de KCL anterior](kcl-migration-rollback.md) y [Avance a KCL 3.x después de una restauración](kcl-migration-rollforward.md).

**importante**  
No utilice las AWS SDK para Java versiones 2.27.19 a 2.27.23 con KCL 3.x. Estas versiones cuentan con un problema que provoca un error de excepción relacionado con el uso de DynamoDB por parte de KCL. Le recomendamos que utilice la AWS SDK para Java versión 2.28.0 o posterior para evitar este problema. 

# Documentación de la versión anterior de KCL
<a name="kcl-archive"></a>

Se han archivado los siguientes temas. Para ver la documentación actual de Kinesis Client Library, consulte [Uso de Kinesis Client Library](kcl.md).

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. La versión 1.x de KCL estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

**Topics**
+ [Información sobre KCL 1.x y 2.x](shared-throughput-kcl-consumers.md)
+ [Desarrollar consumidores personalizados con rendimiento compartido](shared-throughput-consumers.md)
+ [Migrar consumidores de KCL 1.x a KCL 2.x](kcl-migration.md)

# Información sobre KCL 1.x y 2.x
<a name="shared-throughput-kcl-consumers"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. La versión 1.x de KCL estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Uno de los métodos para desarrollar aplicaciones de consumo personalizadas que puedan procesar datos de flujos de datos de KDS consiste en utilizar Kinesis Client Library (KCL).

**Topics**
+ [Acerca de KCL (versiones anteriores)](#shared-throughput-kcl-consumers-overview)
+ [Versiones anteriores de KCL](#shared-throughput-kcl-consumers-versions)
+ [Conceptos de KCL (versiones anteriores)](#shared-throughput-kcl-consumers-concepts)
+ [Usar una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL](#shared-throughput-kcl-consumers-leasetable)
+ [Procesar varios flujos de datos con el mismo KCL 2.x para aplicaciones de consumo de Java](#shared-throughput-kcl-multistream)
+ [Utilice la KCL con el registro de esquemas AWS Glue](#shared-throughput-kcl-consumers-glue-schema-registry)

**nota**  
Se recomienda actualizar a la última versión tanto KCL 1.x como KCL 2.x, según el escenario de uso. Tanto KCL 1.x como KCL 2.x se actualizan periódicamente con versiones más recientes que incluyen las últimas revisiones de dependencia y seguridad, correcciones de errores y nuevas características compatibles con versiones anteriores. [Para obtener más información, consulte https://github.com/awslabs/ amazon-kinesis-client /releases.](https://github.com/awslabs/amazon-kinesis-client/releases)

## Acerca de KCL (versiones anteriores)
<a name="shared-throughput-kcl-consumers-overview"></a>

KCL ayuda a consumir y procesar los datos de un flujo de datos de Kinesis, ya que se encarga de muchas de las tareas complejas asociadas a la computación distribuida. Estas incluyen equilibrar la carga entre varias instancias de aplicaciones de consumo, responder a los errores de las instancias de aplicaciones de consumo, comprobar los registros procesados y reaccionar ante la repartición. KCL se encarga de todas estas subtareas para que pueda centrar sus esfuerzos en escribir una lógica de procesamiento de registros personalizada.

El KCL es diferente de los Kinesis Data APIs Streams que están disponibles en. AWS SDKs Los Kinesis Data APIs Streams le ayudan a gestionar muchos aspectos de Kinesis Data Streams, como la creación de transmisiones, la refragmentación y la creación y obtención de registros. KCL proporciona una capa de abstracción en torno a todas estas subtareas, específicamente para que pueda centrarse en la lógica de procesamiento de datos personalizada de su aplicación de consumo. Para obtener información sobre la API de Kinesis Data Streams, consulte la [referencia de la API de Amazon Kinesis](https://docs.aws.amazon.com/kinesis/latest/APIReference/Welcome.html).

**importante**  
KCL es una biblioteca de Java. El soporte para lenguajes distintos de Java se proporciona mediante una interfaz multilingüe llamada MultiLangDaemon. Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un lenguaje de KCL distinto de Java. Por ejemplo, si instala el KCL para Python y escribe su aplicación de consumo completamente en Python, seguirá necesitando instalar Java en su sistema debido a la MultiLangDaemon. Además, MultiLangDaemon tiene algunos ajustes predeterminados que puede que necesites personalizar para tu caso de uso, por ejemplo, la AWS región a la que se conecta. Para obtener más información MultiLangDaemon sobre esto GitHub, consulte el [ MultiLangDaemon proyecto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

KCL ejerce de intermediaria entre su lógica de procesamiento de registros y Kinesis Data Streams. 

## Versiones anteriores de KCL
<a name="shared-throughput-kcl-consumers-versions"></a>

Actualmente, puede utilizar cualquiera de las siguientes versiones compatibles de KCL para crear sus aplicaciones de consumo personalizadas:
+ **KCL 1.x**

  Para obtener más información, consulte [Desarrollar consumidores de KCL 1.x](developing-consumers-with-kcl.md)
+ **KCL 2.x**

  Para obtener más información, consulte [Desarrollar consumidores de KCL 2.x](developing-consumers-with-kcl-v2.md)

Puede usar KCL 1.x o KCL 2.x para crear aplicaciones de consumo que utilicen un rendimiento compartido. Para obtener más información, consulte [Desarrollar consumidores personalizados con rendimiento compartido mediante KCL](custom-kcl-consumers.md).

Para crear aplicaciones de consumo que utilicen un rendimiento dedicado (consumidores con distribución mejorada), solo puede utilizar KCL 2.x. Para obtener más información, consulte [Desarrollo de consumidores de distribución ramificada mejorada con rendimiento dedicado](enhanced-consumers.md).

Para obtener información sobre las diferencias entre KCL 1.x y KCL 2.x e instrucciones sobre cómo migrar de KCL 1.x a KCL 2.x, consulte [Migrar consumidores de KCL 1.x a KCL 2.x](kcl-migration.md).

## Conceptos de KCL (versiones anteriores)
<a name="shared-throughput-kcl-consumers-concepts"></a>
+ **Aplicación para consumidores de KCL**: una aplicación creada a medida con KCL y diseñada para leer y procesar registros de flujos de datos. 
+ **Instancia de aplicación de consumo**: las aplicaciones de consumo de KCL suelen estar distribuidas y una o más instancias de aplicación se ejecutan simultáneamente para coordinar los fallos y equilibrar la carga de forma dinámica del procesamiento de registro de datos.
+ **Proceso de trabajo**: clase de alto nivel que utiliza una instancia de aplicación de consumo de KCL para empezar a procesar datos. 
**importante**  
Cada instancia de aplicación de consumo de KCL tiene un proceso de trabajo. 

  El proceso de trabajo inicializa y supervisa diversas tareas, como la sincronización de la información sobre los arrendamientos y las particiones, el seguimiento de las asignaciones de las particiones y el procesamiento de los datos de las particiones. Un trabajador proporciona a KCL la información de configuración de la aplicación de consumo, como el nombre del flujo de datos cuyos registros de datos va a procesar la aplicación de consumo de KCL y las AWS credenciales necesarias para acceder a este flujo de datos. El proceso de trabajo también pone en marcha esa instancia específica de la aplicación de consumo de KCL para entregar los registros de datos del flujo de datos a los procesadores de registros.
**importante**  
En KCL 1.x, esta clase se denomina **Proceso de trabajo**. [Para obtener más información (estos son los repositorios KCL de Java), consulte/.java. https://github.com/awslabs/ amazon-kinesis-client blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java) En KCL 2.x, esta clase se denomina **Programador**. El propósito del programador en KCL 2.x es idéntico al propósito del proceso de trabajo en KCL 1.x. [Para obtener más información sobre la clase Scheduler en KCL 2.x, consulte/.java. https://github.com/awslabs/ amazon-kinesis-client blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java) 
+ **Arrendamiento**: datos que definen el enlace entre un proceso de trabajo y una partición. Las aplicaciones de consumo distribuidas de KCL utilizan los arrendamientos para dividir el procesamiento de registros de datos entre una flota de procesos de trabajo. En un momento dado, cada partición de registros de datos está vinculada a un proceso de trabajo en particular mediante un arrendamiento identificado por la variable **leaseKey**. 

  De forma predeterminada, un trabajador puede tener uno o más contratos de arrendamiento (sujetos al valor de la variable **maxLeasesForWorker) al mismo** tiempo. 
**importante**  
Cada proceso de trabajo competirá por tener todos los arrendamientos disponibles para todas las particiones disponibles en un flujo de datos. Sin embargo, solo un proceso de trabajo podrá mantener satisfactoriamente cada arrendamiento a la vez. 

  Por ejemplo, si tiene una instancia de aplicación de consumo A con el proceso de trabajo A que procesa un flujo de datos con 4 particiones, el proceso de trabajo A puede retener los arrendamientos de las particiones 1, 2, 3 y 4 al mismo tiempo. Sin embargo, si tiene dos instancias de aplicaciones de consumo: A y B con el proceso de trabajo A y el proceso de trabajo B, y estas instancias procesan un flujo de datos con 4 particiones, el proceso de trabajo A y el proceso de trabajo B no pueden retener el arrendamiento de la partición 1 al mismo tiempo. Un proceso de trabajo retiene el arrendamiento de una partición concreta hasta que esté listo para dejar de procesar los registros de datos de esta partición o hasta que falle. Cuando un proceso de trabajo deja de ser titular del arrendamiento, otro proceso de trabajo lo acepta y lo retiene. 

  [Para obtener más información (estos son los repositorios KCL de Java), consulte [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java para KCL 1.x y https://github.com/awslabs/amazon-kinesis-client/.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java) para KCL 2.x. blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java)
+ **Tabla de arrendamiento**: una tabla exclusiva de Amazon DynamoDB que se utiliza para realizar un seguimiento de las particiones de un flujo de datos de KDS que los procesos de trabajo de la aplicación de consumo de KCL están arrendando y procesando. La tabla de arrendamiento debe permanecer sincronizada (dentro de un proceso de trabajo y entre todos los procesos de trabajo) con la información más reciente sobre las particiones del flujo de datos mientras se ejecuta la aplicación de consumo de KCL. Para obtener más información, consulte [Usar una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL](#shared-throughput-kcl-consumers-leasetable).
+ **Procesador de registros**: lógica que define la forma en que su aplicación de consumo de KCL procesa los datos que obtiene de los flujos de datos. En tiempo de ejecución, una instancia de una aplicación de consumo de KCL crea una instancia de un proceso de trabajo, y este proceso de trabajo crea una instancia de un procesador de registros por cada partición que tiene en arrendamiento. 

## Usar una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL
<a name="shared-throughput-kcl-consumers-leasetable"></a>

**Topics**
+ [Qué es una tabla de arrendamiento](#shared-throughput-kcl-consumers-what-is-leasetable)
+ [Rendimiento](#shared-throughput-kcl-leasetable-throughput)
+ [Cómo se sincroniza una tabla de arrendamiento con las particiones de Kinesis Data Streams](#shared-throughput-kcl-consumers-leasetable-sync)

### Qué es una tabla de arrendamiento
<a name="shared-throughput-kcl-consumers-what-is-leasetable"></a>

Para cada aplicación de Amazon Kinesis Data Streams, KCL utiliza una tabla de arrendamiento única (almacenada en una tabla de Amazon DynamoDB) para realizar un seguimiento de las particiones de un flujo de datos de KDS que los procesos de trabajo de la aplicación de consumo de KCL están arrendando y procesando.

**importante**  
KCL utiliza el nombre de la aplicación de consumo para crear el nombre de la tabla de arrendamiento que utiliza esta aplicación de consumo, por lo que el nombre de cada aplicación de consumo debe ser único.

Puede consultar la tabla con la [consola de Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) mientras se ejecuta la aplicación de consumo.

Si la tabla de arrendamiento de la aplicación de consumo de KCL no existe cuando se inicia la aplicación, uno de los procesos de trabajo crea la tabla de arrendamiento para esta aplicación. 

**importante**  
 Se le realizará el cobro de los costos de su cuenta asociados a la tabla de DynamoDB, además de los costos propios asociados a Kinesis Data Streams. 

Cada fila de la tabla de arrendamiento representa una partición que procesan los procesos de trabajo de la aplicación de consumo. Si la aplicación de consumo de KCL procesa solo un flujo de datos, la `leaseKey` que es la clave hash de la tabla de arrendamiento será el identificador de la partición. Si es [Procesar varios flujos de datos con el mismo KCL 2.x para aplicaciones de consumo de Java](#shared-throughput-kcl-multistream), la estructura de leaseKey tendrá el siguiente aspecto: `account-id:StreamName:streamCreationTimestamp:ShardId`. Por ejemplo, `111111111:multiStreamTest-1:12345:shardId-000000000336`.

Además de la ID del fragmento, cada fila incluye también los siguientes datos:
+ **checkpoint:** el número secuencial de punto de comprobación más reciente del fragmento. Este valor es único en todas las particiones del flujo de datos.
+ **checkpointSubSequenceNúmero:** cuando se utiliza la función de agregación de la biblioteca de productores de Kinesis, se trata de una extensión del **punto de control** que rastrea los registros de los usuarios individuales dentro del registro de Kinesis.
+ **leaseCounter:** se utiliza para el control de versiones de las asignaciones, de modo que los procesos de trabajo puedan detectar que su asignación ha sido utilizada por otro proceso de trabajo.
+ **leaseKey:** un identificador único para una asignación. Cada arrendamiento es específico de una partición del flujo de datos y solo lo retiene un proceso de trabajo cada vez.
+ **leaseOwner:** el proceso de trabajo que tiene esta asignación.
+ **ownerSwitchesSincePunto de control:** cuántas veces este contrato de arrendamiento ha cambiado de trabajadores desde la última vez que se emitió un punto de control.
+ **parentShardId:** Se utiliza para garantizar que el fragmento principal se procese por completo antes de que comience el procesamiento en los fragmentos secundarios. Así, se garantiza que los registros se procesen en el mismo orden en el que se introdujeron en la secuencia.
+ **hashrange:** lo utiliza `PeriodicShardSyncManager` para ejecutar sincronizaciones periódicas para encontrar las particiones que faltan en la tabla de arrendamiento y crear arrendamientos para ellas si es necesario. 
**nota**  
Estos datos están presentes en la tabla de arrendamiento de todas las particiones a partir de KCL 1.14 y KCL 2.3. Para obtener más información sobre `PeriodicShardSyncManager` y la sincronización periódica entre los arrendamientos y las particiones, consulte [Cómo se sincroniza una tabla de arrendamiento con las particiones de Kinesis Data Streams](#shared-throughput-kcl-consumers-leasetable-sync).
+ **childshards:** lo utiliza `LeaseCleanupManager` para revisar el estado de procesamiento de la partición secundaria y decidir si la partición principal se puede eliminar de la tabla de arrendamiento.
**nota**  
Estos datos están presentes en la tabla de arrendamiento de todas las particiones a partir de KCL 1.14 y KCL 2.3.
+ **shardID:** ID de la partición.
**nota**  
Estos datos solo están presentes en la tabla de arrendamiento si es [Procesar varios flujos de datos con el mismo KCL 2.x para aplicaciones de consumo de Java](#shared-throughput-kcl-multistream). Esto solo se admite en KCL 2.x para Java, a partir de KCL 2.3 para Java y versiones posteriores. 
+ **nombre del flujo** El identificador del flujo de datos en el siguiente formato: `account-id:StreamName:streamCreationTimestamp`.
**nota**  
Estos datos solo están presentes en la tabla de arrendamiento si se dedica al [Procesar varios flujos de datos con el mismo KCL 2.x para aplicaciones de consumo de Java](#shared-throughput-kcl-multistream). Esto solo se admite en KCL 2.x para Java, a partir de KCL 2.3 para Java y versiones posteriores. 

### Rendimiento
<a name="shared-throughput-kcl-leasetable-throughput"></a>

Si la aplicación de Amazon Kinesis Data Streams recibe excepciones de rendimiento aprovisionado, debe aumentar el rendimiento aprovisionado para la tabla de DynamoDB. KCL crea la tabla con un rendimiento aprovisionado de 10 lecturas por segundo y 10 escrituras por segundo, pero esto podría no ser suficiente para su aplicación. Por ejemplo, si su aplicación de Amazon Kinesis Data Streams crea frecuentemente puntos de comprobación u opera en un flujo que se compone de muchas particiones, es posible que necesite más rendimiento.

Para obtener información sobre el rendimiento aprovisionado en DynamoDB, consulte [Modo de capacidad de lectura y escritura](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html) y [Uso de tablas y datos](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithDDTables.html) en la *Guía para desarrolladores de Amazon DynamoDB*.

### Cómo se sincroniza una tabla de arrendamiento con las particiones de Kinesis Data Streams
<a name="shared-throughput-kcl-consumers-leasetable-sync"></a>

Los procesos de trabajo de las aplicaciones de consumo de KCL utilizan los arrendamientos para procesar particiones de un flujo de datos determinado. La información sobre qué proceso de trabajo está arrendando cada partición en un momento dado se almacena en una tabla de arrendamiento. La tabla de arrendamiento debe permanecer sincronizada con la información más reciente sobre la partición del flujo de datos mientras se ejecuta la aplicación de consumo de KCL. KCL sincroniza la tabla de arrendamiento con la información de las particiones obtenida del servicio Kinesis Data Streams durante el arranque de la aplicación de consumo (ya sea al inicializar o reiniciar la aplicación de consumo) y también siempre que una partición que se esté procesando llegue a su fin (repartición). En otras palabras, los procesos de trabajo o una aplicación de consumo de KCL se sincronizan con el flujo de datos que están procesando durante el arranque inicial de la aplicación de consumo y siempre que la aplicación de consumo encuentra un evento de repartición del flujo de datos.

**Topics**
+ [Sincronización en KCL 1.0 a 1.13 y KCL 2.0 a 2.2](#shared-throughput-kcl-consumers-leasetable-sync-old)
+ [Sincronización en KCL 2.x, a partir de KCL 2.3 y versiones posteriores](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl2)
+ [Sincronización en KCL 1.x, a partir de KCL 1.14 y versiones posteriores](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl1)

#### Sincronización en KCL 1.0 a 1.13 y KCL 2.0 a 2.2
<a name="shared-throughput-kcl-consumers-leasetable-sync-old"></a>

En KCL 1.0 a 1.13 y KCL 2.0 a 2.2, durante el arranque de la aplicación de consumo y también durante cada evento de refragmentación del flujo de datos, KCL sincroniza la tabla de arrendamiento con la información de los fragmentos adquirida en el servicio Kinesis Data Streams invocando la o la detección. `ListShards` `DescribeStream` APIs En todas las versiones de KCL enumeradas anteriormente, cada empleado de una aplicación de consumo de KCL completa los siguientes pasos para realizar el proceso de lease/shard sincronización durante el arranque de la aplicación de consumo y en cada evento de refragmentación de la transmisión:
+ Obtiene todos las particiones de datos del flujo que se está procesando.
+ Obtiene todos los arrendamientos de particiones de la tabla de arrendamiento.
+ Filtra cada partición abierta que no tenga arrendamientos en la tabla de arrendamiento.
+ Repite todas las particiones abiertas encontradas y para cada partición abierta sin un elemento principal abierto:
  + Recorre el árbol jerárquico siguiendo la ruta de sus antecesores para determinar si la partición es descendiente. Una partición se considera descendiente si se está procesando una partición anterior (en la tabla de arrendamiento se indica el arrendamiento de la partición anterior) o si se debe procesar una partición anterior (por ejemplo, si la posición inicial es `TRIM_HORIZON` o `AT_TIMESTAMP`).
  + Si la partición abierta en el contexto es descendiente, KCL comprueba la partición en función de su posición inicial y crea arrendamientos para sus elementos principales, si es necesario.

#### Sincronización en KCL 2.x, a partir de KCL 2.3 y versiones posteriores
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl2"></a>

A partir de las últimas versiones compatibles de KCL 2.x (KCL 2.3) y versiones posteriores, la biblioteca ahora admite los siguientes cambios en el proceso de sincronización. Estos cambios de lease/shard sincronización reducen considerablemente el número de llamadas a la API que realizan las aplicaciones de consumo de KCL al servicio Kinesis Data Streams y optimizan la administración de arrendamientos en su aplicación de consumo de KCL. 
+ Durante el arranque de la aplicación, si la tabla de arrendamiento está vacía, KCL utiliza la opción de filtrado de la API `ListShard` (el parámetro de solicitud `ShardFilter` opcional) para recuperar y crear arrendamientos únicamente para una instantánea de las particiones abiertas en el momento especificado por el parámetro `ShardFilter`. El parámetro `ShardFilter` permite filtrar la respuesta de la API `ListShards`. La única propiedad obligatoria del parámetro `ShardFilter` es `Type`. KCL utiliza la propiedad de filtro `Type` y los siguientes valores válidos para identificar y devolver una instantánea de las particiones abiertas que podrían requerir nuevos arrendamientos:
  + `AT_TRIM_HORIZON`: la respuesta incluye todas las particiones que estaban abiertas en `TRIM_HORIZON`. 
  + `AT_LATEST`: la respuesta incluye solo las particiones actualmente abiertas del flujo de datos. 
  + `AT_TIMESTAMP`: la respuesta incluye todas las particiones cuya marca de tiempo de inicio es anterior o igual a la marca de tiempo dada y cuya marca de tiempo de finalización es posterior o igual que la marca de tiempo dada, o que aún están abiertas.

  `ShardFilter` se utiliza al crear arrendamientos para una tabla de arrendamiento vacía con el fin de inicializar los arrendamientos de una instantánea de las particiones especificadas en `RetrievalConfig#initialPositionInStreamExtended`.

  Para obtener más información acerca de `ShardFilter`, consulte [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html).
+ En lugar de que todos los trabajadores realicen la lease/shard sincronización para mantener la tabla de arrendamientos actualizada con los fragmentos más recientes del flujo de datos, un único líder obrero elegido realiza la sincronización entre el arrendamiento y el fragmento.
+ KCL 2.3 utiliza el parámetro de `ChildShards` retorno de `GetRecords` y el `SubscribeToShard` APIs para realizar la lease/shard sincronización que se produce en los fragmentos cerrados, lo que permite a `SHARD_END` un trabajador de KCL crear únicamente arrendamientos para los fragmentos secundarios del fragmento que ha terminado de procesar. Para compartirla en todas las aplicaciones de consumo, esta optimización de la lease/shard sincronización utiliza el parámetro de la API. `ChildShards` `GetRecords` En el caso de las aplicaciones de consumo dedicadas al rendimiento (distribución mejorada), esta optimización de la lease/shard sincronización utiliza el `ChildShards` parámetro de la `SubscribeToShard` API. Para obtener más información, consulte [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html), [SubscribeToShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) y [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).
+ Con los cambios anteriores, el comportamiento de KCL está pasando del modelo en el que todos los procesos de trabajo aprenden sobre todas las particiones existentes al modelo en el que los procesos de trabajo aprenden solo sobre las particiones secundarias de las particiones que son propiedad de cada proceso de trabajo. Por lo tanto, además de la sincronización que se produce durante el arranque de las aplicaciones de consumo y los eventos de refragmentación, KCL ahora también realiza shard/lease escaneos periódicos adicionales para identificar cualquier posible laguna en la tabla de arrendamientos (en otras palabras, para obtener información sobre todos los fragmentos nuevos) a fin de garantizar que se procese todo el rango de hash del flujo de datos y crear arrendamientos para ellos si es necesario. `PeriodicShardSyncManager`es el componente responsable de ejecutar escaneos periódicos. lease/shard 

  Para obtener más información sobre `PeriodicShardSyncManager` KCL 2.3, consulte [https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java \$1L201](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213) -L213.

  En KCL 2.3, hay nuevas opciones de configuración disponibles para configurar `PeriodicShardSyncManager` en `LeaseManagementConfig`:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/streams/latest/dev/shared-throughput-kcl-consumers.html)

  Ahora también se emiten nuevas CloudWatch métricas para monitorear el estado del. `PeriodicShardSyncManager` Para obtener más información, consulte [PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task).
+ Incluye una optimización de `HierarchicalShardSyncer` para crear solo arrendamientos para una capa de particiones.

#### Sincronización en KCL 1.x, a partir de KCL 1.14 y versiones posteriores
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl1"></a>

A partir de las últimas versiones compatibles de KCL 1.x (KCL 1.14) y versiones posteriores, la biblioteca ahora admite los siguientes cambios en el proceso de sincronización. Estos cambios de lease/shard sincronización reducen considerablemente el número de llamadas a la API que realizan las aplicaciones de consumo de KCL al servicio Kinesis Data Streams y optimizan la administración de arrendamientos en su aplicación de consumo de KCL. 
+ Durante el arranque de la aplicación, si la tabla de arrendamiento está vacía, KCL utiliza la opción de filtrado de la API `ListShard` (el parámetro de solicitud `ShardFilter` opcional) para recuperar y crear arrendamientos únicamente para una instantánea de las particiones abiertas en el momento especificado por el parámetro `ShardFilter`. El parámetro `ShardFilter` permite filtrar la respuesta de la API `ListShards`. La única propiedad obligatoria del parámetro `ShardFilter` es `Type`. KCL utiliza la propiedad de filtro `Type` y los siguientes valores válidos para identificar y devolver una instantánea de las particiones abiertas que podrían requerir nuevos arrendamientos:
  + `AT_TRIM_HORIZON`: la respuesta incluye todas las particiones que estaban abiertas en `TRIM_HORIZON`. 
  + `AT_LATEST`: la respuesta incluye solo las particiones actualmente abiertas del flujo de datos. 
  + `AT_TIMESTAMP`: la respuesta incluye todas las particiones cuya marca de tiempo de inicio es anterior o igual a la marca de tiempo dada y cuya marca de tiempo de finalización es posterior o igual que la marca de tiempo dada, o que aún están abiertas.

  `ShardFilter` se utiliza al crear arrendamientos para una tabla de arrendamiento vacía con el fin de inicializar los arrendamientos de una instantánea de las particiones especificadas en `KinesisClientLibConfiguration#initialPositionInStreamExtended`.

  Para obtener más información acerca de `ShardFilter`, consulte [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html).
+ En lugar de que todos los trabajadores realicen la lease/shard sincronización para mantener la tabla de arrendamientos actualizada con los fragmentos más recientes del flujo de datos, un único líder obrero elegido realiza la sincronización entre el arrendamiento y el fragmento.
+ KCL 1.14 utiliza el parámetro de `ChildShards` retorno de `GetRecords` y el `SubscribeToShard` APIs para realizar la lease/shard sincronización que se produce en los fragmentos cerrados, lo que permite a `SHARD_END` un trabajador de KCL crear únicamente arrendamientos para los fragmentos secundarios del fragmento que ha terminado de procesar. Para obtener más información, consulte [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) y [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).
+ Con los cambios anteriores, el comportamiento de KCL está pasando del modelo en el que todos los procesos de trabajo aprenden sobre todas las particiones existentes al modelo en el que los procesos de trabajo aprenden solo sobre las particiones secundarias de las particiones que son propiedad de cada proceso de trabajo. Por lo tanto, además de la sincronización que se produce durante el arranque de las aplicaciones de consumo y los eventos de refragmentación, KCL ahora también realiza shard/lease escaneos periódicos adicionales para identificar cualquier posible laguna en la tabla de arrendamientos (en otras palabras, para obtener información sobre todos los fragmentos nuevos) a fin de garantizar que se procese todo el rango de hash del flujo de datos y crear arrendamientos para ellos si es necesario. `PeriodicShardSyncManager`es el componente responsable de ejecutar escaneos periódicos. lease/shard 

  Cuando `KinesisClientLibConfiguration#shardSyncStrategyType` está establecido en `ShardSyncStrategyType.SHARD_END`, `PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold` se utiliza para determinar el umbral del número de exámenes consecutivos que contienen lagunas en la tabla de arrendamiento, tras lo cual se exige la sincronización de las particiones. Cuando `KinesisClientLibConfiguration#shardSyncStrategyType` se establece en `ShardSyncStrategyType.PERIODIC`, `leasesRecoveryAuditorInconsistencyConfidenceThreshold` se ignora.

  Para obtener más información sobre `PeriodicShardSyncManager` KCL 1.14, consulte [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999) \$1L987 -L999.

  En KCL 1.14, hay una nueva opción de configuración disponible para configurar `PeriodicShardSyncManager` en `LeaseManagementConfig`:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/streams/latest/dev/shared-throughput-kcl-consumers.html)

  Ahora también se emiten nuevas CloudWatch métricas para monitorear el estado del. `PeriodicShardSyncManager` Para obtener más información, consulte [PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task).
+ KCL 1.14 ahora también admite la limpieza de arrendamientos diferidos. `LeaseCleanupManager` elimina los arrendamientos de forma asíncrona al llegar a `SHARD_END`, cuando una partición ha caducado pasado el periodo de retención del flujo de datos o se ha cerrado como resultado de una operación de repartición.

  Están disponibles nuevas opciones de configuración para `LeaseCleanupManager`.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/streams/latest/dev/shared-throughput-kcl-consumers.html)
+ Incluye una optimización de `KinesisShardSyncer` para crear solo arrendamientos para una capa de particiones.

## Procesar varios flujos de datos con el mismo KCL 2.x para aplicaciones de consumo de Java
<a name="shared-throughput-kcl-multistream"></a>

En esta sección se describen los siguientes cambios en KCL 2.x para Java, que permiten crear aplicaciones de consumo de KCL que pueden procesar más de un flujo de datos al mismo tiempo. 

**importante**  
El procesamiento de varios flujos solo se admite en KCL 2.x para Java, a partir de KCL 2.3 para Java y versiones posteriores.   
El procesamiento de varios flujos NO es compatible con ningún otro lenguaje en el que se pueda implementar KCL 2.x.  
El procesamiento de varios flujos NO es compatible con ninguna versión de KCL 1.x.
+ **MultistreamTracker interfaz**

  Para crear una aplicación de consumo que pueda procesar múltiples transmisiones al mismo tiempo, debe implementar una nueva interfaz llamada [MultistreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java). Esta interfaz incluye el método `streamConfigList` que devuelve la lista de flujos de datos y sus configuraciones para que los procese la aplicación de consumo de KCL. Tenga en cuenta que los flujos de datos que se procesan pueden cambiar durante el tiempo de ejecución de la aplicación de consumo. KCL llama a `streamConfigList` periódicamente para obtener información sobre los cambios en los flujos de datos que se van a procesar.

  El `streamConfigList` método rellena la [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23)lista. 

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```

  Tenga en cuenta que los campos `StreamIdentifier` y `InitialPositionInStreamExtended` son obligatorios, aunque `consumerArn` es opcional. Debe proporcionar `consumerArn` únicamente si utiliza KCL 2.x para implementar una aplicación de consumo con distribución mejorada.

  Para obtener más información al respecto`StreamIdentifier`, consulte [https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java \$1L129.](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129) Para crear un `StreamIdentifier`, le recomendamos que cree una instancia multiflujo a partir de `streamArn` y la `streamCreationEpoch` que esté disponible en la versión 2.5.0 y versiones posteriores. En las versiones KCL 2.3 y 2.4, que no son compatibles con `streamArm`, cree una instancia multiflujo con el formato `account-id:StreamName:streamCreationTimestamp`. Este formato quedará obsoleto y dejará de ser compatible a partir de la próxima versión principal.

  `MultistreamTracker` también incluye una estrategia para eliminar los arrendamientos de flujos antiguos en la tabla de arrendamiento (`formerStreamsLeasesDeletionStrategy`). Tenga en cuenta que la estrategia NO SE PUEDE cambiar durante el tiempo de ejecución de la aplicación de consumo. Para obtener más información, consulte [https://github.com/awslabs/amazon-kinesis-clientb/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy/blob/0c5042dadf794fe988438436252a5a8fe70b6b0](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java) .java
+ [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java)es una clase que abarca toda la aplicación y que puede utilizar para especificar todos los ajustes de configuración de KCL 2.x que se utilizarán al crear su aplicación de consumo de KCL. `ConfigsBuilder`la clase ahora es compatible con la interfaz. `MultistreamTracker` Puede inicializar ConfigsBuilder cualquiera de las dos con el nombre del flujo de datos desde el que se van a consumir los registros:

  ```
   /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```

  O bien, puede inicializarlo ConfigsBuilder `MultiStreamTracker` si desea implementar una aplicación de consumo de KCL que procese varios flujos al mismo tiempo.

  ```
  * Constructor to initialize ConfigsBuilder with MultiStreamTracker
       * @param multiStreamTracker
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.left(multiStreamTracker);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```
+ Con la compatibilidad con varios flujos implementada para la aplicación de consumo de KCL, cada fila de la tabla de arrendamiento de la aplicación ahora contiene el ID de la partición y el nombre del flujo de los varios flujos de datos que procesa esta aplicación. 
+ Cuando se implementa la compatibilidad con varios flujos para la aplicación de consumo de KCL, leaseKey adopta la siguiente estructura: `account-id:StreamName:streamCreationTimestamp:ShardId`. Por ejemplo, `111111111:multiStreamTest-1:12345:shardId-000000000336`.
**importante**  
Cuando la aplicación de consumo de KCL existente está configurada para procesar solo un flujo de datos, leaseKey (que es la clave hash de la tabla de arrendamiento) es el ID de la partición. Si vuelve a configurar esta aplicación de consumo de KCL existente para procesar varios flujos de datos, se rompe la tabla de arrendamiento, ya que con la compatibilidad con varios flujos, la estructura de leaseKey debe ser la siguiente: `account-id:StreamName:StreamCreationTimestamp:ShardId`.

## Utilice la KCL con el registro de esquemas AWS Glue
<a name="shared-throughput-kcl-consumers-glue-schema-registry"></a>

Puede integrar sus flujos de datos de Kinesis con el registro de AWS Glue esquemas. AWS Glue Schema Registry le permite descubrir, controlar y evolucionar de forma centralizada esquemas, además de garantizar que un esquema registrado valide de forma continua los datos generados. Un esquema define la estructura y el formato de un registro de datos. Un esquema es una especificación versionada para publicación, consumo o almacenamiento de confianza de datos. El registro AWS Glue de esquemas le permite mejorar la end-to-end calidad y el gobierno de los datos en sus aplicaciones de streaming. Para obtener más información, consulte [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Una de las formas de configurar esta integración es a través de KCL en Java. 

**importante**  
Actualmente, la integración de Kinesis Data Streams AWS Glue y Schema Registry solo se admite para las transmisiones de datos de Kinesis que utilizan consumidores de KCL 2.3 implementados en Java. No se proporciona soporte multilingüe. No se admiten las aplicaciones de consumo de KCL 1.0. No se admiten las aplicaciones de consumo de KCL 2.x anteriores a KCL 2.3.

Para obtener instrucciones detalladas sobre cómo configurar la integración de Kinesis Data Streams con Schema Registry mediante la KCL, consulte la sección «Interacción con los datos mediante KPL/KCL las bibliotecas» [en Caso de uso: integración de Amazon Kinesis Data Streams con AWS el](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) registro de esquemas de Glue.

# Desarrollar consumidores personalizados con rendimiento compartido
<a name="shared-throughput-consumers"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. La versión 1.x de KCL estará disponible el 30 de enero de end-of-support 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Si no necesita un rendimiento específico al recibir datos de Kinesis Data Streams y tampoco necesita demoras de propagación de lectura inferiores a 200 ms, puede crear aplicaciones consumidoras tal y como se describe en los temas siguientes. Puede utilizar Kinesis Client Library (KCL) o Kinesis Client Library (KCL) o AWS SDK para Java.

**Topics**
+ [Desarrollar consumidores personalizados con rendimiento compartido mediante KCL](custom-kcl-consumers.md)

Para obtener más información sobre cómo crear consumidores que puedan recibir registros de flujos de datos de Kinesis con un rendimiento específico, consulte [Desarrollo de consumidores de distribución ramificada mejorada con rendimiento dedicado](enhanced-consumers.md).

# Desarrollar consumidores personalizados con rendimiento compartido mediante KCL
<a name="custom-kcl-consumers"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. La versión 1.x de KCL estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Uno de los métodos para desarrollar una aplicación de consumo personalizada con un rendimiento compartido consiste en utilizar Kinesis Client Library (KCL). 

Elegir uno de los siguientes temas para la versión de KCL que utilice.

**Topics**
+ [Desarrollar consumidores de KCL 1.x](developing-consumers-with-kcl.md)
+ [Desarrollar consumidores de KCL 2.x](developing-consumers-with-kcl-v2.md)

# Desarrollar consumidores de KCL 1.x
<a name="developing-consumers-with-kcl"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. La versión 1.x de KCL estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Puede desarrollar una aplicación de consumo para Amazon Kinesis Data Streams mediante Kinesis Client Library (KCL). 

Para obtener más información acerca de, consulte [Acerca de KCL (versiones anteriores)](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview).

Elija uno de los siguientes temas según la opción que desee utilizar.

**Topics**
+ [Desarrollar un consumidor de Kinesis Client Library en Java](kinesis-record-processor-implementation-app-java.md)
+ [Desarrollar un consumidor de Kinesis Client Library en Node.js](kinesis-record-processor-implementation-app-nodejs.md)
+ [Desarrollar un consumidor de Kinesis Client Library en .NET](kinesis-record-processor-implementation-app-dotnet.md)
+ [Desarrollar un consumidor de Kinesis Client Library en Python](kinesis-record-processor-implementation-app-py.md)
+ [Desarrollar un consumidor de Kinesis Client Library en Ruby](kinesis-record-processor-implementation-app-ruby.md)

# Desarrollar un consumidor de Kinesis Client Library en Java
<a name="kinesis-record-processor-implementation-app-java"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. La versión 1.x de KCL estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de Java. Para ver la referencia sobre Javadoc, consulte el tema sobre [AWS Javadoc](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html) de Class. AmazonKinesisClient

Para descargar el KCL de Java GitHub, vaya a la [biblioteca de clientes de Kinesis (](https://github.com/awslabs/amazon-kinesis-client)Java). Para localizar KCL para Java en Apache Maven, vaya a la página de [resultados de la búsqueda de KCL](https://search.maven.org/#search|ga|1|amazon-kinesis-client). Para descargar un código de muestra para una aplicación de consumo de Java KCL desde GitHub, visite la página del proyecto de [ejemplo de KCL para Java](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis) en. GitHub 

La aplicación de muestra utiliza [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html). Puede cambiar la configuración de registro en el método estático `configure` definido en el archivo `AmazonKinesisApplicationSample.java`. *Para obtener más información sobre cómo utilizar el registro de Apache Commons con Log4j y aplicaciones AWS Java, consulte [Registrar con Log4j](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html) en la guía para desarrolladores.AWS SDK para Java *

Debe completar las siguientes tareas a la hora de implementar una aplicación de consumo de KCL en Java:

**Topics**
+ [Implemente los métodos del procesador IRecord](#kinesis-record-processor-implementation-interface-java)
+ [Implemente una fábrica de clases para la interfaz IRecord del procesador](#kinesis-record-processor-implementation-factory-java)
+ [Crear un proceso de trabajo](#kcl-java-worker)
+ [Modificar las propiedades de configuración](#kinesis-record-processor-initialization-java)
+ [Migrar a la versión 2 de la interfaz del procesador de registros](#kcl-java-v2-migration)

## Implemente los métodos del procesador IRecord
<a name="kinesis-record-processor-implementation-interface-java"></a>

KCL es compatible actualmente con dos versiones de la interfaz de `IRecordProcessor`: la interfaz original está disponible con la primera versión de KCL y la versión 2 está disponible a partir de la versión 1.5.0 de KCL. Ambas interfaces son totalmente compatibles. La elección dependerá de su situación específica. Consulte sus javadocs locales o el código fuente para ver todas las diferencias. En las siguientes secciones se describe la implementación mínima introductoria.

**Topics**
+ [Interfaz original (versión 1)](#kcl-java-interface-original)
+ [Interfaz actualizada (versión 2)](#kcl-java-interface-v2)

### Interfaz original (versión 1)
<a name="kcl-java-interface-original"></a>

La interfaz original `IRecordProcessor` (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) expone los siguientes métodos del procesador de registros que el consumidor debe implementar. En la muestra se presentan implementaciones que puede utilizar como punto de partida (consulte `AmazonKinesisApplicationSampleRecordProcessor.java`).

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**inicializar**  
KCL llama al método `initialize` cuando se crea una instancia del procesador de registros y pasa un ID de partición específico como parámetro. Este procesador de registros procesa solo este fragmento, y normalmente también se produce la situación contraria (este fragmento solo es procesado por este procesador de registros). Sin embargo, el consumidor debe contar con la posibilidad de que un registro de datos pueda ser procesado más de una vez. Kinesis Data Streams tiene una semántica de *al menos una vez*, lo que significa que cada registro de datos de una partición se procesa al menos una vez por parte de un proceso de trabajo del consumidor. Para obtener más información sobre los casos en los que un fragmento en particular puede ser procesado por más de un proceso de trabajo, consulte [Utilizar la nueva partición, el escalado y el procesamiento paralelo para cambiar el número de particiones](kinesis-record-processor-scaling.md).

```
public void initialize(String shardId)
```

**processRecords**  
KCL llama al método `processRecords` y pasa una lista de registros de datos desde la partición especificada por el método `initialize(shardId)`. El procesador de registros procesa los datos en estos registros según la semántica del consumidor. Por ejemplo, el proceso de trabajo podría realizar una transformación de los datos y, a continuación, almacenar el resultado en un bucket de Amazon Simple Storage Service (Amazon S3).

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

Además de los datos en sí, el registro contiene un número secuencial y una clave de partición. El proceso de trabajo puede utilizar estos valores al procesar los datos. Por ejemplo, el proceso de trabajo podría elegir el bucket de S3 en el que almacenar los datos en función del valor de la clave de partición. La clase `Record` expone los siguientes métodos que proporcionan acceso a los datos, el número secuencial y la clave de partición del registro. 

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

En el ejemplo, el método privado `processRecordsWithRetries` tiene un código que muestra cómo un proceso de trabajo puede obtener acceso a los datos, el número secuencial y la clave de partición del registro.

Kinesis Data Streams requiere que el procesador de registros realice un seguimiento de los registros que ya se han procesado en una partición. KCL se ocupa de este seguimiento pasando un generador de puntos de verificación (`IRecordProcessorCheckpointer`) a `processRecords`. El procesador de registros llama al método `checkpoint` en esta interfaz para informar a KCL de su avance en el procesamiento de los registros de la partición. Si se produce un error en el proceso de trabajo, KCL utiliza esta información para reiniciar el procesamiento de la partición en el último registro procesado conocido.

En el caso de una operación de división o fusión, KCL no comenzará a procesar las particiones nuevas hasta que los procesadores de las particiones originales hayan llamado a `checkpoint` para indicar que se ha completado el procesamiento en las particiones originales.

Si no se pasa un parámetro, KCL supone que la llamada a `checkpoint` significa que todos los registros se han procesado, hasta el último registro pasado al procesador de registros. Por tanto, el procesador de registros solo debe llamar a `checkpoint` después de haber procesado todos los registros de la lista que se le ha pasado. Los procesadores de registros no necesitan llamar a `checkpoint` en cada llamada a `processRecords`. Un procesador podría, por ejemplo, llamar a `checkpoint` cada tercera vez que llame a `processRecords`. Puede especificar opcionalmente el número secuencial exacto de un registro como un parámetro para `checkpoint`. En este caso, KCL supone que todos los registros se han procesado exclusivamente hasta ese registro.

En el ejemplo, el método privado `checkpoint` muestra cómo llamar a `IRecordProcessorCheckpointer.checkpoint` mediante la administración de excepciones y la lógica de reintentos apropiadas.

KCL depende de `processRecords` para administrar cualquier excepción que surja del procesamiento de los registros de datos. Si `processRecords` genera una excepción, KCL omite los registros de datos que se pasaron antes de la excepción. Es decir, estos registros no se reenviarán al procesador de registros que generó la excepción ni a ningún otro procesador de registros en el consumidor.

**shutdown**  
KCL llama al método `shutdown` cuando finaliza el procesamiento (el motivo del cierre es `TERMINATE`) o cuando el proceso de trabajo ya no responde (el motivo del cierre es `ZOMBIE`).

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

El procesamiento finaliza cuando el procesador de registros no recibe más registros desde el fragmento, ya sea porque el fragmento se ha dividido o fusionado o porque la secuencia se ha eliminado.

KCL también pasa una interfaz `IRecordProcessorCheckpointer` a `shutdown`. Si el motivo del shutdown es `TERMINATE`, el procesador de registros debería terminar de procesar los registros de datos y llamar al método `checkpoint` en esta interfaz.

### Interfaz actualizada (versión 2)
<a name="kcl-java-interface-v2"></a>

La interfaz actualizada `IRecordProcessor` (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) expone los siguientes métodos del procesador de registros que el consumidor debe implementar: 

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

Se puede obtener acceso a todos los argumentos de la versión original de la interfaz mediante métodos "get" en los objetos del contenedor. Por ejemplo, para recuperar la lista de registros en `processRecords()`, puede utilizar `processRecordsInput.getRecords()`.

A partir de la versión 2 de esta interfaz (KCL 1.5.0 y posteriores), están disponibles las siguientes entradas nuevas, además de las entradas proporcionadas por la interfaz original:

starting sequence number  
En el objeto `InitializationInput` que se pasa a la operación `initialize()` el número secuencial inicial a partir del cual se facilitan los registros a la instancia del procesador de registros. Este es el último número secuencial objeto de un punto de comprobación por parte de la instancia del procesador de registros que procesara anteriormente el mismo fragmento. Estos datos se ofrecen por si su aplicación necesitara esta información. 

pending checkpoint sequence number  
En el objeto `InitializationInput` que se pasa a la operación `initialize()`, el número secuencial pendiente de punto de comprobación (si hay alguno) que no se ha podido confirmar antes de que se detuviera la instancia anterior del procesador de registros.

## Implemente una fábrica de clases para la interfaz IRecord del procesador
<a name="kinesis-record-processor-implementation-factory-java"></a>

También necesitará implementar un generador para la clase que implementa los métodos del procesador de registros. Cuando el consumidor crea instancias del proceso de trabajo, pasa una referencia a este generador.

La muestra implementa el generador de clases en el archivo `AmazonKinesisApplicationSampleRecordProcessorFactory.java` mediante la interfaz del procesador de registros original. Si desea que el generador de clases cree procesadores de registros de la versión 2, utilice el nombre de paquete `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`.

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## Crear un proceso de trabajo
<a name="kcl-java-worker"></a>

Tal y como se ha explicado en [Implemente los métodos del procesador IRecord](#kinesis-record-processor-implementation-interface-java), hay dos versiones de la interfaz de procesador de registros de KCL para elegir, lo que afecta a la creación de un proceso de trabajo. La interfaz de procesador de registros original utiliza la siguiente estructura de código para crear un proceso de trabajo:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

Con la versión 2 del procesador de registros, puede utilizar la `Worker.Builder` para crear un proceso de trabajo sin preocuparse por qué constructor utilizar ni por el orden de los argumentos. La interfaz de procesador de registros actualizada utiliza la siguiente estructura de código para crear un proceso de trabajo:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## Modificar las propiedades de configuración
<a name="kinesis-record-processor-initialization-java"></a>

En la muestra se proporcionan valores predeterminados para las propiedades de configuración. Este datos de configuración del proceso de trabajo se consolidan posteriormente en un objeto `KinesisClientLibConfiguration`. Este objeto y una referencia al generador de clases de `IRecordProcessor` se pasan en la llamada que crea la instancia del proceso de trabajo. Puede sobrescribir cualquiera de estas propiedades con sus propios valores a través de un archivo de propiedades de Java (consulte `AmazonKinesisApplicationSample.java`).

### Nombre de la aplicación
<a name="configuration-property-application-name"></a>

KCL requiere un nombre de aplicación que sea único entre las aplicaciones y en las tablas de Amazon DynamoDB de la misma región. La biblioteca utiliza el valor del nombre de la aplicación de las siguientes formas:
+ Se entiende que los procesos de trabajo asociados a este nombre de aplicación operan de forma conjunta en la misma secuencia. Estos procesos de trabajo pueden distribuirse en varias instancias. Si ejecuta otra instancia del mismo código de aplicación, pero con otro nombre de aplicación, KCL considera que la segunda instancia es una aplicación completamente independiente de la otra que opera en el mismo flujo.
+ KCL crea una tabla de DynamoDB con el nombre de la aplicación y utiliza la tabla para actualizar la información de estado (como los puntos de verificación y el mapeo procesos de trabajo-particiones) para la aplicación. Cada aplicación tiene su propia tabla de DynamoDB. Para obtener más información, consulte [Usar una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configuración de credenciales
<a name="kinesis-record-processor-cred-java"></a>

Debe poner sus AWS credenciales a disposición de uno de los proveedores de credenciales de la cadena de proveedores de credenciales predeterminada. Por ejemplo, si ejecuta su consumidor en una instancia de EC2, se recomienda que lance la instancia con un rol de IAM. Las credenciales de AWS que reflejan los permisos asociados a este rol de IAM se ponen a disposición de las aplicaciones de la instancia a través de los metadatos de esta. Esta es la forma más segura de administrar las credenciales para un consumidor que se ejecute en una instancia EC2.

En primer lugar, la aplicación de muestra intenta recuperar las credenciales de IAM de los metadatos de la instancia: 

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

Si la aplicación de muestra no puede obtener credenciales de los metadatos de la instancia, intenta recuperar las credenciales desde un archivo de propiedades:

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

Para más información sobre los metadatos de instancia, consulte [Metadatos de instancia](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html) en la *Guía del usuario de Amazon EC2*.

### Uso de ID de proceso de trabajo para varias instancias
<a name="kinesis-record-processor-workerid-java"></a>

El código de inicialización de muestra crea un ID para el proceso de trabajo, `workerId`, con el nombre del equipo local y un identificador global único anexo, tal y como se muestra en el siguiente fragmento de código. Este enfoque es compatible con un escenario con varias instancias de la aplicación consumidora ejecutándose en un único equipo.

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## Migrar a la versión 2 de la interfaz del procesador de registros
<a name="kcl-java-v2-migration"></a>

Si desea migrar código que utilice la interfaz original, además de los pasos descritos anteriormente, tendrá que seguir estos pasos:

1. Cambie la clase de su procesador de registros para importar la versión 2 de la interfaz del procesador de registros:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. Cambiar las referencias a las entradas para usar métodos `get` en los objetos del contenedor. Por ejemplo, en la operación `shutdown()`, cambie "`checkpointer`" por "`shutdownInput.getCheckpointer()`".

1. Cambie la clase del generador de procesadores de registros para importar la interfaz del generador de procesadores de registros de la versión 2:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. Cambie la construcción del proceso de trabajo para usar `Worker.Builder`. Por ejemplo:

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# Desarrollar un consumidor de Kinesis Client Library en Node.js
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de Node.js

La KCL es una biblioteca de Java; el soporte para otros lenguajes además de Java se proporciona mediante una interfaz multilingüe llamada. *MultiLangDaemon* Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un lenguaje de KCL distinto de Java. Por lo tanto, si instala el KCL para Node.js y escribe su aplicación para consumidores completamente en Node.js, seguirá necesitando instalar Java en su sistema debido a la. MultiLangDaemon Además, MultiLangDaemon tiene algunos ajustes predeterminados que puede que tengas que personalizar para tu caso de uso, por ejemplo, la AWS región a la que se conecta. Para obtener más información sobre MultiLangDaemon esto GitHub, visita la página del [ MultiLangDaemon proyecto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para descargar el archivo KCL de Node.js GitHub, vaya a la [biblioteca de clientes de Kinesis (Node.js)](https://github.com/awslabs/amazon-kinesis-client-nodejs).

**Descargas de código de muestra**

Hay dos códigos de muestra disponibles para la KCL en Node.js:
+ [basic-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  Se utiliza en las siguientes secciones para ilustrar los aspectos fundamentales de la creación de una aplicación de consumo de KCL en Node.js.
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   Es ligeramente más avanzado y utiliza una situación real. Para cuando se haya familiarizado con el código de muestra básico. Esta muestra no se trata aquí, pero tiene un archivo README con más información.

Debe completar las siguientes tareas a la hora de implementar una aplicación de consumo de KCL en Node.js:

**Topics**
+ [Implementar el procesador de registros](#kinesis-record-processor-implementation-interface-nodejs)
+ [Modificar las propiedades de configuración](#kinesis-record-processor-initialization-nodejs)

## Implementar el procesador de registros
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

El consumidor más sencillo posible que utilice KCL para Node.js debe implementar una función `recordProcessor`, que a su vez contenga las funciones `initialize`, `processRecords` y `shutdown`. En la muestra se presenta una implementación que puede utilizar como punto de partida (consulte `sample_kcl_app.js`).

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**inicializar**  
KCL llama a la función `initialize` cuando se inicia el procesador de registros. Este procesador de registros procesa solo la ID de fragmento que se haya pasado como `initializeInput.shardId`, y normalmente también se produce la situación contraria (este fragmento solo es procesado por este procesador de registros). Sin embargo, el consumidor debe contar con la posibilidad de que un registro de datos pueda ser procesado más de una vez. Esto se debe a que Kinesis Data Streams tiene una semántica de *al menos una vez*, lo que significa que cada registro de datos de una partición se procesa al menos una vez por parte de un proceso de trabajo del consumidor. Para obtener más información sobre los casos en los que un fragmento en particular puede ser procesado por varios procesos de trabajo, consulte [Utilizar la nueva partición, el escalado y el procesamiento paralelo para cambiar el número de particiones](kinesis-record-processor-scaling.md).

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 KCL llama a esta función con una entrada que contiene una lista de registros de datos de la partición especificados para la función `initialize`. El procesador de registros que implemente procesa los datos en estos registros según la semántica del consumidor. Por ejemplo, el proceso de trabajo podría realizar una transformación de los datos y, a continuación, almacenar el resultado en un bucket de Amazon Simple Storage Service (Amazon S3). 

```
processRecords: function(processRecordsInput, completeCallback)
```

Además de los datos en sí, el registro también contiene un número secuencial y una clave de partición, que el proceso de trabajo puede utilizar al procesar los datos. Por ejemplo, el proceso de trabajo podría elegir el bucket de S3 en el que almacenar los datos en función del valor de la clave de partición. El diccionario `record` expone los siguientes pares clave-valor para obtener acceso a los datos, el número secuencial y la clave de partición del registro:

```
record.data
record.sequenceNumber
record.partitionKey
```

Tenga en cuenta que los datos se codifican en Base64.

En el ejemplo básico, la función `processRecords` tiene un código que muestra cómo un proceso de trabajo puede obtener acceso a los datos, el número secuencial y la clave de partición del registro.

Kinesis Data Streams requiere que el procesador de registros realice un seguimiento de los registros que ya se han procesado en una partición. KCL se ocupa de este seguimiento con un objeto `checkpointer` que pasa como `processRecordsInput.checkpointer`. Su procesador de registros llama a la función `checkpointer.checkpoint` para informar a KCL de su avance en el procesamiento de los registros de la partición. En el caso de que se produzca un error en el proceso de trabajo, KCL utiliza esta información al reiniciar el procesamiento de la partición para continuar desde el último registro procesado conocido.

En el caso de una operación de división o fusión, KCL no comenzará a procesar las particiones nuevas hasta que los procesadores de las particiones originales hayan llamado a `checkpoint` para indicar que se ha completado el procesamiento en las particiones originales.

Si no pasa el número secuencial a la función `checkpoint`, KCL supone que la llamada a `checkpoint` significa que todos los registros se han procesado, hasta el último registro pasado al procesador de registros. Por tanto, el procesador de registros debería llamar a `checkpoint` **solo** después de haber procesado todos los registros de la lista que se le ha pasado. Los procesadores de registros no necesitan llamar a `checkpoint` en cada llamada a `processRecords`. Un procesador podría, por ejemplo, realizar una llamada `checkpoint` una de cada tres veces o algún evento externo a su procesador de registros, como un verification/validation servicio personalizado que haya implementado. 

Puede especificar opcionalmente el número secuencial exacto de un registro como un parámetro para `checkpoint`. En este caso, KCL supone que todos los registros se han procesado exclusivamente hasta ese registro.

La aplicación de muestra básica muestra la llamada más sencilla posible a la función `checkpointer.checkpoint`. Puede agregar otra lógica de creación de puntos de comprobación que necesite para su consumidor en este punto en la función.

**shutdown**  
KCL llama a la función `shutdown` cuando finaliza el procesamiento (`shutdownInput.reason` es `TERMINATE`) o cuando el proceso de trabajo ya no responde (`shutdownInput.reason` es `ZOMBIE`).

```
shutdown: function(shutdownInput, completeCallback)
```

El procesamiento finaliza cuando el procesador de registros no recibe más registros desde el fragmento, ya sea porque el fragmento se ha dividido o fusionado o porque la secuencia se ha eliminado.

KCL también pasa un objeto `shutdownInput.checkpointer` a `shutdown`. Si el motivo del cierre es `TERMINATE`, debe asegurarse de que el procesador de registros haya terminado de procesar los registros de datos y después llame a la función `checkpoint` en esta interfaz.

## Modificar las propiedades de configuración
<a name="kinesis-record-processor-initialization-nodejs"></a>

En la muestra se proporcionan valores predeterminados para las propiedades de configuración. Puede sobrescribir cualquiera de estas propiedades con sus propios valores (consulte `sample.properties` en el ejemplo básico).

### Nombre de la aplicación
<a name="kinesis-record-processor-application-name-nodejs"></a>

KCL requiere una aplicación que sea única entre las aplicaciones y en las tablas de Amazon DynamoDB de la misma región. La biblioteca utiliza el valor del nombre de la aplicación de las siguientes formas:
+ Se entiende que los procesos de trabajo asociados a este nombre de aplicación operan de forma conjunta en la misma secuencia. Estos procesos de trabajo pueden distribuirse en varias instancias. Si ejecuta otra instancia del mismo código de aplicación, pero con otro nombre de aplicación, KCL considera que la segunda instancia es una aplicación completamente independiente de la otra que opera en el mismo flujo.
+ KCL crea una tabla de DynamoDB con el nombre de la aplicación y utiliza la tabla para actualizar la información de estado (como los puntos de verificación y el mapeo procesos de trabajo-particiones) para la aplicación. Cada aplicación tiene su propia tabla de DynamoDB. Para obtener más información, consulte [Usar una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configuración de credenciales
<a name="kinesis-record-processor-credentials-nodejs"></a>

Debe poner sus AWS credenciales a disposición de uno de los proveedores de credenciales de la cadena de proveedores de credenciales predeterminada. Puede usar la propiedad `AWSCredentialsProvider` para configurar un proveedor de credenciales. El archivo `sample.properties` debe poner sus credenciales a disposición de uno de los proveedores de credenciales de la [cadena de proveedores de credenciales predeterminada](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Si ejecuta su consumidor en una instancia de Amazon EC2, le recomendamos que configure la instancia con un rol de IAM. AWS Las credenciales que reflejan los permisos asociados a esta función de IAM se ponen a disposición de las aplicaciones de la instancia a través de los metadatos de la instancia. Esta es la forma más segura de administrar las credenciales para una aplicación consumidora que se ejecute en una instancia EC2.

En el siguiente ejemplo, se configura KCL para procesar un flujo de datos de Kinesis denominado `kclnodejssample` utilizando el procesador de registros facilitado en `sample_kcl_app.js`:

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# Desarrollar un consumidor de Kinesis Client Library en .NET
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de .NET.

La KCL es una biblioteca de Java; el soporte para otros lenguajes además de Java se proporciona mediante una interfaz multilingüe llamada. *MultiLangDaemon* Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un lenguaje de KCL distinto de Java. Por lo tanto, si instala la KCL para.NET y escribe la aplicación para el usuario en su totalidad en .NET, seguirá necesitando instalar Java en el sistema debido a la. MultiLangDaemon Además, MultiLangDaemon tiene algunos ajustes predeterminados que puede que tengas que personalizar para tu caso de uso, por ejemplo, la AWS región a la que se conecta. Para obtener más información sobre MultiLangDaemon esto GitHub, visita la página del [ MultiLangDaemon proyecto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para descargar el KCL de.NET GitHub, vaya a la [biblioteca de clientes de Kinesis (](https://github.com/awslabs/amazon-kinesis-client-net).NET). Para descargar un código de muestra para una aplicación de consumo de KCL de.NET, visite la página de [ejemplos de proyectos de consumo de KCL para .NET](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer) en. GitHub

Debe completar las siguientes tareas a la hora de implementar una aplicación de consumo de KCL en .NET:

**Topics**
+ [Implemente los métodos de la clase IRecord Processor](#kinesis-record-processor-implementation-interface-dotnet)
+ [Modificar las propiedades de configuración](#kinesis-record-processor-initialization-dotnet)

## Implemente los métodos de la clase IRecord Processor
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

El consumidor debe implementar los siguientes métodos para `IRecordProcessor`. En el consumidor de muestra se presentan implementaciones que puede utilizar como punto de partida (consulte la clase `SampleRecordProcessor` en `SampleConsumer/AmazonKinesisSampleConsumer.cs`).

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**Initialize**  
KCL llama a este método cuando se crea una instancia del procesador de registros y pasa un ID de partición específico en el parámetro `input` (`input.ShardId`). Este procesador de registros procesa solo este fragmento, y normalmente también se produce la situación contraria (este fragmento solo es procesado por este procesador de registros). Sin embargo, el consumidor debe contar con la posibilidad de que un registro de datos pueda ser procesado más de una vez. Esto se debe a que Kinesis Data Streams tiene una semántica de *al menos una vez*, lo que significa que cada registro de datos de una partición se procesa al menos una vez por parte de un proceso de trabajo del consumidor. Para obtener más información sobre los casos en los que un fragmento en particular puede ser procesado por varios procesos de trabajo, consulte [Utilizar la nueva partición, el escalado y el procesamiento paralelo para cambiar el número de particiones](kinesis-record-processor-scaling.md).

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
KCL llama a este método y pasa una lista de registros de datos en el parámetro `input` (`input.Records`) desde la partición especificada por el método `Initialize`. El procesador de registros que implemente procesa los datos en estos registros según la semántica del consumidor. Por ejemplo, el proceso de trabajo podría realizar una transformación de los datos y, a continuación, almacenar el resultado en un bucket de Amazon Simple Storage Service (Amazon S3).

```
public void ProcessRecords(ProcessRecordsInput input)
```

Además de los datos en sí, el registro contiene un número secuencial y una clave de partición. El proceso de trabajo puede utilizar estos valores al procesar los datos. Por ejemplo, el proceso de trabajo podría elegir el bucket de S3 en el que almacenar los datos en función del valor de la clave de partición. La clase `Record` expone lo siguiente para obtener acceso a los datos, el número secuencial y la clave de partición del registro:

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

En el ejemplo, el método `ProcessRecordsWithRetries` tiene un código que muestra cómo un proceso de trabajo puede obtener acceso a los datos, el número secuencial y la clave de partición del registro.

Kinesis Data Streams requiere que el procesador de registros realice un seguimiento de los registros que ya se han procesado en una partición. KCL se ocupa de este seguimiento, pasando un objeto `Checkpointer` a `ProcessRecords` (`input.Checkpointer`). El procesador de registros llama al método `Checkpointer.Checkpoint` para informar a KCL de su avance en el procesamiento de los registros de la partición. Si se produce un error en el proceso de trabajo, KCL utiliza esta información para reiniciar el procesamiento de la partición en el último registro procesado conocido.

En el caso de una operación de división o fusión, KCL no comenzará a procesar las particiones nuevas hasta que los procesadores de las particiones originales hayan llamado a `Checkpointer.Checkpoint` para indicar que se ha completado el procesamiento en las particiones originales.

Si no se pasa un parámetro, KCL supone que la llamada a `Checkpointer.Checkpoint` significa que todos los registros se han procesado, hasta el último registro pasado al procesador de registros. Por tanto, el procesador de registros solo debe llamar a `Checkpointer.Checkpoint` después de haber procesado todos los registros de la lista que se le ha pasado. Los procesadores de registros no necesitan llamar a `Checkpointer.Checkpoint` en cada llamada a `ProcessRecords`. Un procesador podría, por ejemplo, llamar a `Checkpointer.Checkpoint` en cada tercera o cuarta llamada. Puede especificar opcionalmente el número secuencial exacto de un registro como un parámetro para `Checkpointer.Checkpoint`. En este caso, KCL supone que los registros se han procesado exclusivamente hasta ese registro.

En el ejemplo, el método privado `Checkpoint(Checkpointer checkpointer)` muestra cómo llamar al método `Checkpointer.Checkpoint` mediante la administración de excepciones y la lógica de reintentos apropiadas.

KCL para .NET administra las excepciones de forma diferente a las bibliotecas de KCL para el resto de lenguajes, ya que no administra las excepciones que surgen del procesamiento de los registros de datos. Las excepciones no detectadas procedentes del código del usuario harán que el programa se bloquee.

**Apagado**  
KCL llama al método `Shutdown` cuando finaliza el procesamiento (el motivo del cierre es `TERMINATE`) o cuando el proceso de trabajo ya no responde (el valor de `input.Reason` del cierre es `ZOMBIE`).

```
public void Shutdown(ShutdownInput input)
```

El procesamiento finaliza cuando el procesador de registros no recibe más registros desde el fragmento, ya sea porque el fragmento se ha dividido o fusionado o porque la secuencia se ha eliminado.

KCL también pasa un objeto `Checkpointer` a `shutdown`. Si el motivo del shutdown es `TERMINATE`, el procesador de registros debería terminar de procesar los registros de datos y llamar al método `checkpoint` en esta interfaz.

## Modificar las propiedades de configuración
<a name="kinesis-record-processor-initialization-dotnet"></a>

En el consumidor muestra se proporcionan valores predeterminados para las propiedades de configuración. Puede sobrescribir cualquiera de estas propiedades con sus propios valores (consulte `SampleConsumer/kcl.properties`).

### Nombre de la aplicación
<a name="modify-kinesis-record-processor-application-name"></a>

KCL requiere una aplicación que sea única entre las aplicaciones y en las tablas de Amazon DynamoDB de la misma región. La biblioteca utiliza el valor del nombre de la aplicación de las siguientes formas:
+ Se entiende que los procesos de trabajo asociados a este nombre de aplicación operan de forma conjunta en la misma secuencia. Estos procesos de trabajo pueden distribuirse en varias instancias. Si ejecuta otra instancia del mismo código de aplicación, pero con otro nombre de aplicación, KCL considera que la segunda instancia es una aplicación completamente independiente de la otra que opera en el mismo flujo.
+ KCL crea una tabla de DynamoDB con el nombre de la aplicación y utiliza la tabla para actualizar la información de estado (como los puntos de verificación y el mapeo procesos de trabajo-particiones) para la aplicación. Cada aplicación tiene su propia tabla de DynamoDB. Para obtener más información, consulte [Usar una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configuración de credenciales
<a name="kinesis-record-processor-creds-dotnet"></a>

Debe poner sus AWS credenciales a disposición de uno de los proveedores de credenciales de la cadena de proveedores de credenciales predeterminada. Puede usar la propiedad `AWSCredentialsProvider` para configurar un proveedor de credenciales. Las [propiedades de muestra](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) deben poner sus credenciales a disposición de uno de los proveedores de credenciales de la [cadena de proveedores de credenciales predeterminada](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Si ejecuta su aplicación de consumo en una instancia de EC2, se recomienda que configure la instancia con un rol de IAM. Las credenciales de AWS que reflejan los permisos asociados a este rol de IAM se ponen a disposición de las aplicaciones de la instancia a través de los metadatos de esta. Esta es la forma más segura de administrar las credenciales para un consumidor que se ejecute en una instancia EC2.

El archivo de propiedades de ejemplo configura KCL para procesar un flujo de datos de Kinesis llamado “words” utilizando el procesador de registros facilitado en `AmazonKinesisSampleConsumer.cs`. 

# Desarrollar un consumidor de Kinesis Client Library en Python
<a name="kinesis-record-processor-implementation-app-py"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de Python.

La KCL es una biblioteca de Java; el soporte para otros lenguajes además de Java se proporciona mediante una interfaz multilingüe llamada. *MultiLangDaemon* Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un lenguaje de KCL distinto de Java. Por lo tanto, si instala el KCL para Python y escribe su aplicación de consumo completamente en Python, seguirá necesitando instalar Java en su sistema debido a la MultiLangDaemon. Además, MultiLangDaemon tiene algunos ajustes predeterminados que puede que tengas que personalizar para tu caso de uso, por ejemplo, la AWS región a la que se conecta. Para obtener más información sobre MultiLangDaemon esto GitHub, visita la página del [ MultiLangDaemon proyecto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para descargar la KCL de Python GitHub, vaya a la biblioteca de [clientes de Kinesis (Python)](https://github.com/awslabs/amazon-kinesis-client-python). Para descargar un código de muestra para una aplicación de consumo de KCL para Python, vaya a la página del [proyecto de ejemplo de KCL para Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) en. GitHub

Debe completar las siguientes tareas a la hora de implementar una aplicación de consumo de KCL en Python:

**Topics**
+ [Implementa los métodos de la clase RecordProcessor](#kinesis-record-processor-implementation-interface-py)
+ [Modificar las propiedades de configuración](#kinesis-record-processor-initialization-py)

## Implementa los métodos de la clase RecordProcessor
<a name="kinesis-record-processor-implementation-interface-py"></a>

La clase `RecordProcess` debe ampliar la `RecordProcessorBase` para implementar los siguientes métodos. En la muestra se presentan implementaciones que puede utilizar como punto de partida (consulte `sample_kclpy_app.py`).

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**inicializar**  
KCL llama al método `initialize` cuando se crea una instancia del procesador de registros y pasa un ID de partición específico como parámetro. Este procesador de registros procesa solo este fragmento, y normalmente también se produce la situación contraria (este fragmento solo es procesado por este procesador de registros). Sin embargo, el consumidor debe contar con la posibilidad de que un registro de datos pueda ser procesado más de una vez. Esto se debe a que Kinesis Data Streams tiene una semántica de *al menos una vez*, lo que significa que cada registro de datos de una partición se procesa al menos una vez por parte de un proceso de trabajo del consumidor. Para obtener más información sobre los casos en los que un fragmento en particular puede ser procesado por más de un proceso de trabajo, consulte [Utilizar la nueva partición, el escalado y el procesamiento paralelo para cambiar el número de particiones](kinesis-record-processor-scaling.md).

```
def initialize(self, shard_id)
```

**process\$1records**  
 KCL llama a este método y pasa una lista de registros de datos de la partición especificada por el método `initialize`. El procesador de registros que implemente procesa los datos en estos registros según la semántica del consumidor. Por ejemplo, el proceso de trabajo podría realizar una transformación de los datos y, a continuación, almacenar el resultado en un bucket de Amazon Simple Storage Service (Amazon S3).

```
def process_records(self, records, checkpointer) 
```

Además de los datos en sí, el registro contiene un número secuencial y una clave de partición. El proceso de trabajo puede utilizar estos valores al procesar los datos. Por ejemplo, el proceso de trabajo podría elegir el bucket de S3 en el que almacenar los datos en función del valor de la clave de partición. El diccionario `record` expone los siguientes pares clave-valor para obtener acceso a los datos, el número secuencial y la clave de partición del registro:

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

Tenga en cuenta que los datos se codifican en Base64.

En el ejemplo, el método `process_records` tiene un código que muestra cómo un proceso de trabajo puede obtener acceso a los datos, el número secuencial y la clave de partición del registro.

Kinesis Data Streams requiere que el procesador de registros realice un seguimiento de los registros que ya se han procesado en una partición. KCL se ocupa de este seguimiento, pasando un objeto `Checkpointer` a `process_records`. El procesador de registros llama al método `checkpoint` en este objeto para informar a KCL de su avance en el procesamiento de los registros de la partición. Si se produce un error en el proceso de trabajo, KCL utiliza esta información para reiniciar el procesamiento de la partición en el último registro procesado conocido.

En el caso de una operación de división o fusión, KCL no comenzará a procesar las particiones nuevas hasta que los procesadores de las particiones originales hayan llamado a `checkpoint` para indicar que se ha completado el procesamiento en las particiones originales.

Si no se pasa un parámetro, KCL supone que la llamada a `checkpoint` significa que todos los registros se han procesado, hasta el último registro pasado al procesador de registros. Por tanto, el procesador de registros solo debe llamar a `checkpoint` después de haber procesado todos los registros de la lista que se le ha pasado. Los procesadores de registros no necesitan llamar a `checkpoint` en cada llamada a `process_records`. Un procesador podría, por ejemplo, llamar a `checkpoint` cada tercera vez que llame. Puede especificar opcionalmente el número secuencial exacto de un registro como un parámetro para `checkpoint`. En este caso, KCL supone que todos los registros se han procesado exclusivamente hasta ese registro.

En el ejemplo, el método privado `checkpoint` muestra cómo llamar al método `Checkpointer.checkpoint` mediante la administración de excepciones y la lógica de reintentos apropiadas.

KCL depende de `process_records` para administrar cualquier excepción que surja del procesamiento de los registros de datos. Si `process_records` genera una excepción, KCL omite los registros de datos que se pasaron a `process_records` antes de la excepción. Es decir, estos registros no se reenviarán al procesador de registros que generó la excepción ni a ningún otro procesador de registros en el consumidor.

**shutdown**  
 KCL llama al método `shutdown` cuando finaliza el procesamiento (el motivo del cierre es `TERMINATE`) o cuando el proceso de trabajo ya no responde (el `reason` del cierre es `ZOMBIE`).

```
def shutdown(self, checkpointer, reason)
```

El procesamiento finaliza cuando el procesador de registros no recibe más registros desde el fragmento, ya sea porque el fragmento se ha dividido o fusionado o porque la secuencia se ha eliminado.

 KCL también pasa un objeto `Checkpointer` a `shutdown`. Si el `reason` del shutdown es `TERMINATE`, el procesador de registros debería terminar de procesar los registros de datos y llamar al método `checkpoint` en esta interfaz.

## Modificar las propiedades de configuración
<a name="kinesis-record-processor-initialization-py"></a>

En la muestra se proporcionan valores predeterminados para las propiedades de configuración. Puede sobrescribir cualquiera de estas propiedades con sus propios valores (consulte `sample.properties`).

### Nombre de la aplicación
<a name="kinesis-record-processor-application-name-py"></a>

KCL requiere un nombre de aplicación que sea único entre las aplicaciones y en las tablas de Amazon DynamoDB de la misma región. La biblioteca utiliza el valor del nombre de la aplicación de las siguientes formas:
+ Se supone que los procesos de trabajo que están asociados a este nombre de aplicación operan de forma conjunta en la misma secuencia. Estos procesos de trabajo pueden distribuirse en varias instancias. Si ejecuta otra instancia del mismo código de aplicación, pero con otro nombre de aplicación, KCL considera que la segunda instancia es una aplicación completamente independiente de la otra que opera en el mismo flujo.
+ KCL crea una tabla de DynamoDB con el nombre de la aplicación y utiliza la tabla para actualizar la información de estado (como los puntos de verificación y el mapeo procesos de trabajo-particiones) para la aplicación. Cada aplicación tiene su propia tabla de DynamoDB. Para obtener más información, consulte [Usar una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configuración de credenciales
<a name="kinesis-record-processor-creds-py"></a>

Debe poner sus AWS credenciales a disposición de uno de los proveedores de credenciales de la cadena de proveedores de credenciales predeterminada. Puede usar la propiedad `AWSCredentialsProvider` para configurar un proveedor de credenciales. Las [propiedades de muestra](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) deben poner sus credenciales a disposición de uno de los proveedores de credenciales de la [cadena de proveedores de credenciales predeterminada](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Si ejecuta su aplicación de consumo en una instancia de Amazon EC2, se recomienda que configure la instancia con un rol de IAM. Las credenciales de AWS que reflejan los permisos asociados a este rol de IAM se ponen a disposición de las aplicaciones de la instancia a través de los metadatos de esta. Esta es la forma más segura de administrar las credenciales para una aplicación consumidora que se ejecute en una instancia EC2.

El archivo de propiedades de ejemplo configura KCL para procesar un flujo de datos de Kinesis llamado “words” utilizando el procesador de registros facilitado en `sample_kclpy_app.py`. 

# Desarrollar un consumidor de Kinesis Client Library en Ruby
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de Ruby.

La KCL es una biblioteca de Java; el soporte para otros lenguajes además de Java se proporciona mediante una interfaz multilingüe llamada. *MultiLangDaemon* Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un lenguaje de KCL distinto de Java. Por lo tanto, si instala el KCL para Ruby y escribe su aplicación para consumidores completamente en Ruby, seguirá necesitando instalar Java en su sistema debido a la. MultiLangDaemon Además, MultiLangDaemon tiene algunos ajustes predeterminados que puede que tengas que personalizar para tu caso de uso, por ejemplo, la AWS región a la que se conecta. Para obtener más información sobre MultiLangDaemon esto GitHub, visita la página del [ MultiLangDaemon proyecto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para descargar el KCL de Ruby GitHub, vaya a la [biblioteca de clientes de Kinesis (](https://github.com/awslabs/amazon-kinesis-client-ruby)Ruby). Para descargar un código de muestra para una aplicación de usuario de Ruby KCL, visite la página del proyecto de [ejemplo de KCL para Ruby](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples) en. GitHub

Para más información acerca de la biblioteca de soporte de Ruby en KCL, consulte la [documentación de Ruby Gems en KCL](http://www.rubydoc.info/gems/aws-kclrb).

# Desarrollar consumidores de KCL 2.x
<a name="developing-consumers-with-kcl-v2"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

En este tema se muestra cómo utilizar la versión 2.0 de Kinesis Client Library (KCL). 

Para más información acerca de KCL, consulte la información general que se proporciona en [Desarrollo de consumidores mediante Kinesis Client Library 1.x](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html).

Elija uno de los siguientes temas según la opción que desee utilizar.

**Topics**
+ [Desarrollar un consumidor de Kinesis Client Library en Java](kcl2-standard-consumer-java-example.md)
+ [Desarrollar un consumidor de Kinesis Client Library en Python](kcl2-standard-consumer-python-example.md)
+ [Desarrollar consumidores de distribución mejorados con KCL 2.x](building-enhanced-consumers-kcl-retired.md)

# Desarrollar un consumidor de Kinesis Client Library en Java
<a name="kcl2-standard-consumer-java-example"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. La versión 1.x de KCL estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

En el siguiente código, se muestra un ejemplo de implementación en Java de `ProcessorFactory` y `RecordProcessor`. Si desea aprovechar la característica de distribución ramificada mejorada, consulte [Uso de consumidores con la distribución ramificada mejorada](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html).

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Desarrollar un consumidor de Kinesis Client Library en Python
<a name="kcl2-standard-consumer-python-example"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. La versión 1.x de KCL estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de Python.

La KCL es una biblioteca de Java; el soporte para otros lenguajes además de Java se proporciona mediante una interfaz multilingüe llamada. *MultiLangDaemon* Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un lenguaje de KCL distinto de Java. Por lo tanto, si instala el KCL para Python y escribe su aplicación de consumo completamente en Python, seguirá necesitando instalar Java en su sistema debido a la MultiLangDaemon. Además, MultiLangDaemon tiene algunos ajustes predeterminados que puede que tengas que personalizar para tu caso de uso, por ejemplo, la AWS región a la que se conecta. Para obtener más información sobre MultiLangDaemon esto GitHub, visita la página del [ MultiLangDaemon proyecto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para descargar la KCL de Python GitHub, vaya a la biblioteca de [clientes de Kinesis (Python)](https://github.com/awslabs/amazon-kinesis-client-python). Para descargar un código de muestra para una aplicación de consumo de KCL para Python, vaya a la página del [proyecto de ejemplo de KCL para Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) en. GitHub

Debe completar las siguientes tareas a la hora de implementar una aplicación de consumo de KCL en Python:

**Topics**
+ [Implementa los métodos de la clase RecordProcessor](#kinesis-record-processor-implementation-interface-py)
+ [Modificar las propiedades de configuración](#kinesis-record-processor-initialization-py)

## Implementa los métodos de la clase RecordProcessor
<a name="kinesis-record-processor-implementation-interface-py"></a>

La clase `RecordProcess` debe ampliar la `RecordProcessorBase` para implementar los siguientes métodos:

```
initialize
process_records
shutdown_requested
```

Este ejemplo proporciona implementaciones que puede utilizar como punto de partida.

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this recod.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```

## Modificar las propiedades de configuración
<a name="kinesis-record-processor-initialization-py"></a>

En el ejemplo se proporcionan valores predeterminados para las propiedades de configuración, como los que se muestran en el siguiente script. Puede sobrescribir cualquiera de estas propiedades con sus propios valores.

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```

### Nombre de la aplicación
<a name="kinesis-record-processor-application-name-py"></a>

KCL requiere un nombre de aplicación que sea único entre las aplicaciones y en las tablas de Amazon DynamoDB de la misma región. La biblioteca utiliza el valor del nombre de la aplicación de las siguientes formas:
+ Se supone que los procesos de trabajo que están asociados a este nombre de aplicación operan de forma conjunta en la misma secuencia. Estos procesos de trabajo pueden distribuirse entre varias instancias. Si ejecuta otra instancia del mismo código de aplicación, pero con otro nombre de aplicación, KCL considera que la segunda instancia es una aplicación completamente independiente de la otra que opera en el mismo flujo.
+ KCL crea una tabla de DynamoDB con el nombre de la aplicación y utiliza la tabla para actualizar la información de estado (como los puntos de verificación y el mapeo procesos de trabajo-particiones) para la aplicación. Cada aplicación tiene su propia tabla de DynamoDB. Para obtener más información, consulte [Usar una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Credenciales
<a name="kinesis-record-processor-creds-py"></a>

Debe poner sus AWS credenciales a disposición de uno de los proveedores de credenciales de la cadena de proveedores de [credenciales predeterminada](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Puede usar la propiedad `AWSCredentialsProvider` para configurar un proveedor de credenciales. Si ejecuta su aplicación de consumidor en una instancia de Amazon EC2, le recomendamos que configure la instancia con un rol de IAM. AWS Las credenciales que reflejan los permisos asociados a esta función de IAM se ponen a disposición de las aplicaciones de la instancia a través de los metadatos de la instancia. Esta es la forma más segura de administrar las credenciales para una aplicación consumidora que se ejecute en una instancia EC2.

# Desarrollar consumidores de distribución mejorados con KCL 2.x
<a name="building-enhanced-consumers-kcl-retired"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Los consumidores que utilizan la *distribución ramificada mejorada* en Amazon Kinesis Data Streams pueden recibir los registros de un flujo de datos con un rendimiento dedicado de hasta 2 MB de datos por segundo por partición. Este tipo de consumidor no tiene que competir con otros consumidores que reciben datos de la secuencia. Para obtener más información, consulte [Desarrollo de consumidores de distribución ramificada mejorada con rendimiento dedicado](enhanced-consumers.md).

Puede utilizar la versión 2.0 o posterior de Kinesis Client Library (KCL) para desarrollar aplicaciones que utilicen la distribución ramificada mejorada para recibir datos de los flujos. El KCL suscribe automáticamente su aplicación a todos los fragmentos de una transmisión y garantiza que su aplicación de consumo pueda leer con un valor de rendimiento de 2 por fragmento. MB/sec Si quiere utilizar KCL sin activar la distribución ramificada mejorada, consulte la página sobre [desarrollo de consumidores mediante Kinesis Client Library 2.0](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html).

**Topics**
+ [Desarrollar consumidores de distribución mejorados con KCL 2.x en Java](building-enhanced-consumers-kcl-java.md)

# Desarrollar consumidores de distribución mejorados con KCL 2.x en Java
<a name="building-enhanced-consumers-kcl-java"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x llegará el 30 de enero de 2026. end-of-support **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Puede utilizar la versión 2.0 o posterior de Kinesis Client Library (KCL) para desarrollar aplicaciones en Amazon Kinesis Data Streams para recibir datos de los flujos mediante la distribución ramificada mejorada. En el siguiente código, se muestra un ejemplo de implementación en Java de `ProcessorFactory` y `RecordProcessor`.

Es recomendable que utilice `KinesisClientUtil` para crear `KinesisAsyncClient` y para establecer `maxConcurrency` en `KinesisAsyncClient`.

**importante**  
El cliente de Amazon Kinesis puede experimentar un importante aumento de la latencia, a menos que configure `KinesisAsyncClient` de forma que el valor de `maxConcurrency` sea lo suficientemente alto para permitir todas las asignaciones, además de los usos adicionales de `KinesisAsyncClient`.

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Migrar consumidores de KCL 1.x a KCL 2.x
<a name="kcl-migration"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. La versión 1.x de KCL estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

En este tema se explican las diferencias entre las versiones 1.x y 2.x de Kinesis Client Library (KCL). También le muestra cómo migrar el consumidor de la versión 1.x a la versión 2.x de KCL. Después de migrar su cliente, comenzará el procesamiento de registros a partir de la última ubicación del punto de comprobación.

La versión 2.0 de KCL presenta los siguientes cambios en la interfaz:


**Cambios en la interfaz de KCL**  

| Interfaz de KCL 1.x | Interfaz de KCL 2.0 | 
| --- | --- | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor | software.amazon.kinesis.processor.ShardRecordProcessor | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory | software.amazon.kinesis.processor.ShardRecordProcessorFactory | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware | Plegado en software.amazon.kinesis.processor.ShardRecordProcessor | 

**Topics**
+ [Migrar el procesador de registros](#recrod-processor-migration)
+ [Migrar el generador de procesadores de registros](#recrod-processor-factory-migration)
+ [Migrar el proceso de trabajo](#worker-migration)
+ [Configurar el cliente de Amazon Kinesis](#client-configuration)
+ [Eliminar el tiempo de inactividad](#idle-time-removal)
+ [Eliminar la configuración de clientes](#client-configuration-removals)

## Migrar el procesador de registros
<a name="recrod-processor-migration"></a>

En el siguiente ejemplo, se muestra un procesador de registros implementado para la versión 1.x de KCL:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        //
        // Process records, and possibly checkpoint
        //
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Para migrar la clase del procesador de registros**

1. Cambie las interfaces de `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` y `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` a `software.amazon.kinesis.processor.ShardRecordProcessor`, tal y como se indica a continuación:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
   public class TestRecordProcessor implements ShardRecordProcessor {
   ```

1. Actualice las instrucciones `import` para los métodos `initialize` y `processRecords`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
   ```

1. Sustituya el método `shutdown` por los métodos nuevos siguientes: `leaseLost`, `shardEnded` y `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

A continuación, se muestra la versión actualizada de la clase del procesador de registros.

```
package com.amazonaws.kcl;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class TestRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

## Migrar el generador de procesadores de registros
<a name="recrod-processor-factory-migration"></a>

El generador de procesadores de registros es responsable de la creación de procesadores de registros cuando se adquiere una asignación. A continuación, se muestra un ejemplo de un generador de la versión 1.x de KCL.

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor();
    }
}
```

**Para migrar el generador de procesadores de registros**

1. Cambie la interfaz implementada de `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` a `software.amazon.kinesis.processor.ShardRecordProcessorFactory`, tal y como se indica a continuación:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
   public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
   ```

1. Cambie la firma de retorno de `createProcessor`.

   ```
   // public IRecordProcessor createProcessor() {
   public ShardRecordProcessor shardRecordProcessor() {
   ```

A continuación, se muestra un ejemplo de generador de procesadores de registros de la versión 2.0:

```
package com.amazonaws.kcl;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor();
    }
}
```

## Migrar el proceso de trabajo
<a name="worker-migration"></a>

En la versión 2.0 de KCL, una nueva clase, llamada `Scheduler`, sustituye a la clase `Worker`. A continuación, se muestra un ejemplo de proceso de trabajo de la versión 1.x de KCL.

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

**Para migrar el proceso de trabajo**

1. Cambie la instrucción `import` para la clase `Worker` por las instrucciones de importación para las clases `Scheduler` y `ConfigsBuilder`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Cree un `ConfigsBuilder` y un `Scheduler` como se muestra en el ejemplo siguiente.

   Es recomendable que utilice `KinesisClientUtil` para crear `KinesisAsyncClient` y para establecer `maxConcurrency` en `KinesisAsyncClient`.
**importante**  
El cliente de Amazon Kinesis puede experimentar un importante aumento de la latencia, a menos que configure `KinesisAsyncClient` de forma que el valor de `maxConcurrency` sea lo suficientemente alto para permitir todas las asignaciones, además de los usos adicionales de `KinesisAsyncClient`.

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
   Region region = Region.AP_NORTHEAST_2;
   KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
   ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
   
   Scheduler scheduler = new Scheduler(
       configsBuilder.checkpointConfig(),
       configsBuilder.coordinatorConfig(),
       configsBuilder.leaseManagementConfig(),
       configsBuilder.lifecycleConfig(),
       configsBuilder.metricsConfig(),
       configsBuilder.processorConfig(),
       configsBuilder.retrievalConfig()
       );
   ```

## Configurar el cliente de Amazon Kinesis
<a name="client-configuration"></a>

Con el lanzamiento de la versión 2.0 de Kinesis Client Library, la configuración del cliente pasa de tener una única clase de configuración (`KinesisClientLibConfiguration`) a tener seis clases de configuración. En la tabla siguiente, se describe la migración.


**Campos de configuración y sus clases nuevas**  

| Campo original | Clase de configuración nueva | Description (Descripción) | 
| --- | --- | --- | 
| applicationName | ConfigsBuilder | El nombre de la aplicación de KCL. Se utiliza de forma predeterminada para tableName y consumerName. | 
| tableName | ConfigsBuilder | Permite sustituir el nombre de la tabla que se utiliza para la tabla de asignaciones de Amazon DynamoDB. | 
| streamName | ConfigsBuilder | El nombre de la secuencia cuyos registros procesa de esta aplicación. | 
| kinesisEndpoint | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| dynamoDBEndpoint | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| initialPositionInStreamExtended | RetrievalConfig | La ubicación de la partición desde la que KCL comienza a recuperar registros, empezando por la ejecución inicial de la aplicación. | 
| kinesisCredentialsProvider | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| dynamoDBCredentialsProvider | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| cloudWatchCredentialsProvider | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| failoverTimeMillis | LeaseManagementConfig | El número de milisegundos que deben transcurrir antes de que se pueda considerar que se ha producido un error en el propietario de una asignación. | 
| workerIdentifier | ConfigsBuilder | Identificador único que representa esta instancia del procesador de aplicaciones. Deben ser único. | 
| shardSyncIntervalMillis | LeaseManagementConfig | El tiempo entre llamadas de sincronización del fragmento. | 
| maxRecords | PollingConfig | Permite establecer el número máximo de registros que devuelve Kinesis. | 
| idleTimeBetweenReadsInMillis | CoordinatorConfig | Esta opción se ha eliminado. Consulte Eliminación del tiempo de inactividad. | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig | Cuando se establece, se llama al procesador de registros incluso cuando no se proporciona ningún registro de Kinesis. | 
| parentShardPollIntervalMillis | CoordinatorConfig | Determina la frecuencia con que debería sondear un procesador de registros para ver si el fragmento principal se ha completado. | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig | Cuando se establece, se eliminan las asignaciones tan pronto como se inicia el procesamiento del fragmento secundario. | 
| ignoreUnexpectedChildShards | LeaseManagementConfig | Cuando se establece, los fragmentos secundarios que tienen un fragmento abierto se pasan por alto. Esto es principalmente para DynamoDB Streams. | 
| kinesisClientConfig | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| dynamoDBClientConfig | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| cloudWatchClientConfig | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| taskBackoffTimeMillis | LifecycleConfig | El tiempo que se debe esperar para reintentar la tareas con errores. | 
| metricsBufferTimeMillis | MetricsConfig | Controla la publicación de CloudWatch métricas. | 
| metricsMaxQueueSize | MetricsConfig | Controla la publicación de CloudWatch métricas. | 
| metricsLevel | MetricsConfig | Controla la publicación de CloudWatch métricas. | 
| metricsEnabledDimensions | MetricsConfig | Controla la publicación de CloudWatch métricas. | 
| validateSequenceNumberBeforeCheckpointing | CheckpointConfig | Esta opción se ha eliminado. Consulte Validación del número de secuencia del punto de comprobación. | 
| regionName | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| maxLeasesForWorker | LeaseManagementConfig | El número máximo de asignaciones que debería aceptar una sola instancia de la aplicación. | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig | El número máximo de asignaciones del que debería intentar apropiarse una aplicación al mismo tiempo. | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | La IOPs lectura de DynamoDB que se utiliza si la biblioteca de clientes de Kinesis necesita crear una nueva tabla de concesiones de DynamoDB. | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | La IOPs lectura de DynamoDB que se utiliza si la biblioteca de clientes de Kinesis necesita crear una nueva tabla de concesiones de DynamoDB. | 
| initialPositionInStreamExtended | LeaseManagementConfig | La posición inicial de la secuencia en la que debería comenzar la aplicación. Esto solo se utiliza durante la creación inicial de la asignación. | 
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | Deshabilita la sincronización de los datos de los fragmentos si la tabla de asignaciones todavía contiene entradas. KinesisEcoTODO: -438 | 
| shardPrioritization | CoordinatorConfig | La priorización de fragmentos que se va a utilizar. | 
| shutdownGraceMillis | N/A | Esta opción se ha eliminado. Consulte MultiLang Mudanzas. | 
| timeoutInSeconds | N/A | Esta opción se ha eliminado. Consulte MultiLang Mudanzas. | 
| retryGetRecordsInSeconds | PollingConfig | Configura el retraso entre GetRecords intentos en caso de error. | 
| maxGetRecordsThreadPool | PollingConfig | El tamaño del grupo de subprocesos utilizado para GetRecords. | 
| maxLeaseRenewalThreads | LeaseManagementConfig | Controla el tamaño del grupo de subprocesos del renovador de asignaciones. Cuanto más grande sea el número de asignaciones que puede tomar la aplicación, más grande debe ser este grupo. | 
| recordsFetcherFactory | PollingConfig | Permite sustituir el generador que se utiliza para crear capturadores que recuperan datos de las secuencias. | 
| logWarningForTaskAfterMillis | LifecycleConfig | Tiempo que se debe esperar antes de registrar una advertencia si una tarea no ha finalizado. | 
| listShardsBackoffTimeInMillis | RetrievalConfig | El número de milisegundos que se debe esperar entre llamadas a ListShards cuando se producen errores. | 
| maxListShardsRetryAttempts | RetrievalConfig | El número máximo de veces que se reintenta ListShards antes de desistir. | 

## Eliminar el tiempo de inactividad
<a name="idle-time-removal"></a>

En la versión 1.x de KCL, `idleTimeBetweenReadsInMillis` correspondía a dos cantidades: 
+ La cantidad de tiempo entre envíos de tareas. Ahora puede configurar este tiempo entre tareas estableciendo `CoordinatorConfig#shardConsumerDispatchPollIntervalMillis`.
+ La cantidad de tiempo en reposo cuando no se devuelven registros desde Kinesis Data Streams. En la versión 2.0, con la distribución ramificada mejorada, los registros se envían desde sus respectivos recuperadores. Solo se produce actividad en el consumidor del fragmento cuando se recibe una solicitud enviada. 

## Eliminar la configuración de clientes
<a name="client-configuration-removals"></a>

En la versión 2.0, KCL ya no crea los clientes. El usuario es el responsable de suministrar un cliente válido. Con este cambio, se han eliminado todos los parámetros de configuración que controlaban la creación de clientes. Si necesita estos parámetros, puede establecerlos en los clientes antes de proporcionar clientes a `ConfigsBuilder`.


****  

| Campo eliminado | Configuración equivalente | 
| --- | --- | 
| kinesisEndpoint | Configure el SDK KinesisAsyncClient con el punto de enlace preferido: KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build(). | 
| dynamoDBEndpoint | Configure el SDK DynamoDbAsyncClient con el punto de enlace preferido: DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build(). | 
| kinesisClientConfig | Configure el SDK KinesisAsyncClient con la configuración necesaria: KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| dynamoDBClientConfig | Configure el SDK DynamoDbAsyncClient con la configuración necesaria: DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| cloudWatchClientConfig | Configure el SDK CloudWatchAsyncClient con la configuración necesaria: CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| regionName | Configure el SDK con la región preferida. Es la misma para todos los clientes del SDK. Por ejemplo, KinesisAsyncClient.builder().region(Region.US\$1WEST\$12).build(). | 