

# Uso del adaptador Kinesis de DynamoDB Streams para procesar registros de transmisión
<a name="Streams.KCLAdapter"></a>

Usar Amazon Kinesis Adapter es la forma recomendada de consumir secuencias de Amazon DynamoDB. La API de DynamoDB Streams es intencionadamente similar a la de Kinesis Data Streams. En ambos servicios, los data streams se componen de fragmentos, que son los contenedores de los registros de secuencia. Los API de ambos servicios contienen operaciones `ListStreams`, `DescribeStream`, `GetShards` y `GetShardIterator`. (Aunque estas acciones de DynamoDB Streams son parecidas a sus homólogas de Kinesis Data Streams, no son idénticas al 100 %).

Como usuario de DynamoDB Streams, puede sacar partido de los patrones de diseño contenidos en KCL para procesar los fragmentos de DynamoDB Streams y transmitir registros. Para ello, se utiliza DynamoDB Streams Kinesis Adapter. Kinesis Adapter implementa la interfaz de Kinesis Data Streams, de tal forma que se pueda usar KCL para consumir y procesar registros desde DynamoDB Streams. Para obtener instrucciones acerca de cómo configurar e instalar DynamoDB Streams Kinesis Adapter, consulte el [repositorio de GitHub](https://github.com/awslabs/dynamodb-streams-kinesis-adapter).

Puede escribir aplicaciones para Kinesis Data Streams mediante Kinesis Client Library (KCL). KCL simplifica la codificación porque proporciona abstracciones útiles por encima del API de bajo nivel de Kinesis Data Streams. Para obtener más información sobre KCL, consulte [Desarrollo de consumidores mediante la biblioteca Kinesis Client Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) en la *Guía para desarrolladores de Amazon Kinesis Data Streams*.

DynamoDB recomienda utilizar la versión 3.x de KCL con AWS SDK para Java v2.x. La versión actual del adaptador de Kinesis de DynamoDB Streams 1.x con AWS SDK para AWS SDK para Java v1.x se seguirá admitiendo por completo durante todo el ciclo de vida, tal y como estaba previsto durante el periodo de transición, en consonancia con la [Política de mantenimiento de AWS SDK y herramientas](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html).

**nota**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x dejará de recibir asistencia el 30 de enero de 2026. Le recomendamos que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [Biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en GitHub. Para obtener información sobre las últimas versiones de KCL, consulte [Uso de la biblioteca de clientes de Kinesis](https://docs.aws.amazon.com/streams/latest/dev/kcl.html). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte Migración de KCL 1.x a KCL 3.x.

En el siguiente diagrama se muestra cómo interaccionan estas bibliotecas entre sí.

![\[Interacción entre DynamoDB Streams, Kinesis Data Streams y KCL para procesar registros de DynamoDB Streams.\]](http://docs.aws.amazon.com/es_es/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


Si DynamoDB Streams Kinesis Adapter está implementado, puede comenzar a desarrollar para la interfaz de KCL y dirigir las llamadas al API de forma transparente al punto de enlace de DynamoDB Streams.

Cuando se inicia la aplicación, llama a KCL para crear una instancia de un proceso de trabajo. Debe facilitar al proceso de trabajo información sobre la configuración de la aplicación, como el descriptor de la transmisión y las credenciales de AWS, así como el nombre de una clase de procesador de registros que usted proporcione. A medida que el proceso de trabajo ejecuta el código en el procesador de registros, lleva a cabo las siguientes tareas:
+ Se conecta a la secuencia
+ Enumera las particiones del flujo.
+ Comprueba y enumera las particiones secundarias de una partición principal cerrada dentro del flujo
+ Coordina la asociación de los fragmentos con otros procesos de trabajo (si procede)
+ Crea instancias de un procesador de registros para cada fragmento que administra
+ Extrae registros del flujo.
+ Escala la frecuencia de llamadas a la API GetRecords durante el alto rendimiento (si se configura el modo de recuperación).
+ Inserta los registros en el procesador de registros correspondiente
+ Genera puntos de comprobación para los registros procesados
+ Balancea las asociaciones entre fragmentos y procesos de trabajo cuando cambia el recuento de instancias de procesos de trabajo
+ Equilibra las asociaciones entre particiones y procesos de trabajo cuando las particiones se dividen.

El adaptador KCL admite el modo de recuperación, una característica de ajuste automático de la frecuencia de llamadas para gestionar los aumentos temporales de rendimiento. Cuando el retraso en el procesamiento de flujos supera un umbral configurable (un minuto de forma predeterminada), el modo de recuperación escala la frecuencia de llamadas a la API GetRecords en un valor configurable (tres veces de forma predeterminada) para recuperar los registros más rápido y, a continuación, vuelve a la normalidad una vez que disminuye el retraso. Esto resulta muy útil durante los periodos de alto rendimiento, en los que la actividad de escritura de DynamoDB puede saturar a los consumidores que utilizan las frecuencias de sondeo predeterminadas. El modo de recuperación se puede habilitar a través del parámetro de configuración `catchupEnabled` (falso de forma predeterminada).

**nota**  
Para obtener una descripción de los conceptos de KCL enumerados aquí, consulte [Desarrollo de consumidores mediante la biblioteca Kinesis Client Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) en la *Guía para desarrolladores de Amazon Kinesis Data Streams*.  
Para obtener más información acerca de cómo usar los flujos con AWS Lambda, consulte [DynamoDB Streams y disparadores de AWS Lambda](Streams.Lambda.md).

# Migración de KCL 1.x a KCL 3.x
<a name="streams-migrating-kcl"></a>

## Descripción general
<a name="migrating-kcl-overview"></a>

En esta guía se proporcionan instrucciones para migrar la aplicación de consumidor de KCL 1.x a KCL 3.x. Debido a las diferencias de arquitectura entre KCL 1.x y KCL 3.x, la migración requiere actualizar varios componentes para garantizar la compatibilidad.

KCL 1.x utiliza clases e interfaces diferentes en comparación con KCL 3.x. Debe migrar primero el procesador de registros, el generador de procesadores de registros y las clases de procesos de trabajo al formato compatible con KCL 3.x y, a continuación, seguir los pasos de migración de KCL 1.x a KCL 3.x.

## Pasos para realizar la migración
<a name="migration-steps"></a>

**Topics**
+ [Paso 1: migración del procesador de registros](#step1-record-processor)
+ [Paso 2: migración del generador de procesadores de registros](#step2-record-processor-factory)
+ [Paso 3: migración del proceso de trabajo](#step3-worker-migration)
+ [Paso 4: información general y recomendaciones sobre la configuración de KCL 3.x](#step4-configuration-migration)
+ [Paso 5: migración de KCL 2.x a KCL 3.x](#step5-kcl2-to-kcl3)

### Paso 1: migración del procesador de registros
<a name="step1-record-processor"></a>

En el siguiente ejemplo se muestra un procesador de registros implementado para el adaptador de Kinesis de DynamoDB Streams de KCL 1.x:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Migración de la clase RecordProcessor**

1. Cambie las interfaces de `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` y `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` a `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor` tal y como se indica a continuación:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. Actualice las instrucciones de importación para los métodos `initialize` y `processRecords`:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. Sustituya el método `shutdownRequested` por los métodos nuevos siguientes: `leaseLost`, `shardEnded` y `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

A continuación, se muestra la versión actualizada de la clase del procesador de registros:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**nota**  
El adaptador de Kinesis de DynamoDB Streams ahora usa el modelo de registro de SDKv2. En SDKv2, los objetos `AttributeValue` complejos (`BS`, `NS`, `M`, `L` y `SS`) nunca devuelven un valor nulo. Use los métodos `hasBs()`, `hasNs()`, `hasM()`, `hasL()` y `hasSs()` para verificar si estos valores existen.

### Paso 2: migración del generador de procesadores de registros
<a name="step2-record-processor-factory"></a>

El generador de procesadores de registros es responsable de la creación de procesadores de registros cuando se adquiere una asignación. A continuación, se muestra un ejemplo de un generador de la versión 1.x de KCL:

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**Migración de `RecordProcessorFactory`**
+ Cambie la interfaz implementada de `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` a `software.amazon.kinesis.processor.ShardRecordProcessorFactory`, tal y como se indica a continuación:

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

A continuación, se muestra un ejemplo de generador de procesadores de registros de la versión 3.0:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### Paso 3: migración del proceso de trabajo
<a name="step3-worker-migration"></a>

En la versión 3.0 de KCL, una nueva clase, llamada **Scheduler**, reemplaza la clase **Worker**. A continuación, se muestra un ejemplo de proceso de trabajo de la versión 1.x de KCL:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**Para migrar el proceso de trabajo**

1. Cambie la instrucción `import` para la clase `Worker` por las instrucciones de importación para las clases `Scheduler` y `ConfigsBuilder`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Importe `StreamTracker` y cambie la importación de `StreamsWorkerFactory` a `StreamsSchedulerFactory`.

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. Elija la posición desde la que desea iniciar la aplicación. Puede ser `TRIM_HORIZON` o `LATEST`.

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. Crear una instancia de `StreamTracker`.

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. Cree el objeto `AmazonDynamoDBStreamsAdapterClient`.

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. Cree el objeto `ConfigsBuilder`.

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. Cree `Scheduler` mediante `ConfigsBuilder` tal como se muestra en el ejemplo siguiente:

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**importante**  
La configuración `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` mantiene la compatibilidad entre el adaptador de Kinesis de DynamoDB Streams para KCL v3 y KCL v1, pero no entre KCL v2 y v3.

### Paso 4: información general y recomendaciones sobre la configuración de KCL 3.x
<a name="step4-configuration-migration"></a>

Para obtener una descripción detallada de las configuraciones introducidas después de KCL 1.x que son relevantes en KCL 3.x, consulte las [configuraciones de KCL](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html) y la [configuración del cliente de migración de KCL](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration).

**importante**  
En lugar de crear directamente objetos de `checkpointConfig`, `coordinatorConfig`, `leaseManagementConfig`, `metricsConfig`, `processorConfig` y `retrievalConfig`, recomendamos utilizar `ConfigsBuilder` para establecer configuraciones en KCL 3.x y versiones posteriores y así evitar problemas de inicialización del programador. `ConfigsBuilder` proporciona una forma más flexible y sostenible de configurar la aplicación de KCL.

#### Configuraciones con valor predeterminado actualizado en KCL 3.x
<a name="kcl3-configuration-overview"></a>

`billingMode`  
En la versión 1.x de KCL, el valor predeterminado de `billingMode` se establece en `PROVISIONED`. No obstante, con la versión 3.x de KCL, el valor predeterminado de `billingMode` es `PAY_PER_REQUEST` (modo bajo demanda). Le recomendamos que utilice el modo de capacidad bajo demanda para la tabla de arrendamiento a fin de ajustar automáticamente la capacidad en función del uso. Para obtener orientación sobre cómo utilizar la capacidad aprovisionada para las tablas de arrendamiento, consulte [Best practices for the lease table with provisioned capacity mode](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html).

`idleTimeBetweenReadsInMillis`  
En la versión 1.x de KCL, el valor predeterminado de `idleTimeBetweenReadsInMillis` se establece en 1000 (o 1 segundo). La versión 3.x de KCL establece el valor predeterminado de i`dleTimeBetweenReadsInMillis` en 1500 (o 1,5 segundos), pero el adaptador de Kinesis de Amazon DynamoDB Streams reemplaza el valor predeterminado por 1000 (o 1 segundo).

#### Nuevas configuraciones en KCL 3.x
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
Esta configuración define el intervalo de tiempo antes de que las particiones recién descubiertas comiencen a procesarse y se calcula como 1,5 × `leaseAssignmentIntervalMillis`. Si este ajuste no se configura explícitamente, el intervalo de tiempo se establece de forma predeterminada en 1,5 × `failoverTimeMillis`. El procesamiento de nuevas particiones implica examinar la tabla de arrendamiento y consultar un índice secundario global (GSI) en la tabla de arrendamiento. Al reducir `leaseAssignmentIntervalMillis`, aumenta la frecuencia de estas operaciones de análisis y consulta, lo que se traduce en mayores costos de DynamoDB. Recomendamos establecer este valor en 2000 (o 2 segundos) para minimizar el retraso en el procesamiento de nuevas particiones.

`shardConsumerDispatchPollIntervalMillis`  
Esta configuración define el intervalo entre sondeos sucesivos por parte del consumidor de particiones para activar las transiciones de estado. En la versión 1.x de KCL, este comportamiento se controlaba mediante el parámetro `idleTimeInMillis`, que no se exponía como un ajuste configurable. Con la versión 3.x de KCL, recomendamos establecer esta configuración para que coincida con el valor utilizado para` idleTimeInMillis` en la configuración de la versión 1.x de KCL.

### Paso 5: migración de KCL 2.x a KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

Para garantizar una transición fluida y la compatibilidad con la última versión de la biblioteca de clientes de Kinesis (KCL), siga los pasos del 5 al 8 de las instrucciones de la guía de migración para [actualizar de KCL 2.x a KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics).

Para ver los problemas habituales de solución de problemas de KCL 3.x, consulte [Solución de problemas de las aplicaciones de consumidores de KCL](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html).

# Restauración de la versión de KCL anterior
<a name="kcl-migration-rollback"></a>

En este tema se explica cómo restaurar la aplicación de consumidor a la versión de KCL anterior. El proceso de restauración consta de dos pasos:

1. Ejecución de la [herramienta de migración de KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Nueva implementación del código de la versión de KCL anterior.

## Paso 1: ejecución de la herramienta de migración de KCL
<a name="kcl-migration-rollback-step1"></a>

Cuando necesite restaurar la versión anterior de KCL, debe ejecutar la herramienta de migración de KCL. La herramienta realiza dos tareas importantes:
+ Elimina una tabla de metadatos llamada tabla de métricas de procesos de trabajo y el índice secundario global de la tabla de arrendamiento en DynamoDB. Estos artefactos los crea KCL 3.x, pero no son necesarios al restaurar la versión anterior.
+ Hace que todos los procesos de trabajo se ejecuten en un modo compatible con KCL 1.x y comiencen a utilizar el algoritmo de equilibrio de carga utilizado en versiones anteriores de KCL. Si tiene problemas con el nuevo algoritmo de equilibrio de carga en KCL 3.x, esto mitigará el problema inmediatamente.

**importante**  
La tabla de estados del coordinador en DynamoDB debe existir y no debe eliminarse durante el proceso de migración, restauración y avance.

**nota**  
Es importante que todos los procesos de trabajo de la aplicación de consumo utilicen el mismo algoritmo de equilibrio de carga en un momento dado. La herramienta de migración de KCL se asegura de que todos los procesos de trabajo de la aplicación de consumo KCL 3.x cambien al modo compatible con KCL 1.x, de modo que todos los procesos de trabajo ejecuten el mismo algoritmo de equilibrio de carga durante la restauración de la aplicación a la versión anterior de KCL.

Puede descargar la [herramienta de migración de KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) en el directorio de scripts del [repositorio de GitHub de KCL](https://github.com/awslabs/amazon-kinesis-client/tree/master). Ejecute el script desde un proceso de trabajo o host con los permisos adecuados para escribir en la tabla de estados del coordinador, la tabla de métricas de los procesos de trabajo y la tabla de arrendamiento. Asegúrese de que los [permisos de IAM](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html) adecuados estén configurados para las aplicaciones de consumo de KCL. Ejecute el script solo una vez por aplicación de KCL mediante el comando especificado:

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### Parameters
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
Sustituya *region* por su Región de AWS.

`--application_name`  
Este parámetro es necesario si utiliza nombres predeterminados para las tablas de metadatos de DynamoDB (tabla de arrendamiento, tabla de estados del coordinador y tabla de métricas de los proceso de trabajo). Si ha especificado nombres personalizados para estas tablas, puede omitir este parámetro. Reemplace *applicationName* por el nombre de la aplicación KCL real. La herramienta utiliza este nombre para derivar los nombres de tabla predeterminados si no se proporcionan nombres personalizados.

`--lease_table_name`  
Este parámetro es necesario cuando ha establecido un nombre personalizado para la tabla de arrendamientos en la configuración de KCL. Si utiliza el nombre de tabla predeterminado, puede omitir este parámetro. Reemplace *leaseTableName* por el nombre de tabla personalizado que especificó para la tabla de arrendamiento.

`--coordinator_state_table_name`  
Este parámetro es necesario cuando ha establecido un nombre personalizado para la tabla de estados de coordinador en la configuración de KCL. Si utiliza el nombre de tabla predeterminado, puede omitir este parámetro. Reemplace *coordinatorStateTableName* por el nombre de tabla personalizado que especificó para la tabla de estados de coordinador.

`--worker_metrics_table_name`  
Este parámetro es necesario cuando ha establecido un nombre personalizado para la tabla de métricas de proceso de trabajo en la configuración de KCL. Si utiliza el nombre de tabla predeterminado, puede omitir este parámetro. Reemplace *workerMetricsTableName* por el nombre de tabla personalizado que especificó para la tabla de métricas de proceso de trabajo.

## Paso 2: nueva implementación del código con la versión de KCL anterior
<a name="kcl-migration-rollback-step2"></a>

**importante**  
Cualquier mención a la versión 2.x en la salida generada por la herramienta de migración de KCL debe interpretarse como una referencia a la versión 1.x de KCL. La ejecución del script no realiza una recuperación completa, solo cambia el algoritmo de equilibrio de carga por el utilizado en la versión 1.x de KCL.

Tras ejecutar la herramienta de migración de KCL para realizar una recuperación, verá uno de los siguientes mensajes:

Mensaje 1  
“Rollback completed. Your application was running 2x compatible functionality. Please rollback to your previous application binaries by deploying the code with your previous KCL version”.  
**Acción requerida:** esto significa que los procesos de trabajo se estaban ejecutando en el modo compatible con KCL 1.x. Vuelva a implementar el código con la versión de KCL anterior en los procesos de trabajo.

Mensaje 2  
“Rollback completed. Your KCL Application was running 3x functionality and will rollback to 2x compatible functionality. If you don't see mitigation after a short period of time, please rollback to your previous application binaries by deploying the code with your previous KCL version”.  
**Acción requerida:** esto significa que los procesos de trabajo se estaban ejecutando en modo KCL 3.x y la herramienta de migración de KCL ha cambiado todos los procesos de trabajo al modo compatible con KCL 1.x. Vuelva a implementar el código con la versión de KCL anterior en los procesos de trabajo.

Mensaje 3  
“Application was already rolled back. Any KCLv3 resources that could be deleted were cleaned up to avoid charges until the application can be rolled forward with migration”.  
**Acción requerida:** esto significa que los procesos de trabajo ya se han restaurado para ejecutarse en el modo compatible con KCL 1.x. Vuelva a implementar el código con la versión de KCL anterior en los procesos de trabajo.

# Avance a KCL 3.x después de una restauración
<a name="kcl-migration-rollforward"></a>

En este tema se explica cómo avanzar la aplicación de consumidor a KCL 3.x después de una restauración. Cuando necesite avanzar, debe completar un proceso de dos pasos:

1. Ejecución de la [herramienta de migración de KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Implemente el código con KCL 3.x.

## Paso 1: ejecución de la herramienta de migración de KCL
<a name="kcl-migration-rollforward-step1"></a>

Ejecute la herramienta de migración de KCL con el siguiente comando para avanzar a KCL 3.x:

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### Parameters
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
Sustituya *region* por su Región de AWS.

`--application_name`  
Este parámetro es obligatorio si utiliza nombres predeterminados para la tabla de estados de coordinador. Si ha especificado nombres personalizados para la tabla de estados de coordinador, puede omitir este parámetro. Reemplace *applicationName* por el nombre de la aplicación KCL real. La herramienta utiliza este nombre para derivar los nombres de tabla predeterminados si no se proporcionan nombres personalizados.

`--coordinator_state_table_name`  
Este parámetro es necesario cuando ha establecido un nombre personalizado para la tabla de estados de coordinador en la configuración de KCL. Si utiliza el nombre de tabla predeterminado, puede omitir este parámetro. Reemplace *coordinatorStateTableName* por el nombre de tabla personalizado que especificó para la tabla de estados de coordinador.

Después de ejecutar la herramienta de migración en modo de avance, KCL crea los siguientes recursos de DynamoDB necesarios para KCL 3.x:
+ Un índice secundario global en la tabla de arrendamientos
+ Una tabla de métricas de proceso de trabajo

## Paso 2: implementación del código con KCL 3.x
<a name="kcl-migration-rollforward-step2"></a>

Después de ejecutar la herramienta de migración de KCL para una restauración, implemente el código con KCL 3.x en los procesos de trabajo. Para completar la migración, consulte [Paso 8: complete la migración](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish).

# Tutorial: Adaptador Kinesis de DynamoDB Streams
<a name="Streams.KCLAdapter.Walkthrough"></a>

En esta sección se explica paso a paso una aplicación Java en la que se utiliza Amazon Kinesis Client Library y Amazon DynamoDB Streams Kinesis Adapter. En la aplicación se muestra un ejemplo de replicación de datos, donde la actividad de escritura de una tabla se aplica a una segunda tabla, de tal forma que el contenido de ambas se mantiene sincronizado. Para obtener el código fuente, consulte [Programa completo: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md).

El programa realiza lo siguiente:

1. Crea dos tablas de DynamoDB denominadas `KCL-Demo-src` y `KCL-Demo-dst`. En cada una de estas tablas se ha habilitado una secuencia.

1. Agrega, actualiza y elimina elementos para generar actividad de actualización en la tabla de origen. Esto hace que se escriban datos en la secuencia de la tabla.

1. Lee los registros en la transmisión, los reconstruye como solicitudes de DynamoDB y aplica las solicitudes a la tabla de destino.

1. Examina las tablas de origen y destino para comprobar que sus contenidos sean idénticos.

1. Efectúa una limpieza eliminando las tablas.

Estos pasos se describen en las siguientes secciones y la aplicación completa se muestra al final del tutorial.

**Topics**
+ [Paso 1: crear tablas de DynamoDB](#Streams.KCLAdapter.Walkthrough.Step1)
+ [Paso 2: generar actividad de actualización en la tabla de origen](#Streams.KCLAdapter.Walkthrough.Step2)
+ [Paso 3: procesar la secuencia](#Streams.KCLAdapter.Walkthrough.Step3)
+ [Paso 4: comprobar que el contenido de ambas tablas es idéntico](#Streams.KCLAdapter.Walkthrough.Step4)
+ [Paso 5: Eliminar](#Streams.KCLAdapter.Walkthrough.Step5)
+ [Programa completo: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## Paso 1: crear tablas de DynamoDB
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

El primer paso consiste en crear dos tablas de DynamoDB, una de origen y una de destino. El `StreamViewType` de la secuencia de la tabla de origen es `NEW_IMAGE`. Esto significa que cada vez que se modifica un elemento en esta tabla, su imagen de "después" se escribe en la secuencia. De esta forma, se realiza un seguimiento en la secuencia de todas las actividades de escritura en la tabla.

En el siguiente ejemplo se muestra el código utilizado para crear las dos tablas.

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## Paso 2: generar actividad de actualización en la tabla de origen
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

El siguiente paso consiste en generar actividad de escritura en la tabla de origen. Mientras tiene lugar esta actividad, la secuencia de la tabla de origen también se actualiza casi en tiempo real.

En la aplicación se define una clase auxiliar con métodos que llaman a las operaciones de API `PutItem`, `UpdateItem` y `DeleteItem` para escribir los datos. En el siguiente ejemplo se muestra cómo se utilizan estos métodos.

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## Paso 3: procesar la secuencia
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

Ahora, el programa comienza a procesar la secuencia. DynamoDB Streams Kinesis Adapter actúa como una capa transparente entre la KCL y el punto de enlace de DynamoDB Streams, para que el código pueda utilizar plenamente la KCL, en lugar de tener que realizar llamadas a DynamoDB Streams de bajo nivel. En el programa se realizan las siguientes tareas:
+ Se define una clase de procesador de registros, `StreamsRecordProcessor`, con métodos que cumplen con la definición de interfaz de KCL: `initialize`, `processRecords` y `shutdown`. El método `processRecords` contiene la lógica necesaria para leer la secuencia de la tabla de origen y escribir en la tabla de destino.
+ Define un generador de clases para la clase de procesador de registros (`StreamsRecordProcessorFactory`). Esto es necesario para los programas Java que utilizan la KCL.
+ Crea una nueva instancia del proceso de trabajo `Worker` de la KCL, asociado con el generador de clases.
+ Cierra el proceso de trabajo `Worker` cuando ha finalizado de procesar registros.

Si lo desea, habilite el modo de recuperación en la configuración del adaptador KCL de sus flujos para escalar automáticamente la tasa de llamadas a la API GetRecords tres veces (predeterminado) cuando el retraso en el procesamiento de flujos supere un minuto (predeterminado), lo que ayudará a su consumidor de flujos a gestionar los picos de alto rendimiento en su tabla.

Para obtener más información sobre la definición de la interfaz de KCL, consulte [Desarrollo de consumidores mediante la biblioteca Kinesis Client Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) en la *Guía de desarrolladores de Amazon Kinesis Data Streams*. 

En el siguiente ejemplo se muestra el bucle principal de `StreamsRecordProcessor`. La instrucción `case` determina qué acción se debe llevar a cabo, según el valor de `OperationType` que aparece en el registro de secuencia.

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## Paso 4: comprobar que el contenido de ambas tablas es idéntico
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

En este punto, el contenido de las tablas de origen y destino está sincronizado. La aplicación emite solicitudes `Scan` en las dos tablas para comprobar que su contenido sea realmente idéntico.

La clase `DemoHelper` contiene un método `ScanTable` que llama a la API de bajo nivel `Scan`. El siguiente ejemplo le muestra cómo se usa.

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## Paso 5: Eliminar
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

La demostración ha finalizado. Por consiguiente, la aplicación elimina las tablas de origen y destino. Consulte el siguiente ejemplo de código. Incluso después de que las tablas se hayan eliminado, sus secuencias permanecerán disponibles durante un máximo de 24 horas; transcurrido este periodo se eliminan automáticamente.

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# Programa completo: DynamoDB Streams Kinesis Adapter
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

A continuación encontrará el programa de Java completo que lleva a cabo las tareas descritas en [Tutorial: Adaptador Kinesis de DynamoDB Streams](Streams.KCLAdapter.Walkthrough.md). Cuando lo ejecute, debería ver un resultado similar al siguiente.

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**importante**  
 Para ejecutar este programa, utilice políticas con el fin de asegurarse de que la aplicación cliente tenga acceso a DynamoDB y a Amazon CloudWatch. Para obtener más información, consulte [Políticas basadas en identidad de DynamoDB](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies). 

El código de origen consta de cuatro archivos `.java`. Para crear este programa, agregue la siguiente dependencia, que incluye la biblioteca de clientes de Amazon Kinesis (KCL) 3.x y el AWS SDK para Java v2 como dependencias transitivas:

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

Los archivos de origen son:
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```