

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Desarrollar un consumidor de Kinesis Client Library en Java
<a name="kinesis-record-processor-implementation-app-java"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. La versión 1.x de KCL estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de Java. Para ver la referencia sobre Javadoc, consulte el tema sobre [AWS Javadoc](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html) de Class. AmazonKinesisClient

Para descargar el KCL de Java GitHub, vaya a la [biblioteca de clientes de Kinesis (](https://github.com/awslabs/amazon-kinesis-client)Java). Para localizar KCL para Java en Apache Maven, vaya a la página de [resultados de la búsqueda de KCL](https://search.maven.org/#search|ga|1|amazon-kinesis-client). Para descargar un código de muestra para una aplicación de consumo de Java KCL desde GitHub, visite la página del proyecto de [ejemplo de KCL para Java](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis) en. GitHub 

La aplicación de muestra utiliza [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html). Puede cambiar la configuración de registro en el método estático `configure` definido en el archivo `AmazonKinesisApplicationSample.java`. *Para obtener más información sobre cómo utilizar el registro de Apache Commons con Log4j y aplicaciones AWS Java, consulte [Registrar con Log4j](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html) en la guía para desarrolladores.AWS SDK para Java *

Debe completar las siguientes tareas a la hora de implementar una aplicación de consumo de KCL en Java:

**Topics**
+ [Implemente los métodos del procesador IRecord](#kinesis-record-processor-implementation-interface-java)
+ [Implemente una fábrica de clases para la interfaz IRecord del procesador](#kinesis-record-processor-implementation-factory-java)
+ [Crear un proceso de trabajo](#kcl-java-worker)
+ [Modificar las propiedades de configuración](#kinesis-record-processor-initialization-java)
+ [Migrar a la versión 2 de la interfaz del procesador de registros](#kcl-java-v2-migration)

## Implemente los métodos del procesador IRecord
<a name="kinesis-record-processor-implementation-interface-java"></a>

KCL es compatible actualmente con dos versiones de la interfaz de `IRecordProcessor`: la interfaz original está disponible con la primera versión de KCL y la versión 2 está disponible a partir de la versión 1.5.0 de KCL. Ambas interfaces son totalmente compatibles. La elección dependerá de su situación específica. Consulte sus javadocs locales o el código fuente para ver todas las diferencias. En las siguientes secciones se describe la implementación mínima introductoria.

**Topics**
+ [Interfaz original (versión 1)](#kcl-java-interface-original)
+ [Interfaz actualizada (versión 2)](#kcl-java-interface-v2)

### Interfaz original (versión 1)
<a name="kcl-java-interface-original"></a>

La interfaz original `IRecordProcessor` (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) expone los siguientes métodos del procesador de registros que el consumidor debe implementar. En la muestra se presentan implementaciones que puede utilizar como punto de partida (consulte `AmazonKinesisApplicationSampleRecordProcessor.java`).

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**inicializar**  
KCL llama al método `initialize` cuando se crea una instancia del procesador de registros y pasa un ID de partición específico como parámetro. Este procesador de registros procesa solo este fragmento, y normalmente también se produce la situación contraria (este fragmento solo es procesado por este procesador de registros). Sin embargo, el consumidor debe contar con la posibilidad de que un registro de datos pueda ser procesado más de una vez. Kinesis Data Streams tiene una semántica de *al menos una vez*, lo que significa que cada registro de datos de una partición se procesa al menos una vez por parte de un proceso de trabajo del consumidor. Para obtener más información sobre los casos en los que un fragmento en particular puede ser procesado por más de un proceso de trabajo, consulte [Utilizar la nueva partición, el escalado y el procesamiento paralelo para cambiar el número de particiones](kinesis-record-processor-scaling.md).

```
public void initialize(String shardId)
```

**processRecords**  
KCL llama al método `processRecords` y pasa una lista de registros de datos desde la partición especificada por el método `initialize(shardId)`. El procesador de registros procesa los datos en estos registros según la semántica del consumidor. Por ejemplo, el proceso de trabajo podría realizar una transformación de los datos y, a continuación, almacenar el resultado en un bucket de Amazon Simple Storage Service (Amazon S3).

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

Además de los datos en sí, el registro contiene un número secuencial y una clave de partición. El proceso de trabajo puede utilizar estos valores al procesar los datos. Por ejemplo, el proceso de trabajo podría elegir el bucket de S3 en el que almacenar los datos en función del valor de la clave de partición. La clase `Record` expone los siguientes métodos que proporcionan acceso a los datos, el número secuencial y la clave de partición del registro. 

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

En el ejemplo, el método privado `processRecordsWithRetries` tiene un código que muestra cómo un proceso de trabajo puede obtener acceso a los datos, el número secuencial y la clave de partición del registro.

Kinesis Data Streams requiere que el procesador de registros realice un seguimiento de los registros que ya se han procesado en una partición. KCL se ocupa de este seguimiento pasando un generador de puntos de verificación (`IRecordProcessorCheckpointer`) a `processRecords`. El procesador de registros llama al método `checkpoint` en esta interfaz para informar a KCL de su avance en el procesamiento de los registros de la partición. Si se produce un error en el proceso de trabajo, KCL utiliza esta información para reiniciar el procesamiento de la partición en el último registro procesado conocido.

En el caso de una operación de división o fusión, KCL no comenzará a procesar las particiones nuevas hasta que los procesadores de las particiones originales hayan llamado a `checkpoint` para indicar que se ha completado el procesamiento en las particiones originales.

Si no se pasa un parámetro, KCL supone que la llamada a `checkpoint` significa que todos los registros se han procesado, hasta el último registro pasado al procesador de registros. Por tanto, el procesador de registros solo debe llamar a `checkpoint` después de haber procesado todos los registros de la lista que se le ha pasado. Los procesadores de registros no necesitan llamar a `checkpoint` en cada llamada a `processRecords`. Un procesador podría, por ejemplo, llamar a `checkpoint` cada tercera vez que llame a `processRecords`. Puede especificar opcionalmente el número secuencial exacto de un registro como un parámetro para `checkpoint`. En este caso, KCL supone que todos los registros se han procesado exclusivamente hasta ese registro.

En el ejemplo, el método privado `checkpoint` muestra cómo llamar a `IRecordProcessorCheckpointer.checkpoint` mediante la administración de excepciones y la lógica de reintentos apropiadas.

KCL depende de `processRecords` para administrar cualquier excepción que surja del procesamiento de los registros de datos. Si `processRecords` genera una excepción, KCL omite los registros de datos que se pasaron antes de la excepción. Es decir, estos registros no se reenviarán al procesador de registros que generó la excepción ni a ningún otro procesador de registros en el consumidor.

**shutdown**  
KCL llama al método `shutdown` cuando finaliza el procesamiento (el motivo del cierre es `TERMINATE`) o cuando el proceso de trabajo ya no responde (el motivo del cierre es `ZOMBIE`).

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

El procesamiento finaliza cuando el procesador de registros no recibe más registros desde el fragmento, ya sea porque el fragmento se ha dividido o fusionado o porque la secuencia se ha eliminado.

KCL también pasa una interfaz `IRecordProcessorCheckpointer` a `shutdown`. Si el motivo del shutdown es `TERMINATE`, el procesador de registros debería terminar de procesar los registros de datos y llamar al método `checkpoint` en esta interfaz.

### Interfaz actualizada (versión 2)
<a name="kcl-java-interface-v2"></a>

La interfaz actualizada `IRecordProcessor` (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) expone los siguientes métodos del procesador de registros que el consumidor debe implementar: 

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

Se puede obtener acceso a todos los argumentos de la versión original de la interfaz mediante métodos "get" en los objetos del contenedor. Por ejemplo, para recuperar la lista de registros en `processRecords()`, puede utilizar `processRecordsInput.getRecords()`.

A partir de la versión 2 de esta interfaz (KCL 1.5.0 y posteriores), están disponibles las siguientes entradas nuevas, además de las entradas proporcionadas por la interfaz original:

starting sequence number  
En el objeto `InitializationInput` que se pasa a la operación `initialize()` el número secuencial inicial a partir del cual se facilitan los registros a la instancia del procesador de registros. Este es el último número secuencial objeto de un punto de comprobación por parte de la instancia del procesador de registros que procesara anteriormente el mismo fragmento. Estos datos se ofrecen por si su aplicación necesitara esta información. 

pending checkpoint sequence number  
En el objeto `InitializationInput` que se pasa a la operación `initialize()`, el número secuencial pendiente de punto de comprobación (si hay alguno) que no se ha podido confirmar antes de que se detuviera la instancia anterior del procesador de registros.

## Implemente una fábrica de clases para la interfaz IRecord del procesador
<a name="kinesis-record-processor-implementation-factory-java"></a>

También necesitará implementar un generador para la clase que implementa los métodos del procesador de registros. Cuando el consumidor crea instancias del proceso de trabajo, pasa una referencia a este generador.

La muestra implementa el generador de clases en el archivo `AmazonKinesisApplicationSampleRecordProcessorFactory.java` mediante la interfaz del procesador de registros original. Si desea que el generador de clases cree procesadores de registros de la versión 2, utilice el nombre de paquete `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`.

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## Crear un proceso de trabajo
<a name="kcl-java-worker"></a>

Tal y como se ha explicado en [Implemente los métodos del procesador IRecord](#kinesis-record-processor-implementation-interface-java), hay dos versiones de la interfaz de procesador de registros de KCL para elegir, lo que afecta a la creación de un proceso de trabajo. La interfaz de procesador de registros original utiliza la siguiente estructura de código para crear un proceso de trabajo:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

Con la versión 2 del procesador de registros, puede utilizar la `Worker.Builder` para crear un proceso de trabajo sin preocuparse por qué constructor utilizar ni por el orden de los argumentos. La interfaz de procesador de registros actualizada utiliza la siguiente estructura de código para crear un proceso de trabajo:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## Modificar las propiedades de configuración
<a name="kinesis-record-processor-initialization-java"></a>

En la muestra se proporcionan valores predeterminados para las propiedades de configuración. Este datos de configuración del proceso de trabajo se consolidan posteriormente en un objeto `KinesisClientLibConfiguration`. Este objeto y una referencia al generador de clases de `IRecordProcessor` se pasan en la llamada que crea la instancia del proceso de trabajo. Puede sobrescribir cualquiera de estas propiedades con sus propios valores a través de un archivo de propiedades de Java (consulte `AmazonKinesisApplicationSample.java`).

### Nombre de la aplicación
<a name="configuration-property-application-name"></a>

KCL requiere un nombre de aplicación que sea único entre las aplicaciones y en las tablas de Amazon DynamoDB de la misma región. La biblioteca utiliza el valor del nombre de la aplicación de las siguientes formas:
+ Se entiende que los procesos de trabajo asociados a este nombre de aplicación operan de forma conjunta en la misma secuencia. Estos procesos de trabajo pueden distribuirse en varias instancias. Si ejecuta otra instancia del mismo código de aplicación, pero con otro nombre de aplicación, KCL considera que la segunda instancia es una aplicación completamente independiente de la otra que opera en el mismo flujo.
+ KCL crea una tabla de DynamoDB con el nombre de la aplicación y utiliza la tabla para actualizar la información de estado (como los puntos de verificación y el mapeo procesos de trabajo-particiones) para la aplicación. Cada aplicación tiene su propia tabla de DynamoDB. Para obtener más información, consulte [Usar una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configuración de credenciales
<a name="kinesis-record-processor-cred-java"></a>

Debe poner sus AWS credenciales a disposición de uno de los proveedores de credenciales de la cadena de proveedores de credenciales predeterminada. Por ejemplo, si ejecuta su consumidor en una instancia de EC2, se recomienda que lance la instancia con un rol de IAM. Las credenciales de AWS que reflejan los permisos asociados a este rol de IAM se ponen a disposición de las aplicaciones de la instancia a través de los metadatos de esta. Esta es la forma más segura de administrar las credenciales para un consumidor que se ejecute en una instancia EC2.

En primer lugar, la aplicación de muestra intenta recuperar las credenciales de IAM de los metadatos de la instancia: 

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

Si la aplicación de muestra no puede obtener credenciales de los metadatos de la instancia, intenta recuperar las credenciales desde un archivo de propiedades:

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

Para más información sobre los metadatos de instancia, consulte [Metadatos de instancia](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html) en la *Guía del usuario de Amazon EC2*.

### Uso de ID de proceso de trabajo para varias instancias
<a name="kinesis-record-processor-workerid-java"></a>

El código de inicialización de muestra crea un ID para el proceso de trabajo, `workerId`, con el nombre del equipo local y un identificador global único anexo, tal y como se muestra en el siguiente fragmento de código. Este enfoque es compatible con un escenario con varias instancias de la aplicación consumidora ejecutándose en un único equipo.

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## Migrar a la versión 2 de la interfaz del procesador de registros
<a name="kcl-java-v2-migration"></a>

Si desea migrar código que utilice la interfaz original, además de los pasos descritos anteriormente, tendrá que seguir estos pasos:

1. Cambie la clase de su procesador de registros para importar la versión 2 de la interfaz del procesador de registros:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. Cambiar las referencias a las entradas para usar métodos `get` en los objetos del contenedor. Por ejemplo, en la operación `shutdown()`, cambie "`checkpointer`" por "`shutdownInput.getCheckpointer()`".

1. Cambie la clase del generador de procesadores de registros para importar la interfaz del generador de procesadores de registros de la versión 2:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. Cambie la construcción del proceso de trabajo para usar `Worker.Builder`. Por ejemplo:

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```