

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Utilisation de l’adaptateur DynamoDB Streams Kinesis pour traiter des enregistrements de flux
<a name="Streams.KCLAdapter"></a>

L’utilisation de l’adaptateur Amazon Kinesis est la méthode recommandée pour consommer des flux d’Amazon DynamoDB. L’API DynamoDB Streams est volontairement semblable à celle de Kinesis Data Streams. Dans les deux services, les flux de données sont composés de partitions qui sont des conteneurs pour enregistrements de flux. Les deux services APIs contiennent`ListStreams`, `DescribeStream``GetShards`, et `GetShardIterator` opérations. (Si ces actions de DynamoDB Streams sont similaires à leurs homologues dans Kinesis Data Streams, elles ne sont pas identiques à 100 %.)

En tant qu’utilisateur de DynamoDB Streams, vous pouvez utiliser les modèles de conception figurant dans la KCL pour traiter les partitions et les enregistrements de flux de DynamoDB Streams. Pour ce faire, vous utilisez l’adaptateur Kinesis DynamoDB Streams. L’adaptateur Kinesis implémente l’interface Kinesis Data Streams afin que la KCL puisse être utilisée pour la consommation et le traitement des enregistrements de DynamoDB Streams. [Pour obtenir des instructions sur la configuration et l'installation de l'adaptateur DynamoDB Streams Kinesis, consultez le référentiel. GitHub](https://github.com/awslabs/dynamodb-streams-kinesis-adapter)

Vous pouvez écrire des applications pour Kinesis Data Streams à l’aide de la bibliothèque client Kinesis (KCL). La KCL simplifie le codage en fournissant des abstractions utiles par-dessus l’API Kinesis Data Streams de bas niveau. Pour en savoir plus sur la KCL, consultez [Développement d’applications consommateur à l’aide de la bibliothèque client Kinesis](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.

DynamoDB recommande d'utiliser la version 3.x de KCL avec le SDK AWS pour Java v2.x. [La version 1.x de l'adaptateur DynamoDB Streams Kinesis actuelle AWS avec SDK AWS SDK pour Java pour v1.x continuera d'être entièrement prise en charge tout au long de son cycle de vie, comme prévu pendant la période de transition, conformément à la politique de maintenance des outils.AWS SDKs ](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html)

**Note**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous recommandons vivement de migrer vos applications KCL utilisant la version 1.x vers la version la plus récente de la KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page de la bibliothèque [client Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour en savoir plus sur la bibliothèque client Kinesis, consultez [Use Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/kcl.html). Pour en savoir plus sur la migration de la KCL 1.x vers la KCL 3.x, consultez Migration de la KCL 1.x vers la KCL 3.x.

Le diagramme suivant illustre la manière dont ces bibliothèques interagissent entre elles.

![\[Interactions entre DynamoDB Streams, Kinesis Data Streams et la KCL pour le traitement des enregistrements DynamoDB Streams.\]](http://docs.aws.amazon.com/fr_fr/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


Avec l’adaptateur Kinesis DynamoDB Streams en place, vous pouvez commencer à développer sur l’interface KCL, avec les appels d’API dirigés de manière transparente vers le point de terminaison DynamoDB Streams.

Lorsque votre application démarre, elle appelle la KCL pour instancier un worker. Vous devez fournir au travailleur les informations de configuration de l'application, telles que le descripteur de flux et les AWS informations d'identification, ainsi que le nom d'une classe de processeur d'enregistrements que vous fournissez. À mesure qu’il exécute le code dans le processeur d’enregistrements, le worker effectue les tâches suivantes :
+ Se connecte au flux
+ Énumère les partitions dans le flux
+ Vérifie et énumère les partitions enfants d’une partition parent fermée dans le flux
+ Coordonne les associations de partition avec les autres travaux (le cas échéant)
+ Instancie un processeur d’enregistrements pour chaque partition qu’il gère
+ Extrait des enregistrements du flux
+ Évalue le taux d'appels d' GetRecords API en cas de débit élevé (si le mode rattrapage est configuré)
+ Pousse les enregistrements sur le processeur d’enregistrements correspondant
+ Contrôle les enregistrements traités
+ Équilibre les associations partition-worker lorsque le nombre d’instances de worker change
+ Équilibre les associations partition-worker quand des partitions sont fractionnées

L'adaptateur KCL prend en charge le mode rattrapage, une fonction de réglage automatique du débit d'appels permettant de gérer les augmentations de débit temporaires. Lorsque le délai de traitement du flux dépasse un seuil configurable (une minute par défaut), le mode rattrapage redimensionne la fréquence des appels d' GetRecords API d'une valeur configurable (3 fois par défaut) pour récupérer les enregistrements plus rapidement, puis revient à la normale une fois le décalage réduit. Cela est utile pendant les périodes de haut débit où l'activité d'écriture DynamoDB peut submerger les consommateurs en utilisant les taux d'interrogation par défaut. Le mode rattrapage peut être activé via le paramètre de `catchupEnabled` configuration (faux par défaut).

**Note**  
Pour une description des concepts de KCL évoqués ici, consultez [Développement d’applications consommateur à l’aide de la bibliothèque client Kinesis](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.  
Pour plus d'informations sur l'utilisation des flux avec, AWS Lambda voir [Streams et déclencheurs DynamoDB AWS Lambda](Streams.Lambda.md)

# Migration de la KCL 1.x vers la KCL 3.x
<a name="streams-migrating-kcl"></a>

## Présentation de
<a name="migrating-kcl-overview"></a>

Ce guide fournit des instructions pour migrer votre application consommateur de la KCL 1.x vers la KCL 3.x. En raison des différences architecturales entre la KCL 1.x et la KCL 3.x, la migration nécessite la mise à jour de plusieurs composants pour garantir la compatibilité.

La KCL 1.x utilise des classes et interfaces différentes rapport à la KCL 3.x. Vous devez d’abord migrer le processeur d’enregistrements, la fabrique de processeurs d’enregistrements et les classes de workers vers le format compatible avec la KCL 3.x, puis suivre les étapes de migration de la KCL 1.x vers la KCL 3.x.

## Étapes de la migration
<a name="migration-steps"></a>

**Topics**
+ [Étape 1 : migrer le processeur d’enregistrements](#step1-record-processor)
+ [Étape 2 : migrer la fabrique de processeurs d’enregistrements](#step2-record-processor-factory)
+ [Étape 3 : migrer le worker](#step3-worker-migration)
+ [Étape 4 : présentation de la configuration de la KCL 3.x et recommandations](#step4-configuration-migration)
+ [Étape 5 : migrer de la KCL 2.x vers la KCL 3.x](#step5-kcl2-to-kcl3)

### Étape 1 : migrer le processeur d’enregistrements
<a name="step1-record-processor"></a>

L’exemple suivant illustre un processeur d’enregistrements implémenté pour l’adaptateur DynamoDB Streams Kinesis version 1.x :

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Pour migrer la RecordProcessor classe**

1. Remplacez les interfaces `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` et `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` par `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor` comme suit :

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. Mettez à jour les instructions d’importation des méthodes `initialize` et `processRecords` :

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. Remplacez la méthode `shutdownRequested` par les nouvelles méthodes suivantes : `leaseLost`, `shardEnded` et `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Voici la version mise à jour de la classe du processeur d’enregistrements :

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**Note**  
L'adaptateur DynamoDB Streams Kinesis utilise désormais le modèle Record. SDKv2 Dans SDKv2, les `AttributeValue` objets complexes (`BS`,,`NS`, `M``L`,`SS`) ne renvoient jamais la valeur nulle. Vérifiez si ces valeurs existent à l’aide des méthodes `hasBs()`, `hasNs()`, `hasM()`, `hasL()` et `hasSs()`.

### Étape 2 : migrer la fabrique de processeurs d’enregistrements
<a name="step2-record-processor-factory"></a>

La fabrique de processeurs d’enregistrements est responsable de la création des processeurs d’enregistrements lorsqu’un bail est acquis. Voici un exemple de fabrique KCL 1.x :

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**Pour migrer la `RecordProcessorFactory`**
+ Remplacez l’interface implémentée `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` par `software.amazon.kinesis.processor.ShardRecordProcessorFactory` comme suit :

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

Voici un exemple de fabrique de processeurs d’enregistrements dans 3.0 :

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### Étape 3 : migrer le worker
<a name="step3-worker-migration"></a>

Dans la version 3.0 de la KCL, une nouvelle classe, appelée **Scheduler**, remplace la classe **Worker**. Voici un exemple de worker KCL 1.x :

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**Pour migrer le worker**

1. Modifiez l’instruction `import` de la classe `Worker` pour les instructions d’importation pour les classes `Scheduler` et `ConfigsBuilder`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Importez `StreamTracker` et remplacez l’importation `StreamsWorkerFactory` par `StreamsSchedulerFactory`.

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. Choisissez la position à partir de laquelle vous souhaitez démarrer l’application. Vous avez le choix entre `TRIM_HORIZON` et `LATEST`.

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. Créez une instance `StreamTracker`.

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. Créez l’objet `AmazonDynamoDBStreamsAdapterClient`.

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. Créez l’objet `ConfigsBuilder`.

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. Créez le `Scheduler` à l’aide de `ConfigsBuilder`, comme illustré dans l’exemple suivant :

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**Important**  
Le paramètre `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` assure la compatibilité entre l’adaptateur DynamoDB Streams Kinesis pour la KCL v3 et la KCL v1, et non entre la KCL v2 et la KCL v3.

### Étape 4 : présentation de la configuration de la KCL 3.x et recommandations
<a name="step4-configuration-migration"></a>

Pour obtenir une description détaillée des configurations introduites après la KCL 1.x qui sont pertinentes dans la KCL 3.x, consultez [KCL configurations](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html) et [KCL migration client configuration](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration).

**Important**  
Au lieu de créer directement des objets de `checkpointConfig`, `coordinatorConfig`, `leaseManagementConfig`, `metricsConfig`, `processorConfig` et `retrievalConfig`, nous vous recommandons de définir des configurations dans la KCL 3.x et versions ultérieures à l’aide de `ConfigsBuilder`, afin d’éviter les problèmes d’initialisation du Scheduler. `ConfigsBuilder` fournit une méthode plus flexible et plus facile à gérer pour configurer votre application KCL.

#### Configurations avec mise à jour de la valeur par défaut dans la KCL 3.x
<a name="kcl3-configuration-overview"></a>

`billingMode`  
Dans la KCL version 1.x, la valeur par défaut de `billingMode` est définie sur `PROVISIONED`. En revanche, avec la KCL version 3.x, le `billingMode` par défaut est `PAY_PER_REQUEST` (mode à la demande). Nous vous recommandons d’utiliser le mode de capacité à la demande pour votre table de baux, afin d’ajuster automatiquement la capacité en fonction de votre utilisation. Pour obtenir des conseils sur l’utilisation de la capacité allouée pour vos tables de baux, consultez [Best practices for the lease table with provisioned capacity mode](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html).

`idleTimeBetweenReadsInMillis`  
Dans la KCL version 1.x, la valeur par défaut de `idleTimeBetweenReadsInMillis` est définie sur 1 000 (soit 1 seconde). La KCL version 3.x définit la valeur par défaut de i`dleTimeBetweenReadsInMillis` sur 1 500 (soit 1,5 seconde), mais l’adaptateur Amazon DynamoDB Streams Kinesis remplace la valeur par défaut par 1 000 (soit 1 seconde).

#### Nouvelles configurations de la KCL 3.x
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
Cette configuration définit l’intervalle de temps avant que les partitions récemment découvertes ne commencent à être traitées. Elle est calculée comme suit : 1,5 × `leaseAssignmentIntervalMillis`. Si ce paramètre n’est pas explicitement configuré, l’intervalle de temps est défini par défaut sur 1,5 × `failoverTimeMillis`. Le traitement des nouvelles partitions consiste à analyser la table de baux et à interroger un index secondaire global (GSI) de la table de baux. La réduction de `leaseAssignmentIntervalMillis` augmente la fréquence de ces opérations d’analyse et d’interrogation, ce qui entraîne une augmentation des coûts de DynamoDB. Nous vous recommandons de définir cette valeur sur 2 000 (soit 2 secondes) afin de réduire le délai de traitement des nouvelles partitions.

`shardConsumerDispatchPollIntervalMillis`  
Cette configuration définit l’intervalle entre les interrogations successives effectuées par le consommateur de partitions pour déclencher des transitions d’état. Dans la KCL version 1.x, ce comportement était contrôlé par le paramètre `idleTimeInMillis`, qui n’était pas exposé en tant que paramètre configurable. Avec la KCL version 3.x, nous vous recommandons de définir cette configuration de sorte qu’elle corresponde à la valeur utilisée pour ` idleTimeInMillis` dans votre configuration de la KCL version 1.x.

### Étape 5 : migrer de la KCL 2.x vers la KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

Pour garantir une transition fluide et une compatibilité avec la version la plus récente de la bibliothèque client Kinesis (KCL), suivez les étapes 5 à 8 des instructions du guide de migration pour la [mise à niveau de la KCL 2.x vers la KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics).

Pour la résolution des problèmes courants liés à la KCL 3.x, consultez [Troubleshooting KCL consumer applications](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html).

# Restauration par régression de la version précédente de la KCL
<a name="kcl-migration-rollback"></a>

Cette rubrique explique comment restaurer la version précédente de la KCL pour votre application consommateur. Le processus de restauration par régression comprend deux étapes :

1. Exécuter l’[outil de migration de la KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)

1. Redéployer le code de la version précédente de la KCL

## Étape 1 : exécuter l’outil de migration de la KCL
<a name="kcl-migration-rollback-step1"></a>

Si vous avez besoin de restaurer la version précédente de la KCL, vous devez exécuter l’outil de migration de la KCL. L’outil effectue deux tâches importantes :
+ Il supprime une table de métadonnées appelée table des métriques de worker, ainsi qu’un index secondaire global de la table des baux dans DynamoDB. Ces artefacts sont créés par la KCL 3.x, mais ils ne sont pas nécessaires lorsque vous restaurez la version précédente.
+ Ainsi, tous les workers peuvent s’exécuter dans un mode compatible avec la KCL 1.x et commencer à utiliser l’algorithme d’équilibrage de charge utilisé dans les versions précédentes de la KCL. Si vous rencontrez des problèmes avec le nouvel algorithme d’équilibrage de charge dans la KCL 3.x, cela permettra de les résoudre immédiatement.

**Important**  
La table des états de coordinateur de DynamoDB doit exister et ne doit pas être supprimée pendant les processus de migration, de restauration par régression et de restauration par progression.

**Note**  
Il est important que tous les workers de votre application consommateur utilisent le même algorithme d’équilibrage de charge à un moment donné. L’outil de migration de la KCL s’assure que tous les workers de votre application consommateur KCL 3.x basculent vers le mode compatible avec la KCL 1.x, afin qu’ils exécutent le même algorithme d’équilibrage de charge lors de la restauration par régression de l’application vers la version précédente de la KCL.

Vous pouvez télécharger l'[outil de migration KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) dans le répertoire des scripts du référentiel [KCL GitHub](https://github.com/awslabs/amazon-kinesis-client/tree/master). Exécutez le script à partir d’un worker ou d’un hôte disposant des autorisations appropriées pour écrire dans la table des états de coordinateur, la table des métriques de worker et la table des baux. Assurez-vous que les [autorisations IAM](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html) appropriées sont configurées pour les applications consommateur KCL. Exécutez le script une seule fois par application KCL à l’aide de la commande spécifiée :

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### Parameters
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
*region*Remplacez-le par votre Région AWS.

`--application_name`  
Ce paramètre est obligatoire si vous utilisez des noms par défaut pour vos tables de métadonnées DynamoDB (table des baux, table des états de coordinateur et table des métriques de worker). Si vous avez spécifié des noms personnalisés pour ces tables, vous pouvez omettre ce paramètre. *applicationName*Remplacez-le par le nom réel de votre application KCL. L’outil l’utilise pour créer les noms de table par défaut si aucun nom personnalisé n’est fourni.

`--lease_table_name`  
Ce paramètre est nécessaire si vous avez défini un nom personnalisé pour la table des baux dans votre configuration KCL. Si vous utilisez le nom de table par défaut, vous pouvez omettre ce paramètre. *leaseTableName*Remplacez-le par le nom de table personnalisé que vous avez spécifié pour votre table de location.

`--coordinator_state_table_name`  
Ce paramètre est nécessaire si vous avez défini un nom personnalisé pour la table des états de coordinateur dans votre configuration KCL. Si vous utilisez le nom de table par défaut, vous pouvez omettre ce paramètre. *coordinatorStateTableName*Remplacez-le par le nom de table personnalisé que vous avez spécifié pour votre table d'état des coordinateurs.

`--worker_metrics_table_name`  
Ce paramètre est nécessaire si vous avez défini un nom personnalisé pour la table des métriques de worker dans votre configuration KCL. Si vous utilisez le nom de table par défaut, vous pouvez omettre ce paramètre. *workerMetricsTableName*Remplacez-le par le nom de table personnalisé que vous avez spécifié pour votre tableau des métriques des travailleurs.

## Étape 2 : redéployer le code avec la version précédente de la KCL
<a name="kcl-migration-rollback-step2"></a>

**Important**  
Toute mention de la version 2.x dans la sortie générée par l’outil de migration de la KCL doit être interprétée comme faisant référence à la KCL version 1.x. L’exécution du script n’effectue pas de restauration par régression complète : elle fait uniquement basculer l’algorithme d’équilibrage de charge vers celui utilisé dans la KCL version 1.x.

Après avoir exécuté l’outil de migration de la KCL pour effectuer une restauration par régression, l’un des messages suivants s’affiche :

Message 1  
« Restauration par régression terminée. Votre application exécutait une fonctionnalité compatible avec la version 2x. Veuillez restaurer les fichiers binaires précédents de votre application en déployant le code avec votre version précédente de la KCL. »  
**Action requise :** vos workers s’exécutaient dans le mode compatible avec la KCL 1.x. Redéployez le code avec la version précédente de la KCL sur vos workers.

Message 2  
« Restauration par régression terminée. Votre application KCL exécutait une fonctionnalité compatible avec la version 3x. Une fonctionnalité compatible avec la version 2x a été restaurée par régression. Si vous ne constatez aucune amélioration après un court laps de temps, restaurez les fichiers binaires précédents de votre application en déployant le code avec votre version précédente de la KCL. »  
**Action requise :** vos workers s’exécutaient dans le mode compatible avec la KCL 3.x et l’outil de migration de la KCL a fait basculer tous les workers vers le mode compatible avec la KCL 1.x. Redéployez le code avec la version précédente de la KCL sur vos workers.

Message 3  
« L’application a déjà fait l’objet d’une restauration par régression. Toutes KCLv3 les ressources susceptibles d'être supprimées ont été nettoyées afin d'éviter des frais jusqu'à ce que l'application puisse être reportée avec la migration. »  
**Action requise :** vos workers ont déjà fait l’objet d’une restauration par régression pour s’exécuter dans le mode compatible avec la KCL 1.x. Redéployez le code avec la version précédente de la KCL sur vos workers.

# Restauration par progression de la KCL 3.x après une restauration par régression
<a name="kcl-migration-rollforward"></a>

Cette rubrique explique comment restaurer par progression votre application consommateur vers la KCL 3.x après une restauration par régression. Lorsque vous devez effectuer une restauration par progression, vous devez suivre un processus en deux étapes :

1. Exécuter l’[outil de migration de la KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)

1. Déployer le code avec la KCL 3.x

## Étape 1 : exécuter l’outil de migration de la KCL
<a name="kcl-migration-rollforward-step1"></a>

Exécutez l’outil de migration de la KCL avec la commande suivante pour effectuer une restauration par progression vers la KCL 3.x :

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### Parameters
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
*region*Remplacez-le par votre Région AWS.

`--application_name`  
Ce paramètre est obligatoire si vous utilisez des noms par défaut pour votre table des états de coordinateur. Si vous avez spécifié des noms personnalisés pour la table des états de coordinateur, vous pouvez omettre ce paramètre. *applicationName*Remplacez-le par le nom réel de votre application KCL. L’outil l’utilise pour créer les noms de table par défaut si aucun nom personnalisé n’est fourni.

`--coordinator_state_table_name`  
Ce paramètre est nécessaire si vous avez défini un nom personnalisé pour la table des états de coordinateur dans votre configuration KCL. Si vous utilisez le nom de table par défaut, vous pouvez omettre ce paramètre. *coordinatorStateTableName*Remplacez-le par le nom de table personnalisé que vous avez spécifié pour votre table d'état des coordinateurs.

Une fois que vous avez exécuté l’outil de migration en mode restauration par progression, la KCL crée les ressources DynamoDB suivantes requises pour la KCL 3.x :
+ Un index secondaire global sur la table des baux
+ Une table des métriques de worker

## Étape 2 : déployer le code avec la KCL 3.x
<a name="kcl-migration-rollforward-step2"></a>

Après avoir exécuté l’outil de migration de la KCL pour une restauration par progression, déployez votre code avec la KCL 3.x sur vos workers. Pour terminer votre migration, consultez [Step 8: Complete the migration](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish).

# Démonstration : adaptateur Kinesis DynamoDB Streams
<a name="Streams.KCLAdapter.Walkthrough"></a>

Cette section est une démonstration d’une application Java qui utilise la bibliothèque client Amazon Kinesis et l’adaptateur Amazon DynamoDB Streams. L’application illustre un exemple de la réplication de données, où l’activité d’écriture d’une table est appliquée à une seconde table, avec le contenu des deux tables demeurant synchronisé. Pour le code source, consultez [Programme complet : adaptateur DynamoDB Streams Kinesis](Streams.KCLAdapter.Walkthrough.CompleteProgram.md).

Le programme exécute les tâches suivantes :

1. Crée deux tables DynamoDB nommées `KCL-Demo-src` et `KCL-Demo-dst`. Chacune de ces tables dispose d’un flux activé sur elle-même.

1. Génère une activité de mise à jour de la table source en ajoutant, mettant à jour et supprimant des éléments. Cela entraîne l’écriture des données sur le flux de la table.

1. Lit les enregistrements du flux, les reconstruit en tant que demandes DynamoDB, et applique les demandes à la table de destination.

1. Analyse les tables source et de destination afin de s’assurer que leurs contenus sont identiques.

1. Nettoie en supprimant les tables.

Ces étapes sont décrites dans les sections suivantes et l’application complète est illustrée à la fin de la procédure pas à pas.

**Topics**
+ [Étape 1 : créer des tables DynamoDB](#Streams.KCLAdapter.Walkthrough.Step1)
+ [Étape 2 : générer une activité de mise à jour de la table source](#Streams.KCLAdapter.Walkthrough.Step2)
+ [Étape 3 : traiter le flux](#Streams.KCLAdapter.Walkthrough.Step3)
+ [Étape 4 : s’assurer que les deux tables ont un contenu identique](#Streams.KCLAdapter.Walkthrough.Step4)
+ [Étape 5 : nettoyer](#Streams.KCLAdapter.Walkthrough.Step5)
+ [Programme complet : adaptateur DynamoDB Streams Kinesis](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## Étape 1 : créer des tables DynamoDB
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

La première étape consiste à créer deux tables DynamoDB : une table source et une table de destination. Le `StreamViewType` sur le flux de la table source est `NEW_IMAGE`. Cela signifie que chaque fois qu’un élément est modifié dans la table, l’image « après » de l’élément est écrite dans le flux. Ainsi, le flux assure le suivi des toute l’activité d’écriture sur la table.

L’extrait de code suivant illustre le code utilisé pour la création des deux tables.

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## Étape 2 : générer une activité de mise à jour de la table source
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

L’étape suivante consiste à créer une activité d’écriture sur la table source. Tandis que cette activité a lieu, le flux de la table source est aussi mis à jour en quasi-temps réel.

L’application définit une classe d’assistance avec les méthodes qui appellent les actions d’API `PutItem`, `UpdateItem` et `DeleteItem` pour écrire les données. L’extrait de code suivant montre comment ces méthodes sont utilisées.

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## Étape 3 : traiter le flux
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

Maintenant le programme commence à traiter le flux. L’adaptateur DynamoDB Streams Kinesis agit comme une couche transparente entre la KCL et le point de terminaison DynamoDB Streams, afin que le code puisse pleinement exploiter la KCL au lieu de devoir effectuer des appels DynamoDB Streams de bas niveau. Le programme effectue les tâches suivantes :
+ Il définit une classe de processeur d’enregistrements, `StreamsRecordProcessor`, avec des méthodes conformes à la définition de l’interface KCL : `initialize`, `processRecords` et `shutdown`. La méthode `processRecords` contient la logique nécessaire pour lire à partir du flux de la table source et écrire dans la table de destination.
+ Il définit une fabrique de classe pour la classe de processeur d’enregistrements (`StreamsRecordProcessorFactory`). Cette action est obligatoire pour les programmes Java qui utilisent le KCL.
+ Il instancie un nouveau KCL `Worker`, associé à la fabrique de classe.
+ Il arrête le `Worker` lorsque le traitement des enregistrements est terminé.

Activez éventuellement le mode rattrapage dans la configuration de votre adaptateur Streams KCL pour augmenter automatiquement le taux d'appels d' GetRecords API de 3 fois (par défaut) lorsque le délai de traitement des flux dépasse une minute (par défaut), afin d'aider votre consommateur de flux à gérer les pics de débit élevés dans votre table.

Pour en savoir plus sur la définition de l’interface de la KCL, consultez [Développement d’applications consommateur à l’aide de la bibliothèque client Kinesis](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) dans le *Guide du développeur Amazon Kinesis Data Streams*. 

L’extrait de code suivant illustre la boucle principale dans `StreamsRecordProcessor`. L’instruction `case` détermine l’action à exécuter, en fonction de l’`OperationType` qui s’affiche dans l’enregistrement de flux.

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## Étape 4 : s’assurer que les deux tables ont un contenu identique
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

À ce stade, le contenu des tables source et destination est synchronisé. L’application émet des demandes `Scan` sur les deux tables afin de vérifier que leurs contenus sont, en fait, identiques.

La classe `DemoHelper` contient une méthode `ScanTable` qui appelle l’API `Scan` de bas niveau. L’exemple suivant illustre la marche à suivre.

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## Étape 5 : nettoyer
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

Comme la démonstration est terminée, l’application supprime les tables source et destination. Consultez l’exemple de code suivant. Même après que les tables sont supprimées, leurs flux demeurent accessibles 24 heures, délai au-delà duquel ils sont automatiquement supprimés.

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# Programme complet : adaptateur DynamoDB Streams Kinesis
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

Voici le programme Java complet qui effectue les tâches décrites dans [Démonstration : adaptateur Kinesis DynamoDB Streams](Streams.KCLAdapter.Walkthrough.md). Lorsque vous l’exécutez, vous devez visualiser une sortie similaire à ce qui suit.

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**Important**  
 Pour exécuter ce programme, assurez-vous que l'application cliente a accès à DynamoDB et à CloudWatch Amazon à l'aide de politiques. Pour de plus amples informations, veuillez consulter [Politiques basées sur l’identité pour DynamoDB](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies). 

Le code source se compose de quatre `.java` fichiers. Pour créer ce programme, ajoutez la dépendance suivante, qui inclut la bibliothèque client Amazon Kinesis (KCL) 3.x et le SDK pour AWS Java v2 en tant que dépendances transitives :

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

Les fichiers sources sont les suivants :
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```