

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Utiliser la bibliothèque cliente Kinesis
<a name="kcl"></a>

## Qu'est-ce que Kinesis Client Library ?
<a name="kcl-library-what-is"></a>

Kinesis Client Library (KCL) est une bibliothèque logicielle Java autonome conçue pour simplifier le processus de consommation et de traitement des données provenant d'Amazon Kinesis Data Streams. KCL gère de nombreuses tâches complexes associées à l'informatique distribuée, ce qui permet aux développeurs de se concentrer sur la mise en œuvre de leur logique métier pour le traitement des données. Il gère des activités telles que l'équilibrage de charge entre plusieurs travailleurs, la réponse aux défaillances des travailleurs, le contrôle des enregistrements traités et la réponse aux modifications du nombre de partitions dans le flux.

KCL est fréquemment mis à jour pour intégrer de nouvelles versions des bibliothèques sous-jacentes, des améliorations de sécurité et des corrections de bogues. Nous vous recommandons d'utiliser la dernière version de KCL pour éviter les problèmes connus et bénéficier de toutes les dernières améliorations. Pour trouver la dernière version de KCL, consultez [KCL](https://github.com/awslabs/amazon-kinesis-client) Github. 

**Important**  
Nous vous recommandons d'utiliser la dernière version de KCL pour éviter les bogues et problèmes connus. Si vous utilisez KCL 2.6.0 ou une version antérieure, passez à KCL 2.6.1 ou version ultérieure pour éviter une situation rare susceptible de bloquer le traitement des partitions lorsque la capacité du flux change. 
KCL est une bibliothèque Java. Support pour les langages autres que Java est fourni à l'aide d'un démon basé sur Java appelé. MultiLangDaemon MultiLangDaemoninteragit avec l'application KCL via STDIN et STDOUT. Pour plus d'informations MultiLangDaemon sur l'activation GitHub, consultez[Développez vos clients avec KCL dans des langages autres que Java](develop-kcl-consumers-non-java.md).
N'utilisez pas les AWS SDK pour Java versions 2.27.19 à 2.27.23 avec KCL 3.x. Ces versions incluent un problème qui provoque une erreur d'exception liée à l'utilisation de DynamoDB par KCL. Nous vous recommandons d'utiliser la AWS SDK pour Java version 2.28.0 ou ultérieure pour éviter ce problème. 

## Principales fonctionnalités et avantages du KCL
<a name="kcl-benefits"></a>

Voici les principales caractéristiques et les avantages connexes du KCL :
+ **Évolutivité** : KCL permet aux applications d'évoluer de manière dynamique en répartissant la charge de traitement entre plusieurs travailleurs. Vous pouvez effectuer une mise à l'échelle initiale ou négative de votre application, manuellement ou à l'aide de l'auto-scaling, sans vous soucier de la redistribution de la charge.
+ **Équilibrage de charge** : KCL équilibre automatiquement la charge de traitement entre les travailleurs disponibles, ce qui permet une répartition uniforme du travail entre les travailleurs.
+ **Point de contrôle** : KCL gère le point de contrôle des enregistrements traités, ce qui permet aux candidatures de reprendre le traitement à partir de leur dernière position traitée avec succès.
+ **Tolérance aux pannes** : KCL fournit des mécanismes de tolérance aux pannes intégrés, garantissant que le traitement des données se poursuit même en cas d'échec individuel des travailleurs. KCL assure également la at-least-once livraison.
+ **Gestion des modifications au niveau du flux** : KCL s'adapte aux divisions de partitions et aux fusions susceptibles de se produire en raison de modifications du volume de données. Il maintient l'ordre en veillant à ce que les partitions enfants ne soient traitées qu'une fois que la partition parent est terminée et contrôlée.
+ **Surveillance** : KCL s'intègre à Amazon CloudWatch pour une surveillance au niveau des consommateurs.
+ **Support multilingue** : KCL prend en charge Java de manière native et permet l'utilisation de plusieurs langages de programmation autres que Java. MultiLangDaemon

# Concepts du KCL
<a name="kcl-concepts"></a>

