Multistream-Verarbeitung mit KCL - Amazon-Kinesis-Data-Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Multistream-Verarbeitung mit KCL

In diesem Abschnitt werden die erforderlichen Änderungen beschriebenKCL, die es Ihnen ermöglichen, KCL Verbraucheranwendungen zu erstellen, die mehr als einen Datenstrom gleichzeitig verarbeiten können.

Wichtig
  • Die Multistream-Verarbeitung wird nur in Version KCL 2.3 oder höher unterstützt.

  • Die Multistream-Verarbeitung wird nicht für KCL Verbraucher unterstützt, die in anderen Sprachen als Java geschrieben sind und mit ausgeführt werden. multilangdaemon

  • Die Multistream-Verarbeitung wird in keiner Version von 1.x unterstützt. KCL

  • MultistreamTracker Schnittstelle

    • Um eine Verbraucheranwendung zu erstellen, die mehrere Streams gleichzeitig verarbeiten kann, müssen Sie eine neue Schnittstelle namens implementieren MultistreamTracker. Diese Schnittstelle enthält die streamConfigList Methode, die die Liste der Datenströme und ihrer Konfigurationen zurückgibt, die von der KCL Verbraucheranwendung verarbeitet werden sollen. Beachten Sie, dass die Datenströme, die verarbeitet werden, während der Laufzeit der Verbraucheranwendung geändert werden können. streamConfigListwird regelmäßig von aufgerufenKCL, um mehr über die Änderungen der zu verarbeitenden Datenströme zu erfahren.

    • Das streamConfigList füllt die StreamConfigListe auf.

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }
    • Die Felder StreamIdentifier und InitialPositionInStreamExtended sind Pflichtfelder, während sie optional consumerArn sind. Sie müssen das Feld consumerArn nur angeben, wenn Sie es KCL zur Implementierung einer erweiterten Fan-Out-Anwendung für Privatanwender verwenden.

    • Weitere Informationen dazu finden Sie StreamIdentifier unter https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java #L129. Um eine zu erstellenStreamIdentifier, empfehlen wir, eine Multistream-Instanz aus dem streamArn und dem zu erstellenstreamCreationEpoch, das in KCL 2.5.0 oder höher verfügbar ist. Erstellen Sie in KCL Version 2.3 und Version 2.4, die dies nicht unterstützenstreamArm, eine Multistream-Instance mithilfe des folgenden Formats. account-id:StreamName:streamCreationTimestamp Dieses Format ist veraltet und wird ab der nächsten Hauptversion nicht mehr unterstützt.

    • MultistreamTracker beinhaltet auch eine Strategie zum Löschen von Leasingverträgen alter Streams in der Leasetabelle (). formerStreamsLeases DeletionStrategy Beachten Sie, dass die Strategie während der Laufzeit der Verbraucheranwendung geändert CANNOT werden muss. Weitere Informationen finden Sie unter https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b//amazon-kinesis-client.java. src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy

Oder Sie können ConfigsBuilder mit initialisieren, MultiStreamTracker wenn Sie eine KCL Verbraucheranwendung implementieren möchten, die mehrere Streams gleichzeitig verarbeitet.

* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • Da die Multistream-Unterstützung für Ihre KCL Consumer-Anwendung implementiert ist, enthält jede Zeile der Leasetabelle der Anwendung jetzt die Shard-ID und den Stream-Namen der mehreren Datenströme, die diese Anwendung verarbeitet.

  • Wenn die Multistream-Unterstützung für Ihre KCL Verbraucheranwendung implementiert ist, hat sie die leaseKey folgende Struktur:. account-id:StreamName:streamCreationTimestamp:ShardId Beispiel, 111111111:multiStreamTest-1:12345:shardId-000000000336.

Wichtig

Wenn Ihre bestehende KCL Verbraucheranwendung so konfiguriert ist, dass sie nur einen Datenstrom verarbeitet, ist der leaseKey (der Partitionsschlüssel für die Leasetabelle) die Shard-ID. Wenn Sie eine bestehende KCL Verbraucheranwendung so umkonfigurieren, dass sie mehrere Datenströme verarbeitet, wird dadurch Ihre Leasing-Tabelle beschädigt, da die leaseKey Struktur wie folgt aussehen muss: account-id:StreamName:StreamCreationTimestamp:ShardId um Multistream zu unterstützen.