

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Acquisizione dei dati di modifica per DynamoDB Streams
<a name="Streams"></a>

 DynamoDB Streams acquisisce una sequenza cronologica di modifiche a livello di elemento in una tabella DynamoDB qualsiasi e le archivia in un log per un massimo di 24 ore. Le applicazioni possono accedere a questo log e visualizzare gli elementi di dati nel rispettivo stato prima e dopo la modifica, praticamente in tempo reale.

 La crittografia a riposo crittografa i dati in DynamoDB Streams. Per ulteriori informazioni, consulta [Crittografia a riposo per DynamoDB](EncryptionAtRest.md).

Un *flusso DynamoDB* è un flusso ordinato di informazioni sulle modifiche apportate agli elementi in una tabella DynamoDB. Quando si abilita un flusso in una tabella, DynamoDB acquisisce informazioni su ogni modifica apportata agli elementi di dati nella tabella.

Ogni volta che un'applicazione crea, aggiorna o elimina elementi nella tabella, DynamoDB Streams scrive un record di flusso con gli attributi della chiave primaria degli elementi che sono stati modificati. Un *record di flusso* contiene informazioni su una modifica di dati apportata a un singolo elemento in una tabella DynamoDB. Puoi configurare il flusso in modo che i record di flusso acquisiscano informazioni aggiuntive, come le immagini "prima" e "dopo" degli elementi modificati.

DynamoDB Streams garantisce quanto segue:
+ Ogni record di flusso viene visualizzato esattamente una volta nel flusso.
+ Per ogni elemento modificato in una tabella DynamoDB, i record di flusso vengono visualizzati nella stessa sequenza delle modifiche effettive apportate all'elemento.

DynamoDB Streams scrive i record di flusso praticamente in tempo reale per permettere di creare applicazioni che utilizzano questi flussi e agiscono in base al contenuto.