Cette section explique les concepts fondamentaux et les interactions de la Kinesis Client Library (KCL). Ces concepts sont fondamentaux pour le développement et la gestion des applications grand public KCL.
+ **Application client KCL : application** personnalisée conçue pour lire et traiter les enregistrements issus des flux de données Kinesis à l'aide de la bibliothèque cliente Kinesis.
+ **Travailleur** : les applications grand public KCL sont généralement distribuées, avec un ou plusieurs travailleurs exécutés simultanément. KCL coordonne les travailleurs pour qu'ils consomment les données du flux de manière distribuée et équilibre la charge de manière uniforme entre plusieurs travailleurs.
+ **Planificateur** : classe de haut niveau qu'un utilisateur KCL utilise pour commencer à traiter des données. Chaque collaborateur KCL dispose d'un planificateur. Le planificateur initialise et supervise diverses tâches, notamment la synchronisation des informations relatives aux partitions issues des flux de données Kinesis, le suivi des affectations de partitions entre les travailleurs et le traitement des données issues du flux en fonction des partitions attribuées au travailleur. Le planificateur peut prendre différentes configurations qui affectent le comportement du planificateur, telles que le nom du flux à traiter et les informations d'identification. AWS Le planificateur lance la livraison des enregistrements de données du flux aux processeurs d'enregistrements.
+ **Processeur d'enregistrements** : définit la logique selon laquelle votre application client KCL traite les données qu'elle reçoit des flux de données. Vous devez implémenter votre propre logique de traitement des données personnalisée dans le processeur d'enregistrements. Un worker KCL instancie un planificateur. Le planificateur instancie ensuite un processeur d'enregistrement pour chaque partition qu'il loue. Un travailleur peut exécuter plusieurs processeurs d'enregistrement.
+ **Bail** : définit l'affectation entre un travailleur et une partition. Les applications grand public de KCL utilisent des contrats de location pour répartir le traitement des enregistrements de données entre plusieurs travailleurs. Chaque partition est liée à un seul travailleur par un bail à un moment donné et chaque travailleur peut détenir un ou plusieurs baux simultanément. Lorsqu'un travailleur cesse de détenir un bail pour cause d'arrêt ou d'échec, KCL désigne un autre travailleur pour prendre le bail. Pour en savoir plus sur le bail, consultez la [documentation Github : Lease Lifecycle](https://github.com/awslabs/amazon-kinesis-client/blob/master/docs/lease-lifecycle.md#lease-lifecycle).
+ **Table des baux** : il s'agit d'une table Amazon DynamoDB unique utilisée pour suivre tous les baux de l'application client KCL. Chaque application client KCL crée sa propre table de location. La table des baux est utilisée pour maintenir l'état de tous les travailleurs afin de coordonner le traitement des données. Pour de plus amples informations, veuillez consulter [Tables de métadonnées DynamoDB et équilibrage de charge dans KCL](kcl-dynamoDB.md).
+ **Point de contrôle** : processus qui consiste à stocker de manière persistante la position du dernier enregistrement traité avec succès dans une partition. KCL gère le point de contrôle pour s'assurer que le traitement peut être repris à partir de la dernière position du point de contrôle en cas de défaillance d'un travailleur ou de redémarrage de l'application. Les points de contrôle sont stockés dans la table des baux DynamoDB dans le cadre des métadonnées du bail. Cela permet aux travailleurs de poursuivre le traitement là où le précédent travailleur s'est arrêté.

# Tables de métadonnées DynamoDB et équilibrage de charge dans KCL
<a name="kcl-dynamoDB"></a>

KCL gère les métadonnées telles que les baux et les mesures d'utilisation du processeur fournies par les travailleurs. KCL assure le suivi de ces métadonnées à l'aide de tables DynamoDB. Pour chaque application Amazon Kinesis Data Streams, KCL crée trois tables DynamoDB pour gérer les métadonnées : table des baux, table des métriques des travailleurs et table de l'état des coordinateurs.

**Note**  
KCL 3.x a introduit deux nouvelles tables de métadonnées : les *métriques de travail et les* tables d'*état des coordinateurs*.

**Important**  
 Vous devez ajouter les autorisations appropriées pour que les applications KCL puissent créer et gérer des tables de métadonnées dans DynamoDB. Pour en savoir plus, consultez [Autorisations IAM requises pour les applications grand public KCL](kcl-iam-permissions.md).  
L'application client KCL ne supprime pas automatiquement ces trois tables de métadonnées DynamoDB. Assurez-vous de supprimer ces tables de métadonnées DynamoDB créées par l'application client KCL lorsque vous mettez hors service votre application client afin d'éviter des coûts inutiles.

## Tableau des baux
<a name="kcl-leasetable"></a>

Une table de location est une table Amazon DynamoDB unique utilisée pour suivre les partitions louées et traitées par les planificateurs de l'application client KCL. Chaque application client KCL crée sa propre table de location. KCL utilise le nom de l'application client comme nom de la table des baux par défaut. Vous pouvez définir un nom de table personnalisé à l'aide de la configuration. KCL crée également un [index secondaire global](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html) sur la table des baux avec la clé de partition de LeaseOwner pour une découverte efficace des baux. L'index secondaire global reflète l'attribut LeaseKey de la table des baux de base. Si la table des baux de votre application client KCL n'existe pas au démarrage de l'application, l'un des travailleurs crée la table des baux pour votre application.

Vous pouvez consulter cette table à l'aide de la [console Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) lors que l'application est en cours d'exécution.

**Important**  
Chaque nom d'application client KCL doit être unique afin d'éviter la duplication du nom de la table des baux. 
Votre compte est facturé pour les coûts associés à la table DynamoDB, en plus des coûts associés au service Kinesis Data Streams lui-même. 

Chaque ligne du tableau des baux représente une partition en cours de traitement par les planificateurs de votre application client. Les principaux champs sont les suivants :
+ **LeaseKey** : pour le traitement en flux unique, il s'agit de l'ID de partition. Pour le traitement multi-flux avec KCL, il est structuré comme suit : `account-id:StreamName:streamCreationTimestamp:ShardId` LeaseKey est la clé de partition de la table des baux. Pour plus d'informations sur le traitement multiflux, consultez[Traitement multi-flux avec KCL](kcl-multi-stream.md).
+ **checkpoint :** le plus récent numéro de séquence de point de contrôle de la partition. 
+ **checkpointSubSequenceNuméro :** lorsque vous utilisez la fonctionnalité d'agrégation de la bibliothèque Kinesis Producer, il s'agit d'une extension du **point de contrôle** qui permet de suivre les enregistrements utilisateur individuels au sein de l'enregistrement Kinesis.
+ **LeaseCounter** : utilisé pour vérifier si un travailleur traite actuellement activement le bail. LeaseCounter augmente si la propriété du bail est transférée à un autre travailleur.
+ **LeaseOwner** : le travailleur actuel titulaire de ce bail.
+ **ownerSwitchesSincePoint de contrôle :** combien de fois ce bail a changé de travailleurs depuis le dernier point de contrôle.
+ **parentShardId:** ID du parent de cette partition. Assurez-vous que la partition parent est entièrement traitée avant que le traitement ne commence sur les partitions enfants, en maintenant le bon ordre de traitement des enregistrements.
+ **childShardId:** Liste des fragments enfants IDs résultant de la division ou de la fusion de ce fragment. Utilisé pour suivre la lignée des partitions et gérer l'ordre de traitement lors des opérations de repartage.
+ **startingHashKey:** limite inférieure de la plage de clés de hachage pour cette partition.
+ **endingHashKey:** limite supérieure de la plage de clés de hachage pour cette partition.

Si vous utilisez le traitement multi-flux avec KCL, les deux champs supplémentaires suivants s'affichent dans le tableau des baux. Pour de plus amples informations, veuillez consulter [Traitement multi-flux avec KCL](kcl-multi-stream.md).
+ **shardID :** ID de la partition.
+ **StreamName :** identifiant du flux de données au format suivant :`account-id:StreamName:streamCreationTimestamp`.

## Tableau des indicateurs relatifs aux travailleurs
<a name="kcl-worker-metrics-table"></a>

La table des métriques de travail est une table Amazon DynamoDB unique pour chaque application KCL et est utilisée pour enregistrer les métriques d'utilisation du processeur de chaque travailleur. Ces indicateurs seront utilisés par KCL pour effectuer des assignations de location efficaces afin d'assurer une utilisation équilibrée des ressources entre les travailleurs. KCL utilise le nom `KCLApplicationName-WorkerMetricStats` de la table des métriques de travail par défaut.

## Tableau des états des coordinateurs
<a name="kcl-coordinator-state-table"></a>

Une table d'état des coordinateurs est une table Amazon DynamoDB unique pour chaque application KCL. Elle est utilisée pour stocker les informations d'état internes des travailleurs. Par exemple, la table d'état du coordinateur stocke les données concernant l'élection du leader ou les métadonnées associées à la migration sur place de KCL 2.x vers KCL 3.x. KCL utilise le nom `KCLApplicationName-CoordinatorState` de la table d'état du coordinateur par défaut.

## Mode de capacité DynamoDB pour les tables de métadonnées créées par KCL
<a name="kcl-capacity-mode"></a>

[Par défaut, la bibliothèque cliente Kinesis (KCL) crée des tables de métadonnées DynamoDB telles que la table des baux, la table des métriques des travailleurs et la table d'état des coordinateurs en utilisant le mode de capacité à la demande.](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/on-demand-capacity-mode.html) Ce mode adapte automatiquement la capacité de lecture et d'écriture pour s'adapter au trafic sans nécessiter de planification de la capacité. Nous vous recommandons vivement de conserver le mode capacité en mode à la demande pour un fonctionnement plus efficace de ces tables de métadonnées.

Si vous décidez de passer la table des baux en [mode capacité provisionnée](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html), suivez les meilleures pratiques suivantes :
+ Analysez les modèles d'utilisation :
  + Surveillez les modèles et les utilisations de lecture et d'écriture de votre application (RCU, WCU) à l'aide des métriques Amazon CloudWatch .
  + Comprenez les exigences de débit de pointe et de débit moyen.
+ Calculez la capacité requise :
  + Estimez les unités de capacité de lecture (RCUs) et les unités de capacité d'écriture (WCUs) en fonction de votre analyse.
  + Tenez compte de facteurs tels que le nombre de fragments, la fréquence des points de contrôle et le nombre de travailleurs.
+ Implémentez le dimensionnement automatique :
  + Utilisez le dimensionnement [automatique DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html#ddb-autoscaling) pour ajuster automatiquement la capacité allouée et définir les limites de capacité minimale et maximale appropriées. 
  + La mise à l'échelle automatique DynamoDB vous aidera à éviter que votre table de métadonnées KCL n'atteigne la limite de capacité et ne soit limitée.
+ Surveillance et optimisation régulières :
  + Surveillez en permanence CloudWatch les métriques pour`ThrottledRequests`.
  + Ajustez la capacité en fonction de l'évolution de votre charge de travail.

Si vous rencontrez des tables DynamoDB intégrées aux métadonnées pour votre application client KCL, vous devez augmenter la capacité de débit allouée à la table DynamoDB. `ProvisionedThroughputExceededException` Si vous définissez un certain niveau d'unités de capacité de lecture (RCU) et d'unités de capacité d'écriture (WCU) lorsque vous créez votre application grand public pour la première fois, il se peut que cela ne soit pas suffisant à mesure que votre utilisation augmente. Par exemple, si votre application client KCL effectue des points de contrôle fréquents ou fonctionne sur un flux contenant de nombreux fragments, vous aurez peut-être besoin d'unités de capacité supplémentaires. Pour plus d'informations sur le débit alloué dans DynamoDB, consultez les sections [Capacité de débit DynamoDB et [mise](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html#WorkingWithTables.Basics.UpdateTable) à jour d'un tableau dans le manuel Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/capacity-mode.html) Developer Guide.

## Comment KCL attribue les baux aux travailleurs et équilibre la charge
<a name="kcl-assign-leases"></a>

KCL collecte et surveille en permanence les indicateurs d'utilisation du processeur à partir des hôtes de calcul qui exécutent les travailleurs afin de garantir une répartition uniforme de la charge de travail. Ces mesures d'utilisation du processeur sont stockées dans le tableau des métriques de travail de DynamoDB. Si KCL détecte que certains travailleurs affichent des taux d'utilisation du processeur plus élevés que d'autres, elle réattribuera les baux entre les travailleurs afin de réduire la charge de travail des travailleurs très sollicités. L'objectif est d'équilibrer la charge de travail de manière plus uniforme au sein du parc d'applications grand public, afin d'éviter qu'un seul travailleur ne soit surchargé. À mesure que KCL répartit l'utilisation du processeur sur l'ensemble du parc d'applications grand public, vous pouvez ajuster la capacité de votre parc d'applications grand public en choisissant le bon nombre de travailleurs ou utiliser le dimensionnement automatique pour gérer efficacement la capacité de calcul afin de réduire les coûts.

**Important**  
KCL peut collecter des métriques d'utilisation du processeur auprès des travailleurs uniquement si certaines conditions préalables sont remplies. Pour en savoir plus, consultez [Conditions préalables](develop-kcl-consumers-java.md#develop-kcl-consumers-java-prerequisites). Si KCL ne parvient pas à collecter les indicateurs d'utilisation du processeur auprès des travailleurs, KCL utilisera à nouveau le débit par travailleur pour attribuer des baux et équilibrer la charge entre les travailleurs du parc. KCL surveillera le débit reçu par chaque travailleur à un moment donné et réattribuera les baux pour s'assurer que chaque travailleur obtient un niveau de débit total similaire pour les baux qui lui ont été attribués.

# Développez les consommateurs avec KCL
<a name="develop-kcl-consumers"></a>

Vous pouvez utiliser la bibliothèque client Kinesis (KCL) pour créer des applications grand public qui traitent les données issues de vos flux de données Kinesis.

KCL est disponible en plusieurs langues. Cette rubrique explique comment développer des consommateurs KCL dans des langages Java et non-Java.
+ [Pour consulter la référence Javadoc de la bibliothèque cliente Kinesis, consultez le document Javadoc de la bibliothèque cliente Amazon Kinesis.](https://javadoc.io/doc/software.amazon.kinesis/amazon-kinesis-client/latest/index.html)
+ Pour télécharger KCL pour Java depuis GitHub, consultez la [bibliothèque client Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) pour Java.
+ Pour localiser la KCL pour Java sur Apache Maven, consultez le référentiel central de [KCL Maven](https://central.sonatype.com/artifact/software.amazon.kinesis/amazon-kinesis-client).

**Topics**
+ [Développez vos clients avec KCL en Java](develop-kcl-consumers-java.md)
+ [Développez vos clients avec KCL dans des langages autres que Java](develop-kcl-consumers-non-java.md)

# Développez vos clients avec KCL en Java
<a name="develop-kcl-consumers-java"></a>

## Conditions préalables
<a name="develop-kcl-consumers-java-prerequisites"></a>

Avant de commencer à utiliser KCL 3.x, assurez-vous que vous disposez des éléments suivants :
+ Kit de développement Java (JDK) 8 ou version ultérieure
+ AWS SDK pour Java 2. x
+ Maven ou Gradle pour la gestion des dépendances

KCL collecte des mesures d'utilisation du processeur, telles que l'utilisation du processeur, à partir de l'hôte de calcul sur lequel les travailleurs s'exécutent afin d'équilibrer la charge afin d'atteindre un niveau d'utilisation des ressources uniforme pour tous les travailleurs. Pour permettre à KCL de collecter des métriques d'utilisation du processeur auprès des travailleurs, vous devez remplir les conditions préalables suivantes :

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ Votre système d'exploitation doit être Linux.
+ Vous devez l'activer [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)dans votre instance EC2.

 **Amazon Elastic Container Service (Amazon ECS) sur Amazon EC2**
+ Votre système d'exploitation doit être Linux.
+ Vous devez activer la [version 4 du point de terminaison des métadonnées des tâches ECS](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html). 
+ La version de votre agent de conteneur Amazon ECS doit être 1.39.0 ou ultérieure.

 **Amazon ECS sur AWS Fargate**
+ Vous devez activer la version 4 [du point de terminaison des métadonnées des tâches Fargate](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html). Si vous utilisez la version 1.4.0 ou ultérieure de la plateforme Fargate, cette option est activée par défaut. 
+ Version 1.4.0 ou ultérieure de la plateforme Fargate.

 **Amazon Elastic Kubernetes Service (Amazon EKS) sur Amazon EC2** 
+ Votre système d'exploitation doit être Linux.

 **Amazon EKS sur AWS Fargate**
+ Plateforme Fargate 1.3.0 ou version ultérieure.

**Important**  
Si KCL ne peut pas collecter les indicateurs d'utilisation du processeur auprès des travailleurs, KCL utilisera à nouveau le débit par travailleur pour attribuer les baux et équilibrer la charge entre les travailleurs du parc. Pour de plus amples informations, veuillez consulter [Comment KCL attribue les baux aux travailleurs et équilibre la charge](kcl-dynamoDB.md#kcl-assign-leases).

## Installation et ajout de dépendances
<a name="develop-kcl-consumers-java-installation"></a>

Si vous utilisez Maven, ajoutez la dépendance suivante à votre `pom.xml` fichier. Assurez-vous d'avoir remplacé la version 3.x.x par la dernière version de KCL. 

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

Si vous utilisez Gradle, ajoutez ce qui suit à votre `build.gradle` fichier. Assurez-vous d'avoir remplacé la version 3.x.x par la dernière version de KCL. 

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

Vous pouvez vérifier la dernière version de la KCL dans le référentiel [central Maven](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client).

## Mettre en œuvre le consommateur
<a name="develop-kcl-consumers-java-implemetation"></a>

Une application KCL destinée aux consommateurs comprend les éléments clés suivants :

**Topics**
+ [RecordProcessor](#implementation-recordprocessor)
+ [RecordProcessorFactory](#implementation-recordprocessorfactory)
+ [Planificateur](#implementation-scheduler)
+ [Application principale destinée aux consommateurs](#implementation-main)

### RecordProcessor
<a name="implementation-recordprocessor"></a>

RecordProcessor est le composant central dans lequel réside votre logique métier pour le traitement des enregistrements de flux de données Kinesis. Il définit la manière dont votre application traite les données qu'elle reçoit du flux Kinesis.

Principales responsabilités :
+ Initialiser le traitement d'une partition
+ Traiter des lots d'enregistrements issus du flux Kinesis
+ Arrêter le traitement d'une partition (par exemple, lorsque la partition se divise ou fusionne, ou lorsque le bail est transféré à un autre hôte)
+ Gérez les points de contrôle pour suivre les progrès

Voici un exemple de mise en œuvre :

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.*;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class SampleRecordProcessor implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());
            processRecordsInput.records().forEach(r -> 
                log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())
            );
            
            // Checkpoint periodically
            processRecordsInput.checkpointer().checkpoint();
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}
```

Voici une explication détaillée de chaque méthode utilisée dans l'exemple :

**initialiser (InitializationInput) InitializationInput**
+ Objectif : configurer les ressources ou les états nécessaires au traitement des enregistrements.
+ Quand il est appelé : une fois, lorsque KCL attribue une partition à ce processeur d'enregistrement.
+ Points clés :
  + `initializationInput.shardId()`: ID de la partition que ce processeur va gérer.
  + `initializationInput.extendedSequenceNumber()`: le numéro de séquence à partir duquel démarrer le traitement.

**ProcessRecords () ProcessRecordsInput processRecordsInput**
+ Objectif : traiter les enregistrements entrants et éventuellement vérifier la progression des points de contrôle.
+ Quand il est appelé : à plusieurs reprises, tant que le processeur d'enregistrements détient le bail de la partition.
+ Points clés :
  + `processRecordsInput.records()`: liste des enregistrements à traiter.
  + `processRecordsInput.checkpointer()`: Utilisé pour vérifier la progression.
  + Assurez-vous d'avoir géré toutes les exceptions pendant le traitement afin d'éviter que KCL ne tombe en panne.
  + Cette méthode doit être idempotente, car le même enregistrement peut être traité plusieurs fois dans certains scénarios, tels que les données qui n'ont pas été contrôlées avant un crash ou un redémarrage inattendu du travailleur.
  + Videz toujours toutes les données mises en mémoire tampon avant le point de contrôle pour garantir la cohérence des données.

**Bail perdu () LeaseLostInput leaseLostInput**
+ Objectif : Nettoyer toutes les ressources spécifiques au traitement de cette partition.
+ Quand il est appelé : lorsqu'un autre planificateur prend en charge le bail de cette partition.
+ Points clés :
  + Le point de contrôle n'est pas autorisé dans cette méthode.

**Partagé () ShardEndedInput shardEndedInput**
+ Objectif : terminer le traitement de cette partition et de ce point de contrôle.
+ Quand elle est appelée : lorsque la partition se divise ou fusionne, cela indique que toutes les données de cette partition ont été traitées.
+ Points clés :
  + `shardEndedInput.checkpointer()`: Utilisé pour effectuer le point de contrôle final.
  + Le point de contrôle utilisé dans cette méthode est obligatoire pour terminer le traitement.
  + Le fait de ne pas vider les données et de ne pas vérifier le point de contrôle ici peut entraîner une perte de données ou un double traitement lors de la réouverture de la partition.

**Arrêt demandé () ShutdownRequestedInput shutdownRequestedInput**
+ Objectif : Contrôler et nettoyer les ressources lors de la fermeture de KCL.
+ Quand il est appelé : lorsque KCL s'arrête, par exemple, lorsque l'application s'arrête).
+ Points clés :
  + `shutdownRequestedInput.checkpointer()`: Utilisé pour effectuer le pointage avant l'arrêt.
  + Assurez-vous d'avoir implémenté le point de contrôle dans la méthode afin que la progression soit enregistrée avant que l'application ne s'arrête.
  + Le fait de ne pas vider les données et le point de contrôle ici peut entraîner une perte de données ou un retraitement des enregistrements au redémarrage de l'application.

**Important**  
KCL 3.x réduit le retraitement des données lorsque le bail est transféré d'un travailleur à un autre en effectuant un point de contrôle avant que le travailleur précédent ne soit arrêté. Si vous n'implémentez pas la logique de point de contrôle dans la `shutdownRequested()` méthode, vous ne verrez pas cet avantage. Assurez-vous d'avoir implémenté une logique de point de contrôle dans la `shutdownRequested()` méthode.

### RecordProcessorFactory
<a name="implementation-recordprocessorfactory"></a>

RecordProcessorFactory est chargé de créer de nouvelles RecordProcessor instances. KCL utilise cette fabrique pour créer une nouvelle partition RecordProcessor pour chaque partition que l'application doit traiter.

Principales responsabilités :
+ Créez de nouvelles RecordProcessor instances à la demande
+ Assurez-vous que chacun RecordProcessor est correctement initialisé

Voici un exemple de mise en œuvre :

```
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new SampleRecordProcessor();
    }
}
```

Dans cet exemple, la fabrique crée un nouveau SampleRecordProcessor chaque fois que shardRecordProcessor () est appelé. Vous pouvez étendre cela pour inclure toute logique d'initialisation nécessaire.

### Planificateur
<a name="implementation-scheduler"></a>

Le planificateur est un composant de haut niveau qui coordonne toutes les activités de l'application KCL. Il est responsable de l'orchestration globale du traitement des données.

Principales responsabilités :
+ Gérez le cycle de vie de RecordProcessors
+ Gérez la gestion des baux pour les partitions
+ Coordonner le pointage
+ Équilibrez la charge de traitement des partitions entre les différents intervenants de votre application
+ Gérez les signaux d'arrêt et de fin d'application en douceur

Le planificateur est généralement créé et démarré dans l'application principale. Vous pouvez consulter l'exemple d'implémentation de Scheduler dans la section suivante, Application client principale. 

### Application principale destinée aux consommateurs
<a name="implementation-main"></a>

L'application principale destinée aux consommateurs relie tous les composants entre eux. Il est chargé de configurer le consommateur KCL, de créer les clients nécessaires, de configurer le planificateur et de gérer le cycle de vie de l'application.

Principales responsabilités :
+ Configuration des clients AWS de service (Kinesis, DynamoDB,) CloudWatch
+ Configuration de l'application KCL
+ Création et démarrage du planificateur
+ Gérer l'arrêt de l'application

Voici un exemple de mise en œuvre :

```
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import java.util.UUID;

public class SampleConsumer {
    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    public SampleConsumer(String streamName, Region region) {
        this.streamName = streamName;
        this.region = region;
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    public void run() {
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamName, 
            streamName, 
            kinesisClient, 
            dynamoDbAsyncClient,
            cloudWatchClient, 
            UUID.randomUUID().toString(), 
            new SampleRecordProcessorFactory()
        );

        Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

    public static void main(String[] args) {
        String streamName = "your-stream-name"; // replace with your stream name
        Region region = Region.US_EAST_1; // replace with your region
        new SampleConsumer(streamName, region).run();
    }
}
```

 KCL crée un consommateur EFO (Enhanced Fan-out) avec un débit dédié par défaut. Pour plus d'informations sur la sortie de ventilateur améliorée, consultez. [Développez des clients fans améliorés grâce à un débit dédié](enhanced-consumers.md) Si vous avez moins de 2 consommateurs ou si vous n'avez pas besoin de délais de propagation de lecture inférieurs à 200 ms, vous devez définir la configuration suivante dans l'objet du planificateur pour utiliser des consommateurs à débit partagé :

```
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
```

Le code suivant est un exemple de création d'un objet planificateur utilisant des consommateurs à débit partagé :

**Importations** :

```
import software.amazon.kinesis.retrieval.polling.PollingConfig;
```

**Code** :

```
Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );/
```

# Développez vos clients avec KCL dans des langages autres que Java
<a name="develop-kcl-consumers-non-java"></a>

Cette section décrit la mise en œuvre par les consommateurs de la bibliothèque cliente Kinesis (KCL) en Python, Node.js, .NET et Ruby.

KCL est une bibliothèque Java. Support pour les langages autres que Java est fourni à l'aide d'une interface multilingue appelée. `MultiLangDaemon` Ce démon est basé sur Java et s'exécute en arrière-plan lorsque vous utilisez une KCL avec un langage autre que Java. Par conséquent, si vous installez KCL pour des langages autres que Java et que vous écrivez votre application grand public entièrement dans des langages autres que Java, vous devez toujours installer Java sur votre système en raison du. `MultiLangDaemon` En outre, `MultiLangDaemon` il comporte certains paramètres par défaut que vous devrez peut-être personnaliser en fonction de votre cas d'utilisation (par exemple, la région AWS à laquelle il se connecte). Pour plus d'informations sur le `MultiLangDaemon` on GitHub, consultez le [ MultiLangDaemon projet KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Bien que les concepts de base restent les mêmes d'une langue à l'autre, certaines considérations et implémentations spécifiques à chaque langue doivent être prises en compte. Pour les concepts de base relatifs au développement des consommateurs de KCL, voir[Développez vos clients avec KCL en Java](develop-kcl-consumers-java.md). Pour des informations plus détaillées sur le développement de consommateurs KCL en Python, Node.js, .NET et Ruby, ainsi que sur les dernières mises à jour, consultez les GitHub référentiels suivants :
+ Python: [amazon-kinesis-client-python](https://github.com/awslabs/amazon-kinesis-client-python)
+ Node.js : [amazon-kinesis-client-nodejs](https://github.com/awslabs/amazon-kinesis-client-nodejs)
+ .NET : [amazon-kinesis-client-net](https://github.com/awslabs/amazon-kinesis-client-net)
+ Rubis : [amazon-kinesis-client-ruby](https://github.com/awslabs/amazon-kinesis-client-ruby)

**Important**  
N'utilisez pas les versions de bibliothèque KCL autres que Java suivantes si vous utilisez le JDK 8. Ces versions contiennent une dépendance (logback) incompatible avec le JDK 8.  
KCL Python 3.0.2 et 2.2.0
KCL Node.js 2.3.0
KCL.NET 3.1.0
KCL Ruby 2.2.0
Nous vous recommandons d'utiliser les versions publiées avant ou après les versions concernées lorsque vous travaillez avec le JDK 8.

# Traitement multi-flux avec KCL
<a name="kcl-multi-stream"></a>

Cette section décrit les modifications requises dans KCL qui vous permettent de créer des applications grand public KCL capables de traiter plusieurs flux de données à la fois.
**Important**  
Le traitement multi-flux n'est pris en charge que dans KCL 2.3 ou version ultérieure.
Le traitement multi-flux *n'est pas* pris en charge pour les utilisateurs KCL écrits dans des langages autres que Java qui s'exécutent avec. `multilangdaemon`
Le traitement multi-flux *n'*est pris en charge dans aucune version de KCL 1.x.
+ **MultistreamTracker interface**
  + Pour créer une application grand public capable de traiter plusieurs flux en même temps, vous devez implémenter une nouvelle interface appelée [MultistreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java). Cette interface inclut la méthode `streamConfigList` qui renvoie la liste des flux de données et leurs configurations à traiter par l'application consommateur KCL. Notez que les flux de données en cours de traitement peuvent être modifiés pendant l'exécution de l'application grand public. `streamConfigList`est appelé périodiquement par KCL pour prendre connaissance de l'évolution des flux de données à traiter.
  + Le `streamConfigList` renseigne la [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23)liste.

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```
  + Les champs `StreamIdentifier` et `InitialPositionInStreamExtended` sont obligatoires, alors qu'ils `consumerArn` sont facultatifs. Vous devez le fournir `consumerArn` uniquement si vous utilisez KCL pour implémenter une application grand public améliorée.
  + Pour plus d'informations sur`StreamIdentifier`, consultez [https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java \$1L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129). Pour créer une`StreamIdentifier`, nous vous recommandons de créer une instance multistream à partir de `streamArn` et `streamCreationEpoch` qui est disponible dans KCL 2.5.0 ou version ultérieure. Dans KCL v2.3 et v2.4, qui ne sont pas compatibles`streamArm`, créez une instance multistream en utilisant le format. `account-id:StreamName:streamCreationTimestamp` Ce format sera obsolète et ne sera plus pris en charge à compter de la prochaine version majeure.
  +  MultistreamTracker inclut également une stratégie pour supprimer les baux des anciens flux dans la table des baux (formerStreamsLeasesDeletionStrategy). Notez que la stratégie NE PEUT PAS être modifiée pendant l'exécution de l'application consommateur. Pour plus d'informations, consultez [https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0 .java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java).
+   [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java)est une classe à l'échelle de l'application que vous pouvez utiliser pour spécifier tous les paramètres de configuration KCL à utiliser lors de la création de votre application client KCL pour KCL version 2.x ou ultérieure. `ConfigsBuilder`la classe prend désormais en charge l'`MultistreamTracker`interface. Vous pouvez initialiser l'un ConfigsBuilder ou l'autre avec le nom du flux de données à partir duquel les enregistrements seront consommés : 

  ```
  /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```  

Vous pouvez également l'initialiser ConfigsBuilder avec `MultiStreamTracker` si vous souhaitez implémenter une application client KCL qui traite plusieurs flux en même temps.

```
* Constructor to initialize ConfigsBuilder with MultiStreamTracker
     * @param multiStreamTracker
     * @param applicationName
     * @param kinesisClient
     * @param dynamoDBClient
     * @param cloudWatchClient
     * @param workerIdentifier
     * @param shardRecordProcessorFactory
     */
    public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
            @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
            @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
            @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
        this.appStreamTracker = Either.left(multiStreamTracker);
        this.applicationName = applicationName;
        this.kinesisClient = kinesisClient;
        this.dynamoDBClient = dynamoDBClient;
        this.cloudWatchClient = cloudWatchClient;
        this.workerIdentifier = workerIdentifier;
        this.shardRecordProcessorFactory = shardRecordProcessorFactory;
    }
```
+ Grâce au support multi-flux mis en œuvre pour votre application client KCL, chaque ligne de la table des baux de l'application contient désormais l'ID de partition et le nom de flux des multiples flux de données traités par cette application.
+ Lorsque le support multi-flux pour votre application client KCL est implémenté, le LeaseKey adopte la structure suivante :. `account-id:StreamName:streamCreationTimestamp:ShardId` Par exemple, `111111111:multiStreamTest-1:12345:shardId-000000000336`.

**Important**  
Lorsque votre application client KCL existante est configurée pour traiter un seul flux de données, le `leaseKey` (qui est la clé de partition pour la table des baux) est l'ID de partition. Si vous reconfigurez une application client KCL existante pour traiter plusieurs flux de données, cela interrompt votre table de location, car la `leaseKey` structure doit être la suivante : `account-id:StreamName:StreamCreationTimestamp:ShardId` pour prendre en charge le multi-flux.

# Utiliser le registre des AWS Glue schémas avec KCL
<a name="kcl-glue-schema"></a>

Vous pouvez intégrer Kinesis Data Streams au registre AWS Glue des schémas. Le registre des AWS Glue schémas vous permet de découvrir, de contrôler et de faire évoluer les schémas de manière centralisée, tout en garantissant que les données produites sont continuellement validées par un schéma enregistré. Un schéma définit la structure et le format d'un enregistrement de données. Un schéma est une spécification versionnée pour la publication, la consommation ou le stockage des données fiables. Le registre AWS Glue Schema vous permet d'améliorer la qualité end-to-end des données et la gouvernance des données au sein de vos applications de streaming. Pour plus d’informations, consultez [Registre de schémas AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). L'un des moyens de configurer cette intégration consiste à utiliser KCL pour Java.

**Important**  
AWS Glue L'intégration du registre des schémas pour Kinesis Data Streams n'est prise en charge que dans KCL 2.3 ou version ultérieure.
AWS Glue L'intégration du registre des schémas pour Kinesis Data Streams n'*est* pas prise en charge pour les utilisateurs KCL écrits dans des langages autres que Java qui s'exécutent avec. `multilangdaemon`
AWS Glue L'intégration du registre des schémas pour Kinesis Data Streams n'*est* prise en charge dans aucune version de KCL 1.x.

Pour obtenir des instructions détaillées sur la façon de configurer l'intégration de Kinesis Data Streams au registre de AWS Glue schémas à l'aide de KCL, consultez la section « Interaction avec les données à l'aide des KPL/KCL bibliothèques » dans [Cas d'utilisation : intégration d'Amazon Kinesis Data Streams](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) au registre de schémas. AWS Glue 

# Autorisations IAM requises pour les applications grand public KCL
<a name="kcl-iam-permissions"></a>

 Vous devez ajouter les autorisations suivantes au rôle ou à l'utilisateur IAM associé à votre application client KCL. 

 Les meilleures pratiques de sécurité AWS dictent l'utilisation d'autorisations détaillées pour contrôler l'accès aux différentes ressources. Gestion des identités et des accès AWS (IAM) vous permet de gérer les utilisateurs et les autorisations des utilisateurs dans AWS. Une stratégie IAM répertorie explicitement les actions autorisées et les ressources sur lesquelles les actions sont applicables.

Le tableau suivant indique les autorisations IAM minimales généralement requises pour les applications grand public KCL :


**Autorisations IAM minimales pour les applications grand public KCL**  

| Service | Actions | Ressources (ARNs) | Objectif | 
| --- | --- | --- | --- | 
| Amazon Kinesis Data Streams |  `DescribeStream` `DescribeStreamSummary` `RegisterStreamConsumer`  |  Flux de données Kinesis à partir duquel votre application KCL traitera les données.`arn:aws:kinesis:region:account:stream/StreamName`  |  Avant d'essayer de lire les enregistrements, l'application consommateur vérifie si le flux existe, s’il est actif et si les partitions sont contenues dans le flux de données. Enregistre les consommateurs sur un shard.  | 
| Amazon Kinesis Data Streams |  `GetRecords` `GetShardIterator` `ListShards`  | Flux de données Kinesis à partir duquel votre application KCL traitera les données.`arn:aws:kinesis:region:account:stream/StreamName` |  Lit les enregistrements d'une partition.  | 
| Amazon Kinesis Data Streams |  `SubscribeToShard` `DescribeStreamConsumer` |  Flux de données Kinesis à partir duquel votre application KCL traitera les données. Ajoutez cette action uniquement si vous utilisez des consommateurs EFO (Enhanced Fan-Out). `arn:aws:kinesis:region:account:stream/StreamName/consumer/*`  |  S'abonne à un shard pour les consommateurs bénéficiant d'un système EFO (Enhanced Fan-Out).  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `UpdateTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  Table de location (table de métadonnées dans DynamoDB) créée par KCL. `arn:aws:dynamodb:region:account:table/KCLApplicationName`  |  Ces actions sont nécessaires pour que KCL puisse gérer la table des baux créée dans DynamoDB.  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  Indicateurs de travail et table d'état des coordinateurs (tables de métadonnées dans DynamoDB) créés par KCL. `arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats` `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`  |  Ces actions sont nécessaires pour que KCL puisse gérer les métriques de travail et les tables de métadonnées d'état des coordinateurs dans DynamoDB.  | 
| Amazon DynamoDB | `Query` |  Indice secondaire mondial sur le tableau des baux. `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`  |  Cette action est requise pour que KCL puisse lire l'index secondaire global de la table des baux créée dans DynamoDB.  | 
| Amazon CloudWatch | `PutMetricData` |  \$1  |  Vous pouvez y télécharger CloudWatch des métriques utiles pour surveiller l'application. L'astérisque (\$1) est utilisé car il n'existe aucune ressource spécifique CloudWatch sur laquelle l'`PutMetricData`action est invoquée.   | 

**Note**  
Remplacez « région », « compte »StreamName, et « KCLApplication nom » par votre propre Compte AWS numéro Région AWS, le ARNs nom du flux de données Kinesis et le nom de l'application KCL respectivement. KCL 3.x crée deux autres tables de métadonnées dans DynamoDB. Pour plus de détails sur les tables de métadonnées DynamoDB créées par KCL, consultez. [Tables de métadonnées DynamoDB et équilibrage de charge dans KCL](kcl-dynamoDB.md) Si vous utilisez des configurations pour personnaliser les noms des tables de métadonnées créées par KCL, utilisez les noms de table spécifiés au lieu du nom de l'application KCL. 

Voici un exemple de document de politique pour une application client KCL. 

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME/consumer/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:UpdateTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-WorkerMetricStats",
    "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-CoordinatorState"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:Query"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME/index/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Avant d'utiliser cet exemple de politique, vérifiez les points suivants :
+ Remplacez REGION par votre Région AWS (par exemple, us-east-1).
+ Remplacez ACCOUNT\$1ID par votre Compte AWS identifiant.
+ Remplacez STREAM\$1NAME par le nom de votre flux de données Kinesis.
+ Remplacez CONSUMER\$1NAME par le nom de votre consommateur, généralement le nom de votre application lorsque vous utilisez KCL.
+ Remplacez KCL\$1APPLICATION\$1NAME par le nom de votre application KCL.

# Configurations KCL
<a name="kcl-configuration"></a>

Vous pouvez définir les propriétés de configuration pour personnaliser les fonctionnalités de la bibliothèque cliente Kinesis afin de répondre à vos besoins spécifiques. Le tableau suivant décrit les propriétés et les classes de configuration.

**Important**  
Dans KCL 3.x, l'algorithme d'équilibrage de charge vise à obtenir une utilisation uniforme du processeur entre les travailleurs, et non un nombre égal de baux par travailleur. Si cette `maxLeasesForWorker` valeur est trop faible, vous risquez de limiter la capacité de KCL à équilibrer efficacement la charge de travail. Si vous utilisez la `maxLeasesForWorker` configuration, pensez à augmenter sa valeur pour permettre la meilleure répartition de charge possible.


**Ce tableau présente les propriétés de configuration de KCL**  

| Propriété de configuration | Classe de configuration | Description | Valeur par défaut | 
| --- | --- | --- | --- | 
| applicationName | ConfigsBuilder | Nom de cette application KCL. Utilisé par défaut pour le tableName et le consumerName. | Non applicable | 
| tableName | ConfigsBuilder |  Permet de remplacer le nom du tableau utilisé par le tableau des baux Amazon DynamoDB.  | Non applicable | 
| streamName | ConfigsBuilder |  Nom du flux à partir duquel cette application traite les enregistrements.  | Non applicable | 
| workerIdentifier | ConfigsBuilder |  Identifiant unique qui représente cette instanciation du processeur d'applications. Il doit être unique.  | Non applicable | 
| failoverTimeMillis | LeaseManagementConfig |  Nombre de millisecondes qui doivent s'écouler avant que vous puissiez considérer qu'un bail propriétaire a échoué. Pour les applications comportant un grand nombre de partitions, ce nombre peut être défini sur un nombre plus élevé afin de réduire le nombre d'IOPS DynamoDB nécessaires au suivi des baux.  | 10 000 (10 secondes) | 
| shardSyncIntervalMillis | LeaseManagementConfig |  Délai entre les appels de synchronisation des partitions.  | 60 000 (60 secondes) | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig |  Lorsqu'ils sont définis, les baux sont supprimés dès que les baux enfant ont commencé le traitement.  | TRUE | 
| ignoreUnexpectedChildShards | LeaseManagementConfig |  Lorsqu'elles sont définies, les partitions enfant ont une partition ouverte qui est ignorée. Cela concerne principalement DynamoDB Streams.  | FALSE | 
| maxLeasesForWorker | LeaseManagementConfig |  Le nombre maximum de baux qu'un seul travailleur doit accepter. Une valeur trop faible peut entraîner une perte de données si les travailleurs ne peuvent pas traiter toutes les partitions, et entraîner une attribution de bail sous-optimale entre les travailleurs. Tenez compte du nombre total de partitions, du nombre de travailleurs et de la capacité de traitement des travailleurs lors de sa configuration.  | Illimité | 
| maxLeaseRenewalThreads | LeaseManagementConfig |  Contrôle la taille du pool de threads des renouvellements de baux. Plus votre application accepte de baux, plus la taille du pool doit être importante.  | 20 | 
| billingMode | LeaseManagementConfig |  Détermine le mode de capacité de la table des baux créée dans DynamoDB. Il existe deux options : le mode à la demande (PAY\$1PER\$1REQUEST) et le mode provisionné. Nous vous recommandons d'utiliser le paramètre par défaut du mode à la demande, car il s'adapte automatiquement à votre charge de travail sans qu'il soit nécessaire de planifier les capacités.  | PAY\$1PER\$1REQUEST (mode à la demande) | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | Capacité de lecture DynamoDB utilisée si la bibliothèque cliente Kinesis doit créer une nouvelle table de location DynamoDB en mode capacité provisionnée. Vous pouvez ignorer cette configuration si vous utilisez le mode de capacité à la demande par défaut dans la billingMode configuration. | 10 | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | Capacité de lecture DynamoDB utilisée si la bibliothèque cliente Kinesis doit créer une nouvelle table de bail DynamoDB. Vous pouvez ignorer cette configuration si vous utilisez le mode de capacité à la demande par défaut dans la billingMode configuration. | 10 | 
| initialPositionInStreamExtended | LeaseManagementConfig |  La position initiale dans le flux à laquelle l'application doit commencer. Elle est utilisée uniquement lors de la création de bail initiale.  |  InitialPositionInStream.TRIM\$1HORIZON  | 
| reBalanceThresholdPercentage | LeaseManagementConfig |  Une valeur en pourcentage qui détermine à quel moment l'algorithme d'équilibrage de charge doit envisager de réaffecter des partitions entre les travailleurs. Il s'agit d'une nouvelle configuration introduite dans KCL 3.x.  | 10 | 
| dampeningPercentage | LeaseManagementConfig |  Une valeur en pourcentage qui est utilisée pour atténuer la quantité de charge qui sera déplacée par le travailleur surchargé lors d'une seule opération de rééquilibrage. Il s'agit d'une nouvelle configuration introduite dans KCL 3.x.  | 60 | 
| allowThroughputOvershoot | LeaseManagementConfig |  Détermine si un contrat de location supplémentaire doit encore être souscrit auprès du travailleur surchargé, même si cela fait en sorte que le débit total de location utilisé dépasse le débit souhaité. Il s'agit d'une nouvelle configuration introduite dans KCL 3.x.  | TRUE | 
| disableWorkerMetrics | LeaseManagementConfig |  Détermine si KCL doit ignorer les indicateurs de ressources fournis par les travailleurs (tels que l'utilisation du processeur) lors de la réattribution des baux et de l'équilibrage de charge. Définissez ce paramètre sur TRUE si vous souhaitez empêcher KCL d'équilibrer la charge en fonction de l'utilisation du processeur. Il s'agit d'une nouvelle configuration introduite dans KCL 3.x.  | FALSE | 
| maxThroughputPerHostKBps | LeaseManagementConfig |  Montant du débit maximal à attribuer à un travailleur lors de l'attribution du bail. Il s'agit d'une nouvelle configuration introduite dans KCL 3.x.  | Illimité | 
| isGracefulLeaseHandoffEnabled | LeaseManagementConfig |  Contrôle le comportement du transfert des baux entre les travailleurs. Lorsque ce paramètre est défini sur true, KCL tentera de transférer les baux de manière harmonieuse en laissant RecordProcessor suffisamment de temps à la partition pour terminer le traitement avant de transférer le bail à un autre travailleur. Cela peut contribuer à garantir l'intégrité des données et des transitions fluides, mais peut augmenter le temps de transfert. Lorsqu'il est défini sur false, le bail sera annulé immédiatement sans attendre qu' RecordProcessor il soit définitivement résilié. Cela peut accélérer les transferts mais risque de ne pas aboutir à un traitement incomplet. Remarque : Le point de contrôle doit être implémenté dans la méthode shutdownRequested () du RecordProcessor pour bénéficier de la fonctionnalité de transfert de bail gracieux. Il s'agit d'une nouvelle configuration introduite dans KCL 3.x.  | TRUE | 
| gracefulLeaseHandoffTimeoutMillis | LeaseManagementConfig |  Spécifie le temps minimum (en millisecondes) pendant lequel il faut attendre que la partition actuelle s'éteigne correctement avant de RecordProcessor transférer de force le bail au propriétaire suivant. Si votre méthode ProcessRecords s'exécute généralement plus longtemps que la valeur par défaut, pensez à augmenter ce paramètre. Cela garantit RecordProcessor qu'il dispose de suffisamment de temps pour terminer son traitement avant le transfert de bail. Il s'agit d'une nouvelle configuration introduite dans KCL 3.x.  | 30 000 (30 secondes) | 
| maxRecords | PollingConfig |  Permet de définir le nombre maximum d'enregistrements renvoyés par Kinesis.  | 10 000 | 
| retryGetRecordsInSeconds | PollingConfig |  Configure le délai entre les GetRecords tentatives d'échec.  | Aucune | 
| maxGetRecordsThreadPool | PollingConfig |  La taille du pool de threads utilisée pour GetRecords.  | Aucune | 
| idleTimeBetweenReadsInMillis | PollingConfig |  Détermine la durée pendant laquelle KCL attend entre les GetRecords appels pour interroger les données des flux de données. L'unité est la milliseconde.  | 1 500 | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig |  Lorsqu'il est défini, le processeur d'enregistrements est appelé même si aucun enregistrement n'a été fourni depuis Kinesis.  | FALSE | 
| parentShardPollIntervalMillis | CoordinatorConfig |  À quelle fréquence un processeur d'enregistrements doit-il interroger pour voir si la partition parent est terminée. L'unité est la milliseconde.  | 10 000 (10 secondes) | 
| skipShardSyncAtWorkerInitializationIfLeaseExist | CoordinatorConfig |  Désactivez la synchronisation des données de partition si la table des baux contient des baux existants.  |  FALSE  | 
| shardPrioritization | CoordinatorConfig |  Définition des priorités de partition à utiliser.  |  NoOpShardPrioritization  | 
| ClientVersionConfig | CoordinatorConfig |  Détermine le mode de compatibilité des versions de KCL dans lequel l'application sera exécutée. Cette configuration est uniquement destinée à la migration depuis les versions précédentes de KCL. Lors de la migration vers la version 3.x, vous devez définir cette configuration sur. `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` Vous pouvez supprimer cette configuration lorsque vous avez terminé la migration.  | CLIENT\$1VERSION\$1CONFIG\$13X | 
| taskBackoffTimeMillis | LifecycleConfig |  Temps d'attente avant de réessayer les tâches KCL ayant échoué. L'unité est la milliseconde.  | 500 (0,5 seconde) | 
| logWarningForTaskAfterMillis | LifecycleConfig |  Temps d'attente avec la consignation d'un avertissement si une tâche n'a pas été terminée.  | Aucune | 
| listShardsBackoffTimeInMillis | RetrievalConfig | Nombre de millisecondes à attendre entre les appels de ListShards en cas de défaillance. L'unité est la milliseconde. | 1 500 (1,5 seconde) | 
| maxListShardsRetryAttempts | RetrievalConfig | Nombre maximum de nouvelles tentatives par ListShards avant l'abandon. | 50 | 
| metricsBufferTimeMillis | MetricsConfig |  Spécifie la durée maximale (en millisecondes) pendant laquelle les métriques doivent être mises en mémoire tampon avant de les publier. CloudWatch  | 10 000 (10 secondes) | 
| metricsMaxQueueSize | MetricsConfig |  Spécifie le nombre maximum de métriques à mettre en mémoire tampon avant de les publier CloudWatch.  | 10 000 | 
| metricsLevel | MetricsConfig |  Spécifie le niveau de granularité des CloudWatch métriques à activer et à publier.  Valeurs possibles : NONE, SUMMARY, DETAILED.  |  MetricsLevel.DÉTAILLÉ  | 
| metricsEnabledDimensions | MetricsConfig |  Contrôle les dimensions autorisées pour CloudWatch les métriques.  | Toutes les dimensions | 

**Configurations abandonnées dans KCL 3.x**

Les propriétés de configuration suivantes ne sont plus disponibles dans KCL 3.x :


**Le tableau indique les propriétés de configuration abandonnées pour KCL 3.x**  

| Propriété de configuration | Classe de configuration | Description | 
| --- | --- | --- | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig |  Nombre maximum de baux qu'une application doit tenter de voler à la fois. KCL 3.x ignorera cette configuration et réattribuera les baux en fonction de l'utilisation des ressources par les travailleurs.  | 
| enablePriorityLeaseAssignment | LeaseManagementConfig |  Contrôle si les travailleurs doivent donner la priorité aux baux très expirés (les baux ne sont pas renouvelés pendant 3 fois le temps de basculement) et aux nouveaux baux partiels, quel que soit le nombre de baux cibles, tout en respectant les limites maximales des baux. KCL 3.x ignorera cette configuration et répartira toujours les baux expirés entre les travailleurs.  | 

**Important**  
Vous devez toujours disposer des propriétés de configuration interrompues lors de la migration des versions précédentes de KCL vers KCL 3.x. Au cours de la migration, le programme de travail KCL commence par le mode compatible KCL 2.x et passe au mode de fonctionnalité KCL 3.x lorsqu'il détecte que tous les programmes de travail KCL de l'application sont prêts à exécuter KCL 3.x. Ces configurations abandonnées sont nécessaires lorsque les utilisateurs de KCL exécutent le mode compatible KCL 2.x.

# Politique de cycle de vie des versions de KCL
<a name="kcl-version-lifecycle-policy"></a>

Cette rubrique décrit la politique de cycle de vie des versions pour Amazon Kinesis Client Library (KCL). AWS fournit régulièrement de nouvelles versions pour les versions de KCL afin de prendre en charge les nouvelles fonctionnalités et améliorations, les corrections de bogues, les correctifs de sécurité et les mises à jour de dépendances. Nous vous recommandons de vous en tenir up-to-date aux versions de KCL pour vous tenir au courant des dernières fonctionnalités, des mises à jour de sécurité et des dépendances sous-jacentes. Nous **ne recommandons pas** de continuer à utiliser une version KCL non prise en charge.

Le cycle de vie des principales versions de KCL comprend les trois phases suivantes :
+ **Disponibilité générale (GA)** — Au cours de cette phase, la version majeure est entièrement prise en charge. AWS fournit régulièrement des versions mineures et des correctifs qui incluent la prise en charge de nouvelles fonctionnalités ou des mises à jour d'API pour Kinesis Data Streams, ainsi que des correctifs de bogues et de sécurité.
+ **Mode maintenance** : AWS limite les versions de correctifs publiées uniquement pour résoudre les corrections de bogues critiques et les problèmes de sécurité. La version majeure ne recevra pas de mises à jour relatives aux nouvelles fonctionnalités ou APIs à Kinesis Data Streams.
+ **E nd-of-support** — La version majeure ne recevra plus de mises à jour ni de versions. Les versions précédemment publiées continueront d'être disponibles via les gestionnaires de packages publics et le code restera activé GitHub. L'utilisation d'une version atteinte se end-of-support fait à la discrétion de l'utilisateur. Nous vous recommandons de passer à la dernière version majeure.


| Version majeure | Phase en cours | Date de publication | Date du mode de maintenance | End-of-support date | 
| --- | --- | --- | --- | --- | 
| KCL 1.x | Mode de maintenance | 2013-12-19 | 17/04/2025 | 30-01-2026/ | 
| KCL 2.x | Disponibilité générale | 02/08/2018 | -- | -- | 
| KCL 3.x | Disponibilité générale | 06/11/2024 | -- | -- | 

# Migrer depuis les versions précédentes de KCL
<a name="kcl-migration-previous-versions"></a>

Cette rubrique explique comment effectuer une migration depuis des versions précédentes de la Kinesis Client Library (KCL). 

## Quelles sont les nouveautés de KCL 3.0 ?
<a name="kcl-migration-new-3-0"></a>

Kinesis Client Library (KCL) 3.0 apporte plusieurs améliorations majeures par rapport aux versions précédentes :
+  Il réduit les coûts de calcul pour les applications grand public en redistribuant automatiquement le travail des employés surutilisés vers les travailleurs sous-utilisés du parc d'applications grand public. Ce nouvel algorithme d'équilibrage de charge garantit une utilisation du processeur uniformément répartie entre les travailleurs et élimine le besoin de surprovisionner les travailleurs.
+  Il réduit le coût DynamoDB associé à KCL en optimisant les opérations de lecture sur la table des baux.
+ Il minimise le retraitement des données lorsque les baux sont réaffectés à un autre travailleur en permettant au travailleur actuel de vérifier les enregistrements qu'il a traités.
+  Il utilise AWS SDK for Java 2.x pour améliorer les performances et les fonctionnalités de sécurité, en supprimant complètement la dépendance à la version AWS SDK pour Java 1.x.

Pour plus d'informations, consultez la [note de mise à jour de KCL 3.0](https://github.com/awslabs/amazon-kinesis-client/blob/master/CHANGELOG.md).

**Topics**
+ [Quelles sont les nouveautés de KCL 3.0 ?](#kcl-migration-new-3-0)
+ [Migrer de KCL 2.x vers KCL 3.x](kcl-migration-from-2-3.md)
+ [Restauration par régression de la version précédente de la KCL](kcl-migration-rollback.md)
+ [Restauration par progression de la KCL 3.x après une restauration par régression](kcl-migration-rollforward.md)
+ [Bonnes pratiques pour la table de location avec mode capacité provisionnée](kcl-migration-lease-table.md)
+ [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

# Migrer de KCL 2.x vers KCL 3.x
<a name="kcl-migration-from-2-3"></a>

Cette rubrique fournit des step-by-step instructions pour faire migrer votre client de KCL 2.x vers KCL 3.x. KCL 3.x prend en charge la migration sur place des utilisateurs de KCL 2.x. Vous pouvez continuer à utiliser les données de votre flux de données Kinesis tout en faisant migrer vos employés de manière progressive.

**Important**  
KCL 3.x conserve les mêmes interfaces et méthodes que KCL 2.x. Il n'est donc pas nécessaire de mettre à jour votre code de traitement des enregistrements pendant la migration. Cependant, vous devez définir la configuration appropriée et vérifier les étapes requises pour la migration. Nous vous recommandons vivement de suivre les étapes de migration suivantes pour une expérience de migration fluide.

## Étape 1 : Prérequis
<a name="kcl-migration-from-2-3-prerequisites"></a>

Avant de commencer à utiliser KCL 3.x, assurez-vous que vous disposez des éléments suivants :
+ Kit de développement Java (JDK) 8 ou version ultérieure
+ AWS SDK pour Java 2. x
+ Maven ou Gradle pour la gestion des dépendances

**Important**  
N'utilisez pas les AWS SDK pour Java versions 2.27.19 à 2.27.23 avec KCL 3.x. Ces versions incluent un problème qui provoque une erreur d'exception liée à l'utilisation de DynamoDB par KCL. Nous vous recommandons d'utiliser la AWS SDK pour Java version 2.28.0 ou ultérieure pour éviter ce problème. 

## Étape 2 : ajouter des dépendances
<a name="kcl-migration-from-2-3-dependencies"></a>

Si vous utilisez Maven, ajoutez la dépendance suivante à votre `pom.xml` fichier. Assurez-vous d'avoir remplacé la version 3.x.x par la dernière version de KCL. 

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

Si vous utilisez Gradle, ajoutez ce qui suit à votre `build.gradle` fichier. Assurez-vous d'avoir remplacé la version 3.x.x par la dernière version de KCL. 

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

Vous pouvez vérifier la dernière version de la KCL dans le référentiel [central Maven](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client).

## Étape 3 : Configuration de la configuration liée à la migration
<a name="kcl-migration-from-2-3-configuration"></a>

Pour migrer de KCL 2.x vers KCL 3.x, vous devez définir le paramètre de configuration suivant :
+ CoordinatorConfig. clientVersionConfig: Cette configuration détermine le mode de compatibilité des versions de KCL dans lequel l'application s'exécutera. Lors de la migration de KCL 2.x vers 3.x, vous devez définir cette configuration sur. `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` Pour définir cette configuration, ajoutez la ligne suivante lors de la création de votre objet planificateur :

```
configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X)
```

Voici un exemple de la façon de définir le `CoordinatorConfig.clientVersionConfig` pour la migration de KCL 2.x vers 3.x. Vous pouvez ajuster d'autres configurations selon vos besoins spécifiques :

```
Scheduler scheduler = new Scheduler(
    configsBuilder.checkpointConfig(),
    configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X),
    configsBuilder.leaseManagementConfig(),
    configsBuilder.lifecycleConfig(),
    configsBuilder.metricsConfig(),
    configsBuilder.processorConfig(),
    configsBuilder.retrievalConfig()
);
```

Il est important que tous les utilisateurs de votre application grand public utilisent le même algorithme d'équilibrage de charge à un moment donné, car KCL 2.x et 3.x utilisent des algorithmes d'équilibrage de charge différents. L'exécution de travailleurs utilisant des algorithmes d'équilibrage de charge différents peut entraîner une distribution de charge sous-optimale, car les deux algorithmes fonctionnent indépendamment les uns des autres.

Ce paramètre de compatibilité KCL 2.x permet à votre application KCL 3.x de s'exécuter dans un mode compatible avec KCL 2.x et d'utiliser l'algorithme d'équilibrage de charge pour KCL 2.x jusqu'à ce que tous les utilisateurs de votre application grand public aient été mis à niveau vers KCL 3.x. Une fois la migration terminée, KCL passe automatiquement en mode de fonctionnalité complète de KCL 3.x et commence à utiliser un nouvel algorithme d'équilibrage de charge KCL 3.x pour tous les travailleurs en cours d'exécution.

**Important**  
Si vous n'utilisez pas `ConfigsBuilder` mais créez un `LeaseManagementConfig` objet pour définir des configurations, vous devez ajouter un autre paramètre appelé `applicationName` dans la version 3.x ou ultérieure de KCL. Pour plus de détails, voir [Erreur de compilation avec le LeaseManagementConfig constructeur.](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compiliation-error-leasemanagementconfig) Nous vous recommandons de l'utiliser `ConfigsBuilder` pour définir les configurations KCL. `ConfigsBuilder`fournit une méthode plus flexible et plus facile à gérer pour configurer votre application KCL.

## Étape 4 : Suivez les meilleures pratiques pour l'implémentation de la méthode ShutdownRequested ()
<a name="kcl-migration-from-2-3-best-practice"></a>

KCL 3.x introduit une fonctionnalité appelée *transfert de bail progressif* afin de minimiser le retraitement des données lorsqu'un bail est transféré à un autre travailleur dans le cadre du processus de réattribution du bail. Pour ce faire, il suffit de vérifier le dernier numéro de séquence traité dans le tableau des baux avant le transfert des baux. Pour garantir le bon fonctionnement du transfert de bail progressif, vous devez vous assurer d'invoquer l'`checkpointer`objet dans la `shutdownRequested` méthode de votre classe. `RecordProcessor` Si vous n'invoquez pas l'`checkpointer`objet dans la `shutdownRequested` méthode, vous pouvez l'implémenter comme illustré dans l'exemple suivant. 

**Important**  
L'exemple de mise en œuvre suivant est une exigence minimale pour un transfert de bail harmonieux. Vous pouvez l'étendre pour inclure une logique supplémentaire liée au point de contrôle si nécessaire. Si vous effectuez un traitement asynchrone, assurez-vous que tous les enregistrements envoyés en aval ont été traités avant d'appeler le point de contrôle. 
Bien que le transfert progressif des baux réduise considérablement la probabilité de retraitement des données lors des transferts de bail, il n'élimine pas totalement cette possibilité. Pour préserver l'intégrité et la cohérence des données, concevez vos applications grand public en aval de manière à ce qu'elles soient idempotentes. Cela signifie qu'ils devraient être en mesure de traiter d'éventuels doublons d'enregistrements sans effets négatifs sur l'ensemble du système.

```
/**
 * Invoked when either Scheduler has been requested to gracefully shutdown
 * or lease ownership is being transferred gracefully so the current owner
 * gets one last chance to checkpoint.
 *
 * Checkpoints and logs the data a final time.
 *
 * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
 *                               before the shutdown is completed.
 */
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
    try {
       // Ensure that all delivered records are processed 
       // and has been successfully flushed to the downstream before calling 
       // checkpoint
       // If you are performing any asynchronous processing or flushing to
       // downstream, you must wait for its completion before invoking
       // the below checkpoint method.
        log.info("Scheduler is shutting down, checkpointing.");
        shutdownRequestedInput.checkpointer().checkpoint();
    } catch (ShutdownException | InvalidStateException e) {
        log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
    } 
}
```

## Étape 5 : Vérifiez les conditions préalables à la KCL 3.x pour la collecte des métriques relatives aux travailleurs
<a name="kcl-migration-from-2-3-worker-metrics"></a>

KCL 3.x collecte des mesures d'utilisation du processeur, telles que l'utilisation du processeur, auprès des travailleurs afin d'équilibrer la charge entre les travailleurs de manière uniforme. Les utilisateurs d'applications grand public peuvent exécuter des applications sur Amazon EC2, Amazon ECS, Amazon EKS ou. AWS Fargate KCL 3.x peut collecter des métriques d'utilisation du processeur auprès des travailleurs uniquement lorsque les conditions préalables suivantes sont remplies :

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ Votre système d'exploitation doit être Linux.
+ Vous devez l'activer [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)dans votre instance EC2.

 **Amazon Elastic Container Service (Amazon ECS) sur Amazon EC2**
+ Votre système d'exploitation doit être Linux.
+ Vous devez activer la [version 4 du point de terminaison des métadonnées des tâches ECS](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html). 
+ La version de votre agent de conteneur Amazon ECS doit être 1.39.0 ou ultérieure.

 **Amazon ECS sur AWS Fargate**
+ Vous devez activer la version 4 [du point de terminaison des métadonnées des tâches Fargate](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html). Si vous utilisez la version 1.4.0 ou ultérieure de la plateforme Fargate, cette option est activée par défaut. 
+ Version 1.4.0 ou ultérieure de la plateforme Fargate.

 **Amazon Elastic Kubernetes Service (Amazon EKS) sur Amazon EC2** 
+ Votre système d'exploitation doit être Linux.

 **Amazon EKS sur AWS Fargate**
+ Plateforme Fargate 1.3.0 ou version ultérieure.

**Important**  
Si KCL 3.x ne parvient pas à collecter les métriques d'utilisation du processeur auprès des travailleurs parce que les conditions préalables ne sont pas remplies, il rééquilibrera la charge en fonction du niveau de débit par bail. Ce mécanisme de rééquilibrage de secours garantira à tous les travailleurs des niveaux de débit total similaires grâce aux baux attribués à chaque travailleur. Pour de plus amples informations, veuillez consulter [Comment KCL attribue les baux aux travailleurs et équilibre la charge](kcl-dynamoDB.md#kcl-assign-leases).

## Étape 6 : Mettre à jour les autorisations IAM pour KCL 3.x
<a name="kcl-migration-from-2-3-IAM-permissions"></a>

Vous devez ajouter les autorisations suivantes au rôle ou à la politique IAM associé à votre application client KCL 3.x. Cela implique de mettre à jour la politique IAM existante utilisée par l'application KCL. Pour de plus amples informations, veuillez consulter [Autorisations IAM requises pour les applications grand public KCL](kcl-iam-permissions.md).

**Important**  
Il est possible que les actions et ressources IAM suivantes ne soient pas ajoutées à vos applications KCL existantes dans la stratégie IAM, car elles n'étaient pas nécessaires dans KCL 2.x. Assurez-vous de les avoir ajoutés avant d'exécuter votre application KCL 3.x :  
Mesures : `UpdateTable`  
Ressources (ARNs) : `arn:aws:dynamodb:region:account:table/KCLApplicationName`
Mesures : `Query`  
Ressources (ARNs) : `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`
Mesures :`CreateTable`,`DescribeTable`,`Scan`,`GetItem`,`PutItem`,`UpdateItem`, `DeleteItem`  
Ressources (ARNs) :`arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats`, `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`
Remplacez « région », « compte » et « KCLApplication nom » ARNs par votre propre Région AWS Compte AWS numéro et le nom de votre application KCL respectivement. Si vous utilisez des configurations pour personnaliser les noms des tables de métadonnées créées par KCL, utilisez les noms de table spécifiés au lieu du nom de l'application KCL.

## Étape 7 : Déployez le code KCL 3.x auprès de vos employés
<a name="kcl-migration-from-2-3-IAM-deploy"></a>

Après avoir défini la configuration requise pour la migration et rempli toutes les listes de contrôle de migration précédentes, vous pouvez créer et déployer votre code auprès de vos employés.

**Note**  
Si vous constatez une erreur de compilation avec le `LeaseManagementConfig` constructeur, consultez la section [Erreur de compilation avec le LeaseManagementConfig constructeur](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compilation-error-leasemanagementconfig) pour obtenir des informations de dépannage.

## Étape 8 : terminer la migration
<a name="kcl-migration-from-2-3-finish"></a>

Pendant le déploiement du code KCL 3.x, KCL continue d'utiliser l'algorithme d'attribution de bail de KCL 2.x. Lorsque vous avez déployé avec succès le code KCL 3.x auprès de tous vos employés, KCL le détecte automatiquement et passe au nouvel algorithme d'attribution de bail en fonction de l'utilisation des ressources par les travailleurs. Pour plus de détails sur le nouvel algorithme d'attribution de bail, consultez[Comment KCL attribue les baux aux travailleurs et équilibre la charge](kcl-dynamoDB.md#kcl-assign-leases).

Pendant le déploiement, vous pouvez surveiller le processus de migration à l'aide des métriques suivantes émises à CloudWatch. Vous pouvez surveiller les métriques dans le cadre de l'`Migration`opération. Toutes les mesures sont per-KCL-application des mesures définies au niveau de la `SUMMARY` métrique. Si la `Sum` statistique de la `CurrentState:3xWorker` métrique correspond au nombre total de travailleurs dans votre application KCL, cela indique que la migration vers KCL 3.x s'est terminée avec succès.

**Important**  
 Il faut au moins 10 minutes à KCL pour passer au nouvel algorithme d'attribution des locataires une fois que tous les travailleurs sont prêts à l'exécuter.


**CloudWatch métriques pour le processus de migration de KCL**  

| Métriques | Description | 
| --- | --- | 
| CurrentState:3xWorker |  Le nombre de travailleurs de KCL ayant migré avec succès vers KCL 3.x et exécutant le nouvel algorithme d'attribution de bail. Si le `Sum` nombre de cette métrique correspond au nombre total de vos travailleurs, cela indique que la migration vers KCL 3.x s'est terminée avec succès. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| CurrentState:2xCompatibleWorker |  Nombre de travailleurs KCL exécutés en mode compatible KCL 2.x pendant le processus de migration. Une valeur différente de zéro pour cette métrique indique que la migration est toujours en cours. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| Fault |  Le nombre d'exceptions rencontrées au cours du processus de migration. La plupart de ces exceptions sont des erreurs transitoires, et KCL 3.x réessaiera automatiquement de terminer la migration. Si vous observez une valeur de `Fault` métrique persistante, consultez vos journaux de la période de migration pour en savoir plus sur la résolution des problèmes. Si le problème persiste, contactez Support. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| GsiStatusReady |  État de la création de l'indice secondaire global (GSI) dans la table des baux. Cette métrique indique si le GSI de la table des baux a été créé, condition préalable à l'exécution de KCL 3.x. La valeur est 0 ou 1, 1 indiquant une création réussie. Lors d'un état de restauration, cette métrique ne sera pas émise. Une fois que vous aurez recommencé à avancer, vous pourrez reprendre le suivi de cette métrique. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| workerMetricsReady |  Le statut du travailleur mesure les émissions de tous les travailleurs. Les métriques indiquent si tous les travailleurs émettent des métriques telles que l'utilisation du processeur. La valeur est 0 ou 1, 1 indiquant que tous les travailleurs ont réussi à émettre des métriques et sont prêts pour le nouvel algorithme d'attribution de bail. Lors d'un état de restauration, cette métrique ne sera pas émise. Une fois que vous aurez recommencé à avancer, vous pourrez reprendre le suivi de cette métrique. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/streams/latest/dev/kcl-migration-from-2-3.html)  | 

KCL fournit une fonctionnalité de restauration vers le mode compatible 2.x pendant la migration. Une fois la migration réussie vers KCL 3.x, nous vous recommandons de supprimer le `CoordinatorConfig.clientVersionConfig` paramètre « `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` si la restauration n'est plus nécessaire ». La suppression de cette configuration arrête l'émission de métriques liées à la migration depuis l'application KCL.

**Note**  
Nous vous recommandons de surveiller les performances et la stabilité de votre application pendant un certain temps pendant la migration et une fois celle-ci terminée. Si vous constatez des problèmes, vous pouvez faire en sorte que les travailleurs utilisent les fonctionnalités compatibles avec KCL 2.x à l'aide de l'outil de migration [KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

# Restauration par régression de la version précédente de la KCL
<a name="kcl-migration-rollback"></a>

Cette rubrique explique les étapes à suivre pour ramener votre client à la version précédente. Lorsque vous devez revenir en arrière, il existe un processus en deux étapes : 

1. Exécuter l’[outil de migration de la KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)

1. Redéployez le code de la version précédente de KCL (facultatif).

## Étape 1 : exécuter l’outil de migration de la KCL
<a name="kcl-migration-rollback-tool"></a>

Si vous avez besoin de restaurer la version précédente de la KCL, vous devez exécuter l’outil de migration de la KCL. L'outil de migration KCL effectue deux tâches importantes :
+ Il supprime une table de métadonnées appelée table des métriques de worker, ainsi qu’un index secondaire global de la table des baux dans DynamoDB. Ces deux artefacts sont créés par KCL 3.x mais ne sont pas nécessaires lorsque vous revenez à la version précédente.
+ Il permet à tous les travailleurs de fonctionner dans un mode compatible avec KCL 2.x et de commencer à utiliser l'algorithme d'équilibrage de charge utilisé dans les versions précédentes de KCL. Si vous rencontrez des problèmes avec le nouvel algorithme d’équilibrage de charge dans la KCL 3.x, cela permettra de les résoudre immédiatement.

**Important**  
La table des états de coordinateur de DynamoDB doit exister et ne doit pas être supprimée pendant les processus de migration, de restauration par régression et de restauration par progression. 

**Note**  
Il est important que tous les workers de votre application consommateur utilisent le même algorithme d’équilibrage de charge à un moment donné. L'outil de migration KCL garantit que tous les utilisateurs de votre application grand public KCL 3.x passent en mode compatible avec KCL 2.x afin qu'ils exécutent le même algorithme d'équilibrage de charge lors du report du remboursement vers votre version KCL précédente.

Vous pouvez télécharger l'[outil de migration KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) dans le répertoire des scripts du référentiel [KCL GitHub](https://github.com/awslabs/amazon-kinesis-client/tree/master). Le script peut être exécuté depuis n'importe lequel de vos collaborateurs ou depuis n'importe quel hôte disposant des autorisations requises pour écrire dans la table d'état des coordinateurs, supprimer la table des métriques des travailleurs et mettre à jour la table des baux. Vous pouvez vous référer à [Autorisations IAM requises pour les applications grand public KCL](kcl-iam-permissions.md) l'autorisation IAM requise pour exécuter le script. Vous ne devez exécuter le script qu'une seule fois par application KCL. Vous pouvez exécuter l'outil de migration KCL à l'aide de la commande suivante : 

```
python3 ./KclMigrationTool.py --region <region> --mode rollback [--application_name <applicationName>] [--lease_table_name <leaseTableName>] [--coordinator_state_table_name <coordinatorStateTableName>] [--worker_metrics_table_name <workerMetricsTableName>]
```

**Paramètres**
+ --region : Remplacez `<region>` par votre. Région AWS
+ --application\$1name : ce paramètre est obligatoire si vous utilisez des noms par défaut pour vos tables de métadonnées DynamoDB (table des baux, table des états des coordinateurs et table des métriques des travailleurs). Si vous avez spécifié des noms personnalisés pour ces tables, vous pouvez omettre ce paramètre. `<applicationName>`Remplacez-le par le nom réel de votre application KCL. L’outil l’utilise pour créer les noms de table par défaut si aucun nom personnalisé n’est fourni.
+ --lease\$1table\$1name (facultatif) : ce paramètre est nécessaire lorsque vous avez défini un nom personnalisé pour la table des baux dans votre configuration KCL. Si vous utilisez le nom de table par défaut, vous pouvez omettre ce paramètre. `leaseTableName`Remplacez-le par le nom de table personnalisé que vous avez spécifié pour votre table de location.
+ --coordinator\$1state\$1table\$1name (facultatif) : ce paramètre est nécessaire lorsque vous avez défini un nom personnalisé pour la table d'état des coordinateurs dans votre configuration KCL. Si vous utilisez le nom de table par défaut, vous pouvez omettre ce paramètre. `<coordinatorStateTableName>`Remplacez-le par le nom de table personnalisé que vous avez spécifié pour votre table d'état des coordinateurs. 
+ --worker\$1metrics\$1table\$1name (facultatif) : ce paramètre est nécessaire lorsque vous avez défini un nom personnalisé pour la table des métriques de travail dans votre configuration KCL. Si vous utilisez le nom de table par défaut, vous pouvez omettre ce paramètre. `<workerMetricsTableName>`Remplacez-le par le nom de table personnalisé que vous avez spécifié pour votre tableau des métriques des travailleurs. 

## Étape 2 : redéployer le code avec la version précédente de KCL (facultatif)
<a name="kcl-migration-rollback-redeploy"></a>

 Après avoir exécuté l’outil de migration de la KCL pour effectuer une restauration par régression, l’un des messages suivants s’affiche :
+ **Message 1 :** « Annulation terminée. Votre application KCL exécutait le mode compatible KCL 2.x. Si aucune régression n'est atténuée, veuillez revenir aux fichiers binaires de votre application précédente en déployant le code avec votre version précédente de KCL. »
  + **Action requise :** Cela signifie que vos travailleurs fonctionnaient en mode compatible avec KCL 2.x. Si le problème persiste, redéployez le code avec la version précédente de KCL auprès de vos employés.
+ **Message 2 :** « Annulation terminée. Votre application KCL exécutait le mode de fonctionnalité KCL 3.x. Il n'est pas nécessaire de revenir aux fichiers binaires de l'application précédents, sauf si vous ne voyez aucune solution au problème dans les 5 minutes. Si le problème persiste, veuillez revenir aux fichiers binaires de votre application précédente en déployant le code avec votre version précédente de KCL. »
  + **Action requise :** Cela signifie que vos travailleurs fonctionnaient en mode KCL 3.x et que l'outil de migration KCL a fait passer tous les travailleurs en mode compatible avec KCL 2.x. Si le problème est résolu, il n'est pas nécessaire de redéployer le code avec la version précédente de KCL. Si le problème persiste, redéployez le code avec la version précédente de KCL auprès de vos employés.

 

# Restauration par progression de la KCL 3.x après une restauration par régression
<a name="kcl-migration-rollforward"></a>

Cette rubrique explique les étapes à suivre pour faire revenir votre client à KCL 3.x après une annulation. Lorsque vous devez avancer, vous devez suivre un processus en deux étapes : 

1. Exécuter l’[outil de migration de la KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) 

1. Déployer le code avec la KCL 3.x

## Étape 1 : exécuter l’outil de migration de la KCL
<a name="kcl-migration-rollback-tool"></a>

Exécuter l’outil de migration de la KCL Outil de migration KCL avec la commande suivante pour passer à KCL 3.x :

```
python3 ./KclMigrationTool.py --region <region> --mode rollforward [--application_name <applicationName>] [--coordinator_state_table_name <coordinatorStateTableName>]
```

**Paramètres**
+ --region : Remplacez `<region>` par votre. Région AWS
+ --application\$1name : ce paramètre est obligatoire si vous utilisez des noms par défaut pour votre table d'état des coordinateurs. Si vous avez spécifié des noms personnalisés pour la table des états de coordinateur, vous pouvez omettre ce paramètre. `<applicationName>`Remplacez-le par le nom réel de votre application KCL. L’outil l’utilise pour créer les noms de table par défaut si aucun nom personnalisé n’est fourni.
+ --coordinator\$1state\$1table\$1name (facultatif) : ce paramètre est nécessaire lorsque vous avez défini un nom personnalisé pour la table d'état des coordinateurs dans votre configuration KCL. Si vous utilisez le nom de table par défaut, vous pouvez omettre ce paramètre. `<coordinatorStateTableName>`Remplacez-le par le nom de table personnalisé que vous avez spécifié pour votre table d'état des coordinateurs. 

Une fois que vous avez exécuté l’outil de migration en mode restauration par progression, la KCL crée les ressources DynamoDB suivantes requises pour la KCL 3.x :
+ Un index secondaire global sur la table des baux
+ Une table des métriques de worker

## Étape 2 : déployer le code avec la KCL 3.x
<a name="kcl-migration-rollback-redeploy"></a>

Après avoir exécuté l’outil de migration de la KCL pour une restauration par progression, déployez votre code avec la KCL 3.x sur vos workers. Suivez [Étape 8 : terminer la migration](kcl-migration-from-2-3.md#kcl-migration-from-2-3-finish) pour terminer votre migration.

# Bonnes pratiques pour la table de location avec mode capacité provisionnée
<a name="kcl-migration-lease-table"></a>

Si la table des baux de votre application KCL est passée en mode capacité provisionnée, KCL 3.x crée un index secondaire global sur la table des baux avec le mode de facturation provisionné et les mêmes unités de capacité de lecture (RCU) et unités de capacité d'écriture (WCU) que la table de location de base. Lorsque l'index secondaire global est créé, nous vous recommandons de surveiller l'utilisation réelle de l'index secondaire global dans la console DynamoDB et d'ajuster les unités de capacité si nécessaire. Pour un guide plus détaillé sur le changement de mode de capacité des tables de métadonnées DynamoDB créées par KCL, consultez. [Mode de capacité DynamoDB pour les tables de métadonnées créées par KCL](kcl-dynamoDB.md#kcl-capacity-mode) 

**Note**  
Par défaut, KCL crée des tables de métadonnées telles que la table des baux, la table des métriques des travailleurs et la table d'état des coordinateurs, ainsi que l'index secondaire global sur la table des baux en utilisant le mode de capacité à la demande. Nous vous recommandons d'utiliser le mode capacité à la demande pour ajuster automatiquement la capacité en fonction de l'évolution de votre utilisation. 

# Migration de la KCL 1.x vers la KCL 3.x
<a name="kcl-migration-1-3"></a>

Cette rubrique explique les instructions pour migrer votre client de KCL 1.x vers KCL 3.x. KCL 1.x utilise des classes et interfaces différentes de celles de KCL 2.x et KCL 3.x. Vous devez d'abord migrer le processeur d'enregistrements, l'usine du processeur d'enregistrements et les classes de travail vers le format compatible KCL 2.x/3.x, puis suivre les étapes de migration de KCL 2.x vers KCL 3.x. Vous pouvez passer directement de KCL 1.x à KCL 3.x.
+ **Étape 1 : migrer le processeur d'enregistrement**

  Suivez la section [Migrer le processeur d'enregistrements](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) sur la page [Migrer les consommateurs de KCL 1.x vers KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Étape 2 : migrer l'usine de traitement des enregistrements**

  Suivez la section [Migrer l'usine du processeur d'enregistrements](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-factory-migration) sur la page [Migrer les consommateurs de KCL 1.x vers KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Étape 3 : migrer le travailleur**

  Suivez la section [Migrer le travailleur](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#worker-migration) sur la page [Migrer les consommateurs de KCL 1.x vers KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Étape 4 : Migrer la configuration de KCL 1.x**

  Suivez la section [Configurer le client Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration) sur la page [Migrer les consommateurs de KCL 1.x vers KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Étape 5 : vérification de la suppression des périodes d'inactivité et des suppressions de configuration client**

  Suivez les sections [Suppression des périodes d'inactivité](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#idle-time-removal) et [Suppression de la configuration client](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration-removals) de la page [Migrer les consommateurs de KCL 1.x vers KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Étape 6 : Suivez les step-by-step instructions du guide de migration de KCL 2.x vers KCL 3.x**

  Suivez les instructions de la [Migrer de KCL 2.x vers KCL 3.x](kcl-migration-from-2-3.md) page pour terminer la migration. Si vous devez revenir à la version précédente de KCL ou passer à KCL 3.x après une restauration, reportez-vous à et. [Restauration par régression de la version précédente de la KCL](kcl-migration-rollback.md) [Restauration par progression de la KCL 3.x après une restauration par régression](kcl-migration-rollforward.md)

**Important**  
N'utilisez pas les AWS SDK pour Java versions 2.27.19 à 2.27.23 avec KCL 3.x. Ces versions incluent un problème qui provoque une erreur d'exception liée à l'utilisation de DynamoDB par KCL. Nous vous recommandons d'utiliser la AWS SDK pour Java version 2.28.0 ou ultérieure pour éviter ce problème. 

# Documentation sur les versions précédentes de KCL
<a name="kcl-archive"></a>

Les rubriques suivantes ont été archivées. Pour consulter la documentation actuelle de la bibliothèque cliente Kinesis, consultez. [Utiliser la bibliothèque cliente Kinesis](kcl.md)

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

**Topics**
+ [Informations sur KCL 1.x et 2.x](shared-throughput-kcl-consumers.md)
+ [Développez des consommateurs personnalisés avec un débit partagé](shared-throughput-consumers.md)
+ [Migrer les consommateurs de KCL 1.x vers KCL 2.x](kcl-migration.md)

# Informations sur KCL 1.x et 2.x
<a name="shared-throughput-kcl-consumers"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

L'une des méthodes de développement d'applications consommateur personnalisées capables de traiter des données provenant de flux de données KDS consiste à utiliser la Kinesis Client Library (KCL).

**Topics**
+ [À propos de KCL (versions précédentes)](#shared-throughput-kcl-consumers-overview)
+ [Versions précédentes de KCL](#shared-throughput-kcl-consumers-versions)
+ [Concepts KCL (versions précédentes)](#shared-throughput-kcl-consumers-concepts)
+ [Utilisez une table de location pour suivre les partitions traitées par l'application client KCL](#shared-throughput-kcl-consumers-leasetable)
+ [Traitez plusieurs flux de données avec la même application grand public KCL 2.x pour Java](#shared-throughput-kcl-multistream)
+ [Utiliser le KCL avec le registre des AWS Glue schémas](#shared-throughput-kcl-consumers-glue-schema-registry)

**Note**  
Il est recommandé, selon votre scénario d'utilisation, de mettre à jour vers la version la plus récente de KCL 1.x ou de KCL 2.x, que vous utilisiez KCL 1.x ou KCL 2.x. KCL 1.x et KCL 2.x bénéficient régulièrement de nouvelles versions incluant les derniers correctifs de dépendance et de sécurité, des corrections de bogues, ainsi que de nouvelles fonctionnalités compatibles avec les versions antérieures. Pour plus d'informations, consultez [https://github.com/awslabs/amazon-kinesis-client/releases](https://github.com/awslabs/amazon-kinesis-client/releases).

## À propos de KCL (versions précédentes)
<a name="shared-throughput-kcl-consumers-overview"></a>

KCL facilite la consommation et le traitement des données provenant d'un flux de données Kinesis en gérant de nombreuses tâches complexes associées au calcul distribué. Il s'agit notamment de l'équilibrage de charge entre plusieurs instances d'applications consommateur, de la réponse aux défaillances des instances d'applications consommateur, du contrôle des enregistrements traités et de la réaction au repartitionnement. La KCL gère toutes ces sous-tâches afin que vous puissiez vous concentrer sur l'écriture de votre logique personnalisée de traitement des enregistrements.

Le KCL est différent des Kinesis Data APIs Streams disponibles dans le. AWS SDKs Les Kinesis Data APIs Streams vous aident à gérer de nombreux aspects de Kinesis Data Streams, notamment la création de flux, le repartage, ainsi que le transfert et l'obtention d'enregistrements. La KCL offre une couche d'abstraction pour toutes ces sous-tâches, notamment pour que vous puissiez vous concentrer sur la logique de traitement des données personnalisée de votre application consommateur. Pour plus d'informations sur l'API Kinesis Data Streams, consultez la [Référence d'API Amazon Kinesis](https://docs.aws.amazon.com/kinesis/latest/APIReference/Welcome.html) (français non garanti).

**Important**  
La KCL est une bibliothèque Java. Support pour les langages autres que Java est fourni à l'aide d'une interface multilingue appelée. MultiLangDaemon Ce démon est basé sur Java et s'exécute en arrière-plan lorsque vous utilisez un langage KCL autre que Java. Par exemple, si vous installez la KCL pour Python et que vous écrivez votre application client entièrement en Python, vous devez tout de même installer Java sur votre système en raison du MultiLangDaemon. En outre, MultiLangDaemon comporte certains paramètres par défaut que vous devrez peut-être personnaliser en fonction de votre cas d'utilisation, par exemple, la AWS région à laquelle il se connecte. Pour plus d'informations sur le MultiLangDaemon on GitHub, consultez le [ MultiLangDaemon projet KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

La KCL sert d'intermédiaire entre votre logique de traitement des enregistrements et Kinesis Data Streams. 

## Versions précédentes de KCL
<a name="shared-throughput-kcl-consumers-versions"></a>

Actuellement, vous pouvez utiliser l'une des versions prises en charge suivantes de KCL pour créer vos applications consommateur personnalisées :
+ **KCL 1.x**

  Pour de plus amples informations, consultez [Développez les consommateurs de KCL 1.x](developing-consumers-with-kcl.md).
+ **KCL 2.x**

  Pour de plus amples informations, consultez [Développez des consommateurs de KCL 2.x](developing-consumers-with-kcl-v2.md).

Vous pouvez utiliser KCL 1.x ou KCL 2.x pour créer des applications consommateur utilisant un débit partagé. Pour de plus amples informations, veuillez consulter [Développez des consommateurs personnalisés avec un débit partagé à l'aide de KCL](custom-kcl-consumers.md).

Pour créer des applications consommateur qui utilisent un débit dédié (consommateurs à débit amélioré), vous devez uniquement utiliser KCL 2.x. Pour de plus amples informations, veuillez consulter [Développez des clients fans améliorés grâce à un débit dédié](enhanced-consumers.md).

Pour plus d'informations sur les différences entre KCL 1.x et KCL 2.x, ainsi que des instructions sur la manière de migrer de KCL 1.x à KCL 2.x, consultez [Migrer les consommateurs de KCL 1.x vers KCL 2.x](kcl-migration.md).

## Concepts KCL (versions précédentes)
<a name="shared-throughput-kcl-consumers-concepts"></a>
+ **Application consommateur KCL** : application personnalisée utilisant KCL et conçue pour lire et traiter des enregistrements à partir de flux de données. 
+ **Instance d'application consommateur** : les applications consommateur KCL sont généralement distribuées, avec une ou plusieurs instances d'application s'exécutant simultanément afin de coordonner les défaillances et d'équilibrer dynamiquement la charge de traitement des enregistrements de données.
+ **Application de travail** : classe de haut niveau utilisée par une instance d'application consommateur KCL pour commencer à traiter des données. 
**Important**  
Chaque instance d'application consommateur KCL possède une application de travail. 

  L'application de travail initialise et supervise diverses tâches, y compris la synchronisation des informations de partition et de bail, le suivi des affectations de partitions et le traitement des données provenant des partitions. Un travailleur fournit à KCL les informations de configuration de l'application client, telles que le nom du flux de données dont cette application client KCL va traiter les enregistrements de données et les AWS informations d'identification nécessaires pour accéder à ce flux de données. L'application de travail démarre également cette instance d'application consommateur KCL spécifique pour fournir des enregistrements de données du flux de données aux processeurs d'enregistrements.
**Important**  
Dans KCL 1.x, cette classe s'appelle **Application de travail**. Pour plus d'informations (il s'agit des référentiels Java KCL), consultez [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java).java. Dans KCL 2.x, cette classe s'appelle **Planificateur**. L'objectif du planificateur dans KCL 2.x est identique à celui de l'application de travail dans KCL 1.x. [Pour plus d'informations sur la classe Scheduler dans KCL 2.x, consultez/.java. https://github.com/awslabs/ amazon-kinesis-client blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java) 
+ **Bail** : données qui définissent le lien entre une application de travail et une partition. Les applications consommateur KCL distribuées utilisent des baux pour répartir le traitement des enregistrements de données sur une flotte d'applications de travail. À tout moment, chaque partition d'enregistrements de données est liée à une application de travail particulière par un bail identifié par la variable **leaseKey**. 

  Par défaut, un travailleur peut détenir un ou plusieurs baux (sous réserve de la valeur de la variable **maxLeasesForWorker**) en même temps. 
**Important**  
Chaque application de travail devra posséder tous les baux disponibles pour toutes les partitions disponibles dans un flux de données. Cependant, à tout moment, une seule application de travail peut posséder avec succès un bail spécifique. 

  Par exemple, si vous avez une instance d'application consommateur A avec l'application de travail A qui traite un flux de données contenant 4 partitions, l'application de travail A peut posséder simultanément des baux pour les partitions 1, 2, 3 et 4. Cependant, si vous avez deux instances d'applications consommateur, nommées A et B, chacune avec sa propre application de travail (application de travail A et application de travail B), traitant un flux de données composé de 4 partitions, alors l'application de travail A et l'application de travail B ne peuvent pas détenir simultanément le bail de la partition 1. Une application de travail possède le bail d'une partition spécifique jusqu'à ce qu'elle soit prête à arrêter de traiter les enregistrements de données de cette partition ou jusqu'à ce qu'elle rencontre une défaillance. Lorsqu'une application de travail cesse de possède le bail, une autre application de travail prend et possède ce bail. 

  Pour plus d'informations (il s'agit des référentiels Java KCL), consultez [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java pour KCL 1.x et [https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java).java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java) pour KCL 2.x.
+ **Table des baux** : une table Amazon DynamoDB unique utilisée pour suivre les partitions d'un flux de données KDS louées et traitées par les applications de travail de l'application consommateur KCL. La table des baux doit rester synchronisée (au sein d'une application de travail et entre toutes les application de travail) avec les dernières informations relatives aux partitions provenant du flux de données pendant l'exécution de l'application consommateur KCL. Pour de plus amples informations, veuillez consulter [Utilisez une table de location pour suivre les partitions traitées par l'application client KCL](#shared-throughput-kcl-consumers-leasetable).
+ **Processeur d'enregistrement :** logique qui définit la manière dont votre application consommateur KCL traite les données provenant des flux de données. Au moment de l'exécution, une instance d'application consommateur KCL instancie une application de travail et cette dernière instancie un processeur d'enregistrement pour chaque partition qu'elle loue. 

## Utilisez une table de location pour suivre les partitions traitées par l'application client KCL
<a name="shared-throughput-kcl-consumers-leasetable"></a>

**Topics**
+ [Qu'est-ce qu'une table de location](#shared-throughput-kcl-consumers-what-is-leasetable)
+ [Débit](#shared-throughput-kcl-leasetable-throughput)
+ [Comment une table de location est synchronisée avec les partitions d'un flux de données Kinesis](#shared-throughput-kcl-consumers-leasetable-sync)

### Qu'est-ce qu'une table de location
<a name="shared-throughput-kcl-consumers-what-is-leasetable"></a>

Pour chaque application Amazon Kinesis Data Streams, KCL utilise une table des baux unique (stockée dans une table Amazon DynamoDB) pour suivre les partitions d'un flux de données KDS qui sont louées et traitées par les applications de travail de l'application consommateur KCL.

**Important**  
KCL utilise le nom de l'application consommateur pour créer le nom de la table des baux que cette application utilise, c'est pourquoi chaque nom d'application consommateur doit être unique.

Vous pouvez consulter cette table à l'aide de la [console Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) lors que l'application est en cours d'exécution.

Si la table des baux de votre application consommateur KCL n'existe pas au démarrage de l'application, l'une des applications de travail crée la table des baux pour cette application. 

**Important**  
 Votre compte est facturé pour les coûts associés à la table DynamoDB, en plus des coûts associés au service Kinesis Data Streams lui-même. 

Chaque ligne de la table des baux représente une partition qui est en cours de traitement par les applications de travail de votre application consommateur. Si votre application consommateur KCL ne traite qu'un seul flux de données, alors `leaseKey` qui est la clé de hachage de la table des baux est l'ID de la partition. Si vous êtes [Traitez plusieurs flux de données avec la même application grand public KCL 2.x pour Java](#shared-throughput-kcl-multistream), la structure de la leaseKey ressemble à : `account-id:StreamName:streamCreationTimestamp:ShardId`. Par exemple, `111111111:multiStreamTest-1:12345:shardId-000000000336`.

En plus de l'ID de partition, chaque ligne inclut également les données suivantes :
+ **checkpoint :** le plus récent numéro de séquence de point de contrôle de la partition. Cette valeur est unique dans toutes les partitions du flux de données.
+ **checkpointSubSequenceNuméro :** lorsque vous utilisez la fonctionnalité d'agrégation de la bibliothèque Kinesis Producer, il s'agit d'une extension du **point de contrôle** qui permet de suivre les enregistrements utilisateur individuels au sein de l'enregistrement Kinesis.
+ **leaseCounter :** utilisé pour la gestion des versions de bail afin de permettre aux applications de travail de détecter que leur bail a été pris par une autre application de travail.
+ **leaseKey :** un identifiant unique de bail. Chaque bail est propre à une partition du flux de données et est détenu par une seule application de travail à la fois.
+ **leaseOwner :** l'application de travail qui détient ce bail.
+ **ownerSwitchesSincePoint de contrôle :** combien de fois ce bail a changé de travailleur depuis la dernière fois qu'un point de contrôle a été écrit.
+ **parentShardId:** Utilisé pour garantir que la partition parent est entièrement traitée avant le début du traitement sur les partitions enfants. Cela garantit que les enregistrements sont traités dans l'ordre dans lequel ils ont été placés dans le flux.
+ **hashrange :** utilisé par le `PeriodicShardSyncManager` pour exécuter des synchronisations périodiques afin de trouver les partitions manquantes dans la table des baux et de créer des baux pour celles-ci si nécessaire. 
**Note**  
Ces données sont présentes dans le tableau des baux pour chaque partition à partir de KCL 1.14 et KCL 2.3. Pour plus d'informations sur `PeriodicShardSyncManager` et la synchronisation périodique entre les baux et les partitions, consultez [Comment une table de location est synchronisée avec les partitions d'un flux de données Kinesis](#shared-throughput-kcl-consumers-leasetable-sync).
+ **childshards :** utilisé par le `LeaseCleanupManager` pour vérifier l'état de traitement de la partition enfant et décider si la partition parent peut être supprimée de la table des baux.
**Note**  
Ces données sont présentes dans le tableau des baux pour chaque partition à partir de KCL 1.14 et KCL 2.3.
+ **shardID :** ID de la partition.
**Note**  
Ces données ne sont présentes dans le tableau des baux que si vous êtes [Traitez plusieurs flux de données avec la même application grand public KCL 2.x pour Java](#shared-throughput-kcl-multistream). Ceci n'est pris en charge que dans KCL 2.x pour Java, à partir de KCL 2.3 pour Java et versions ultérieures. 
+ **nom du flux** : identifiant du flux de données au format suivant : `account-id:StreamName:streamCreationTimestamp`.
**Note**  
Ces données ne sont présentes dans le tableau des baux que si vous êtes [Traitez plusieurs flux de données avec la même application grand public KCL 2.x pour Java](#shared-throughput-kcl-multistream). Ceci n'est pris en charge que dans KCL 2.x pour Java, à partir de KCL 2.3 pour Java et versions ultérieures. 

### Débit
<a name="shared-throughput-kcl-leasetable-throughput"></a>

Si votre application Amazon Kinesis Data Streams reçoit des exceptions de débit provisionné, vous devez augmenter le débit provisionné pour la table DynamoDB. La KCL crée la table avec un débit provisionné de 10 lectures par seconde et 10 écritures par seconde, mais cela peut être insuffisant pour votre application. Par exemple, si votre application Amazon Kinesis Data Streams effectue des contrôles fréquents ou fonctionne sur un flux composé de nombreuses partitions, il se peut que vous ayez besoin d'un débit plus élevé.

Pour plus d'informations sur le débit provisionné dans DynamoDB consultez les rubriques [Mode de capacité en lecture/écriture](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html) et [Utilisation des tables et des données](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithDDTables.html) dans le *Guide du développeur Amazon DynamoDB*.

### Comment une table de location est synchronisée avec les partitions d'un flux de données Kinesis
<a name="shared-throughput-kcl-consumers-leasetable-sync"></a>

Les applications de travail des applications consommateur KCL utilisent des baux pour traiter des partitions de données à partir d'un flux de données donné. Les informations relatives à l'application de travail qui loue une partition à un moment donné sont stockées dans une table des baux. La table des baux doit rester synchronisée avec les dernières informations relatives aux partitions provenant du flux de données pendant l'exécution de l'application consommateur KCL. KCL synchronise la table des baux avec les informations sur les partitions acquises à partir du service Kinesis Data Streams pendant le démarrage de l'application consommateur (soit lorsque l'application consommateur est initialisée, soit lorsqu'elle est redémarrée) et également chaque fois qu'une partition en cours de traitement arrive à son terme (repartitionnement). Autrement dit, les applications de travail ou une application consommateur KCL sont synchronisés avec le flux de données qu'ils traitent lors du démarrage initial de l'application consommateur et chaque fois que l'application consommateur rencontre un événement de repartitionnement du flux de données.

**Topics**
+ [Synchronisation dans KCL 1.0 - 1.13 et KCL 2.0 - 2.2](#shared-throughput-kcl-consumers-leasetable-sync-old)
+ [Synchronisation dans KCL 2.x, à partir de KCL 2.3 et versions ultérieures](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl2)
+ [Synchronisation dans KCL 1.x, à partir de KCL 1.14 et versions ultérieures](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl1)

#### Synchronisation dans KCL 1.0 - 1.13 et KCL 2.0 - 2.2
<a name="shared-throughput-kcl-consumers-leasetable-sync-old"></a>

Dans KCL 1.0 - 1.13 et KCL 2.0 - 2.2, lors du démarrage de l'application client et également lors de chaque événement de redéfinition du flux de données, KCL synchronise la table des baux avec les informations relatives aux partitions obtenues auprès du service Kinesis Data Streams en invoquant le ou la découverte. `ListShards` `DescribeStream` APIs Dans toutes les versions de KCL répertoriées ci-dessus, chaque utilisateur d'une application client KCL effectue les étapes suivantes pour effectuer le processus de lease/shard synchronisation lors du démarrage de l'application client et à chaque événement de redéfinition du flux :
+ Récupère toutes les partitions contenant les données du flux en cours de traitement
+ Récupère tous les baux de partitions à partir de la table des baux
+ Filtre chaque partition ouverte qui n'a pas de bail dans la table des baux
+ Effectue une itération sur toutes les partitions ouvertes trouvées et pour chaque partition ouverte sans parent ouvert :
  + Parcourt l'arbre hiérarchique en suivant le chemin de ses ancêtres pour déterminer si la partition est une descendante. Une partition est définie comme descendante si une partition ancestrale est actuellement en cours de traitement (c'est-à-dire, l'entrée de bail pour cette partition ancestrale est présente dans la table des baux) ou si une partition ancestrale est prévue pour être traitée (par exemple, si la position initiale est `TRIM_HORIZON` ou `AT_TIMESTAMP`)
  + Si la partition ouverte dans le contexte est une descendante, KCL vérifie les points de contrôle de la partition en fonction de la position initiale et crée des baux pour ses parents, si nécessaire

#### Synchronisation dans KCL 2.x, à partir de KCL 2.3 et versions ultérieures
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl2"></a>

À partir des dernières versions prises en charge de KCL 2.x (KCL 2.3) et versions ultérieures, la bibliothèque prend désormais en charge les modifications suivantes apportées au processus de synchronisation. Ces modifications de lease/shard synchronisation réduisent considérablement le nombre d'appels d'API effectués par les applications grand public KCL vers le service Kinesis Data Streams et optimisent la gestion des baux dans votre application client KCL. 
+ Pendant le démarrage de l'application, si la table des baux est vide, KCL utilise l'option de filtrage de l'API `ListShard` (le paramètre de demande facultatif `ShardFilter`) pour récupérer et créer des baux uniquement pour un instantané des partitions ouvertes à l'heure spécifiée par le paramètre `ShardFilter`. Le paramètre `ShardFilter` vous permet de filtrer la réponse de l'API `ListShards`. La seule propriété obligatoire du paramètre `ShardFilter` est `Type`. KCL utilise la propriété de filtre `Type` et ses valeurs valides suivantes pour identifier et renvoyer un instantané des partitions ouvertes susceptibles de nécessiter de nouveaux baux :
  + `AT_TRIM_HORIZON` : la réponse inclut toutes les partitions ouvertes sur `TRIM_HORIZON`. 
  + `AT_LATEST` : la réponse inclut uniquement les partitions actuellement ouvertes du flux de données. 
  + `AT_TIMESTAMP` : la réponse inclut toutes les partitions dont l'horodatage de début est inférieur ou égal à l'horodatage donné et l'horodatage de fin est supérieur ou égal à l'horodatage donné ou encore ouvertes.

  `ShardFilter` est utilisé lors de la création de baux pour une table des baux vide afin d'initialiser les baux pour un instantané des partitions spécifiées à `RetrievalConfig#initialPositionInStreamExtended`.

  Pour plus d’informations sur `ShardFilter`, consultez [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html).
