Migrer les consommateurs de la version KCL 1.x vers la version KCL 2.x - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Migrer les consommateurs de la version KCL 1.x vers la version KCL 2.x

Cette rubrique explique les différences entre les versions 1.x et 2.x de la bibliothèque cliente Kinesis (). KCL Il vous montre également comment faire migrer votre client de la version 1.x vers la version 2.x du. KCL Après avoir migré votre client, il commencera à traiter les enregistrements à partir du dernier emplacement contrôlé.

La version 2.0 KCL introduit les modifications d'interface suivantes :

KCLChangements d'interface
KCLInterface 1.x KCLInterface 2.0
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor software.amazon.kinesis.processor.ShardRecordProcessor
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory software.amazon.kinesis.processor.ShardRecordProcessorFactory
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware Intégré à software.amazon.kinesis.processor.ShardRecordProcessor

Migrer le processeur d'enregistrements

L'exemple suivant montre un processeur d'enregistrement implémenté pour la version KCL 1.x :

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Pour migrer la classe de processeur d'enregistrements
  1. Modifiez les interfaces de com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor et com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware de software.amazon.kinesis.processor.ShardRecordProcessor, comme suit :

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
  2. Mettez à jour les instructions import pour les méthodes initialize et processRecords.

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
  3. Remplacez la méthode shutdown par les nouvelles méthodes suivantes : leaseLost, shardEnded et shutdownRequested.

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

Voici la version mise à jour de la classe du processeur d'enregistrements.

package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }

Migrer l'usine de traitement des enregistrements

La fabrique de processeurs d'enregistrements est responsable de la création des processeurs d'enregistrements lorsqu'un bail est acquis. Voici un exemple d'usine KCL 1.x.

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
Pour migrer la fabrique de processeurs d'enregistrements
  1. Modifiez l'interface implémentée de com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory vers software.amazon.kinesis.processor.ShardRecordProcessorFactory, comme suit.

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
  2. Modifiez la signature de retour pour createProcessor.

    // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

Voici un exemple de fabrique de processeurs d'enregistrements dans 2.0 :

package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }

Migrer le travailleur

Dans la version 2.0 deKCL, une nouvelle classe, appeléeScheduler, remplace la Worker classe. Voici un exemple de worker KCL 1.x.

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
Pour migrer l'application de travail
  1. Modifiez la déclaration import de la classe Worker pour les instructions d'importation pour les classes Scheduler et ConfigsBuilder.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. Créez le ConfigsBuilder et un Scheduler comme illustré dans l'exemple suivant.

    Il est recommandé d'utiliser KinesisClientUtil pour créer KinesisAsyncClient et configurer maxConcurrency dans KinesisAsyncClient.

    Important

    Le client Amazon Kinesis peut voir une augmentation significative de la latence, sauf si vous configurez KinesisAsyncClient pour avoir unemaxConcurrency suffisamment élevée pour autoriser tous les baux et les utilisations supplémentaires de KinesisAsyncClient.

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );

Configuration du client Amazon Kinesis

Avec la version 2.0 de la bibliothèque client Kinesis, la configuration du client est passée d'une seule classe de configuration (KinesisClientLibConfiguration) à six classes de configuration. Le tableau suivant décrit la migration.

