

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Modifier la récupération de données pour DynamoDB Streams
<a name="Streams"></a>

 DynamoDB Streams récupère une séquence chronologique des modifications au niveau élément dans toute table DynamoDB et stocke ces informations dans un journal pendant jusqu’à 24 heures. Les applications ont accès à ce journal et affichent les éléments de données à mesure qu’ils s’affichent avant et après qu’ils ont été modifiés, pratiquement en temps réel.

 Le chiffrement au repos chiffre les données dans les flux DynamoDB Streams. Pour en savoir plus, consultez [Chiffrement de DynamoDB au repos](EncryptionAtRest.md).

Un *flux DynamoDB* est un flux ordonné d’informations sur les modifications apportées aux éléments d’une table DynamoDB. Lorsque vous activez un flux sur une table, DynamoDB récupère des informations sur chaque modification apportée à des éléments de données dans la table.

Chaque fois qu’une application crée, met à jour ou supprime des éléments dans la table, DynamoDB Streams écrit un enregistrement de flux avec le ou les attributs de clé primaire des éléments qui ont été modifiés. Un *registre de flux* contient des informations sur une modification de données dans un seul élément d’une table DynamoDB. Vous pouvez configurer le flux de telle sorte que les enregistrements de flux récupèrent des informations supplémentaires, telles que les images « avant » et « après » d’éléments modifiés.

DynamoDB Streams permet de s’assurer de ce qui suit :
+ Chaque enregistrement de flux s’affiche exactement une fois dans le flux.
+ Pour chaque élément modifié dans une table DynamoDB, les enregistrements de flux apparaissent dans l’ordre des modifications réelles.

DynamoDB Streams écrit les enregistrements de flux en quasi-temps réel, afin que vous puissiez créer des applications qui consomment ces flux et entreprennent des actions basées sur le contenu.