+ Au lieu que tous les travailleurs effectuent la lease/shard synchronisation pour maintenir le tableau des baux à jour avec les dernières partitions du flux de données, un seul responsable des travailleurs élu effectue la synchronisation entre le bail et la partition.
+ KCL 2.3 utilise le paramètre de `ChildShards` retour de `GetRecords` et pour effectuer une lease/shard synchronisation qui se produit `SHARD_END` pour les `SubscribeToShard` APIs partitions fermées, permettant à un travailleur KCL de créer des baux uniquement pour les partitions enfants de la partition qu'il a fini de traiter. Pour les applications grand public partagées, cette optimisation de la lease/shard synchronisation utilise le `ChildShards` paramètre de l'`GetRecords`API. Pour les applications grand public à débit dédié (fan-out amélioré), cette optimisation de la lease/shard synchronisation utilise le `ChildShards` paramètre de l'API. `SubscribeToShard` Pour plus d’informations, consultez [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html), [SubscribeToShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) et [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).
+ Avec les modifications ci-dessus, le comportement de KCL passe d'un modèle où toutes les applications de travail connaissent toutes les partitions existantes à un modèle où les applications de travail ne connaissent que les partitions enfants des partitions dont chaque application est propriétaire. Par conséquent, outre la synchronisation qui se produit lors des événements de démarrage et de refonte des applications grand public, KCL effectue désormais des shard/lease analyses périodiques supplémentaires afin d'identifier les éventuelles lacunes dans le tableau des baux (en d'autres termes, pour en savoir plus sur toutes les nouvelles partitions) afin de s'assurer que la plage de hachage complète du flux de données est traitée et de créer des baux pour celles-ci si nécessaire. `PeriodicShardSyncManager`est le composant chargé d'exécuter des lease/shard analyses périodiques. 

  Pour plus d'informations sur `PeriodicShardSyncManager` KCL 2.3, consultez [https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java \$1L201](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213) -L213.

  Dans KCL 2.3, de nouvelles options de configuration sont disponibles pour la configuration de `PeriodicShardSyncManager` dans `LeaseManagementConfig` :    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/streams/latest/dev/shared-throughput-kcl-consumers.html)

  De nouvelles CloudWatch mesures sont également désormais émises pour surveiller l'état de santé du`PeriodicShardSyncManager`. Pour de plus amples informations, veuillez consulter [PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task).
