Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Traitement multi-flux avec KCL
Cette section décrit les modifications requises pour vous permettre de créer KCL des applications grand public capables de traiter plusieurs flux de données à la fois. KCL
Important
-
Le traitement multi-flux n'est pris en charge que dans la version KCL 2.3 ou ultérieure.
-
Le traitement multi-flux n'est pas pris en charge pour KCL les utilisateurs écrits dans des langages autres que Java qui s'exécutent avec.
multilangdaemon
-
Le traitement multi-flux n'est pris en charge dans aucune version de KCL 1.x.
-
MultistreamTracker interface
-
Pour créer une application grand public capable de traiter plusieurs flux en même temps, vous devez implémenter une nouvelle interface appelée MultistreamTracker
. Cette interface inclut la streamConfigList
méthode qui renvoie la liste des flux de données et leurs configurations à traiter par l'application KCL client. Notez que les flux de données en cours de traitement peuvent être modifiés pendant l'exécution de l'application grand public.streamConfigList
est appelé périodiquement KCL pour prendre connaissance de l'évolution des flux de données à traiter. -
streamConfigList
Renseigne la StreamConfigliste.
package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }
-
Les champs
StreamIdentifier
etInitialPositionInStreamExtended
sont obligatoires, alors qu'ilsconsumerArn
sont facultatifs. Vous devez le fournirconsumerArn
uniquement si vous l'utilisez KCL pour implémenter une application grand public améliorée. -
Pour plus d'informations sur
StreamIdentifier
, consultez https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java #L129. Pour créer une StreamIdentifier
, nous vous recommandons de créer une instance multistream à partir destreamArn
etstreamCreationEpoch
qui est disponible dans la KCL version 2.5.0 ou ultérieure. Dans les KCL versions 2.3 et 2.4, qui ne sont pas prises en chargestreamArm
, créez une instance multistream en utilisant le format.account-id:StreamName:streamCreationTimestamp
Ce format sera obsolète et ne sera plus pris en charge à compter de la prochaine version majeure. -
MultistreamTracker inclut également une stratégie pour supprimer les baux des anciens flux dans la table des baux (formerStreamsLeasesDeletionStrategy). Notez que la stratégie CANNOT sera modifiée pendant l'exécution de l'application grand public. Pour plus d'informations, consultez https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b//amazon-kinesis-client
.java. src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy
-
Vous pouvez également l'initialiser ConfigsBuilder avec MultiStreamTracker
si vous souhaitez implémenter une application KCL grand public qui traite plusieurs flux en même temps.
* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
Grâce à la prise en charge multiflux mise en œuvre pour votre application KCL client, chaque ligne de la table des baux de l'application contient désormais l'ID de partition et le nom de flux des multiples flux de données traités par cette application.
-
Lorsque le support multi-flux pour votre application KCL grand public est implémenté, la leaseKey structure est la suivante :
account-id:StreamName:streamCreationTimestamp:ShardId
. Par exemple,111111111:multiStreamTest-1:12345:shardId-000000000336
.
Important
Lorsque votre application KCL client existante est configurée pour traiter un seul flux de données, le leaseKey
(qui est la clé de partition pour la table de location) est l'ID de partition. Si vous reconfigurez une application KCL client existante pour traiter plusieurs flux de données, cela interrompt votre table de location, car la leaseKey
structure doit être la suivante : account-id:StreamName:StreamCreationTimestamp:ShardId
pour prendre en charge le multi-flux.