Champs de configuration et leurs nouvelles classes
Champ d'origine Nouvelle classe de configuration Description
applicationName ConfigsBuilder Le nom de cette KCL application est. Utilisé par défaut pour le tableName et le consumerName.
tableName ConfigsBuilder Permet de remplacer le nom du tableau utilisé par le tableau des baux Amazon DynamoDB.
streamName ConfigsBuilder Nom du flux à partir duquel cette application traite les enregistrements.
kinesisEndpoint ConfigsBuilder Cette option a été supprimée. Consultez Suppressions des configurations du client.
dynamoDBEndpoint ConfigsBuilder Cette option a été supprimée. Consultez Suppressions des configurations du client.
initialPositionInStreamExtended RetrievalConfig Emplacement de la partition à partir duquel KCL commence à extraire les enregistrements, en commençant par l'exécution initiale de l'application.
kinesisCredentialsProvider ConfigsBuilder Cette option a été supprimée. Consultez Suppressions des configurations du client.
dynamoDBCredentialsProvider ConfigsBuilder Cette option a été supprimée. Consultez Suppressions des configurations du client.
cloudWatchCredentialsProvider ConfigsBuilder Cette option a été supprimée. Consultez Suppressions des configurations du client.
failoverTimeMillis LeaseManagementConfig Nombre de millisecondes qui doivent s'écouler avant que vous puissiez considérer qu'un bail propriétaire a échoué.
workerIdentifier ConfigsBuilder Identifiant unique qui représente cette instanciation du processeur d'applications. Il doit être unique.
shardSyncIntervalMillis LeaseManagementConfig Délai entre les appels de synchronisation des partitions.
maxRecords PollingConfig Permet de définir le nombre maximum d'enregistrements renvoyés par Kinesis.
idleTimeBetweenReadsInMillis CoordinatorConfig Cette option a été supprimée. Consultez Suppression du temps d'inactivité.
callProcessRecordsEvenForEmptyRecordList ProcessorConfig Lorsqu'il est défini, le processeur d'enregistrements est appelé même si aucun enregistrement n'a été fourni depuis Kinesis.
parentShardPollIntervalMillis CoordinatorConfig À quelle fréquence un processeur d'enregistrements doit-il interroger pour voir si la partition parent est terminée.
cleanupLeasesUponShardCompletion LeaseManagementConfig Lorsqu'ils sont définis, les baux sont supprimés dès que les baux enfant ont commencé le traitement.
ignoreUnexpectedChildShards LeaseManagementConfig Lorsqu'elles sont définies, les partitions enfant ont une partition ouverte qui est ignorée. Cela concerne principalement DynamoDB Streams.
kinesisClientConfig ConfigsBuilder Cette option a été supprimée. Consultez Suppressions des configurations du client.
dynamoDBClientConfig ConfigsBuilder Cette option a été supprimée. Consultez Suppressions des configurations du client.
cloudWatchClientConfig ConfigsBuilder Cette option a été supprimée. Consultez Suppressions des configurations du client.
taskBackoffTimeMillis LifecycleConfig Durée d'attente pour relancer des tâches ayant échoué.
metricsBufferTimeMillis MetricsConfig Contrôle la publication des CloudWatch métriques.
metricsMaxQueueSize MetricsConfig Contrôle la publication des CloudWatch métriques.
metricsLevel MetricsConfig Contrôle la publication des CloudWatch métriques.
metricsEnabledDimensions MetricsConfig Contrôle la publication des CloudWatch métriques.
validateSequenceNumberBeforeCheckpointing CheckpointConfig Cette option a été supprimée. Consultez Validation du numéro de séquence des points de contrôle.
regionName ConfigsBuilder Cette option a été supprimée. Consultez Suppression des configurations du client.
maxLeasesForWorker LeaseManagementConfig Nombre maximum de baux qu'une instance unique de l'application doit accepter.
maxLeasesToStealAtOneTime LeaseManagementConfig Nombre maximum de baux qu'une application doit tenter de voler à la fois.
initialLeaseTableReadCapacity LeaseManagementConfig La IOPs lecture DynamoDB utilisée si la bibliothèque cliente Kinesis doit créer une nouvelle table de bail DynamoDB.
initialLeaseTableWriteCapacity LeaseManagementConfig La IOPs lecture DynamoDB utilisée si la bibliothèque cliente Kinesis doit créer une nouvelle table de bail DynamoDB.
initialPositionInStreamExtended LeaseManagementConfig La position initiale dans le flux à laquelle l'application doit commencer. Elle est utilisée uniquement lors de la création de bail initiale.
skipShardSyncAtWorkerInitializationIfLeasesExist CoordinatorConfig Désactivez la synchronisation des données de partition si la table des baux contient des baux existants. TODO: KinesisEco -438
shardPrioritization CoordinatorConfig Définition des priorités de partition à utiliser.
shutdownGraceMillis N/A Cette option a été supprimée. Voir MultiLang Suppressions.
timeoutInSeconds N/A Cette option a été supprimée. Voir MultiLang Suppressions.
retryGetRecordsInSeconds PollingConfig Configure le délai entre les GetRecords tentatives d'échec.
maxGetRecordsThreadPool PollingConfig La taille du pool de threads utilisée pour GetRecords.
maxLeaseRenewalThreads LeaseManagementConfig Contrôle la taille du pool de threads des renouvellements de baux. Plus votre application accepte de baux, plus la taille du pool doit être importante.
recordsFetcherFactory PollingConfig Permet de remplacer la fabrique utilisée pour créer des extracteurs qui effectuent la récupération à partir de flux.
logWarningForTaskAfterMillis LifecycleConfig Temps d'attente avec la consignation d'un avertissement si une tâche n'a pas été terminée.
listShardsBackoffTimeInMillis RetrievalConfig Nombre de millisecondes à attendre entre les appels de ListShards en cas de défaillance.
maxListShardsRetryAttempts RetrievalConfig Nombre maximum de nouvelles tentatives par ListShards avant l'abandon.

Suppression des temps d'inactivité

Dans la version 1.x duKCL, cela idleTimeBetweenReadsInMillis correspondait à deux quantités :

  • Durée entre les vérifications de répartition des tâches. Vous pouvez maintenant configurer cette durée entre les tâches en définissant CoordinatorConfig#shardConsumerDispatchPollIntervalMillis.

  • Durée de veille lorsqu'aucun enregistrement n'a été renvoyé à partir de Kinesis Data Streams. Dans la version 2.0, les enregistrements de diffusion améliorée sont transmis à partir de leur extracteur respectif. Les activités sur l'application consommateur de la partition ont lieu uniquement lorsqu'une demande push arrive.

Suppressions de configurations clientes

Dans la version 2.0, le KCL ne crée plus de clients. Il incombe à l'utilisateur de fournir un client valide. Avec cette modification, tous les paramètres de configuration qui contrôlaient la configuration du client ont été supprimés. Si vous avez besoin de ces paramètres, vous pouvez les définir sur les clients avant de fournir les clients à ConfigsBuilder.

Champ supprimé Configuration équivalente
kinesisEndpoint Configurez le SDK KinesisAsyncClient avec le point de terminaison préféré :KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build().
dynamoDBEndpoint Configurez le SDK DynamoDbAsyncClient avec le point de terminaison préféré :DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build().
kinesisClientConfig Configurez le SDK KinesisAsyncClient avec la configuration requise :KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build().
dynamoDBClientConfig Configurez le SDK DynamoDbAsyncClient avec la configuration requise :DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build().
cloudWatchClientConfig Configurez le SDK CloudWatchAsyncClient avec la configuration requise :CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build().
regionName Configurez le SDK avec la région préférée. Il en va de même pour tous les SDK clients. Par exemple, KinesisAsyncClient.builder().region(Region.US_WEST_2).build().