+ Incluant une optimisation de `HierarchicalShardSyncer` pour créer des baux uniquement pour une couche de partition.

#### Synchronisation dans KCL 1.x, à partir de KCL 1.14 et versions ultérieures
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl1"></a>

À partir des dernières versions prises en charge de KCL 1.x (KCL 1.14) et versions ultérieures, la bibliothèque prend désormais en charge les modifications suivantes apportées au processus de synchronisation. Ces modifications de lease/shard synchronisation réduisent considérablement le nombre d'appels d'API effectués par les applications grand public KCL vers le service Kinesis Data Streams et optimisent la gestion des baux dans votre application client KCL. 
+ Pendant le démarrage de l'application, si la table des baux est vide, KCL utilise l'option de filtrage de l'API `ListShard` (le paramètre de demande facultatif `ShardFilter`) pour récupérer et créer des baux uniquement pour un instantané des partitions ouvertes à l'heure spécifiée par le paramètre `ShardFilter`. Le paramètre `ShardFilter` vous permet de filtrer la réponse de l'API `ListShards`. La seule propriété obligatoire du paramètre `ShardFilter` est `Type`. KCL utilise la propriété de filtre `Type` et ses valeurs valides suivantes pour identifier et renvoyer un instantané des partitions ouvertes susceptibles de nécessiter de nouveaux baux :
  + `AT_TRIM_HORIZON` : la réponse inclut toutes les partitions ouvertes sur `TRIM_HORIZON`. 
  + `AT_LATEST` : la réponse inclut uniquement les partitions actuellement ouvertes du flux de données. 
  + `AT_TIMESTAMP` : la réponse inclut toutes les partitions dont l'horodatage de début est inférieur ou égal à l'horodatage donné et l'horodatage de fin est supérieur ou égal à l'horodatage donné ou encore ouvertes.

  `ShardFilter` est utilisé lors de la création de baux pour une table des baux vide afin d'initialiser les baux pour un instantané des partitions spécifiées à `KinesisClientLibConfiguration#initialPositionInStreamExtended`.

  Pour plus d’informations sur `ShardFilter`, consultez [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html).
