

# Captura de datos de cambios para DynamoDB Streams
<a name="Streams"></a>

 DynamoDB Streams captura una secuencia en orden cronológico de las modificaciones de los elementos en una tabla de DynamoDB y almacena esta información en un log durante un máximo de 24 horas. Las aplicaciones pueden obtener acceso a este registro y ver los elementos de datos tal y como se encontraban antes y después de la modificación, prácticamente en tiempo real.

 El cifrado en reposo cifra los datos en DynamoDB streams. Para obtener más información, consulte [Cifrado en reposo en DynamoDB](EncryptionAtRest.md).

Una *transmisión de DynamoDB* es un flujo ordenado de información sobre los cambios que se realizan en los elementos de una tabla de DynamoDB. Cuando se habilita una transmisión en una tabla, DynamoDB obtiene información sobre cada modificación de los elementos de datos de esa tabla.

Cada vez que una aplicación crea, actualiza o elimina elementos en la tabla, DynamoDB Streams escribe un registro de transmisión con los atributos de clave principal de los elementos modificados. Un *registro de transmisión* contiene información sobre una modificación de los datos de un solo elemento de una tabla de DynamoDB. Puede configurar la secuencia de tal forma que sus registros capturen información adicional; por ejemplo, las imágenes de "antes" y "después" de los elementos modificados.

DynamoDB Streams ayuda a garantizar lo siguiente:
+ Cada registro de secuencia aparece una única vez en la secuencia.
+ Para cada elemento que se modifica de una tabla de DynamoDB, los registros de transmisión aparecen en el mismo orden en que se han realizado las modificaciones del elemento.

DynamoDB Streams escribe los registros de transmisión prácticamente en tiempo real, para que pueda crear aplicaciones que consuman estas transmisiones y adopten medidas en función de su contenido.

