Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Multistream-Verarbeitung mit KCL
In diesem Abschnitt werden die erforderlichen Änderungen beschriebenKCL, die es Ihnen ermöglichen, KCL Verbraucheranwendungen zu erstellen, die mehr als einen Datenstrom gleichzeitig verarbeiten können.
Wichtig
-
Die Multistream-Verarbeitung wird nur in Version KCL 2.3 oder höher unterstützt.
-
Die Multistream-Verarbeitung wird nicht für KCL Verbraucher unterstützt, die in anderen Sprachen als Java geschrieben sind und mit ausgeführt werden.
multilangdaemon
-
Die Multistream-Verarbeitung wird in keiner Version von 1.x unterstützt. KCL
-
MultistreamTracker Schnittstelle
-
Um eine Verbraucheranwendung zu erstellen, die mehrere Streams gleichzeitig verarbeiten kann, müssen Sie eine neue Schnittstelle namens implementieren MultistreamTracker
. Diese Schnittstelle enthält die streamConfigList
Methode, die die Liste der Datenströme und ihrer Konfigurationen zurückgibt, die von der KCL Verbraucheranwendung verarbeitet werden sollen. Beachten Sie, dass die Datenströme, die verarbeitet werden, während der Laufzeit der Verbraucheranwendung geändert werden können.streamConfigList
wird regelmäßig von aufgerufenKCL, um mehr über die Änderungen der zu verarbeitenden Datenströme zu erfahren. -
Das
streamConfigList
füllt die StreamConfigListe auf.
package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }
-
Die Felder
StreamIdentifier
undInitialPositionInStreamExtended
sind Pflichtfelder, während sie optionalconsumerArn
sind. Sie müssen das FeldconsumerArn
nur angeben, wenn Sie es KCL zur Implementierung einer erweiterten Fan-Out-Anwendung für Privatanwender verwenden. -
Weitere Informationen dazu finden Sie
StreamIdentifier
unter https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java #L129.Um eine zu erstellen StreamIdentifier
, empfehlen wir, eine Multistream-Instanz aus demstreamArn
und dem zu erstellenstreamCreationEpoch
, das in KCL 2.5.0 oder höher verfügbar ist. Erstellen Sie in KCL Version 2.3 und Version 2.4, die dies nicht unterstützenstreamArm
, eine Multistream-Instance mithilfe des folgenden Formats.account-id:StreamName:streamCreationTimestamp
Dieses Format ist veraltet und wird ab der nächsten Hauptversion nicht mehr unterstützt. -
MultistreamTracker beinhaltet auch eine Strategie zum Löschen von Leasingverträgen alter Streams in der Leasetabelle (). formerStreamsLeases DeletionStrategy Beachten Sie, dass die Strategie während der Laufzeit der Verbraucheranwendung geändert CANNOT werden muss. Weitere Informationen finden Sie unter https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b//amazon-kinesis-client
.java. src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy
-
Oder Sie können ConfigsBuilder mit initialisieren, MultiStreamTracker
wenn Sie eine KCL Verbraucheranwendung implementieren möchten, die mehrere Streams gleichzeitig verarbeitet.
* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
Da die Multistream-Unterstützung für Ihre KCL Consumer-Anwendung implementiert ist, enthält jede Zeile der Leasetabelle der Anwendung jetzt die Shard-ID und den Stream-Namen der mehreren Datenströme, die diese Anwendung verarbeitet.
-
Wenn die Multistream-Unterstützung für Ihre KCL Verbraucheranwendung implementiert ist, hat sie die leaseKey folgende Struktur:.
account-id:StreamName:streamCreationTimestamp:ShardId
Beispiel,111111111:multiStreamTest-1:12345:shardId-000000000336
.
Wichtig
Wenn Ihre bestehende KCL Verbraucheranwendung so konfiguriert ist, dass sie nur einen Datenstrom verarbeitet, ist der leaseKey
(der Partitionsschlüssel für die Leasetabelle) die Shard-ID. Wenn Sie eine bestehende KCL Verbraucheranwendung so umkonfigurieren, dass sie mehrere Datenströme verarbeitet, wird dadurch Ihre Leasing-Tabelle beschädigt, da die leaseKey
Struktur wie folgt aussehen muss: account-id:StreamName:StreamCreationTimestamp:ShardId
um Multistream zu unterstützen.