+ Au lieu que tous les travailleurs effectuent la lease/shard synchronisation pour maintenir le tableau des baux à jour avec les dernières partitions du flux de données, un seul responsable des travailleurs élu effectue la synchronisation entre le bail et la partition.
+ KCL 1.14 utilise le paramètre de `ChildShards` retour de `GetRecords` et pour effectuer une lease/shard synchronisation qui se produit `SHARD_END` pour les `SubscribeToShard` APIs partitions fermées, permettant à un travailleur KCL de créer des baux uniquement pour les partitions enfants de la partition qu'il a fini de traiter. Pour plus d’informations, consultez [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) et [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).
+ Avec les modifications ci-dessus, le comportement de KCL passe d'un modèle où toutes les applications de travail connaissent toutes les partitions existantes à un modèle où les applications de travail ne connaissent que les partitions enfants des partitions dont chaque application est propriétaire. Par conséquent, outre la synchronisation qui se produit lors des événements de démarrage et de refonte des applications grand public, KCL effectue désormais des shard/lease analyses périodiques supplémentaires afin d'identifier les éventuelles lacunes dans le tableau des baux (en d'autres termes, pour en savoir plus sur toutes les nouvelles partitions) afin de s'assurer que la plage de hachage complète du flux de données est traitée et de créer des baux pour celles-ci si nécessaire. `PeriodicShardSyncManager`est le composant responsable de l'exécution des lease/shard analyses périodiques. 

  Lorsque `KinesisClientLibConfiguration#shardSyncStrategyType` est défini sur `ShardSyncStrategyType.SHARD_END`, `PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold` est utilisé pour déterminer le seuil du nombre d'analyses consécutives révélant des lacunes dans la table des baux, après lequel une synchronisation des partitions est imposée. Lorsque `KinesisClientLibConfiguration#shardSyncStrategyType` est défini sur `ShardSyncStrategyType.PERIODIC`, `leasesRecoveryAuditorInconsistencyConfidenceThreshold` est ignoré.

  Pour plus d'informations sur `PeriodicShardSyncManager` KCL 1.14, consultez [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999) \$1L987 -L999.

  Dans KCL 1.14, une nouvelle option de configuration est disponible pour la configuration de `PeriodicShardSyncManager` dans `LeaseManagementConfig` :    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/streams/latest/dev/shared-throughput-kcl-consumers.html)

  De nouvelles CloudWatch mesures sont également désormais émises pour surveiller l'état de santé du`PeriodicShardSyncManager`. Pour de plus amples informations, veuillez consulter [PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task).
+ KCL 1.14 prend désormais également en charge le nettoyage différé des baux. Les baux sont supprimés de manière asynchrone par `LeaseCleanupManager` lorsqu'ils atteignent `SHARD_END`, c'est-à-dire lorsqu'une partition a dépassé la période de conservation du flux de données ou a été fermée à la suite d'une opération de repartitionnement.

  De nouvelles options de configuration sont disponibles pour la configuration de `LeaseCleanupManager`.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/streams/latest/dev/shared-throughput-kcl-consumers.html)
+ Incluant une optimisation de `KinesisShardSyncer` pour créer des baux uniquement pour une couche de partition.

## Traitez plusieurs flux de données avec la même application grand public KCL 2.x pour Java
<a name="shared-throughput-kcl-multistream"></a>

Cette section décrit les modifications suivantes apportées à KCL 2.x pour Java, qui vous permettent de créer des applications grand public KCL capables de traiter plusieurs flux de données à la fois. 