**Topics**
+ [Points de terminaison pour DynamoDB Streams](#Streams.Endpoints)
+ [Activation d’un flux](#Streams.Enabling)
+ [Lecture et traitement de flux](#Streams.Processing)
+ [DynamoDB Streams et time-to-live](time-to-live-ttl-streams.md)
+ [Utilisation de l’adaptateur DynamoDB Streams Kinesis pour traiter des enregistrements de flux](Streams.KCLAdapter.md)
+ [API de bas niveau DynamoDB Streams : exemple Java](Streams.LowLevel.Walkthrough.md)
+ [Streams et déclencheurs DynamoDB AWS Lambda](Streams.Lambda.md)
+ [DynamoDB Streams et Apache Flink](StreamsApacheFlink.xml.md)

## Points de terminaison pour DynamoDB Streams
<a name="Streams.Endpoints"></a>

AWS gère des points de terminaison distincts pour DynamoDB et DynamoDB Streams. Pour utiliser des tables et index de base de données, votre application doit accéder à un point de terminaison DynamoDB. Pour lire et traiter des enregistrements DynamoDB Streams, votre application doit pouvoir accéder à un point de terminaison DynamoDB Streams dans la même région.

DynamoDB Streams propose deux ensembles de points de terminaison. Il s'agit des options suivantes :
+ **IPv4-points de terminaison uniquement : points** de terminaison dotés de la `streams.dynamodb.<region>.amazonaws.com` convention de dénomination.
+ **Points de terminaison à double pile** : nouveaux points de terminaison compatibles avec les deux IPv4 IPv6 et conformes à la `streams-dynamodb.<region>.api.aws` convention de dénomination.

**Note**  
Pour accéder à la liste complète des régions et points de terminaison DynamoDB et DynamoDB Streams, consultez [Régions et points de terminaison](https://docs.aws.amazon.com/general/latest/gr/rande.html) dans *Références générales AWS*.

Ils AWS SDKs fournissent des clients distincts pour DynamoDB et DynamoDB Streams. En fonction de vos exigences, votre application peut accéder à un point de terminaison DynamoDB, un point de terminaison DynamoDB Streams, ou aux deux en même temps. Pour vous connecter aux deux points de terminaison, votre application doit instancier deux clients, l’un pour DynamoDB, et l’autre pour DynamoDB Streams.

## Activation d’un flux
<a name="Streams.Enabling"></a>

Vous pouvez activer un flux sur une nouvelle table lorsque vous le créez à l'aide du AWS CLI ou de l'un des AWS SDKs. Vous pouvez également activer ou désactiver un flux sur une table existante, ou modifier les paramètres d’un flux. DynamoDB Streams opérant de manière asynchrone, l’activation d’un flux n’a aucune incidence sur les performances d’une table.

La manière la plus simple de gérer DynamoDB Streams consiste à utiliser l’ AWS Management Console.

1. Connectez-vous à la console DynamoDB AWS Management Console et ouvrez-la à l'adresse. [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. Dans le tableau de bord de la console DynamoDB, choisissez **Tables**, puis sélectionnez une table.

1. Choisissez l’onglet **Exportations et flux**.

1. Dans la section **Détails du flux DynamoDB**, choisissez **Activer**.

1. Sur la page **Activer le flux DynamoDB**, choisissez les informations qui seront écrites dans le flux chaque fois que des données de la table seront modifiées :
   + **Attributs de clés uniquement** – Uniquement les attributs de clé de l’élément modifié.
   + **New image (Nouvelle image)** – L’élément entier, tel qu’il apparaît après sa modification.
   + **Old image (Ancienne image)** – L’élément entier, tel qu’il apparaissait avant sa modification.
   + **New and old images (Nouvelle et ancienne images)** – La nouvelle image et l’ancienne image de l’élément.

   Lorsque les paramètres vous conviennent, choisissez **Activer le streaming**.

1. (Facultatif) Pour désactiver un flux existant, choisissez **Désactiver** sous **Détails du flux DynamoDB**.

Vous pouvez également utiliser les API `CreateTable` ou `UpdateTable` pour activer ou modifier un flux. Le paramètre `StreamSpecification` détermine la façon dont le flux est configuré :
+ `StreamEnabled` – Spécifique si un flux de données est activé (`true`) ou désactivé (`false`) pour la table.
+ `StreamViewType` – Spécifie les informations à écrire dans le flux à chaque modification des données de la table :
  + `KEYS_ONLY` – Uniquement les attributs de clé de l’élément modifié.
  + `NEW_IMAGE` – L’élément entier, tel qu’il apparaît après sa modification.
  + `OLD_IMAGE` – L’élément entier, tel qu’il apparaissait avant sa modification.
  + `NEW_AND_OLD_IMAGES` – La nouvelle image et l’ancienne image de l’élément.

Vous pouvez activer ou désactiver un flux à tout moment. Cependant, vous recevez un `ValidationException` si vous essayez d’activer un flux sur une table qui en possède déjà un. Vous recevez également un `ValidationException` si vous essayez de désactiver un flux sur une table qui n’en possède pas.

Lorsque vous définissez `StreamEnabled` sur `true`, DynamoDB crée un flux avec un descripteur de flux unique qui lui est attribué. Si vous désactivez et puis réactivez un flux sur la table, un flux est créé avec un descripteur de flux différent.

Chaque flux est identifié de manière unique par un Amazon Resource Name (ARN). Voici un exemple d’ARN pour un flux sur une table DynamoDB nommée `TestTable`.

```
arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291
```

Pour déterminer le dernier descripteur de flux pour une table, émettez une demande DynamoDB `DescribeTable`, puis recherchez l’élément `LatestStreamArn` dans la réponse.

**Note**  
Il n’est pas possible de modifier un `StreamViewType` une fois qu’un flux a été configuré. Si vous devez apporter des modifications à un flux après sa configuration, vous devez désactiver le flux actuel et en créer un nouveau.

## Lecture et traitement de flux
<a name="Streams.Processing"></a>

Pour lire et traiter un flux, votre application doit se connecter à un point de terminaison DynamoDB Streams et émettre des demandes d’API.

Un flux se compose d’*enregistrements de flux*. Chaque enregistrement de flux représente une modification de donnée unique dans la table DynamoDB à laquelle le flux appartient. Chaque enregistrement de flux se voit attribuer un numéro de séquence, ce qui reflète l’ordre dans lequel l’enregistrement a été publié dans le flux.

Les enregistrements de flux sont organisés en groupes, ou *partitions*. Chaque partition agit comme un conteneur pour plusieurs enregistrements de flux et contient les informations requises pour accéder à ces enregistrements et les itérer. Les enregistrements de flux au sein d’une partition sont automatiquement supprimés au bout de 24 heures.

Les partitions sont éphémères. Elles sont créées et supprimées automatiquement, en fonction des besoins. Toute partition peut également se diviser en plusieurs partitions nouvelles. Cela se produit également automatiquement. (Notez qu’il est aussi possible pour une partition parent d’avoir une seule partition enfant). Une partition peut se diviser en réponse à des niveaux élevés d’activité d’écriture sur sa table parent, de telle sorte que les applications puissent traiter les enregistrements issus de plusieurs partitions en parallèle.

Si vous désactivez un flux, toute partition ouverte sera fermée. Les données du flux restent lisibles pendant 24 heures.

Comme les partitions ont une lignée (parent et enfants), une application doit toujours traiter une partition parent avant de traiter une partition enfant. Cela garantit que les enregistrements de flux sont également traités dans l’ordre adéquat. (Si vous utilisez l’adaptateur DynamoDB Streams Kinesis, cela est géré automatiquement. Votre application traite les partitions et les enregistrements de flux dans l’ordre correct. Elle gère automatiquement les partitions nouvelles ou ayant expiré, en plus des partitions qui ont été scindées pendant l’exécution de l’application. Pour plus d’informations, consultez [Utilisation de l’adaptateur DynamoDB Streams Kinesis pour traiter des enregistrements de flux](Streams.KCLAdapter.md).)

Le schéma suivant illustre la relation entre un flux de données, les partitions dans le flux et les enregistrements de flux dans les partitions.

![\[Structure de DynamoDB Streams. Les enregistrements de flux qui représentent des modifications de données sont organisés en partitions.\]](http://docs.aws.amazon.com/fr_fr/amazondynamodb/latest/developerguide/images/streams-terminology.png)


**Note**  
Si vous effectuez une opération `PutItem` ou `UpdateItem` qui ne modifie aucune donnée dans un élément, DynamoDB Streams n’écrit *pas* d’enregistrement de flux pour cette opération.

Pour accéder à un flux de données et traiter les enregistrements de flux qu’il contient, vous devez effectuer les opérations suivantes :
+ Déterminer l’Amazon Resource Name (ARN) unique du flux auquel vous souhaitez accéder.
+ Déterminer quelles sont la ou les partitions du flux qui contiennent les enregistrements de flux qui vous intéressent.
+ Accéder aux partitions et récupérer les enregistrements de flux que vous voulez.

**Note**  
Deux processus au maximum doivent lire simultanément à partir de la même partition de flux. Avoir plus de 2 lecteurs par partition peut entraîner une limitation.

L’API DynamoDB Streams fournit les actions suivantes à l’usage des programmes d’application :
+  `[ListStreams](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_ListStreams.html)` – Renvoie la liste des descripteurs de flux pour le compte et le point de terminaison actuels. Vous pouvez en option demander uniquement les descripteurs de flux pour un nom de table particulier.
+ `[DescribeStream](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_DescribeStream.html)` – Renvoie des informations sur un flux, y compris le statut actuel du flux, son Amazon Resource Name (ARN), la composition de ses partitions et sa table DynamoDB correspondante. Vous pouvez éventuellement récupérer la partition enfant associée à la partition parent à l’aide du champ `ShardFilter`.
+ `[GetShardIterator](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html)` – Renvoie un *itérateur de partition* décrivant un emplacement au sein d’une partition. Vous pouvez demander que l’itérateur fournisse un accès au point le plus ancien, au point le plus récent ou à un point particulier dans le flux.
+ `[GetRecords](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html)` – Renvoie les enregistrements de flux à partir d’une partition donnée. Vous devez fournir l’itérateur de partition renvoyé à partir d’une requête `GetShardIterator`.

Pour obtenir une description complète de ces opérations d’API, y compris des exemples de demandes et de réponses, consultez la [Référence des API Amazon DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Operations_Amazon_DynamoDB_Streams.html).

### Découverte de partitions
<a name="Streams.ShardDiscovery"></a>



Découvrez de nouvelles partitions dans votre flux DynamoDB à l’aide de deux méthodes puissantes. En tant qu’utilisateur d’Amazon DynamoDB Streams, vous disposez de deux méthodes efficaces pour suivre et identifier de nouvelles partitions :

**Interrogation de l’ensemble de la topologie du flux**  
Interrogez régulièrement le flux à l’aide de l’API `DescribeStream`. Celle-ci renvoie toutes les partitions du flux, y compris celles qui ont été créées. En comparant les résultats au fil du temps, vous pouvez détecter les partitions ajoutées récemment.

**Découverte de partitions enfants**  
Recherchez un sous-ensemble de partitions à l’aide de l’API `DescribeStream` avec le paramètre `ShardFilter`. En spécifiant une partition parent dans la demande, DynamoDB Streams renvoie ses partitions enfants immédiates. Cette approche est utile lorsque vous avez uniquement besoin de suivre la lignée des partitions sans analyser l’intégralité du flux.   
Les applications consommant des données de DynamoDB Streams peuvent passer efficacement de la lecture d’une partition fermée à sa partition enfant à l’aide du paramètre `ShardFilter`, évitant ainsi des appels répétés à l’API `DescribeStream` afin de récupérer et de parcourir la carte des partitions à la recherche des partitions fermées et ouvertes. Vous pouvez ainsi découvrir rapidement les partitions enfants après la fermeture d’une partition parent, ce qui rend vos applications de traitement de flux plus réactives et plus économiques.

Les deux méthodes vous permettent de suivre l’évolution de la structure de vos flux DynamoDB, afin de ne jamais manquer les mises à jour de données ou les modifications critiques de partitions.

### Limite de conservation des données pour DynamoDB Streams
<a name="Streams.DataRetention"></a>

Toutes les données dans DynamoDB Streams ont un time-to-live de 24 heures. Vous pouvez extraire et analyser les 24 dernières heures d’activité d’une table donnée. Cependant, les données datant de plus de 24 heures sont susceptibles d’être supprimées à tout moment.

Si vous désactivez un flux sur une table, les données du flux continueront d’être lisibles pendant 24 heures. Passé ce délai, les données expirent et les enregistrements de flux sont supprimés automatiquement. Il n’existe pas de mécanisme pour supprimer manuellement un flux existant. Vous devez attendre que la limite de rétention expire (24 heures) et tous les enregistrements de flux seront supprimés.

# DynamoDB Streams et time-to-live
<a name="time-to-live-ttl-streams"></a>

Vous pouvez sauvegarder, ou traiter de toute autre façon, les éléments supprimés par [Time-to-live](TTL.md) (TTL) en activant Amazon DynamoDB Streams sur la table et en traitant les enregistrements de flux des éléments expirés. Pour en savoir plus, consultez [Lecture et traitement de flux](Streams.md#Streams.Processing).

L’enregistrement de flux contient un champ d’identité utilisateur `Records[<index>].userIdentity`.

Les éléments supprimés par le processus Time-to-live après expiration ont les champs suivants :
+ `Records[<index>].userIdentity.type`

  `"Service"`
+ `Records[<index>].userIdentity.principalId`

  `"dynamodb.amazonaws.com"`

**Note**  
Lorsque vous utilisez le processus TTL dans une table globale, le champ `userIdentity` est défini dans la région dans laquelle ce processus a été effectué. Ce champ n’est pas défini dans les autres régions lorsque la suppression est répliquée.

Le JSON suivant montre la partie pertinente d’un seul enregistrement de flux.

```
"Records": [
    {
        ...

        "userIdentity": {
            "type": "Service",
            "principalId": "dynamodb.amazonaws.com"
        }

        ...

    }
]
```

## Utilisation de DynamoDB Streams et Lambda pour archiver les éléments supprimés TTL
<a name="streams-archive-ttl-deleted-items"></a>

La combinaison de la [DynamoDB time-to-live (TTL)](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html), de [DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html), et de [AWS Lambda](https://aws.amazon.com/lambda/) peut permettre de simplifier l’archivage des données, de réduire les coûts de stockage de DynamoDB et de réduire la complexité du code. L’utilisation de Lambda comme consommateur de flux offre de nombreux avantages, notamment la réduction des coûts par rapport à d’autres consommateurs tels que Kinesis Client Library (KCL). Vous n’êtes pas facturé pour les appels d’API `GetRecords` sur votre flux DynamoDB lorsque vous utilisez Lambda pour consommer des événements, et Lambda peut fournir un filtrage d’événements en identifiant les modèles JSON dans un événement de flux. Avec le filtrage de contenu des modèles d’événements, vous pouvez définir jusqu’à cinq filtres différents pour contrôler quels événements sont envoyés à Lambda en vue d’être traités. Cela permet de réduire les appels de vos fonctions Lambda, de simplifier le code et de réduire le coût global.

Alors que DynamoDB Streams contient toutes les modifications de données, telles que les actions `Create`, `Modify` et `Remove`, cela peut entraîner des appels indésirables de votre fonction Lambda d’archivage. Par exemple, supposons que vous ayez une table contenant 2 millions de modifications de données par heure dans le flux, mais que moins de 5 % d’entre elles soient des suppressions d’éléments qui expireront via le processus TTL et devront être archivées. Avec les [filtres de source d’événement Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html), la fonction Lambda n’effectue que 100 000 appels par heure. Il en résulte que le filtrage des événements vous est facturé uniquement pour les appels nécessaires au lieu des 2 millions d’appels que vous auriez sans le filtrage des événements.

Le filtrage des événements est appliqué au [mappage de source d’événement Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html), une ressource qui lit à partir d’un événement choisi (le flux DynamoDB) et appelle une fonction Lambda. Dans le diagramme suivant, vous pouvez voir comment un élément time-to-live supprimé est consommé par une fonction Lambda utilisant des flux et des filtres d’événements.

![\[Un élément supprimé au moyen du processus TTL lance une fonction Lambda qui utilise des flux et des filtres d’événements.\]](http://docs.aws.amazon.com/fr_fr/amazondynamodb/latest/developerguide/images/streams-lambda-ttl.png)


### Modèle de filtrage d’événement DynamoDB time-to-live
<a name="ttl-event-filter-pattern"></a>

L’ajout du JSON suivant à vos [critères de filtrage](https://docs.aws.amazon.com/lambda/latest/dg/API_FilterCriteria.html) du mappage de source d’événement permet l’appel de votre fonction Lambda uniquement pour les éléments supprimés TTL :

```
{
    "Filters": [
        {
            "Pattern": { "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } }
        }
    ]
}
```

### Création d'un mappage des sources d' AWS Lambda événements
<a name="create-event-source-mapping"></a>

Utilisez les extraits de code suivants pour créer un mappage de source d’événement filtré que vous pouvez connecter au flux DynamoDB d’une table. Chaque bloc de code inclut le modèle de filtre d’événement.

------
#### [ AWS CLI ]

```
aws lambda create-event-source-mapping \
--event-source-arn 'arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000' \
--batch-size 10 \
--enabled \
--function-name test_func \
--starting-position LATEST \
--filter-criteria '{"Filters": [{"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"}]}'
```

------
#### [ Java ]

```
LambdaClient client = LambdaClient.builder()
        .region(Region.EU_WEST_1)
        .build();

Filter userIdentity = Filter.builder()
        .pattern("{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}")
        .build();

FilterCriteria filterCriteria = FilterCriteria.builder()
        .filters(userIdentity)
        .build();

CreateEventSourceMappingRequest mappingRequest = CreateEventSourceMappingRequest.builder()
        .eventSourceArn("arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000")
        .batchSize(10)
        .enabled(Boolean.TRUE)
        .functionName("test_func")
        .startingPosition("LATEST")
        .filterCriteria(filterCriteria)
        .build();

try{
    CreateEventSourceMappingResponse eventSourceMappingResponse = client.createEventSourceMapping(mappingRequest);
    System.out.println("The mapping ARN is "+eventSourceMappingResponse.eventSourceArn());

}catch (ServiceException e){
    System.out.println(e.getMessage());
}
```

------
#### [ Node ]

```
const client = new LambdaClient({ region: "eu-west-1" });

const input = {
    EventSourceArn: "arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000",
    BatchSize: 10,
    Enabled: true,
    FunctionName: "test_func",
    StartingPosition: "LATEST",
    FilterCriteria: { "Filters": [{ "Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }] }
}

const command = new CreateEventSourceMappingCommand(input);

try {
    const results = await client.send(command);
    console.log(results);
} catch (err) {
    console.error(err);
}
```

------
#### [ Python ]

```
session = boto3.session.Session(region_name = 'eu-west-1')
client = session.client('lambda')

try:
    response = client.create_event_source_mapping(
        EventSourceArn='arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000',
        BatchSize=10,
        Enabled=True,
        FunctionName='test_func',
        StartingPosition='LATEST',
        FilterCriteria={
            'Filters': [
                {
                    'Pattern': "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"
                },
            ]
        }
    )
    print(response)
except Exception as e:
    print(e)
```

------
#### [ JSON ]

```
{
  "userIdentity": {
     "type": ["Service"],
     "principalId": ["dynamodb.amazonaws.com"]
   }
}
```

------

# Utilisation de l’adaptateur DynamoDB Streams Kinesis pour traiter des enregistrements de flux
<a name="Streams.KCLAdapter"></a>

L’utilisation de l’adaptateur Amazon Kinesis est la méthode recommandée pour consommer des flux d’Amazon DynamoDB. L’API DynamoDB Streams est volontairement semblable à celle de Kinesis Data Streams. Dans les deux services, les flux de données sont composés de partitions qui sont des conteneurs pour enregistrements de flux. Les deux services APIs contiennent`ListStreams`, `DescribeStream``GetShards`, et `GetShardIterator` opérations. (Si ces actions de DynamoDB Streams sont similaires à leurs homologues dans Kinesis Data Streams, elles ne sont pas identiques à 100 %.)

En tant qu’utilisateur de DynamoDB Streams, vous pouvez utiliser les modèles de conception figurant dans la KCL pour traiter les partitions et les enregistrements de flux de DynamoDB Streams. Pour ce faire, vous utilisez l’adaptateur Kinesis DynamoDB Streams. L’adaptateur Kinesis implémente l’interface Kinesis Data Streams afin que la KCL puisse être utilisée pour la consommation et le traitement des enregistrements de DynamoDB Streams. [Pour obtenir des instructions sur la configuration et l'installation de l'adaptateur DynamoDB Streams Kinesis, consultez le référentiel. GitHub](https://github.com/awslabs/dynamodb-streams-kinesis-adapter)

Vous pouvez écrire des applications pour Kinesis Data Streams à l’aide de la bibliothèque client Kinesis (KCL). La KCL simplifie le codage en fournissant des abstractions utiles par-dessus l’API Kinesis Data Streams de bas niveau. Pour en savoir plus sur la KCL, consultez [Développement d’applications consommateur à l’aide de la bibliothèque client Kinesis](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.

DynamoDB recommande d'utiliser la version 3.x de KCL avec le SDK AWS pour Java v2.x. [La version 1.x de l'adaptateur DynamoDB Streams Kinesis actuelle AWS avec SDK AWS SDK pour Java pour v1.x continuera d'être entièrement prise en charge tout au long de son cycle de vie, comme prévu pendant la période de transition, conformément à la politique de maintenance des outils.AWS SDKs ](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html)

**Note**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous recommandons vivement de migrer vos applications KCL utilisant la version 1.x vers la version la plus récente de la KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page de la bibliothèque [client Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour en savoir plus sur la bibliothèque client Kinesis, consultez [Use Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/kcl.html). Pour en savoir plus sur la migration de la KCL 1.x vers la KCL 3.x, consultez Migration de la KCL 1.x vers la KCL 3.x.

Le diagramme suivant illustre la manière dont ces bibliothèques interagissent entre elles.

![\[Interactions entre DynamoDB Streams, Kinesis Data Streams et la KCL pour le traitement des enregistrements DynamoDB Streams.\]](http://docs.aws.amazon.com/fr_fr/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


Avec l’adaptateur Kinesis DynamoDB Streams en place, vous pouvez commencer à développer sur l’interface KCL, avec les appels d’API dirigés de manière transparente vers le point de terminaison DynamoDB Streams.

Lorsque votre application démarre, elle appelle la KCL pour instancier un worker. Vous devez fournir au travailleur les informations de configuration de l'application, telles que le descripteur de flux et les AWS informations d'identification, ainsi que le nom d'une classe de processeur d'enregistrements que vous fournissez. À mesure qu’il exécute le code dans le processeur d’enregistrements, le worker effectue les tâches suivantes :
+ Se connecte au flux
+ Énumère les partitions dans le flux
+ Vérifie et énumère les partitions enfants d’une partition parent fermée dans le flux
+ Coordonne les associations de partition avec les autres travaux (le cas échéant)
+ Instancie un processeur d’enregistrements pour chaque partition qu’il gère
+ Extrait des enregistrements du flux
+ Évalue le taux d'appels d' GetRecords API en cas de débit élevé (si le mode rattrapage est configuré)
+ Pousse les enregistrements sur le processeur d’enregistrements correspondant
+ Contrôle les enregistrements traités
+ Équilibre les associations partition-worker lorsque le nombre d’instances de worker change
+ Équilibre les associations partition-worker quand des partitions sont fractionnées

L'adaptateur KCL prend en charge le mode rattrapage, une fonction de réglage automatique du débit d'appels permettant de gérer les augmentations de débit temporaires. Lorsque le délai de traitement du flux dépasse un seuil configurable (une minute par défaut), le mode rattrapage redimensionne la fréquence des appels d' GetRecords API d'une valeur configurable (3 fois par défaut) pour récupérer les enregistrements plus rapidement, puis revient à la normale une fois le décalage réduit. Cela est utile pendant les périodes de haut débit où l'activité d'écriture DynamoDB peut submerger les consommateurs en utilisant les taux d'interrogation par défaut. Le mode rattrapage peut être activé via le paramètre de `catchupEnabled` configuration (faux par défaut).

**Note**  
Pour une description des concepts de KCL évoqués ici, consultez [Développement d’applications consommateur à l’aide de la bibliothèque client Kinesis](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.  
Pour plus d'informations sur l'utilisation des flux avec, AWS Lambda voir [Streams et déclencheurs DynamoDB AWS Lambda](Streams.Lambda.md)

# Migration de la KCL 1.x vers la KCL 3.x
<a name="streams-migrating-kcl"></a>

## Présentation de
<a name="migrating-kcl-overview"></a>

Ce guide fournit des instructions pour migrer votre application consommateur de la KCL 1.x vers la KCL 3.x. En raison des différences architecturales entre la KCL 1.x et la KCL 3.x, la migration nécessite la mise à jour de plusieurs composants pour garantir la compatibilité.

La KCL 1.x utilise des classes et interfaces différentes rapport à la KCL 3.x. Vous devez d’abord migrer le processeur d’enregistrements, la fabrique de processeurs d’enregistrements et les classes de workers vers le format compatible avec la KCL 3.x, puis suivre les étapes de migration de la KCL 1.x vers la KCL 3.x.

## Étapes de la migration
<a name="migration-steps"></a>

**Topics**
+ [Étape 1 : migrer le processeur d’enregistrements](#step1-record-processor)
+ [Étape 2 : migrer la fabrique de processeurs d’enregistrements](#step2-record-processor-factory)
+ [Étape 3 : migrer le worker](#step3-worker-migration)
+ [Étape 4 : présentation de la configuration de la KCL 3.x et recommandations](#step4-configuration-migration)
+ [Étape 5 : migrer de la KCL 2.x vers la KCL 3.x](#step5-kcl2-to-kcl3)

### Étape 1 : migrer le processeur d’enregistrements
<a name="step1-record-processor"></a>

L’exemple suivant illustre un processeur d’enregistrements implémenté pour l’adaptateur DynamoDB Streams Kinesis version 1.x :

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Pour migrer la RecordProcessor classe**

1. Remplacez les interfaces `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` et `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` par `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor` comme suit :

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. Mettez à jour les instructions d’importation des méthodes `initialize` et `processRecords` :

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. Remplacez la méthode `shutdownRequested` par les nouvelles méthodes suivantes : `leaseLost`, `shardEnded` et `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Voici la version mise à jour de la classe du processeur d’enregistrements :

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**Note**  
L'adaptateur DynamoDB Streams Kinesis utilise désormais le modèle Record. SDKv2 Dans SDKv2, les `AttributeValue` objets complexes (`BS`,,`NS`, `M``L`,`SS`) ne renvoient jamais la valeur nulle. Vérifiez si ces valeurs existent à l’aide des méthodes `hasBs()`, `hasNs()`, `hasM()`, `hasL()` et `hasSs()`.

### Étape 2 : migrer la fabrique de processeurs d’enregistrements
<a name="step2-record-processor-factory"></a>

La fabrique de processeurs d’enregistrements est responsable de la création des processeurs d’enregistrements lorsqu’un bail est acquis. Voici un exemple de fabrique KCL 1.x :

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**Pour migrer la `RecordProcessorFactory`**
+ Remplacez l’interface implémentée `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` par `software.amazon.kinesis.processor.ShardRecordProcessorFactory` comme suit :

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

Voici un exemple de fabrique de processeurs d’enregistrements dans 3.0 :

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### Étape 3 : migrer le worker
<a name="step3-worker-migration"></a>

Dans la version 3.0 de la KCL, une nouvelle classe, appelée **Scheduler**, remplace la classe **Worker**. Voici un exemple de worker KCL 1.x :

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**Pour migrer le worker**

1. Modifiez l’instruction `import` de la classe `Worker` pour les instructions d’importation pour les classes `Scheduler` et `ConfigsBuilder`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Importez `StreamTracker` et remplacez l’importation `StreamsWorkerFactory` par `StreamsSchedulerFactory`.

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. Choisissez la position à partir de laquelle vous souhaitez démarrer l’application. Vous avez le choix entre `TRIM_HORIZON` et `LATEST`.

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. Créez une instance `StreamTracker`.

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. Créez l’objet `AmazonDynamoDBStreamsAdapterClient`.

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. Créez l’objet `ConfigsBuilder`.

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. Créez le `Scheduler` à l’aide de `ConfigsBuilder`, comme illustré dans l’exemple suivant :

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**Important**  
Le paramètre `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` assure la compatibilité entre l’adaptateur DynamoDB Streams Kinesis pour la KCL v3 et la KCL v1, et non entre la KCL v2 et la KCL v3.

### Étape 4 : présentation de la configuration de la KCL 3.x et recommandations
<a name="step4-configuration-migration"></a>

Pour obtenir une description détaillée des configurations introduites après la KCL 1.x qui sont pertinentes dans la KCL 3.x, consultez [KCL configurations](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html) et [KCL migration client configuration](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration).

**Important**  
Au lieu de créer directement des objets de `checkpointConfig`, `coordinatorConfig`, `leaseManagementConfig`, `metricsConfig`, `processorConfig` et `retrievalConfig`, nous vous recommandons de définir des configurations dans la KCL 3.x et versions ultérieures à l’aide de `ConfigsBuilder`, afin d’éviter les problèmes d’initialisation du Scheduler. `ConfigsBuilder` fournit une méthode plus flexible et plus facile à gérer pour configurer votre application KCL.

#### Configurations avec mise à jour de la valeur par défaut dans la KCL 3.x
<a name="kcl3-configuration-overview"></a>

`billingMode`  
Dans la KCL version 1.x, la valeur par défaut de `billingMode` est définie sur `PROVISIONED`. En revanche, avec la KCL version 3.x, le `billingMode` par défaut est `PAY_PER_REQUEST` (mode à la demande). Nous vous recommandons d’utiliser le mode de capacité à la demande pour votre table de baux, afin d’ajuster automatiquement la capacité en fonction de votre utilisation. Pour obtenir des conseils sur l’utilisation de la capacité allouée pour vos tables de baux, consultez [Best practices for the lease table with provisioned capacity mode](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html).

`idleTimeBetweenReadsInMillis`  
Dans la KCL version 1.x, la valeur par défaut de `idleTimeBetweenReadsInMillis` est définie sur 1 000 (soit 1 seconde). La KCL version 3.x définit la valeur par défaut de i`dleTimeBetweenReadsInMillis` sur 1 500 (soit 1,5 seconde), mais l’adaptateur Amazon DynamoDB Streams Kinesis remplace la valeur par défaut par 1 000 (soit 1 seconde).

#### Nouvelles configurations de la KCL 3.x
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
Cette configuration définit l’intervalle de temps avant que les partitions récemment découvertes ne commencent à être traitées. Elle est calculée comme suit : 1,5 × `leaseAssignmentIntervalMillis`. Si ce paramètre n’est pas explicitement configuré, l’intervalle de temps est défini par défaut sur 1,5 × `failoverTimeMillis`. Le traitement des nouvelles partitions consiste à analyser la table de baux et à interroger un index secondaire global (GSI) de la table de baux. La réduction de `leaseAssignmentIntervalMillis` augmente la fréquence de ces opérations d’analyse et d’interrogation, ce qui entraîne une augmentation des coûts de DynamoDB. Nous vous recommandons de définir cette valeur sur 2 000 (soit 2 secondes) afin de réduire le délai de traitement des nouvelles partitions.

`shardConsumerDispatchPollIntervalMillis`  
Cette configuration définit l’intervalle entre les interrogations successives effectuées par le consommateur de partitions pour déclencher des transitions d’état. Dans la KCL version 1.x, ce comportement était contrôlé par le paramètre `idleTimeInMillis`, qui n’était pas exposé en tant que paramètre configurable. Avec la KCL version 3.x, nous vous recommandons de définir cette configuration de sorte qu’elle corresponde à la valeur utilisée pour ` idleTimeInMillis` dans votre configuration de la KCL version 1.x.

### Étape 5 : migrer de la KCL 2.x vers la KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

Pour garantir une transition fluide et une compatibilité avec la version la plus récente de la bibliothèque client Kinesis (KCL), suivez les étapes 5 à 8 des instructions du guide de migration pour la [mise à niveau de la KCL 2.x vers la KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics).

Pour la résolution des problèmes courants liés à la KCL 3.x, consultez [Troubleshooting KCL consumer applications](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html).

# Restauration par régression de la version précédente de la KCL
<a name="kcl-migration-rollback"></a>

Cette rubrique explique comment restaurer la version précédente de la KCL pour votre application consommateur. Le processus de restauration par régression comprend deux étapes :

1. Exécuter l’[outil de migration de la KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)

1. Redéployer le code de la version précédente de la KCL

## Étape 1 : exécuter l’outil de migration de la KCL
<a name="kcl-migration-rollback-step1"></a>

Si vous avez besoin de restaurer la version précédente de la KCL, vous devez exécuter l’outil de migration de la KCL. L’outil effectue deux tâches importantes :
+ Il supprime une table de métadonnées appelée table des métriques de worker, ainsi qu’un index secondaire global de la table des baux dans DynamoDB. Ces artefacts sont créés par la KCL 3.x, mais ils ne sont pas nécessaires lorsque vous restaurez la version précédente.
+ Ainsi, tous les workers peuvent s’exécuter dans un mode compatible avec la KCL 1.x et commencer à utiliser l’algorithme d’équilibrage de charge utilisé dans les versions précédentes de la KCL. Si vous rencontrez des problèmes avec le nouvel algorithme d’équilibrage de charge dans la KCL 3.x, cela permettra de les résoudre immédiatement.

**Important**  
La table des états de coordinateur de DynamoDB doit exister et ne doit pas être supprimée pendant les processus de migration, de restauration par régression et de restauration par progression.

**Note**  
Il est important que tous les workers de votre application consommateur utilisent le même algorithme d’équilibrage de charge à un moment donné. L’outil de migration de la KCL s’assure que tous les workers de votre application consommateur KCL 3.x basculent vers le mode compatible avec la KCL 1.x, afin qu’ils exécutent le même algorithme d’équilibrage de charge lors de la restauration par régression de l’application vers la version précédente de la KCL.

Vous pouvez télécharger l'[outil de migration KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) dans le répertoire des scripts du référentiel [KCL GitHub](https://github.com/awslabs/amazon-kinesis-client/tree/master). Exécutez le script à partir d’un worker ou d’un hôte disposant des autorisations appropriées pour écrire dans la table des états de coordinateur, la table des métriques de worker et la table des baux. Assurez-vous que les [autorisations IAM](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html) appropriées sont configurées pour les applications consommateur KCL. Exécutez le script une seule fois par application KCL à l’aide de la commande spécifiée :

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### Parameters
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
*region*Remplacez-le par votre Région AWS.

`--application_name`  
Ce paramètre est obligatoire si vous utilisez des noms par défaut pour vos tables de métadonnées DynamoDB (table des baux, table des états de coordinateur et table des métriques de worker). Si vous avez spécifié des noms personnalisés pour ces tables, vous pouvez omettre ce paramètre. *applicationName*Remplacez-le par le nom réel de votre application KCL. L’outil l’utilise pour créer les noms de table par défaut si aucun nom personnalisé n’est fourni.

`--lease_table_name`  
Ce paramètre est nécessaire si vous avez défini un nom personnalisé pour la table des baux dans votre configuration KCL. Si vous utilisez le nom de table par défaut, vous pouvez omettre ce paramètre. *leaseTableName*Remplacez-le par le nom de table personnalisé que vous avez spécifié pour votre table de location.

`--coordinator_state_table_name`  
Ce paramètre est nécessaire si vous avez défini un nom personnalisé pour la table des états de coordinateur dans votre configuration KCL. Si vous utilisez le nom de table par défaut, vous pouvez omettre ce paramètre. *coordinatorStateTableName*Remplacez-le par le nom de table personnalisé que vous avez spécifié pour votre table d'état des coordinateurs.

`--worker_metrics_table_name`  
Ce paramètre est nécessaire si vous avez défini un nom personnalisé pour la table des métriques de worker dans votre configuration KCL. Si vous utilisez le nom de table par défaut, vous pouvez omettre ce paramètre. *workerMetricsTableName*Remplacez-le par le nom de table personnalisé que vous avez spécifié pour votre tableau des métriques des travailleurs.

## Étape 2 : redéployer le code avec la version précédente de la KCL
<a name="kcl-migration-rollback-step2"></a>

**Important**  
Toute mention de la version 2.x dans la sortie générée par l’outil de migration de la KCL doit être interprétée comme faisant référence à la KCL version 1.x. L’exécution du script n’effectue pas de restauration par régression complète : elle fait uniquement basculer l’algorithme d’équilibrage de charge vers celui utilisé dans la KCL version 1.x.

Après avoir exécuté l’outil de migration de la KCL pour effectuer une restauration par régression, l’un des messages suivants s’affiche :

Message 1  
« Restauration par régression terminée. Votre application exécutait une fonctionnalité compatible avec la version 2x. Veuillez restaurer les fichiers binaires précédents de votre application en déployant le code avec votre version précédente de la KCL. »  
**Action requise :** vos workers s’exécutaient dans le mode compatible avec la KCL 1.x. Redéployez le code avec la version précédente de la KCL sur vos workers.

Message 2  
« Restauration par régression terminée. Votre application KCL exécutait une fonctionnalité compatible avec la version 3x. Une fonctionnalité compatible avec la version 2x a été restaurée par régression. Si vous ne constatez aucune amélioration après un court laps de temps, restaurez les fichiers binaires précédents de votre application en déployant le code avec votre version précédente de la KCL. »  
**Action requise :** vos workers s’exécutaient dans le mode compatible avec la KCL 3.x et l’outil de migration de la KCL a fait basculer tous les workers vers le mode compatible avec la KCL 1.x. Redéployez le code avec la version précédente de la KCL sur vos workers.

Message 3  
« L’application a déjà fait l’objet d’une restauration par régression. Toutes KCLv3 les ressources susceptibles d'être supprimées ont été nettoyées afin d'éviter des frais jusqu'à ce que l'application puisse être reportée avec la migration. »  
**Action requise :** vos workers ont déjà fait l’objet d’une restauration par régression pour s’exécuter dans le mode compatible avec la KCL 1.x. Redéployez le code avec la version précédente de la KCL sur vos workers.

# Restauration par progression de la KCL 3.x après une restauration par régression
<a name="kcl-migration-rollforward"></a>

Cette rubrique explique comment restaurer par progression votre application consommateur vers la KCL 3.x après une restauration par régression. Lorsque vous devez effectuer une restauration par progression, vous devez suivre un processus en deux étapes :

1. Exécuter l’[outil de migration de la KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)

1. Déployer le code avec la KCL 3.x

## Étape 1 : exécuter l’outil de migration de la KCL
<a name="kcl-migration-rollforward-step1"></a>

Exécutez l’outil de migration de la KCL avec la commande suivante pour effectuer une restauration par progression vers la KCL 3.x :

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### Parameters
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
*region*Remplacez-le par votre Région AWS.

`--application_name`  
Ce paramètre est obligatoire si vous utilisez des noms par défaut pour votre table des états de coordinateur. Si vous avez spécifié des noms personnalisés pour la table des états de coordinateur, vous pouvez omettre ce paramètre. *applicationName*Remplacez-le par le nom réel de votre application KCL. L’outil l’utilise pour créer les noms de table par défaut si aucun nom personnalisé n’est fourni.

`--coordinator_state_table_name`  
Ce paramètre est nécessaire si vous avez défini un nom personnalisé pour la table des états de coordinateur dans votre configuration KCL. Si vous utilisez le nom de table par défaut, vous pouvez omettre ce paramètre. *coordinatorStateTableName*Remplacez-le par le nom de table personnalisé que vous avez spécifié pour votre table d'état des coordinateurs.

Une fois que vous avez exécuté l’outil de migration en mode restauration par progression, la KCL crée les ressources DynamoDB suivantes requises pour la KCL 3.x :
+ Un index secondaire global sur la table des baux
+ Une table des métriques de worker

## Étape 2 : déployer le code avec la KCL 3.x
<a name="kcl-migration-rollforward-step2"></a>

Après avoir exécuté l’outil de migration de la KCL pour une restauration par progression, déployez votre code avec la KCL 3.x sur vos workers. Pour terminer votre migration, consultez [Step 8: Complete the migration](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish).

# Démonstration : adaptateur Kinesis DynamoDB Streams
<a name="Streams.KCLAdapter.Walkthrough"></a>

Cette section est une démonstration d’une application Java qui utilise la bibliothèque client Amazon Kinesis et l’adaptateur Amazon DynamoDB Streams. L’application illustre un exemple de la réplication de données, où l’activité d’écriture d’une table est appliquée à une seconde table, avec le contenu des deux tables demeurant synchronisé. Pour le code source, consultez [Programme complet : adaptateur DynamoDB Streams Kinesis](Streams.KCLAdapter.Walkthrough.CompleteProgram.md).

Le programme exécute les tâches suivantes :

1. Crée deux tables DynamoDB nommées `KCL-Demo-src` et `KCL-Demo-dst`. Chacune de ces tables dispose d’un flux activé sur elle-même.

1. Génère une activité de mise à jour de la table source en ajoutant, mettant à jour et supprimant des éléments. Cela entraîne l’écriture des données sur le flux de la table.

1. Lit les enregistrements du flux, les reconstruit en tant que demandes DynamoDB, et applique les demandes à la table de destination.

1. Analyse les tables source et de destination afin de s’assurer que leurs contenus sont identiques.

1. Nettoie en supprimant les tables.

Ces étapes sont décrites dans les sections suivantes et l’application complète est illustrée à la fin de la procédure pas à pas.

**Topics**
+ [Étape 1 : créer des tables DynamoDB](#Streams.KCLAdapter.Walkthrough.Step1)
+ [Étape 2 : générer une activité de mise à jour de la table source](#Streams.KCLAdapter.Walkthrough.Step2)
+ [Étape 3 : traiter le flux](#Streams.KCLAdapter.Walkthrough.Step3)
+ [Étape 4 : s’assurer que les deux tables ont un contenu identique](#Streams.KCLAdapter.Walkthrough.Step4)
+ [Étape 5 : nettoyer](#Streams.KCLAdapter.Walkthrough.Step5)
+ [Programme complet : adaptateur DynamoDB Streams Kinesis](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## Étape 1 : créer des tables DynamoDB
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

La première étape consiste à créer deux tables DynamoDB : une table source et une table de destination. Le `StreamViewType` sur le flux de la table source est `NEW_IMAGE`. Cela signifie que chaque fois qu’un élément est modifié dans la table, l’image « après » de l’élément est écrite dans le flux. Ainsi, le flux assure le suivi des toute l’activité d’écriture sur la table.

L’extrait de code suivant illustre le code utilisé pour la création des deux tables.

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## Étape 2 : générer une activité de mise à jour de la table source
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

L’étape suivante consiste à créer une activité d’écriture sur la table source. Tandis que cette activité a lieu, le flux de la table source est aussi mis à jour en quasi-temps réel.

L’application définit une classe d’assistance avec les méthodes qui appellent les actions d’API `PutItem`, `UpdateItem` et `DeleteItem` pour écrire les données. L’extrait de code suivant montre comment ces méthodes sont utilisées.

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## Étape 3 : traiter le flux
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

Maintenant le programme commence à traiter le flux. L’adaptateur DynamoDB Streams Kinesis agit comme une couche transparente entre la KCL et le point de terminaison DynamoDB Streams, afin que le code puisse pleinement exploiter la KCL au lieu de devoir effectuer des appels DynamoDB Streams de bas niveau. Le programme effectue les tâches suivantes :
+ Il définit une classe de processeur d’enregistrements, `StreamsRecordProcessor`, avec des méthodes conformes à la définition de l’interface KCL : `initialize`, `processRecords` et `shutdown`. La méthode `processRecords` contient la logique nécessaire pour lire à partir du flux de la table source et écrire dans la table de destination.
+ Il définit une fabrique de classe pour la classe de processeur d’enregistrements (`StreamsRecordProcessorFactory`). Cette action est obligatoire pour les programmes Java qui utilisent le KCL.
+ Il instancie un nouveau KCL `Worker`, associé à la fabrique de classe.
+ Il arrête le `Worker` lorsque le traitement des enregistrements est terminé.

Activez éventuellement le mode rattrapage dans la configuration de votre adaptateur Streams KCL pour augmenter automatiquement le taux d'appels d' GetRecords API de 3 fois (par défaut) lorsque le délai de traitement des flux dépasse une minute (par défaut), afin d'aider votre consommateur de flux à gérer les pics de débit élevés dans votre table.

Pour en savoir plus sur la définition de l’interface de la KCL, consultez [Développement d’applications consommateur à l’aide de la bibliothèque client Kinesis](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) dans le *Guide du développeur Amazon Kinesis Data Streams*. 

L’extrait de code suivant illustre la boucle principale dans `StreamsRecordProcessor`. L’instruction `case` détermine l’action à exécuter, en fonction de l’`OperationType` qui s’affiche dans l’enregistrement de flux.

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## Étape 4 : s’assurer que les deux tables ont un contenu identique
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

À ce stade, le contenu des tables source et destination est synchronisé. L’application émet des demandes `Scan` sur les deux tables afin de vérifier que leurs contenus sont, en fait, identiques.

La classe `DemoHelper` contient une méthode `ScanTable` qui appelle l’API `Scan` de bas niveau. L’exemple suivant illustre la marche à suivre.

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## Étape 5 : nettoyer
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

Comme la démonstration est terminée, l’application supprime les tables source et destination. Consultez l’exemple de code suivant. Même après que les tables sont supprimées, leurs flux demeurent accessibles 24 heures, délai au-delà duquel ils sont automatiquement supprimés.

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# Programme complet : adaptateur DynamoDB Streams Kinesis
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

Voici le programme Java complet qui effectue les tâches décrites dans [Démonstration : adaptateur Kinesis DynamoDB Streams](Streams.KCLAdapter.Walkthrough.md). Lorsque vous l’exécutez, vous devez visualiser une sortie similaire à ce qui suit.

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**Important**  
 Pour exécuter ce programme, assurez-vous que l'application cliente a accès à DynamoDB et à CloudWatch Amazon à l'aide de politiques. Pour de plus amples informations, veuillez consulter [Politiques basées sur l’identité pour DynamoDB](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies). 

Le code source se compose de quatre `.java` fichiers. Pour créer ce programme, ajoutez la dépendance suivante, qui inclut la bibliothèque client Amazon Kinesis (KCL) 3.x et le SDK pour AWS Java v2 en tant que dépendances transitives :

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

Les fichiers sources sont les suivants :
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```

# API de bas niveau DynamoDB Streams : exemple Java
<a name="Streams.LowLevel.Walkthrough"></a>

**Note**  
Le code sur cette page n’est pas exhaustif et ne gère pas tous les scénarios de consommation d’Amazon DynamoDB Streams. La manière recommandée d’utiliser les enregistrements de flux de DynamoDB est de le faire via l’adaptateur Amazon Kinesis en utilisant la bibliothèque client Kinesis (KCL), comme décrit dans [Utilisation de l’adaptateur DynamoDB Streams Kinesis pour traiter des enregistrements de flux](Streams.KCLAdapter.md).

Cette section contient un programme Java qui affiche DynamoDB Streams en action. Le programme exécute les tâches suivantes :

1. Crée une table DynamoDB avec un flux activé.

1. Décrit les paramètres de flux de cette table.

1. Modifie les données de la table.

1. Décrit les partitions du flux.

1. Lit les enregistrements de flux des partitions.

1. Récupère les partitions enfants et continue de lire les enregistrements.

1. Nettoie.

Lorsque vous exécutez le programme, vous obtenez une sortie similaire à ce qui suit :

```
Testing Streams Demo
Creating an Amazon DynamoDB table TestTableForStreams with a simple primary key: Id
Waiting for TestTableForStreams to be created...
Current stream ARN for TestTableForStreams: arn:aws:dynamodb:us-east-2:123456789012:table/TestTableForStreams/stream/2018-03-20T16:49:55.208
Stream enabled: true
Update view type: NEW_AND_OLD_IMAGES

Performing write activities on TestTableForStreams
Processing item 1 of 100
Processing item 2 of 100
Processing item 3 of 100
...
Processing item 100 of 100
Shard: {ShardId: shardId-1234567890-...,SequenceNumberRange: {StartingSequenceNumber: 100002572486797508907,},}
    Shard iterator: EjYFEkX2a26eVTWe...
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2000001584047545833909, SizeBytes=22, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2100003604869767892701, SizeBytes=55, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, SequenceNumber=2200001099771112898434, SizeBytes=36, StreamViewType=NEW_AND_OLD_IMAGES)
...
Deleting the table...
Table StreamsDemoTable deleted.
Demo complete
```

**Example Exemple**  

```
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter;

public class StreamsLowLevelDemo {


    public static void main(String[] args) {
        final String usage = "Testing Streams Demo";
        try {
            System.out.println(usage);

            String tableName = "StreamsDemoTable";
            String key = "Id";
            System.out.println("Creating an Amazon DynamoDB table " + tableName + " with a simple primary key: " + key);
            Region region = Region.US_WEST_2;
            DynamoDbClient ddb = DynamoDbClient.builder()
                    .region(region)
                    .build();

            DynamoDbStreamsClient ddbStreams = DynamoDbStreamsClient.builder()
                    .region(region)
                    .build();
            DescribeTableRequest describeTableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
            TableDescription tableDescription = null;
            try{
                tableDescription = ddb.describeTable(describeTableRequest).table();
            }catch (Exception e){
                System.out.println("Table " + tableName + " does not exist.");
                tableDescription = createTable(ddb, tableName, key);
            }

            // Print the stream settings for the table
            String streamArn = tableDescription.latestStreamArn();
           
            StreamSpecification streamSpec = tableDescription.streamSpecification();
            System.out.println("Current stream ARN for " + tableDescription.tableName() + ": " +
                   streamArn);
            System.out.println("Stream enabled: " + streamSpec.streamEnabled());
            System.out.println("Update view type: " + streamSpec.streamViewType());
            System.out.println();
            // Generate write activity in the table
            System.out.println("Performing write activities on " + tableName);
            int maxItemCount = 100;
            for (Integer i = 1; i <= maxItemCount; i++) {
                System.out.println("Processing item " + i + " of " + maxItemCount);
                // Write a new item
                putItemInTable(key, i, tableName, ddb);
                // Update the item
                updateItemInTable(key, i, tableName, ddb);
                // Delete the item
                deleteDynamoDBItem(key, i, tableName, ddb);
            }

            // Process Stream
            processStream(streamArn, maxItemCount, ddb, ddbStreams, tableName);

            // Delete the table
            System.out.println("Deleting the table...");
            DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                    .tableName(tableName)
                    .build();
            ddb.deleteTable(deleteTableRequest);
            System.out.println("Table " + tableName + " deleted.");
            System.out.println("Demo complete");
            ddb.close();
        } catch (Exception e) {
            System.out.println("Error: " + e.getMessage());
        }
    }

    private static void processStream(String streamArn, int maxItemCount, DynamoDbClient ddb, DynamoDbStreamsClient ddbStreams, String tableName) {
        // Get all the shard IDs from the stream. Note that DescribeStream returns
        // the shard IDs one page at a time.
        String lastEvaluatedShardId = null;
        do {
            DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                    .streamArn(streamArn)
                    .exclusiveStartShardId(lastEvaluatedShardId).build();
            DescribeStreamResponse describeStreamResponse = ddbStreams.describeStream(describeStreamRequest);

            List<Shard> shards = describeStreamResponse.streamDescription().shards();

            // Process each shard on this page

            fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, shards);

            // If LastEvaluatedShardId is set, then there is
            // at least one more page of shard IDs to retrieve
            lastEvaluatedShardId = describeStreamResponse.streamDescription().lastEvaluatedShardId();

        } while (lastEvaluatedShardId != null);

    }

    private static void fetchShardsAndReadRecords(String streamArn, int maxItemCount, DynamoDbStreamsClient ddbStreams, List<Shard> shards) {
        for (Shard shard : shards) {
            String shardId = shard.shardId();
            System.out.println("Shard: " + shard);

            // Get an iterator for the current shard
            GetShardIteratorRequest shardIteratorRequest = GetShardIteratorRequest.builder()
                    .streamArn(streamArn).shardId(shardId)
                    .shardIteratorType(ShardIteratorType.TRIM_HORIZON).build();

            GetShardIteratorResponse getShardIteratorResult = ddbStreams.getShardIterator(shardIteratorRequest);

            String currentShardIter = getShardIteratorResult.shardIterator();

            // Shard iterator is not null until the Shard is sealed (marked as READ_ONLY).
            // To prevent running the loop until the Shard is sealed, we process only the
            // items that were written into DynamoDB and then exit.
            int processedRecordCount = 0;
            while (currentShardIter != null && processedRecordCount < maxItemCount) {
                // Use the shard iterator to read the stream records
                GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder()
                        .shardIterator(currentShardIter).build();
                GetRecordsResponse getRecordsResult = ddbStreams.getRecords(getRecordsRequest);
                List<Record> records = getRecordsResult.records();
                for (Record record : records) {
                    System.out.println("        " + record.dynamodb());
                }
                processedRecordCount += records.size();
                currentShardIter = getRecordsResult.nextShardIterator();
            }
            if (currentShardIter == null){
                System.out.println("Shard has been fully processed. Shard iterator is null.");
                System.out.println("Fetch the child shard to continue processing instead of bulk fetching all shards");
                DescribeStreamRequest describeStreamRequestForChildShards = DescribeStreamRequest.builder()
                        .streamArn(streamArn)
                        .shardFilter(ShardFilter.builder()
                                .type(ShardFilterType.CHILD_SHARDS)
                                .shardId(shardId).build())
                        .build();
                DescribeStreamResponse describeStreamResponseChildShards = ddbStreams.describeStream(describeStreamRequestForChildShards);
                fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, describeStreamResponseChildShards.streamDescription().shards());
            }
        }
    }

    private static void putItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());
        item.put("Message", AttributeValue.builder()
                .s("New Item!")
                .build());
        PutItemRequest request = PutItemRequest.builder()
                .tableName(tableName)
                .item(item)
                .build();
        ddb.putItem(request);
    }

    private static void updateItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {

        HashMap<String, AttributeValue> itemKey = new HashMap<>();
        itemKey.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());


        HashMap<String, AttributeValueUpdate> updatedValues = new HashMap<>();
        updatedValues.put("Message", AttributeValueUpdate.builder()
                .value(AttributeValue.builder().s("This is an updated item").build())
                .action(AttributeAction.PUT)
                .build());

        UpdateItemRequest request = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(itemKey)
                .attributeUpdates(updatedValues)
                .build();
        ddb.updateItem(request);
    }

    public static void deleteDynamoDBItem(String key, Integer i, String tableName, DynamoDbClient ddb) {
        HashMap<String, AttributeValue> keyToGet = new HashMap<>();
        keyToGet.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());

        DeleteItemRequest deleteReq = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(keyToGet)
                .build();
        ddb.deleteItem(deleteReq);
    }

    public static TableDescription createTable(DynamoDbClient ddb, String tableName, String key) {
        DynamoDbWaiter dbWaiter = ddb.waiter();
        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType("NEW_AND_OLD_IMAGES")
                .build();
        CreateTableRequest request = CreateTableRequest.builder()
                .attributeDefinitions(AttributeDefinition.builder()
                        .attributeName(key)
                        .attributeType(ScalarAttributeType.S)
                        .build())
                .keySchema(KeySchemaElement.builder()
                        .attributeName(key)
                        .keyType(KeyType.HASH)
                        .build())
                .billingMode(BillingMode.PAY_PER_REQUEST) //  DynamoDB automatically scales based on traffic.
                .tableName(tableName)
                .streamSpecification(streamSpecification)
                .build();

        TableDescription newTable;
        try {
            CreateTableResponse response = ddb.createTable(request);
            DescribeTableRequest tableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
                    
            System.out.println("Waiting for " + tableName + " to be created...");

            // Wait until the Amazon DynamoDB table is created.
            WaiterResponse<DescribeTableResponse> waiterResponse = dbWaiter.waitUntilTableExists(tableRequest);
            waiterResponse.matched().response().ifPresent(System.out::println);
            newTable = response.tableDescription();
            return newTable;

        } catch (DynamoDbException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
        return null;
    }



}
```

# Streams et déclencheurs DynamoDB AWS Lambda
<a name="Streams.Lambda"></a>

Amazon DynamoDB est intégré afin que vous puissiez *créer AWS Lambda * des déclencheurs, des éléments de code qui répondent automatiquement aux événements dans DynamoDB Streams. Avec des déclencheurs, vous pouvez créer des applications qui réagissent à des modifications de données dans des tables DynamoDB.

**Topics**
+ [Tutoriel \$11 : Utilisation de filtres pour traiter tous les événements avec Amazon AWS Lambda DynamoDB et utilisation du AWS CLI](Streams.Lambda.Tutorial.md)
+ [Tutoriel n° 2 : utilisation de filtres pour traiter certains événements avec DynamoDB et Lambda](Streams.Lambda.Tutorial2.md)
+ [Bonnes pratiques relatives à l’utilisation de DynamoDB Streams avec Lambda](Streams.Lambda.BestPracticesWithDynamoDB.md)

Si vous activez DynamoDB Streams sur une table, vous pouvez associer le flux Amazon Resource Name (ARN) à une AWS Lambda fonction que vous écrivez. Toutes les actions de mutation vers cette table DynamoDB peuvent ensuite être capturées en tant qu’élément dans le flux. Par exemple, vous pouvez définir un déclencheur pour que, lorsqu’un élément de table est modifié, un nouvel enregistrement apparaisse immédiatement dans le flux de cette table. 

**Note**  
Si vous abonnez plus de deux fonctions Lambda à un flux DynamoDB, une limitation de lecture peut se produire.

Le service [AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html) interroge le flux à la recherche de nouveaux enregistrements quatre fois par seconde. Lorsque de nouveaux enregistrements de flux sont disponibles, votre fonction Lambda est invoquée de manière synchrone. Vous pouvez abonner jusqu’à deux fonctions Lambda au même flux DynamoDB. Si vous abonnez plus de deux fonctions Lambda au même flux DynamoDB, une limitation de lecture peut se produire.

La fonction Lambda peut envoyer une notification, lancer un flux de travail ou effectuer un grand nombre d’actions spécifiées par vos soins. Vous pouvez écrire une fonction Lambda simplement afin de copier chaque enregistrement de flux vers un stockage permanent tel qu’Amazon S3 File Gateway (Amazon S3) et créer une piste d’audit permanente de l’activité d’écriture dans votre table. Ou supposons que vous ayez une application de jeux pour appareils mobiles qui écrive dans une table `GameScores`. Chaque fois que l’attribut `TopScore` de la table `GameScores` est mis à jour, un enregistrement de flux correspondant est écrit dans le flux de la table. Cet événement peut alors déclencher une fonction Lambda qui publie un message de félicitations sur un réseau social. Cette fonction ignorerait simplement tout enregistrement de flux qui ne serait pas une mise à jour de `GameScores`, ou qui ne modifierait pas l’attribut `TopScore`.

Si votre fonction renvoie une erreur, Lambda réessaie de traiter le lot jusqu’à ce que le traitement réussisse ou que les données expirent. Vous pouvez également configurer Lambda pour effectuer de nouvelles tentatives avec un lot plus petit, limiter le nombre de tentatives, supprimer les enregistrements une fois qu’ils sont trop anciens et d’autres options.

Afin de respecter les bonnes pratiques en matière de performances, la fonction Lambda doit être de courte durée. Pour éviter d’introduire des retards de traitement inutiles, elle ne doit pas non plus exécuter de logique complexe. Pour un flux à haute vitesse en particulier, il est préférable de déclencher des flux de travail asynchrones avec des fonctions de post-traitement par étapes plutôt que des Lambdas synchrones de longue durée.

 Vous pouvez utiliser des déclencheurs Lambda sur différents AWS comptes en configurant une politique basée sur les ressources sur le flux DynamoDB afin d'accorder à la fonction Lambda un accès en lecture entre comptes. Pour en savoir plus sur la façon de configurer votre flux afin d'autoriser l'accès entre comptes, voir [Partager l'accès avec les fonctions AWS Lambda entre comptes dans](rbac-cross-account-access.md#shared-access-cross-acount-lambda) le guide du développeur DynamoDB.

Pour plus d'informations à ce sujet AWS Lambda, consultez le [guide du AWS Lambda développeur](https://docs.aws.amazon.com/lambda/latest/dg/).

# Tutoriel \$11 : Utilisation de filtres pour traiter tous les événements avec Amazon AWS Lambda DynamoDB et utilisation du AWS CLI
<a name="Streams.Lambda.Tutorial"></a>

 

Dans ce didacticiel, vous allez créer un AWS Lambda déclencheur pour traiter un flux provenant d'une table DynamoDB.

**Topics**
+ [Étape 1 : créer une table DynamoDB avec un flux activé](#Streams.Lambda.Tutorial.CreateTable)
+ [Étape 2 : créer un rôle d’exécution Lambda](#Streams.Lambda.Tutorial.CreateRole)
+ [Étape 3 : créer une rubrique Amazon SNS](#Streams.Lambda.Tutorial.SNSTopic)
+ [Étape 4 : créer et tester une fonction Lambda](#Streams.Lambda.Tutorial.LambdaFunction)
+ [Étape 5 : créer et tester un déclencheur](#Streams.Lambda.Tutorial.CreateTrigger)

Le scénario de ce didacticiel est Woofer, un réseau social simple. Les utilisateurs de Woofer communiquent avec des *aboiements* (messages textuels brefs) qui sont envoyés à d’autres utilisateurs de Woofer. Le schéma suivant illustre les composants et le flux de travail de cette application.

![\[Flux de travail d’application Woofer composé d’une table DynamoDB, d’un enregistrement de flux, d’une fonction Lambda et d’une rubrique Amazon SNS.\]](http://docs.aws.amazon.com/fr_fr/amazondynamodb/latest/developerguide/images/StreamsAndTriggers.png)


1. Un utilisateur écrit un élément dans une table DynamoDB (`BarkTable`). Chaque élément de la table représente un aboiement.

1. Un nouvel enregistrement de flux est écrit pour refléter l’ajout de ce nouvel élément à `BarkTable`.

1. Le nouvel enregistrement du flux déclenche une AWS Lambda fonction (`publishNewBark`).

1. Si l’enregistrement de flux indique qu’un nouvel élément a été ajouté à `BarkTable`, la fonction Lambda lit les données de l’enregistrement de flux et publie un message sur une rubrique dans Amazon Simple Notification Service (Amazon SNS).

1. Le message est reçu par les abonnés à la rubrique Amazon SNS. Dans le cadre de ce didacticiel, le seul abonné est une adresse e-mail.

**Avant de commencer**  
Ce didacticiel utilise le AWS Command Line Interface AWS CLI. Si vous ne l’avez déjà fait, suivez les instructions du [Guide de l’utilisateur AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/) pour installer et configurer l’ AWS CLI.

## Étape 1 : créer une table DynamoDB avec un flux activé
<a name="Streams.Lambda.Tutorial.CreateTable"></a>

Au cours de cette étape, vous allez créer une table DynamoDB (`BarkTable`) pour stocker tous les aboiements des utilisateurs de Woofer. La clé primaire se compose de `Username` (clé de partition) et de `Timestamp` (clé de tri). Les deux attributs sont de type string (chaîne).

`BarkTable` a un flux activé. Plus loin dans ce didacticiel, vous allez créer un déclencheur en associant une AWS Lambda fonction au flux.

1. Créez le flux à l’aide de la commande suivante.

   ```
   aws dynamodb create-table \
       --table-name BarkTable \
       --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \
       --key-schema AttributeName=Username,KeyType=HASH  AttributeName=Timestamp,KeyType=RANGE \
       --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
       --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
   ```

1. Dans la sortie, recherchez le `LatestStreamArn`.

   ```
   ...
   "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

   Notez `region` et `accountID`, car vous en aurez besoin pour les autres étapes de ce didacticiel.

## Étape 2 : créer un rôle d’exécution Lambda
<a name="Streams.Lambda.Tutorial.CreateRole"></a>

Au cours de cette étape, vous créez un rôle Gestion des identités et des accès AWS (IAM) (`WooferLambdaRole`) et vous lui attribuez des autorisations. Ce rôle sera utilisé par la fonction Lambda que vous allez créer dans [Étape 4 : créer et tester une fonction Lambda](#Streams.Lambda.Tutorial.LambdaFunction). 

Vous allez également créer une politique pour le rôle. Cette politique contient toutes les autorisations dont la fonction Lambda a besoin lors de l’exécution.

1. Créez un fichier nommé `trust-relationship.json` avec les contenus suivants.

------
#### [ JSON ]

****  

   ```
   {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "lambda.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
   ```

------

1. Entrez la commande suivante pour créer `WooferLambdaRole`.

   ```
   aws iam create-role --role-name WooferLambdaRole \
       --path "/service-role/" \
       --assume-role-policy-document file://trust-relationship.json
   ```

1. Créez un fichier nommé `role-policy.json` avec les contenus suivants. (Remplacez `region` et `accountID` par votre AWS région et votre numéro de compte.)

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogGroup",
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": "arn:aws:logs:us-east-1:111122223333:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "dynamodb:DescribeStream",
                   "dynamodb:GetRecords",
                   "dynamodb:GetShardIterator",
                   "dynamodb:ListStreams"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:111122223333:table/BarkTable/stream/*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "sns:Publish"
               ],
               "Resource": [
                   "*"
               ]
           }
       ]
   }
   ```

------

   La politique comporte quatre instructions pour autoriser `WooferLambdaRole` à effectuer les opérations suivantes :
   + Exécutez une fonction Lambda (`publishNewBark`). Vous allez créer cette fonction plus tard dans le cadre de ce didacticiel.
   + Accédez à Amazon CloudWatch Logs. La fonction Lambda écrit les diagnostics dans les CloudWatch journaux au moment de l'exécution.
   + Lisez les données du flux DynamoDB pour `BarkTable`.
   + Publiez des messages sur Amazon SNS.

1. Exécutez la commande suivante pour associer la politique à `WooferLambdaRole`.

   ```
   aws iam put-role-policy --role-name WooferLambdaRole \
       --policy-name WooferLambdaRolePolicy \
       --policy-document file://role-policy.json
   ```

## Étape 3 : créer une rubrique Amazon SNS
<a name="Streams.Lambda.Tutorial.SNSTopic"></a>

Au cours de cette étape, vous allez créer une rubrique Amazon SNS (`wooferTopic`) et y abonner une adresse e-mail. Votre fonction Lambda utilise cette rubrique pour publier de nouveaux aboiements des utilisateurs de Woofer.

1. Entrez la commande suivante pour créer une rubrique Amazon SNS.

   ```
   aws sns create-topic --name wooferTopic
   ```

1. Tapez la commande suivante pour abonner une adresse e-mail à `wooferTopic`. (Remplacez `region` et `accountID` par votre région  AWS et votre ID de compte, puis remplacez `example@example.com` par une adresse e-mail valide.)

   ```
   aws sns subscribe \
       --topic-arn arn:aws:sns:region:accountID:wooferTopic \
       --protocol email \
       --notification-endpoint example@example.com
   ```

1. Amazon SNS envoie un message de confirmation à votre adresse e-mail. Cliquez sur le lien **Confirm subscription (Confirmer l’abonnement)** de ce message pour finaliser le processus d’abonnement.

## Étape 4 : créer et tester une fonction Lambda
<a name="Streams.Lambda.Tutorial.LambdaFunction"></a>

Au cours de cette étape, vous créez une AWS Lambda fonction (`publishNewBark`) pour traiter les enregistrements de flux à partir de`BarkTable`.

La fonction `publishNewBark` traite uniquement les événements de flux qui correspondent aux nouveaux éléments de `BarkTable`. La fonction lit les données d’un tel événement, puis appelle Amazon SNS pour les publier.

1. Créez un fichier nommé `publishNewBark.js` avec les contenus suivants. Remplacez `region` et `accountID` par votre AWS région et votre numéro de compte.

   ```
   'use strict';
   var AWS = require("aws-sdk");
   var sns = new AWS.SNS();
   
   exports.handler = (event, context, callback) => {
   
       event.Records.forEach((record) => {
           console.log('Stream record: ', JSON.stringify(record, null, 2));
   
           if (record.eventName == 'INSERT') {
               var who = JSON.stringify(record.dynamodb.NewImage.Username.S);
               var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S);
               var what = JSON.stringify(record.dynamodb.NewImage.Message.S);
               var params = {
                   Subject: 'A new bark from ' + who,
                   Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what,
                   TopicArn: 'arn:aws:sns:region:accountID:wooferTopic'
               };
               sns.publish(params, function(err, data) {
                   if (err) {
                       console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2));
                   } else {
                       console.log("Results from sending message: ", JSON.stringify(data, null, 2));
                   }
               });
           }
       });
       callback(null, `Successfully processed ${event.Records.length} records.`);
   };
   ```

1. Créez un fichier zip pour `publishNewBark.js`. Si vous disposez de l’utilitaire de ligne de commande pour zipper, vous pouvez taper la commande suivante.

   ```
   zip publishNewBark.zip publishNewBark.js
   ```

1. Lorsque vous créez la fonction Lambda, vous spécifiez l’Amazon Resource Name (ARN) pour `WooferLambdaRole`, que vous avez créé dans [Étape 2 : créer un rôle d’exécution Lambda](#Streams.Lambda.Tutorial.CreateRole). Tapez la commande suivante pour extraire cet ARN.

   ```
   aws iam get-role --role-name WooferLambdaRole
   ```

   Dans la sortie, recherchez l’ARN pour `WooferLambdaRole`.

   ```
   ...
   "Arn": "arn:aws:iam::region:role/service-role/WooferLambdaRole"
   ...
   ```

   Utilisez la commande suivante pour créer la fonction Lambda. Remplacez *roleARN* par l'ARN pour`WooferLambdaRole`.

   ```
   aws lambda create-function \
       --region region \
       --function-name publishNewBark \
       --zip-file fileb://publishNewBark.zip \
       --role roleARN \
       --handler publishNewBark.handler \
       --timeout 5 \
       --runtime nodejs16.x
   ```

1. Maintenant, testez `publishNewBark` pour vérifier le bon fonctionnement. Pour cela, vous entrez des informations similaires à un enregistrement réel de DynamoDB Streams.

   Créez un fichier nommé `payload.json` avec les contenus suivants. Remplacez `region` et `accountID` par votre Région AWS et votre ID de compte.

   ```
   {
       "Records": [
           {
               "eventID": "7de3041dd709b024af6f29e4fa13d34c",
               "eventName": "INSERT",
               "eventVersion": "1.1",
               "eventSource": "aws:dynamodb",
               "awsRegion": "region",
               "dynamodb": {
                   "ApproximateCreationDateTime": 1479499740,
                   "Keys": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "NewImage": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Message": {
                           "S": "This is a bark from the Woofer social network"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "SequenceNumber": "13021600000000001596893679",
                   "SizeBytes": 112,
                   "StreamViewType": "NEW_IMAGE"
               },
               "eventSourceARN": "arn:aws:dynamodb:region:account ID:table/BarkTable/stream/2016-11-16T20:42:48.104"
           }
       ]
   }
   ```

   Testez la fonction `publishNewBark` à l’aide de la commande suivante.

   ```
   aws lambda invoke --function-name publishNewBark --payload file://payload.json --cli-binary-format raw-in-base64-out output.txt
   ```

   Si le test est réussi, vous verrez la sortie suivante.

   ```
   {
       "StatusCode": 200,
       "ExecutedVersion": "$LATEST"
   }
   ```

   De plus, le fichier `output.txt` contiendra le texte suivant.

   ```
   "Successfully processed 1 records."
   ```

   Vous recevrez également un nouveau message électronique quelques minutes après.
**Note**  
AWS Lambda écrit des informations de diagnostic dans Amazon CloudWatch Logs. Si vous rencontrez des erreurs avec la fonction Lambda, vous pouvez utiliser ces diagnostics à des fins de dépannage :  
Ouvrez la CloudWatch console à l'adresse [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).
Dans le panneau de navigation, sélectionnez **Logs** (Journaux).
Choisissez le groupe de journal suivant : `/aws/lambda/publishNewBark`
Choisissez le dernier flux de journal pour consulter la sortie (et les erreurs) de la fonction.

## Étape 5 : créer et tester un déclencheur
<a name="Streams.Lambda.Tutorial.CreateTrigger"></a>

Dans [Étape 4 : créer et tester une fonction Lambda](#Streams.Lambda.Tutorial.LambdaFunction), vous avez testé la fonction Lambda pour vérifier qu’elle s’exécutait correctement. Au cours de cette étape, vous allez créer un *déclencheur* en associant la fonction Lambda (`publishNewBark`) à une source d’événement (le flux `BarkTable`).

1. Lorsque vous créez le déclencheur, vous devez spécifier l’ARN du flux `BarkTable`. Tapez la commande suivante pour extraire cet ARN.

   ```
   aws dynamodb describe-table --table-name BarkTable
   ```

   Dans la sortie, recherchez le `LatestStreamArn`.

   ```
   ...
    "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

1. Tapez la commande suivante pour créer le déclencheur. Remplacez `streamARN` par l’ARN du flux réel.

   ```
   aws lambda create-event-source-mapping \
       --region region \
       --function-name publishNewBark \
       --event-source streamARN  \
       --batch-size 1 \
       --starting-position TRIM_HORIZON
   ```

1. Testez le déclencheur. Tapez la commande suivante pour ajouter un élément à `BarkTable`.

   ```
   aws dynamodb put-item \
       --table-name BarkTable \
       --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}
   ```

   Vous devriez recevoir un nouveau message électronique quelques minutes après.

1. Ouvrez la console DynamoDB et ajoutez quelques éléments à `BarkTable`. Vous devez spécifier les valeurs des attributs `Username` et `Timestamp`. Vous devez également spécifier une valeur pour `Message`, bien que cela ne soit pas obligatoire. Vous devriez recevoir un nouveau message électronique pour chaque élément ajouté à `BarkTable`.

   La fonction Lambda traite uniquement les nouveaux éléments que vous ajoutez à `BarkTable`. Si vous mettez à jour ou supprimez un élément dans la table, la fonction ne fait rien.

**Note**  
AWS Lambda écrit des informations de diagnostic dans Amazon CloudWatch Logs. Si vous rencontrez des erreurs avec la fonction Lambda, vous pouvez utiliser ces diagnostics à des fins de dépannage.  
Ouvrez la CloudWatch console à l'adresse [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).
Dans le panneau de navigation, sélectionnez **Logs** (Journaux).
Choisissez le groupe de journal suivant : `/aws/lambda/publishNewBark`
Choisissez le dernier flux de journal pour consulter la sortie (et les erreurs) de la fonction.

# Tutoriel n° 2 : utilisation de filtres pour traiter certains événements avec DynamoDB et Lambda
<a name="Streams.Lambda.Tutorial2"></a>

Dans ce didacticiel, vous allez créer un AWS Lambda déclencheur pour traiter uniquement certains événements d'un flux à partir d'une table DynamoDB.

**Topics**
+ [Tout mettre en place - CloudFormation](#Streams.Lambda.Tutorial2.Cloudformation)
+ [Synthèse – CDK](#Streams.Lambda.Tutorial2.CDK)

Avec le [filtrage des événements Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html), vous pouvez utiliser des expressions de filtre pour contrôler les événements envoyés par Lambda à votre fonction pour traitement. Vous pouvez configurer jusqu’à 5 filtres différents par flux DynamoDB. Si vous utilisez des fenêtres de traitement par lots, Lambda applique les critères de filtre à chaque nouvel événement pour déterminer s’il doit être ajouté au lot actuel.

Les filtres sont appliqués via des structures nommées `FilterCriteria`. Les 3 principaux attributs de `FilterCriteria` sont `metadata properties`, `data properties` et `filter patterns`. 

Voici un exemple de structure d’un événement DynamoDB Streams :

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

Les `metadata properties` correspondent aux champs de l’objet d’événement. Dans le cas de DynamoDB Streams, les `metadata properties` correspondent à des champs tels que `dynamodb` ou `eventName`. 

Les `data properties` correspondent aux champs du corps de l’événement. Pour filtrer sur `data properties`, veillez à les contenir dans `FilterCriteria` à l’intérieur de la clé appropriée. Pour les sources d’événements DynamoDB, la clé de données est `NewImage` ou `OldImage`.

En dernier lieu, les règles de filtrage définissent l’expression de filtre que vous appliquez à une propriété spécifique. Voici quelques exemples :


| Opérateur de comparaison | Exemple | Syntaxe des règles (partielle) | 
| --- | --- | --- | 
|  Null  |  Type de produit null  |  `{ "product_type": { "S": null } } `  | 
|  Vide  |  Nom du produit vide  |  `{ "product_name": { "S": [ ""] } } `  | 
|  Égal à  |  État égal à Floride  |  `{ "state": { "S": ["FL"] } } `  | 
|  Et  |  État du produit égal à Floride et catégorie de produit Chocolat  |  `{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } } `  | 
|  Or  |  L’État du produit est la Floride ou la Californie  |  `{ "state": { "S": ["FL","CA"] } } `  | 
|  Pas  |  L’État du produit n’est pas la Floride  |  `{"state": {"S": [{"anything-but": ["FL"]}]}}`  | 
|  Existe  |  Le produit fait maison existe  |  `{"homemade": {"S": [{"exists": true}]}}`  | 
|  N’existe pas  |  Le produit fait maison n’existe pas  |  `{"homemade": {"S": [{"exists": false}]}}`  | 
|  Commence par  |  PK commence par COMPANY  |  `{"PK": {"S": [{"prefix": "COMPANY"}]}}`  | 

Vous pouvez spécifier jusqu’à 5 modèles de filtrage des événements pour une fonction Lambda. Notez que chacun de ces 5 événements sera évalué comme un OR logique. Donc, si vous configurez deux filtres nommés `Filter_One` et `Filter_Two`, la fonction Lambda exécutera `Filter_One` OR `Filter_Two`.

**Note**  
La page de [filtrage des événements Lambda](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) propose certaines options permettant de filtrer et de comparer des valeurs numériques. Toutefois, dans le cas des événements de filtre DynamoDB, cela ne s’applique pas, car les nombres dans DynamoDB sont stockés sous forme de chaînes. Par exemple ` "quantity": { "N": "50" }`, nous savons que c’est un numéro grâce à la propriété `"N"`.

## Tout mettre en place - CloudFormation
<a name="Streams.Lambda.Tutorial2.Cloudformation"></a>

Pour illustrer la fonctionnalité de filtrage des événements dans la pratique, voici un exemple de CloudFormation modèle. Ce modèle va générer une table DynamoDB simple avec une clé de partition PK et une clé de tri SK avec Amazon DynamoDB Streams activé. Il créera une fonction Lambda et un rôle d’exécution Lambda simple qui permettront d’écrire des journaux dans Amazon Cloudwatch et de lire les événements depuis le flux Amazon DynamoDB. Il ajoutera également le mappage des sources d’événements entre les DynamoDB Streams et la fonction Lambda, afin que la fonction puisse être exécutée chaque fois qu’un événement se produit dans Amazon DynamoDB Stream.

```
AWSTemplateFormatVersion: "2010-09-09"

Description: Sample application that presents AWS Lambda event source filtering 
with Amazon DynamoDB Streams.

Resources:
  StreamsSampleDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "PK"
          AttributeType: "S"
        - AttributeName: "SK"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "PK"
          KeyType: "HASH"
        - AttributeName: "SK"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: "NEW_AND_OLD_IMAGES"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17",		 	 	 
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17",		 	 	 
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource: !GetAtt StreamsSampleDDBTable.StreamArn

  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST

  ProcessEventLambda:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          import logging

          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)

          def handler(event, context):
            LOGGER.info('Received Event: %s', event)
            for rec in event['Records']:
              LOGGER.info('Record: %s', rec)

Outputs:
  StreamsSampleDDBTable:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.Arn
  StreamARN:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.StreamArn
```

Après avoir déployé ce modèle CloudFormation, vous pouvez insérer l’élément Amazon DynamoDB suivant :

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

Grâce à la fonction lambda simple incluse en ligne dans ce modèle de formation de cloud, vous verrez les événements relatifs à la fonction lambda dans les groupes de CloudWatch logs Amazon de la manière suivante :

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

**Exemples de filtrage**
+ **Uniquement les produits qui correspondent à un État donné**

Cet exemple modifie le CloudFormation modèle pour inclure un filtre correspondant à tous les produits provenant de Floride, avec l'abréviation « FL ».

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Une fois que vous avez redéployé la pile, vous pouvez ajouter l’élément DynamoDB suivant à la table. Notez qu’il n’apparaîtra pas dans les journaux des fonctions Lambda, car le produit de cet exemple provient de Californie.

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK#1000",
 "company_id": "1000",
 "fabric": "Florida Chocolates",
 "price": 15,
 "product_id": "1000",
 "quantity": 50,
 "state": "CA",
 "stores": 5,
 "type": ""
}
```
+ **Uniquement les éléments qui commencent par certaines valeurs dans PK et SK**

Cet exemple modifie le CloudFormation modèle pour inclure la condition suivante :

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Notez que la condition AND exige que la condition se trouve à l’intérieur du modèle, où les clés PK et SK se trouvent dans la même expression, séparées par des virgules.

Soit vous commencez avec des valeurs sur PK et SK, soit le produit provient d’un État donné.

Cet exemple modifie le CloudFormation modèle pour inclure les conditions suivantes :

```
  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

Notez que la condition OR est ajoutée en introduisant de nouveaux modèles dans la section des filtres.

## Synthèse – CDK
<a name="Streams.Lambda.Tutorial2.CDK"></a>

L’exemple de modèle de formation de projet CDK suivant décrit les fonctionnalités de filtrage des événements. Avant de travailler avec ce projet CDK, vous devez [installer les prérequis](https://docs.aws.amazon.com/cdk/v2/guide/work-with.html), y compris l’[exécution de scripts de préparation](https://docs.aws.amazon.com/cdk/v2/guide/work-with-cdk-python.html).

**Créer un projet CDK**

Créez d'abord un nouveau AWS CDK projet en l'invoquant `cdk init` dans un répertoire vide.

```
mkdir ddb_filters
cd ddb_filters
cdk init app --language python
```

La commande `cdk init` utilise le nom du dossier du projet pour nommer les différents éléments du projet, notamment les classes, les sous-dossiers et les fichiers. Tous les traits d’union figurant dans le nom du dossier sont convertis en traits de soulignement. Dans le cas contraire, le nom doit prendre la forme d’un identifiant Python. Par exemple, il ne doit pas commencer par un chiffre ni contenir d’espaces.

Pour travailler avec le nouveau projet, activez son environnement virtuel. Cela permet d’installer les dépendances du projet localement dans le dossier du projet, plutôt que globalement.

```
source .venv/bin/activate
python -m pip install -r requirements.txt
```

**Note**  
Il se peut que vous reconnaissiez qu'il s'agit de la Mac/Linux commande permettant d'activer un environnement virtuel. Les modèles Python incluent un fichier batch, `source.bat`, qui permet d’utiliser la même commande sous Windows. La commande Windows traditionnelle `.venv\Scripts\activate.bat` fonctionne également. Si vous avez initialisé votre AWS CDK projet à l'aide de AWS CDK Toolkit v1.70.0 ou d'une version antérieure, votre environnement virtuel se trouve dans le `.env` répertoire au lieu de. `.venv` 

**Infrastructure de base**

Ouvrez le fichier `./ddb_filters/ddb_filters_stack.py` dans l’éditeur de texte de votre choix. Ce fichier a été généré automatiquement lorsque vous avez créé le AWS CDK projet. 

Ensuite, ajoutez les fonctions `_create_ddb_table` et `_set_ddb_trigger_function`. Ces fonctions créeront une table DynamoDB avec la clé de partition PK et la clé de tri SK en mode provisionné et mode à la demande), avec Amazon DynamoDB Streams activé par défaut pour afficher les nouvelles et les anciennes images.

La fonction Lambda sera stockée dans le dossier `lambda` situé sous le fichier `app.py`. Ce fichier sera créé ultérieurement. Elle inclura une variable d’environnement `APP_TABLE_NAME`, qui sera le nom de la table Amazon DynamoDB créée par cette pile. Dans la même fonction, nous accorderons des autorisations de lecture de flux à la fonction Lambda. Enfin, elle s’abonnera aux DynamoDB Streams en tant que source d’événements pour la fonction Lambda. 

À la fin du fichier de la méthode `__init__`, vous appellerez les constructions respectives pour les initialiser dans la pile. Pour les projets plus importants qui nécessitent des composants et des services supplémentaires, il peut être préférable de définir ces constructions en dehors de la pile de base. 

```
import os
import json

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_dynamodb as dynamodb,
)
from constructs import Construct


class DdbFiltersStack(Stack):

    def _create_ddb_table(self):
        dynamodb_table = dynamodb.Table(
            self,
            "AppTable",
            partition_key=dynamodb.Attribute(
                name="PK", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="SK", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            removal_policy=cdk.RemovalPolicy.DESTROY,
        )

        cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name)
        return dynamodb_table

    def _set_ddb_trigger_function(self, ddb_table):
        events_lambda = _lambda.Function(
            self,
            "LambdaHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            handler="app.handler",
            environment={
                "APP_TABLE_NAME": ddb_table.table_name,
            },
        )

        ddb_table.grant_stream_read(events_lambda)

        event_subscription = _lambda.CfnEventSourceMapping(
            scope=self,
            id="companyInsertsOnlyEventSourceMapping",
            function_name=events_lambda.function_name,
            event_source_arn=ddb_table.table_stream_arn,
            maximum_batching_window_in_seconds=1,
            starting_position="LATEST",
            batch_size=1,
        )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        ddb_table = self._create_ddb_table()
        self._set_ddb_trigger_function(ddb_table)
```

Nous allons maintenant créer une fonction lambda très simple qui imprimera les journaux sur Amazon CloudWatch. Pour ce faire, créez un nouveau dossier appelé `lambda`.

```
mkdir lambda
touch app.py
```

À l’aide de votre éditeur de texte préféré, ajoutez le contenu suivant au fichier `app.py` :

```
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def handler(event, context):
    LOGGER.info('Received Event: %s', event)
    for rec in event['Records']:
        LOGGER.info('Record: %s', rec)
```

Assurez-vous que vous vous trouvez dans le dossier `/ddb_filters/`, tapez la commande suivante pour créer l’exemple d’application :

```
cdk deploy
```

à un moment donné, il vous sera demandé de confirmer si vous souhaitez déployer la solution. Acceptez les modifications en saisissant `Y`.

```
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │
└───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘

Do you wish to deploy these changes (y/n)? y

...

✨  Deployment time: 67.73s

Outputs:
DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP
Stack ARN:
arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
```

Une fois les modifications déployées, ouvrez votre AWS console et ajoutez un élément à votre tableau. 

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

Les CloudWatch journaux devraient désormais contenir toutes les informations de cette entrée. 

**Exemples de filtrage**
+ **Uniquement les produits qui correspondent à un État donné**

Ouvrez le fichier `ddb_filters/ddb_filters/ddb_filters_stack.py` et modifiez-le pour inclure le filtre correspondant à tous les produits égaux à « FL ». Cela peut être révisé juste sous `event_subscription`, à la ligne 45.

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```
+ **Uniquement les éléments qui commencent par certaines valeurs dans PK et SK**

Modifiez le script Python de façon à inclure la condition suivante :

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
        ]
    },
```
+ **Soit vous commencez avec des valeurs sur PK et SK, soit le produit provient d’un État donné.**

Modifiez le script Python de façon à inclure les conditions suivantes :

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```

Notez que la condition OR est ajoutée en ajoutant d’autres éléments au tableau de filtres.

**Nettoyage**

Localisez la pile de filtres dans la base de votre répertoire de travail et exécutez `cdk destroy`. Il vous sera demandé de confirmer la suppression de la ressource :

```
cdk destroy
Are you sure you want to delete: DdbFiltersStack (y/n)? y
```

# Bonnes pratiques relatives à l’utilisation de DynamoDB Streams avec Lambda
<a name="Streams.Lambda.BestPracticesWithDynamoDB"></a>

Une AWS Lambda fonction s'exécute dans un *conteneur*, c'est-à-dire un environnement d'exécution isolé des autres fonctions. Lorsque vous exécutez une fonction pour la première fois, AWS Lambda créez un nouveau conteneur et commencez à exécuter le code de la fonction.

Une fonction Lambda a un *gestionnaire* qui est exécuté une fois par appel. Le gestionnaire contient la logique métier principale de la fonction. Par exemple, la fonction Lambda illustrée dans [Étape 4 : créer et tester une fonction Lambda](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) comporte un gestionnaire qui peut traiter des enregistrements dans un flux DynamoDB. 

Vous pouvez également fournir un code d'initialisation qui ne s'exécute qu'une seule fois, après la création du conteneur, mais avant la première AWS Lambda exécution du gestionnaire. La fonction Lambda présentée dans [Étape 4 : créer et tester une fonction Lambda](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) contient un code d'initialisation qui importe le SDK JavaScript dans Node.js et crée un client pour Amazon SNS. Ces objets doivent être définis une seule fois, en dehors du gestionnaire.

Une fois la fonction exécutée, vous AWS Lambda pouvez choisir de réutiliser le conteneur pour les appels ultérieurs de la fonction. Dans ce cas, votre gestionnaire de fonction peut réutiliser les ressources que vous avez définies dans votre code d’initialisation. (Notez que vous ne pouvez pas contrôler la durée de conservation du conteneur par AWS Lambda , ni la réutilisation du conteneur.)

Pour AWS Lambda utiliser des déclencheurs DynamoDB, nous recommandons ce qui suit :
+ AWS les clients de service doivent être instanciés dans le code d'initialisation, et non dans le gestionnaire. Cela permet AWS Lambda de réutiliser les connexions existantes, pendant toute la durée de vie du conteneur.
+ En général, il n'est pas nécessaire de gérer explicitement les connexions ou d'implémenter un regroupement de connexions, car il AWS Lambda gère cela pour vous.

Un consommateur Lambda d’un flux DynamoDB ne garantit pas une livraison unique et peut entraîner des doublons occasionnels. Assurez-vous que le code de votre fonction Lambda est idempotent, afin d’éviter que des problèmes inattendus ne surviennent en raison d’un traitement dupliqué.

Pour plus d'informations, consultez la section [Bonnes pratiques relatives à l'utilisation AWS Lambda des fonctions](https://docs.aws.amazon.com/lambda/latest/dg/best-practices.html) dans le *Guide du AWS Lambda développeur*.

# DynamoDB Streams et Apache Flink
<a name="StreamsApacheFlink.xml"></a>

Vous pouvez utiliser des enregistrements Amazon DynamoDB Streams avec Apache Flink. Grâce au [service géré Amazon pour Apache Flink](https://aws.amazon.com/managed-service-apache-flink/), vous pouvez transformer et analyser des données de streaming en temps réel à l’aide d’Apache Flink. Apache Flink est un cadre de traitement de flux open source dédié au traitement de données en temps réel. Le connecteur Amazon DynamoDB Streams pour Apache Flink simplifie la création et la gestion des charges de travail Apache Flink et vous permet d’intégrer des applications à d’autres Services AWS.

Amazon Managed Service pour Apache Flink vous aide à créer rapidement des applications de traitement de end-to-end flux pour l'analyse des journaux, l'analyse des flux de clics, l'Internet des objets (IoT), les technologies publicitaires, les jeux vidéo, etc. Les quatre cas d'utilisation les plus courants sont le streaming extract-transform-load (ETL), les applications pilotées par les événements, les analyses réactives en temps réel et les requêtes interactives des flux de données. Pour plus d’informations sur l’écriture dans Apache Flink à partir d’Amazon DynamoDB, consultez [Amazon DynamoDB Streams Connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/dynamodb/).