

# Introducción a Kinesis Data Streams para Amazon DynamoDB
<a name="kds_gettingstarted"></a>

En esta sección se describe cómo usar las tablas de Kinesis Data Streams para Amazon DynamoDB con la consola de Amazon DynamoDB, la AWS Command Line Interface (AWS CLI) y la API.

## Creación de un flujo de datos de Amazon Kinesis activo
<a name="kds_gettingstarted.making-changes"></a>

Todos estos ejemplos utilizan la tabla `Music` de DynamoDB que se creó como parte del tutorial [Introducción a DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GettingStartedDynamoDB.html).

Para obtener más información sobre cómo crear consumidores y conectar el flujo de datos de Kinesis a otros servicios de AWS, consulte [Lectura de datos de Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html) en la *Guía para desarrolladores de Amazon Kinesis Data Streams*.

**nota**  
 Cuando utilice particiones de KDS por primera vez, le recomendamos configurar los fragmentos para escalar verticalmente y reducir verticalmente según los patrones de uso. Cuando haya acumulado más datos sobre los patrones de uso, podrá ajustar las particiones de la transmisión para que coincidan. 

------
#### [ Console ]

1. Inicie sesión en la Consola de administración de AWS y abra la consola de Kinesis en [https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/).

1. Seleccionar **Creación de flujos de datos** y siga las instrucciones para crear una transmisión llamada `samplestream`. 

1. Abra la consola de DynamoDB en [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/).

1. En el panel de navegación del lado izquierdo de la consola, elija **Tablas**.

1. Elija la tabla **Music**.

1. Elija la pestaña **Exports and streams** (Exportaciones y flujos).

1. (Opcional) En **Detalles de Amazon Kinesis Data Streams**, puede cambiar la precisión de la marca de tiempo del registro de microsegundos (predeterminado) a milisegundos. 

1. Elija **samplestream** de la lista desplegable.

1. Seleccione el botón **Activar**.

------
#### [ AWS CLI ]