**Important**  
Le traitement multiflux n'est pris en charge que dans KCL 2.x pour Java, à partir de KCL 2.3 pour Java et versions ultérieures.   
Le traitement multiflux n'est PAS pris en charge pour les autres langages dans lesquels KCL 2.x peut être mis en œuvre.  
Le traitement multiflux n'est PAS pris en charge dans les versions de KCL 1.x.
+ **MultistreamTracker interface**

  Pour créer une application grand public capable de traiter plusieurs flux en même temps, vous devez implémenter une nouvelle interface appelée [MultistreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java). Cette interface inclut la méthode `streamConfigList` qui renvoie la liste des flux de données et leurs configurations à traiter par l'application consommateur KCL. Notez que les flux de données en cours de traitement peuvent être modifiés pendant l'exécution de l'application consommateur. `streamConfigList` est appelée périodiquement par la KCL pour connaître l'évolution des flux de données à traiter.

  La `streamConfigList` méthode remplit la [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23)liste. 

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```

  Notez que les champs `StreamIdentifier` et `InitialPositionInStreamExtended` sont obligatoires, alors que `consumerArn` est facultatif. Vous ne devez fournir le code `consumerArn` que si vous utilisez KCL 2.x pour mettre en œuvre une application consommateur à débit amélioré.

  Pour plus d'informations sur`StreamIdentifier`, consultez [https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java \$1L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129). Pour créer une`StreamIdentifier`, nous vous recommandons de créer une instance multistream à partir de `streamArn` et `streamCreationEpoch` qui est disponible dans les versions 2.5.0 et ultérieures. Dans KCL v2.3 et v2.4, qui ne sont pas compatibles`streamArm`, créez une instance multistream en utilisant le format. `account-id:StreamName:streamCreationTimestamp` Ce format sera obsolète et ne sera plus pris en charge à compter de la prochaine version majeure.

  `MultistreamTracker` inclut également une stratégie pour supprimer les baux des anciens flux dans la table des baux (`formerStreamsLeasesDeletionStrategy`). Notez que la stratégie NE PEUT PAS être modifiée pendant l'exécution de l'application consommateur. Pour plus d'informations, consultez [https://github.com/awslabs/amazon-kinesis-clientb/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy/blob/0c5042dadf794fe988438436252a5a8fe70b6b0](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java) .java
+ [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java)est une classe à l'échelle de l'application que vous pouvez utiliser pour spécifier tous les paramètres de configuration KCL 2.x à utiliser lors de la création de votre application client KCL. `ConfigsBuilder`la classe prend désormais en charge l'`MultistreamTracker`interface. Vous pouvez initialiser l'un ConfigsBuilder ou l'autre avec le nom du flux de données à partir duquel les enregistrements seront consommés :

  ```
   /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```

  Vous pouvez également l'initialiser ConfigsBuilder avec `MultiStreamTracker` si vous souhaitez implémenter une application client KCL qui traite plusieurs flux en même temps.

  ```
  * Constructor to initialize ConfigsBuilder with MultiStreamTracker
       * @param multiStreamTracker
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.left(multiStreamTracker);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```
+ Lorsque la prise en charge des flux multiples est mise en œuvre pour votre application consommateur KCL, chaque ligne de la table des baux de l'application contient désormais l'identifiant de la partition et le nom du flux des multiples flux de données traités par cette application. 
+ Lorsque la prise en charge de flux multiples pour votre application consommateur KCL est mise en œuvre, la easeKey prend la structure suivante : `account-id:StreamName:streamCreationTimestamp:ShardId`. Par exemple, `111111111:multiStreamTest-1:12345:shardId-000000000336`.
**Important**  
Lorsque votre application consommateur KCL existante est configurée pour ne traiter qu'un seul flux de données, la leaseKey (qui est la clé de hachage de la table des baux) est l'ID de la partition. Si vous reconfigurez cette application consommateur KCL existante pour traiter des flux de données multiples, votre table des baux est rompue, car avec la prise en charge de flux multiples, la structure de la leaseKey doit être la suivante : `account-id:StreamName:StreamCreationTimestamp:ShardId`.

## Utiliser le KCL avec le registre des AWS Glue schémas
<a name="shared-throughput-kcl-consumers-glue-schema-registry"></a>

Vous pouvez intégrer vos flux de données Kinesis au registre des AWS Glue schémas. Le registre des AWS Glue schémas vous permet de découvrir, de contrôler et de faire évoluer les schémas de manière centralisée, tout en garantissant que les données produites sont validées en permanence par un schéma enregistré. Un schéma définit la structure et le format d'un enregistrement de données. Un schéma est une spécification versionnée pour la publication, la consommation ou le stockage des données fiables. Le registre des AWS Glue schémas vous permet d'améliorer la qualité end-to-end des données et la gouvernance des données au sein de vos applications de streaming. Pour plus d'informations, consultez le registre [AWS Glue Schema](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) (français non garanti). L'un des moyens de configurer cette intégration consiste à utiliser la KCL en Java. 

**Important**  
Actuellement, l'intégration de Kinesis Data Streams AWS Glue et de Schema Registry n'est prise en charge que pour les flux de données Kinesis qui utilisent des consommateurs KCL 2.3 implémentés en Java. La prise en charge multilingue n'est pas fournie. Les consommateurs KCL 1.0 ne sont pas pris en charge. Les consommateurs KCL 2.x antérieurs à KCL 2.3 ne sont pas pris en charge.

Pour obtenir des instructions détaillées sur la façon de configurer l'intégration de Kinesis Data Streams à Schema Registry à l'aide de la KCL, consultez la section « Interaction avec les données à KPL/KCL l'aide des bibliothèques » [dans Use Case : Integrating Amazon Kinesis Data Streams with AWS the](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) Glue Schema Registry.

# Développez des consommateurs personnalisés avec un débit partagé
<a name="shared-throughput-consumers"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Si vous n'avez pas besoin d'un débit dédié lors de la réception des données à partir de Kinesis Data Streams, et si vous n'avez pas besoin de lire les retards de propagation sous 200 ms, vous pouvez créer des applications consommateur comme décrit dans les rubriques suivantes. Vous pouvez utiliser la bibliothèque cliente Kinesis (KCL) ou le. AWS SDK pour Java

**Topics**
+ [Développez des consommateurs personnalisés avec un débit partagé à l'aide de KCL](custom-kcl-consumers.md)

Pour plus d'informations sur la création d'applications consommateur pouvant recevoir des enregistrements provenant des flux de données Kinesis avec un débit dédié, consultez [Développez des clients fans améliorés grâce à un débit dédié](enhanced-consumers.md).

# Développez des consommateurs personnalisés avec un débit partagé à l'aide de KCL
<a name="custom-kcl-consumers"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

L'une des méthodes de développement d'une application client personnalisée avec un débit partagé consiste à utiliser la bibliothèque client Kinesis (KCL). 

Choisissez l'une des rubriques suivantes pour la version KCL que vous utilisez.

**Topics**
+ [Développez les consommateurs de KCL 1.x](developing-consumers-with-kcl.md)
+ [Développez des consommateurs de KCL 2.x](developing-consumers-with-kcl-v2.md)

# Développez les consommateurs de KCL 1.x
<a name="developing-consumers-with-kcl"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Vous pouvez développer une application consommateur pour Amazon Kinesis Data Streams à l'aide de la bibliothèque client Kinesis (KCL). 

Pour plus d'informations sur KCL, consultez [À propos de KCL (versions précédentes)](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview).

Choisissez l'une des rubriques suivantes en fonction de l'option que vous souhaitez utiliser.

**Topics**
+ [Développez un client de bibliothèque cliente Kinesis en Java](kinesis-record-processor-implementation-app-java.md)
+ [Développez un client de bibliothèque cliente Kinesis dans Node.js](kinesis-record-processor-implementation-app-nodejs.md)
+ [Développez un client de bibliothèque client Kinesis dans .NET](kinesis-record-processor-implementation-app-dotnet.md)
+ [Développez un client de bibliothèque cliente Kinesis en Python](kinesis-record-processor-implementation-app-py.md)
+ [Développez un client de bibliothèque cliente Kinesis dans Ruby](kinesis-record-processor-implementation-app-ruby.md)

# Développez un client de bibliothèque cliente Kinesis en Java
<a name="kinesis-record-processor-implementation-app-java"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Vous pouvez utiliser la bibliothèque client Kinesis (KCL) dans le développement d'applications capables de traiter les données de vos flux de données Kinesis. La KCL est disponible en plusieurs langues. Cette rubrique présente Java. Pour consulter la référence Javadoc, consultez la rubrique [AWS Javadoc](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html) relative à Class. AmazonKinesisClient

Pour télécharger la KCL Java depuis GitHub, accédez à la [bibliothèque cliente Kinesis (](https://github.com/awslabs/amazon-kinesis-client)Java). Pour rechercher la KCL Java sur Apache Maven, consultez la page [KCL search results](https://search.maven.org/#search|ga|1|amazon-kinesis-client). Pour télécharger un exemple de code pour une application client Java KCL à partir de GitHub, rendez-vous sur la page d'[exemple de projet KCL pour Java](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis) sur. GitHub 

L'exemple d'application utilise [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html). Vous pouvez modifier la configuration de la journalisation dans la méthode statique `configure` définie dans le fichier `AmazonKinesisApplicationSample.java`. *Pour plus d'informations sur l'utilisation d'Apache Commons Logging avec Log4j et les applications AWS Java, consultez la section [Logging with Log4j](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html) dans le manuel du développeur.AWS SDK pour Java *

Vous devez effectuer les tâches suivantes lorsque vous implémentez une application consommateur KCL dans Java :

**Topics**
+ [Implémenter les méthodes IRecord du processeur](#kinesis-record-processor-implementation-interface-java)
+ [Implémenter une fabrique de classes pour l'interface IRecord du processeur](#kinesis-record-processor-implementation-factory-java)
+ [Créer un travailleur](#kcl-java-worker)
+ [Modifier les propriétés de configuration](#kinesis-record-processor-initialization-java)
+ [Migrer vers la version 2 de l'interface du processeur d'enregistrements](#kcl-java-v2-migration)

## Implémenter les méthodes IRecord du processeur
<a name="kinesis-record-processor-implementation-interface-java"></a>

La KCL prend en charge actuellement les deux versions de l'interface `IRecordProcessor` : interface d'origine disponible avec la première version de la KCL et la version 2 disponible à partir de la KCL version 1.5.0. Les deux interfaces sont entièrement prises en charge. Votre choix dépend des exigences spécifiques à votre scénario. Reportez-vous à vos Javadocs locales ou au code source pour voir toutes les différences. Les sections suivantes décrivent l'implémentation minimale pour la mise en route.

**Topics**
+ [Interface d'origine (Version 1)](#kcl-java-interface-original)
+ [Interface mise à jour (version 2)](#kcl-java-interface-v2)

### Interface d'origine (Version 1)
<a name="kcl-java-interface-original"></a>

L'interface d'origine `IRecordProcessor` (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) expose les méthodes de processeur d'enregistrements suivantes que votre application producteur doit implémenter. L'exemple fournit des implémentations que vous pouvez utiliser comme point de départ (voir `AmazonKinesisApplicationSampleRecordProcessor.java`).

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**initialisation**  
La KCL appelle la méthode `initialize` lorsque le processeur d'enregistrements est instancié, en passant un ID de partition spécifique comme paramètre. Ce processeur d'enregistrements traite uniquement cette partition et, en règle générale, l'inverse est également vrai (cette partition est traitée uniquement par ce processeur d'enregistrements). Cependant, votre application consommateur doit prendre en compte la possibilité qu'un enregistrement de données peut être traité plusieurs fois. Kinesis Data Streams a la sémantique *au moins une fois*, ce qui signifie que chaque enregistrement de données issu d'une partition est traité au moins une fois par une application de travail dans votre application consommateur. Pour plus d'informations sur les cas dans lesquels une partition spécifique peut être traitée par plusieurs applications de travail, consultez la page [Utilisez le redécoupage, la mise à l'échelle et le traitement parallèle pour modifier le nombre de partitions](kinesis-record-processor-scaling.md).

```
public void initialize(String shardId)
```

**processRecords**  
La KCL appelle la méthode `processRecords`, en passant une liste d'enregistrement de données issue de la partition spécifiée par la méthode `initialize(shardId)`. Le processeur d'enregistrements traite les données contenues dans ces enregistrements selon la sémantique de l'application consommateur. Par exemple, l'application de travail peut exécuter une transformation sur les données et stocker ensuite le résultat dans un compartiment Amazon Simple Storage Service (Amazon S3).

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

En plus des données elles-même, l'enregistrement contient également un numéro de séquence et une clé de partition. L'application de travail utilise ces valeurs lors du traitement des données. Par exemple, l'application de travail peut choisir le compartiment S3 dans lequel stocker les données en fonction de la valeur de la clé de partition. La classe `Record` expose les méthodes suivantes qui permettent d'accéder aux données, numéro de séquence et clé de partition de l'enregistrement. 

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

Dans l'exemple, la méthode privée `processRecordsWithRetries` contient du code qui indique comment une application de travail peut accéder aux données, numéro de séquence et clé de partition de l'enregistrement.

Kinesis Data Streams exige que le processeur d'enregistrements effectue le suivi des enregistrements qui ont déjà été traités dans une partition. La KCL assure ce suivi à votre place en passant un objet Checkpointer (`IRecordProcessorCheckpointer`) à `processRecords`. Le processeur d'enregistrements appelle la méthode `checkpoint` sur cette interface pour informer la KCL de son avancement dans le traitement des enregistrements de la partition. Si le travail échoue, la KCL utilise ces informations pour redémarrer le traitement de la partition au niveau du dernier enregistrement traité connu.

Dans le cas d'un fractionnement ou d'une fusion, la KCL ne commence pas à traiter les nouvelles partitions tant que les processeurs des partitions d'origine n'ont pas appelé `checkpoint` pour signaler que l'ensemble du traitement sur les partitions d'origine est terminé.

Si vous ne passez pas de paramètre, la KCL suppose que l'appel de `checkpoint` signifie que tous les enregistrements ont été traités jusqu'au dernier enregistrement qui a été passé au processeur d'enregistrements. Par conséquent, le processeur d'enregistrements doit appeler `checkpoint` seulement après avoir traité tous les enregistrements de la liste qui lui a été passée. Les processeurs d'enregistrements n'ont pas besoin d'appeler `checkpoint` à chaque appel de `processRecords`. Un processeur pourrait, par exemple, appeler `checkpoint` à chaque troisième appel de `processRecords`. Vous pouvez éventuellement spécifier le numéro de séquence précis d'un enregistrement comme paramètre à `checkpoint`. Dans ce cas, la KCL suppose que tous les enregistrements ont été traités jusqu'à cet enregistrement uniquement.

Dans l'exemple, la méthode privée `checkpoint` montre comment appeler `IRecordProcessorCheckpointer.checkpoint` en utilisant la logique appropriée de traitement des exceptions et de nouvelle tentative.

La KCL s'appuie sur `processRecords` pour gérer toutes les exceptions générées par le traitement des enregistrements de données. Si une exception est déclenchée depuis `processRecords`, la KCL ignore les enregistrements de données qui ont été transmis avant l'exception. En d'autres termes, ces enregistrements ne sont pas renvoyés au processeur d'enregistrements qui a lancé l'exception ou à tout autre processeur d'enregistrement dans l'application consommateur.

**shutdown**  
La KCL appelle la méthode `shutdown` soit à la fin du traitement (le motif de fermeture étant `TERMINATE`) ou lorsque l'application de travail ne répond plus (la raison de fermeture ayant la valeur `ZOMBIE`).

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

Le traitement se termine lorsque le processeur d'enregistrements ne reçoit plus d'enregistrements de la partition, car la partition a été fractionnée ou fusionnée, ou le flux a été supprimé.

La KCL passe également une interface `IRecordProcessorCheckpointer` à `shutdown`. Si le motif de fermeture est `TERMINATE`, le processeur d'enregistrements doit terminer le traitement des enregistrements de données et appeler ensuite la méthode `checkpoint` sur cette interface.

### Interface mise à jour (version 2)
<a name="kcl-java-interface-v2"></a>

L'interface `IRecordProcessor` mise à jour (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) expose les méthodes de processeur d'enregistrements suivantes que votre application producteur doit implémenter : 

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

Tous les arguments provenant de la version initiale de l'interface sont accessibles via les méthodes get sur les objets de conteneur. Par exemple, pour extraire la liste des enregistrements dans `processRecords()`, vous pouvez utiliser `processRecordsInput.getRecords()`.

A partir de la version 2 de cette interface (KCL 1.5.0 et ultérieure), les nouvelles entrées suivantes sont disponibles en plus des entrées fournies par l'interface d'origine :

Numéro de séquence de début  
Dans l'objet `InitializationInput` passé à l'opération `initialize()`, le numéro de séquence de début à partir duquel les enregistrements sont fournis à l'instance de processeur d'enregistrements. C'est le numéro de séquence qui a été contrôlé en dernier par l'instance de processeur d'enregistrements qui a traité précédemment la même partition. Il est fourni si votre application a besoin de cette information. 

Numéro de séquence de point de contrôle en attente  
Dans l'objet `InitializationInput` passé à l'opération `initialize()`, le numéro de séquence de point de contrôle en attente (le cas échéant) qui n'a pas pu être validé avant l'arrêt de l'instance de processeur d'enregistrements précédente..

## Implémenter une fabrique de classes pour l'interface IRecord du processeur
<a name="kinesis-record-processor-implementation-factory-java"></a>

Vous avez aussi besoin d'implémenter une fabrique pour la classe qui implémente les méthodes de processeur d'enregistrements. Lorsque votre application consommateur instancie l'application de travail, elle passe une référence à cette fabrique.

L'exemple implémente la classe Factory dans le fichier `AmazonKinesisApplicationSampleRecordProcessorFactory.java` à l'aide de l'interface de processeur d'enregistrements d'origine. Si vous voulez que la fabrique de classes crée des processeurs d'enregistrements version 2, utilisez le nom de package `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`.

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## Créer un travailleur
<a name="kcl-java-worker"></a>

Comme présenté dans [Implémenter les méthodes IRecord du processeur](#kinesis-record-processor-implementation-interface-java), deux versions de l'interface de processeur d'enregistrements KCL sont disponibles, ce qui affecte la façon dont vous créez une application de travail. L'interface de processeur d'enregistrements d'origine utilise la structure de code suivante pour créer un travail :

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

Avec la version 2 de l'interface de processeur d'enregistrements, vous pouvez utiliser `Worker.Builder` pour créer un travail sans avoir à vous soucier du constructeur à utiliser et de l'ordre des arguments. L'interface de processeur d'enregistrements mise à jour utilise la structure de code suivante pour créer un travail :

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## Modifier les propriétés de configuration
<a name="kinesis-record-processor-initialization-java"></a>

L'exemple fournit les valeurs par défaut des propriétés de configuration. Ces données de configuration du travail sont ensuite consolidées dans un objet `KinesisClientLibConfiguration`. Cet objet et une référence à la fabrique de classes pour `IRecordProcessor` sont passés dans l'appel qui instancie l'application de travail. Vous pouvez remplacer ces propriétés par vos propres valeurs en utilisant un fichier de propriétés Java (voir `AmazonKinesisApplicationSample.java`).

### Application name (Nom de l'application)
<a name="configuration-property-application-name"></a>

La KCL demande un nom d'application qui est unique dans l'ensemble de vos applications et dans les tableaux Amazon DynamoDB de la même région. Elle utilise la valeur de configuration du nom d'application des manières suivantes :
+ Tous les programmes d'exécution associés à ce nom d'application sont considérés comme rattachés au même flux. Ces programmes d'exécution peuvent être répartis sur plusieurs instances. Si vous exécutez une autre instance du même code d'application, mais sous un autre nom d'application, la KCL traite cette seconde instance comme une application totalement distincte, associée elle aussi au même flux.
+ La KCL crée un tableau DynamoDB portant ce nom d'application et utilise la table pour tenir à jour les informations d'état (par exemple, les points de contrôle et le mappage d'application de travail-partition) pour l'application. Chaque application a son propre tableau DynamoDB. Pour de plus amples informations, veuillez consulter [Utilisez une table de location pour suivre les partitions traitées par l'application client KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurer les informations d'identification
<a name="kinesis-record-processor-cred-java"></a>

Vous devez mettre vos AWS informations d'identification à la disposition de l'un des fournisseurs d'informations d'identification de la chaîne de fournisseurs d'informations d'identification par défaut. Par exemple, si vous exécutez votre client sur une instance EC2, nous vous recommandons de lancer l'instance avec un rôle IAM. Les informations d'identification AWS qui reflètent les autorisations associées à ce rôle IAM sont mises à la disposition des applications de l'instance via ses métadonnées d'instance. C'est le moyen le plus sûr de gérer les informations d'identification pour une application consommateur exécutée sur une instance EC2.

L'exemple d'application tente d'abord d'extraire les informations d'identification IAM à partir des métadonnées d'instance : 

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

Si l'exemple d'application ne peut pas obtenir les informations d'identification à partir des métadonnées d'instance, il tente d'extraire les informations d'identification d'un fichier de propriétés :

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

Pour plus d'informations sur les métadonnées d'instance, consultez la section [Métadonnées d'instance](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html) dans le *guide de l'utilisateur Amazon EC2*.

### Utiliser l'ID du travailleur pour plusieurs instances
<a name="kinesis-record-processor-workerid-java"></a>

L'exemple de code d'initialisation crée un ID `workerId` pour l'application de travail, en utilisant le nom de l'ordinateur local et en y ajoutant un identifiant unique dans le monde entier, comme illustré dans l'extrait de code ci-après. Cette approche prend en charge le scénario où plusieurs instances de l'application consommateur sont exécutées sur le même ordinateur.

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## Migrer vers la version 2 de l'interface du processeur d'enregistrements
<a name="kcl-java-v2-migration"></a>

Si vous souhaitez migrer le code qui utilise l'interface d'origine, les étapes suivantes sont nécessaires en plus de celles décrites précédemment :

1. Changez la classe de processeur d'enregistrements pour importer la version 2 de l'interface de processeur d'enregistrements :

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. Modifiez les références aux entrées pour utiliser des méthodes `get` sur les objets de conteneur. Par exemple, dans l'opération `shutdown()`, remplacez `checkpointer` par `shutdownInput.getCheckpointer()`.

1. Changez la classe de la fabrique de processeurs d'enregistrements pour importer la version 2 de l'interface de fabrique de processeurs d'enregistrements :

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. Modifiez la construction de l'application de travail pour utiliser `Worker.Builder`. Par exemple :

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# Développez un client de bibliothèque cliente Kinesis dans Node.js
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Vous pouvez utiliser la bibliothèque client Kinesis (KCL) dans le développement d'applications capables de traiter les données de vos flux de données Kinesis. La KCL est disponible en plusieurs langues. Cette rubrique présente Node.js.

La KCL est une bibliothèque Java ; la prise en charge de langages autres que Java est fournie à l'aide d'une interface multilingue appelée. *MultiLangDaemon* Ce démon est basé sur Java et s'exécute en arrière-plan lorsque vous utilisez un langage KCL autre que Java. Par conséquent, si vous installez la KCL pour Node.js et que vous écrivez votre application grand public entièrement dans Node.js, Java doit toujours être installé sur votre système en raison du MultiLangDaemon. En outre, MultiLangDaemon il comporte certains paramètres par défaut que vous devrez peut-être personnaliser en fonction de votre cas d'utilisation, par exemple, la AWS région à laquelle il se connecte. Pour plus d'informations MultiLangDaemon sur l'activation GitHub, rendez-vous sur la page du [ MultiLangDaemon projet KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Pour télécharger le fichier KCL Node.js depuis GitHub, accédez à la [bibliothèque cliente Kinesis (](https://github.com/awslabs/amazon-kinesis-client-nodejs)Node.js).

**Téléchargements des exemples de code**

Il y a deux exemples de code disponibles pour la KCL pour Node.js :
+ [basic-sample​​](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  Il est utilisé dans les sections suivantes pour illustrer les principes fondamentaux de construction d'une application consommateur KCL en Node.js.
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   Il est un peu plus avancé et se sert d'un scénario réel. A utiliser après vous être familiarisé avec l'exemple de code de base. Cet exemple n'est pas présenté ici, mais est accompagné d'un fichier README qui contient plus d'informations.

Vous devez effectuer les tâches suivantes lorsque vous implémentez une application consommateur KCL en Node.js :

**Topics**
+ [Implémenter le processeur d'enregistrement](#kinesis-record-processor-implementation-interface-nodejs)
+ [Modifier les propriétés de configuration](#kinesis-record-processor-initialization-nodejs)

## Implémenter le processeur d'enregistrement
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

L'application consommateur la plus simple possible qui utilise la KCL pour Node.js doit implémenter une fonction `recordProcessor`, laquelle contient les fonctions `initialize`, `processRecords` et `shutdown`. L'exemple fournit une implémentation que vous pouvez utiliser comme point de départ (voir `sample_kcl_app.js`).

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**initialisation**  
La KCL appelle la fonction `initialize` au démarrage du processeur d'enregistrements. Ce processeur d'enregistrements traite uniquement l'ID de partition passé à `initializeInput.shardId` et, en règle générale, l'inverse est également vrai (cette partition est traitée uniquement par ce processeur d'enregistrements). Cependant, votre application consommateur doit prendre en compte la possibilité qu'un enregistrement de données peut être traité plusieurs fois. Cela provient du fait que Kinesis Data Streams a la sémantique *au moins une fois*, qui signifie que chaque enregistrement de données issu d'une partition est traité au moins une fois par une application de travail dans votre application consommateur. Pour plus d'informations sur les cas dans lesquels une partition spécifique peut éventuellement être traitée par plusieurs applications de travail, consultez la page [Utilisez le redécoupage, la mise à l'échelle et le traitement parallèle pour modifier le nombre de partitions](kinesis-record-processor-scaling.md).

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 La KCL appelle cette fonction en indiquant une entrée qui contient une liste d'enregistrements de données de la partition spécifiée pour la fonction `initialize`. Le processeur d'enregistrements que vous implémentez traite les données figurant dans ces enregistrements suivant la sémantique de votre application consommateur. Par exemple, l'application de travail peut exécuter une transformation sur les données et stocker ensuite le résultat dans un compartiment Amazon Simple Storage Service (Amazon S3). 

```
processRecords: function(processRecordsInput, completeCallback)
```

En plus des données elles-même, l'enregistrement contient également un numéro de séquence et une clé de partition, que l'application de travail peut utiliser pour traiter les données. Par exemple, l'application de travail peut choisir le compartiment S3 dans lequel stocker les données en fonction de la valeur de la clé de partition. Le dictionnaire `record` expose les paires clé-valeur suivantes pour accéder aux données, numéro de séquence et clé de partition de l'enregistrement :

```
record.data
record.sequenceNumber
record.partitionKey
```

Notez que les données sont encodées en Base64.

Dans l'exemple de base, la fonction `processRecords` contient du code qui indique comment une application de travail peut accéder aux données, numéro de séquence et clé de partition de l'enregistrement.

Kinesis Data Streams exige que le processeur d'enregistrements effectue le suivi des enregistrements qui ont déjà été traités dans une partition. La KCL se charge d'assurer ce suivi avec un objet `checkpointer` passé comme `processRecordsInput.checkpointer`. Le processeur d'enregistrements appelle la fonction `checkpointer.checkpoint` pour informer la KCL de son avancement dans le traitement des enregistrements de la partition. Si l'application de travail échoue, la KCL utilise ces informations lorsque vous redémarrez le traitement de la partition pour continuer à partir du dernier enregistrement traité connu.

Dans le cas d'un fractionnement ou d'une fusion, la KCL ne commence pas à traiter les nouvelles partitions tant que les processeurs des partitions d'origine n'ont pas appelé `checkpoint` pour signaler que l'ensemble du traitement sur les partitions d'origine est terminé.

Si vous ne passez pas le numéro de séquence à la fonction `checkpoint`, la KCL suppose que l'appel vers `checkpoint` signifie que tous les enregistrements ont été traités jusqu'au dernier enregistrement qui a été passé au processeur d'enregistrements. Par conséquent, le processeur d'enregistrements doit appeler `checkpoint` **seulement** après avoir traité tous les enregistrements de la liste qui lui a été passée. Les processeurs d'enregistrements n'ont pas besoin d'appeler `checkpoint` à chaque appel de `processRecords`. Un processeur peut, par exemple, appeler un `checkpoint` appel tous les trois, ou un événement externe à votre processeur d'enregistrement, tel qu'un verification/validation service personnalisé que vous avez mis en œuvre. 

Vous pouvez éventuellement spécifier le numéro de séquence précis d'un enregistrement comme paramètre à `checkpoint`. Dans ce cas, la KCL suppose que tous les enregistrements ont été traités jusqu'à cet enregistrement uniquement.

L'exemple d'application de base montre l'appel le plus simple possible de la fonction `checkpointer.checkpoint`. Vous pouvez ajouter à la fonction une autre logique de points de contrôle nécessaire pour votre application consommateur à ce stade.

**shutdown**  
La KCL appelle la fonction `shutdown` soit à la fin du traitement (`shutdownInput.reason` est `TERMINATE`) ou si l'application de travail ne répond plus (`shutdownInput.reason` est `ZOMBIE`).

```
shutdown: function(shutdownInput, completeCallback)
```

Le traitement se termine lorsque le processeur d'enregistrements ne reçoit plus d'enregistrements de la partition, car la partition a été fractionnée ou fusionnée, ou le flux a été supprimé.

La KCL passe également un objet `shutdownInput.checkpointer` à `shutdown`. Si le motif de fermeture est `TERMINATE`, vous devez vous assurer que le processeur d'enregistrements a fini de traiter les enregistrements de données et appeler ensuite la fonction `checkpoint` sur cet objet.

## Modifier les propriétés de configuration
<a name="kinesis-record-processor-initialization-nodejs"></a>

L'exemple fournit les valeurs par défaut des propriétés de configuration. Vous pouvez remplacer ces propriétés par vos propres valeurs (voir `sample.properties` dans l'exemple de base).

### Application name (Nom de l'application)
<a name="kinesis-record-processor-application-name-nodejs"></a>

La KCL nécessite une d'application qui est unique parmi vos applications et parmi les tableaux Amazon DynamoDB dans la même région. Elle utilise la valeur de configuration du nom d'application des manières suivantes :
+ Tous les programmes d'exécution associés à ce nom d'application sont considérés comme rattachés au même flux. Ces programmes d'exécution peuvent être répartis sur plusieurs instances. Si vous exécutez une autre instance du même code d'application, mais sous un autre nom d'application, la KCL traite cette seconde instance comme une application totalement distincte, associée elle aussi au même flux.
+ La KCL crée un tableau DynamoDB portant ce nom d'application et utilise la table pour tenir à jour les informations d'état (par exemple, les points de contrôle et le mappage d'application de travail-partition) pour l'application. Chaque application a son propre tableau DynamoDB. Pour de plus amples informations, veuillez consulter [Utilisez une table de location pour suivre les partitions traitées par l'application client KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurer les informations d'identification
<a name="kinesis-record-processor-credentials-nodejs"></a>

Vous devez mettre vos AWS informations d'identification à la disposition de l'un des fournisseurs d'informations d'identification de la chaîne de fournisseurs d'informations d'identification par défaut. Vous pouvez utiliser la propriété `AWSCredentialsProvider` pour définir un fournisseur d'informations d'identification. Le fichier `sample.properties` doit mettre vos informations d'identification à disposition de l'un des fournisseurs d'informations d'identification appartenant à la [chaîne des fournisseurs d'informations d'identification par défaut](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Si vous exécutez votre client sur une instance Amazon EC2, nous vous recommandons de configurer l'instance avec un rôle IAM. AWS les informations d'identification qui reflètent les autorisations associées à ce rôle IAM sont mises à la disposition des applications de l'instance via ses métadonnées d'instance. C'est le moyen le plus sûr de gérer les informations d'identification pour une application consommateur exécutée sur une instance EC2.

L'exemple suivant configure la KCL pour qu'elle traite un flux de données Kinesis appelé `kclnodejssample` à l'aide du processeur d'enregistrements fourni dans `sample_kcl_app.js` :

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# Développez un client de bibliothèque client Kinesis dans .NET
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Vous pouvez utiliser la bibliothèque client Kinesis (KCL) dans le développement d'applications capables de traiter les données de vos flux de données Kinesis. La KCL est disponible en plusieurs langues. Cette rubrique présente .NET.

La KCL est une bibliothèque Java ; la prise en charge de langages autres que Java est fournie à l'aide d'une interface multilingue appelée. *MultiLangDaemon* Ce démon est basé sur Java et s'exécute en arrière-plan lorsque vous utilisez un langage KCL autre que Java. Par conséquent, si vous installez la KCL pour .NET et que vous écrivez votre application grand public entièrement en .NET, vous devez toujours installer Java sur votre système en raison du MultiLangDaemon. En outre, MultiLangDaemon il comporte certains paramètres par défaut que vous devrez peut-être personnaliser en fonction de votre cas d'utilisation, par exemple, la AWS région à laquelle il se connecte. Pour plus d'informations MultiLangDaemon sur l'activation GitHub, rendez-vous sur la page du [ MultiLangDaemon projet KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Pour télécharger le .NET KCL depuis GitHub, accédez à la [bibliothèque cliente Kinesis (](https://github.com/awslabs/amazon-kinesis-client-net).NET). Pour télécharger un exemple de code pour une application client .NET KCL, rendez-vous sur la page du [projet client d'exemple KCL pour .NET](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer) sur. GitHub

Vous devez effectuer les tâches suivantes lorsque vous implémentez une application consommateur KCL en .NET :

**Topics**
+ [Implémenter les méthodes de classe IRecord Processor](#kinesis-record-processor-implementation-interface-dotnet)
+ [Modifier les propriétés de configuration](#kinesis-record-processor-initialization-dotnet)

## Implémenter les méthodes de classe IRecord Processor
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

L'application consommateur doit implémenter les méthodes suivantes pour `IRecordProcessor`. L'exemple d'application consommateur fournit des implémentations que vous pouvez utiliser comme point de départ (voir la classe `SampleRecordProcessor` dans `SampleConsumer/AmazonKinesisSampleConsumer.cs`).

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**Initialiser**  
La KCL appelle cette méthode lorsque le processeur d'enregistrements est instancié, en passant un ID de partition spécifique dans le paramètre `input` (`input.ShardId`). Ce processeur d'enregistrements traite uniquement cette partition et, en règle générale, l'inverse est également vrai (cette partition est traitée uniquement par ce processeur d'enregistrements). Cependant, votre application consommateur doit prendre en compte la possibilité qu'un enregistrement de données peut être traité plusieurs fois. Cela provient du fait que Kinesis Data Streams a la sémantique *au moins une fois*, qui signifie que chaque enregistrement de données issu d'une partition est traité au moins une fois par une application de travail dans votre application consommateur. Pour plus d'informations sur les cas dans lesquels une partition spécifique peut éventuellement être traitée par plusieurs applications de travail, consultez la page [Utilisez le redécoupage, la mise à l'échelle et le traitement parallèle pour modifier le nombre de partitions](kinesis-record-processor-scaling.md).

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
La KCL appelle cette méthode, en passant une liste d'enregistrements de données dans le paramètre `input` (`input.Records`), qui sont issues de la partition spécifiée par la méthode `Initialize`. Le processeur d'enregistrements que vous implémentez traite les données figurant dans ces enregistrements suivant la sémantique de votre application consommateur. Par exemple, l'application de travail peut exécuter une transformation sur les données et stocker ensuite le résultat dans un compartiment Amazon Simple Storage Service (Amazon S3).

```
public void ProcessRecords(ProcessRecordsInput input)
```

En plus des données elles-même, l'enregistrement contient également un numéro de séquence et une clé de partition. L'application de travail utilise ces valeurs lors du traitement des données. Par exemple, l'application de travail peut choisir le compartiment S3 dans lequel stocker les données en fonction de la valeur de la clé de partition. La classe `Record` expose le code suivant pour accéder aux données, numéro de séquence et clé de partition de l'enregistrement :

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

Dans l'exemple, la méthode `ProcessRecordsWithRetries` contient du code qui montre comment une application de travail peut accéder aux données, numéro de séquence et clé de partition de l'enregistrement.

Kinesis Data Streams exige que le processeur d'enregistrements effectue le suivi des enregistrements qui ont déjà été traités dans une partition. La KCL assure ce suivi à votre place en passant un objet `Checkpointer` à `ProcessRecords` (`input.Checkpointer`). Le processeur d'enregistrements appelle la méthode `Checkpointer.Checkpoint` pour informer la KCL de son avancement dans le traitement des enregistrements de la partition. Si le travail échoue, la KCL utilise ces informations pour redémarrer le traitement de la partition au niveau du dernier enregistrement traité connu.

Dans le cas d'un fractionnement ou d'une fusion, la KCL ne commence pas à traiter les nouvelles partitions tant que les processeurs des partitions d'origine n'ont pas appelé `Checkpointer.Checkpoint` pour signaler que l'ensemble du traitement sur les partitions d'origine est terminé.

Si vous ne passez pas de paramètre, la KCL suppose que l'appel de `Checkpointer.Checkpoint` signifie que tous les enregistrements ont été traités jusqu'au dernier enregistrement qui a été passé au processeur d'enregistrements. Par conséquent, le processeur d'enregistrements doit appeler `Checkpointer.Checkpoint` seulement après avoir traité tous les enregistrements de la liste qui lui a été passée. Les processeurs d'enregistrements n'ont pas besoin d'appeler `Checkpointer.Checkpoint` à chaque appel de `ProcessRecords`. Un processeur peut, par exemple, appeler `Checkpointer.Checkpoint` tous les trois ou quatre appels. Vous pouvez éventuellement spécifier le numéro de séquence précis d'un enregistrement comme paramètre à `Checkpointer.Checkpoint`. Dans ce cas, la KCL suppose que les enregistrements ont été traités seulement jusqu'à cet enregistrement.

Dans l'exemple, la méthode privée `Checkpoint(Checkpointer checkpointer)` montre comment appeler la méthode `Checkpointer.Checkpoint` en utilisant la logique appropriée de traitement des exceptions et de nouvelle tentative.

La KCL pour .NET gère les exceptions différemment des autres bibliothèques de langage KCL, car elle ne gère pas toutes les exceptions générées par le traitement des enregistrements de données. Toutes les exceptions non interceptées dans le code utilisateur bloquent le programme.

**Fermeture**  
La KCL appelle la méthode `Shutdown` soit à la fin du traitement (le motif de fermeture étant `TERMINATE`) ou lorsque l'application de travail ne répond plus (la raison de fermeture `input.Reason` ayant la valeur `ZOMBIE`).

```
public void Shutdown(ShutdownInput input)
```

Le traitement se termine lorsque le processeur d'enregistrements ne reçoit plus d'enregistrements de la partition, car la partition a été fractionnée ou fusionnée, ou le flux a été supprimé.

La KCL passe également un objet `Checkpointer` à `shutdown`. Si le motif de fermeture est `TERMINATE`, le processeur d'enregistrements doit terminer le traitement des enregistrements de données et appeler ensuite la méthode `checkpoint` sur cette interface.

## Modifier les propriétés de configuration
<a name="kinesis-record-processor-initialization-dotnet"></a>

L'exemple d'application consommateur fournit les valeurs par défaut des propriétés de configuration. Vous pouvez remplacer ces propriétés par vos propres valeurs (voir `SampleConsumer/kcl.properties`).

### Application name (Nom de l'application)
<a name="modify-kinesis-record-processor-application-name"></a>

La KCL nécessite une d'application qui est unique parmi vos applications et parmi les tableaux Amazon DynamoDB dans la même région. Elle utilise la valeur de configuration du nom d'application des manières suivantes :
+ Tous les programmes d'exécution associés à ce nom d'application sont considérés comme rattachés au même flux. Ces programmes d'exécution peuvent être répartis sur plusieurs instances. Si vous exécutez une autre instance du même code d'application, mais sous un autre nom d'application, la KCL traite cette seconde instance comme une application totalement distincte, associée elle aussi au même flux.
+ La KCL crée un tableau DynamoDB portant ce nom d'application et utilise la table pour tenir à jour les informations d'état (par exemple, les points de contrôle et le mappage d'application de travail-partition) pour l'application. Chaque application a son propre tableau DynamoDB. Pour de plus amples informations, veuillez consulter [Utilisez une table de location pour suivre les partitions traitées par l'application client KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurer les informations d'identification
<a name="kinesis-record-processor-creds-dotnet"></a>

Vous devez mettre vos AWS informations d'identification à la disposition de l'un des fournisseurs d'informations d'identification de la chaîne de fournisseurs d'informations d'identification par défaut. Vous pouvez utiliser la propriété `AWSCredentialsProvider` pour définir un fournisseur d'informations d'identification. Le fichier [sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) doit mettre vos informations d'identification à disposition de l'un des fournisseurs d'informations d'identification appartenant à la [chaîne des fournisseurs d'informations d'identification par défaut](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Si vous exécutez votre application client sur une instance Amazon EC2, nous vous recommandons de configurer l'instance avec un rôle IAM. Les informations d'identification AWS qui reflètent les autorisations associées à ce rôle IAM sont mises à la disposition des applications de l'instance via ses métadonnées d'instance. C'est le moyen le plus sûr de gérer les informations d'identification pour une application consommateur exécutée sur une instance EC2.

Dans l'exemple, le fichier de propriétés configure la KCL pour traiter un flux de données Kinesis appelé « words » à l'aide du processeur d'enregistrements fourni dans `AmazonKinesisSampleConsumer.cs`. 

# Développez un client de bibliothèque cliente Kinesis en Python
<a name="kinesis-record-processor-implementation-app-py"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Vous pouvez utiliser la bibliothèque client Kinesis (KCL) dans le développement d'applications capables de traiter les données de vos flux de données Kinesis. La KCL est disponible en plusieurs langues. Cette rubrique présente Python.

La KCL est une bibliothèque Java ; la prise en charge de langages autres que Java est fournie à l'aide d'une interface multilingue appelée. *MultiLangDaemon* Ce démon est basé sur Java et s'exécute en arrière-plan lorsque vous utilisez un langage KCL autre que Java. Par conséquent, si vous installez la KCL pour Python et que vous écrivez votre application grand public entièrement en Python, vous devez toujours installer Java sur votre système en raison du MultiLangDaemon. En outre, MultiLangDaemon il comporte certains paramètres par défaut que vous devrez peut-être personnaliser en fonction de votre cas d'utilisation, par exemple, la AWS région à laquelle il se connecte. Pour plus d'informations MultiLangDaemon sur l'activation GitHub, rendez-vous sur la page du [ MultiLangDaemon projet KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Pour télécharger la KCL Python depuis GitHub, accédez à la bibliothèque [cliente Kinesis (Python)](https://github.com/awslabs/amazon-kinesis-client-python). Pour télécharger un exemple de code pour une application client KCL Python, rendez-vous sur la page d'exemple de [projet KCL pour Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) sur. GitHub

Vous devez effectuer les tâches suivantes lorsque vous implémentez une application consommateur KCL en Python :

**Topics**
+ [Implémenter les méthodes RecordProcessor de classe](#kinesis-record-processor-implementation-interface-py)
+ [Modifier les propriétés de configuration](#kinesis-record-processor-initialization-py)

## Implémenter les méthodes RecordProcessor de classe
<a name="kinesis-record-processor-implementation-interface-py"></a>

La classe `RecordProcess` doit étendre la classe `RecordProcessorBase` pour implémenter les méthodes ci-après. L'exemple fournit des implémentations que vous pouvez utiliser comme point de départ (voir `sample_kclpy_app.py`).

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**initialisation**  
La KCL appelle la méthode `initialize` lorsque le processeur d'enregistrements est instancié, en passant un ID de partition spécifique comme paramètre. Ce processeur d'enregistrements traite uniquement cette partition et, en règle générale, l'inverse est également vrai (cette partition est traitée uniquement par ce processeur d'enregistrements). Cependant, votre application consommateur doit prendre en compte la possibilité qu'un enregistrement de données peut être traité plusieurs fois. Cela provient du fait que Kinesis Data Streams a la sémantique *au moins une fois*, qui signifie que chaque enregistrement de données issu d'une partition est traité au moins une fois par une application de travail dans votre application consommateur. Pour plus d'informations sur les cas dans lesquels une partition spécifique peut être traitée par plusieurs applications de travail, consultez la page [Utilisez le redécoupage, la mise à l'échelle et le traitement parallèle pour modifier le nombre de partitions](kinesis-record-processor-scaling.md).

```
def initialize(self, shard_id)
```

**process\$1records**  
 La KCL appelle cette méthode en passant une liste d'enregistrements de données issus de la partition spécifiée par la méthode `initialize`. Le processeur d'enregistrements que vous implémentez traite les données figurant dans ces enregistrements suivant la sémantique de votre application consommateur. Par exemple, l'application de travail peut exécuter une transformation sur les données et stocker ensuite le résultat dans un compartiment Amazon Simple Storage Service (Amazon S3).

```
def process_records(self, records, checkpointer) 
```

En plus des données elles-même, l'enregistrement contient également un numéro de séquence et une clé de partition. L'application de travail utilise ces valeurs lors du traitement des données. Par exemple, l'application de travail peut choisir le compartiment S3 dans lequel stocker les données en fonction de la valeur de la clé de partition. Le dictionnaire `record` expose les paires clé-valeur suivantes pour accéder aux données, numéro de séquence et clé de partition de l'enregistrement :

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

Notez que les données sont encodées en Base64.

Dans l'exemple, la méthode `process_records` contient du code qui montre comment une application de travail peut accéder aux données, numéro de séquence et clé de partition de l'enregistrement.

Kinesis Data Streams exige que le processeur d'enregistrements effectue le suivi des enregistrements qui ont déjà été traités dans une partition. La KCL assure ce suivi à votre place en passant un objet `Checkpointer` à `process_records`. Le processeur d'enregistrements appelle la méthode `checkpoint` sur cet objet pour informer la KCL de son avancement dans le traitement des enregistrements de la partition. Si le travail échoue, la KCL utilise ces informations pour redémarrer le traitement de la partition au niveau du dernier enregistrement traité connu.

Dans le cas d'un fractionnement ou d'une fusion, la KCL ne commence pas à traiter les nouvelles partitions tant que les processeurs des partitions d'origine n'ont pas appelé `checkpoint` pour signaler que l'ensemble du traitement sur les partitions d'origine est terminé.

Si vous ne passez pas de paramètre, la KCL suppose que l'appel de `checkpoint` signifie que tous les enregistrements ont été traités jusqu'au dernier enregistrement qui a été passé au processeur d'enregistrements. Par conséquent, le processeur d'enregistrements doit appeler `checkpoint` seulement après avoir traité tous les enregistrements de la liste qui lui a été passée. Les processeurs d'enregistrements n'ont pas besoin d'appeler `checkpoint` à chaque appel de `process_records`. Un processeur peut, par exemple, appeler `checkpoint` tous les trois appels. Vous pouvez éventuellement spécifier le numéro de séquence précis d'un enregistrement comme paramètre à `checkpoint`. Dans ce cas, la KCL suppose que tous les enregistrements ont été traités jusqu'à cet enregistrement uniquement.

Dans l'exemple, la méthode privée `checkpoint` montre comment appeler la méthode `Checkpointer.checkpoint` en utilisant la logique appropriée de traitement des exceptions et de nouvelle tentative.

La KCL s'appuie sur `process_records` pour gérer toutes les exceptions générées par le traitement des enregistrements de données. Si une exception est déclenchée depuis `process_records`, la KCL ignore les enregistrements de données qui ont été transmis à `process_records` avant l'exception. En d'autres termes, ces enregistrements ne sont pas renvoyés au processeur d'enregistrements qui a lancé l'exception ou à tout autre processeur d'enregistrement dans l'application consommateur.

**shutdown**  
 La KCL appelle la méthode `shutdown` soit à la fin du traitement (le motif de fermeture étant `TERMINATE`) ou lorsque l'application de travail ne répond plus (la raison de fermeture `reason` ayant la valeur `ZOMBIE`).

```
def shutdown(self, checkpointer, reason)
```

Le traitement se termine lorsque le processeur d'enregistrements ne reçoit plus d'enregistrements de la partition, car la partition a été fractionnée ou fusionnée, ou le flux a été supprimé.

 La KCL passe également un objet `Checkpointer` à `shutdown`. Si le motif de fermeture `reason` est `TERMINATE`, le processeur d'enregistrements doit terminer le traitement des enregistrements de données et appeler ensuite la méthode `checkpoint` sur cette interface.

## Modifier les propriétés de configuration
<a name="kinesis-record-processor-initialization-py"></a>

L'exemple fournit les valeurs par défaut des propriétés de configuration. Vous pouvez remplacer ces propriétés par vos propres valeurs (voir `sample.properties`).

### Application name (Nom de l'application)
<a name="kinesis-record-processor-application-name-py"></a>

La KCL nécessite un nom d'application qui est unique parmi vos applications et parmi les tableaux Amazon DynamoDB dans la même région. Elle utilise la valeur de configuration du nom d'application des manières suivantes :
+ Tous les programmes d'exécution qui sont associés à ce nom d'application sont considérés comme rattachés au même flux. Ces programmes d'exécution peuvent être répartis sur plusieurs instances. Si vous exécutez une autre instance du même code d'application, mais sous un autre nom d'application, la KCL traite cette seconde instance comme une application totalement distincte, associée elle aussi au même flux.
+ La KCL crée un tableau DynamoDB portant ce nom d'application et utilise la table pour tenir à jour les informations d'état (par exemple, les points de contrôle et le mappage d'application de travail-partition) pour l'application. Chaque application a son propre tableau DynamoDB. Pour de plus amples informations, veuillez consulter [Utilisez une table de location pour suivre les partitions traitées par l'application client KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurer les informations d'identification
<a name="kinesis-record-processor-creds-py"></a>

Vous devez mettre vos AWS informations d'identification à la disposition de l'un des fournisseurs d'informations d'identification de la chaîne de fournisseurs d'informations d'identification par défaut. Vous pouvez utiliser la propriété `AWSCredentialsProvider` pour définir un fournisseur d'informations d'identification. Le fichier [sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) doit mettre vos informations d'identification à disposition de l'un des fournisseurs d'informations d'identification appartenant à la [chaîne des fournisseurs d'informations d'identification par défaut](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Si vous exécutez votre application client sur une instance Amazon EC2, nous vous recommandons de configurer l'instance avec un rôle IAM. Les informations d'identification AWS qui reflètent les autorisations associées à ce rôle IAM sont mises à la disposition des applications de l'instance via ses métadonnées d'instance. C'est le moyen le plus sûr de gérer les informations d'identification pour une application consommateur exécutée sur une instance EC2.

Dans l'exemple, le fichier de propriétés configure la KCL pour traiter un flux de données Kinesis appelé « words » à l'aide du processeur d'enregistrements fourni dans `sample_kclpy_app.py`. 

# Développez un client de bibliothèque cliente Kinesis dans Ruby
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Vous pouvez utiliser la bibliothèque client Kinesis (KCL) dans le développement d'applications capables de traiter les données de vos flux de données Kinesis. La KCL est disponible en plusieurs langues. Cette rubrique présente Ruby.

La KCL est une bibliothèque Java ; la prise en charge de langages autres que Java est fournie à l'aide d'une interface multilingue appelée. *MultiLangDaemon* Ce démon est basé sur Java et s'exécute en arrière-plan lorsque vous utilisez un langage KCL autre que Java. Par conséquent, si vous installez la KCL pour Ruby et que vous écrivez votre application grand public entièrement en Ruby, vous devez toujours installer Java sur votre système en raison du MultiLangDaemon. En outre, MultiLangDaemon il comporte certains paramètres par défaut que vous devrez peut-être personnaliser en fonction de votre cas d'utilisation, par exemple, la AWS région à laquelle il se connecte. Pour plus d'informations MultiLangDaemon sur l'activation GitHub, rendez-vous sur la page du [ MultiLangDaemon projet KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Pour télécharger la KCL Ruby depuis GitHub, accédez à la [bibliothèque cliente Kinesis (](https://github.com/awslabs/amazon-kinesis-client-ruby)Ruby). Pour télécharger un exemple de code pour une application grand public Ruby KCL, rendez-vous sur la page d'[exemple de projet KCL for Ruby](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples) sur. GitHub

Pour plus d'informations sur la bibliothèque d'assistance KCL Ruby, consultez la [documentation KCL Ruby Gems](http://www.rubydoc.info/gems/aws-kclrb).

# Développez des consommateurs de KCL 2.x
<a name="developing-consumers-with-kcl-v2"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Cette rubrique montre comment utiliser la version 2.0 de la bibliothèque client Kinesis (KCL). 

Pour plus d'informations sur la KCL, consultez la présentation fournie dans [Développement d'applications consommateur à l'aide de la bibliothèque client Kinesis 1.x](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html) (français non garanti).

Choisissez l'une des rubriques suivantes en fonction de l'option que vous souhaitez utiliser.

**Topics**
+ [Développez un client de bibliothèque cliente Kinesis en Java](kcl2-standard-consumer-java-example.md)
+ [Développez un client de bibliothèque cliente Kinesis en Python](kcl2-standard-consumer-python-example.md)
+ [Développez de nouveaux fans avec KCL 2.x](building-enhanced-consumers-kcl-retired.md)

# Développez un client de bibliothèque cliente Kinesis en Java
<a name="kcl2-standard-consumer-java-example"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Le code suivant présente un exemple d'implémentation dans Java de `ProcessorFactory` et `RecordProcessor`. Si vous souhaitez tirer parti de la fonctionnalité de déploiement amélioré, consultez [Utilisation d'applications consommateur avec le déploiement amélioré ](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html).

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Développez un client de bibliothèque cliente Kinesis en Python
<a name="kcl2-standard-consumer-python-example"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Vous pouvez utiliser la bibliothèque client Kinesis (KCL) dans le développement d'applications capables de traiter les données de vos flux de données Kinesis. La KCL est disponible en plusieurs langues. Cette rubrique présente Python.

La KCL est une bibliothèque Java ; la prise en charge de langages autres que Java est fournie à l'aide d'une interface multilingue appelée. *MultiLangDaemon* Ce démon est basé sur Java et s'exécute en arrière-plan lorsque vous utilisez un langage KCL autre que Java. Par conséquent, si vous installez la KCL pour Python et que vous écrivez votre application grand public entièrement en Python, vous devez toujours installer Java sur votre système en raison du MultiLangDaemon. En outre, MultiLangDaemon il comporte certains paramètres par défaut que vous devrez peut-être personnaliser en fonction de votre cas d'utilisation, par exemple, la AWS région à laquelle il se connecte. Pour plus d'informations MultiLangDaemon sur l'activation GitHub, rendez-vous sur la page du [ MultiLangDaemon projet KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Pour télécharger la KCL Python depuis GitHub, accédez à la bibliothèque [cliente Kinesis (Python)](https://github.com/awslabs/amazon-kinesis-client-python). Pour télécharger un exemple de code pour une application client KCL Python, rendez-vous sur la page d'exemple de [projet KCL pour Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) sur. GitHub

Vous devez effectuer les tâches suivantes lorsque vous implémentez une application consommateur KCL en Python :

**Topics**
+ [Implémenter les méthodes RecordProcessor de classe](#kinesis-record-processor-implementation-interface-py)
+ [Modifier les propriétés de configuration](#kinesis-record-processor-initialization-py)

## Implémenter les méthodes RecordProcessor de classe
<a name="kinesis-record-processor-implementation-interface-py"></a>

La classe `RecordProcess` doit étendre la classe `RecordProcessorBase` pour implémenter les méthodes ci-après :

```
initialize
process_records
shutdown_requested
```

Cet exemple fournit des implémentations que vous pouvez utiliser comme point de départ.

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this recod.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```

