Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Développez vos clients avec KCL Java
Prérequis
Avant de commencer à utiliser la KCL version 3.x, assurez-vous que vous disposez des éléments suivants :
-
Kit de développement Java (8JDK) ou version ultérieure
-
AWS SDK for Java 2. x
-
Maven ou Gradle pour la gestion des dépendances
KCLcollecte des indicateurs CPU d'utilisation tels que CPU l'utilisation provenant de l'hôte de calcul sur lequel les travailleurs s'exécutent afin d'équilibrer la charge afin d'atteindre un niveau d'utilisation des ressources uniforme pour tous les travailleurs. Pour permettre KCL de collecter des métriques CPU d'utilisation auprès des travailleurs, vous devez remplir les conditions préalables suivantes :
Amazon Elastic Compute Cloud(AmazonEC2)
-
Votre système d'exploitation doit être Linux.
-
Vous devez l'activer IMDSv2dans votre EC2 instance.
Amazon Elastic Container Service (AmazonECS) sur Amazon EC2
-
Votre système d'exploitation doit être Linux.
-
Vous devez activer la version 4 du point de terminaison des métadonnées des ECS tâches.
-
La version de votre agent de ECS conteneur Amazon doit être 1.39.0 ou ultérieure.
Amazon ECS sur AWS Fargate
-
Vous devez activer la version 4 du point de terminaison des métadonnées des tâches Fargate. Si vous utilisez la version 1.4.0 ou ultérieure de la plateforme Fargate, cette option est activée par défaut.
-
Plateforme Fargate version 1.4.0 ou ultérieure.
Amazon Elastic Kubernetes Service (Amazon) sur Amazon EKS EC2
-
Votre système d'exploitation doit être Linux.
Amazon EKS sur AWS Fargate
-
Plateforme Fargate 1.3.0 ou version ultérieure.
Important
Si vous KCL ne CPU parvenez pas à collecter les indicateurs d'utilisation auprès des travailleurs, KCL vous aurez recours au débit par travailleur pour attribuer des baux et équilibrer la charge entre les travailleurs de la flotte. Pour de plus amples informations, veuillez consulter Comment KCL attribuer les baux aux travailleurs et équilibrer la charge.
Installation et ajout de dépendances
Si vous utilisez Maven, ajoutez la dépendance suivante à votre pom.xml
fichier. Assurez-vous d'avoir remplacé la version 3.x.x par la dernière versionKCL.
<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>
Si vous utilisez Gradle, ajoutez ce qui suit à votre build.gradle
fichier. Assurez-vous d'avoir remplacé la version 3.x.x par la dernière versionKCL.
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
Vous pouvez vérifier la dernière version du dans KCL le référentiel central de Maven
Mettre en œuvre le consommateur
Une application destinée KCL aux consommateurs comprend les éléments clés suivants :
Composants clés
RecordProcessor
RecordProcessor est le composant central dans lequel réside votre logique métier pour le traitement des enregistrements de flux de données Kinesis. Il définit la manière dont votre application traite les données qu'elle reçoit du flux Kinesis.
Principales responsabilités :
-
Initialiser le traitement d'une partition
-
Traiter des lots d'enregistrements issus du flux Kinesis
-
Arrêter le traitement d'une partition (par exemple, lorsque la partition se divise ou fusionne, ou lorsque le bail est transféré à un autre hôte)
-
Gérez les points de contrôle pour suivre les progrès
Voici un exemple de mise en œuvre :
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.*; import software.amazon.kinesis.processor.ShardRecordProcessor; public class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; @Override public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()) ); // Checkpoint periodically processRecordsInput.checkpointer().checkpoint(); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting.", t); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } }
Vous trouverez ci-dessous une explication détaillée de chaque méthode utilisée dans l'exemple :
initialiser () InitializationInput initializationInput
-
Objectif : configurer les ressources ou les états nécessaires au traitement des enregistrements.
-
Quand il est appelé : une fois, lors de l'KCLattribution d'une partition à ce processeur d'enregistrement.
-
Points clés :
-
initializationInput.shardId()
: ID de la partition que ce processeur va gérer. -
initializationInput.extendedSequenceNumber()
: numéro de séquence à partir duquel démarrer le traitement.
-
processRecords(ProcessRecordsInput processRecordsInput)
-
Objectif : traiter les enregistrements entrants et éventuellement vérifier la progression des points de contrôle.
-
Quand il est appelé : à plusieurs reprises, tant que le processeur d'enregistrements détient le bail de la partition.
-
Points clés :
-
processRecordsInput.records()
: liste des enregistrements à traiter. -
processRecordsInput.checkpointer()
: Utilisé pour vérifier la progression. -
Assurez-vous d'avoir géré toutes les exceptions pendant le traitement pour éviter KCL tout échec.
-
Cette méthode doit être idempotente, car le même enregistrement peut être traité plusieurs fois dans certains scénarios, tels que les données qui n'ont pas été contrôlées avant un crash ou un redémarrage inattendu du travailleur.
-
Videz toujours toutes les données mises en mémoire tampon avant le point de contrôle pour garantir la cohérence des données.
-
leaseLost(LeaseLostInput leaseLostInput)
-
Objectif : Nettoyer toutes les ressources spécifiques au traitement de ce fragment.
-
Quand il est appelé : lorsqu'un autre planificateur prend en charge le bail de cette partition.
-
Points clés :
-
Le point de contrôle n'est pas autorisé dans cette méthode.
-
shardEnded(ShardEndedInput shardEndedInput)
-
Objectif : terminer le traitement de cette partition et de ce point de contrôle.
-
Quand elle est appelée : lorsque la partition se divise ou fusionne, cela indique que toutes les données de cette partition ont été traitées.
-
Points clés :
-
shardEndedInput.checkpointer()
: Utilisé pour effectuer le point de contrôle final. -
Le point de contrôle utilisé dans cette méthode est obligatoire pour terminer le traitement.
-
Le fait de ne pas vider les données et de ne pas vérifier le point de contrôle ici peut entraîner une perte de données ou un double traitement lors de la réouverture de la partition.
-
shutdownRequested(ShutdownRequestedInput shutdownRequestedInput)
-
Objectif : Contrôler et nettoyer les ressources lors de KCL la fermeture.
-
Quand elle KCL est appelée : Quand s'arrête, par exemple, lorsque l'application s'arrête).
-
Points clés :
-
shutdownRequestedInput.checkpointer()
: Utilisé pour effectuer le pointage avant l'arrêt. -
Assurez-vous d'avoir implémenté le point de contrôle dans la méthode afin que la progression soit enregistrée avant que l'application ne s'arrête.
-
Le fait de ne pas vider les données et le point de contrôle ici peut entraîner une perte de données ou un retraitement des enregistrements au redémarrage de l'application.
-
Important
KCL3.x permet de réduire le retraitement des données lorsque le bail est transféré d'un travailleur à un autre en effectuant un point de contrôle avant que le travailleur précédent ne soit arrêté. Si vous n'implémentez pas la logique de point de contrôle dans la shutdownRequested()
méthode, vous ne verrez pas cet avantage. Assurez-vous d'avoir implémenté une logique de point de contrôle dans la shutdownRequested()
méthode.
RecordProcessorFactory
RecordProcessorFactory est chargé de créer de nouvelles RecordProcessor instances. KCLutilise cette fabrique pour créer une nouvelle RecordProcessor partition pour chaque partition que l'application doit traiter.
Principales responsabilités :
-
Créez de nouvelles RecordProcessor instances à la demande
-
Assurez-vous que chacun RecordProcessor est correctement initialisé
Voici un exemple de mise en œuvre :
import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } }
Dans cet exemple, la fabrique crée un nouveau SampleRecordProcessor chaque fois que shardRecordProcessor () est appelé. Vous pouvez étendre cela pour inclure toute logique d'initialisation nécessaire.
Planificateur
Le planificateur est un composant de haut niveau qui coordonne toutes les activités de l'KCLapplication. Il est responsable de l'orchestration globale du traitement des données.
Principales responsabilités :
-
Gérez le cycle de vie de RecordProcessors
-
Gérez la gestion des baux pour les partitions
-
Coordonner le pointage
-
Équilibrez la charge de traitement des partitions entre les différents intervenants de votre application
-
Gérez les signaux d'arrêt et de fin d'application en douceur
Le planificateur est généralement créé et démarré dans l'application principale. Vous pouvez consulter l'exemple d'implémentation de Scheduler dans la section suivante, Application client principale.
Application principale destinée aux consommateurs
L'application principale destinée aux consommateurs relie tous les composants entre eux. Il est chargé de configurer le KCL consommateur, de créer les clients nécessaires, de configurer le planificateur et de gérer le cycle de vie de l'application.
Principales responsabilités :
-
Configuration des clients AWS de service (Kinesis, DynamoDB,) CloudWatch
-
Configuration de l'KCLapplication
-
Création et démarrage du planificateur
-
Gérer l'arrêt de l'application
Voici un exemple de mise en œuvre :
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import java.util.UUID; public class SampleConsumer { private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; public SampleConsumer(String streamName, Region region) { this.streamName = streamName; this.region = region; this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } public void run() { DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoDbAsyncClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); } public static void main(String[] args) { String streamName = "your-stream-name"; // replace with your stream name Region region = Region.US_EAST_1; // replace with your region new SampleConsumer(streamName, region).run(); } }
KCLcrée un consommateur Enhanced Fan-out (EFO) avec un débit dédié par défaut. Pour plus d'informations sur la sortie de ventilateur améliorée, consultez. Développez des clients fans améliorés grâce à un débit dédié Si vous avez moins de 2 consommateurs ou si vous n'avez pas besoin de délais de propagation de lecture inférieurs à 200 ms, vous devez définir la configuration suivante dans l'objet du planificateur pour utiliser des consommateurs à débit partagé :
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
Le code suivant est un exemple de création d'un objet planificateur qui utilise des consommateurs à débit partagé :
Importations :
import software.amazon.kinesis.retrieval.polling.PollingConfig;
Code :
Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) );/