**Topics**
+ [Endpoint per DynamoDB Streams](#Streams.Endpoints)
+ [Abilitazione di un flusso](#Streams.Enabling)
+ [Lettura ed elaborazione di un flusso](#Streams.Processing)
+ [DynamoDB Streams e Time to Live](time-to-live-ttl-streams.md)
+ [Utilizzo dell'adattatore DynamoDB Streams Kinesis per elaborare i record di flusso](Streams.KCLAdapter.md)
+ [API di basso livello DynamoDB Streams: esempio Java](Streams.LowLevel.Walkthrough.md)
+ [Streams e trigger DynamoDB AWS Lambda](Streams.Lambda.md)
+ [Flussi DynamoDB e Apache Flink](StreamsApacheFlink.xml.md)

## Endpoint per DynamoDB Streams
<a name="Streams.Endpoints"></a>

AWS mantiene endpoint separati per DynamoDB e DynamoDB Streams. Per usare tabelle e indici di database, l'applicazione deve accedere a un endpoint DynamoDB. Per leggere ed elaborare i record di DynamoDB Streams, l'applicazione deve accedere a un endpoint DynamoDB Streams nella stessa regione.

DynamoDB Streams offre due set di endpoint. Questi sono:
+ **IPv4-only endpoint: endpoint** con la convenzione di denominazione. `streams.dynamodb.<region>.amazonaws.com`
+ Endpoint **dual-stack: nuovi endpoint** compatibili con entrambi e che seguono la convenzione di denominazione. IPv4 IPv6 `streams-dynamodb.<region>.api.aws`

**Nota**  
Per l'elenco completo delle regioni e degli endpoint di DynamoDB e DynamoDB Streams, consulta [Regioni ed endpoint](https://docs.aws.amazon.com/general/latest/gr/rande.html) in *Riferimenti generali di AWS*.

 AWS SDKs Forniscono client separati per DynamoDB e DynamoDB Streams. A seconda dei requisiti, l'applicazione può accedere a un endpoint DynamoDB, a un endpoint DynamoDB Streams o a entrambi contemporaneamente. Per connettersi a entrambi gli endpoint, l'applicazione deve creare un'istanza di due client, uno per DynamoDB e uno per DynamoDB Streams.

## Abilitazione di un flusso
<a name="Streams.Enabling"></a>

È possibile abilitare uno stream su una nuova tabella quando lo si crea utilizzando o uno dei AWS CLI . AWS SDKs È inoltre possibile abilitare o disabilitare un flusso in una tabella esistente oppure modificare le impostazioni di un flusso. DynamoDB Streams opera in modo asincrono, pertanto non vi è alcun impatto sulle prestazioni di una tabella se si abilita un flusso.

Il modo più semplice per gestire DynamoDB Streams consiste nell'usare la Console di gestione AWS.

1. Accedi Console di gestione AWS e apri la console DynamoDB all'indirizzo. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. Nel pannello di controllo della console DynamoDB, scegli **Tabelle** e seleziona una tabella esistente.

1. Scegli la scheda **Exports and streams (Esportazioni e flussi)**.

1. Nella sezione **Dettagli del flusso DynamoDB**, seleziona **Attiva.**

1. Nella pagina **Attiva il flusso DynamoDB**, seleziona le informazioni che verranno scritte nel flusso a ogni modifica dei dati nella tabella:
   + **Key attributes only (Solo attributi chiave)**: solo gli attributi chiave dell'elemento modificato.
   + **Nuova immagine**: l'intero elemento, così come visualizzato dopo che è stato modificato.
   + **Vecchia immagine**: l'intero elemento, così come visualizzato prima della modifica.
   + **Immagini nuove e vecchie**: le immagini dell'elemento nuove e vecchie.

   Dopo aver configurato le impostazioni, seleziona **Attiva il flusso**.

1. (Facoltativo) Per disabilitare un flusso esistente, seleziona **Disattiva**in **Dettagli del flusso DynamoDB**.

Per abilitare o modificare un flusso, puoi anche usare le operazioni API `CreateTable` o `UpdateTable`. Il parametro `StreamSpecification` determina la configurazione del flusso:
+ `StreamEnabled`: specifica se un flusso è abilitato (`true`) o disabilitato (`false`) per la tabella.
+ `StreamViewType`: specifica le informazioni che verranno scritte nel flusso a ogni modifica dei dati nella tabella:
  + `KEYS_ONLY`: solo gli attributi chiave dell'elemento modificato.
  + `NEW_IMAGE`: l'intero elemento, così come visualizzato dopo che è stato modificato.
  + `OLD_IMAGE`: l'intero elemento, così come visualizzato prima della modifica.
  + `NEW_AND_OLD_IMAGES`: le immagini dell'elemento nuove e vecchie.

Puoi abilitare o disabilitare un flusso in qualsiasi momento. Tuttavia, se provi ad abilitare un flusso in una tabella che include già un flusso riceverai un'eccezione `ValidationException`. Inoltre, se si tenta di disabilitare un flusso in una tabella che non include alcun flusso, verrà mostrata un’eccezione `ValidationException`.

Quando `StreamEnabled` viene impostato su `true`, DynamoDB crea un nuovo flusso con un descrittore di flusso univoco assegnato. Se disabiliti e quindi riabiliti un flusso nella tabella, viene creato un nuovo flusso con un descrittore di flusso diverso.

Ogni flusso è identificato in modo univoco da un Amazon Resource Name (ARN). Di seguito è riportato un ARN di esempio per un flusso in una tabella DynamoDB denominata `TestTable`.

```
arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291
```

Per determinare l'ultimo descrittore di flusso per una tabella, inviare una richiesta `DescribeTable` DynamoDB e cercare l'elemento `LatestStreamArn` nella risposta.

**Nota**  
Non è possibile modificare un `StreamViewType` una volta configurato un flusso. Se è necessario apportare modifiche a un flusso dopo che è stato configurato, è necessario disabilitare il flusso corrente e crearne uno nuovo.

## Lettura ed elaborazione di un flusso
<a name="Streams.Processing"></a>

Per leggere ed elaborare un flusso, l'applicazione deve connettersi a un endpoint DynamoDB Streams ed emettere le richieste API.

Un flusso è costituito da *record di flusso*. Ogni record di flusso rappresenta una singola modifica dei dati nella tabella DynamoDB cui appartiene il flusso. A ogni record di flusso è assegnato un numero in sequenza, corrispondente all'ordine in cui il record è stato pubblicato nel flusso.

I record di flusso sono organizzati in gruppi o *shard*. Ogni shard funge da container per più record di flusso e contiene le informazioni necessarie per l'accesso e l'iterazione attraverso i record. I record di flusso all'interno di uno shard vengono automaticamente rimossi dopo 24 ore.

Gli shard sono effimeri, ovvero vengono creati ed eliminati automaticamente in base alle necessità. Qualsiasi shard può anche essere suddiviso in più nuovi shard e anche questa operazione viene eseguita automaticamente. È anche possibile che per uno shard padre esista un solo shard figlio. Uno shard può essere suddiviso in risposta a livelli elevati di attività di scrittura nella tabella padre corrispondente, in modo che le applicazioni possano elaborare record da più shard in parallelo.

Se disabiliti un flusso, tutti gli shard aperti verranno chiusi. I dati nel flusso continueranno a essere leggibili per 24 ore.

Poiché gli shard hanno una derivazione (padre e figlio), un'applicazione deve sempre elaborare uno shard padre prima dello shard figlio. In questo modo, anche l'elaborazione dei record di flusso avviene sicuramente in base all'ordine corretto. Se si utilizza DynamoDB Streams Kinesis Adapter, questa operazione viene gestita automaticamente. L'applicazione elabora le partizioni e i record di flusso nell'ordine corretto e gestisce automaticamente partizioni nuove o scadute, nonché le partizioni che vengono suddivise durante l'esecuzione dell'applicazione. Per ulteriori informazioni, consultare[Utilizzo dell'adattatore DynamoDB Streams Kinesis per elaborare i record di flusso](Streams.KCLAdapter.md).

Il diagramma seguente mostra la relazione tra un flusso, gli shard nel flusso e i record di flusso negli shard.

![\[Struttura dei flussi DynamoDB. I record dei flussi che rappresentano le modifiche dei dati sono organizzati in shard.\]](http://docs.aws.amazon.com/it_it/amazondynamodb/latest/developerguide/images/streams-terminology.png)


**Nota**  
Se si esegue un'operazione `PutItem` o `UpdateItem` che non modifica alcun dato in un elemento, DynamoDB Streams *non* scrive un record di flusso per tale operazione.

Per accedere a un flusso ed elaborare i record di flusso al suo interno, devi eseguire queste operazioni:
+ Determinare l'ARN univoco del flusso a cui vuoi accedere.
+ Determinare quali shard nel flusso contengono i record di flusso che ti interessano.
+ Accedi alle partizioni e recupera i record di flusso desiderati.

**Nota**  
Non più di due processi devono leggere dalla stessa partizione di flussi contemporaneamente. La presenza di più di due lettori per shard può causare il throttling.

L'API DynamoDB Streams fornisce le operazioni seguenti, che possono essere usate dai programmi applicativi:
+  `[ListStreams](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_ListStreams.html)`: restituisce un elenco dei descrittori di flusso per il conto corrente e l'endpoint. Puoi facoltativamente richiedere solo i descrittori di flusso per un nome di tabella specifico.
+ `[DescribeStream](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_DescribeStream.html)`: restituisce informazioni su un flusso, inclusi lo stato attuale del flusso, il suo nome della risorsa Amazon (ARN), la composizione dei suoi shard e la corrispondente tabella su DynamoDB. Facoltativamente, è possibile utilizzare il campo `ShardFilter` per recuperare lo shard secondario esistente associato allo shard principale.
+ `[GetShardIterator](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html)`: restituisce un *iteratore di partizioni*, che descrive una posizione all'interno di una partizione. Puoi richiedere che l'iterazione fornisca accesso al punto meno recente, al punto più recente o a un punto particolare nel flusso.
+ `[GetRecords](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html)`: restituisce i record di flusso in una determinata partizione. Devi specificare l'iterazione shard restituita da una richiesta `GetShardIterator`.

Per la descrizione completa di queste operazioni API, incluse le richieste e le risposte di esempio, consulta la [Documentazione di riferimento delle API di Amazon DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Operations_Amazon_DynamoDB_Streams.html).

### Individuazione di shard
<a name="Streams.ShardDiscovery"></a>



Come individuare nuovi shard in un flusso DynamoDB con due potenti metodi. Gli utenti dei flussi Amazon DynamoDB hanno a disposizione due modi efficaci per tracciare e identificare nuovi shard:

**Polling della topologia dell’intero flusso**  
Utilizzo dell’API `DescribeStream` per eseguire regolarmente il polling del flusso. Questa operazione restituisce tutti gli shard del flusso, inclusi tutti i nuovi shard creati. Confrontando i risultati nel tempo è possibile rilevare i nuovi shard aggiunti.

**Individuazione di shard secondari**  
Usa l’API `DescribeStream`con il parametro `ShardFilter` per trovare un sottoinsieme di shard. Specificando uno shard principale nella richiesta, i flussi DynamoDB restituiranno i relativi shard secondari immediati. Questo approccio è utile quando è necessario solo tracciare la derivazione degli shard senza scansionare l’intero flusso.   
Le applicazioni che utilizzano dati dai flussi DynamoDB possono passare in modo efficiente dalla lettura di uno shard chiuso al relativo shard secondario utilizzando questo parametro `ShardFilter`, evitando chiamate ripetute all’API `DescribeStream` per recuperare e attraversare la mappa degli shard per tutti gli shard chiusi e aperti. Questo aiuta a individuare rapidamente gli shard secondari dopo la chiusura di uno shard principale, rendendo le applicazioni di elaborazione dei flussi più reattive e convenienti.

Entrambi i metodi consentono di rimanere aggiornati sulla struttura in evoluzione dei propri flussi DynamoDB, assicurandosi di non perdere mai aggiornamenti cruciali dei dati o modifiche agli shard.

### Limite di conservazione dei dati per DynamoDB Streams
<a name="Streams.DataRetention"></a>

Tutti i dati in DynamoDB Streams hanno una durata di 24 ore. Puoi recuperare e analizzare le ultime 24 ore di attività per qualsiasi tabella specifica. Tuttavia, i dati più vecchi di 24 ore sono soggetti a taglio (rimozione) in qualsiasi momento.

Se disabiliti un flusso in una tabella, i dati nel flusso restano leggibili per 24 ore. Dopo questo intervallo di tempo, i dati scadono e i record di flusso vengono automaticamente eliminati. Non esiste un meccanismo per eliminare manualmente un flusso esistente. Devi attendere fino allo scadere del limite di conservazione (24 ore) perché tutti i record di flusso vengano eliminati.

# DynamoDB Streams e Time to Live
<a name="time-to-live-ttl-streams"></a>

È possibile eseguire il backup o elaborare gli elementi che sono stati eliminati da [Time to Live](TTL.md) (TTL, Time to Live) abilitando Amazon DynamoDB Streams sulla tabella ed elaborando i record di flusso degli elementi scaduti. Per ulteriori informazioni, consulta [Lettura ed elaborazione di un flusso](Streams.md#Streams.Processing).

Il record Streams contiene un campo di identità utente `Records[<index>].userIdentity`.

Gli elementi eliminati dal processo Time to Live (TTL, Time to Live) dopo la scadenza hanno i seguenti campi:
+ `Records[<index>].userIdentity.type`

  `"Service"`
+ `Records[<index>].userIdentity.principalId`

  `"dynamodb.amazonaws.com"`

**Nota**  
Quando si utilizza il TTL in una tabella globale, il campo `userIdentity` verrà impostato nella Regione in cui è stato eseguito il TTL. Questo campo non verrà impostato in altre Regioni quando l’eliminazione viene replicata.

Il JSON seguente mostra la porzione rilevante di un singolo record Streams.

```
"Records": [
    {
        ...

        "userIdentity": {
            "type": "Service",
            "principalId": "dynamodb.amazonaws.com"
        }

        ...

    }
]
```

## Utilizzo di DynamoDB Streams e Lambda per archiviare gli elementi eliminati TTL
<a name="streams-archive-ttl-deleted-items"></a>

La combinazione di [DynamoDB Time to Live (TTL)](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html), [DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) e [AWS Lambda](https://aws.amazon.com/lambda/) può contribuire a semplificare l'archiviazione dei dati, ridurre i costi di storage DynamoDB e ridurre la complessità del codice. L'utilizzo di Lambda come consumatore di flussi offre molti vantaggi, in particolare la riduzione dei costi rispetto ad altri consumatori come Kinesis Client Library (KCL). Non ti viene addebitato alcun costo per le chiamate API `GetRecords` sul flusso DynamoDB quando si utilizza Lambda per consumare eventi e Lambda può fornire il filtraggio degli eventi identificando i modelli JSON in un evento stream. Attraverso il filtraggio dei contenuti del modello di eventi, è possibile definire fino a cinque filtri diversi per controllare quali eventi vengono inviati a Lambda per l'elaborazione. Ciò aiuta a ridurre le invocazioni delle funzioni Lambda, semplifica il codice e riduce i costi complessivi.

Sebbene DynamoDB Streams contenga tutte le modifiche ai dati, ad esempio `Create`, `Modify` e le azioni `Remove`, questo può causare invocazioni indesiderate della funzione Lambda di archivio. Ad esempio, supponiamo di avere una tabella con 2 milioni di modifiche ai dati all'ora che fluiscono nello stream, ma meno del 5% di queste sono eliminazioni di elementi che scadranno durante il processo TTL e devono essere archiviate. Con [Filtri di origine evento Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html), la funzione Lambda richiamerà solo 100.000 volte all'ora. Il risultato con il filtraggio degli eventi è che ti vengono addebitati solo le invocazioni necessarie anziché i 2 milioni di invocazioni che avresti avuto senza filtraggio degli eventi.

Il filtraggio degli eventi viene applicato alla [mappatura di origine evento Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html), una risorsa che legge da un evento scelto, ovvero il flusso DynamoDB, e invoca una funzione Lambda. Nel diagramma seguente, puoi vedere come un elemento eliminato Time to Live viene consumato da una funzione Lambda utilizzando flussi e filtri eventi.

![\[Un elemento eliminato tramite il processo TTL avvia una funzione Lambda che utilizza flussi e filtri di eventi.\]](http://docs.aws.amazon.com/it_it/amazondynamodb/latest/developerguide/images/streams-lambda-ttl.png)


### Modello di filtro evento DynamoDB Time to Live
<a name="ttl-event-filter-pattern"></a>

L'aggiunta del seguente JSON ai [criteri di filtro](https://docs.aws.amazon.com/lambda/latest/dg/API_FilterCriteria.html) della mappatura dell'origine eventi consente l'invocazione della funzione Lambda solo per gli elementi eliminati dal TTL:

```
{
    "Filters": [
        {
            "Pattern": { "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } }
        }
    ]
}
```

### AWS Lambda Crea una mappatura delle sorgenti degli eventi
<a name="create-event-source-mapping"></a>

Utilizza i seguenti frammenti di codice per creare una mappatura dell'origine eventi filtrata che è possibile connettere al flusso DynamoDB di una tabella. Ciascun blocco di codice include il modello di filtro eventi.

------
#### [ AWS CLI ]

```
aws lambda create-event-source-mapping \
--event-source-arn 'arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000' \
--batch-size 10 \
--enabled \
--function-name test_func \
--starting-position LATEST \
--filter-criteria '{"Filters": [{"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"}]}'
```

------
#### [ Java ]

```
LambdaClient client = LambdaClient.builder()
        .region(Region.EU_WEST_1)
        .build();

Filter userIdentity = Filter.builder()
        .pattern("{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}")
        .build();

FilterCriteria filterCriteria = FilterCriteria.builder()
        .filters(userIdentity)
        .build();

CreateEventSourceMappingRequest mappingRequest = CreateEventSourceMappingRequest.builder()
        .eventSourceArn("arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000")
        .batchSize(10)
        .enabled(Boolean.TRUE)
        .functionName("test_func")
        .startingPosition("LATEST")
        .filterCriteria(filterCriteria)
        .build();

try{
    CreateEventSourceMappingResponse eventSourceMappingResponse = client.createEventSourceMapping(mappingRequest);
    System.out.println("The mapping ARN is "+eventSourceMappingResponse.eventSourceArn());

}catch (ServiceException e){
    System.out.println(e.getMessage());
}
```

------
#### [ Node ]

```
const client = new LambdaClient({ region: "eu-west-1" });

const input = {
    EventSourceArn: "arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000",
    BatchSize: 10,
    Enabled: true,
    FunctionName: "test_func",
    StartingPosition: "LATEST",
    FilterCriteria: { "Filters": [{ "Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }] }
}

const command = new CreateEventSourceMappingCommand(input);

try {
    const results = await client.send(command);
    console.log(results);
} catch (err) {
    console.error(err);
}
```

------
#### [ Python ]

```
session = boto3.session.Session(region_name = 'eu-west-1')
client = session.client('lambda')

try:
    response = client.create_event_source_mapping(
        EventSourceArn='arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000',
        BatchSize=10,
        Enabled=True,
        FunctionName='test_func',
        StartingPosition='LATEST',
        FilterCriteria={
            'Filters': [
                {
                    'Pattern': "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"
                },
            ]
        }
    )
    print(response)
except Exception as e:
    print(e)
```

------
#### [ JSON ]

```
{
  "userIdentity": {
     "type": ["Service"],
     "principalId": ["dynamodb.amazonaws.com"]
   }
}
```

------

# Utilizzo dell'adattatore DynamoDB Streams Kinesis per elaborare i record di flusso
<a name="Streams.KCLAdapter"></a>

L'utilizzo di Amazon Kinesis Adapter è il modo consigliato per utilizzare flussi da Amazon DynamoDB. L’API dei flussi DynamoDB è progettata in maniera simile a quella del flusso di dati Kinesis. In entrambi i servizi, i flussi di dati sono composti da partizioni, che sono container per i record di flusso. Entrambi i servizi APIs contengono`ListStreams`, `DescribeStream``GetShards`, e `GetShardIterator` operazioni. Sebbene queste operazioni DynamoDB Streams siano simili alle loro controparti in Kinesis Data Streams, non sono identiche al 100%.

Come utente di DynamoDB Streams, è possibile utilizzare i modelli di progettazione presenti all'interno di KCL per elaborare partizioni e record di flusso DynamoDB Streams. Per fare ciò, si utilizza l'adattatore Kinesis DynamoDB Streams. L'adattatore Kinesis implementa l'interfaccia Kinesis Data Streams in modo che si possa utilizzare KCL per l'uso e l'elaborazione di record da DynamoDB Streams. [Per istruzioni su come configurare e installare il DynamoDB Streams Kinesis Adapter, consulta il repository. GitHub](https://github.com/awslabs/dynamodb-streams-kinesis-adapter)

È possibile scrivere applicazioni per Kinesis Data Streams utilizzando la libreria client Kinesis (Kinesis Client Library, KCL). KCL semplifica la codifica fornendo astrazioni utili sull'API Kinesis Data Streams di basso livello. Per ulteriori informazioni su KCL, consulta [Sviluppo di consumatori utilizzando la libreria client Kinesis](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) nella *Guida per gli sviluppatori di Amazon Kinesis Data Streams*.

DynamoDB consiglia di utilizzare la versione 3.x di KCL con SDK AWS for Java v2.x. [L'attuale versione 1.x di DynamoDB Streams Kinesis Adapter con AWS SDK per Java SDK per la versione AWS 1.x continuerà a essere pienamente supportata per tutto il suo ciclo di vita, come previsto durante il periodo di transizione in linea con la politica di manutenzione degli strumenti.AWS SDKs ](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html)

**Nota**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. end-of-supportKCL 1.x sarà disponibile il 30 gennaio 2026. Si consiglia vivamente di migrare le applicazioni KCL che utilizzano la versione 1.x all’ultima versione KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client](https://github.com/awslabs/amazon-kinesis-client) Library su. GitHub Per informazioni sulle versioni più recenti di Kinesis Client Library, consulta [Use Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/kcl.html). Per ulteriori informazioni sulla migrazione da KCL 1.x a 3.x, consulta Migrating from KCL 1.x to KCL 3.x.

Nel seguente diagramma viene illustrato come queste librerie interagiscono tra loro.

![\[Interazione tra flussi DynamoDB, flusso di dati Kinesis e KCL per l’elaborazione dei record dei flussi DynamoDB.\]](http://docs.aws.amazon.com/it_it/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


Con l'adattatore Kinesis DynamoDB Streams abilitato, è possibile iniziare a sviluppare rispetto a KCL, con le chiamate API dirette senza soluzione di continuità all'endpoint DynamoDB Streams.

Quando si avvia l'applicazione , quest'ultima richiama la libreria KCL per creare un'istanza di un worker. È necessario fornire al lavoratore le informazioni di configurazione per l'applicazione, come il descrittore di flusso e AWS le credenziali, e il nome di una classe di processore di record fornita. Durante l'esecuzione del codice nel processore di record, il worker completa le seguenti attività:
+ Si collega al flusso
+ Enumera le partizioni all'interno del flusso
+ Controlla ed enumera gli shard secondari di uno shard principale chiuso all’interno del flusso
+ Coordina le associazioni di shard con altri processi di lavoro (se presenti)
+ Crea istanze di un elaboratore di record per ogni shard che gestisce
+ Estrae i record di dati dal flusso
+ Scala la velocità di chiamata delle GetRecords API durante un throughput elevato (se è configurata la modalità catch-up)
+ Inserisce i record nell'elaboratore di record corrispondente
+ Controlla i record elaborati
+ Bilancia le associazioni tra shard e processi di lavoro quando il conteggio delle istanze del lavoro cambia
+ Bilancia le associazioni tra partizioni e worker quando le partizioni sono suddivise

L'adattatore KCL supporta la modalità catch-up, una funzione di regolazione automatica della frequenza di chiamata per gestire aumenti temporanei della velocità di trasmissione. Quando il ritardo di elaborazione del flusso supera una soglia configurabile (impostazione predefinita: un minuto), la modalità catch-up ridimensiona la frequenza di chiamata dell' GetRecords API in base a un valore configurabile (impostazione predefinita 3x) per recuperare i record più velocemente, quindi torna alla normalità una volta che il ritardo diminuisce. Ciò è utile durante i periodi ad alto throughput in cui l'attività di scrittura di DynamoDB può sopraffare i consumatori utilizzando i tassi di polling predefiniti. La modalità Catch-up può essere abilitata tramite il parametro di configurazione (default false). `catchupEnabled`

**Nota**  
Per una descrizione dei concetti su KCL elencati qui, consulta [Sviluppo di consumatori utilizzando la libreria client Kinesis](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) nella *Guida per gli sviluppatori di Amazon Kinesis Data Streams*.  
Per ulteriori informazioni sull'utilizzo degli stream, vedere AWS Lambda [Streams e trigger DynamoDB AWS Lambda](Streams.Lambda.md)

# Migrazione di KLC da 1.x a 3.x
<a name="streams-migrating-kcl"></a>

## Panoramica di
<a name="migrating-kcl-overview"></a>

Questa guida fornisce istruzioni per la migrazione dell’applicazione consumer da KCL 1.x a KCL 3.x. A causa delle differenze di architettura tra KCL 1.x e KCL 3.x, la migrazione richiede l’aggiornamento di diversi componenti per garantire la compatibilità.

KCL 1.x utilizza classi e interfacce diverse rispetto a KCL 3.x. È necessario prima migrare l’elaboratore di record, il generatore dell’elaboratore di record e le classi di lavoratori al formato compatibile con KCL 3.x e seguire la procedura di migrazione per la migrazione da KCL 1.x a KCL 3.x.

## Fasi della migrazione
<a name="migration-steps"></a>

**Topics**
+ [Fase 1: migrare l’elaboratore di record](#step1-record-processor)
+ [Fase 2: migrare il generatore dell’elaboratore di record](#step2-record-processor-factory)
+ [Fase 3: eseguire la migrazione del lavoratore](#step3-worker-migration)
+ [Fase 4: panoramica e consigli sulla configurazione di KCL 3.x](#step4-configuration-migration)
+ [Fase 5: migrare da KCL 2.x a KCL 3.x](#step5-kcl2-to-kcl3)

### Fase 1: migrare l’elaboratore di record
<a name="step1-record-processor"></a>

L’esempio seguente mostra un elaboratore di record implementato per l’Adattatore Kinesis per i flussi DynamoDB con KCL 1.x:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Per migrare la classe RecordProcessor**

1. Modifica le interfacce da `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` e `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` verso `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor` come segue:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. Aggiorna le istruzioni di importazione per i metodi `initialize` e `processRecords`:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. Sostituisci il metodo `shutdownRequested` con i seguenti nuovi metodi: `leaseLost`, `shardEnded` e `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Segue la versione aggiornata della classe dell’elaboratore di record:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**Nota**  
L'adattatore Kinesis di DynamoDB Streams ora utilizza il modello Record. SDKv2 In SDKv2, `AttributeValue` gli oggetti complessi (,`BS`,, `NS` `M``L`,`SS`) non restituiscono mai null. Utilizza i metodi `hasBs()`, `hasNs()`, `hasM()`, `hasL()`, `hasSs()` per verificare se questi valori esistono.

### Fase 2: migrare il generatore dell’elaboratore di record
<a name="step2-record-processor-factory"></a>

La fabbrica dell'elaboratore di record è responsabile per la creazione di elaboratori di record quando un lease è acquisito. Di seguito è illustrato un esempio di un generatore di KCL 1.x:

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**Per migrare `RecordProcessorFactory`**
+ Modifica l’interfaccia implementata da `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` a `software.amazon.kinesis.processor.ShardRecordProcessorFactory`, come segue:

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

Di seguito è riportato un esempio di generatore di elaboratore di record in 3.0:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### Fase 3: eseguire la migrazione del lavoratore
<a name="step3-worker-migration"></a>

Nella versione 3.0 della KCL, una nuova classe, denominata **Pianificatore**, sostituisce la classe **Lavoratore**. Di seguito è illustrato un esempio di un lavoratore di KCL 1.x:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**Per migrare il lavoratore**

1. Modifica la dichiarazione `import` per la classe `Worker` nelle dichiarazioni di importazione delle classi `Scheduler` e `ConfigsBuilder`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Importa `StreamTracker` e modifica l’importazione di `StreamsWorkerFactory` in `StreamsSchedulerFactory`.

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. Seleziona la posizione da cui avviare l’applicazione. Può essere `TRIM_HORIZON` o `LATEST`.

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. Creazione di un’istanza `StreamTracker`.

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. Crea l’oggetto `AmazonDynamoDBStreamsAdapterClient`.

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. Crea l’oggetto `ConfigsBuilder`.

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. Crea `Scheduler` con `ConfigsBuilder` come mostrato nell’esempio seguente:

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**Importante**  
L’impostazione `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` mantiene la compatibilità tra l’Adattatore Kinesis per i flussi DynamoDB per KCL v3 e KCL v1, non tra KCL v2 e v3.

### Fase 4: panoramica e consigli sulla configurazione di KCL 3.x
<a name="step4-configuration-migration"></a>

Per una descrizione dettagliata delle configurazioni introdotte dopo KCL 1.x e rilevanti in KCL 3.x, consulta [KCL configurations](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html) and [KCL migration client configuration](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration).

**Importante**  
Invece di creare direttamente oggetti di `checkpointConfig`, `coordinatorConfig`, `leaseManagementConfig`, `metricsConfig`, `processorConfig` e `retrievalConfig`, si consiglia di utilizzare `ConfigsBuilder` per impostare le configurazioni in KCL 3.x e versioni successive per evitare problemi di inizializzazione del Pianificatore. `ConfigsBuilder` offre un modo più flessibile e gestibile per configurare l’applicazione KCL.

#### Configurazioni con aggiornamento del valore predefinito in KCL 3.x
<a name="kcl3-configuration-overview"></a>

`billingMode`  
Nella versione 1.x di KCL, il valore predefinito per `billingMode` è impostato su `PROVISIONED`. Tuttavia, con la versione 3.x di KCL, l’impostazione predefinita `billingMode` è `PAY_PER_REQUEST` (modalità on demand). Si consiglia di utilizzare la modalità con capacità on demand per la tabella di lease per regolare automaticamente la capacità in base all’utilizzo. Per indicazioni sull’utilizzo della capacità allocata per le tabelle di lease, consulta [Best practices for the lease table with provisioned capacity mode](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html).

`idleTimeBetweenReadsInMillis`  
Nella versione 1.x di KCL, il valore predefinito per `idleTimeBetweenReadsInMillis` è impostato su 1.000 (o 1 secondo). La versione 3.x di KCL imposta il valore predefinito per i`dleTimeBetweenReadsInMillis` su 1.500 (o 1,5 secondi), ma l’Adattatore Kinesis per i flussi Amazon DynamoDB sostituisce il valore predefinito su 1.000 (o 1 secondo).

#### Nuove configurazioni in KCL 3.x
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
Questa configurazione definisce l’intervallo di tempo prima che gli shard di nuovo riscontro inizino l’elaborazione e viene calcolata come 1,5 × `leaseAssignmentIntervalMillis`. Se questa impostazione non è configurata in modo esplicito, l’intervallo di tempo predefinito è 1,5 × `failoverTimeMillis`. L’elaborazione di nuovi shard prevede la scansione della tabella di lease e l’interrogazione di un indice secondario globale (GSI) sulla tabella di lease. La riduzione di `leaseAssignmentIntervalMillis` aumenta la frequenza delle operazioni Scan e Query, con conseguente aumento dei costi di DynamoDB. Si consiglia di impostare questo valore su 2000 (o 2 secondi) per ridurre al minimo il ritardo nell’elaborazione di nuovi shard.

`shardConsumerDispatchPollIntervalMillis`  
Questa configurazione definisce l’intervallo tra polling successivi da parte del consumer dello shard per attivare le transizioni di stato. Nella versione 1.x di KCL, questo comportamento era controllato dal parametro `idleTimeInMillis`, che non era esposto come impostazione configurabile. Con la versione 3.x di KCL, si consiglia di impostare questa configurazione in modo che corrisponda al valore utilizzato per ` idleTimeInMillis` nella configurazione della versione 1.x di KCL.

### Fase 5: migrare da KCL 2.x a KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

Per garantire una transizione e una compatibilità fluide con l’ultima versione di Kinesis Client Library (KCL), segui le fasi 5 – 8 nelle istruzioni della guida alla migrazione per l’[aggiornamento da KCL 2.x a KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics).

Per la risoluzione dei problemi più comuni di KCL 3.x, consulta [Troubleshooting KCL consumer applications](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html).

# Rollback a una versione di KCL precedente
<a name="kcl-migration-rollback"></a>

Questo argomento illustra come eseguire il rollback di un’applicazione consumer alla versione di KCL precedente. Il processo di rollback consiste in due fasi:

1. Esegui lo [Strumento di migrazione di KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Reimplementa il codice della versione precedente di KCL.

## Fase 1: eseguire lo Strumento di migrazione di KCL
<a name="kcl-migration-rollback-step1"></a>

Quando è necessario tornare alla versione precedente di KCL, occorre eseguire lo Strumento di migrazione KCL. Lo strumento svolge due attività importanti:
+ Rimuove una tabella di metadati denominata tabella delle metriche dei lavoratori e l’indice secondario globale nella tabella di lease in DynamoDB. Questi artefatti sono creati da KCL 3.x ma non sono necessari al ritorno alla versione precedente.
+ Consente a tutti i lavoratori di funzionare in una modalità compatibile con KCL 1.x e di iniziare a utilizzare l’algoritmo di bilanciamento del carico utilizzato nelle versioni precedenti di KCL. In caso di problemi con il nuovo algoritmo di bilanciamento del carico in KCL 3.x, questo ridurrà immediatamente il problema.

**Importante**  
La tabella dello stato del coordinatore in DynamoDB deve esistere e non deve essere eliminata durante il processo di migrazione, rollback e rollforward.

**Nota**  
È importante che tutti i lavoratori dell’applicazione consumer utilizzino lo stesso algoritmo di bilanciamento del carico in un determinato momento. Lo Strumento di migrazione di KCL assicura che tutti i lavoratori dell’applicazione consumer KCL 3.x passino alla modalità compatibile con KCL 1.x in modo che tutti i lavoratori eseguano lo stesso algoritmo di bilanciamento del carico durante il rollback dell’applicazione alla versione precedente di KCL.

[Puoi scaricare lo [strumento di migrazione KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) nella directory degli script del repository KCL. GitHub](https://github.com/awslabs/amazon-kinesis-client/tree/master) Esegui lo script da un lavoratore o da un host con le autorizzazioni appropriate per scrivere nella tabella dello stato del coordinatore, nella tabella delle metriche dei lavoratori e nella tabella di lease. Assicurati che per le applicazioni consumer KCL siano configurate le [autorizzazioni IAM](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html) appropriate. Esegui lo script solo una volta per applicazione KCL utilizzando il comando specificato:

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### Parameters
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
Sostituisci con il tuo. *region* Regione AWS

`--application_name`  
Questo parametro è obbligatorio in caso di utilizzo di nomi predefiniti per le tabelle dei metadati di DynamoDB (tabella di lease, tabella dello stato del coordinatore e tabella delle metriche dei lavoratori). Se sono stati specificati nomi personalizzati per queste tabelle, è possibile omettere questo parametro. Sostituisci *applicationName* con il nome effettivo dell'applicazione KCL. Lo strumento utilizza questo nome per ricavare i nomi delle tabelle predefiniti se non vengono forniti nomi personalizzati.

`--lease_table_name`  
Questo parametro è necessario se si imposta un nome personalizzato per la tabella di lease nella configurazione KCL. Se si utilizza il nome della tabella predefinito, è possibile omettere questo parametro. Sostituisci *leaseTableName* con il nome della tabella personalizzata che hai specificato per la tabella di leasing.

`--coordinator_state_table_name`  
Questo parametro è necessario se si imposta un nome personalizzato per la tabella dello stato del coordinatore nella configurazione KCL. Se si utilizza il nome della tabella predefinito, è possibile omettere questo parametro. *coordinatorStateTableName*Sostituiscilo con il nome della tabella personalizzata che hai specificato per la tabella degli stati del coordinatore.

`--worker_metrics_table_name`  
Questo parametro è necessario se si imposta un nome personalizzato per la tabella delle metriche dei lavoratori nella configurazione KCL. Se si utilizza il nome della tabella predefinito, è possibile omettere questo parametro. *workerMetricsTableName*Sostituiscilo con il nome della tabella personalizzata che hai specificato per la tabella delle metriche dei lavoratori.

## Fase 2: reimplementare il codice con la versione precedente di KCL
<a name="kcl-migration-rollback-step2"></a>

**Importante**  
Qualsiasi menzione della versione 2.x nell’output generato dallo Strumento di migrazione di KCL deve essere interpretata come riferita alla versione 1.x di KCL. L’esecuzione dello script non esegue un rollback completo, ma passa solo l’algoritmo di bilanciamento del carico a quello utilizzato nella versione 1.x di KCL.

Dopo aver eseguito lo strumento di migrazione di KCL per un rollback, verrà mostrato uno di questi messaggi:

Messaggio 1  
“Rollback completato. L’applicazione eseguiva funzionalità della versione 2x compatibili. Torna ai file binari dell’applicazione precedente implementando il codice con la versione precedente di KCL.”  
**Azione richiesta:** significa che i lavoratori erano in esecuzione nella modalità compatibile con KCL 1.x. Reimplementa il codice con la versione precedente di KCL sui lavoratori.

Messaggio 2  
“Rollback completato. L’applicazione KCL eseguiva funzionalità della versione 3 e verrà ripristinata a funzionalità compatibili con la versione 2x. Se non è possibile visualizzare alcuna mitigazione dopo un breve periodo di tempo, esegui il rollback ai file binari dell’applicazione precedente implementando il codice con la versione KCL precedente.”  
**Azione richiesta:** significa che i lavoratori erano in esecuzione in modalità KCL 3.x e lo Strumento di migrazione di KCL ha portato tutti i lavoratori alla modalità compatibile con KCL 1.x. Reimplementa il codice con la versione precedente di KCL sui lavoratori.

Messaggio 3  
“Il rollback dell’applicazione è già stato effettuato. Tutte KCLv3 le risorse che potevano essere eliminate sono state ripulite per evitare addebiti fino a quando non è stato possibile avviare l'applicazione con la migrazione».  
**Azione richiesta:** significa che i lavoratori erano già stati sottoposti a rollback per essere eseguiti nella modalità compatibile con KCL 1.x. Reimplementa il codice con la versione precedente di KCL sui lavoratori.

# Rollforward a KCL 3.x dopo un rollback
<a name="kcl-migration-rollforward"></a>

Questo argomento illustra come eseguire il rollforward di un’applicazione consumer a KCL 3.x dopo un rollback. Quando è necessario eseguire il rollforward, occorre completare una procedura in due fasi:

1. Esegui lo [Strumento di migrazione di KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Implementa il codice con KCL 3.x.

## Fase 1: eseguire lo Strumento di migrazione di KCL
<a name="kcl-migration-rollforward-step1"></a>

Esegui lo Strumento di migrazione di KCL con il seguente comando per eseguire il rollforward a KCL 3.x:

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### Parameters
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
Sostituisci *region* con il tuo Regione AWS.

`--application_name`  
Questo parametro è obbligatorio se utilizzi i nomi predefiniti per la tabella dello stato del coordinatore. Se sono stati specificati nomi personalizzati per la tabella dello stato del coordinatore, è possibile omettere questo parametro. Sostituisci *applicationName* con il nome effettivo dell'applicazione KCL. Lo strumento utilizza questo nome per ricavare i nomi delle tabelle predefiniti se non vengono forniti nomi personalizzati.

`--coordinator_state_table_name`  
Questo parametro è necessario se si imposta un nome personalizzato per la tabella dello stato del coordinatore nella configurazione KCL. Se si utilizza il nome della tabella predefinito, è possibile omettere questo parametro. Sostituisci *coordinatorStateTableName* con il nome della tabella personalizzata che hai specificato per la tabella degli stati del coordinatore.

Dopo aver eseguito lo strumento di migrazione in modalità rollforward, KCL crea le seguenti risorse DynamoDB necessarie per KCL 3.x:
+ Un indice secondario globale nella tabella di lease
+ Una tabella delle metriche dei lavoratori

## Fase 2: implementare il codice con KCL 3.x
<a name="kcl-migration-rollforward-step2"></a>

Dopo aver eseguito lo Strumento di migrazione di KCL per un rollforward, implementa il codice con KCL 3.x sui lavoratori. Per completare la migrazione, consulta [Fase 8: completare la migrazione](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish).

# Spiegazione passo per passo: Adattatore Kinesis DynamoDB Streams
<a name="Streams.KCLAdapter.Walkthrough"></a>

In questa sezione viene riportata una spiegazione passo per passo di un'applicazione Java che utilizza Amazon Kinesis Client Library e l'adattatore Amazon DynamoDB Streams Kinesis. L'applicazione mostra un esempio di replica dei dati, in cui l'attività di scrittura da una tabella viene applicata a una seconda tabella e i contenuti di entrambe le tabelle rimangono sincronizzati. Per il codice sorgente, consulta [Programma completo: Adattatore Kinesis di DynamoDB Streams](Streams.KCLAdapter.Walkthrough.CompleteProgram.md).

Il programma effettua le seguenti operazioni:

1. Crea due tabelle DynamoDB denominate `KCL-Demo-src` e `KCL-Demo-dst`. Su ognuna di queste tabelle è abilitato un flusso.

1. Genera l'attività di aggiornamento nella tabella di origine aggiungendo, aggiornando ed eliminando gli elementi. Questo fa sì che i dati vengano scritti nel flusso della tabella.

1. Legge i record dal flusso, li ricostruisce come richieste DynamoDB e applica le richieste alla tabella di destinazione.

1. Esegue la scansione delle tabelle di origine e di destinazione per garantire che i contenuti siano identici.

1. Esegue la pulizia eliminando le tabelle.

Queste fasi sono descritte nelle sezioni seguenti e l'applicazione completa viene mostrata alla fine della procedura guidata.

**Topics**
+ [Fase 1: creazione di tabelle DynamoDB](#Streams.KCLAdapter.Walkthrough.Step1)
+ [Fase 2: generazione dell'attività di aggiornamento nella tabella di origine](#Streams.KCLAdapter.Walkthrough.Step2)
+ [Fase 3: elaborazione del flusso](#Streams.KCLAdapter.Walkthrough.Step3)
+ [Fase 4: verifica che entrambe le tabelle abbiano contenuti identici](#Streams.KCLAdapter.Walkthrough.Step4)
+ [Fase 5: rimozione](#Streams.KCLAdapter.Walkthrough.Step5)
+ [Programma completo: Adattatore Kinesis di DynamoDB Streams](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## Fase 1: creazione di tabelle DynamoDB
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

Il primo passo consiste nel creare due tabelle DynamoDB, una di origine e una di destinazione. `StreamViewType` sul flusso della tabella di origine è `NEW_IMAGE`. Questo significa che ogni volta che un item viene modificato in questa tabella, l'immagine "successiva" dell'item viene scritta nel flusso. In questo modo, il flusso tiene traccia di tutte le attività di scrittura della tabella.

Il seguente esempio mostra il codice utilizzato per creare entrambe le tabelle.

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## Fase 2: generazione dell'attività di aggiornamento nella tabella di origine
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

La fase successiva consiste nel generare le attività di scrittura sulla tabella di origine. Mentre questa attività è in corso, il flusso della tabella di origine viene aggiornato pressoché in tempo reale.

L'applicazione definisce una classe helper con metodi che chiamano le operazioni API `PutItem`, `UpdateItem` e `DeleteItem` per scrivere i dati. Il seguente esempio di codice mostra come vengono utilizzati questi metodi.

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## Fase 3: elaborazione del flusso
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

Ora il programma inizia l'elaborazione del flusso. L'adattatore Kinesis di DynamoDB Streams agisce come un livello trasparente tra KCL e l'endpoint DynamoDB Streams in modo che il codice possa utilizzare appieno KCL piuttosto che effettuare chiamate a DynamoDB Streams di basso livello. Il programma esegue le attività di seguito elencate:
+ Definisce una classe di elaboratore di record, `StreamsRecordProcessor`, con metodi conformi alla definizione dell'interfaccia KCL: `initialize`, `processRecords` e `shutdown`. Il metodo `processRecords` contiene la logica necessaria per la lettura dal flusso della tabella di origine e la scrittura nella tabella di destinazione.
+ Definisce una factory di classe per la classe di elaboratore di record (`StreamsRecordProcessorFactory`). Ciò è richiesto per i programmi Java che utilizzano KCL.
+ Crea un'istanza di un nuovo `Worker` KCL che è associato alla factory di classe.
+ Arresta il `Worker` quando l'elaborazione del record è completata.

Facoltativamente, abilita la modalità catch-up nella configurazione dell'adattatore Streams KCL per scalare automaticamente la velocità di chiamata GetRecords API di 3 volte (impostazione predefinita) quando il ritardo di elaborazione del flusso supera un minuto (impostazione predefinita), aiutando l'utente di streaming a gestire picchi di throughput elevati nella tabella.

Per ulteriori informazioni sulla definizione dell'interfaccia KCL, consulta [Sviluppo di consumatori utilizzando la Kinesis Client Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) nella *Guida per gli sviluppatori di Amazon Kinesis Data Streams*. 

Il seguente esempio di codice mostra il loop principale in `StreamsRecordProcessor`. L'istruzione `case` determina quale operazione eseguire, sulla base dell'item `OperationType` presente nel record del flusso.

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## Fase 4: verifica che entrambe le tabelle abbiano contenuti identici
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

A questo punto, i contenuti delle tabelle di origine e destinazione sono sincronizzati. L'applicazione emette le richieste `Scan` su entrambe le tabelle per verificare che i loro contenuti siano effettivamente identici.

La classe `DemoHelper` contiene un metodo `ScanTable` che chiama l'API `Scan` di basso livello. L'esempio seguente mostra come viene utilizzato.

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## Fase 5: rimozione
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

La demo è completata, quindi l'applicazione elimina le tabelle di origine e di destinazione. Vedere l'esempio di codice seguente. Anche dopo l'eliminazione delle tabelle, i flussi rimangono disponibili per altre 24 ore, dopo di che vengono automaticamente eliminati.

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# Programma completo: Adattatore Kinesis di DynamoDB Streams
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

Di seguito è riportato il programma Java completo che esegue le attività descritte in [Spiegazione passo per passo: Adattatore Kinesis DynamoDB Streams](Streams.KCLAdapter.Walkthrough.md). Quando lo esegui, dovresti vedere un output simile al seguente.

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**Importante**  
 Per eseguire questo programma, assicurati che l'applicazione client abbia accesso a DynamoDB e CloudWatch Amazon utilizzando le policy. Per ulteriori informazioni, consulta [Policy basate su identità per DynamoDB](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies). 

Il codice sorgente è composto da quattro `.java` file. Per creare questo programma, aggiungi la seguente dipendenza, che include Amazon Kinesis Client Library (KCL) 3.x e SDK AWS for Java v2 come dipendenze transitive:

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

I file sorgente sono:
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```

# API di basso livello DynamoDB Streams: esempio Java
<a name="Streams.LowLevel.Walkthrough"></a>

**Nota**  
Il codice in questa pagina non è completo e non prevede la gestione di tutti gli scenari di utilizzo di Amazon DynamoDB Streams. Il modo consigliato di utilizzare record di flusso da DynamoDB è tramite l'adattatore Amazon Kinesis utilizzando la Kinesis Client Library (KCL), come descritto in [Utilizzo dell'adattatore DynamoDB Streams Kinesis per elaborare i record di flusso](Streams.KCLAdapter.md).

Questa sezione contiene un programma Java che mostra il funzionamento di DynamoDB Streams. Il programma effettua le seguenti operazioni:

1. Crea una tabella DynamoDB con un flusso abilitato.

1. Descrive le impostazioni del flusso per questa tabella.

1. Modifica i dati nella tabella.

1. Descrive gli shard nel flusso.

1. Legge i record del flusso dagli shard.

1. Recupera gli shard secondari e continua a leggere i record.

1. Elimina.

Quando esegui il programma, vedrai un output simile al seguente:

```
Testing Streams Demo
Creating an Amazon DynamoDB table TestTableForStreams with a simple primary key: Id
Waiting for TestTableForStreams to be created...
Current stream ARN for TestTableForStreams: arn:aws:dynamodb:us-east-2:123456789012:table/TestTableForStreams/stream/2018-03-20T16:49:55.208
Stream enabled: true
Update view type: NEW_AND_OLD_IMAGES

Performing write activities on TestTableForStreams
Processing item 1 of 100
Processing item 2 of 100
Processing item 3 of 100
...
Processing item 100 of 100
Shard: {ShardId: shardId-1234567890-...,SequenceNumberRange: {StartingSequenceNumber: 100002572486797508907,},}
    Shard iterator: EjYFEkX2a26eVTWe...
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2000001584047545833909, SizeBytes=22, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2100003604869767892701, SizeBytes=55, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, SequenceNumber=2200001099771112898434, SizeBytes=36, StreamViewType=NEW_AND_OLD_IMAGES)
...
Deleting the table...
Table StreamsDemoTable deleted.
Demo complete
```

**Example Esempio**  

```
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter;

public class StreamsLowLevelDemo {


    public static void main(String[] args) {
        final String usage = "Testing Streams Demo";
        try {
            System.out.println(usage);

            String tableName = "StreamsDemoTable";
            String key = "Id";
            System.out.println("Creating an Amazon DynamoDB table " + tableName + " with a simple primary key: " + key);
            Region region = Region.US_WEST_2;
            DynamoDbClient ddb = DynamoDbClient.builder()
                    .region(region)
                    .build();

            DynamoDbStreamsClient ddbStreams = DynamoDbStreamsClient.builder()
                    .region(region)
                    .build();
            DescribeTableRequest describeTableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
            TableDescription tableDescription = null;
            try{
                tableDescription = ddb.describeTable(describeTableRequest).table();
            }catch (Exception e){
                System.out.println("Table " + tableName + " does not exist.");
                tableDescription = createTable(ddb, tableName, key);
            }

            // Print the stream settings for the table
            String streamArn = tableDescription.latestStreamArn();
           
            StreamSpecification streamSpec = tableDescription.streamSpecification();
            System.out.println("Current stream ARN for " + tableDescription.tableName() + ": " +
                   streamArn);
            System.out.println("Stream enabled: " + streamSpec.streamEnabled());
            System.out.println("Update view type: " + streamSpec.streamViewType());
            System.out.println();
            // Generate write activity in the table
            System.out.println("Performing write activities on " + tableName);
            int maxItemCount = 100;
            for (Integer i = 1; i <= maxItemCount; i++) {
                System.out.println("Processing item " + i + " of " + maxItemCount);
                // Write a new item
                putItemInTable(key, i, tableName, ddb);
                // Update the item
                updateItemInTable(key, i, tableName, ddb);
                // Delete the item
                deleteDynamoDBItem(key, i, tableName, ddb);
            }

            // Process Stream
            processStream(streamArn, maxItemCount, ddb, ddbStreams, tableName);

            // Delete the table
            System.out.println("Deleting the table...");
            DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                    .tableName(tableName)
                    .build();
            ddb.deleteTable(deleteTableRequest);
            System.out.println("Table " + tableName + " deleted.");
            System.out.println("Demo complete");
            ddb.close();
        } catch (Exception e) {
            System.out.println("Error: " + e.getMessage());
        }
    }

    private static void processStream(String streamArn, int maxItemCount, DynamoDbClient ddb, DynamoDbStreamsClient ddbStreams, String tableName) {
        // Get all the shard IDs from the stream. Note that DescribeStream returns
        // the shard IDs one page at a time.
        String lastEvaluatedShardId = null;
        do {
            DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                    .streamArn(streamArn)
                    .exclusiveStartShardId(lastEvaluatedShardId).build();
            DescribeStreamResponse describeStreamResponse = ddbStreams.describeStream(describeStreamRequest);

            List<Shard> shards = describeStreamResponse.streamDescription().shards();

            // Process each shard on this page

            fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, shards);

            // If LastEvaluatedShardId is set, then there is
            // at least one more page of shard IDs to retrieve
            lastEvaluatedShardId = describeStreamResponse.streamDescription().lastEvaluatedShardId();

        } while (lastEvaluatedShardId != null);

    }

    private static void fetchShardsAndReadRecords(String streamArn, int maxItemCount, DynamoDbStreamsClient ddbStreams, List<Shard> shards) {
        for (Shard shard : shards) {
            String shardId = shard.shardId();
            System.out.println("Shard: " + shard);

            // Get an iterator for the current shard
            GetShardIteratorRequest shardIteratorRequest = GetShardIteratorRequest.builder()
                    .streamArn(streamArn).shardId(shardId)
                    .shardIteratorType(ShardIteratorType.TRIM_HORIZON).build();

            GetShardIteratorResponse getShardIteratorResult = ddbStreams.getShardIterator(shardIteratorRequest);

            String currentShardIter = getShardIteratorResult.shardIterator();

            // Shard iterator is not null until the Shard is sealed (marked as READ_ONLY).
            // To prevent running the loop until the Shard is sealed, we process only the
            // items that were written into DynamoDB and then exit.
            int processedRecordCount = 0;
            while (currentShardIter != null && processedRecordCount < maxItemCount) {
                // Use the shard iterator to read the stream records
                GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder()
                        .shardIterator(currentShardIter).build();
                GetRecordsResponse getRecordsResult = ddbStreams.getRecords(getRecordsRequest);
                List<Record> records = getRecordsResult.records();
                for (Record record : records) {
                    System.out.println("        " + record.dynamodb());
                }
                processedRecordCount += records.size();
                currentShardIter = getRecordsResult.nextShardIterator();
            }
            if (currentShardIter == null){
                System.out.println("Shard has been fully processed. Shard iterator is null.");
                System.out.println("Fetch the child shard to continue processing instead of bulk fetching all shards");
                DescribeStreamRequest describeStreamRequestForChildShards = DescribeStreamRequest.builder()
                        .streamArn(streamArn)
                        .shardFilter(ShardFilter.builder()
                                .type(ShardFilterType.CHILD_SHARDS)
                                .shardId(shardId).build())
                        .build();
                DescribeStreamResponse describeStreamResponseChildShards = ddbStreams.describeStream(describeStreamRequestForChildShards);
                fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, describeStreamResponseChildShards.streamDescription().shards());
            }
        }
    }

    private static void putItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());
        item.put("Message", AttributeValue.builder()
                .s("New Item!")
                .build());
        PutItemRequest request = PutItemRequest.builder()
                .tableName(tableName)
                .item(item)
                .build();
        ddb.putItem(request);
    }

    private static void updateItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {

        HashMap<String, AttributeValue> itemKey = new HashMap<>();
        itemKey.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());


        HashMap<String, AttributeValueUpdate> updatedValues = new HashMap<>();
        updatedValues.put("Message", AttributeValueUpdate.builder()
                .value(AttributeValue.builder().s("This is an updated item").build())
                .action(AttributeAction.PUT)
                .build());

        UpdateItemRequest request = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(itemKey)
                .attributeUpdates(updatedValues)
                .build();
        ddb.updateItem(request);
    }

    public static void deleteDynamoDBItem(String key, Integer i, String tableName, DynamoDbClient ddb) {
        HashMap<String, AttributeValue> keyToGet = new HashMap<>();
        keyToGet.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());

        DeleteItemRequest deleteReq = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(keyToGet)
                .build();
        ddb.deleteItem(deleteReq);
    }

    public static TableDescription createTable(DynamoDbClient ddb, String tableName, String key) {
        DynamoDbWaiter dbWaiter = ddb.waiter();
        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType("NEW_AND_OLD_IMAGES")
                .build();
        CreateTableRequest request = CreateTableRequest.builder()
                .attributeDefinitions(AttributeDefinition.builder()
                        .attributeName(key)
                        .attributeType(ScalarAttributeType.S)
                        .build())
                .keySchema(KeySchemaElement.builder()
                        .attributeName(key)
                        .keyType(KeyType.HASH)
                        .build())
                .billingMode(BillingMode.PAY_PER_REQUEST) //  DynamoDB automatically scales based on traffic.
                .tableName(tableName)
                .streamSpecification(streamSpecification)
                .build();

        TableDescription newTable;
        try {
            CreateTableResponse response = ddb.createTable(request);
            DescribeTableRequest tableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
                    
            System.out.println("Waiting for " + tableName + " to be created...");

            // Wait until the Amazon DynamoDB table is created.
            WaiterResponse<DescribeTableResponse> waiterResponse = dbWaiter.waitUntilTableExists(tableRequest);
            waiterResponse.matched().response().ifPresent(System.out::println);
            newTable = response.tableDescription();
            return newTable;

        } catch (DynamoDbException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
        return null;
    }



}
```

# Streams e trigger DynamoDB AWS Lambda
<a name="Streams.Lambda"></a>

Amazon DynamoDB è integrato AWS Lambda in modo da poter *creare* trigger, parti di codice che rispondono automaticamente agli eventi in DynamoDB Streams. Con i trigger è possibile creare applicazioni che rispondono alle modifiche di dati nelle tabelle DynamoDB.

**Topics**
+ [Tutorial \$11: Utilizzo dei filtri per elaborare tutti gli eventi con Amazon DynamoDB e utilizzo di AWS Lambda AWS CLI](Streams.Lambda.Tutorial.md)
+ [Tutorial n. 2: utilizzo di filtri per elaborare alcuni eventi con DynamoDB e Lambda](Streams.Lambda.Tutorial2.md)
+ [Best practice per l’utilizzo dei flussi DynamoDB con Lambda](Streams.Lambda.BestPracticesWithDynamoDB.md)

Se abiliti DynamoDB Streams su una tabella, puoi associare lo stream Amazon Resource Name (ARN) a una funzione che scrivi. AWS Lambda Tutte le operazioni di mutazione su quella tabella DynamoDB possono quindi essere acquisite come elemento nel flusso. Ad esempio, è possibile impostare un trigger in modo che quando un elemento in una tabella viene modificato, nel flusso della tabella venga immediatamente visualizzato un nuovo record. 

**Nota**  
In caso di sottoscrizione di più di due funzioni Lambda in un flusso DynamoDB, potrebbe verificarsi una limitazione (della larghezza di banda della rete) della lettura.

Il servizio [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html) esegue il polling del flusso alla ricerca di nuovi record quattro volte al secondo. Quando sono disponibili nuovi record di flusso, la funzione Lambda viene richiamata in modo sincrono. Puoi sottoscrivere fino a due funzioni Lambda allo stesso flusso DynamoDB. In caso di sottoscrizione di più di due funzioni Lambda nello stesso flusso DynamoDB, potrebbe verificarsi una limitazione (della larghezza di banda della rete) della lettura.

La funzione Lambda può inviare una notifica, avviare un flusso di lavoro o eseguire numerose altre operazioni specificate. Ad esempio, è possibile scrivere una funzione Lambda semplicemente per copiare ogni record di flusso in un'archiviazione persistente, come il Gateway di file di Amazon S3 (Amazon S3), per creare un percorso di verifica permanente dell'attività di scrittura della tabella. Oppure, supponi di avere un'applicazione di gioco per dispositivi mobili che scrive in una tabella `GameScores`. Quando l'attributo `TopScore` della tabella `GameScores` viene aggiornato, viene scritto un record di flusso corrispondente nel flusso della tabella. Questo evento potrebbe quindi attivare una funzione Lambda che pubblica un messaggio di congratulazioni su un social network. È anche possibile scrivere questa funzione per ignorare tutti i record di flusso che non sono aggiornamenti di `GameScores` o che non modificano l'attributo `TopScore`.

Se la funzione restituisce un errore, Lambda ritenta il batch fino a quando l'elaborazione non riesce o i dati scadono. È inoltre possibile configurare Lambda in modo da riprovare con un batch di dimensioni inferiori, limitare il numero di tentativi, eliminare i record una volta che diventano troppo vecchi e altre opzioni.

Come best practice in materia di prestazioni, la funzione Lambda deve essere di breve durata. Per evitare di introdurre ritardi di elaborazione non necessari, inoltre, non dovrebbe eseguire una logica complessa. In particolare, per un flusso a velocità elevata, è meglio attivare flussi di lavoro Step Function di post-elaborazione asincrona rispetto a funzioni Lambda sincrone a lunga durata.

 Puoi utilizzare i trigger Lambda su diversi AWS account configurando una policy basata sulle risorse sul flusso DynamoDB per concedere l'accesso in lettura tra account diversi alla funzione Lambda. Per ulteriori informazioni su come configurare lo stream per consentire l'accesso tra account diversi, consulta [Condividi l'accesso con le funzioni AWS Lambda tra account nella](rbac-cross-account-access.md#shared-access-cross-acount-lambda) DynamoDB Developer Guide.

[Per ulteriori informazioni, consulta la Guida per gli AWS Lambda sviluppatori.AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/)

# Tutorial \$11: Utilizzo dei filtri per elaborare tutti gli eventi con Amazon DynamoDB e utilizzo di AWS Lambda AWS CLI
<a name="Streams.Lambda.Tutorial"></a>

 

In questo tutorial, creerai un AWS Lambda trigger per elaborare un flusso da una tabella DynamoDB.

**Topics**
+ [Fase 1: creazione di una tabella DynamoDB con un flusso abilitato](#Streams.Lambda.Tutorial.CreateTable)
+ [Fase 2: creazione di un ruolo di esecuzione Lambda](#Streams.Lambda.Tutorial.CreateRole)
+ [Fase 3: creazione di un argomento Amazon SNS](#Streams.Lambda.Tutorial.SNSTopic)
+ [Fase 4: creazione e test di una funzione Lambda](#Streams.Lambda.Tutorial.LambdaFunction)
+ [Fase 5: creazione e test di un trigger](#Streams.Lambda.Tutorial.CreateTrigger)

Lo scenario di questo tutorial è Woofer, un semplice social network. Gli utenti di Woofer comunicano tramite i *bark*, brevi messaggi di testo che vengono inviati ad altri utenti di Woofer. Il seguente diagramma illustra i componenti e il flusso di lavoro di questa applicazione.

![\[Flusso di lavoro dell’applicazione Woofer di una tabella DynamoDB, record dei flussi, funzione Lambda e argomento Amazon SNS.\]](http://docs.aws.amazon.com/it_it/amazondynamodb/latest/developerguide/images/StreamsAndTriggers.png)


1. Un utente scrive un elemento in una tabella DynamoDB (`BarkTable`). Ogni item della tabella rappresenta un bark.

1. Viene scritto un nuovo record di flusso per riflettere l'aggiunta di un nuovo item a `BarkTable`.

1. Il nuovo record di flusso attiva una AWS Lambda funzione (). `publishNewBark`

1. Se il record di flusso indica che è stato aggiunto un nuovo elemento a `BarkTable`, la funzione Lambda legge i dati dal record di flusso e pubblica un messaggio in un argomento di Amazon Simple Notification Service (Amazon SNS).

1. Questo messaggio è ricevuto dai sottoscrittori dell'argomento Amazon SNS. (In questo tutorial, l'unico sottoscrittore è un indirizzo e-mail).

**Prima di iniziare**  
Questo tutorial utilizza il. AWS Command Line Interface AWS CLI Se non è stato ancora fatto, seguire le istruzioni contenute nella [Guida per l'utente di AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/) per installare e configurare la AWS CLI.

## Fase 1: creazione di una tabella DynamoDB con un flusso abilitato
<a name="Streams.Lambda.Tutorial.CreateTable"></a>

In questa fase, viene creata crea una tabella DynamoDB (`BarkTable`) per memorizzare tutti i bark degli utenti di Woofer. La chiave primaria è costituita da `Username` (chiave di partizione) e `Timestamp` (chiave di ordinamento). Entrambi questi attributi sono di tipo stringa.

In `BarkTable` è abilitato un flusso. Più avanti in questo tutorial, crei un trigger associando una AWS Lambda funzione allo stream.

1. Immetti il seguente comando per creare la tabella.

   ```
   aws dynamodb create-table \
       --table-name BarkTable \
       --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \
       --key-schema AttributeName=Username,KeyType=HASH  AttributeName=Timestamp,KeyType=RANGE \
       --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
       --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
   ```

1. Nell'output, cerca `LatestStreamArn`.

   ```
   ...
   "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

   Prendi nota dei valori `region` e `accountID`, in quanto sono necessari nelle altre fasi di questo tutorial.

## Fase 2: creazione di un ruolo di esecuzione Lambda
<a name="Streams.Lambda.Tutorial.CreateRole"></a>

In questo passaggio, crei un ruolo AWS Identity and Access Management (IAM) (`WooferLambdaRole`) e gli assegni le autorizzazioni. Questo ruolo viene utilizzato dalla funzione Lambda creata in [Fase 4: creazione e test di una funzione Lambda](#Streams.Lambda.Tutorial.LambdaFunction). 

Puoi creare anche una policy per il ruolo. La policy conterrà tutte le autorizzazioni necessarie alla funzione Lambda nella fase di runtime.

1. Crea un file denominato `trust-relationship.json` con i seguenti contenuti.

------
#### [ JSON ]

****  

   ```
   {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "lambda.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
   ```

------

1. Immetti il seguente comando per creare `WooferLambdaRole`.

   ```
   aws iam create-role --role-name WooferLambdaRole \
       --path "/service-role/" \
       --assume-role-policy-document file://trust-relationship.json
   ```

1. Crea un file denominato `role-policy.json` con i seguenti contenuti. (Sostituisci `region` e inserisci `accountID` la tua AWS regione e l'ID dell'account.)

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogGroup",
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": "arn:aws:logs:us-east-1:111122223333:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "dynamodb:DescribeStream",
                   "dynamodb:GetRecords",
                   "dynamodb:GetShardIterator",
                   "dynamodb:ListStreams"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:111122223333:table/BarkTable/stream/*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "sns:Publish"
               ],
               "Resource": [
                   "*"
               ]
           }
       ]
   }
   ```

------

   La policy include quattro dichiarazioni che consentono a `WooferLambdaRole` di eseguire le seguenti operazioni:
   + Eseguire una funzione Lambda (`publishNewBark`). Questa funzione viene creata in una fase successiva di questo tutorial.
   + Accedi ad Amazon CloudWatch Logs. La funzione Lambda scrive la diagnostica nei CloudWatch registri in fase di esecuzione.
   + Leggere i dati dal flusso di DynamoDB per `BarkTable`.
   + Pubblicare i messaggi su Amazon SNS.

1. Immetti il seguente comando per collegare la policy a `WooferLambdaRole`.

   ```
   aws iam put-role-policy --role-name WooferLambdaRole \
       --policy-name WooferLambdaRolePolicy \
       --policy-document file://role-policy.json
   ```

## Fase 3: creazione di un argomento Amazon SNS
<a name="Streams.Lambda.Tutorial.SNSTopic"></a>

In questa fase, viene creato un argomento Amazon SNS (`wooferTopic`) e viene registrato un indirizzo e-mail su di esso. La funzione Lambda utilizza questo argomento per pubblicare nuovi bark degli utenti di Woofer.

1. Immettere il seguente comando per creare un nuovo argomento Amazon SNS.

   ```
   aws sns create-topic --name wooferTopic
   ```

1. Immetti il comando seguente per sottoscrivere un indirizzo e-mail a `wooferTopic`. Sostituisci `region` e `accountID` con la regione e l'ID account AWS e sostituisci `example@example.com` con un indirizzo e-mail valido.

   ```
   aws sns subscribe \
       --topic-arn arn:aws:sns:region:accountID:wooferTopic \
       --protocol email \
       --notification-endpoint example@example.com
   ```

1. Amazon SNS invia un messaggio di conferma al tuo indirizzo e-mail. Scegli il link **Confirm subscription (Conferma sottoscrizione)** per completare la procedura di sottoscrizione.

## Fase 4: creazione e test di una funzione Lambda
<a name="Streams.Lambda.Tutorial.LambdaFunction"></a>

In questo passaggio, crei una AWS Lambda funzione (`publishNewBark`) da cui elaborare i record di flusso. `BarkTable`

La funzione `publishNewBark` elabora solo gli eventi di flusso che corrispondono a nuovi item in `BarkTable`. La funzione legge i dati di questo evento, quindi richiama Amazon SNS perché li pubblichi.

1. Crea un file denominato `publishNewBark.js` con i seguenti contenuti. Sostituisci `region` e `accountID` con la tua AWS regione e l'ID dell'account.

   ```
   'use strict';
   var AWS = require("aws-sdk");
   var sns = new AWS.SNS();
   
   exports.handler = (event, context, callback) => {
   
       event.Records.forEach((record) => {
           console.log('Stream record: ', JSON.stringify(record, null, 2));
   
           if (record.eventName == 'INSERT') {
               var who = JSON.stringify(record.dynamodb.NewImage.Username.S);
               var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S);
               var what = JSON.stringify(record.dynamodb.NewImage.Message.S);
               var params = {
                   Subject: 'A new bark from ' + who,
                   Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what,
                   TopicArn: 'arn:aws:sns:region:accountID:wooferTopic'
               };
               sns.publish(params, function(err, data) {
                   if (err) {
                       console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2));
                   } else {
                       console.log("Results from sending message: ", JSON.stringify(data, null, 2));
                   }
               });
           }
       });
       callback(null, `Successfully processed ${event.Records.length} records.`);
   };
   ```

1. Crea un file zip che contenga `publishNewBark.js`. Per fare ciò, se disponi di una utility a riga di comando zip, puoi immettere il comando seguente.

   ```
   zip publishNewBark.zip publishNewBark.js
   ```

1. Quando si crea la funzione Lambda, si specifica l'Amazon Resource Name (ARN) per `WooferLambdaRole` che hai creato in [Fase 2: creazione di un ruolo di esecuzione Lambda](#Streams.Lambda.Tutorial.CreateRole). Immetti il comando seguente per recuperare questo ARN.

   ```
   aws iam get-role --role-name WooferLambdaRole
   ```

   Nell'output, cerca l'ARN di `WooferLambdaRole`.

   ```
   ...
   "Arn": "arn:aws:iam::region:role/service-role/WooferLambdaRole"
   ...
   ```

   Immetti il seguente comando per creare la funzione Lambda. Sostituire *roleARN* con l'ARN per. `WooferLambdaRole`

   ```
   aws lambda create-function \
       --region region \
       --function-name publishNewBark \
       --zip-file fileb://publishNewBark.zip \
       --role roleARN \
       --handler publishNewBark.handler \
       --timeout 5 \
       --runtime nodejs16.x
   ```

1. Ora esegui il test di `publishNewBark` per verificare che funziona. Per fare ciò, fornire un input simile a un record reale da DynamoDB Streams.

   Crea un file denominato `payload.json` con i seguenti contenuti. Sostituisci `region` e `accountID` con la Regione AWS e l’ID account.

   ```
   {
       "Records": [
           {
               "eventID": "7de3041dd709b024af6f29e4fa13d34c",
               "eventName": "INSERT",
               "eventVersion": "1.1",
               "eventSource": "aws:dynamodb",
               "awsRegion": "region",
               "dynamodb": {
                   "ApproximateCreationDateTime": 1479499740,
                   "Keys": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "NewImage": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Message": {
                           "S": "This is a bark from the Woofer social network"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "SequenceNumber": "13021600000000001596893679",
                   "SizeBytes": 112,
                   "StreamViewType": "NEW_IMAGE"
               },
               "eventSourceARN": "arn:aws:dynamodb:region:account ID:table/BarkTable/stream/2016-11-16T20:42:48.104"
           }
       ]
   }
   ```

   Immetti il comando seguente per eseguire il test della funzione `publishNewBark`.

   ```
   aws lambda invoke --function-name publishNewBark --payload file://payload.json --cli-binary-format raw-in-base64-out output.txt
   ```

   Se il test viene superato, viene visualizzato il seguente output.

   ```
   {
       "StatusCode": 200,
       "ExecutedVersion": "$LATEST"
   }
   ```

   Inoltre, il file `output.txt` conterrà il testo seguente.

   ```
   "Successfully processed 1 records."
   ```

   Entro pochi minuti riceverai anche un nuovo messaggio e-mail.
**Nota**  
AWS Lambda scrive informazioni diagnostiche su Amazon CloudWatch Logs. Se si verificano errori nella funzione Lambda, è possibile utilizzare queste informazioni diagnostiche per la risoluzione dei problemi:  
Apri la CloudWatch console all'indirizzo [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).
Nel riquadro di navigazione scegli **Logs (Log)**.
Scegli il seguente gruppo di log: `/aws/lambda/publishNewBark`
Scegli il flusso di log più recente per visualizzare l'output e gli errori della funzione.

## Fase 5: creazione e test di un trigger
<a name="Streams.Lambda.Tutorial.CreateTrigger"></a>

In [Fase 4: creazione e test di una funzione Lambda](#Streams.Lambda.Tutorial.LambdaFunction), è stato eseguito il test della funzione Lambda per verificarne la corretta esecuzione. In questa fase, è possibile creare un *trigger* associando la funzione Lambda (`publishNewBark`) a un'origine eventi (il flusso `BarkTable`).

1. Quando crei il trigger, è necessario specificare l'ARN del flusso `BarkTable`. Immetti il comando seguente per recuperare questo ARN.

   ```
   aws dynamodb describe-table --table-name BarkTable
   ```

   Nell'output, cerca `LatestStreamArn`.

   ```
   ...
    "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

1. Immetti il comando seguente per creare il trigger. Sostituisci `streamARN` con l'ARN del flusso effettivo.

   ```
   aws lambda create-event-source-mapping \
       --region region \
       --function-name publishNewBark \
       --event-source streamARN  \
       --batch-size 1 \
       --starting-position TRIM_HORIZON
   ```

1. Esegui il test del trigger. Immetti il comando seguente per aggiungere un elemento a `BarkTable`.

   ```
   aws dynamodb put-item \
       --table-name BarkTable \
       --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}
   ```

   Dovresti ricevere un nuovo messaggio e-mail entro pochi minuti.

1. Aprire la console DynamoDB e aggiungere altri elementi a `BarkTable`. È necessario specificare i valori degli attributi `Username` e `Timestamp`. (Sebbene non sia obbligatorio, si dovrebbe inoltre specificare un valore per `Message`) Dovresti ricevere un nuovo messaggio e-mail per ogni item aggiunto a `BarkTable`.

   La funzione Lambda elabora solo i nuovi elementi aggiunti a `BarkTable`. Se aggiorni o elimini un item della tabella, la funzione non esegue alcuna azione.

**Nota**  
AWS Lambda scrive informazioni diagnostiche su Amazon CloudWatch Logs. Se si verificano errori nella funzione Lambda, è possibile utilizzare queste informazioni diagnostiche per la risoluzione dei problemi:  
Apri la CloudWatch console all'indirizzo [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).
Nel riquadro di navigazione scegli **Logs (Log)**.
Scegli il seguente gruppo di log: `/aws/lambda/publishNewBark`
Scegli il flusso di log più recente per visualizzare l'output e gli errori della funzione.

# Tutorial n. 2: utilizzo di filtri per elaborare alcuni eventi con DynamoDB e Lambda
<a name="Streams.Lambda.Tutorial2"></a>

In questo tutorial, creerai un AWS Lambda trigger per elaborare solo alcuni eventi in un flusso da una tabella DynamoDB.

**Topics**
+ [Mettere tutto insieme - CloudFormation](#Streams.Lambda.Tutorial2.Cloudformation)
+ [Mettere tutto insieme - CDK](#Streams.Lambda.Tutorial2.CDK)

Con il [filtro eventi Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) è possibile utilizzare espressioni di filtro per controllare quali eventi Lambda invia alla funzione per l'elaborazione. Puoi configurare fino a 5 diversi filtri per i flussi DynamoDB. Se si utilizzano finestre di batch, Lambda applica i criteri di filtro a ogni nuovo evento per stabilire se aggiungerlo al batch corrente.

I filtri vengono applicati tramite strutture chiamate `FilterCriteria`. I 3 attributi principali di `FilterCriteria` sono `metadata properties`, `data properties` e `filter patterns`. 

Ecco una struttura di esempio di un evento di flussi DynamoDB:

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

`metadata properties` sono i campi dell'oggetto evento. Nel caso di flussi DynamoDB, `metadata properties` sono campi come `dynamodb` o `eventName`. 

`data properties` sono i campi del corpo dell'evento. Per filtrare su `data properties`, bisogna assicurarsi di contenerli in `FilterCriteria` all'interno della chiave appropriata. Per le origini eventi Dynamo DB, la chiave dati è `NewImage` o `OldImage`.

Infine, le regole di filtro definiranno l'espressione del filtro che si desidera applicare a una proprietà specifica. Ecco alcuni esempi:


| Operatore di confronto | Esempio | Sintassi delle regole (parziale) | 
| --- | --- | --- | 
|  Null  |  Il tipo di prodotto è null  |  `{ "product_type": { "S": null } } `  | 
|  Empty  |  Il nome del prodotto è vuoto  |  `{ "product_name": { "S": [ ""] } } `  | 
|  Equals  |  Lo stato equivale a Florida  |  `{ "state": { "S": ["FL"] } } `  | 
|  And  |  Lo stato del prodotto equivale a Florida e la categoria di prodotto è Chocolate  |  `{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } } `  | 
|  Or  |  Lo stato del prodotto è Florida o California  |  `{ "state": { "S": ["FL","CA"] } } `  | 
|  Not  |  Lo stato del prodotto non è Florida  |  `{"state": {"S": [{"anything-but": ["FL"]}]}}`  | 
|  Exists  |  Esiste il prodotto artigianale  |  `{"homemade": {"S": [{"exists": true}]}}`  | 
|  Does not exist  |  Il prodotto "homemade" non esiste  |  `{"homemade": {"S": [{"exists": false}]}}`  | 
|  Begins with  |  PK inizia con COMPANY  |  `{"PK": {"S": [{"prefix": "COMPANY"}]}}`  | 

Per una funzione Lambda è possibile specificare fino a 5 modelli di filtro eventi. Si noti che ognuno di questi 5 eventi verrà valutato come un OR logico. Quindi, se configuri due filtri denominati `Filter_One` e `Filter_Two`, la funzione Lambda eseguirà `Filter_One` OR `Filter_Two`.

**Nota**  
Nella pagina di [filtraggio degli eventi Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) ci sono alcune opzioni per filtrare e confrontare valori numerici, tuttavia nel caso degli eventi di filtro DynamoDB ciò non si applica perché i numeri in DynamoDB vengono memorizzati come stringhe. Ad esempio ` "quantity": { "N": "50" }`, sappiamo che è un numero a causa della proprietà `"N"`.

## Mettere tutto insieme - CloudFormation
<a name="Streams.Lambda.Tutorial2.Cloudformation"></a>

Per mostrare in pratica la funzionalità di filtraggio degli eventi, ecco un CloudFormation modello di esempio. Questo modello genererà una tabella DynamoDB semplice con una chiave di partizione PK e una chiave di ordinamento SK con flussi Amazon DynamoDB abilitati. Creerà una funzione Lambda e un semplice ruolo di esecuzione Lambda che consentirà di scrivere registri su Amazon Cloudwatch e leggere gli eventi dai flussi Amazon DynamoDB. Aggiungerà anche la mappatura dell'origine eventi tra flussi DynamoDB e la funzione Lambda, in modo che la funzione possa essere eseguita ogni volta che c'è un evento nei flussi Amazon DynamoDB.

```
AWSTemplateFormatVersion: "2010-09-09"

Description: Sample application that presents AWS Lambda event source filtering 
with Amazon DynamoDB Streams.

Resources:
  StreamsSampleDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "PK"
          AttributeType: "S"
        - AttributeName: "SK"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "PK"
          KeyType: "HASH"
        - AttributeName: "SK"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: "NEW_AND_OLD_IMAGES"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17",		 	 	 
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17",		 	 	 
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource: !GetAtt StreamsSampleDDBTable.StreamArn

  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST

  ProcessEventLambda:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          import logging

          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)

          def handler(event, context):
            LOGGER.info('Received Event: %s', event)
            for rec in event['Records']:
              LOGGER.info('Record: %s', rec)

Outputs:
  StreamsSampleDDBTable:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.Arn
  StreamARN:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.StreamArn
```

Dopo aver distribuito questo modello di CloudFormation, puoi inserire il seguente elemento Amazon DynamoDB:

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

Grazie alla semplice funzione lambda inclusa in linea in questo modello di formazione cloud, vedrai gli eventi nei gruppi di CloudWatch log di Amazon per la funzione lambda come segue:

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

**Esempi di filtri**
+ **Solo prodotti che corrispondono a un determinato stato**

Questo esempio modifica il CloudFormation modello per includere un filtro per abbinare tutti i prodotti provenienti dalla Florida, con l'abbreviazione «FL».

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Dopo aver ridistribuito lo stack, puoi aggiungere il seguente elemento DynamoDB alla tabella. Si noti che non verrà visualizzato nei registri delle funzioni Lambda, poiché il prodotto in questo esempio proviene dalla California.

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK#1000",
 "company_id": "1000",
 "fabric": "Florida Chocolates",
 "price": 15,
 "product_id": "1000",
 "quantity": 50,
 "state": "CA",
 "stores": 5,
 "type": ""
}
```
+ **Solo gli elementi che iniziano con alcuni valori in PK e SK**

Questo esempio modifica il CloudFormation modello per includere la seguente condizione:

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Si noti che la condizione AND richiede che la condizione sia all'interno del modello, dove le chiavi PK e SK sono nella stessa espressione separate da una virgola.

O inizia con alcuni valori su PK e SK o proviene da un determinato stato.

Questo esempio modifica il CloudFormation modello per includere le seguenti condizioni:

```
  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Si noti che la condizione OR viene aggiunta introducendo nuovi modelli nella sezione del filtro.

## Mettere tutto insieme - CDK
<a name="Streams.Lambda.Tutorial2.CDK"></a>

Il seguente modello di formazione del progetto CDK di esempio illustra la funzionalità di filtro degli eventi. Prima di lavorare con questo progetto CDK è necessario [installare i prerequisiti](https://docs.aws.amazon.com/cdk/v2/guide/work-with.html), inclusa l'[esecuzione degli script di preparazione](https://docs.aws.amazon.com/cdk/v2/guide/work-with-cdk-python.html).

**Creazione di un progetto CDK**

Per prima cosa crea un nuovo AWS CDK progetto, invocandolo `cdk init` in una directory vuota.

```
mkdir ddb_filters
cd ddb_filters
cdk init app --language python
```

Il comando `cdk init` utilizza il nome della cartella del progetto per denominare vari elementi del progetto, tra cui classi, sottocartelle e file. Tutti i trattini nel nome della cartella vengono convertiti in caratteri di sottolineatura. Altrimenti il nome dovrebbe seguire il formato di un identificatore Python. Ad esempio, non dovrebbe iniziare con un numero o contenere spazi.

Per lavorare con il nuovo progetto, attivare il suo ambiente virtuale. Ciò consente di installare le dipendenze del progetto localmente nella cartella del progetto, anziché globalmente.

```
source .venv/bin/activate
python -m pip install -r requirements.txt
```

**Nota**  
Potresti riconoscerlo come il Mac/Linux comando per attivare un ambiente virtuale. I modelli Python includono un file batch, `source.bat`, che consente di utilizzare lo stesso comando su Windows. Funziona anche il comando Windows tradizionale `.venv\Scripts\activate.bat`. Se hai inizializzato il tuo AWS CDK progetto utilizzando AWS CDK Toolkit v1.70.0 o precedente, il tuo ambiente virtuale si trova invece nella directory. `.env` `.venv` 

**Infrastruttura di base**

Apri il file `./ddb_filters/ddb_filters_stack.py` con l'editor di testo preferito. Questo file è stato generato automaticamente al momento della creazione del AWS CDK progetto. 

Quindi, aggiungi le funzioni `_create_ddb_table` e `_set_ddb_trigger_function`. Queste funzioni creeranno una tabella DynamoDB con chiave di partizione PK e una chiave di ordinamento SK in modalità di assegnazione in modalità on-demand, con flussi Amazon DynamoDB abilitato per impostazione predefinita per mostrare immagini nuove e vecchie.

La funzione Lambda verrà archiviata nella cartella `lambda` nel file `app.py`. Questo file verrà creato in seguito. Comprenderà una variabile di ambiente `APP_TABLE_NAME`, che sarà il nome della tabella Amazon DynamoDB creata da questo stack. Nella stessa funzione concederemo alla funzione Lambda le autorizzazioni di lettura del flusso. Infine, verrà effettuata la sottoscrizione a flussi DynamoDB come origine degli eventi per la funzione Lambda. 

Alla fine del file nel metodo `__init__`, richiamerai i rispettivi costrutti per inizializzarli nello stack. Per progetti più grandi che richiedono componenti e servizi aggiuntivi, potrebbe essere meglio definire questi costrutti al di fuori dello stack di base. 

```
import os
import json

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_dynamodb as dynamodb,
)
from constructs import Construct


class DdbFiltersStack(Stack):

    def _create_ddb_table(self):
        dynamodb_table = dynamodb.Table(
            self,
            "AppTable",
            partition_key=dynamodb.Attribute(
                name="PK", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="SK", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            removal_policy=cdk.RemovalPolicy.DESTROY,
        )

        cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name)
        return dynamodb_table

    def _set_ddb_trigger_function(self, ddb_table):
        events_lambda = _lambda.Function(
            self,
            "LambdaHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            handler="app.handler",
            environment={
                "APP_TABLE_NAME": ddb_table.table_name,
            },
        )

        ddb_table.grant_stream_read(events_lambda)

        event_subscription = _lambda.CfnEventSourceMapping(
            scope=self,
            id="companyInsertsOnlyEventSourceMapping",
            function_name=events_lambda.function_name,
            event_source_arn=ddb_table.table_stream_arn,
            maximum_batching_window_in_seconds=1,
            starting_position="LATEST",
            batch_size=1,
        )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        ddb_table = self._create_ddb_table()
        self._set_ddb_trigger_function(ddb_table)
```

Ora creeremo una funzione lambda molto semplice che stamperà i log in Amazon. CloudWatch Per farlo, crea una nuova cartella denominata `lambda`.

```
mkdir lambda
touch app.py
```

Usando l'editor di testo preferito, aggiungi il seguente contenuto al file `app.py`:

```
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def handler(event, context):
    LOGGER.info('Received Event: %s', event)
    for rec in event['Records']:
        LOGGER.info('Record: %s', rec)
```

Assicurati di essere nella cartella `/ddb_filters/`, digita il seguente comando per creare l'applicazione di esempio:

```
cdk deploy
```

A un certo punto ti verrà chiesto di confermare se desideri implementare la soluzione. Accetta le modifiche digitando `Y`.

```
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │
└───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘

Do you wish to deploy these changes (y/n)? y

...

✨  Deployment time: 67.73s

Outputs:
DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP
Stack ARN:
arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
```

Una volta implementate le modifiche, apri la AWS console e aggiungi un elemento alla tabella. 

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

I CloudWatch log dovrebbero ora contenere tutte le informazioni di questa voce. 

**Esempi di filtri**
+ **Solo prodotti che corrispondono a un determinato stato**

Apri il file `ddb_filters/ddb_filters/ddb_filters_stack.py` e modificalo per includere il filtro che corrisponde a tutti i prodotti equivalenti a "FL". Questo può essere modificato appena sotto `event_subscription` nella riga 45.

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```
+ **Solo gli elementi che iniziano con alcuni valori in PK e SK**

Modifica lo script Python per includere la seguente condizione:

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
        ]
    },
```
+ **O inizia con alcuni valori su PK e SK o proviene da un determinato stato.**

Modifica lo script Python per includere le seguenti condizioni:

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```

Si noti che la condizione OR viene aggiunta aggiungendo altri elementi all'array Filters.

**Pulizia**

Individua lo stack di filtri nella base della tua directory di lavoro ed esegui `cdk destroy`. Ti verrà chiesto di confermare l'eliminazione della risorsa:

```
cdk destroy
Are you sure you want to delete: DdbFiltersStack (y/n)? y
```

# Best practice per l’utilizzo dei flussi DynamoDB con Lambda
<a name="Streams.Lambda.BestPracticesWithDynamoDB"></a>

Una AWS Lambda funzione viene eseguita all'interno di un *contenitore*, un ambiente di esecuzione isolato da altre funzioni. Quando esegui una funzione per la prima volta, AWS Lambda crea un nuovo contenitore e inizia a eseguire il codice della funzione.

Una funzione Lambda dispone di un *gestore* che viene eseguito a ogni richiamo. Il gestore contiene la logica di business principale della funzione. Ad esempio, la funzione Lambda illustrata in [Fase 4: creazione e test di una funzione Lambda](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) dispone di un gestore in grado di elaborare record in un flusso DynamoDB. 

Puoi anche fornire un codice di inizializzazione che venga eseguito una sola volta, dopo la creazione del contenitore, ma prima che il gestore venga AWS Lambda eseguito per la prima volta. La funzione Lambda mostrata in [Fase 4: creazione e test di una funzione Lambda](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) ha un codice di inizializzazione che importa l'SDK per JavaScript Node.js e crea un client per Amazon SNS. Questi oggetti dovrebbero essere definiti una sola volta, esternamente al gestore.

Dopo l'esecuzione della funzione, AWS Lambda potrebbe scegliere di riutilizzare il contenitore per le successive chiamate della funzione. In questo caso, il gestore della funzione potrebbe essere in grado di utilizzare nuovamente le risorse definite nel codice di inizializzazione. (Non puoi controllare per quanto tempo AWS Lambda conserva il container, né se questo verrà nuovamente utilizzato o meno).

Per l' AWS Lambda utilizzo dei trigger DynamoDB, consigliamo quanto segue:
+ AWS i client di servizio devono essere istanziati nel codice di inizializzazione, non nel gestore. Ciò consente di AWS Lambda riutilizzare le connessioni esistenti, per tutta la durata del contenitore.
+ In generale, non è necessario gestire in modo esplicito le connessioni o implementare il pool di connessioni perché lo AWS Lambda gestisce per te.

Un consumer Lambda per un flusso DynamoDB non garantisce una consegna esatta di una sola volta e può portare a duplicati occasionali. Assicurati che il codice della funzione Lambda sia idempotente per evitare che si verifichino problemi imprevisti dovuti all’elaborazione duplicata.

Per ulteriori informazioni, consulta [Best practice per l'utilizzo delle AWS Lambda funzioni](https://docs.aws.amazon.com/lambda/latest/dg/best-practices.html) nella *AWS Lambda Developer* Guide.

# Flussi DynamoDB e Apache Flink
<a name="StreamsApacheFlink.xml"></a>

È possibile utilizzare i record dei flussi Amazon DynamoDB con Apache Flink. Con il [Servizio gestito da Amazon per Apache Flink](https://aws.amazon.com/managed-service-apache-flink/) è possibile trasformare e analizzare i dati in streaming in tempo reale con Apache Flink. Apache Flink è un framework di elaborazione di flussi open source per l’elaborazione di dati in tempo reale. Il connettore dei flussi Amazon DynamoDB per Apache Flink semplifica la creazione e la gestione dei carichi di lavoro Apache Flink e consente di integrare le applicazioni con altri Servizi AWS.

Amazon Managed Service for Apache Flink ti aiuta a creare rapidamente applicazioni di elaborazione dei end-to-end flussi per l'analisi dei log, l'analisi dei clickstream, l'Internet of Things (IoT), la tecnologia pubblicitaria, i giochi e altro ancora. I quattro casi d'uso più comuni sono lo streaming extract-transform-load (ETL), le applicazioni basate sugli eventi, l'analisi reattiva in tempo reale e l'interrogazione interattiva dei flussi di dati. Per ulteriori informazioni sulla scrittura in Apache Flink dai flussi Amazon DynamoDB, consulta [Amazon DynamoDB Streams Connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/dynamodb/).