

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Développez les consommateurs avec KCL
<a name="develop-kcl-consumers"></a>

Vous pouvez utiliser la bibliothèque client Kinesis (KCL) pour créer des applications grand public qui traitent les données issues de vos flux de données Kinesis.

KCL est disponible en plusieurs langues. Cette rubrique explique comment développer des consommateurs KCL dans des langages Java et non-Java.
+ [Pour consulter la référence Javadoc de la bibliothèque cliente Kinesis, consultez le document Javadoc de la bibliothèque cliente Amazon Kinesis.](https://javadoc.io/doc/software.amazon.kinesis/amazon-kinesis-client/latest/index.html)
+ Pour télécharger KCL pour Java depuis GitHub, consultez la [bibliothèque client Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) pour Java.
+ Pour localiser la KCL pour Java sur Apache Maven, consultez le référentiel central de [KCL Maven](https://central.sonatype.com/artifact/software.amazon.kinesis/amazon-kinesis-client).

**Topics**
+ [Développez vos clients avec KCL en Java](develop-kcl-consumers-java.md)
+ [Développez vos clients avec KCL dans des langages autres que Java](develop-kcl-consumers-non-java.md)

# Développez vos clients avec KCL en Java
<a name="develop-kcl-consumers-java"></a>

## Conditions préalables
<a name="develop-kcl-consumers-java-prerequisites"></a>

Avant de commencer à utiliser KCL 3.x, assurez-vous que vous disposez des éléments suivants :
+ Kit de développement Java (JDK) 8 ou version ultérieure
+ AWS SDK pour Java 2. x
+ Maven ou Gradle pour la gestion des dépendances

KCL collecte des mesures d'utilisation du processeur, telles que l'utilisation du processeur, à partir de l'hôte de calcul sur lequel les travailleurs s'exécutent afin d'équilibrer la charge afin d'atteindre un niveau d'utilisation des ressources uniforme pour tous les travailleurs. Pour permettre à KCL de collecter des métriques d'utilisation du processeur auprès des travailleurs, vous devez remplir les conditions préalables suivantes :

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ Votre système d'exploitation doit être Linux.
+ Vous devez l'activer [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)dans votre instance EC2.

 **Amazon Elastic Container Service (Amazon ECS) sur Amazon EC2**
+ Votre système d'exploitation doit être Linux.
+ Vous devez activer la [version 4 du point de terminaison des métadonnées des tâches ECS](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html). 
+ La version de votre agent de conteneur Amazon ECS doit être 1.39.0 ou ultérieure.

 **Amazon ECS sur AWS Fargate**
+ Vous devez activer la version 4 [du point de terminaison des métadonnées des tâches Fargate](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html). Si vous utilisez la version 1.4.0 ou ultérieure de la plateforme Fargate, cette option est activée par défaut. 
+ Version 1.4.0 ou ultérieure de la plateforme Fargate.

 **Amazon Elastic Kubernetes Service (Amazon EKS) sur Amazon EC2** 
+ Votre système d'exploitation doit être Linux.

 **Amazon EKS sur AWS Fargate**
+ Plateforme Fargate 1.3.0 ou version ultérieure.

**Important**  
Si KCL ne peut pas collecter les indicateurs d'utilisation du processeur auprès des travailleurs, KCL utilisera à nouveau le débit par travailleur pour attribuer les baux et équilibrer la charge entre les travailleurs du parc. Pour de plus amples informations, veuillez consulter [Comment KCL attribue les baux aux travailleurs et équilibre la charge](kcl-dynamoDB.md#kcl-assign-leases).

## Installation et ajout de dépendances
<a name="develop-kcl-consumers-java-installation"></a>

Si vous utilisez Maven, ajoutez la dépendance suivante à votre `pom.xml` fichier. Assurez-vous d'avoir remplacé la version 3.x.x par la dernière version de KCL. 

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

Si vous utilisez Gradle, ajoutez ce qui suit à votre `build.gradle` fichier. Assurez-vous d'avoir remplacé la version 3.x.x par la dernière version de KCL. 

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

Vous pouvez vérifier la dernière version de la KCL dans le référentiel [central Maven](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client).

## Mettre en œuvre le consommateur
<a name="develop-kcl-consumers-java-implemetation"></a>

Une application KCL destinée aux consommateurs comprend les éléments clés suivants :

**Topics**
+ [RecordProcessor](#implementation-recordprocessor)
+ [RecordProcessorFactory](#implementation-recordprocessorfactory)
+ [Planificateur](#implementation-scheduler)
+ [Application principale destinée aux consommateurs](#implementation-main)

### RecordProcessor
<a name="implementation-recordprocessor"></a>

RecordProcessor est le composant central dans lequel réside votre logique métier pour le traitement des enregistrements de flux de données Kinesis. Il définit la manière dont votre application traite les données qu'elle reçoit du flux Kinesis.

Principales responsabilités :
+ Initialiser le traitement d'une partition
+ Traiter des lots d'enregistrements issus du flux Kinesis
+ Arrêter le traitement d'une partition (par exemple, lorsque la partition se divise ou fusionne, ou lorsque le bail est transféré à un autre hôte)
+ Gérez les points de contrôle pour suivre les progrès

Voici un exemple de mise en œuvre :

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.*;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class SampleRecordProcessor implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());
            processRecordsInput.records().forEach(r -> 
                log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())
            );
            
            // Checkpoint periodically
            processRecordsInput.checkpointer().checkpoint();
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}
```

Voici une explication détaillée de chaque méthode utilisée dans l'exemple :

**initialiser (InitializationInput) InitializationInput**
+ Objectif : configurer les ressources ou les états nécessaires au traitement des enregistrements.
+ Quand il est appelé : une fois, lorsque KCL attribue une partition à ce processeur d'enregistrement.
+ Points clés :
  + `initializationInput.shardId()`: ID de la partition que ce processeur va gérer.
  + `initializationInput.extendedSequenceNumber()`: le numéro de séquence à partir duquel démarrer le traitement.

**ProcessRecords () ProcessRecordsInput processRecordsInput**
+ Objectif : traiter les enregistrements entrants et éventuellement vérifier la progression des points de contrôle.
+ Quand il est appelé : à plusieurs reprises, tant que le processeur d'enregistrements détient le bail de la partition.
+ Points clés :
  + `processRecordsInput.records()`: liste des enregistrements à traiter.
  + `processRecordsInput.checkpointer()`: Utilisé pour vérifier la progression.
  + Assurez-vous d'avoir géré toutes les exceptions pendant le traitement afin d'éviter que KCL ne tombe en panne.
  + Cette méthode doit être idempotente, car le même enregistrement peut être traité plusieurs fois dans certains scénarios, tels que les données qui n'ont pas été contrôlées avant un crash ou un redémarrage inattendu du travailleur.
  + Videz toujours toutes les données mises en mémoire tampon avant le point de contrôle pour garantir la cohérence des données.

**Bail perdu () LeaseLostInput leaseLostInput**
+ Objectif : Nettoyer toutes les ressources spécifiques au traitement de cette partition.
+ Quand il est appelé : lorsqu'un autre planificateur prend en charge le bail de cette partition.
+ Points clés :
  + Le point de contrôle n'est pas autorisé dans cette méthode.

**Partagé () ShardEndedInput shardEndedInput**
+ Objectif : terminer le traitement de cette partition et de ce point de contrôle.
+ Quand elle est appelée : lorsque la partition se divise ou fusionne, cela indique que toutes les données de cette partition ont été traitées.
+ Points clés :
  + `shardEndedInput.checkpointer()`: Utilisé pour effectuer le point de contrôle final.
  + Le point de contrôle utilisé dans cette méthode est obligatoire pour terminer le traitement.
  + Le fait de ne pas vider les données et de ne pas vérifier le point de contrôle ici peut entraîner une perte de données ou un double traitement lors de la réouverture de la partition.

**Arrêt demandé () ShutdownRequestedInput shutdownRequestedInput**
+ Objectif : Contrôler et nettoyer les ressources lors de la fermeture de KCL.
+ Quand il est appelé : lorsque KCL s'arrête, par exemple, lorsque l'application s'arrête).
+ Points clés :
  + `shutdownRequestedInput.checkpointer()`: Utilisé pour effectuer le pointage avant l'arrêt.
  + Assurez-vous d'avoir implémenté le point de contrôle dans la méthode afin que la progression soit enregistrée avant que l'application ne s'arrête.
  + Le fait de ne pas vider les données et le point de contrôle ici peut entraîner une perte de données ou un retraitement des enregistrements au redémarrage de l'application.

**Important**  
KCL 3.x réduit le retraitement des données lorsque le bail est transféré d'un travailleur à un autre en effectuant un point de contrôle avant que le travailleur précédent ne soit arrêté. Si vous n'implémentez pas la logique de point de contrôle dans la `shutdownRequested()` méthode, vous ne verrez pas cet avantage. Assurez-vous d'avoir implémenté une logique de point de contrôle dans la `shutdownRequested()` méthode.

### RecordProcessorFactory
<a name="implementation-recordprocessorfactory"></a>

RecordProcessorFactory est chargé de créer de nouvelles RecordProcessor instances. KCL utilise cette fabrique pour créer une nouvelle partition RecordProcessor pour chaque partition que l'application doit traiter.

Principales responsabilités :
+ Créez de nouvelles RecordProcessor instances à la demande
+ Assurez-vous que chacun RecordProcessor est correctement initialisé

Voici un exemple de mise en œuvre :

```
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new SampleRecordProcessor();
    }
}
```

Dans cet exemple, la fabrique crée un nouveau SampleRecordProcessor chaque fois que shardRecordProcessor () est appelé. Vous pouvez étendre cela pour inclure toute logique d'initialisation nécessaire.

### Planificateur
<a name="implementation-scheduler"></a>

Le planificateur est un composant de haut niveau qui coordonne toutes les activités de l'application KCL. Il est responsable de l'orchestration globale du traitement des données.

Principales responsabilités :
+ Gérez le cycle de vie de RecordProcessors
+ Gérez la gestion des baux pour les partitions
+ Coordonner le pointage
+ Équilibrez la charge de traitement des partitions entre les différents intervenants de votre application
+ Gérez les signaux d'arrêt et de fin d'application en douceur

Le planificateur est généralement créé et démarré dans l'application principale. Vous pouvez consulter l'exemple d'implémentation de Scheduler dans la section suivante, Application client principale. 

### Application principale destinée aux consommateurs
<a name="implementation-main"></a>

L'application principale destinée aux consommateurs relie tous les composants entre eux. Il est chargé de configurer le consommateur KCL, de créer les clients nécessaires, de configurer le planificateur et de gérer le cycle de vie de l'application.

Principales responsabilités :
+ Configuration des clients AWS de service (Kinesis, DynamoDB,) CloudWatch
+ Configuration de l'application KCL
+ Création et démarrage du planificateur
+ Gérer l'arrêt de l'application

Voici un exemple de mise en œuvre :

```
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import java.util.UUID;

public class SampleConsumer {
    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    public SampleConsumer(String streamName, Region region) {
        this.streamName = streamName;
        this.region = region;
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    public void run() {
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamName, 
            streamName, 
            kinesisClient, 
            dynamoDbAsyncClient,
            cloudWatchClient, 
            UUID.randomUUID().toString(), 
            new SampleRecordProcessorFactory()
        );

        Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

    public static void main(String[] args) {
        String streamName = "your-stream-name"; // replace with your stream name
        Region region = Region.US_EAST_1; // replace with your region
        new SampleConsumer(streamName, region).run();
    }
}
```

 KCL crée un consommateur EFO (Enhanced Fan-out) avec un débit dédié par défaut. Pour plus d'informations sur la sortie de ventilateur améliorée, consultez. [Développez des clients fans améliorés grâce à un débit dédié](enhanced-consumers.md) Si vous avez moins de 2 consommateurs ou si vous n'avez pas besoin de délais de propagation de lecture inférieurs à 200 ms, vous devez définir la configuration suivante dans l'objet du planificateur pour utiliser des consommateurs à débit partagé :

```
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
```

Le code suivant est un exemple de création d'un objet planificateur utilisant des consommateurs à débit partagé :

**Importations** :

```
import software.amazon.kinesis.retrieval.polling.PollingConfig;
```

**Code** :

```
Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );/
```

# Développez vos clients avec KCL dans des langages autres que Java
<a name="develop-kcl-consumers-non-java"></a>

Cette section décrit la mise en œuvre par les consommateurs de la bibliothèque cliente Kinesis (KCL) en Python, Node.js, .NET et Ruby.

KCL est une bibliothèque Java. Support pour les langages autres que Java est fourni à l'aide d'une interface multilingue appelée. `MultiLangDaemon` Ce démon est basé sur Java et s'exécute en arrière-plan lorsque vous utilisez une KCL avec un langage autre que Java. Par conséquent, si vous installez KCL pour des langages autres que Java et que vous écrivez votre application grand public entièrement dans des langages autres que Java, vous devez toujours installer Java sur votre système en raison du. `MultiLangDaemon` En outre, `MultiLangDaemon` il comporte certains paramètres par défaut que vous devrez peut-être personnaliser en fonction de votre cas d'utilisation (par exemple, la région AWS à laquelle il se connecte). Pour plus d'informations sur le `MultiLangDaemon` on GitHub, consultez le [ MultiLangDaemon projet KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Bien que les concepts de base restent les mêmes d'une langue à l'autre, certaines considérations et implémentations spécifiques à chaque langue doivent être prises en compte. Pour les concepts de base relatifs au développement des consommateurs de KCL, voir[Développez vos clients avec KCL en Java](develop-kcl-consumers-java.md). Pour des informations plus détaillées sur le développement de consommateurs KCL en Python, Node.js, .NET et Ruby, ainsi que sur les dernières mises à jour, consultez les GitHub référentiels suivants :
+ Python: [amazon-kinesis-client-python](https://github.com/awslabs/amazon-kinesis-client-python)
+ Node.js : [amazon-kinesis-client-nodejs](https://github.com/awslabs/amazon-kinesis-client-nodejs)
+ .NET : [amazon-kinesis-client-net](https://github.com/awslabs/amazon-kinesis-client-net)
+ Rubis : [amazon-kinesis-client-ruby](https://github.com/awslabs/amazon-kinesis-client-ruby)

**Important**  
N'utilisez pas les versions de bibliothèque KCL autres que Java suivantes si vous utilisez le JDK 8. Ces versions contiennent une dépendance (logback) incompatible avec le JDK 8.  
KCL Python 3.0.2 et 2.2.0
KCL Node.js 2.3.0
KCL.NET 3.1.0
KCL Ruby 2.2.0
Nous vous recommandons d'utiliser les versions publiées avant ou après les versions concernées lorsque vous travaillez avec le JDK 8.