

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Utilizzo dell'adattatore DynamoDB Streams Kinesis per elaborare i record di flusso
<a name="Streams.KCLAdapter"></a>

L'utilizzo di Amazon Kinesis Adapter è il modo consigliato per utilizzare flussi da Amazon DynamoDB. L’API dei flussi DynamoDB è progettata in maniera simile a quella del flusso di dati Kinesis. In entrambi i servizi, i flussi di dati sono composti da partizioni, che sono container per i record di flusso. Entrambi i servizi APIs contengono`ListStreams`, `DescribeStream``GetShards`, e `GetShardIterator` operazioni. Sebbene queste operazioni DynamoDB Streams siano simili alle loro controparti in Kinesis Data Streams, non sono identiche al 100%.

Come utente di DynamoDB Streams, è possibile utilizzare i modelli di progettazione presenti all'interno di KCL per elaborare partizioni e record di flusso DynamoDB Streams. Per fare ciò, si utilizza l'adattatore Kinesis DynamoDB Streams. L'adattatore Kinesis implementa l'interfaccia Kinesis Data Streams in modo che si possa utilizzare KCL per l'uso e l'elaborazione di record da DynamoDB Streams. [Per istruzioni su come configurare e installare il DynamoDB Streams Kinesis Adapter, consulta il repository. GitHub](https://github.com/awslabs/dynamodb-streams-kinesis-adapter)

È possibile scrivere applicazioni per Kinesis Data Streams utilizzando la libreria client Kinesis (Kinesis Client Library, KCL). KCL semplifica la codifica fornendo astrazioni utili sull'API Kinesis Data Streams di basso livello. Per ulteriori informazioni su KCL, consulta [Sviluppo di consumatori utilizzando la libreria client Kinesis](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) nella *Guida per gli sviluppatori di Amazon Kinesis Data Streams*.

DynamoDB consiglia di utilizzare la versione 3.x di KCL con SDK AWS for Java v2.x. [L'attuale versione 1.x di DynamoDB Streams Kinesis Adapter con AWS SDK per Java SDK per la versione AWS 1.x continuerà a essere pienamente supportata per tutto il suo ciclo di vita, come previsto durante il periodo di transizione in linea con la politica di manutenzione degli strumenti.AWS SDKs ](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html)

**Nota**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. end-of-supportKCL 1.x sarà disponibile il 30 gennaio 2026. Si consiglia vivamente di migrare le applicazioni KCL che utilizzano la versione 1.x all’ultima versione KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client](https://github.com/awslabs/amazon-kinesis-client) Library su. GitHub Per informazioni sulle versioni più recenti di Kinesis Client Library, consulta [Use Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/kcl.html). Per ulteriori informazioni sulla migrazione da KCL 1.x a 3.x, consulta Migrating from KCL 1.x to KCL 3.x.

Nel seguente diagramma viene illustrato come queste librerie interagiscono tra loro.

![\[Interazione tra flussi DynamoDB, flusso di dati Kinesis e KCL per l’elaborazione dei record dei flussi DynamoDB.\]](http://docs.aws.amazon.com/it_it/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


Con l'adattatore Kinesis DynamoDB Streams abilitato, è possibile iniziare a sviluppare rispetto a KCL, con le chiamate API dirette senza soluzione di continuità all'endpoint DynamoDB Streams.

Quando si avvia l'applicazione , quest'ultima richiama la libreria KCL per creare un'istanza di un worker. È necessario fornire al lavoratore le informazioni di configurazione per l'applicazione, come il descrittore di flusso e AWS le credenziali, e il nome di una classe di processore di record fornita. Durante l'esecuzione del codice nel processore di record, il worker completa le seguenti attività:
+ Si collega al flusso
+ Enumera le partizioni all'interno del flusso
+ Controlla ed enumera gli shard secondari di uno shard principale chiuso all’interno del flusso
+ Coordina le associazioni di shard con altri processi di lavoro (se presenti)
+ Crea istanze di un elaboratore di record per ogni shard che gestisce
+ Estrae i record di dati dal flusso
+ Scala la velocità di chiamata delle GetRecords API durante un throughput elevato (se è configurata la modalità catch-up)
+ Inserisce i record nell'elaboratore di record corrispondente
+ Controlla i record elaborati
+ Bilancia le associazioni tra shard e processi di lavoro quando il conteggio delle istanze del lavoro cambia
+ Bilancia le associazioni tra partizioni e worker quando le partizioni sono suddivise

L'adattatore KCL supporta la modalità catch-up, una funzione di regolazione automatica della frequenza di chiamata per gestire aumenti temporanei della velocità di trasmissione. Quando il ritardo di elaborazione del flusso supera una soglia configurabile (impostazione predefinita: un minuto), la modalità catch-up ridimensiona la frequenza di chiamata dell' GetRecords API in base a un valore configurabile (impostazione predefinita 3x) per recuperare i record più velocemente, quindi torna alla normalità una volta che il ritardo diminuisce. Ciò è utile durante i periodi ad alto throughput in cui l'attività di scrittura di DynamoDB può sopraffare i consumatori utilizzando i tassi di polling predefiniti. La modalità Catch-up può essere abilitata tramite il parametro di configurazione (default false). `catchupEnabled`

**Nota**  
Per una descrizione dei concetti su KCL elencati qui, consulta [Sviluppo di consumatori utilizzando la libreria client Kinesis](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) nella *Guida per gli sviluppatori di Amazon Kinesis Data Streams*.  
Per ulteriori informazioni sull'utilizzo degli stream, vedere AWS Lambda [Streams e trigger DynamoDB AWS Lambda](Streams.Lambda.md)

# Migrazione di KLC da 1.x a 3.x
<a name="streams-migrating-kcl"></a>

## Panoramica di
<a name="migrating-kcl-overview"></a>

Questa guida fornisce istruzioni per la migrazione dell’applicazione consumer da KCL 1.x a KCL 3.x. A causa delle differenze di architettura tra KCL 1.x e KCL 3.x, la migrazione richiede l’aggiornamento di diversi componenti per garantire la compatibilità.

KCL 1.x utilizza classi e interfacce diverse rispetto a KCL 3.x. È necessario prima migrare l’elaboratore di record, il generatore dell’elaboratore di record e le classi di lavoratori al formato compatibile con KCL 3.x e seguire la procedura di migrazione per la migrazione da KCL 1.x a KCL 3.x.

## Fasi della migrazione
<a name="migration-steps"></a>

**Topics**
+ [Fase 1: migrare l’elaboratore di record](#step1-record-processor)
+ [Fase 2: migrare il generatore dell’elaboratore di record](#step2-record-processor-factory)
+ [Fase 3: eseguire la migrazione del lavoratore](#step3-worker-migration)
+ [Fase 4: panoramica e consigli sulla configurazione di KCL 3.x](#step4-configuration-migration)
+ [Fase 5: migrare da KCL 2.x a KCL 3.x](#step5-kcl2-to-kcl3)

### Fase 1: migrare l’elaboratore di record
<a name="step1-record-processor"></a>

L’esempio seguente mostra un elaboratore di record implementato per l’Adattatore Kinesis per i flussi DynamoDB con KCL 1.x:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Per migrare la classe RecordProcessor**

1. Modifica le interfacce da `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` e `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` verso `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor` come segue:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. Aggiorna le istruzioni di importazione per i metodi `initialize` e `processRecords`:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. Sostituisci il metodo `shutdownRequested` con i seguenti nuovi metodi: `leaseLost`, `shardEnded` e `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Segue la versione aggiornata della classe dell’elaboratore di record:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**Nota**  
L'adattatore Kinesis di DynamoDB Streams ora utilizza il modello Record. SDKv2 In SDKv2, `AttributeValue` gli oggetti complessi (,`BS`,, `NS` `M``L`,`SS`) non restituiscono mai null. Utilizza i metodi `hasBs()`, `hasNs()`, `hasM()`, `hasL()`, `hasSs()` per verificare se questi valori esistono.

### Fase 2: migrare il generatore dell’elaboratore di record
<a name="step2-record-processor-factory"></a>

La fabbrica dell'elaboratore di record è responsabile per la creazione di elaboratori di record quando un lease è acquisito. Di seguito è illustrato un esempio di un generatore di KCL 1.x:

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**Per migrare `RecordProcessorFactory`**
+ Modifica l’interfaccia implementata da `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` a `software.amazon.kinesis.processor.ShardRecordProcessorFactory`, come segue:

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

Di seguito è riportato un esempio di generatore di elaboratore di record in 3.0:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### Fase 3: eseguire la migrazione del lavoratore
<a name="step3-worker-migration"></a>

Nella versione 3.0 della KCL, una nuova classe, denominata **Pianificatore**, sostituisce la classe **Lavoratore**. Di seguito è illustrato un esempio di un lavoratore di KCL 1.x:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**Per migrare il lavoratore**

1. Modifica la dichiarazione `import` per la classe `Worker` nelle dichiarazioni di importazione delle classi `Scheduler` e `ConfigsBuilder`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Importa `StreamTracker` e modifica l’importazione di `StreamsWorkerFactory` in `StreamsSchedulerFactory`.

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. Seleziona la posizione da cui avviare l’applicazione. Può essere `TRIM_HORIZON` o `LATEST`.

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. Creazione di un’istanza `StreamTracker`.

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. Crea l’oggetto `AmazonDynamoDBStreamsAdapterClient`.

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. Crea l’oggetto `ConfigsBuilder`.

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. Crea `Scheduler` con `ConfigsBuilder` come mostrato nell’esempio seguente:

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**Importante**  
L’impostazione `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` mantiene la compatibilità tra l’Adattatore Kinesis per i flussi DynamoDB per KCL v3 e KCL v1, non tra KCL v2 e v3.

### Fase 4: panoramica e consigli sulla configurazione di KCL 3.x
<a name="step4-configuration-migration"></a>

Per una descrizione dettagliata delle configurazioni introdotte dopo KCL 1.x e rilevanti in KCL 3.x, consulta [KCL configurations](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html) and [KCL migration client configuration](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration).

**Importante**  
Invece di creare direttamente oggetti di `checkpointConfig`, `coordinatorConfig`, `leaseManagementConfig`, `metricsConfig`, `processorConfig` e `retrievalConfig`, si consiglia di utilizzare `ConfigsBuilder` per impostare le configurazioni in KCL 3.x e versioni successive per evitare problemi di inizializzazione del Pianificatore. `ConfigsBuilder` offre un modo più flessibile e gestibile per configurare l’applicazione KCL.

#### Configurazioni con aggiornamento del valore predefinito in KCL 3.x
<a name="kcl3-configuration-overview"></a>

`billingMode`  
Nella versione 1.x di KCL, il valore predefinito per `billingMode` è impostato su `PROVISIONED`. Tuttavia, con la versione 3.x di KCL, l’impostazione predefinita `billingMode` è `PAY_PER_REQUEST` (modalità on demand). Si consiglia di utilizzare la modalità con capacità on demand per la tabella di lease per regolare automaticamente la capacità in base all’utilizzo. Per indicazioni sull’utilizzo della capacità allocata per le tabelle di lease, consulta [Best practices for the lease table with provisioned capacity mode](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html).

`idleTimeBetweenReadsInMillis`  
Nella versione 1.x di KCL, il valore predefinito per `idleTimeBetweenReadsInMillis` è impostato su 1.000 (o 1 secondo). La versione 3.x di KCL imposta il valore predefinito per i`dleTimeBetweenReadsInMillis` su 1.500 (o 1,5 secondi), ma l’Adattatore Kinesis per i flussi Amazon DynamoDB sostituisce il valore predefinito su 1.000 (o 1 secondo).

#### Nuove configurazioni in KCL 3.x
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
Questa configurazione definisce l’intervallo di tempo prima che gli shard di nuovo riscontro inizino l’elaborazione e viene calcolata come 1,5 × `leaseAssignmentIntervalMillis`. Se questa impostazione non è configurata in modo esplicito, l’intervallo di tempo predefinito è 1,5 × `failoverTimeMillis`. L’elaborazione di nuovi shard prevede la scansione della tabella di lease e l’interrogazione di un indice secondario globale (GSI) sulla tabella di lease. La riduzione di `leaseAssignmentIntervalMillis` aumenta la frequenza delle operazioni Scan e Query, con conseguente aumento dei costi di DynamoDB. Si consiglia di impostare questo valore su 2000 (o 2 secondi) per ridurre al minimo il ritardo nell’elaborazione di nuovi shard.

`shardConsumerDispatchPollIntervalMillis`  
Questa configurazione definisce l’intervallo tra polling successivi da parte del consumer dello shard per attivare le transizioni di stato. Nella versione 1.x di KCL, questo comportamento era controllato dal parametro `idleTimeInMillis`, che non era esposto come impostazione configurabile. Con la versione 3.x di KCL, si consiglia di impostare questa configurazione in modo che corrisponda al valore utilizzato per ` idleTimeInMillis` nella configurazione della versione 1.x di KCL.

### Fase 5: migrare da KCL 2.x a KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

Per garantire una transizione e una compatibilità fluide con l’ultima versione di Kinesis Client Library (KCL), segui le fasi 5 – 8 nelle istruzioni della guida alla migrazione per l’[aggiornamento da KCL 2.x a KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics).

Per la risoluzione dei problemi più comuni di KCL 3.x, consulta [Troubleshooting KCL consumer applications](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html).

# Rollback a una versione di KCL precedente
<a name="kcl-migration-rollback"></a>

Questo argomento illustra come eseguire il rollback di un’applicazione consumer alla versione di KCL precedente. Il processo di rollback consiste in due fasi:

1. Esegui lo [Strumento di migrazione di KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Reimplementa il codice della versione precedente di KCL.

## Fase 1: eseguire lo Strumento di migrazione di KCL
<a name="kcl-migration-rollback-step1"></a>

Quando è necessario tornare alla versione precedente di KCL, occorre eseguire lo Strumento di migrazione KCL. Lo strumento svolge due attività importanti:
+ Rimuove una tabella di metadati denominata tabella delle metriche dei lavoratori e l’indice secondario globale nella tabella di lease in DynamoDB. Questi artefatti sono creati da KCL 3.x ma non sono necessari al ritorno alla versione precedente.
+ Consente a tutti i lavoratori di funzionare in una modalità compatibile con KCL 1.x e di iniziare a utilizzare l’algoritmo di bilanciamento del carico utilizzato nelle versioni precedenti di KCL. In caso di problemi con il nuovo algoritmo di bilanciamento del carico in KCL 3.x, questo ridurrà immediatamente il problema.

**Importante**  
La tabella dello stato del coordinatore in DynamoDB deve esistere e non deve essere eliminata durante il processo di migrazione, rollback e rollforward.

**Nota**  
È importante che tutti i lavoratori dell’applicazione consumer utilizzino lo stesso algoritmo di bilanciamento del carico in un determinato momento. Lo Strumento di migrazione di KCL assicura che tutti i lavoratori dell’applicazione consumer KCL 3.x passino alla modalità compatibile con KCL 1.x in modo che tutti i lavoratori eseguano lo stesso algoritmo di bilanciamento del carico durante il rollback dell’applicazione alla versione precedente di KCL.

[Puoi scaricare lo [strumento di migrazione KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) nella directory degli script del repository KCL. GitHub](https://github.com/awslabs/amazon-kinesis-client/tree/master) Esegui lo script da un lavoratore o da un host con le autorizzazioni appropriate per scrivere nella tabella dello stato del coordinatore, nella tabella delle metriche dei lavoratori e nella tabella di lease. Assicurati che per le applicazioni consumer KCL siano configurate le [autorizzazioni IAM](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html) appropriate. Esegui lo script solo una volta per applicazione KCL utilizzando il comando specificato:

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### Parameters
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
Sostituisci con il tuo. *region* Regione AWS

`--application_name`  
Questo parametro è obbligatorio in caso di utilizzo di nomi predefiniti per le tabelle dei metadati di DynamoDB (tabella di lease, tabella dello stato del coordinatore e tabella delle metriche dei lavoratori). Se sono stati specificati nomi personalizzati per queste tabelle, è possibile omettere questo parametro. Sostituisci *applicationName* con il nome effettivo dell'applicazione KCL. Lo strumento utilizza questo nome per ricavare i nomi delle tabelle predefiniti se non vengono forniti nomi personalizzati.

`--lease_table_name`  
Questo parametro è necessario se si imposta un nome personalizzato per la tabella di lease nella configurazione KCL. Se si utilizza il nome della tabella predefinito, è possibile omettere questo parametro. Sostituisci *leaseTableName* con il nome della tabella personalizzata che hai specificato per la tabella di leasing.

`--coordinator_state_table_name`  
Questo parametro è necessario se si imposta un nome personalizzato per la tabella dello stato del coordinatore nella configurazione KCL. Se si utilizza il nome della tabella predefinito, è possibile omettere questo parametro. *coordinatorStateTableName*Sostituiscilo con il nome della tabella personalizzata che hai specificato per la tabella degli stati del coordinatore.

`--worker_metrics_table_name`  
Questo parametro è necessario se si imposta un nome personalizzato per la tabella delle metriche dei lavoratori nella configurazione KCL. Se si utilizza il nome della tabella predefinito, è possibile omettere questo parametro. *workerMetricsTableName*Sostituiscilo con il nome della tabella personalizzata che hai specificato per la tabella delle metriche dei lavoratori.

## Fase 2: reimplementare il codice con la versione precedente di KCL
<a name="kcl-migration-rollback-step2"></a>

**Importante**  
Qualsiasi menzione della versione 2.x nell’output generato dallo Strumento di migrazione di KCL deve essere interpretata come riferita alla versione 1.x di KCL. L’esecuzione dello script non esegue un rollback completo, ma passa solo l’algoritmo di bilanciamento del carico a quello utilizzato nella versione 1.x di KCL.

Dopo aver eseguito lo strumento di migrazione di KCL per un rollback, verrà mostrato uno di questi messaggi:

Messaggio 1  
“Rollback completato. L’applicazione eseguiva funzionalità della versione 2x compatibili. Torna ai file binari dell’applicazione precedente implementando il codice con la versione precedente di KCL.”  
**Azione richiesta:** significa che i lavoratori erano in esecuzione nella modalità compatibile con KCL 1.x. Reimplementa il codice con la versione precedente di KCL sui lavoratori.

Messaggio 2  
“Rollback completato. L’applicazione KCL eseguiva funzionalità della versione 3 e verrà ripristinata a funzionalità compatibili con la versione 2x. Se non è possibile visualizzare alcuna mitigazione dopo un breve periodo di tempo, esegui il rollback ai file binari dell’applicazione precedente implementando il codice con la versione KCL precedente.”  
**Azione richiesta:** significa che i lavoratori erano in esecuzione in modalità KCL 3.x e lo Strumento di migrazione di KCL ha portato tutti i lavoratori alla modalità compatibile con KCL 1.x. Reimplementa il codice con la versione precedente di KCL sui lavoratori.

Messaggio 3  
“Il rollback dell’applicazione è già stato effettuato. Tutte KCLv3 le risorse che potevano essere eliminate sono state ripulite per evitare addebiti fino a quando non è stato possibile avviare l'applicazione con la migrazione».  
**Azione richiesta:** significa che i lavoratori erano già stati sottoposti a rollback per essere eseguiti nella modalità compatibile con KCL 1.x. Reimplementa il codice con la versione precedente di KCL sui lavoratori.

# Rollforward a KCL 3.x dopo un rollback
<a name="kcl-migration-rollforward"></a>

Questo argomento illustra come eseguire il rollforward di un’applicazione consumer a KCL 3.x dopo un rollback. Quando è necessario eseguire il rollforward, occorre completare una procedura in due fasi:

1. Esegui lo [Strumento di migrazione di KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Implementa il codice con KCL 3.x.

## Fase 1: eseguire lo Strumento di migrazione di KCL
<a name="kcl-migration-rollforward-step1"></a>

Esegui lo Strumento di migrazione di KCL con il seguente comando per eseguire il rollforward a KCL 3.x:

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### Parameters
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
Sostituisci *region* con il tuo Regione AWS.

`--application_name`  
Questo parametro è obbligatorio se utilizzi i nomi predefiniti per la tabella dello stato del coordinatore. Se sono stati specificati nomi personalizzati per la tabella dello stato del coordinatore, è possibile omettere questo parametro. Sostituisci *applicationName* con il nome effettivo dell'applicazione KCL. Lo strumento utilizza questo nome per ricavare i nomi delle tabelle predefiniti se non vengono forniti nomi personalizzati.

`--coordinator_state_table_name`  
Questo parametro è necessario se si imposta un nome personalizzato per la tabella dello stato del coordinatore nella configurazione KCL. Se si utilizza il nome della tabella predefinito, è possibile omettere questo parametro. Sostituisci *coordinatorStateTableName* con il nome della tabella personalizzata che hai specificato per la tabella degli stati del coordinatore.

Dopo aver eseguito lo strumento di migrazione in modalità rollforward, KCL crea le seguenti risorse DynamoDB necessarie per KCL 3.x:
+ Un indice secondario globale nella tabella di lease
+ Una tabella delle metriche dei lavoratori

## Fase 2: implementare il codice con KCL 3.x
<a name="kcl-migration-rollforward-step2"></a>

Dopo aver eseguito lo Strumento di migrazione di KCL per un rollforward, implementa il codice con KCL 3.x sui lavoratori. Per completare la migrazione, consulta [Fase 8: completare la migrazione](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish).

# Spiegazione passo per passo: Adattatore Kinesis DynamoDB Streams
<a name="Streams.KCLAdapter.Walkthrough"></a>

In questa sezione viene riportata una spiegazione passo per passo di un'applicazione Java che utilizza Amazon Kinesis Client Library e l'adattatore Amazon DynamoDB Streams Kinesis. L'applicazione mostra un esempio di replica dei dati, in cui l'attività di scrittura da una tabella viene applicata a una seconda tabella e i contenuti di entrambe le tabelle rimangono sincronizzati. Per il codice sorgente, consulta [Programma completo: Adattatore Kinesis di DynamoDB Streams](Streams.KCLAdapter.Walkthrough.CompleteProgram.md).

Il programma effettua le seguenti operazioni:

1. Crea due tabelle DynamoDB denominate `KCL-Demo-src` e `KCL-Demo-dst`. Su ognuna di queste tabelle è abilitato un flusso.

1. Genera l'attività di aggiornamento nella tabella di origine aggiungendo, aggiornando ed eliminando gli elementi. Questo fa sì che i dati vengano scritti nel flusso della tabella.

1. Legge i record dal flusso, li ricostruisce come richieste DynamoDB e applica le richieste alla tabella di destinazione.

1. Esegue la scansione delle tabelle di origine e di destinazione per garantire che i contenuti siano identici.

1. Esegue la pulizia eliminando le tabelle.

Queste fasi sono descritte nelle sezioni seguenti e l'applicazione completa viene mostrata alla fine della procedura guidata.

**Topics**
+ [Fase 1: creazione di tabelle DynamoDB](#Streams.KCLAdapter.Walkthrough.Step1)
+ [Fase 2: generazione dell'attività di aggiornamento nella tabella di origine](#Streams.KCLAdapter.Walkthrough.Step2)
+ [Fase 3: elaborazione del flusso](#Streams.KCLAdapter.Walkthrough.Step3)
+ [Fase 4: verifica che entrambe le tabelle abbiano contenuti identici](#Streams.KCLAdapter.Walkthrough.Step4)
+ [Fase 5: rimozione](#Streams.KCLAdapter.Walkthrough.Step5)
+ [Programma completo: Adattatore Kinesis di DynamoDB Streams](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## Fase 1: creazione di tabelle DynamoDB
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

Il primo passo consiste nel creare due tabelle DynamoDB, una di origine e una di destinazione. `StreamViewType` sul flusso della tabella di origine è `NEW_IMAGE`. Questo significa che ogni volta che un item viene modificato in questa tabella, l'immagine "successiva" dell'item viene scritta nel flusso. In questo modo, il flusso tiene traccia di tutte le attività di scrittura della tabella.

Il seguente esempio mostra il codice utilizzato per creare entrambe le tabelle.

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## Fase 2: generazione dell'attività di aggiornamento nella tabella di origine
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

La fase successiva consiste nel generare le attività di scrittura sulla tabella di origine. Mentre questa attività è in corso, il flusso della tabella di origine viene aggiornato pressoché in tempo reale.

L'applicazione definisce una classe helper con metodi che chiamano le operazioni API `PutItem`, `UpdateItem` e `DeleteItem` per scrivere i dati. Il seguente esempio di codice mostra come vengono utilizzati questi metodi.

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## Fase 3: elaborazione del flusso
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

Ora il programma inizia l'elaborazione del flusso. L'adattatore Kinesis di DynamoDB Streams agisce come un livello trasparente tra KCL e l'endpoint DynamoDB Streams in modo che il codice possa utilizzare appieno KCL piuttosto che effettuare chiamate a DynamoDB Streams di basso livello. Il programma esegue le attività di seguito elencate:
+ Definisce una classe di elaboratore di record, `StreamsRecordProcessor`, con metodi conformi alla definizione dell'interfaccia KCL: `initialize`, `processRecords` e `shutdown`. Il metodo `processRecords` contiene la logica necessaria per la lettura dal flusso della tabella di origine e la scrittura nella tabella di destinazione.
+ Definisce una factory di classe per la classe di elaboratore di record (`StreamsRecordProcessorFactory`). Ciò è richiesto per i programmi Java che utilizzano KCL.
+ Crea un'istanza di un nuovo `Worker` KCL che è associato alla factory di classe.
+ Arresta il `Worker` quando l'elaborazione del record è completata.

Facoltativamente, abilita la modalità catch-up nella configurazione dell'adattatore Streams KCL per scalare automaticamente la velocità di chiamata GetRecords API di 3 volte (impostazione predefinita) quando il ritardo di elaborazione del flusso supera un minuto (impostazione predefinita), aiutando l'utente di streaming a gestire picchi di throughput elevati nella tabella.

Per ulteriori informazioni sulla definizione dell'interfaccia KCL, consulta [Sviluppo di consumatori utilizzando la Kinesis Client Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) nella *Guida per gli sviluppatori di Amazon Kinesis Data Streams*. 

Il seguente esempio di codice mostra il loop principale in `StreamsRecordProcessor`. L'istruzione `case` determina quale operazione eseguire, sulla base dell'item `OperationType` presente nel record del flusso.

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## Fase 4: verifica che entrambe le tabelle abbiano contenuti identici
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

A questo punto, i contenuti delle tabelle di origine e destinazione sono sincronizzati. L'applicazione emette le richieste `Scan` su entrambe le tabelle per verificare che i loro contenuti siano effettivamente identici.

La classe `DemoHelper` contiene un metodo `ScanTable` che chiama l'API `Scan` di basso livello. L'esempio seguente mostra come viene utilizzato.

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## Fase 5: rimozione
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

La demo è completata, quindi l'applicazione elimina le tabelle di origine e di destinazione. Vedere l'esempio di codice seguente. Anche dopo l'eliminazione delle tabelle, i flussi rimangono disponibili per altre 24 ore, dopo di che vengono automaticamente eliminati.

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# Programma completo: Adattatore Kinesis di DynamoDB Streams
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

Di seguito è riportato il programma Java completo che esegue le attività descritte in [Spiegazione passo per passo: Adattatore Kinesis DynamoDB Streams](Streams.KCLAdapter.Walkthrough.md). Quando lo esegui, dovresti vedere un output simile al seguente.

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**Importante**  
 Per eseguire questo programma, assicurati che l'applicazione client abbia accesso a DynamoDB e CloudWatch Amazon utilizzando le policy. Per ulteriori informazioni, consulta [Policy basate su identità per DynamoDB](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies). 

Il codice sorgente è composto da quattro `.java` file. Per creare questo programma, aggiungi la seguente dipendenza, che include Amazon Kinesis Client Library (KCL) 3.x e SDK AWS for Java v2 come dipendenze transitive:

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

I file sorgente sono:
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```