## Modifier les propriétés de configuration
<a name="kinesis-record-processor-initialization-py"></a>

L'exemple fournit les valeurs par défaut des propriétés de configuration, comme illustré dans le script suivant. Vous pouvez remplacer ces propriétés par vos propres valeurs.

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```

### Application name (Nom de l'application)
<a name="kinesis-record-processor-application-name-py"></a>

La KCL nécessite un nom d'application qui est unique parmi vos applications et parmi les tableaux Amazon DynamoDB dans la même région. Elle utilise la valeur de configuration du nom d'application des manières suivantes :
+ Tous les programmes d'exécution qui sont associés à ce nom d'application sont considérés comme rattachés au même flux. Ces programmes d'exécution peuvent être répartis sur plusieurs instances. Si vous exécutez une autre instance du même code d'application, mais sous un autre nom d'application, la KCL traite cette seconde instance comme une application totalement distincte, associée elle aussi au même flux.
+ La KCL crée un tableau DynamoDB portant ce nom d'application et utilise la table pour tenir à jour les informations d'état (par exemple, les points de contrôle et le mappage d'application de travail-partition) pour l'application. Chaque application a son propre tableau DynamoDB. Pour de plus amples informations, veuillez consulter [Utilisez une table de location pour suivre les partitions traitées par l'application client KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Informations d’identification
<a name="kinesis-record-processor-creds-py"></a>

Vous devez mettre vos AWS informations d'identification à la disposition de l'un des fournisseurs d'informations d'identification de la chaîne de fournisseurs [d'informations d'identification par défaut](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Vous pouvez utiliser la propriété `AWSCredentialsProvider` pour définir un fournisseur d'informations d'identification. Si vous exécutez votre application client sur une instance Amazon EC2, nous vous recommandons de configurer l'instance avec un rôle IAM. AWS les informations d'identification qui reflètent les autorisations associées à ce rôle IAM sont mises à la disposition des applications de l'instance via ses métadonnées d'instance. C'est le moyen le plus sûr de gérer les informations d'identification pour une application consommateur exécutée sur une instance EC2.

# Développez de nouveaux fans avec KCL 2.x
<a name="building-enhanced-consumers-kcl-retired"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Les applications consommateur utilisant la *diffusion améliorée* dans Amazon Kinesis Data Streams peuvent recevoir des enregistrements provenant d'un flux de données avec un débit dédié de jusqu'à 2 Mo de données par seconde par partition. Ce type d'application consommateur n'a pas besoin de se heurter à d'autres applications consommateur qui reçoivent des données à partir du flux. Pour de plus amples informations, veuillez consulter [Développez des clients fans améliorés grâce à un débit dédié](enhanced-consumers.md).

Vous pouvez utiliser la version 2.0 ou ultérieure de la bibliothèque client Kinesis (KCL) pour développer des applications qui utilisent la diffusion améliorée afin de recevoir des données provenant de flux. La KCL abonne automatiquement votre application à toutes les partitions d'un flux et garantit que votre application client peut lire avec une valeur de débit de 2 par partition. MB/sec Si vous souhaitez utiliser la KCL sans activer le déploiement amélioré, consultez [Développement d'applications consommateur à l'aide de la bibliothèque client Kinesis 2.0](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html).

**Topics**
+ [Développez des consommateurs de fans améliorés à l'aide de KCL 2.x en Java](building-enhanced-consumers-kcl-java.md)

# Développez des consommateurs de fans améliorés à l'aide de KCL 2.x en Java
<a name="building-enhanced-consumers-kcl-java"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Vous pouvez utiliser la version 2.0 ou ultérieure de la bibliothèque client Kinesis (KCL) pour développer des applications dans Amazon Kinesis Data Streams qui utilisent la diffusion améliorée afin de recevoir des données à partir de flux. Le code suivant présente un exemple d'implémentation dans Java de `ProcessorFactory` et `RecordProcessor`.

Il est recommandé d'utiliser `KinesisClientUtil` pour créer `KinesisAsyncClient` et configurer `maxConcurrency` dans `KinesisAsyncClient`.

**Important**  
Le client Amazon Kinesis peut voir une augmentation significative de la latence, sauf si vous configurez `KinesisAsyncClient` pour avoir une`maxConcurrency` suffisamment élevée pour autoriser tous les baux et les utilisations supplémentaires de `KinesisAsyncClient`.

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Migrer les consommateurs de KCL 1.x vers KCL 2.x
<a name="kcl-migration"></a>

**Important**  
Les versions 1.x et 2.x de la bibliothèque client Amazon Kinesis (KCL) sont obsolètes. KCL 1.x arrivera end-of-support le 30 janvier 2026. Nous vous **recommandons vivement** de migrer vos applications KCL à l'aide de la version 1.x vers la dernière version de KCL avant le 30 janvier 2026. Pour trouver la dernière version de KCL, consultez la page [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) sur. GitHub Pour plus d'informations sur les dernières versions de KCL, consultez[Utiliser la bibliothèque cliente Kinesis](kcl.md). Pour plus d'informations sur la migration de KCL 1.x vers KCL 3.x, consultez. [Migration de la KCL 1.x vers la KCL 3.x](kcl-migration-1-3.md)

Cette rubrique explique les différences entre les versions 1.x et 2.x de la bibliothèque client Kinesis (KCL). Elle montre également comment migrer votre consommateur de la version 1.x vers la version 2.x de la KCL. Après avoir migré votre client, il commencera à traiter les enregistrements à partir du dernier emplacement contrôlé.

La version 2.0 de la KCL introduit les changements d'interface suivants :


**Modifications de l'interface KCL**  

| Interface KCL 1.x | Interface KCL 2.0 | 
| --- | --- | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor | software.amazon.kinesis.processor.ShardRecordProcessor | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory | software.amazon.kinesis.processor.ShardRecordProcessorFactory | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware | Intégré à software.amazon.kinesis.processor.ShardRecordProcessor | 

**Topics**
+ [Migrer le processeur d'enregistrements](#recrod-processor-migration)
+ [Migrer l'usine de traitement des enregistrements](#recrod-processor-factory-migration)
+ [Migrer le travailleur](#worker-migration)
+ [Configuration du client Amazon Kinesis](#client-configuration)
+ [Suppression des temps d'inactivité](#idle-time-removal)
+ [Suppressions de configurations clientes](#client-configuration-removals)

## Migrer le processeur d'enregistrements
<a name="recrod-processor-migration"></a>

L'exemple suivant illustre un processeur d'enregistrements implémenté pour la KC 1.x :

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        //
        // Process records, and possibly checkpoint
        //
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Pour migrer la classe de processeur d'enregistrements**

1. Modifiez les interfaces de `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` et `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` de `software.amazon.kinesis.processor.ShardRecordProcessor`, comme suit :

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
   public class TestRecordProcessor implements ShardRecordProcessor {
   ```

