

# Conceitos básicos do Kinesis Data Streams para o Amazon DynamoDB
<a name="kds_gettingstarted"></a>

Esta seção descreve como usar o Kinesis Data Streams para tabelas do Amazon DynamoDB com o console do Amazon DynamoDB, a AWS Command Line Interface (AWS CLI) e a API.

## Criar um fluxo de dados ativo do Amazon Kinesis
<a name="kds_gettingstarted.making-changes"></a>

Todos esses exemplos usam a tabela `Music` do DynamoDB que foi criada como parte do tutorial [Conceitos básicos do DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GettingStartedDynamoDB.html).

Para saber mais sobre como criar consumidores e conectar seu fluxo de dados do Kinesis a outros produtos da AWS, consulte [Ler dados do Amazon Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html) no *Guia do desenvolvedor do Amazon Kinesis Data Streams*.

**nota**  
 Quando você estiver usando fragmentos do KDS pela primeira vez, recomendamos configurá-los para que aumentem e reduzam a escala verticalmente de acordo com os padrões de uso. Depois de acumular mais dados sobre os padrões de uso, você poderá ajustar os fragmentos em seu fluxo para fazer a correspondência. 

------
#### [ Console ]

1. Faça login no Console de gerenciamento da AWS e abra o console do Kinesis em [ https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/).

1. Escolha **Create data stream** (Criar fluxo de dados) e siga as instruções para criar um fluxo chamado `samplestream`. 

1. Abra o console do DynamoDB em [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/).

1. No painel de navegação, no lado esquerdo do console, selecione **Tables** (Tabelas).

1. Escolha a tabela **Music**.

1. Escolha a guia **Exports and streams** (Exportações e fluxos).

1. (Opcional) Em **Detalhes do stream de dados do Amazon Kinesis**, você pode alterar a precisão do carimbo de data e hora do registro de microssegundos (padrão) para milissegundos. 

1. Escolha **samplestream** na lista suspensa.

1. Escolha o botão **Ativar**.

------
#### [ AWS CLI ]

