Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Migrer les consommateurs de la version KCL 1.x vers la version KCL 2.x
Cette rubrique explique les différences entre les versions 1.x et 2.x de la bibliothèque cliente Kinesis (). KCL Il vous montre également comment faire migrer votre client de la version 1.x vers la version 2.x du. KCL Après avoir migré votre client, il commencera à traiter les enregistrements à partir du dernier emplacement contrôlé.
La version 2.0 KCL introduit les modifications d'interface suivantes :
KCLChangements d'interface | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
KCLInterface 1.x | KCLInterface 2.0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor |
software.amazon.kinesis.processor.ShardRecordProcessor |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory |
software.amazon.kinesis.processor.ShardRecordProcessorFactory |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware |
Intégré à software.amazon.kinesis.processor.ShardRecordProcessor |
Rubriques
Migrer le processeur d'enregistrements
L'exemple suivant montre un processeur d'enregistrement implémenté pour la version KCL 1.x :
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Pour migrer la classe de processeur d'enregistrements
-
Modifiez les interfaces de
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
etcom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
desoftware.amazon.kinesis.processor.ShardRecordProcessor
, comme suit :// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
-
Mettez à jour les instructions
import
pour les méthodesinitialize
etprocessRecords
.// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
-
Remplacez la méthode
shutdown
par les nouvelles méthodes suivantes :leaseLost
,shardEnded
etshutdownRequested
.// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
Voici la version mise à jour de la classe du processeur d'enregistrements.
package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
Migrer l'usine de traitement des enregistrements
La fabrique de processeurs d'enregistrements est responsable de la création des processeurs d'enregistrements lorsqu'un bail est acquis. Voici un exemple d'usine KCL 1.x.
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
Pour migrer la fabrique de processeurs d'enregistrements
-
Modifiez l'interface implémentée de
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
verssoftware.amazon.kinesis.processor.ShardRecordProcessorFactory
, comme suit.// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
-
Modifiez la signature de retour pour
createProcessor
.// public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
Voici un exemple de fabrique de processeurs d'enregistrements dans 2.0 :
package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }
Migrer le travailleur
Dans la version 2.0 deKCL, une nouvelle classe, appeléeScheduler
, remplace la Worker
classe. Voici un exemple de worker KCL 1.x.
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
Pour migrer l'application de travail
-
Modifiez la déclaration
import
de la classeWorker
pour les instructions d'importation pour les classesScheduler
etConfigsBuilder
.// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
-
Créez le
ConfigsBuilder
et unScheduler
comme illustré dans l'exemple suivant.Il est recommandé d'utiliser
KinesisClientUtil
pour créerKinesisAsyncClient
et configurermaxConcurrency
dansKinesisAsyncClient
.Important
Le client Amazon Kinesis peut voir une augmentation significative de la latence, sauf si vous configurez
KinesisAsyncClient
pour avoir unemaxConcurrency
suffisamment élevée pour autoriser tous les baux et les utilisations supplémentaires deKinesisAsyncClient
.import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );
Configuration du client Amazon Kinesis
Avec la version 2.0 de la bibliothèque client Kinesis, la configuration du client est passée d'une seule classe de configuration (KinesisClientLibConfiguration
) à six classes de configuration. Le tableau suivant décrit la migration.
Champs de configuration et leurs nouvelles classes | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Champ d'origine | Nouvelle classe de configuration | Description | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
applicationName |
ConfigsBuilder |
Le nom de cette KCL application est. Utilisé par défaut pour le tableName et le consumerName . |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tableName |
ConfigsBuilder |
Permet de remplacer le nom du tableau utilisé par le tableau des baux Amazon DynamoDB. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
streamName |
ConfigsBuilder |
Nom du flux à partir duquel cette application traite les enregistrements. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kinesisEndpoint |
ConfigsBuilder |
Cette option a été supprimée. Consultez Suppressions des configurations du client. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
dynamoDBEndpoint |
ConfigsBuilder |
Cette option a été supprimée. Consultez Suppressions des configurations du client. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
initialPositionInStreamExtended |
RetrievalConfig |
Emplacement de la partition à partir duquel KCL commence à extraire les enregistrements, en commençant par l'exécution initiale de l'application. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kinesisCredentialsProvider |
ConfigsBuilder |
Cette option a été supprimée. Consultez Suppressions des configurations du client. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
dynamoDBCredentialsProvider |
ConfigsBuilder |
Cette option a été supprimée. Consultez Suppressions des configurations du client. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cloudWatchCredentialsProvider |
ConfigsBuilder |
Cette option a été supprimée. Consultez Suppressions des configurations du client. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
failoverTimeMillis |
LeaseManagementConfig | Nombre de millisecondes qui doivent s'écouler avant que vous puissiez considérer qu'un bail propriétaire a échoué. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
workerIdentifier |
ConfigsBuilder |
Identifiant unique qui représente cette instanciation du processeur d'applications. Il doit être unique. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shardSyncIntervalMillis |
LeaseManagementConfig | Délai entre les appels de synchronisation des partitions. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxRecords |
PollingConfig |
Permet de définir le nombre maximum d'enregistrements renvoyés par Kinesis. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
idleTimeBetweenReadsInMillis |
CoordinatorConfig |
Cette option a été supprimée. Consultez Suppression du temps d'inactivité. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
callProcessRecordsEvenForEmptyRecordList |
ProcessorConfig |
Lorsqu'il est défini, le processeur d'enregistrements est appelé même si aucun enregistrement n'a été fourni depuis Kinesis. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
parentShardPollIntervalMillis |
CoordinatorConfig |
À quelle fréquence un processeur d'enregistrements doit-il interroger pour voir si la partition parent est terminée. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cleanupLeasesUponShardCompletion |
LeaseManagementConfig |
Lorsqu'ils sont définis, les baux sont supprimés dès que les baux enfant ont commencé le traitement. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ignoreUnexpectedChildShards |
LeaseManagementConfig |
Lorsqu'elles sont définies, les partitions enfant ont une partition ouverte qui est ignorée. Cela concerne principalement DynamoDB Streams. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kinesisClientConfig |
ConfigsBuilder |
Cette option a été supprimée. Consultez Suppressions des configurations du client. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
dynamoDBClientConfig |
ConfigsBuilder |
Cette option a été supprimée. Consultez Suppressions des configurations du client. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cloudWatchClientConfig |
ConfigsBuilder |
Cette option a été supprimée. Consultez Suppressions des configurations du client. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
taskBackoffTimeMillis |
LifecycleConfig |
Durée d'attente pour relancer des tâches ayant échoué. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metricsBufferTimeMillis |
MetricsConfig |
Contrôle la publication des CloudWatch métriques. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metricsMaxQueueSize |
MetricsConfig |
Contrôle la publication des CloudWatch métriques. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metricsLevel |
MetricsConfig |
Contrôle la publication des CloudWatch métriques. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metricsEnabledDimensions |
MetricsConfig |
Contrôle la publication des CloudWatch métriques. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
validateSequenceNumberBeforeCheckpointing |
CheckpointConfig |
Cette option a été supprimée. Consultez Validation du numéro de séquence des points de contrôle. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
regionName |
ConfigsBuilder |
Cette option a été supprimée. Consultez Suppression des configurations du client. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxLeasesForWorker |
LeaseManagementConfig |
Nombre maximum de baux qu'une instance unique de l'application doit accepter. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxLeasesToStealAtOneTime |
LeaseManagementConfig |
Nombre maximum de baux qu'une application doit tenter de voler à la fois. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
initialLeaseTableReadCapacity |
LeaseManagementConfig |
La IOPs lecture DynamoDB utilisée si la bibliothèque cliente Kinesis doit créer une nouvelle table de bail DynamoDB. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
initialLeaseTableWriteCapacity |
LeaseManagementConfig |
La IOPs lecture DynamoDB utilisée si la bibliothèque cliente Kinesis doit créer une nouvelle table de bail DynamoDB. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
initialPositionInStreamExtended |
LeaseManagementConfig | La position initiale dans le flux à laquelle l'application doit commencer. Elle est utilisée uniquement lors de la création de bail initiale. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
skipShardSyncAtWorkerInitializationIfLeasesExist |
CoordinatorConfig |
Désactivez la synchronisation des données de partition si la table des baux contient des baux existants. TODO: KinesisEco -438 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shardPrioritization |
CoordinatorConfig |
Définition des priorités de partition à utiliser. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shutdownGraceMillis |
N/A | Cette option a été supprimée. Voir MultiLang Suppressions. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
timeoutInSeconds |
N/A | Cette option a été supprimée. Voir MultiLang Suppressions. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
retryGetRecordsInSeconds |
PollingConfig |
Configure le délai entre les GetRecords tentatives d'échec. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxGetRecordsThreadPool |
PollingConfig |
La taille du pool de threads utilisée pour GetRecords. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxLeaseRenewalThreads |
LeaseManagementConfig |
Contrôle la taille du pool de threads des renouvellements de baux. Plus votre application accepte de baux, plus la taille du pool doit être importante. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
recordsFetcherFactory |
PollingConfig |
Permet de remplacer la fabrique utilisée pour créer des extracteurs qui effectuent la récupération à partir de flux. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logWarningForTaskAfterMillis |
LifecycleConfig |
Temps d'attente avec la consignation d'un avertissement si une tâche n'a pas été terminée. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
listShardsBackoffTimeInMillis |
RetrievalConfig |
Nombre de millisecondes à attendre entre les appels de ListShards en cas de défaillance. |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxListShardsRetryAttempts |
RetrievalConfig |
Nombre maximum de nouvelles tentatives par ListShards avant l'abandon. |
Suppression des temps d'inactivité
Dans la version 1.x duKCL, cela idleTimeBetweenReadsInMillis
correspondait à deux quantités :
-
Durée entre les vérifications de répartition des tâches. Vous pouvez maintenant configurer cette durée entre les tâches en définissant
CoordinatorConfig#shardConsumerDispatchPollIntervalMillis
. -
Durée de veille lorsqu'aucun enregistrement n'a été renvoyé à partir de Kinesis Data Streams. Dans la version 2.0, les enregistrements de diffusion améliorée sont transmis à partir de leur extracteur respectif. Les activités sur l'application consommateur de la partition ont lieu uniquement lorsqu'une demande push arrive.
Suppressions de configurations clientes
Dans la version 2.0, le KCL ne crée plus de clients. Il incombe à l'utilisateur de fournir un client valide. Avec cette modification, tous les paramètres de configuration qui contrôlaient la configuration du client ont été supprimés. Si vous avez besoin de ces paramètres, vous pouvez les définir sur les clients avant de fournir les clients à ConfigsBuilder
.
Champ supprimé | Configuration équivalente |
---|---|
kinesisEndpoint |
Configurez le SDK KinesisAsyncClient avec le point de terminaison préféré :KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis
endpoint>")).build() . |
dynamoDBEndpoint |
Configurez le SDK DynamoDbAsyncClient avec le point de terminaison préféré :DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb
endpoint>")).build() . |
kinesisClientConfig |
Configurez le SDK KinesisAsyncClient avec la configuration requise :KinesisAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
dynamoDBClientConfig |
Configurez le SDK DynamoDbAsyncClient avec la configuration requise :DynamoDbAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
cloudWatchClientConfig |
Configurez le SDK CloudWatchAsyncClient avec la configuration requise :CloudWatchAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
regionName |
Configurez le SDK avec la région préférée. Il en va de même pour tous les SDK clients. Par exemple, KinesisAsyncClient.builder().region(Region.US_WEST_2).build() . |