

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Nozioni di base su Kinesis Data Streams per Amazon DynamoDB
<a name="kds_gettingstarted"></a>

Questa sezione descrive come utilizzare Kinesis Data Streams per le tabelle Amazon DynamoDB con la console Amazon DynamoDB, il () e l'API AWS Command Line Interface .AWS CLI

## Creazione di un flusso di dati Amazon Kinesis attivo
<a name="kds_gettingstarted.making-changes"></a>

Tutti questi esempi utilizzano la tabella DynamoDB `Music` che è stata creata come parte del tutorial [Nozioni di base su DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GettingStartedDynamoDB.html).

Per ulteriori informazioni su come creare consumatori e connettere il flusso di dati Kinesis ad altri servizi AWS , consulta [Leggere i dati dal flusso di dati Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html), nella *Guida per gli sviluppatori del flusso di dati Amazon Kinesis*.

**Nota**  
 Quando utilizzi per la prima volta le partizioni KDS, consigliamo di impostarle in modo che aumentino o diminuiscano in base ai modelli di utilizzo. Dopo aver accumulato ulteriori dati sui modelli di utilizzo, è possibile adattare le partizioni nel flusso di conseguenza. 

------
#### [ Console ]

1. Accedi Console di gestione AWS e apri la console Kinesis all'indirizzo. [https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/)

1. Scegli **Crea flusso di dati** e segui le istruzioni per creare un flusso denominato `samplestream`. 

1. Apri la console DynamoDB all'indirizzo. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. Nel riquadro di navigazione sul lato sinistro della console scegli **Tables (Tabelle)**.

1. Seleziona la tabella **Music**.

1. Scegli la scheda **Exports and streams (Esportazioni e flussi)**.

1. (Facoltativo) Nella sezione **Dettagli del flusso di dati Amazon Kinesis**, è possibile modificare la precisione del timestamp del record da microsecondi (impostazione predefinita) a millisecondi. 

1. Scegli **samplestream** dall'elenco a discesa.

1. Seleziona il pulsante **Attiva**.

------
#### [ AWS CLI ]