1. Criar um fluxo de dados do Kinesis denominado `samplestream` usando o [comando create-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/create-stream.html).

   ```
   aws kinesis create-stream --stream-name samplestream --shard-count 3 
   ```

   Consulte [Considerações sobre gerenciamento de fragmentos para o Kinesis Data Streams](kds_using-shards-and-metrics.md#kds_using-shards-and-metrics.shardmanagment) antes de definir o número de fragmentos para o fluxo de dados do Kinesis.

1. Verifique se o stream do Kinesis está ativo e pronto para uso usando o [comando describe-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/describe-stream.html).

   ```
   aws kinesis describe-stream --stream-name samplestream
   ```

1. Habilite o streaming do Kinesis na tabela do DynamoDB usando o comando `enable-kinesis-streaming-destination` do DynamoDB. Substitua o valor de `stream-arn` pelo que foi retornado por `describe-stream` na etapa anterior. Opcionalmente, habilite o streaming com uma precisão mais granular (microssegundos) dos valores de carimbo de data e hora retornados em cada registro.

   Habilite o streaming com precisão de carimbo de data e hora em microssegundos:

   ```
   aws dynamodb enable-kinesis-streaming-destination \
     --table-name Music \
     --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
     --enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
   ```

   Ou habilite o streaming com a precisão padrão do carimbo de data e hora (milissegundos):

   ```
   aws dynamodb enable-kinesis-streaming-destination \
     --table-name Music \
     --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
   ```

1. Verifique se o streaming do Kinesis está ativa na tabela usando o comando `describe-kinesis-streaming-destination` do DynamoDB.

   ```
   aws dynamodb describe-kinesis-streaming-destination --table-name Music
   ```

1. Grave os dados na tabela do DynamoDB usando o comando `put-item`, conforme descrito no [Guia do desenvolvedor do DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-2.html).

   ```
   aws dynamodb put-item \
       --table-name Music  \
       --item \
           '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'
   
   aws dynamodb put-item \
       --table-name Music \
       --item \
           '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
   ```

1. Use o comando [get-records](https://docs.aws.amazon.com/cli/latest/reference/kinesis/get-records.html) da CLI do Kinesis para recuperar o conteúdo de fluxo do Kinesis. Em seguida, use o trecho de código a seguir para desserializar o conteúdo do fluxo.

   ```
   /**
    * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
    */
   public void processRecord(Record kinesisRecord) throws IOException {
       ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
       JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
       JsonNode dynamoDBRecord = rootNode.get("dynamodb");
       JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
       JsonNode newItemImage = dynamoDBRecord.get("NewImage");
       Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
   
       /**
        * Say for example our record contains a String attribute named "stringName" and we want to fetch the value
        * of this attribute from the new item image. The following code fetches this value.
        */
       JsonNode attributeNode = newItemImage.get("stringName");
       JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
       String attributeValue = attributeValueNode.textValue();
       System.out.println(attributeValue);
   }
   
   private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
       JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
       JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
       if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
           return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
       }
       return Instant.ofEpochMilli(timestampJson.longValue());
   }
   ```

------
#### [ Java ]

1. Siga as instruções no Guia do desenvolvedor do Kinesis Data Streams para [criar](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-create-stream.html) um fluxo de dados do Kinesis chamada `samplestream` usando Java.

   Consulte [Considerações sobre gerenciamento de fragmentos para o Kinesis Data Streams](kds_using-shards-and-metrics.md#kds_using-shards-and-metrics.shardmanagment) antes de definir o número de fragmentos para o fluxo de dados do Kinesis. 

1. Use o trecho de código a seguir para habilitar a transmissão do Kinesis na tabela do DynamoDB. Opcionalmente, habilite o streaming com uma precisão mais granular (microssegundos) dos valores de carimbo de data e hora retornados em cada registro. 

   Habilite o streaming com precisão de carimbo de data e hora em microssegundos:

   ```
   EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder()
     .approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND)
     .build();
   
   EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
     .tableName(tableName)
     .streamArn(kdsArn)
     .enableKinesisStreamingConfiguration(enableKdsConfig)
     .build();
   
   EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
   ```

   Ou habilite o streaming com a precisão padrão do carimbo de data e hora (milissegundos):

   ```
   EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
     .tableName(tableName)
     .streamArn(kdsArn)
     .build();
   
   EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
   ```

1. Siga as instruções do [Kinesis Data Streams developer guide](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html) (Guia do desenvolvedor do Kinesis Data Streams) para *ler* o fluxo de dados criado.

1. Use o trecho de código a seguir para desserializar o conteúdo do fluxo

   ```
   /**
    * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
    */
   public void processRecord(Record kinesisRecord) throws IOException {
       ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
       JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
       JsonNode dynamoDBRecord = rootNode.get("dynamodb");
       JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
       JsonNode newItemImage = dynamoDBRecord.get("NewImage");
       Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
   
       /**
        * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value
        * of this attribute from the new item image, the below code would fetch this.
        */
       JsonNode attributeNode = newItemImage.get("stringName");
       JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
       String attributeValue = attributeValueNode.textValue();
       System.out.println(attributeValue);
   }
   
   private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
       JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
       JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
       if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
           return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
       }
       return Instant.ofEpochMilli(timestampJson.longValue());
   }
   ```

------

## Alterar um fluxo de dados ativo do Amazon Kinesis
<a name="kds_gettingstarted.making-changes"></a>

Esta seção descreve como alterar uma configuração ativa do Kinesis Data Streams para o DynamoDB usando o console, a AWS CLI e a API.

**Console de gerenciamento da AWS**

1. Abra o console do DynamoDB em [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/).

1. Acesse a tabela.

1. Escolha **Exportações e streams**.

**AWS CLI**

1. Chame `describe-kinesis-streaming-destination` para confirmar se o fluxo está `ACTIVE`. 

1. Chame `UpdateKinesisStreamingDestination`, como neste exemplo:

   ```
   aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
   ```

1. Chame `describe-kinesis-streaming-destination` para confirmar se o fluxo está `UPDATING`.

1. Chame `describe-kinesis-streaming-destination` periodicamente até que o status de streaming volte a ser `ACTIVE`. Normalmente, demora até 5 minutos para que as atualizações de precisão do carimbo de data e hora entrem em vigor. A atualização desse status indicará que a operação terá sido concluída e que o novo valor de precisão será aplicado em registros futuros.

1. Grave na tabela usando `putItem`.

1. Use o comando `get-records` do Kinesis para obter o conteúdo do fluxo.

1. Confirme se `ApproximateCreationDateTime` das gravações tem a precisão desejada.

**API Java**

1. Forneça um trecho de código que construa uma solicitação `UpdateKinesisStreamingDestination` e uma resposta `UpdateKinesisStreamingDestination`. 

1. Forneça um trecho de código que construa uma solicitação `DescribeKinesisStreamingDestination` e uma `DescribeKinesisStreamingDestination response`.

1. Chame `describe-kinesis-streaming-destination` periodicamente até que o status de streaming volte a ser `ACTIVE`, indicando que a atualização foi concluída e que o novo valor de precisão será aplicado em registros futuros.

1. Faça gravações na tabela.

1.  Faça uma leitura do fluxo e desserialize o conteúdo do fluxo.

1. Confirme se `ApproximateCreationDateTime` das gravações tem a precisão desejada.