1. Cree un flujo de datos de Kinesis llamado `samplestream` mediante el uso del [comando create-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/create-stream.html).

   ```
   aws kinesis create-stream --stream-name samplestream --shard-count 3 
   ```

   Consulte [Consideraciones sobre la administración de particiones para Kinesis Data Streams](kds_using-shards-and-metrics.md#kds_using-shards-and-metrics.shardmanagment) antes de configurar el número de fragmentos para el flujo de datos de Kinesis.

1. Verifique que la transmisión de Kinesis esté activa y lista para su uso mediante el [comando describe-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/describe-stream.html).

   ```
   aws kinesis describe-stream --stream-name samplestream
   ```

1. Habilite el streaming de Kinesis en la tabla de DynamoDB mediante el `enable-kinesis-streaming-destination` comando. Reemplace el valor de `stream-arn` por el que `describe-stream` devolvió en el paso anterior. Si lo desea, active la transmisión con una precisión más detallada (microsegundos) de los valores de marca de tiempo devueltos en cada registro.

   Active la transmisión con una precisión de marca de tiempo de microsegundos:

   ```
   aws dynamodb enable-kinesis-streaming-destination \
     --table-name Music \
     --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
     --enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
   ```

   O active la transmisión con la precisión de marca de tiempo predeterminada (milisegundos):

   ```
   aws dynamodb enable-kinesis-streaming-destination \
     --table-name Music \
     --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
   ```

1. Verifique si el streaming de Kinesis está activa en la tabla mediante el comando `describe-kinesis-streaming-destination` de DynamoDB.

   ```
   aws dynamodb describe-kinesis-streaming-destination --table-name Music
   ```

1. Escribir datos en la tabla de DynamoDB utilizando el comando `put-item`, tal y como se describe en la [Guía para desarrolladores de DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-2.html).

   ```
   aws dynamodb put-item \
       --table-name Music  \
       --item \
           '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'
   
   aws dynamodb put-item \
       --table-name Music \
       --item \
           '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
   ```

1. Uso del comando de la CLI [get-records](https://docs.aws.amazon.com/cli/latest/reference/kinesis/get-records.html) para Kinesis para recuperar el contenido del flujo de Kinesis. A continuación, utilice el siguiente fragmento de código para deserializar el contenido de la transmisión.

   ```
   /**
    * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
    */
   public void processRecord(Record kinesisRecord) throws IOException {
       ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
       JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
       JsonNode dynamoDBRecord = rootNode.get("dynamodb");
       JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
       JsonNode newItemImage = dynamoDBRecord.get("NewImage");
       Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
   
       /**
        * Say for example our record contains a String attribute named "stringName" and we want to fetch the value
        * of this attribute from the new item image. The following code fetches this value.
        */
       JsonNode attributeNode = newItemImage.get("stringName");
       JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
       String attributeValue = attributeValueNode.textValue();
       System.out.println(attributeValue);
   }
   
   private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
       JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
       JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
       if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
           return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
       }
       return Instant.ofEpochMilli(timestampJson.longValue());
   }
   ```

------
#### [ Java ]

1. Siga las instrucciones de la guía para desarrolladores de Kinesis Data Streams para [crear](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-create-stream.html) una secuencia de datos de Kinesis llamada `samplestream` usando Java.

   Consulte [Consideraciones sobre la administración de particiones para Kinesis Data Streams](kds_using-shards-and-metrics.md#kds_using-shards-and-metrics.shardmanagment) antes de configurar el número de fragmentos para el flujo de datos de Kinesis. 

1. Use el siguiente fragmento de código para habilitar el streaming de Kinesis en la tabla de DynamoDB. Si lo desea, active la transmisión con una precisión más detallada (microsegundos) de los valores de marca de tiempo devueltos en cada registro. 

   Active la transmisión con una precisión de marca de tiempo de microsegundos:

   ```
   EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder()
     .approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND)
     .build();
   
   EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
     .tableName(tableName)
     .streamArn(kdsArn)
     .enableKinesisStreamingConfiguration(enableKdsConfig)
     .build();
   
   EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
   ```

   O active la transmisión con la precisión de marca de tiempo predeterminada (milisegundos):

   ```
   EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
     .tableName(tableName)
     .streamArn(kdsArn)
     .build();
   
   EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
   ```

1. Siga las instrucciones de la *Guía para desarrolladores de Kinesis Data Streams* para [leer](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html) desde el flujo de datos creado.

1. Utilice el siguiente fragmento de código para deserializar el contenido del flujo.

   ```
   /**
    * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
    */
   public void processRecord(Record kinesisRecord) throws IOException {
       ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
       JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
       JsonNode dynamoDBRecord = rootNode.get("dynamodb");
       JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
       JsonNode newItemImage = dynamoDBRecord.get("NewImage");
       Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
   
       /**
        * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value
        * of this attribute from the new item image, the below code would fetch this.
        */
       JsonNode attributeNode = newItemImage.get("stringName");
       JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
       String attributeValue = attributeValueNode.textValue();
       System.out.println(attributeValue);
   }
   
   private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
       JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
       JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
       if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
           return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
       }
       return Instant.ofEpochMilli(timestampJson.longValue());
   }
   ```

------

## Aplicación de cambios en un flujo de datos de Amazon Kinesis activo
<a name="kds_gettingstarted.making-changes"></a>

En esta sección se describe cómo realizar cambios en una configuración de Kinesis Data Streams para DynamoDB desde la consola, la AWS CLI o la API.

**Consola de administración de AWS**

1. Abra la consola de DynamoDB desde [ttps://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/).

1. Busque su tabla.

1. Elija la pestaña **Exportaciones y flujos**.

**AWS CLI**

1. Llame a `describe-kinesis-streaming-destination` para confirmar que el flujo es `ACTIVE`. 

1. Llame a `UpdateKinesisStreamingDestination`, como en este ejemplo:

   ```
   aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
   ```

1. Llame a `describe-kinesis-streaming-destination` para confirmar que el flujo es `UPDATING`.

1. Llame a `describe-kinesis-streaming-destination` periódicamente hasta que el estado de la transmisión sea `ACTIVE` de nuevo. Las actualizaciones de precisión de la marca de tiempo suelen aplicarse al cabo de unos cinco minutos. Una vez actualizado el estado, significa que la actualización se ha completado y que el nuevo valor de precisión se aplicará a los registros futuros.

1. Escriba en la tabla con `putItem`.

1. Uso el comando `get-records` de Kinesis para recuperar el contenido del flujo.

1. Confirme que el `ApproximateCreationDateTime` de las escrituras tienen la precisión deseada.

**API de Java**

1. Proporcione un fragmento de código que construya una solicitud `UpdateKinesisStreamingDestination` y una respuesta `UpdateKinesisStreamingDestination`. 

1. Proporcione un fragmento de código que construya una solicitud `DescribeKinesisStreamingDestination` y una `DescribeKinesisStreamingDestination response`.

1. Llame a `describe-kinesis-streaming-destination` periódicamente hasta que el estado del flujo sea `ACTIVE` de nuevo, lo que significa que la actualización se ha completado y que el nuevo valor de precisión se aplicará a los registros futuros.

1. Realice escrituras en la tabla.

1.  Lea el flujo y deserialice el contenido del flujo.

1. Confirme que el `ApproximateCreationDateTime` de las escrituras tienen la precisión deseada.