1. Crea un flusso di dati Kinesis denominato `samplestream` utilizzando il comando [create-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/create-stream.html).

   ```
   aws kinesis create-stream --stream-name samplestream --shard-count 3 
   ```

   Prima di configurare il numero di partizioni per il flusso di dati Kinesis, consulta [Considerazioni sulla gestione delle partizioni per Kinesis Data Streams](kds_using-shards-and-metrics.md#kds_using-shards-and-metrics.shardmanagment).

1. Verificare che il flusso Kinesis sia attivo e pronto per l'uso utilizzando il comando [describe-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/describe-stream.html).

   ```
   aws kinesis describe-stream --stream-name samplestream
   ```

1. Abilitare lo streaming di Kinesis sulla tabella DynamoDB utilizzando il comando DynamoDB `enable-kinesis-streaming-destination`. Sostituisci il valore `stream-arn` con quello restituito da `describe-stream` nella fase precedente. Facoltativamente, abilita lo streaming con una precisione più granulare (microsecondi) dei valori di timestamp restituiti su ogni record.

   Abilita lo streaming con la precisione del timestamp in microsecondi:

   ```
   aws dynamodb enable-kinesis-streaming-destination \
     --table-name Music \
     --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
     --enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
   ```

   Oppure abilita lo streaming con la precisione del timestamp predefinita (millisecondi):

   ```
   aws dynamodb enable-kinesis-streaming-destination \
     --table-name Music \
     --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
   ```

1. Abilitare lo streaming di Kinesis sulla tabella DynamoDB utilizzando il comando `describe-kinesis-streaming-destination` DynamoDB.

   ```
   aws dynamodb describe-kinesis-streaming-destination --table-name Music
   ```

1. Scrivere i dati nella tabella DynamoDB utilizzando il comando `put-item`, come descritto nella [Guida per gli sviluppatori di DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-2.html).

   ```
   aws dynamodb put-item \
       --table-name Music  \
       --item \
           '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'
   
   aws dynamodb put-item \
       --table-name Music \
       --item \
           '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
   ```

1. Utilizza il comando [get-record](https://docs.aws.amazon.com/cli/latest/reference/kinesis/get-records.html) della CLI di Kinesis per recuperare il contenuto del flusso Kinesis. Quindi utilizzare il seguente frammento di codice per deserializzare il contenuto del flusso.

   ```
   /**
    * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
    */
   public void processRecord(Record kinesisRecord) throws IOException {
       ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
       JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
       JsonNode dynamoDBRecord = rootNode.get("dynamodb");
       JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
       JsonNode newItemImage = dynamoDBRecord.get("NewImage");
       Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
   
       /**
        * Say for example our record contains a String attribute named "stringName" and we want to fetch the value
        * of this attribute from the new item image. The following code fetches this value.
        */
       JsonNode attributeNode = newItemImage.get("stringName");
       JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
       String attributeValue = attributeValueNode.textValue();
       System.out.println(attributeValue);
   }
   
   private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
       JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
       JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
       if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
           return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
       }
       return Instant.ofEpochMilli(timestampJson.longValue());
   }
   ```

------
#### [ Java ]

1. Seguire le istruzioni contenute nella guida per gli sviluppatori di Kinesis Data Streams per [creare](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-create-stream.html) un flusso di dati Kinesis denominato `samplestream` tramite Java.

   Prima di configurare il numero di partizioni per il flusso di dati Kinesis, consulta [Considerazioni sulla gestione delle partizioni per Kinesis Data Streams](kds_using-shards-and-metrics.md#kds_using-shards-and-metrics.shardmanagment). 

1. Utilizzo il seguente frammento di codice per abilitare lo streamingKinesis sulla tabella DynamoDB. Facoltativamente, abilita lo streaming con una precisione più granulare (microsecondi) dei valori di timestamp restituiti su ogni record. 

   Abilita lo streaming con la precisione del timestamp in microsecondi:

   ```
   EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder()
     .approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND)
     .build();
   
   EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
     .tableName(tableName)
     .streamArn(kdsArn)
     .enableKinesisStreamingConfiguration(enableKdsConfig)
     .build();
   
   EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
   ```

   Oppure abilita lo streaming con la precisione del timestamp predefinita (millisecondi):

   ```
   EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
     .tableName(tableName)
     .streamArn(kdsArn)
     .build();
   
   EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
   ```

1. Segui le istruzioni contenute nella guida per gli sviluppatori di *Kinesis Data Streams* per [leggere](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html) dal flusso di dati creato.

1. Utilizza il seguente frammento di codice per deserializzare il contenuto del flusso.

   ```
   /**
    * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
    */
   public void processRecord(Record kinesisRecord) throws IOException {
       ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
       JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
       JsonNode dynamoDBRecord = rootNode.get("dynamodb");
       JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
       JsonNode newItemImage = dynamoDBRecord.get("NewImage");
       Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
   
       /**
        * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value
        * of this attribute from the new item image, the below code would fetch this.
        */
       JsonNode attributeNode = newItemImage.get("stringName");
       JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
       String attributeValue = attributeValueNode.textValue();
       System.out.println(attributeValue);
   }
   
   private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
       JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
       JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
       if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
           return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
       }
       return Instant.ofEpochMilli(timestampJson.longValue());
   }
   ```

------

## Applicazione di modifiche a un flusso di dati Amazon Kinesis attivo
<a name="kds_gettingstarted.making-changes"></a>

Questa sezione descrive come apportare modifiche a una configurazione attiva di Kinesis Data Streams for DynamoDB utilizzando la console e l'API. AWS CLI 

**Console di gestione AWS**

1. Apri la console DynamoDB all'indirizzo [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. Vai alla tabella.

1. Seleziona la scheda **Esportazioni e flussi**.

**AWS CLI**

1. Chiama `describe-kinesis-streaming-destination` per confermare che il flusso sia `ACTIVE`. 

1. Chiama `UpdateKinesisStreamingDestination`, come in questo esempio:

   ```
   aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
   ```

1. Chiama `describe-kinesis-streaming-destination` per confermare che il flusso sia `UPDATING`.

1. Chiama `describe-kinesis-streaming-destination` periodicamente fino a quando lo stato dello streaming non è nuovamente `ACTIVE`. In genere sono necessari fino a 5 minuti perché gli aggiornamenti alla precisione del timestamp entrino in vigore. Una volta che lo stato è aggiornato, significa che l’aggiornamento è completo e il nuovo valore di precisione verrà applicato ai record futuri.

1. Scrivi nella tabella usando `putItem`.

1. Utilizza il comando `get-records` di Kinesis per recuperare il contenuto del flusso.

1. Verifica che `ApproximateCreationDateTime` delle scritture presenti la precisione desiderata.

**API Java**

1. Fornisci un frammento di codice che costruisce una richiesta `UpdateKinesisStreamingDestination` e una risposta `UpdateKinesisStreamingDestination`. 

1. Fornisci un frammento di codice che costruisce una richiesta `DescribeKinesisStreamingDestination` e una `DescribeKinesisStreamingDestination response`.

1. Chiama `describe-kinesis-streaming-destination` periodicamente fino a quando lo stato dello streaming non è nuovamente `ACTIVE`, a indicare che l’aggiornamento è completo e il nuovo valore di precisione verrà applicato ai record futuri.

1. Effettua le scritture sulla tabella.

1.  Effettua la lettura dal flusso e deserializza il contenuto del flusso.

1. Verifica che `ApproximateCreationDateTime` delle scritture presenti la precisione desiderata.