Traitement multi-flux avec KCL - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Traitement multi-flux avec KCL

Cette section décrit les modifications requises pour vous permettre de créer KCL des applications grand public capables de traiter plusieurs flux de données à la fois. KCL

Important
  • Le traitement multi-flux n'est pris en charge que dans la version KCL 2.3 ou ultérieure.

  • Le traitement multi-flux n'est pas pris en charge pour KCL les utilisateurs écrits dans des langages autres que Java qui s'exécutent avec. multilangdaemon

  • Le traitement multi-flux n'est pris en charge dans aucune version de KCL 1.x.

  • MultistreamTracker interface

    • Pour créer une application grand public capable de traiter plusieurs flux en même temps, vous devez implémenter une nouvelle interface appelée MultistreamTracker. Cette interface inclut la streamConfigList méthode qui renvoie la liste des flux de données et leurs configurations à traiter par l'application KCL client. Notez que les flux de données en cours de traitement peuvent être modifiés pendant l'exécution de l'application grand public. streamConfigListest appelé périodiquement KCL pour prendre connaissance de l'évolution des flux de données à traiter.

    • streamConfigListRenseigne la StreamConfigliste.

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }
    • Les champs StreamIdentifier et InitialPositionInStreamExtended sont obligatoires, alors qu'ils consumerArn sont facultatifs. Vous devez le fournir consumerArn uniquement si vous l'utilisez KCL pour implémenter une application grand public améliorée.

    • Pour plus d'informations surStreamIdentifier, consultez https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java #L129. Pour créer uneStreamIdentifier, nous vous recommandons de créer une instance multistream à partir de streamArn et streamCreationEpoch qui est disponible dans la KCL version 2.5.0 ou ultérieure. Dans les KCL versions 2.3 et 2.4, qui ne sont pas prises en chargestreamArm, créez une instance multistream en utilisant le format. account-id:StreamName:streamCreationTimestamp Ce format sera obsolète et ne sera plus pris en charge à compter de la prochaine version majeure.

    • MultistreamTracker inclut également une stratégie pour supprimer les baux des anciens flux dans la table des baux (formerStreamsLeasesDeletionStrategy). Notez que la stratégie CANNOT sera modifiée pendant l'exécution de l'application grand public. Pour plus d'informations, consultez https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b//amazon-kinesis-client.java. src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy

Vous pouvez également l'initialiser ConfigsBuilder avec MultiStreamTracker si vous souhaitez implémenter une application KCL grand public qui traite plusieurs flux en même temps.

* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • Grâce à la prise en charge multiflux mise en œuvre pour votre application KCL client, chaque ligne de la table des baux de l'application contient désormais l'ID de partition et le nom de flux des multiples flux de données traités par cette application.

  • Lorsque le support multi-flux pour votre application KCL grand public est implémenté, la leaseKey structure est la suivante :account-id:StreamName:streamCreationTimestamp:ShardId. Par exemple, 111111111:multiStreamTest-1:12345:shardId-000000000336.

Important

Lorsque votre application KCL client existante est configurée pour traiter un seul flux de données, le leaseKey (qui est la clé de partition pour la table de location) est l'ID de partition. Si vous reconfigurez une application KCL client existante pour traiter plusieurs flux de données, cela interrompt votre table de location, car la leaseKey structure doit être la suivante : account-id:StreamName:StreamCreationTimestamp:ShardId pour prendre en charge le multi-flux.