1. Mettez à jour les instructions `import` pour les méthodes `initialize` et `processRecords`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
   ```

1. Remplacez la méthode `shutdown` par les nouvelles méthodes suivantes : `leaseLost`, `shardEnded` et `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Voici la version mise à jour de la classe du processeur d'enregistrements.

```
package com.amazonaws.kcl;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class TestRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

## Migrer l'usine de traitement des enregistrements
<a name="recrod-processor-factory-migration"></a>

La fabrique de processeurs d’enregistrements est responsable de la création des processeurs d’enregistrements lorsqu’un bail est acquis. Voici un exemple de fabrique KCL 1.x.

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor();
    }
}
```

**Pour migrer la fabrique de processeurs d'enregistrements**

1. Modifiez l'interface implémentée de `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` vers `software.amazon.kinesis.processor.ShardRecordProcessorFactory`, comme suit.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
   public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
   ```

1. Modifiez la signature de retour pour `createProcessor`.

   ```
   // public IRecordProcessor createProcessor() {
   public ShardRecordProcessor shardRecordProcessor() {
   ```

Voici un exemple de fabrique de processeurs d'enregistrements dans 2.0 :

```
package com.amazonaws.kcl;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor();
    }
}
```

## Migrer le travailleur
<a name="worker-migration"></a>

Dans la version 2.0 de la KCL, une nouvelle classe, appelée `Scheduler`, remplace la classe `Worker`. Voici un exemple d'application de travail KCL 1.x.

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

**Pour migrer le worker**

1. Modifiez l’instruction `import` de la classe `Worker` pour les instructions d’importation pour les classes `Scheduler` et `ConfigsBuilder`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Créez le `ConfigsBuilder` et un `Scheduler` comme illustré dans l'exemple suivant.

   Il est recommandé d'utiliser `KinesisClientUtil` pour créer `KinesisAsyncClient` et configurer `maxConcurrency` dans `KinesisAsyncClient`.
**Important**  
Le client Amazon Kinesis peut voir une augmentation significative de la latence, sauf si vous configurez `KinesisAsyncClient` pour avoir une`maxConcurrency` suffisamment élevée pour autoriser tous les baux et les utilisations supplémentaires de `KinesisAsyncClient`.

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
   Region region = Region.AP_NORTHEAST_2;
   KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
   ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
   
   Scheduler scheduler = new Scheduler(
       configsBuilder.checkpointConfig(),
       configsBuilder.coordinatorConfig(),
       configsBuilder.leaseManagementConfig(),
       configsBuilder.lifecycleConfig(),
       configsBuilder.metricsConfig(),
       configsBuilder.processorConfig(),
       configsBuilder.retrievalConfig()
       );
   ```

## Configuration du client Amazon Kinesis
<a name="client-configuration"></a>

Avec la version 2.0 de la bibliothèque client Kinesis, la configuration du client est passée d'une seule classe de configuration (`KinesisClientLibConfiguration`) à six classes de configuration. Le tableau suivant décrit la migration.


**Champs de configuration et leurs nouvelles classes**  

| Champ d'origine | Nouvelle classe de configuration | Description | 
| --- | --- | --- | 
| applicationName | ConfigsBuilder | Nom de cette application KCL. Utilisé par défaut pour le tableName et le consumerName. | 
| tableName | ConfigsBuilder | Permet de remplacer le nom du tableau utilisé par le tableau des baux Amazon DynamoDB. | 
| streamName | ConfigsBuilder | Nom du flux à partir duquel cette application traite les enregistrements. | 
| kinesisEndpoint | ConfigsBuilder | Cette option a été supprimée. Consultez Suppressions des configurations du client. | 
| dynamoDBEndpoint | ConfigsBuilder | Cette option a été supprimée. Consultez Suppressions des configurations du client. | 
| initialPositionInStreamExtended | RetrievalConfig | L'emplacement dans la partition à partir de laquelle la KCL débute l'extraction des enregistrements, en commençant par l'exécution initiale de l'application. | 
| kinesisCredentialsProvider | ConfigsBuilder | Cette option a été supprimée. Consultez Suppressions des configurations du client. | 
| dynamoDBCredentialsProvider | ConfigsBuilder | Cette option a été supprimée. Consultez Suppressions des configurations du client. | 
| cloudWatchCredentialsProvider | ConfigsBuilder | Cette option a été supprimée. Consultez Suppressions des configurations du client. | 
| failoverTimeMillis | LeaseManagementConfig | Nombre de millisecondes qui doivent s'écouler avant que vous puissiez considérer qu'un bail propriétaire a échoué. | 
| workerIdentifier | ConfigsBuilder | Identifiant unique qui représente cette instanciation du processeur d'applications. Il doit être unique. | 
| shardSyncIntervalMillis | LeaseManagementConfig | Délai entre les appels de synchronisation des partitions. | 
| maxRecords | PollingConfig | Permet de définir le nombre maximum d'enregistrements renvoyés par Kinesis. | 
| idleTimeBetweenReadsInMillis | CoordinatorConfig | Cette option a été supprimée. Consultez Suppression du temps d'inactivité. | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig | Lorsqu'il est défini, le processeur d'enregistrements est appelé même si aucun enregistrement n'a été fourni depuis Kinesis. | 
| parentShardPollIntervalMillis | CoordinatorConfig | À quelle fréquence un processeur d'enregistrements doit-il interroger pour voir si la partition parent est terminée. | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig | Lorsqu'ils sont définis, les baux sont supprimés dès que les baux enfant ont commencé le traitement. | 
| ignoreUnexpectedChildShards | LeaseManagementConfig | Lorsqu'elles sont définies, les partitions enfant ont une partition ouverte qui est ignorée. Cela concerne principalement DynamoDB Streams. | 
| kinesisClientConfig | ConfigsBuilder | Cette option a été supprimée. Consultez Suppressions des configurations du client. | 
| dynamoDBClientConfig | ConfigsBuilder | Cette option a été supprimée. Consultez Suppressions des configurations du client. | 
| cloudWatchClientConfig | ConfigsBuilder | Cette option a été supprimée. Consultez Suppressions des configurations du client. | 
| taskBackoffTimeMillis | LifecycleConfig | Durée d'attente pour relancer des tâches ayant échoué. | 
| metricsBufferTimeMillis | MetricsConfig | Contrôle la publication des CloudWatch métriques. | 
| metricsMaxQueueSize | MetricsConfig | Contrôle la publication des CloudWatch métriques. | 
| metricsLevel | MetricsConfig | Contrôle la publication des CloudWatch métriques. | 
| metricsEnabledDimensions | MetricsConfig | Contrôle la publication des CloudWatch métriques. | 
| validateSequenceNumberBeforeCheckpointing | CheckpointConfig | Cette option a été supprimée. Consultez Validation du numéro de séquence des points de contrôle. | 
| regionName | ConfigsBuilder | Cette option a été supprimée. Consultez Suppression des configurations du client. | 
| maxLeasesForWorker | LeaseManagementConfig | Nombre maximum de baux qu'une instance unique de l'application doit accepter. | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig | Nombre maximum de baux qu'une application doit tenter de voler à la fois. | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | La IOPs lecture DynamoDB utilisée si la bibliothèque cliente Kinesis doit créer une nouvelle table de bail DynamoDB. | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | La IOPs lecture DynamoDB utilisée si la bibliothèque cliente Kinesis doit créer une nouvelle table de bail DynamoDB. | 
| initialPositionInStreamExtended | LeaseManagementConfig | La position initiale dans le flux à laquelle l'application doit commencer. Elle est utilisée uniquement lors de la création de bail initiale. | 
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | Désactivez la synchronisation des données de partition si la table des baux contient des baux existants. TOUT : -438 KinesisEco | 
| shardPrioritization | CoordinatorConfig | Définition des priorités de partition à utiliser. | 
| shutdownGraceMillis | N/A | Cette option a été supprimée. Voir MultiLang Suppressions. | 
| timeoutInSeconds | N/A | Cette option a été supprimée. Voir MultiLang Suppressions. | 
| retryGetRecordsInSeconds | PollingConfig | Configure le délai entre les GetRecords tentatives d'échec. | 
| maxGetRecordsThreadPool | PollingConfig | La taille du pool de threads utilisée pour GetRecords. | 
| maxLeaseRenewalThreads | LeaseManagementConfig | Contrôle la taille du pool de threads des renouvellements de baux. Plus votre application accepte de baux, plus la taille du pool doit être importante. | 
| recordsFetcherFactory | PollingConfig | Permet de remplacer la fabrique utilisée pour créer des extracteurs qui effectuent la récupération à partir de flux. | 
| logWarningForTaskAfterMillis | LifecycleConfig | Temps d'attente avec la consignation d'un avertissement si une tâche n'a pas été terminée. | 
| listShardsBackoffTimeInMillis | RetrievalConfig | Nombre de millisecondes à attendre entre les appels de ListShards en cas de défaillance. | 
| maxListShardsRetryAttempts | RetrievalConfig | Nombre maximum de nouvelles tentatives par ListShards avant l'abandon. | 

## Suppression des temps d'inactivité
<a name="idle-time-removal"></a>

Dans la version 1.x de la KCL, le `idleTimeBetweenReadsInMillis` correspond à deux quantités : 
+ Durée entre les vérifications de répartition des tâches. Vous pouvez maintenant configurer cette durée entre les tâches en définissant `CoordinatorConfig#shardConsumerDispatchPollIntervalMillis`.
+ Durée de veille lorsqu'aucun enregistrement n'a été renvoyé à partir de Kinesis Data Streams. Dans la version 2.0, les enregistrements de diffusion améliorée sont transmis à partir de leur extracteur respectif. Les activités sur l'application consommateur de la partition ont lieu uniquement lorsqu'une demande push arrive. 

## Suppressions de configurations clientes
<a name="client-configuration-removals"></a>

Dans la version 2.0, la KCL ne crée plus de d'applications client. Il incombe à l'utilisateur de fournir un client valide. Avec cette modification, tous les paramètres de configuration qui contrôlaient la configuration du client ont été supprimés. Si vous avez besoin de ces paramètres, vous pouvez les définir sur les clients avant de fournir les clients à `ConfigsBuilder`.


****  

| Champ supprimé | Configuration équivalente | 
| --- | --- | 
| kinesisEndpoint | Configurez le SDK KinesisAsyncClient avec le point de terminaison préféré : KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build(). | 
| dynamoDBEndpoint | Configurez le SDK DynamoDbAsyncClient avec le point de terminaison préféré : DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build(). | 
| kinesisClientConfig | Configurez le SDK KinesisAsyncClient avec la configuration nécessaire : KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| dynamoDBClientConfig | Configurez le SDK DynamoDbAsyncClient avec la configuration nécessaire : DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| cloudWatchClientConfig | Configurez le SDK CloudWatchAsyncClient avec la configuration nécessaire : CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| regionName | Configurez le SDK avec la région préférée. Elle est identique pour tous les clients du SDK. Par exemple, KinesisAsyncClient.builder().region(Region.US\$1WEST\$12).build(). | 