**Topics**
+ [Endpoints para DynamoDB Streams](#Streams.Endpoints)
+ [Habilitación de una secuencia](#Streams.Enabling)
+ [Lectura y procesamiento de un flujo](#Streams.Processing)
+ [DynamoDB Streams y período de vida](time-to-live-ttl-streams.md)
+ [Uso del adaptador Kinesis de DynamoDB Streams para procesar registros de transmisión](Streams.KCLAdapter.md)
+ [API de bajo nivel de DynamoDB Streams: ejemplo en Java](Streams.LowLevel.Walkthrough.md)
+ [DynamoDB Streams y disparadores de AWS Lambda](Streams.Lambda.md)
+ [DynamoDB Streams y Apache Flink](StreamsApacheFlink.xml.md)

## Endpoints para DynamoDB Streams
<a name="Streams.Endpoints"></a>

AWS mantiene puntos de enlace distintos para DynamoDB y DynamoDB Streams. Para usar las tablas y los índices de la base de datos, la aplicación tendrá que acceder a un punto de enlace de DynamoDB. Para leer y procesar los registros de DynamoDB Streams, la aplicación tendrá que obtener acceso a un punto de enlace de DynamoDB Streams situado en la misma región.

DynamoDB Streams ofrece dos conjuntos de puntos de conexión. Son los siguientes:
+ **Puntos de conexión exclusivos para IPv4**: puntos de conexión con la convención de nomenclatura de `streams.dynamodb.<region>.amazonaws.com`.
+ **Puntos de conexión de doble pila**: puntos de conexión nuevos que son compatibles con IPv4 e IPv6 y que siguen la convención de nomenclatura de `streams-dynamodb.<region>.api.aws`.

**nota**  
Para obtener una lista completa de las regiones y puntos de conexión de DynamoDB y DynamoDB Streams, consulte [Regiones y puntos de conexión](https://docs.aws.amazon.com/general/latest/gr/rande.html) en la *Referencia general de AWS*.

Los SDK de AWS proporcionan clientes independientes para DynamoDB y DynamoDB Streams. Según cuáles sean sus necesidades, la aplicación puede obtener acceso a un punto de enlace de DynamoDB, a un punto de enlace de DynamoDB Streams o a ambos al mismo tiempo. Para conectarse a ambos puntos de enlace, la aplicación tendrá que crear instancias de dos clientes: uno para DynamoDB y otro para DynamoDB Streams.

## Habilitación de una secuencia
<a name="Streams.Enabling"></a>

Puede habilitar una transmisión en una nueva tabla al crearla mediante la AWS CLI o uno de los SDK de AWS. También puede habilitar o deshabilitar una transmisión en una tabla existente, así como cambiar la configuración de una transmisión. DynamoDB Streams opera de forma asincrónica, por lo que el rendimiento de una tabla no se vea afectado al habilitar una transmisión.

La forma más sencilla de administrar DynamoDB Streams es usar la Consola de administración de AWS.

1. Inicie sesión en la Consola de administración de AWS y abra la consola de DynamoDB en [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/).

1. En el panel de la consola de DynamoDB, elija **Tablas** y seleccione una tabla existente.

1. Elija la pestaña **Exports and streams** (Exportaciones y flujos).

1. En la sección **Detalles del flujo de DynamoDB**, seleccione **Activar**.

1. En la ventana **Activar el flujo de DynamoDB**, elija la información que se escribirá en el flujo cada vez que se modifiquen los datos de la tabla:
   + **Key attributes only** (Solo atributos clave): solo los atributos clave del elemento modificado.
   + **New image (Nueva imagen)**: el elemento completo tal y como aparece después de modificarlo.
   + **Old image (Imagen anterior)**: el elemento completo tal y como aparecía antes de modificarlo.
   + **New and old images (Imágenes nuevas y anteriores)**: ambas imágenes del elemento, la nueva y la anterior.

   Cuando la configuración sea la que desea, elija **Activar flujo**.

1. (Opcional) Para desactivar un flujo existente, elija **Desactivar** bajo **Detalles del flujo de DynamoDB**.

También puede usar las operaciones de la API `CreateTable` o `UpdateTable` para habilitar o modificar una secuencia, respectivamente. El parámetro `StreamSpecification` determina cómo se configura la secuencia:
+ `StreamEnabled`: especifica si una transmisión está habilitada (`true`) o deshabilitada (`false`) para la tabla.
+ `StreamViewType`: especifica la información que se escribirá en la transmisión cada vez que se modifiquen los datos de la tabla:
  + `KEYS_ONLY`: solo los atributos de clave del elemento modificado.
  + `NEW_IMAGE`: el elemento completo tal y como aparece después de modificarlo.
  + `OLD_IMAGE`: el elemento completo tal y como aparecía antes de modificarlo.
  + `NEW_AND_OLD_IMAGES`: ambas imágenes del elemento, la nueva y la anterior.

Puede habilitar o deshabilitar una secuencia en cualquier momento. Sin embargo, tenga en cuenta que recibirá una excepción `ValidationException` si intenta habilitar una secuencia en una tabla que ya tiene una. Recibirá una `ValidationException` si intenta desactivar una secuencia en una tabla que no tiene ninguna.

Cuando se establece `StreamEnabled` en `true`, DynamoDB crea una nueva transmisión con un descriptor de transmisión único asignado a ella. Si deshabilita y vuelve a habilitar una secuencia en la tabla, se crea una secuencia nueva con un descriptor de secuencia distinto.

Cada secuencia se identifica de forma exclusiva mediante un nombre de recurso de Amazon (ARN). A continuación se muestra un ejemplo de ARN de una transmisión de una tabla de DynamoDB denominada `TestTable`.

```
arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291
```

Para determinar el último descriptor de transmisión de una tabla, se emite una solicitud `DescribeTable` de DynamoDB y se busca el elemento `LatestStreamArn` en la respuesta.

**nota**  
No es posible editar un `StreamViewType` una vez que se ha configurado un flujo. Si necesita realizar cambios en un flujo después de haberlo configurado, debe desactivar el flujo actual y crear uno nuevo.

## Lectura y procesamiento de un flujo
<a name="Streams.Processing"></a>

Para leer y procesar una transmisión, la aplicación tiene que conectarse a un punto de enlace de DynamoDB Streams y emitir solicitudes de API.

Una secuencia consta de *registros de secuencia*. Cada registro de transmisión representa una única modificación de datos de la tabla de DynamoDB a la que pertenece la secuencia. A cada registro de secuencia se le asigna un número de secuencia que refleja el orden en que el registro se ha publicado en la secuencia.

Los registros de secuencia se organizan en grupos, o *fragmentos*. Cada fragmento actúa como contenedor de varios registros de secuencia y contiene la información necesaria para obtener acceso a estos registros y recorrerlos en iteración. Los registros de secuencia de un fragmento se eliminan automáticamente transcurridas de 24 horas.

Los fragmentos son efímeros: se crean y eliminan automáticamente, según sea necesario. Además, cualquier fragmento se puede dividir en varios fragmentos nuevos; esto también sucede automáticamente. (Cabe destacar que un fragmento principal puede tener un solo fragmento secundario). Un fragmento se puede dividir en respuesta a niveles de actividad de escritura elevados en la tabla principal, para que las aplicaciones puedan procesar en paralelo los registros de varios fragmentos.

Si se deshabilita una secuencia, todos fragmentos que estén abiertos se cerrarán. Los datos de la transmisión continuarán disponibles para leerlos durante 24 horas.

Como los fragmentos poseen un parentesco (principales y secundarios), las aplicaciones siempre deben procesar los fragmentos principales antes de procesar los secundarios. Esto ayuda a garantizar que los registros de secuencia se procesen también en el orden correcto. (Si utiliza DynamoDB Streams Kinesis Adapter, esto se lleva a cabo de forma automatizada: la aplicación procesará los fragmentos y registros de transmisión en el orden correcto y también administrará automáticamente los fragmentos nuevos o vencidos, así como aquellos que se dividan mientras se ejecuta la aplicación. Para obtener más información, consulte ). [Uso del adaptador Kinesis de DynamoDB Streams para procesar registros de transmisión](Streams.KCLAdapter.md).)

En el siguiente diagrama se muestra la relación entre una secuencia, sus fragmentos y los registros de secuencia contenidos en los fragmentos.

![\[Estructura de DynamoDB Streams. Los registros de secuencia que representan modificaciones de datos se organizan en particiones.\]](http://docs.aws.amazon.com/es_es/amazondynamodb/latest/developerguide/images/streams-terminology.png)


**nota**  
Si lleva a cabo una operación `PutItem` o `UpdateItem` que no modifica ningún dato de un elemento, DynamoDB Streams *no* escribe ningún registro de transmisión de esa operación.

Para obtener acceso a una secuencia y procesar los registros que contiene, proceda como sigue:
+ Determine el ARN único de la secuencia a la que desea obtener acceso.
+ Determine qué fragmentos de la secuencia contienen los registros de secuencia que le interesan.
+ Obtenga acceso a los fragmentos y recupere los registros de secuencia que desee.

**nota**  
Nunca debe haber más de dos procesos leyendo la misma partición de flujo a la vez. Usar más de dos procesos de lectura por fragmento puede provocar que se aplique la limitación controlada.

La API de DynamoDB Streams ofrece las siguientes acciones para usarlas en los programas de aplicación:
+  `[ListStreams](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_ListStreams.html)`: devuelve una lista de descriptores de transmisión de la cuenta y el punto de enlace actuales. Si lo desea, puede solicitar únicamente los descriptores de secuencia de un nombre de tabla concreto.
+ `[DescribeStream](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_DescribeStream.html)`: devuelve información sobre un flujo, incluido el estado actual del flujo, su nombre de recurso de Amazon (ARN), la composición de sus particiones y su tabla de DynamoDB correspondiente. Puede utilizar opcionalmente el campo `ShardFilter` para recuperar la partición secundaria existente asociada a la partición principal.
+ `[GetShardIterator](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html)`: devuelve un *iterador de fragmentos* que describe una ubicación en el fragmento. Puede solicitar que el iterador proporcione acceso al punto más antiguo, al punto más reciente o a un punto concreto de la secuencia.
+ `[GetRecords](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html)`: devuelve los registros de secuencia de un fragmento determinado. Debe proporcionar el iterador de fragmentos devuelto por una solicitud `GetShardIterator`.

Para obtener descripciones completas de estas operaciones de la API, así como ejemplos de solicitudes y respuestas, consulte la [Referencia de API de Amazon DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Operations_Amazon_DynamoDB_Streams.html).

### Detección de particiones
<a name="Streams.ShardDiscovery"></a>



Descubra nuevas particiones en el flujo de DynamoDB con dos métodos potentes. Como usuario de Amazon DynamoDB Streams, dispone de dos formas efectivas de rastrear e identificar nuevas particiones:

**Sondeo de toda la topología del flujo**  
Use la API de `DescribeStream` para sondear el flujo con regularidad. Esto devuelve todas las particiones de la transmisión, incluidas las nuevas particiones que se hayan creado. Al comparar los resultados a lo largo del tiempo, puede detectar las particiones recién agregadas.

**Detección de particiones secundarias**  
Use la API de `DescribeStream` con el parámetro `ShardFilter` para encontrar un subconjunto de particiones. Al especificar una partición principal en la solicitud, los flujos de DynamoDB devolverán sus particiones secundarias inmediatas. Este enfoque resulta útil cuando solo se necesita rastrear el linaje de las particiones sin escanear todo el flujo.   
Las aplicaciones que consumen datos de los flujos de DynamoDB pueden pasar de manera eficiente de leer una partición cerrada a su partición secundaria mediante este parámetro `ShardFilter`, lo que evita llamadas repetidas a la API de `DescribeStream` para recuperar y recorrer el mapa de particiones de todas las particiones cerradas y abiertas. Esto ayuda a detectar rápidamente las particiones secundarias después de cerrar la partición principal, lo que hace que las aplicaciones de procesamiento de flujos sean más receptivas y rentables.

Ambos métodos le permiten mantenerse al tanto de la estructura en evolución de los flujos de DynamoDB, lo que garantiza que nunca se pierda las actualizaciones de datos importantes ni las modificaciones de las particiones.

### Límite de retención de datos para DynamoDB Streams
<a name="Streams.DataRetention"></a>

Todos los datos de DynamoDB Streams están sujetos a una vida útil de 24 horas. Puede recuperar y analizar las últimas 24 horas de actividad de cada tabla. Sin embargo, los datos de más de 24 horas se pueden recortar (eliminar) en cualquier momento.

Si deshabilita una secuencia en una tabla, los datos de esa secuencia continuarán disponibles para leerlos durante 24 horas. Transcurrido ese tiempo, los datos vencen y los registros de secuencia se eliminan automáticamente. No existe ningún mecanismo para eliminar manualmente las secuencias. Tiene que esperar hasta que se alcance el límite de retención (24 horas) para que se eliminen los registros de secuencia.

# DynamoDB Streams y período de vida
<a name="time-to-live-ttl-streams"></a>

Puede realizar copias de seguridad o bien procesar los elementos eliminados por [período de vida](TTL.md) (TTL, por sus siglas en inglés) habilitando Amazon DynamoDB Streams en la tabla y procesando los registros de transmisión de los elementos vencidos. Para obtener más información, consulte [Lectura y procesamiento de un flujo](Streams.md#Streams.Processing).

El registro de secuencias contiene el campo de identidad del usuario `Records[<index>].userIdentity`.

Los elementos que el proceso período de vida elimina tras su vencimiento tienen los siguientes campos:
+ `Records[<index>].userIdentity.type`

  `"Service"`
+ `Records[<index>].userIdentity.principalId`

  `"dynamodb.amazonaws.com"`

**nota**  
Cuando utilice el TTL en una tabla global, el campo `userIdentity` se configurará en la región en la que se realizó el TTL. Este campo no se establecerá en otras regiones cuando se replique la eliminación.

En el código JSON siguiente se muestra la parte pertinente de un registro de secuencias único.

```
"Records": [
    {
        ...

        "userIdentity": {
            "type": "Service",
            "principalId": "dynamodb.amazonaws.com"
        }

        ...

    }
]
```

## Uso de DynamoDB Streams y Lambda para archivar elementos de TTL eliminados
<a name="streams-archive-ttl-deleted-items"></a>

La combinación de [Periodo de vida (TTL) de DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html), [DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) y [AWS Lambda](https://aws.amazon.com/lambda/) puede simplificar el archivo de datos, reducir los costos de almacenamiento de DynamoDB y reducir la complejidad del código. El uso de Lambda como consumidor de flujos proporciona muchas ventajas, entre las que destaca la reducción de costos en comparación con otros consumidores como Kinesis Client Library (KCL). No se le cobran las llamadas a la API `GetRecords` en su flujo de DynamoDB cuando utiliza Lambda para consumir eventos, y Lambda puede proporcionar un filtrado de eventos mediante la identificación de patrones JSON en un evento de flujo. Con el filtrado de contenido de patrones de eventos, puede definir hasta cinco filtros diferentes para controlar qué eventos se envían a Lambda para procesarlos. De este modo, se reducen las invocaciones de sus funciones Lambda, se simplifica el código y se reduce el costo total.

Aunque DynamoDB Streams contiene todas las modificaciones de datos, como las acciones `Create`, `Modify` y `Remove`, esto puede dar lugar a invocaciones no deseadas de su función Lambda de archivo. Por ejemplo, supongamos que tiene una tabla con dos millones de modificaciones de datos por hora que se incluyen en el flujo, pero menos del 5 por ciento de estas son eliminaciones de elementos que vencerán a través del proceso TTL y se deben archivar. Con los [filtros de origen de eventos de Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html), la función Lambda solo se invocará 100 000 veces por hora. El resultado con el filtrado de eventos es que solo se le cobran las invocaciones necesarias en lugar de los dos millones de invocaciones que tendría sin el filtrado de eventos.

El filtrado de eventos se aplica a la [asignación del origen de eventos de Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html), que es un recurso que lee de un evento elegido (el flujo de DynamoDB) e invoca una función Lambda. En el siguiente diagrama, puede ver cómo una función Lambda consume un elemento de TTL eliminado mediante flujos y filtros de eventos.

![\[Un elemento eliminado mediante un proceso de TTL inicia una función de Lambda que utiliza flujos y filtros de eventos.\]](http://docs.aws.amazon.com/es_es/amazondynamodb/latest/developerguide/images/streams-lambda-ttl.png)


### Patrón de filtros de eventos de DynamoDB Time to Live
<a name="ttl-event-filter-pattern"></a>

Si agrega el siguiente JSON a sus [criterios de filtro](https://docs.aws.amazon.com/lambda/latest/dg/API_FilterCriteria.html) de asignación de origen de eventos, podrá invocar su función Lambda solo para los elementos de TTL eliminados:

```
{
    "Filters": [
        {
            "Pattern": { "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } }
        }
    ]
}
```

### Creación de una asignación de origen de eventos de AWS Lambda
<a name="create-event-source-mapping"></a>

Utilice los siguientes fragmentos de código para crear una asignación de origen de eventos filtrados que pueda conectar con el flujo de DynamoDB de una tabla. Cada bloque de código incluye el patrón del filtro de eventos.

------
#### [ AWS CLI ]

```
aws lambda create-event-source-mapping \
--event-source-arn 'arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000' \
--batch-size 10 \
--enabled \
--function-name test_func \
--starting-position LATEST \
--filter-criteria '{"Filters": [{"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"}]}'
```

------
#### [ Java ]

```
LambdaClient client = LambdaClient.builder()
        .region(Region.EU_WEST_1)
        .build();

Filter userIdentity = Filter.builder()
        .pattern("{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}")
        .build();

FilterCriteria filterCriteria = FilterCriteria.builder()
        .filters(userIdentity)
        .build();

CreateEventSourceMappingRequest mappingRequest = CreateEventSourceMappingRequest.builder()
        .eventSourceArn("arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000")
        .batchSize(10)
        .enabled(Boolean.TRUE)
        .functionName("test_func")
        .startingPosition("LATEST")
        .filterCriteria(filterCriteria)
        .build();

try{
    CreateEventSourceMappingResponse eventSourceMappingResponse = client.createEventSourceMapping(mappingRequest);
    System.out.println("The mapping ARN is "+eventSourceMappingResponse.eventSourceArn());

}catch (ServiceException e){
    System.out.println(e.getMessage());
}
```

------
#### [ Node ]

```
const client = new LambdaClient({ region: "eu-west-1" });

const input = {
    EventSourceArn: "arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000",
    BatchSize: 10,
    Enabled: true,
    FunctionName: "test_func",
    StartingPosition: "LATEST",
    FilterCriteria: { "Filters": [{ "Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }] }
}

const command = new CreateEventSourceMappingCommand(input);

try {
    const results = await client.send(command);
    console.log(results);
} catch (err) {
    console.error(err);
}
```

------
#### [ Python ]

```
session = boto3.session.Session(region_name = 'eu-west-1')
client = session.client('lambda')

try:
    response = client.create_event_source_mapping(
        EventSourceArn='arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000',
        BatchSize=10,
        Enabled=True,
        FunctionName='test_func',
        StartingPosition='LATEST',
        FilterCriteria={
            'Filters': [
                {
                    'Pattern': "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"
                },
            ]
        }
    )
    print(response)
except Exception as e:
    print(e)
```

------
#### [ JSON ]

```
{
  "userIdentity": {
     "type": ["Service"],
     "principalId": ["dynamodb.amazonaws.com"]
   }
}
```

------

# Uso del adaptador Kinesis de DynamoDB Streams para procesar registros de transmisión
<a name="Streams.KCLAdapter"></a>

Usar Amazon Kinesis Adapter es la forma recomendada de consumir secuencias de Amazon DynamoDB. La API de DynamoDB Streams es intencionadamente similar a la de Kinesis Data Streams. En ambos servicios, los data streams se componen de fragmentos, que son los contenedores de los registros de secuencia. Los API de ambos servicios contienen operaciones `ListStreams`, `DescribeStream`, `GetShards` y `GetShardIterator`. (Aunque estas acciones de DynamoDB Streams son parecidas a sus homólogas de Kinesis Data Streams, no son idénticas al 100 %).

Como usuario de DynamoDB Streams, puede sacar partido de los patrones de diseño contenidos en KCL para procesar los fragmentos de DynamoDB Streams y transmitir registros. Para ello, se utiliza DynamoDB Streams Kinesis Adapter. Kinesis Adapter implementa la interfaz de Kinesis Data Streams, de tal forma que se pueda usar KCL para consumir y procesar registros desde DynamoDB Streams. Para obtener instrucciones acerca de cómo configurar e instalar DynamoDB Streams Kinesis Adapter, consulte el [repositorio de GitHub](https://github.com/awslabs/dynamodb-streams-kinesis-adapter).

Puede escribir aplicaciones para Kinesis Data Streams mediante Kinesis Client Library (KCL). KCL simplifica la codificación porque proporciona abstracciones útiles por encima del API de bajo nivel de Kinesis Data Streams. Para obtener más información sobre KCL, consulte [Desarrollo de consumidores mediante la biblioteca Kinesis Client Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) en la *Guía para desarrolladores de Amazon Kinesis Data Streams*.

DynamoDB recomienda utilizar la versión 3.x de KCL con AWS SDK para Java v2.x. La versión actual del adaptador de Kinesis de DynamoDB Streams 1.x con AWS SDK para AWS SDK para Java v1.x se seguirá admitiendo por completo durante todo el ciclo de vida, tal y como estaba previsto durante el periodo de transición, en consonancia con la [Política de mantenimiento de AWS SDK y herramientas](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html).

**nota**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x dejará de recibir asistencia el 30 de enero de 2026. Le recomendamos que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [Biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en GitHub. Para obtener información sobre las últimas versiones de KCL, consulte [Uso de la biblioteca de clientes de Kinesis](https://docs.aws.amazon.com/streams/latest/dev/kcl.html). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte Migración de KCL 1.x a KCL 3.x.

En el siguiente diagrama se muestra cómo interaccionan estas bibliotecas entre sí.

![\[Interacción entre DynamoDB Streams, Kinesis Data Streams y KCL para procesar registros de DynamoDB Streams.\]](http://docs.aws.amazon.com/es_es/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


Si DynamoDB Streams Kinesis Adapter está implementado, puede comenzar a desarrollar para la interfaz de KCL y dirigir las llamadas al API de forma transparente al punto de enlace de DynamoDB Streams.

Cuando se inicia la aplicación, llama a KCL para crear una instancia de un proceso de trabajo. Debe facilitar al proceso de trabajo información sobre la configuración de la aplicación, como el descriptor de la transmisión y las credenciales de AWS, así como el nombre de una clase de procesador de registros que usted proporcione. A medida que el proceso de trabajo ejecuta el código en el procesador de registros, lleva a cabo las siguientes tareas:
+ Se conecta a la secuencia
+ Enumera las particiones del flujo.
+ Comprueba y enumera las particiones secundarias de una partición principal cerrada dentro del flujo
+ Coordina la asociación de los fragmentos con otros procesos de trabajo (si procede)
+ Crea instancias de un procesador de registros para cada fragmento que administra
+ Extrae registros del flujo.
+ Escala la frecuencia de llamadas a la API GetRecords durante el alto rendimiento (si se configura el modo de recuperación).
+ Inserta los registros en el procesador de registros correspondiente
+ Genera puntos de comprobación para los registros procesados
+ Balancea las asociaciones entre fragmentos y procesos de trabajo cuando cambia el recuento de instancias de procesos de trabajo
+ Equilibra las asociaciones entre particiones y procesos de trabajo cuando las particiones se dividen.

El adaptador KCL admite el modo de recuperación, una característica de ajuste automático de la frecuencia de llamadas para gestionar los aumentos temporales de rendimiento. Cuando el retraso en el procesamiento de flujos supera un umbral configurable (un minuto de forma predeterminada), el modo de recuperación escala la frecuencia de llamadas a la API GetRecords en un valor configurable (tres veces de forma predeterminada) para recuperar los registros más rápido y, a continuación, vuelve a la normalidad una vez que disminuye el retraso. Esto resulta muy útil durante los periodos de alto rendimiento, en los que la actividad de escritura de DynamoDB puede saturar a los consumidores que utilizan las frecuencias de sondeo predeterminadas. El modo de recuperación se puede habilitar a través del parámetro de configuración `catchupEnabled` (falso de forma predeterminada).

**nota**  
Para obtener una descripción de los conceptos de KCL enumerados aquí, consulte [Desarrollo de consumidores mediante la biblioteca Kinesis Client Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) en la *Guía para desarrolladores de Amazon Kinesis Data Streams*.  
Para obtener más información acerca de cómo usar los flujos con AWS Lambda, consulte [DynamoDB Streams y disparadores de AWS Lambda](Streams.Lambda.md).

# Migración de KCL 1.x a KCL 3.x
<a name="streams-migrating-kcl"></a>

## Descripción general
<a name="migrating-kcl-overview"></a>

En esta guía se proporcionan instrucciones para migrar la aplicación de consumidor de KCL 1.x a KCL 3.x. Debido a las diferencias de arquitectura entre KCL 1.x y KCL 3.x, la migración requiere actualizar varios componentes para garantizar la compatibilidad.

KCL 1.x utiliza clases e interfaces diferentes en comparación con KCL 3.x. Debe migrar primero el procesador de registros, el generador de procesadores de registros y las clases de procesos de trabajo al formato compatible con KCL 3.x y, a continuación, seguir los pasos de migración de KCL 1.x a KCL 3.x.

## Pasos para realizar la migración
<a name="migration-steps"></a>

**Topics**
+ [Paso 1: migración del procesador de registros](#step1-record-processor)
+ [Paso 2: migración del generador de procesadores de registros](#step2-record-processor-factory)
+ [Paso 3: migración del proceso de trabajo](#step3-worker-migration)
+ [Paso 4: información general y recomendaciones sobre la configuración de KCL 3.x](#step4-configuration-migration)
+ [Paso 5: migración de KCL 2.x a KCL 3.x](#step5-kcl2-to-kcl3)

### Paso 1: migración del procesador de registros
<a name="step1-record-processor"></a>

En el siguiente ejemplo se muestra un procesador de registros implementado para el adaptador de Kinesis de DynamoDB Streams de KCL 1.x:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Migración de la clase RecordProcessor**

1. Cambie las interfaces de `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` y `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` a `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor` tal y como se indica a continuación:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. Actualice las instrucciones de importación para los métodos `initialize` y `processRecords`:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. Sustituya el método `shutdownRequested` por los métodos nuevos siguientes: `leaseLost`, `shardEnded` y `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

A continuación, se muestra la versión actualizada de la clase del procesador de registros:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**nota**  
El adaptador de Kinesis de DynamoDB Streams ahora usa el modelo de registro de SDKv2. En SDKv2, los objetos `AttributeValue` complejos (`BS`, `NS`, `M`, `L` y `SS`) nunca devuelven un valor nulo. Use los métodos `hasBs()`, `hasNs()`, `hasM()`, `hasL()` y `hasSs()` para verificar si estos valores existen.

### Paso 2: migración del generador de procesadores de registros
<a name="step2-record-processor-factory"></a>

El generador de procesadores de registros es responsable de la creación de procesadores de registros cuando se adquiere una asignación. A continuación, se muestra un ejemplo de un generador de la versión 1.x de KCL:

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**Migración de `RecordProcessorFactory`**
+ Cambie la interfaz implementada de `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` a `software.amazon.kinesis.processor.ShardRecordProcessorFactory`, tal y como se indica a continuación:

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

A continuación, se muestra un ejemplo de generador de procesadores de registros de la versión 3.0:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### Paso 3: migración del proceso de trabajo
<a name="step3-worker-migration"></a>

En la versión 3.0 de KCL, una nueva clase, llamada **Scheduler**, reemplaza la clase **Worker**. A continuación, se muestra un ejemplo de proceso de trabajo de la versión 1.x de KCL:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**Para migrar el proceso de trabajo**

1. Cambie la instrucción `import` para la clase `Worker` por las instrucciones de importación para las clases `Scheduler` y `ConfigsBuilder`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Importe `StreamTracker` y cambie la importación de `StreamsWorkerFactory` a `StreamsSchedulerFactory`.

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. Elija la posición desde la que desea iniciar la aplicación. Puede ser `TRIM_HORIZON` o `LATEST`.

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. Crear una instancia de `StreamTracker`.

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. Cree el objeto `AmazonDynamoDBStreamsAdapterClient`.

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. Cree el objeto `ConfigsBuilder`.

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. Cree `Scheduler` mediante `ConfigsBuilder` tal como se muestra en el ejemplo siguiente:

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**importante**  
La configuración `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` mantiene la compatibilidad entre el adaptador de Kinesis de DynamoDB Streams para KCL v3 y KCL v1, pero no entre KCL v2 y v3.

### Paso 4: información general y recomendaciones sobre la configuración de KCL 3.x
<a name="step4-configuration-migration"></a>

Para obtener una descripción detallada de las configuraciones introducidas después de KCL 1.x que son relevantes en KCL 3.x, consulte las [configuraciones de KCL](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html) y la [configuración del cliente de migración de KCL](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration).

**importante**  
En lugar de crear directamente objetos de `checkpointConfig`, `coordinatorConfig`, `leaseManagementConfig`, `metricsConfig`, `processorConfig` y `retrievalConfig`, recomendamos utilizar `ConfigsBuilder` para establecer configuraciones en KCL 3.x y versiones posteriores y así evitar problemas de inicialización del programador. `ConfigsBuilder` proporciona una forma más flexible y sostenible de configurar la aplicación de KCL.

#### Configuraciones con valor predeterminado actualizado en KCL 3.x
<a name="kcl3-configuration-overview"></a>

`billingMode`  
En la versión 1.x de KCL, el valor predeterminado de `billingMode` se establece en `PROVISIONED`. No obstante, con la versión 3.x de KCL, el valor predeterminado de `billingMode` es `PAY_PER_REQUEST` (modo bajo demanda). Le recomendamos que utilice el modo de capacidad bajo demanda para la tabla de arrendamiento a fin de ajustar automáticamente la capacidad en función del uso. Para obtener orientación sobre cómo utilizar la capacidad aprovisionada para las tablas de arrendamiento, consulte [Best practices for the lease table with provisioned capacity mode](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html).

`idleTimeBetweenReadsInMillis`  
En la versión 1.x de KCL, el valor predeterminado de `idleTimeBetweenReadsInMillis` se establece en 1000 (o 1 segundo). La versión 3.x de KCL establece el valor predeterminado de i`dleTimeBetweenReadsInMillis` en 1500 (o 1,5 segundos), pero el adaptador de Kinesis de Amazon DynamoDB Streams reemplaza el valor predeterminado por 1000 (o 1 segundo).

#### Nuevas configuraciones en KCL 3.x
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
Esta configuración define el intervalo de tiempo antes de que las particiones recién descubiertas comiencen a procesarse y se calcula como 1,5 × `leaseAssignmentIntervalMillis`. Si este ajuste no se configura explícitamente, el intervalo de tiempo se establece de forma predeterminada en 1,5 × `failoverTimeMillis`. El procesamiento de nuevas particiones implica examinar la tabla de arrendamiento y consultar un índice secundario global (GSI) en la tabla de arrendamiento. Al reducir `leaseAssignmentIntervalMillis`, aumenta la frecuencia de estas operaciones de análisis y consulta, lo que se traduce en mayores costos de DynamoDB. Recomendamos establecer este valor en 2000 (o 2 segundos) para minimizar el retraso en el procesamiento de nuevas particiones.

`shardConsumerDispatchPollIntervalMillis`  
Esta configuración define el intervalo entre sondeos sucesivos por parte del consumidor de particiones para activar las transiciones de estado. En la versión 1.x de KCL, este comportamiento se controlaba mediante el parámetro `idleTimeInMillis`, que no se exponía como un ajuste configurable. Con la versión 3.x de KCL, recomendamos establecer esta configuración para que coincida con el valor utilizado para` idleTimeInMillis` en la configuración de la versión 1.x de KCL.

### Paso 5: migración de KCL 2.x a KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

Para garantizar una transición fluida y la compatibilidad con la última versión de la biblioteca de clientes de Kinesis (KCL), siga los pasos del 5 al 8 de las instrucciones de la guía de migración para [actualizar de KCL 2.x a KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics).

Para ver los problemas habituales de solución de problemas de KCL 3.x, consulte [Solución de problemas de las aplicaciones de consumidores de KCL](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html).

# Restauración de la versión de KCL anterior
<a name="kcl-migration-rollback"></a>

En este tema se explica cómo restaurar la aplicación de consumidor a la versión de KCL anterior. El proceso de restauración consta de dos pasos:

1. Ejecución de la [herramienta de migración de KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Nueva implementación del código de la versión de KCL anterior.

## Paso 1: ejecución de la herramienta de migración de KCL
<a name="kcl-migration-rollback-step1"></a>

Cuando necesite restaurar la versión anterior de KCL, debe ejecutar la herramienta de migración de KCL. La herramienta realiza dos tareas importantes:
+ Elimina una tabla de metadatos llamada tabla de métricas de procesos de trabajo y el índice secundario global de la tabla de arrendamiento en DynamoDB. Estos artefactos los crea KCL 3.x, pero no son necesarios al restaurar la versión anterior.
+ Hace que todos los procesos de trabajo se ejecuten en un modo compatible con KCL 1.x y comiencen a utilizar el algoritmo de equilibrio de carga utilizado en versiones anteriores de KCL. Si tiene problemas con el nuevo algoritmo de equilibrio de carga en KCL 3.x, esto mitigará el problema inmediatamente.

**importante**  
La tabla de estados del coordinador en DynamoDB debe existir y no debe eliminarse durante el proceso de migración, restauración y avance.

**nota**  
Es importante que todos los procesos de trabajo de la aplicación de consumo utilicen el mismo algoritmo de equilibrio de carga en un momento dado. La herramienta de migración de KCL se asegura de que todos los procesos de trabajo de la aplicación de consumo KCL 3.x cambien al modo compatible con KCL 1.x, de modo que todos los procesos de trabajo ejecuten el mismo algoritmo de equilibrio de carga durante la restauración de la aplicación a la versión anterior de KCL.

Puede descargar la [herramienta de migración de KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) en el directorio de scripts del [repositorio de GitHub de KCL](https://github.com/awslabs/amazon-kinesis-client/tree/master). Ejecute el script desde un proceso de trabajo o host con los permisos adecuados para escribir en la tabla de estados del coordinador, la tabla de métricas de los procesos de trabajo y la tabla de arrendamiento. Asegúrese de que los [permisos de IAM](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html) adecuados estén configurados para las aplicaciones de consumo de KCL. Ejecute el script solo una vez por aplicación de KCL mediante el comando especificado:

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### Parameters
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
Sustituya *region* por su Región de AWS.

`--application_name`  
Este parámetro es necesario si utiliza nombres predeterminados para las tablas de metadatos de DynamoDB (tabla de arrendamiento, tabla de estados del coordinador y tabla de métricas de los proceso de trabajo). Si ha especificado nombres personalizados para estas tablas, puede omitir este parámetro. Reemplace *applicationName* por el nombre de la aplicación KCL real. La herramienta utiliza este nombre para derivar los nombres de tabla predeterminados si no se proporcionan nombres personalizados.

`--lease_table_name`  
Este parámetro es necesario cuando ha establecido un nombre personalizado para la tabla de arrendamientos en la configuración de KCL. Si utiliza el nombre de tabla predeterminado, puede omitir este parámetro. Reemplace *leaseTableName* por el nombre de tabla personalizado que especificó para la tabla de arrendamiento.

`--coordinator_state_table_name`  
Este parámetro es necesario cuando ha establecido un nombre personalizado para la tabla de estados de coordinador en la configuración de KCL. Si utiliza el nombre de tabla predeterminado, puede omitir este parámetro. Reemplace *coordinatorStateTableName* por el nombre de tabla personalizado que especificó para la tabla de estados de coordinador.

`--worker_metrics_table_name`  
Este parámetro es necesario cuando ha establecido un nombre personalizado para la tabla de métricas de proceso de trabajo en la configuración de KCL. Si utiliza el nombre de tabla predeterminado, puede omitir este parámetro. Reemplace *workerMetricsTableName* por el nombre de tabla personalizado que especificó para la tabla de métricas de proceso de trabajo.

## Paso 2: nueva implementación del código con la versión de KCL anterior
<a name="kcl-migration-rollback-step2"></a>

**importante**  
Cualquier mención a la versión 2.x en la salida generada por la herramienta de migración de KCL debe interpretarse como una referencia a la versión 1.x de KCL. La ejecución del script no realiza una recuperación completa, solo cambia el algoritmo de equilibrio de carga por el utilizado en la versión 1.x de KCL.

Tras ejecutar la herramienta de migración de KCL para realizar una recuperación, verá uno de los siguientes mensajes:

Mensaje 1  
“Rollback completed. Your application was running 2x compatible functionality. Please rollback to your previous application binaries by deploying the code with your previous KCL version”.  
**Acción requerida:** esto significa que los procesos de trabajo se estaban ejecutando en el modo compatible con KCL 1.x. Vuelva a implementar el código con la versión de KCL anterior en los procesos de trabajo.

Mensaje 2  
“Rollback completed. Your KCL Application was running 3x functionality and will rollback to 2x compatible functionality. If you don't see mitigation after a short period of time, please rollback to your previous application binaries by deploying the code with your previous KCL version”.  
**Acción requerida:** esto significa que los procesos de trabajo se estaban ejecutando en modo KCL 3.x y la herramienta de migración de KCL ha cambiado todos los procesos de trabajo al modo compatible con KCL 1.x. Vuelva a implementar el código con la versión de KCL anterior en los procesos de trabajo.

Mensaje 3  
“Application was already rolled back. Any KCLv3 resources that could be deleted were cleaned up to avoid charges until the application can be rolled forward with migration”.  
**Acción requerida:** esto significa que los procesos de trabajo ya se han restaurado para ejecutarse en el modo compatible con KCL 1.x. Vuelva a implementar el código con la versión de KCL anterior en los procesos de trabajo.

# Avance a KCL 3.x después de una restauración
<a name="kcl-migration-rollforward"></a>

En este tema se explica cómo avanzar la aplicación de consumidor a KCL 3.x después de una restauración. Cuando necesite avanzar, debe completar un proceso de dos pasos:

1. Ejecución de la [herramienta de migración de KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Implemente el código con KCL 3.x.

## Paso 1: ejecución de la herramienta de migración de KCL
<a name="kcl-migration-rollforward-step1"></a>

Ejecute la herramienta de migración de KCL con el siguiente comando para avanzar a KCL 3.x:

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### Parameters
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
Sustituya *region* por su Región de AWS.

`--application_name`  
Este parámetro es obligatorio si utiliza nombres predeterminados para la tabla de estados de coordinador. Si ha especificado nombres personalizados para la tabla de estados de coordinador, puede omitir este parámetro. Reemplace *applicationName* por el nombre de la aplicación KCL real. La herramienta utiliza este nombre para derivar los nombres de tabla predeterminados si no se proporcionan nombres personalizados.

`--coordinator_state_table_name`  
Este parámetro es necesario cuando ha establecido un nombre personalizado para la tabla de estados de coordinador en la configuración de KCL. Si utiliza el nombre de tabla predeterminado, puede omitir este parámetro. Reemplace *coordinatorStateTableName* por el nombre de tabla personalizado que especificó para la tabla de estados de coordinador.

Después de ejecutar la herramienta de migración en modo de avance, KCL crea los siguientes recursos de DynamoDB necesarios para KCL 3.x:
+ Un índice secundario global en la tabla de arrendamientos
+ Una tabla de métricas de proceso de trabajo

## Paso 2: implementación del código con KCL 3.x
<a name="kcl-migration-rollforward-step2"></a>

Después de ejecutar la herramienta de migración de KCL para una restauración, implemente el código con KCL 3.x en los procesos de trabajo. Para completar la migración, consulte [Paso 8: complete la migración](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish).

# Tutorial: Adaptador Kinesis de DynamoDB Streams
<a name="Streams.KCLAdapter.Walkthrough"></a>

En esta sección se explica paso a paso una aplicación Java en la que se utiliza Amazon Kinesis Client Library y Amazon DynamoDB Streams Kinesis Adapter. En la aplicación se muestra un ejemplo de replicación de datos, donde la actividad de escritura de una tabla se aplica a una segunda tabla, de tal forma que el contenido de ambas se mantiene sincronizado. Para obtener el código fuente, consulte [Programa completo: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md).

El programa realiza lo siguiente:

1. Crea dos tablas de DynamoDB denominadas `KCL-Demo-src` y `KCL-Demo-dst`. En cada una de estas tablas se ha habilitado una secuencia.

1. Agrega, actualiza y elimina elementos para generar actividad de actualización en la tabla de origen. Esto hace que se escriban datos en la secuencia de la tabla.

1. Lee los registros en la transmisión, los reconstruye como solicitudes de DynamoDB y aplica las solicitudes a la tabla de destino.

1. Examina las tablas de origen y destino para comprobar que sus contenidos sean idénticos.

1. Efectúa una limpieza eliminando las tablas.

Estos pasos se describen en las siguientes secciones y la aplicación completa se muestra al final del tutorial.

**Topics**
+ [Paso 1: crear tablas de DynamoDB](#Streams.KCLAdapter.Walkthrough.Step1)
+ [Paso 2: generar actividad de actualización en la tabla de origen](#Streams.KCLAdapter.Walkthrough.Step2)
+ [Paso 3: procesar la secuencia](#Streams.KCLAdapter.Walkthrough.Step3)
+ [Paso 4: comprobar que el contenido de ambas tablas es idéntico](#Streams.KCLAdapter.Walkthrough.Step4)
+ [Paso 5: Eliminar](#Streams.KCLAdapter.Walkthrough.Step5)
+ [Programa completo: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## Paso 1: crear tablas de DynamoDB
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

El primer paso consiste en crear dos tablas de DynamoDB, una de origen y una de destino. El `StreamViewType` de la secuencia de la tabla de origen es `NEW_IMAGE`. Esto significa que cada vez que se modifica un elemento en esta tabla, su imagen de "después" se escribe en la secuencia. De esta forma, se realiza un seguimiento en la secuencia de todas las actividades de escritura en la tabla.

En el siguiente ejemplo se muestra el código utilizado para crear las dos tablas.

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## Paso 2: generar actividad de actualización en la tabla de origen
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

El siguiente paso consiste en generar actividad de escritura en la tabla de origen. Mientras tiene lugar esta actividad, la secuencia de la tabla de origen también se actualiza casi en tiempo real.

En la aplicación se define una clase auxiliar con métodos que llaman a las operaciones de API `PutItem`, `UpdateItem` y `DeleteItem` para escribir los datos. En el siguiente ejemplo se muestra cómo se utilizan estos métodos.

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## Paso 3: procesar la secuencia
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

Ahora, el programa comienza a procesar la secuencia. DynamoDB Streams Kinesis Adapter actúa como una capa transparente entre la KCL y el punto de enlace de DynamoDB Streams, para que el código pueda utilizar plenamente la KCL, en lugar de tener que realizar llamadas a DynamoDB Streams de bajo nivel. En el programa se realizan las siguientes tareas:
+ Se define una clase de procesador de registros, `StreamsRecordProcessor`, con métodos que cumplen con la definición de interfaz de KCL: `initialize`, `processRecords` y `shutdown`. El método `processRecords` contiene la lógica necesaria para leer la secuencia de la tabla de origen y escribir en la tabla de destino.
+ Define un generador de clases para la clase de procesador de registros (`StreamsRecordProcessorFactory`). Esto es necesario para los programas Java que utilizan la KCL.
+ Crea una nueva instancia del proceso de trabajo `Worker` de la KCL, asociado con el generador de clases.
+ Cierra el proceso de trabajo `Worker` cuando ha finalizado de procesar registros.

Si lo desea, habilite el modo de recuperación en la configuración del adaptador KCL de sus flujos para escalar automáticamente la tasa de llamadas a la API GetRecords tres veces (predeterminado) cuando el retraso en el procesamiento de flujos supere un minuto (predeterminado), lo que ayudará a su consumidor de flujos a gestionar los picos de alto rendimiento en su tabla.

Para obtener más información sobre la definición de la interfaz de KCL, consulte [Desarrollo de consumidores mediante la biblioteca Kinesis Client Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) en la *Guía de desarrolladores de Amazon Kinesis Data Streams*. 

En el siguiente ejemplo se muestra el bucle principal de `StreamsRecordProcessor`. La instrucción `case` determina qué acción se debe llevar a cabo, según el valor de `OperationType` que aparece en el registro de secuencia.

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## Paso 4: comprobar que el contenido de ambas tablas es idéntico
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

En este punto, el contenido de las tablas de origen y destino está sincronizado. La aplicación emite solicitudes `Scan` en las dos tablas para comprobar que su contenido sea realmente idéntico.

La clase `DemoHelper` contiene un método `ScanTable` que llama a la API de bajo nivel `Scan`. El siguiente ejemplo le muestra cómo se usa.

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## Paso 5: Eliminar
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

La demostración ha finalizado. Por consiguiente, la aplicación elimina las tablas de origen y destino. Consulte el siguiente ejemplo de código. Incluso después de que las tablas se hayan eliminado, sus secuencias permanecerán disponibles durante un máximo de 24 horas; transcurrido este periodo se eliminan automáticamente.

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# Programa completo: DynamoDB Streams Kinesis Adapter
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

A continuación encontrará el programa de Java completo que lleva a cabo las tareas descritas en [Tutorial: Adaptador Kinesis de DynamoDB Streams](Streams.KCLAdapter.Walkthrough.md). Cuando lo ejecute, debería ver un resultado similar al siguiente.

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**importante**  
 Para ejecutar este programa, utilice políticas con el fin de asegurarse de que la aplicación cliente tenga acceso a DynamoDB y a Amazon CloudWatch. Para obtener más información, consulte [Políticas basadas en identidad de DynamoDB](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies). 

El código de origen consta de cuatro archivos `.java`. Para crear este programa, agregue la siguiente dependencia, que incluye la biblioteca de clientes de Amazon Kinesis (KCL) 3.x y el AWS SDK para Java v2 como dependencias transitivas:

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

Los archivos de origen son:
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```

# API de bajo nivel de DynamoDB Streams: ejemplo en Java
<a name="Streams.LowLevel.Walkthrough"></a>

**nota**  
El código que se presenta en esta página no es exhaustivo y no contempla todos los escenarios de consumo de Amazon DynamoDB Streams. La manera recomendada de consumir registros de transmisión de DynamoDB consiste en usar Amazon Kinesis Adapter con la Kinesis Client Library (KCL), como se describe en [Uso del adaptador Kinesis de DynamoDB Streams para procesar registros de transmisión](Streams.KCLAdapter.md).

Esta sección contiene un programa de Java que muestra el funcionamiento de DynamoDB Streams. El programa realiza lo siguiente:

1. Crea una tabla de DynamoDB con una transmisión habilitada.

1. Describe los ajustes de secuencia de esta tabla.

1. Modifica los datos de la tabla.

1. Describe los fragmentos de la secuencia.

1. Lee los registros de secuencia de los fragmentos.

1. Busca las particiones secundarias y continúa leyendo los registros.

1. Elimina recursos.

Al ejecutar el programa, verá un resultado parecido al siguiente.

```
Testing Streams Demo
Creating an Amazon DynamoDB table TestTableForStreams with a simple primary key: Id
Waiting for TestTableForStreams to be created...
Current stream ARN for TestTableForStreams: arn:aws:dynamodb:us-east-2:123456789012:table/TestTableForStreams/stream/2018-03-20T16:49:55.208
Stream enabled: true
Update view type: NEW_AND_OLD_IMAGES

Performing write activities on TestTableForStreams
Processing item 1 of 100
Processing item 2 of 100
Processing item 3 of 100
...
Processing item 100 of 100
Shard: {ShardId: shardId-1234567890-...,SequenceNumberRange: {StartingSequenceNumber: 100002572486797508907,},}
    Shard iterator: EjYFEkX2a26eVTWe...
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2000001584047545833909, SizeBytes=22, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2100003604869767892701, SizeBytes=55, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, SequenceNumber=2200001099771112898434, SizeBytes=36, StreamViewType=NEW_AND_OLD_IMAGES)
...
Deleting the table...
Table StreamsDemoTable deleted.
Demo complete
```

**Example Ejemplo**  

```
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter;

public class StreamsLowLevelDemo {


    public static void main(String[] args) {
        final String usage = "Testing Streams Demo";
        try {
            System.out.println(usage);

            String tableName = "StreamsDemoTable";
            String key = "Id";
            System.out.println("Creating an Amazon DynamoDB table " + tableName + " with a simple primary key: " + key);
            Region region = Region.US_WEST_2;
            DynamoDbClient ddb = DynamoDbClient.builder()
                    .region(region)
                    .build();

            DynamoDbStreamsClient ddbStreams = DynamoDbStreamsClient.builder()
                    .region(region)
                    .build();
            DescribeTableRequest describeTableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
            TableDescription tableDescription = null;
            try{
                tableDescription = ddb.describeTable(describeTableRequest).table();
            }catch (Exception e){
                System.out.println("Table " + tableName + " does not exist.");
                tableDescription = createTable(ddb, tableName, key);
            }

            // Print the stream settings for the table
            String streamArn = tableDescription.latestStreamArn();
           
            StreamSpecification streamSpec = tableDescription.streamSpecification();
            System.out.println("Current stream ARN for " + tableDescription.tableName() + ": " +
                   streamArn);
            System.out.println("Stream enabled: " + streamSpec.streamEnabled());
            System.out.println("Update view type: " + streamSpec.streamViewType());
            System.out.println();
            // Generate write activity in the table
            System.out.println("Performing write activities on " + tableName);
            int maxItemCount = 100;
            for (Integer i = 1; i <= maxItemCount; i++) {
                System.out.println("Processing item " + i + " of " + maxItemCount);
                // Write a new item
                putItemInTable(key, i, tableName, ddb);
                // Update the item
                updateItemInTable(key, i, tableName, ddb);
                // Delete the item
                deleteDynamoDBItem(key, i, tableName, ddb);
            }

            // Process Stream
            processStream(streamArn, maxItemCount, ddb, ddbStreams, tableName);

            // Delete the table
            System.out.println("Deleting the table...");
            DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                    .tableName(tableName)
                    .build();
            ddb.deleteTable(deleteTableRequest);
            System.out.println("Table " + tableName + " deleted.");
            System.out.println("Demo complete");
            ddb.close();
        } catch (Exception e) {
            System.out.println("Error: " + e.getMessage());
        }
    }

    private static void processStream(String streamArn, int maxItemCount, DynamoDbClient ddb, DynamoDbStreamsClient ddbStreams, String tableName) {
        // Get all the shard IDs from the stream. Note that DescribeStream returns
        // the shard IDs one page at a time.
        String lastEvaluatedShardId = null;
        do {
            DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                    .streamArn(streamArn)
                    .exclusiveStartShardId(lastEvaluatedShardId).build();
            DescribeStreamResponse describeStreamResponse = ddbStreams.describeStream(describeStreamRequest);

            List<Shard> shards = describeStreamResponse.streamDescription().shards();

            // Process each shard on this page

            fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, shards);

            // If LastEvaluatedShardId is set, then there is
            // at least one more page of shard IDs to retrieve
            lastEvaluatedShardId = describeStreamResponse.streamDescription().lastEvaluatedShardId();

        } while (lastEvaluatedShardId != null);

    }

    private static void fetchShardsAndReadRecords(String streamArn, int maxItemCount, DynamoDbStreamsClient ddbStreams, List<Shard> shards) {
        for (Shard shard : shards) {
            String shardId = shard.shardId();
            System.out.println("Shard: " + shard);

            // Get an iterator for the current shard
            GetShardIteratorRequest shardIteratorRequest = GetShardIteratorRequest.builder()
                    .streamArn(streamArn).shardId(shardId)
                    .shardIteratorType(ShardIteratorType.TRIM_HORIZON).build();

            GetShardIteratorResponse getShardIteratorResult = ddbStreams.getShardIterator(shardIteratorRequest);

            String currentShardIter = getShardIteratorResult.shardIterator();

            // Shard iterator is not null until the Shard is sealed (marked as READ_ONLY).
            // To prevent running the loop until the Shard is sealed, we process only the
            // items that were written into DynamoDB and then exit.
            int processedRecordCount = 0;
            while (currentShardIter != null && processedRecordCount < maxItemCount) {
                // Use the shard iterator to read the stream records
                GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder()
                        .shardIterator(currentShardIter).build();
                GetRecordsResponse getRecordsResult = ddbStreams.getRecords(getRecordsRequest);
                List<Record> records = getRecordsResult.records();
                for (Record record : records) {
                    System.out.println("        " + record.dynamodb());
                }
                processedRecordCount += records.size();
                currentShardIter = getRecordsResult.nextShardIterator();
            }
            if (currentShardIter == null){
                System.out.println("Shard has been fully processed. Shard iterator is null.");
                System.out.println("Fetch the child shard to continue processing instead of bulk fetching all shards");
                DescribeStreamRequest describeStreamRequestForChildShards = DescribeStreamRequest.builder()
                        .streamArn(streamArn)
                        .shardFilter(ShardFilter.builder()
                                .type(ShardFilterType.CHILD_SHARDS)
                                .shardId(shardId).build())
                        .build();
                DescribeStreamResponse describeStreamResponseChildShards = ddbStreams.describeStream(describeStreamRequestForChildShards);
                fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, describeStreamResponseChildShards.streamDescription().shards());
            }
        }
    }

    private static void putItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());
        item.put("Message", AttributeValue.builder()
                .s("New Item!")
                .build());
        PutItemRequest request = PutItemRequest.builder()
                .tableName(tableName)
                .item(item)
                .build();
        ddb.putItem(request);
    }

    private static void updateItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {

        HashMap<String, AttributeValue> itemKey = new HashMap<>();
        itemKey.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());


        HashMap<String, AttributeValueUpdate> updatedValues = new HashMap<>();
        updatedValues.put("Message", AttributeValueUpdate.builder()
                .value(AttributeValue.builder().s("This is an updated item").build())
                .action(AttributeAction.PUT)
                .build());

        UpdateItemRequest request = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(itemKey)
                .attributeUpdates(updatedValues)
                .build();
        ddb.updateItem(request);
    }

    public static void deleteDynamoDBItem(String key, Integer i, String tableName, DynamoDbClient ddb) {
        HashMap<String, AttributeValue> keyToGet = new HashMap<>();
        keyToGet.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());

        DeleteItemRequest deleteReq = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(keyToGet)
                .build();
        ddb.deleteItem(deleteReq);
    }

    public static TableDescription createTable(DynamoDbClient ddb, String tableName, String key) {
        DynamoDbWaiter dbWaiter = ddb.waiter();
        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType("NEW_AND_OLD_IMAGES")
                .build();
        CreateTableRequest request = CreateTableRequest.builder()
                .attributeDefinitions(AttributeDefinition.builder()
                        .attributeName(key)
                        .attributeType(ScalarAttributeType.S)
                        .build())
                .keySchema(KeySchemaElement.builder()
                        .attributeName(key)
                        .keyType(KeyType.HASH)
                        .build())
                .billingMode(BillingMode.PAY_PER_REQUEST) //  DynamoDB automatically scales based on traffic.
                .tableName(tableName)
                .streamSpecification(streamSpecification)
                .build();

        TableDescription newTable;
        try {
            CreateTableResponse response = ddb.createTable(request);
            DescribeTableRequest tableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
                    
            System.out.println("Waiting for " + tableName + " to be created...");

            // Wait until the Amazon DynamoDB table is created.
            WaiterResponse<DescribeTableResponse> waiterResponse = dbWaiter.waitUntilTableExists(tableRequest);
            waiterResponse.matched().response().ifPresent(System.out::println);
            newTable = response.tableDescription();
            return newTable;

        } catch (DynamoDbException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
        return null;
    }



}
```

# DynamoDB Streams y disparadores de AWS Lambda
<a name="Streams.Lambda"></a>

Amazon DynamoDB se integra con AWS Lambda para que pueda crear *desencadenadores*, a saber, fragmentos de código que responden automáticamente a los eventos de DynamoDB Streams. Con los disparadores, puede crear aplicaciones que reaccionan ante las modificaciones de datos en las tablas de DynamoDB.

**Topics**
+ [Tutorial n.º 1: Uso de filtros para procesar todos los eventos con Amazon DynamoDB y AWS Lambda mediante la AWS CLI](Streams.Lambda.Tutorial.md)
+ [Tutorial n.º 2: uso de filtros para procesar algunos eventos con DynamoDB y Lambda](Streams.Lambda.Tutorial2.md)
+ [Prácticas recomendadas para usar DynamoDB Streams con Lambda](Streams.Lambda.BestPracticesWithDynamoDB.md)

Si habilita DynamoDB Streams en una tabla, puede asociar el nombre de recurso de Amazon (ARN) de la transmisión con una función de AWS Lambda que haya escrito. Todas las acciones de mutación a esa tabla de DynamoDB pueden capturarse como un elemento en el flujo. Por ejemplo, puede establecer un desencadenador para que, cuando se modifique un elemento de una tabla, aparezca inmediatamente un nuevo registro en el flujo de esa tabla. 

**nota**  
Si suscribe más de dos funciones de Lambda a un solo flujo de DynamoDB, podría aplicarse una limitación en la lectura.

El servicio [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html) sondea el flujo en busca de nuevos registros cuatro veces por segundo. Cuando hay nuevos registros de flujo disponibles, se invoca su función Lambda de forma sincrónica. Puede suscribir hasta dos funciones Lambda al mismo flujo de DynamoDB. Si suscribe más de dos funciones de Lambda al mismo flujo de DynamoDB, podría aplicarse una limitación en la lectura.

La función Lambda puede enviar una notificación, iniciar un flujo de trabajo o realizar otras muchas acciones que le especifique. Puede escribir una función Lambda para que simplemente copie cada registro de flujo a un almacenamiento persistente, como la puerta de enlace de archivo de Amazon S3 (Amazon S3), y crear un registro de auditoría permanente de la actividad de escritura en su tabla. O bien suponga que tiene una aplicación de juegos para móviles que escribe en una tabla `GameScores`. Cada vez que se actualiza el atributo `TopScore` de la tabla `GameScores`, se escribe el registro correspondiente en la secuencia de la tabla. Este evento, a su vez, puede activar una función Lambda que publique un mensaje de felicitación en una red social. Esta función también podría escribirse para ignorar cualquier registro de flujo que no sea una actualización de `GameScores` o que no modifique el atributo `TopScore`.

Sin embargo, si la función devuelve un error, Lambda vuelve a intentar ejecutar el lote hasta que se procese correctamente o los datos caduquen. También puede configurar Lambda para que lo reintente con un lote más pequeño, limitar el número de reintentos, descartar los registros cuando sean demasiado antiguos y otras opciones.

Como práctica recomendada de rendimiento, la función Lambda debe ser de corta duración. Para evitar introducir retrasos innecesarios en el procesamiento, tampoco debe ejecutar una lógica compleja. Para un flujo de alta velocidad en concreto, es mejor desencadenar un flujo de trabajo asíncrono de funciones de posprocesamiento que funciones Lambda sincrónicas de larga duración.

 Puede usar activadores de Lambda en diferentes cuentas de AWS configurando una política basada en recursos en el flujo de DynamoDB para conceder a la función de Lambda acceso de lectura entre cuentas. Para obtener información sobre cómo configurar el flujo para permitir el acceso entre cuentas, consulte [Compartir acceso con funciones de AWS Lambda](rbac-cross-account-access.md#shared-access-cross-acount-lambda) entre cuentas en la Guía para desarrolladores de DynamoDB.

Para obtener más información sobre AWS Lambda, consulte la Guía para desarrolladores de [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/).

# Tutorial n.º 1: Uso de filtros para procesar todos los eventos con Amazon DynamoDB y AWS Lambda mediante la AWS CLI
<a name="Streams.Lambda.Tutorial"></a>

 

En este tutorial, creará un desencadenador AWS Lambda para procesar una transmisión de una tabla de DynamoDB.

**Topics**
+ [Paso 1: crear una tabla de DynamoDB con un flujo habilitado](#Streams.Lambda.Tutorial.CreateTable)
+ [Paso 2: crear un rol de ejecución para Lambda](#Streams.Lambda.Tutorial.CreateRole)
+ [Paso 3: crear un tema de Amazon SNS](#Streams.Lambda.Tutorial.SNSTopic)
+ [Paso 4: crear y probar una función Lambda](#Streams.Lambda.Tutorial.LambdaFunction)
+ [Paso 5: crear y probar un disparador](#Streams.Lambda.Tutorial.CreateTrigger)

El escenario en que tiene lugar este tutorial es Woofer, una red social sencilla. Los usuarios de Woofer se comunican utilizando *ladridos* (mensajes de texto breves) que se envían a otros usuarios de Woofer. En el siguiente diagrama se muestran los componentes y el flujo de trabajo de esta aplicación.

![\[Flujo de trabajo de una aplicación Woofer de una tabla de DynamoDB, un registro de flujo, una función de Lambda y un tema de Amazon SNS.\]](http://docs.aws.amazon.com/es_es/amazondynamodb/latest/developerguide/images/StreamsAndTriggers.png)


1. Un usuario escribe un elemento en una tabla de DynamoDB (`BarkTable`). Cada elemento de la tabla representa un ladrido.

1. Se escribe un nuevo registro de secuencia para reflejar que se ha agregado un elemento nuevo a `BarkTable`.

1. El nuevo registro de secuencia activa una función de AWS Lambda (`publishNewBark`).

1. Si el registro de transmisión indica que se ha agregado un nuevo elemento a `BarkTable`, la función de Lambda lee los datos del registro de transmisión y publica un mensaje en un tema de Amazon Simple Notification Service (Amazon SNS).

1. Los suscriptores a ese tema de Amazon SNS reciben el mensaje. En este tutorial, el único suscriptor es una dirección de correo electrónico.

**Antes de empezar**  
En este tutorial se utiliza la AWS Command Line Interface AWS CLI. Si aún no lo ha hecho, siga las instrucciones que figuran en la [Guía del usuario de AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/) para instalar y configurar la AWS CLI.

## Paso 1: crear una tabla de DynamoDB con un flujo habilitado
<a name="Streams.Lambda.Tutorial.CreateTable"></a>

En este paso, va a crear una tabla de DynamoDB (`BarkTable`) para almacenar todos los ladridos de los usuarios de Woofer. La clave principal consta de `Username` (clave de partición) y `Timestamp` (clave de ordenación). Ambos atributos son de tipo String.

`BarkTable` tiene una secuencia habilitada. Más adelante en este tutorial, asociará una función de AWS Lambda a la secuencia para crear un disparador.

1. Introduzca el siguiente comando para crear la tabla.

   ```
   aws dynamodb create-table \
       --table-name BarkTable \
       --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \
       --key-schema AttributeName=Username,KeyType=HASH  AttributeName=Timestamp,KeyType=RANGE \
       --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
       --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
   ```

1. Busque en los resultados `LatestStreamArn`.

   ```
   ...
   "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

   Anote el valor de `region` y `accountID`, porque los necesitará para los demás pasos de este tutorial.

## Paso 2: crear un rol de ejecución para Lambda
<a name="Streams.Lambda.Tutorial.CreateRole"></a>

En este paso, va a crear un rol de AWS Identity and Access Management (IAM) (`WooferLambdaRole`) y a asignarle permisos. El rol lo utilizará la función de Lambda que va a crear en [Paso 4: crear y probar una función Lambda](#Streams.Lambda.Tutorial.LambdaFunction). 

Asimismo, va a crear una política para el rol. La política contendrá todos los permisos que la función de Lambda va a necesitar en tiempo de ejecución.

1. Cree un archivo denominado `trust-relationship.json` con el siguiente contenido.

------
#### [ JSON ]

****  

   ```
   {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "lambda.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
   ```

------

1. Escriba el siguiente comando para crear `WooferLambdaRole`.

   ```
   aws iam create-role --role-name WooferLambdaRole \
       --path "/service-role/" \
       --assume-role-policy-document file://trust-relationship.json
   ```

1. Cree un archivo denominado `role-policy.json` con el siguiente contenido. (Sustituya `region` y `accountID` por su región de AWSy su ID de cuenta).

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogGroup",
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": "arn:aws:logs:us-east-1:111122223333:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "dynamodb:DescribeStream",
                   "dynamodb:GetRecords",
                   "dynamodb:GetShardIterator",
                   "dynamodb:ListStreams"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:111122223333:table/BarkTable/stream/*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "sns:Publish"
               ],
               "Resource": [
                   "*"
               ]
           }
       ]
   }
   ```

------

   La política tiene cuatro instrucciones que permiten que `WooferLambdaRole` realice las siguientes acciones:
   + Ejecute una función Lambda (`publishNewBark`). Creará la función más adelante en este tutorial.
   + Acceder a Amazon CloudWatch Logs. La función de Lambda escribe diagnósticos en CloudWatch Logs en tiempo de ejecución.
   + Leer datos de la transmisión de DynamoDB relativos a `BarkTable`.
   + Publicar mensajes en Amazon SNS.

1. Introduzca el siguiente comando para asociar la política al `WooferLambdaRole`.

   ```
   aws iam put-role-policy --role-name WooferLambdaRole \
       --policy-name WooferLambdaRolePolicy \
       --policy-document file://role-policy.json
   ```

## Paso 3: crear un tema de Amazon SNS
<a name="Streams.Lambda.Tutorial.SNSTopic"></a>

En este paso, va a crear un tema de Amazon SNS (`wooferTopic`) y suscribe una dirección de correo electrónico a ese tema. La función de Lambda utiliza este tema para publicar los ladridos nuevos de los usuarios de Woofer.

1. Ingrese el siguiente comando para crear un nuevo tema de Amazon SNS.

   ```
   aws sns create-topic --name wooferTopic
   ```

1. Introduzca el siguiente comando para suscribir una dirección de correo electrónico a `wooferTopic`. (Sustituya `region` y `accountID` por su región de AWS y su ID de cuenta; además, sustituya `example@example.com` por una dirección de correo electrónico válida).

   ```
   aws sns subscribe \
       --topic-arn arn:aws:sns:region:accountID:wooferTopic \
       --protocol email \
       --notification-endpoint example@example.com
   ```

1. Amazon SNS envía un mensaje de confirmación a la dirección de correo electrónico indicada. Elija el enlace **Confirm subscription (Confirmar suscripción)** del mensaje para completar el proceso de suscripción.

## Paso 4: crear y probar una función Lambda
<a name="Streams.Lambda.Tutorial.LambdaFunction"></a>

En este paso, va a crear una función de AWS Lambda (`publishNewBark`) para procesar los registros de secuencia de `BarkTable`.

La función `publishNewBark` solamente procesa los eventos de la secuencia que corresponden a elementos nuevos de `BarkTable`. La función lee los datos de esos eventos y, a continuación, llama a Amazon SNS para publicarlos.

1. Cree un archivo denominado `publishNewBark.js` con el siguiente contenido. Sustituya `region` y `accountID` por su región de AWS y su ID de cuenta.

   ```
   'use strict';
   var AWS = require("aws-sdk");
   var sns = new AWS.SNS();
   
   exports.handler = (event, context, callback) => {
   
       event.Records.forEach((record) => {
           console.log('Stream record: ', JSON.stringify(record, null, 2));
   
           if (record.eventName == 'INSERT') {
               var who = JSON.stringify(record.dynamodb.NewImage.Username.S);
               var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S);
               var what = JSON.stringify(record.dynamodb.NewImage.Message.S);
               var params = {
                   Subject: 'A new bark from ' + who,
                   Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what,
                   TopicArn: 'arn:aws:sns:region:accountID:wooferTopic'
               };
               sns.publish(params, function(err, data) {
                   if (err) {
                       console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2));
                   } else {
                       console.log("Results from sending message: ", JSON.stringify(data, null, 2));
                   }
               });
           }
       });
       callback(null, `Successfully processed ${event.Records.length} records.`);
   };
   ```

1. Cree un archivo zip que contenga `publishNewBark.js`. Si tiene la utilidad de línea de comandos zip, puede introducir el siguiente comando para hacerlo.

   ```
   zip publishNewBark.zip publishNewBark.js
   ```

1. Al crear la función de Lambda, debe especificar el nombre de recurso de Amazon (ARN) de `WooferLambdaRole`, que creó en [Paso 2: crear un rol de ejecución para Lambda](#Streams.Lambda.Tutorial.CreateRole). Introduzca el siguiente comando para recuperar este ARN.

   ```
   aws iam get-role --role-name WooferLambdaRole
   ```

   Busque ARN para en los resultados `WooferLambdaRole`.

   ```
   ...
   "Arn": "arn:aws:iam::region:role/service-role/WooferLambdaRole"
   ...
   ```

   Ingrese el siguiente comando para crear la función de Lambda. Sustituya *roleARN* por el ARN de `WooferLambdaRole`.

   ```
   aws lambda create-function \
       --region region \
       --function-name publishNewBark \
       --zip-file fileb://publishNewBark.zip \
       --role roleARN \
       --handler publishNewBark.handler \
       --timeout 5 \
       --runtime nodejs16.x
   ```

1. Ahora pruebe `publishNewBark` para comprobar que funciona. Para ello, le facilitamos información de entrada que parece un registro auténtico de DynamoDB Streams.

   Cree un archivo denominado `payload.json` con el siguiente contenido. Sustituya `region` y `accountID` por su Región de AWS y su ID de cuenta.

   ```
   {
       "Records": [
           {
               "eventID": "7de3041dd709b024af6f29e4fa13d34c",
               "eventName": "INSERT",
               "eventVersion": "1.1",
               "eventSource": "aws:dynamodb",
               "awsRegion": "region",
               "dynamodb": {
                   "ApproximateCreationDateTime": 1479499740,
                   "Keys": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "NewImage": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Message": {
                           "S": "This is a bark from the Woofer social network"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "SequenceNumber": "13021600000000001596893679",
                   "SizeBytes": 112,
                   "StreamViewType": "NEW_IMAGE"
               },
               "eventSourceARN": "arn:aws:dynamodb:region:account ID:table/BarkTable/stream/2016-11-16T20:42:48.104"
           }
       ]
   }
   ```

   Introduzca el siguiente comando para probar la función de `publishNewBark`.

   ```
   aws lambda invoke --function-name publishNewBark --payload file://payload.json --cli-binary-format raw-in-base64-out output.txt
   ```

   Si la prueba se realiza correctamente, aparecerá el siguiente resultado.

   ```
   {
       "StatusCode": 200,
       "ExecutedVersion": "$LATEST"
   }
   ```

   Además, el archivo `output.txt` contendrá el siguiente texto.

   ```
   "Successfully processed 1 records."
   ```

   También recibirá un nuevo mensaje de correo electrónico dentro de unos minutos.
**nota**  
AWS Lambda escribe la información de diagnóstico en Amazon CloudWatch Logs. Si se produce cualquier error en la función Lambda, puede utilizar esos diagnósticos para solucionar el problema:  
Abra la consola de CloudWatch en [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).
En el panel de navegación, elija **Logs**.
Elija el grupo de log siguiente: `/aws/lambda/publishNewBark`
Elija la última secuencia de log para ver el resultado (y los errores) de la función.

## Paso 5: crear y probar un disparador
<a name="Streams.Lambda.Tutorial.CreateTrigger"></a>

En [Paso 4: crear y probar una función Lambda](#Streams.Lambda.Tutorial.LambdaFunction), hemos probado la función Lambda para asegurarnos de que se ejecutaba correctamente. En este paso, va a crear un *disparador*. Para ello, asociará la función de Lambda (`publishNewBark`) con el origen de eventos (la secuencia `BarkTable`).

1. Al crear el disparador, debe especificar el ARN de la secuencia de `BarkTable`. Introduzca el siguiente comando para recuperar este ARN.

   ```
   aws dynamodb describe-table --table-name BarkTable
   ```

   Busque en los resultados `LatestStreamArn`.

   ```
   ...
    "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

1. Introduzca el siguiente comando para crear el disparador. Sustituya `streamARN` por el ARN de la secuencia real.

   ```
   aws lambda create-event-source-mapping \
       --region region \
       --function-name publishNewBark \
       --event-source streamARN  \
       --batch-size 1 \
       --starting-position TRIM_HORIZON
   ```

1. Prueba el disparador. Introduzca el siguiente comando para agregar un elemento a `BarkTable`.

   ```
   aws dynamodb put-item \
       --table-name BarkTable \
       --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}
   ```

   Debería recibir un nuevo mensaje de correo electrónico dentro de unos minutos.

1. Abra la consola de DynamoDB y agregue algunos elementos más a `BarkTable`. Debe especificar valores para los atributos `Username` y `Timestamp`. También debe especificar un valor para `Message`, aunque no es obligatorio. Debe recibir un nuevo mensaje de correo electrónico por cada elemento que agregue a `BarkTable`.

   La función de Lambda procesa solamente los elementos nuevos que se agregan a `BarkTable`. Si actualiza o elimina un elemento de la tabla, la función no hace nada.

**nota**  
AWS Lambda escribe la información de diagnóstico en Amazon CloudWatch Logs. Si se produce cualquier error en la función Lambda, puede utilizar esos diagnósticos para solucionar el problema.  
Abra la consola de CloudWatch en [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).
En el panel de navegación, elija **Logs**.
Elija el grupo de log siguiente: `/aws/lambda/publishNewBark`
Elija la última secuencia de log para ver el resultado (y los errores) de la función.

# Tutorial n.º 2: uso de filtros para procesar algunos eventos con DynamoDB y Lambda
<a name="Streams.Lambda.Tutorial2"></a>

En este tutorial, creará un desencadenador AWS Lambda para procesar solo algunos eventos en un flujo de una tabla de DynamoDB.

**Topics**
+ [Resumen global: CloudFormation](#Streams.Lambda.Tutorial2.Cloudformation)
+ [Resumen global: CDK](#Streams.Lambda.Tutorial2.CDK)

Con el [filtrado de eventos de Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) puede utilizar expresiones de filtrado para controlar qué eventos envía Lambda a su función para su procesamiento. Puede configurar hasta cinco filtros diferentes por flujos de DynamoDB. Si utiliza intervalos de lotes, Lambda aplica los criterios de filtrado a cada nuevo evento para ver si debe incluirse en el lote actual.

Los filtros se aplican mediante estructuras denominadas `FilterCriteria`. Los 3 atributos principales de `FilterCriteria` son `metadata properties`, `data properties` y `filter patterns`. 

A continuación, se muestra un ejemplo de estructura de un evento de DynamoDB Streams:

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

Las `metadata properties` son los campos del objeto de evento. En el caso de DynamoDB Streams, las `metadata properties` son campos como `dynamodb` o `eventName`. 

Las `data properties` son los campos del cuerpo de evento. Para filtrar las `data properties`, asegúrese de incluirlas en `FilterCriteria` en la clave adecuada. Para los orígenes de eventos de DynamoDB, la clave de datos es `NewImage` u `OldImage`.

Por último, las reglas de filtro definirán la expresión filtro que quiere aplicar a una propiedad específica. Estos son algunos ejemplos:


| Operador de comparación | Ejemplo | Sintaxis de reglas (parcial) | 
| --- | --- | --- | 
|  Nulo  |  El tipo de producto es nulo  |  `{ "product_type": { "S": null } } `  | 
|  Vacío  |  El nombre de producto está vacío  |  `{ "product_name": { "S": [ ""] } } `  | 
|  Igual a  |  El estado es igual a Florida  |  `{ "state": { "S": ["FL"] } } `  | 
|  Y  |  El estado del producto es igual a Florida y la categoría del producto es Chocolate  |  `{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } } `  | 
|  O  |  El estado del producto es Florida o California  |  `{ "state": { "S": ["FL","CA"] } } `  | 
|  No (Negación)  |  El estado del producto no es Florida  |  `{"state": {"S": [{"anything-but": ["FL"]}]}}`  | 
|  Exists  |  El producto Homemade existe  |  `{"homemade": {"S": [{"exists": true}]}}`  | 
|  No existe  |  El producto Homemade no existe  |  `{"homemade": {"S": [{"exists": false}]}}`  | 
|  Comienza por  |  PK comienza por COMPANY  |  `{"PK": {"S": [{"prefix": "COMPANY"}]}}`  | 

Puede especificar hasta cinco patrones de filtrado de eventos para una función Lambda. Observe que cada uno de esos cinco eventos se evaluará como un O lógico. Por lo tanto, si configura dos filtros denominados `Filter_One` y `Filter_Two`, la función Lambda ejecutará `Filter_One` O `Filter_Two`.

**nota**  
En la página de [filtrado de eventos de Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) hay algunas opciones para filtrar y comparar valores numéricos; sin embargo, en el caso de los eventos de filtrado de DynamoDB, esto no se aplica porque los números en DynamoDB se almacenan como cadenas. Por ejemplo ` "quantity": { "N": "50" }`, sabemos que es un número debido a la propiedad `"N"`.

## Resumen global: CloudFormation
<a name="Streams.Lambda.Tutorial2.Cloudformation"></a>

Para mostrar la funcionalidad del filtrado de eventos en la práctica, a continuación se muestra una plantilla de CloudFormation de ejemplo. Esta plantilla generará una tabla de DynamoDB Simple con una clave de partición PK y una clave de clasificación SK con Amazon DynamoDB Streams habilitado. Creará una función Lambda y un rol de ejecución Lambda simple que permitirá escribir registros en Amazon CloudWatch y leer los eventos de Amazon DynamoDB Streams. También agregará la asignación del origen de los eventos entre DynamoDB Streams y la función Lambda, para que la función pueda ejecutarse cada vez que haya un evento en Amazon DynamoDB Streams.

```
AWSTemplateFormatVersion: "2010-09-09"

Description: Sample application that presents AWS Lambda event source filtering 
with Amazon DynamoDB Streams.

Resources:
  StreamsSampleDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "PK"
          AttributeType: "S"
        - AttributeName: "SK"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "PK"
          KeyType: "HASH"
        - AttributeName: "SK"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: "NEW_AND_OLD_IMAGES"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17",		 	 	 
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17",		 	 	 
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource: !GetAtt StreamsSampleDDBTable.StreamArn

  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST

  ProcessEventLambda:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          import logging

          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)

          def handler(event, context):
            LOGGER.info('Received Event: %s', event)
            for rec in event['Records']:
              LOGGER.info('Record: %s', rec)

Outputs:
  StreamsSampleDDBTable:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.Arn
  StreamARN:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.StreamArn
```

Después de implementar esta plantilla de CloudFormation, puede insertar el siguiente elemento de Amazon DynamoDB:

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

Gracias a la sencilla función Lambda incluida en línea en esta plantilla de CloudFormation, verá los eventos en los grupos de registro de Amazon CloudWatch para la función Lambda de la siguiente manera:

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

**Ejemplos de filtro**
+ **Solo productos que coincidan con un estado determinado**

Este ejemplo modifica la plantilla de CloudFormation para incluir un filtro que coincida con todos los productos que provienen de Florida, con la abreviatura “FL”.

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Una vez que vuelva a implementar la pila, puede agregar el siguiente elemento de DynamoDB a la tabla. Tenga en cuenta que no aparecerá en los registros de la función Lambda, porque el producto de este ejemplo es de California.

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK#1000",
 "company_id": "1000",
 "fabric": "Florida Chocolates",
 "price": 15,
 "product_id": "1000",
 "quantity": 50,
 "state": "CA",
 "stores": 5,
 "type": ""
}
```
+ **Solo los elementos que comienzan por algunos valores en PK y SK**

En este ejemplo se modifica la plantilla de CloudFormation para incluir la siguiente condición:

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Observe que la condición AND requiere que la condición esté en el patrón, donde las claves PK y SK están en la misma expresión separadas por una coma.

O bien empieza con algunos valores en PK y SK o es de cierto estado.

En este ejemplo se modifica la plantilla de CloudFormation para incluir las siguientes condiciones:

```
  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Observe que la condición OR se agrega con la incorporación de nuevos patrones en la sección de filtro.

## Resumen global: CDK
<a name="Streams.Lambda.Tutorial2.CDK"></a>

La siguiente plantilla de formación de proyectos CDK de ejemplo muestra la funcionalidad de filtrado de eventos. Antes de trabajar con este proyecto CDK deberá instalar los [requisitos previos](https://docs.aws.amazon.com/cdk/v2/guide/work-with.html), incluida la [ejecución de los scripts de preparación](https://docs.aws.amazon.com/cdk/v2/guide/work-with-cdk-python.html).

**Crear un proyecto CDK**

Primero cree un nuevo proyecto AWS CDK, mediante la invocación de `cdk init` en un directorio vacío.

```
mkdir ddb_filters
cd ddb_filters
cdk init app --language python
```

El comando `cdk init` utiliza el nombre de la carpeta del proyecto para asignar un nombre a varios elementos del proyecto, incluidas las clases, las subcarpetas y los archivos. Los guiones del nombre de la carpeta se convierten en guiones bajos. Por lo demás, el nombre debe seguir el formato de un identificador Python. Por ejemplo, no debe comenzar por un número ni contener espacios.

Para trabajar con el nuevo proyecto, active su entorno virtual. Esto permite que las dependencias del proyecto se instalen localmente en la carpeta del proyecto, en lugar de hacerlo globalmente.

```
source .venv/bin/activate
python -m pip install -r requirements.txt
```

**nota**  
Es posible que reconozca esto como el comando de Mac/Linux para activar un entorno virtual. Las plantillas de Python incluyen un archivo por lotes, `source.bat`, que permite utilizar el mismo comando en Windows. El comando tradicional de Windows `.venv\Scripts\activate.bat` también funciona. Si ha inicializado su proyecto AWS CDK con AWS CDK Toolkit v1.70.0 o anterior, su entorno virtual se encuentra en el directorio `.env` en lugar de `.venv`. 

**Infraestructura básica**

Abra el archivo `./ddb_filters/ddb_filters_stack.py` en el editor de texto que desee. Este archivo se generó automáticamente al crear el proyecto AWS CDK. 

A continuación, agregue las funciones `_create_ddb_table` y `_set_ddb_trigger_function`. Estas funciones crearán una tabla de DynamoDB con la clave de partición PK y la clave de clasificación SK en modo de aprovisionamiento bajo demanda, con Amazon DynamoDB Streams habilitado de forma predeterminada para mostrar las imágenes nueva y antigua.

La función Lambda se almacenará en la carpeta `lambda` debajo del archivo `app.py`. Este archivo se creará más adelante. Incluirá la variable de entorno `APP_TABLE_NAME`, que será el nombre de la tabla de Amazon DynamoDB creada por esta pila. En la misma función concederemos permisos de lectura del flujo a la función Lambda. Por último, se suscribirá a DynamoDB Streams como origen de eventos para la función Lambda. 

Al final del archivo en el método `__init__`, llamará a las construcciones respectivas para inicializarlas en la pila. Para proyectos más grandes que requieran componentes y servicios adicionales, podría ser mejor definir estas construcciones fuera de la pila base. 

```
import os
import json

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_dynamodb as dynamodb,
)
from constructs import Construct


class DdbFiltersStack(Stack):

    def _create_ddb_table(self):
        dynamodb_table = dynamodb.Table(
            self,
            "AppTable",
            partition_key=dynamodb.Attribute(
                name="PK", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="SK", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            removal_policy=cdk.RemovalPolicy.DESTROY,
        )

        cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name)
        return dynamodb_table

    def _set_ddb_trigger_function(self, ddb_table):
        events_lambda = _lambda.Function(
            self,
            "LambdaHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            handler="app.handler",
            environment={
                "APP_TABLE_NAME": ddb_table.table_name,
            },
        )

        ddb_table.grant_stream_read(events_lambda)

        event_subscription = _lambda.CfnEventSourceMapping(
            scope=self,
            id="companyInsertsOnlyEventSourceMapping",
            function_name=events_lambda.function_name,
            event_source_arn=ddb_table.table_stream_arn,
            maximum_batching_window_in_seconds=1,
            starting_position="LATEST",
            batch_size=1,
        )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        ddb_table = self._create_ddb_table()
        self._set_ddb_trigger_function(ddb_table)
```

Ahora crearemos una función Lambda muy sencilla que imprimirá los registros en Amazon CloudWatch. Para ello, cree una carpeta nueva llamada `lambda`.

```
mkdir lambda
touch app.py
```

Con su editor de texto favorito, agregue el siguiente contenido al archivo `app.py`:

```
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def handler(event, context):
    LOGGER.info('Received Event: %s', event)
    for rec in event['Records']:
        LOGGER.info('Record: %s', rec)
```

Asegúrese de que está en la carpeta `/ddb_filters/` y escriba el siguiente comando para crear la aplicación de muestra:

```
cdk deploy
```

En algún momento se le pedirá que confirme si desea implementar la solución. Escriba `Y` para aceptar los cambios.

```
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │
└───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘

Do you wish to deploy these changes (y/n)? y

...

✨  Deployment time: 67.73s

Outputs:
DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP
Stack ARN:
arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
```

Una vez implementados los cambios, abra la consola de AWS y agregue un elemento a la tabla. 

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

Los registros de CloudWatch ahora deben contener toda la información de esta entrada. 

**Ejemplos de filtro**
+ **Solo productos que coincidan con un estado determinado**

Abra el archivo `ddb_filters/ddb_filters/ddb_filters_stack.py` y modifíquelo para incluir el filtro que coincide con todos los productos que son iguales a “FL”. Esto se puede revisar justo debajo de `event_subscription` en la línea 45.

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```
+ **Solo los elementos que comienzan por algunos valores en PK y SK**

Modifique el script python para incluir la siguiente condición:

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
        ]
    },
```
+ **O bien comience con algunos valores en PK y SK o desde un determinado estado.**

Modifique el script python para incluir las siguientes condiciones:

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```

Observe que la condición OR se agrega al agregar más elementos a la matriz Filters.

**Limpieza**

Localice la pila de filtros en la base de su directorio de trabajo y ejecute `cdk destroy`. Se le pedirá que confirme la eliminación del recurso:

```
cdk destroy
Are you sure you want to delete: DdbFiltersStack (y/n)? y
```

# Prácticas recomendadas para usar DynamoDB Streams con Lambda
<a name="Streams.Lambda.BestPracticesWithDynamoDB"></a>

Una función AWS Lambda se ejecuta dentro de un *contenedor*, a saber, un entorno de ejecución aislado de las demás funciones. Cuando se ejecuta una función por primera vez, AWS Lambda crea un nuevo contenedor y comienza a ejecutar el código de la función.

Una función Lambda posee un *controlador* que se ejecuta una vez en cada invocación. El controlador contiene la lógica empresarial principal de la función. Por ejemplo, la función Lambda que se muestra en [Paso 4: crear y probar una función Lambda](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) tiene un controlador que puede procesar los registros de una transmisión de DynamoDB. 

También puede proporcionar código de inicialización que se ejecuta una sola vez después de crear el contenedor, pero antes de que AWS Lambda ejecute el controlador por primera vez. La función Lambda que se muestra en [Paso 4: crear y probar una función Lambda](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) incluye código de inicialización que importa el SDK para JavaScript en Node.js y crea un cliente para Amazon SNS. Estos objetos únicamente deben definirse una vez, fuera del controlador.

Después de ejecutar la función, AWS Lambda puede optar por reutilizar el contenedor en invocaciones posteriores de la función. En este caso, el controlador de la función podría volver a utilizar los recursos que se han definido en el código de inicialización. (No se puede controlar durante cuánto tiempo AWS Lambda conservará el contenedor ni si este se reutilizará o no).

Para los desencadenadores de DynamoDB que utilizan AWS Lambda, recomendamos lo siguiente:
+ AWSLas instancias de los clientes de servicios de deben crearse en el código de inicialización, no en el controlador. Esto permite que AWS Lambda reutilice las conexiones existentes mientras dure la vida útil del contenedor.
+ En general, no es necesario administrar de forma explícita las conexiones ni implementar la agrupación de conexiones, porque AWS Lambda lo hace automáticamente.

Un consumidor de Lambda para un flujo de DynamoDB no garantiza exactamente una entrega y puede generar duplicados. Asegúrese de que el código de la función de Lambda sea idempotente para evitar que surjan problemas inesperados por la aparición de duplicados.

Para obtener más información, consulte [Prácticas recomendadas para trabajar con las funciones AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/best-practices.html) en la *Guía para desarrolladores de AWS Lambda*.

# DynamoDB Streams y Apache Flink
<a name="StreamsApacheFlink.xml"></a>

Puede consumir registros de Amazon DynamoDB Streams con Apache Flink. Con [Amazon Managed Service para Apache Flink](https://aws.amazon.com/managed-service-apache-flink/), puede transformar y analizar datos de streaming en tiempo real mediante Apache Flink. Apache Flink es un marco de procesamiento de flujos de código abierto para procesar datos en tiempo real. El conector de Amazon DynamoDB Streams para Apache Flink simplifica la creación y la administración de las cargas de trabajo de Apache Flink y permite integrar aplicaciones con otros Servicios de AWS.

Amazon Managed Service para Apache Flink lo ayuda a crear rápidamente aplicaciones de procesamiento de flujos de extremo a extremo para análisis de registros, análisis de flujos de clics, Internet de las cosas (IoT), tecnología publicitaria, juegos y mucho más. Los cuatro casos de uso más comunes son la extracción-transformación-carga (ETL) de streaming, las aplicaciones controladas por eventos, los análisis en tiempo real con capacidad de respuesta y la consulta interactiva de flujos de datos. Para obtener más información sobre la escritura en Apache Flink desde Amazon DynamoDB Streams, consulte [Conector de Amazon DynamoDB Streams](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/dynamodb/).