KCLInformationen zu 1.x und 2.x - Amazon-Kinesis-Data-Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

KCLInformationen zu 1.x und 2.x

Anmerkung

Die Versionen 1.x und 2.x der Kinesis Client Library (KCL) sind veraltet. Wir empfehlen die Migration auf KCLVersion 3.x, die eine verbesserte Leistung und neue Funktionen bietet. Die aktuelle KCL Dokumentation und den Migrationsleitfaden finden Sie unter. Verwenden Sie die Kinesis-Clientbibliothek

Eine der Methoden zur Entwicklung benutzerdefinierter Verbraucheranwendungen, die Daten aus KDS Datenströmen verarbeiten können, ist die Verwendung der Kinesis Client Library (KCL).

Anmerkung

Sowohl für KCL 1.x als auch für KCL 2.x wird empfohlen, je nach Nutzungsszenario ein Upgrade auf die neueste Version KCL KCL 1.x oder 2.x durchzuführen. Sowohl KCL 1.x als auch KCL 2.x werden regelmäßig mit neueren Versionen aktualisiert, die die neuesten Abhängigkeits- und Sicherheitspatches, Bugfixes und abwärtskompatible neue Funktionen enthalten. Weitere Informationen finden Sie unter /releases. https://github.com/awslabs/ amazon-kinesis-client

Über uns KCL (frühere Versionen)

KCLhilft Ihnen dabei, Daten aus einem Kinesis-Datenstrom zu nutzen und zu verarbeiten, indem es sich um viele der komplexen Aufgaben kümmert, die mit verteilter Datenverarbeitung verbunden sind. Dazu gehören der Lastenausgleich über mehrere Anwendungs-Instances hinweg, die Reaktion auf Ausfälle von Verbraucheranwendungs-Instances, die Überprüfung verarbeiteter Datensätze und die Reaktion auf Resharding. Der KCL kümmert sich um all diese Unteraufgaben, sodass Sie sich darauf konzentrieren können, Ihre benutzerdefinierte Logik für die Datensatzverarbeitung zu schreiben.

Das KCL unterscheidet sich von den Kinesis Data StreamsAPIs, die in der AWS SDKs verfügbar sind. Die Kinesis Data Streams APIs helfen Ihnen bei der Verwaltung vieler Aspekte von Kinesis Data Streams, darunter das Erstellen von Streams, das Resharding sowie das Einfügen und Abrufen von Datensätzen. Das KCL bietet eine Abstraktionsebene für all diese Unteraufgaben, sodass Sie sich auf die benutzerdefinierte Datenverarbeitungslogik Ihrer Verbraucheranwendung konzentrieren können. Informationen zu den Kinesis Data Streams API finden Sie in der Amazon Kinesis Kinesis-Referenz API.

Wichtig

Das KCL ist eine Java-Bibliothek. Die Support für andere Sprachen als Java wird über eine mehrsprachige Schnittstelle namens bereitgestellt. MultiLangDaemon Dieser Daemon basiert auf Java und wird im Hintergrund ausgeführt, wenn Sie eine andere KCL Sprache als Java verwenden. Wenn Sie beispielsweise das KCL für Python installieren und Ihre Verbraucheranwendung vollständig in Python schreiben, müssen Sie trotzdem Java auf Ihrem System installieren, da MultiLangDaemon Darüber hinaus MultiLangDaemon verfügt es über einige Standardeinstellungen, die Sie möglicherweise an Ihren Anwendungsfall anpassen müssen, z. B. an die AWS Region, mit der eine Verbindung hergestellt wird. Weitere Informationen dazu finden Sie MultiLangDaemon GitHub unter KCL MultiLangDaemon Projekt.

Der KCL fungiert als Vermittler zwischen Ihrer Datensatzverarbeitungslogik und Kinesis Data Streams.

Frühere KCL-Versionen

Derzeit können Sie eine der folgenden unterstützten Versionen von verwenden, um Ihre benutzerdefinierten Anwendungen für Privatanwender KCL zu erstellen:

Sie können entweder KCL 1.x oder KCL 2.x verwenden, um Verbraucheranwendungen zu erstellen, die einen gemeinsamen Durchsatz verwenden. Weitere Informationen finden Sie unter Entwickeln Sie benutzerdefinierte Verbraucher mit gemeinsamem Durchsatz mithilfe von KCL.

Um Verbraucheranwendungen zu erstellen, die einen dedizierten Durchsatz verwenden (erweiterte Fan-Out-Verbraucher), können Sie nur 2.x verwenden. KCL Weitere Informationen finden Sie unter Entwickeln Sie verbesserte Fan-Out-Verbraucher mit dediziertem Durchsatz.

Informationen zu den Unterschieden zwischen KCL 1.x und KCL 2.x sowie Anweisungen zur Migration von KCL 1.x zu 2.x finden Sie unter. KCL Migrieren Sie Verbraucher von KCL 1.x auf 2.x KCL

KCLKonzepte (frühere Versionen)

  • KCLAnwendung für Privatanwender — eine Anwendung, die speziell für das Lesen KCL und Verarbeiten von Datensätzen aus Datenströmen entwickelt wurde.

  • Anwendungsinstanz für KCL Privatanwender — Anwendungen für Privatanwender sind in der Regel verteilt, wobei eine oder mehrere Anwendungsinstanzen gleichzeitig ausgeführt werden, um Fehler zu koordinieren und einen dynamischen Lastenausgleich bei der Verarbeitung von Datensätzen vorzunehmen.

  • Worker — eine übergeordnete Klasse, die eine Anwendungsinstanz für KCL Privatanwender verwendet, um mit der Datenverarbeitung zu beginnen.

    Wichtig

    Jede Instanz einer KCL Verbraucheranwendung hat einen Worker.

    Der Worker initialisiert und überwacht verschiedene Aufgaben, darunter das Synchronisieren von Shard- und Leasing-Informationen, das Verfolgen von Shard-Zuweisungen und das Verarbeiten von Daten aus den Shards. Ein Worker stellt KCL die Konfigurationsinformationen für die Verbraucheranwendung bereit, z. B. den Namen des Datenstroms, dessen Datensätze diese KCL Verbraucheranwendung verarbeiten wird, und die AWS Anmeldeinformationen, die für den Zugriff auf diesen Datenstrom erforderlich sind. Der Worker startet außerdem die spezifische Instanz der KCL Verbraucheranwendung, um Datensätze aus dem Datenstrom an die Datensatzprozessoren weiterzuleiten.

    Wichtig

    In KCL 1.x heißt diese Klasse Worker. Weitere Informationen (dies sind die KCL Java-Repositorys) finden Sie unter https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java. In KCL 2.x heißt diese Klasse Scheduler. Der Zweck von Scheduler in KCL 2.x ist identisch mit dem Zweck von Worker in 1.x. KCL Weitere Informationen zur Scheduler-Klasse in KCL 2.x finden Sie unter/.java. https://github.com/awslabs/ amazon-kinesis-client blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler

  • Lease – Daten, die die Bindung zwischen einem Worker und einem Shard definieren. Verteilte KCL Verbraucheranwendungen verwenden Leases, um die Verarbeitung von Datensätzen auf eine ganze Flotte von Mitarbeitern zu verteilen. Zu einem bestimmten Zeitpunkt ist jede Datenmenge durch einen durch die Variable identifizierten Leasingvertrag an einen bestimmten Mitarbeiter gebunden. leaseKey

    Standardmäßig kann ein Arbeitnehmer einen oder mehrere Mietverträge (abhängig vom Wert der Variablen maxLeasesForWorker) gleichzeitig abschließen.

    Wichtig

    Jeder Worker versucht, alle verfügbaren Leases für alle verfügbaren Shards in einem Datenstrom zu halten. Aber es kann nur jeweils ein Worker jeden Lease zu einem bestimmten Zeitpunkt erfolgreich halten.

    Wenn Sie beispielsweise eine Konsumentenanwendungs-Instance A mit Worker A haben, die einen Datenstrom mit 4 Shards verarbeitet, kann Worker A Leases für die Shards 1, 2, 3 und 4 gleichzeitig halten. Wenn Sie jedoch zwei Konsumentenanwendungs-Instances haben: A und B mit Worker A und Worker B und diese Instances einen Datenstrom mit 4 Shards verarbeiten, können Worker A und Worker B nicht gleichzeitig den Lease für Shard 1 halten. Ein Worker hält den Lease für einen bestimmten Shard, bis er bereit ist, die Verarbeitung der Datensätze dieses Shards zu beenden, oder bis er ausfällt. Wenn ein Worker den Lease beendet, nimmt ein anderer Worker den Lease auf und hält ihn.

    Weitere Informationen (dies sind die KCL Java-Repositorys) finden Sie unter https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java für KCL 1.x und https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java für 2.x. KCL

  • Leasing-Tabelle — eine einzigartige Amazon DynamoDB-Tabelle, die verwendet wird, um die Shards in einem KDS Datenstrom zu verfolgen, die von den Mitarbeitern der Verbraucheranwendung geleast und verarbeitet werden. KCL Die Leasing-Tabelle muss (innerhalb eines Workers und für alle Mitarbeiter) mit den neuesten Shard-Informationen aus dem Datenstrom synchron bleiben, während die Consumer-Anwendung ausgeführt wirdKCL. Weitere Informationen finden Sie unter Verwenden Sie eine Leasetabelle, um nachzuverfolgen, welche Shards von der Consumer-Anwendung verarbeitet wurden KCL.

  • Record Processor — die Logik, die definiert, wie Ihre KCL Verbraucheranwendung die Daten verarbeitet, die sie aus den Datenströmen erhält. Zur Laufzeit instanziiert eine KCL Consumer-Anwendungsinstanz einen Worker, und dieser Worker instanziiert einen Datensatzprozessor für jeden Shard, für den er eine Lease hält.

Verwenden Sie eine Leasetabelle, um nachzuverfolgen, welche Shards von der Consumer-Anwendung verarbeitet wurden KCL

Was ist eine Leasing-Tabelle

KCLVerwendet für jede Amazon Kinesis Data Streams Streams-Anwendung eine eindeutige Leasetabelle (gespeichert in einer Amazon DynamoDB-Tabelle), um den Überblick über die Shards in einem KDS Datenstream zu behalten, die von den Mitarbeitern der Verbraucheranwendung geleast und verarbeitet werden. KCL

Wichtig

KCLverwendet den Namen der Verbraucheranwendung, um den Namen der Leasetabelle zu erstellen, die diese Consumer-Anwendung verwendet. Daher muss jeder Consumer-Anwendungsname eindeutig sein.

Sie können die Leasetabelle mit der Amazon-DynamoDB-Konsole anzeigen, während die Konsumentenanwendung ausgeführt wird.

Wenn die Leasetabelle für Ihre KCL Consumer-Anwendung beim Start der Anwendung nicht vorhanden ist, erstellt einer der Worker die Leasingtabelle für diese Anwendung.

Wichtig

Ihr Konto wird neben den Kosten für Kinesis Data Streams mit den Kosten belastet, die für die DynamoDB-Tabelle anfallen.

Jede Zeile in der Leasetabelle stellt einen Shard dar, der von den Workern Ihrer Konsumentenanwendung verarbeitet wird. Wenn Ihre KCL Consumer-Anwendung nur einen Datenstrom verarbeitetleaseKey, dann ist der Hash-Schlüssel für die Leasing-Tabelle die Shard-ID. Wenn jaVerarbeiten Sie mehrere Datenströme mit derselben KCL 2.x für Java-Consumer-Anwendung, dann leaseKey sieht die Struktur von so aus:account-id:StreamName:streamCreationTimestamp:ShardId. Beispiel, 111111111:multiStreamTest-1:12345:shardId-000000000336.

Neben der Shard-ID enthält jede Zeile noch folgende Daten:

  • checkpoint: Die letzte Prüfpunkt-Sequenznummer des Shards. Dieser Wert ist für alle Shards im Datenstrom eindeutig.

  • checkpointSubSequenceNummer: Wenn Sie die Aggregationsfunktion der Kinesis Producer Library verwenden, ist dies eine Erweiterung des Checkpoints, mit der einzelne Benutzerdatensätze innerhalb des Kinesis-Datensatzes verfolgt werden.

  • leaseCounter: Wird für die Versionierung von Leasing-Verträgen verwendet, sodass Mitarbeiter erkennen können, dass ihr Leasing von einem anderen Mitarbeiter angenommen wurde.

  • leaseKey: Eine eindeutige Kennung für einen Mietvertrag. Jeder Lease gilt für einen bestimmten Shard im Datenstrom und wird immer nur von einem Worker gehalten.

  • leaseOwner: Der Arbeitnehmer, der diesen Mietvertrag innehat.

  • ownerSwitchesSinceCheckpoint: Wie oft hat dieser Mietvertrag die Mitarbeiter gewechselt, seit ein Checkpoint das letzte Mal geschrieben wurde.

  • parentShardId: Wird verwendet, um sicherzustellen, dass der übergeordnete Shard vollständig verarbeitet ist, bevor die Verarbeitung der untergeordneten Shards beginnt. So wird sichergestellt, dass Datensätze in der gleichen Reihenfolge verarbeitet werden, in der sie in den Stream eingespeist wurden.

  • hashrange: Wird von PeriodicShardSyncManager verwendet, um regelmäßige Synchronisierungen durchzuführen, um fehlende Shards in der Leasetabelle zu finden und bei Bedarf Leases für sie zu erstellen.

    Anmerkung

    Diese Daten sind in der Leasetabelle für jeden Shard enthalten, der mit KCL 1.14 und 2.3 beginnt. KCL Weitere Hinweise zu PeriodicShardSyncManager und die periodische Synchronisierung zwischen Leases und Shards finden Sie unter So wird eine Leasing-Tabelle mit den Shards in einem Kinesis-Datenstrom synchronisiert.

  • childshards: Wird von LeaseCleanupManager verwendet, um den Verarbeitungsstatus des untergeordneten Shards zu überprüfen und zu entscheiden, ob der übergeordnete Shard aus der Leasetabelle gelöscht werden kann.

    Anmerkung

    Diese Daten sind in der Leasing-Tabelle für jeden Shard enthalten, der mit KCL 1.14 und 2.3 beginnt. KCL

  • shardID: Die ID des Shards.

    Anmerkung

    Diese Daten sind nur dann in der Leasetabelle enthalten, wenn Sie Verarbeiten Sie mehrere Datenströme mit derselben KCL 2.x für Java-Consumer-Anwendung sind. Dies wird nur in KCL 2.x für Java unterstützt, beginnend mit KCL 2.3 für Java und höher.

  • Streamname Die Kennung des Datenstroms im folgenden Format: account-id:StreamName:streamCreationTimestamp.

    Anmerkung

    Diese Daten sind nur dann in der Leasetabelle enthalten, wenn Sie Verarbeiten Sie mehrere Datenströme mit derselben KCL 2.x für Java-Consumer-Anwendung sind. Dies wird nur in KCL 2.x für Java unterstützt, beginnend mit KCL 2.3 für Java und höher.

Durchsatz

Wenn Ihre Amazon Kinesis Data Streams-Anwendung Ausnahmen für den bereitgestellten Durchsatz erhält, sollten Sie den bereitgestellten Durchsatz für die DynamoDB-Tabelle erhöhen. Das KCL erstellt die Tabelle mit einem bereitgestellten Durchsatz von 10 Lesevorgängen pro Sekunde und 10 Schreibvorgängen pro Sekunde, was für Ihre Anwendung jedoch möglicherweise nicht ausreichend ist. Beispiel: Wenn Ihre Amazon Kinesis Data Streams-Anwendung häufig Prüfpunkte setzt oder einen Stream verarbeitet, der aus vielen Shards besteht, müssen Sie den Durchsatz möglicherweise erhöhen.

Weitere Informationen zum bereitgestellten Durchsatz in DynamoDB finden Sie unter Lese-/Schreibkapazitätsmodus und Arbeiten mit Tabellen und Daten im Amazon-DynamoDB-Entwicklerhandbuch.

So wird eine Leasing-Tabelle mit den Shards in einem Kinesis-Datenstrom synchronisiert

Mitarbeiter in KCL Verbraucheranwendungen verwenden Leases, um Shards aus einem bestimmten Datenstrom zu verarbeiten. Die Informationen darüber, welcher Worker zu einem bestimmten Zeitpunkt welchen Shard least, werden in einer Leasetabelle gespeichert. Die Leasetabelle muss mit den neuesten Shard-Informationen aus dem Datenstrom synchron bleiben, während die KCL Verbraucheranwendung ausgeführt wird. KCLsynchronisiert die Leasetabelle mit den Shard-Informationen, die vom Kinesis Data Streams Streams-Dienst während des Bootstrapings der Consumer-Anwendung (entweder wenn die Consumer-Anwendung initialisiert oder neu gestartet wird) und auch immer dann, wenn ein Shard, der gerade verarbeitet wird, ein Ende erreicht (Resharding), erhält. Mit anderen Worten, die Worker oder eine KCL Verbraucheranwendung werden mit dem Datenstrom synchronisiert, den sie beim ersten Bootstrap der Verbraucheranwendung und immer dann verarbeiten, wenn die Verbraucheranwendung auf ein Datenstrom-Reshard-Ereignis trifft.

Synchronisation in KCL 1.0-1.13 und 2.0-2.2 KCL

In KCL 1.0-1.13 und KCL 2.0-2.2 wird beim Bootstraping der Verbraucheranwendung und auch bei jedem Reshard-Ereignis für den Datenstrom die Leasetabelle mit den Shard-Informationen KCL synchronisiert, die vom Kinesis Data Streams Streams-Dienst abgerufen wurden, indem die oder die Erkennung aufgerufen wird. ListShards DescribeStream APIs In allen oben aufgeführten KCL Versionen führt jeder Worker einer KCL Consumer-Anwendung die folgenden Schritte aus, um den Lease-/Shard-Synchronisierungsprozess während des Bootstrappings der Consumer-Anwendung und bei jedem Stream-Reshard-Ereignis durchzuführen:

  • Ruft alle Shards für Daten des Streams ab, der gerade verarbeitet wird

  • Ruft alle Shard-Leases aus der Leasetabelle ab

  • Filtert jeden offenen Shard heraus, für den es in der Leasetabelle keinen Lease gibt

  • Iteriert über alle gefundenen offenen Shards und für jeden offenen Shard ohne offenes übergeordnetes Element:

    • Durchläuft den Hierarchiebaum über den Pfad der Vorgänger, um festzustellen, ob der Shard ein Nachfolger ist. Ein Shard wird als untergeordnetes Element betrachtet, wenn gerade ein Vorgänger-Shard verarbeitet wird (der Leaseeintrag für den Vorgänger-Shard ist in der Leasetabelle vorhanden) oder wenn ein Vorgänger-Shard verarbeitet werden soll (wenn die Anfangsposition beispielsweise TRIM_HORIZON oder AT_TIMESTAMP ist)

    • Wenn es sich bei dem offenen Shard im Kontext um ein abgeleitetes Objekt handelt, KCL überprüft es den Shard anhand seiner ursprünglichen Position und erstellt, falls erforderlich, Leasingverträge für seine übergeordneten Elemente

Synchronisation in KCL 2.x, beginnend mit 2.3 und höher KCL

Beginnend mit den neuesten unterstützten Versionen von KCL 2.x (KCL2.3) und höher unterstützt die Bibliothek jetzt die folgenden Änderungen am Synchronisationsprozess. Diese Änderungen an der Lease-/Shard-Synchronisierung reduzieren die Anzahl der API Aufrufe von KCL Verbraucheranwendungen an den Kinesis Data Streams Streams-Service erheblich und optimieren das Lease-Management in Ihrer Verbraucheranwendung. KCL

  • Wenn die Leasetabelle beim Bootstraping der Anwendung leer ist, KCL verwendet sie die Filteroption ListShard API von (den ShardFilter optionalen Anforderungsparameter), um Leases nur für einen Snapshot von Shards abzurufen und zu erstellen, die zu dem durch den Parameter angegebenen Zeitpunkt geöffnet sind. ShardFilter Mit dem ShardFilter Parameter können Sie die Antwort von herausfiltern. ListShards API Die einzige erforderliche Eigenschaft des ShardFilter-Parameters ist Type. KCLverwendet die Eigenschaft Type filter und die folgenden gültigen Werte, um offene Shards, für die möglicherweise neue Leases erforderlich sind, zu identifizieren und eine Momentaufnahme zurückzugeben:

    • AT_TRIM_HORIZON – Die Antwort umfasst alle Shards, die am TRIM_HORIZON geöffnet waren.

    • AT_LATEST – Die Antwort enthält nur die aktuell geöffneten Shards des Datenstroms.

    • AT_TIMESTAMP – Die Antwort umfasst alle Shards, deren Startzeitstempel kleiner oder gleich dem angegebenen Zeitstempel sind und deren Endzeitstempel größer oder gleich dem angegebenen Zeitstempel ist oder solche, die noch offen sind.

    ShardFilter wird verwendet, wenn Leases für eine leere Leasetabelle erstellt werden, um Leases für einen Snapshot von Shards zu initialisieren, die unter RetrievalConfig#initialPositionInStreamExtended angegeben sind.

    Mehr über ShardFilter erfahren Sie unter https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • Anstatt dass alle Worker die lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard Synchronisation durchführen.

  • KCL2.3 verwendet den ChildShards Rückgabeparameter von GetRecords und, SubscribeToShard APIs um die Leasing-/Shard-Synchronisierung durchzuführen, die bei SHARD_END geschlossenen Shards stattfindet, sodass ein KCL Worker nur Leases für die untergeordneten Shards des Shards erstellen kann, dessen Verarbeitung abgeschlossen ist. Diese Optimierung der Leasing-/Shard-Synchronisierung verwendet bei der gemeinsamen Nutzung von Consumer-Anwendungen den Parameter von. ChildShards GetRecords API Für Verbraucheranwendungen mit dediziertem Durchsatz (erweiterter Fan-Out) verwendet diese Optimierung der Lease-/Shard-Synchronisierung den Parameter von. ChildShards SubscribeToShard API Weitere Informationen finden Sie unter GetRecords, und. SubscribeToShardsChildShard

  • Mit den oben genannten Änderungen geht das Verhalten von von KCL weg von dem Modell, dass alle Mitarbeiter über alle vorhandenen Shards Bescheid wissen, hin zu dem Modell, dass Arbeitnehmer nur über die untergeordneten Scherben der Shards, die jeder Arbeiter besitzt, lernen. Daher werden neben der Synchronisation, die beim Bootstraping von Verbraucheranwendungen und bei Reshard-Ereignissen stattfindet, KCL jetzt auch zusätzliche regelmäßige Shard-/Lease-Scans durchgeführt, um potenzielle Lücken in der Leasetabelle zu identifizieren (mit anderen Worten, um mehr über alle neuen Shards zu erfahren), um sicherzustellen, dass der gesamte Hash-Bereich des Datenstroms verarbeitet wird, und um bei Bedarf Leases für sie zu erstellen. PeriodicShardSyncManagerist die Komponente, die für die regelmäßige Ausführung von Lease-/Shard-Scans verantwortlich ist.

    Weitere Informationen zu PeriodicShardSyncManager Version KCL 2.3 finden Sie unter https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java #L201 -L213.

    In KCL 2.3 stehen neue Konfigurationsoptionen zur Konfiguration zur Verfügung in: PeriodicShardSyncManager LeaseManagementConfig

    Name Standardwert Beschreibung
    leasesRecoveryAuditorExecutionFrequencyMillis

    120 000 (2 Minuten)

    Häufigkeit (in Millionen), mit der der Auditor in der Leasetabelle nach teilweisen Leases sucht. Wenn der Auditor eine Lücke in den Leases für einen Stream feststellt, würde er die Shard-Synchronisierung auf der Grundlage von leasesRecoveryAuditorInconsistencyConfidenceThreshold auslösen.

    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    Konfidenzschwellenwert für die regelmäßige Auditortätigkeit, um festzustellen, ob die Leases für einen Datenstrom in der Leasetabelle inkonsistent sind. Wenn der Auditor für diesen Datenstrom mehrmals hintereinander dieselben Inkonsistenzen feststellt, würde er eine Shard-Synchronisierung auslösen.

    Außerdem werden jetzt neue CloudWatch Messwerte ausgegeben, um den Zustand von zu überwachenPeriodicShardSyncManager. Weitere Informationen finden Sie unter PeriodicShardSyncManager.

  • Einschließlich einer Optimierung auf HierarchicalShardSyncer, um nur Leases für eine Shard-Ebene zu erstellen.

Synchronisation in KCL 1.x, beginnend mit KCL 1.14 und höher

Beginnend mit den neuesten unterstützten Versionen von KCL 1.x (KCL1.14) und höher unterstützt die Bibliothek jetzt die folgenden Änderungen am Synchronisationsprozess. Diese Änderungen an der Lease-/Shard-Synchronisierung reduzieren die Anzahl der API Aufrufe von KCL Verbraucheranwendungen an den Kinesis Data Streams Streams-Service erheblich und optimieren das Lease-Management in Ihrer Verbraucheranwendung. KCL

  • Wenn die Leasetabelle beim Bootstraping der Anwendung leer ist, KCL verwendet sie die Filteroption ListShard API von (den ShardFilter optionalen Anforderungsparameter), um Leases nur für einen Snapshot von Shards abzurufen und zu erstellen, die zu dem durch den Parameter angegebenen Zeitpunkt geöffnet sind. ShardFilter Mit dem ShardFilter Parameter können Sie die Antwort von herausfiltern. ListShards API Die einzige erforderliche Eigenschaft des ShardFilter-Parameters ist Type. KCLverwendet die Eigenschaft Type filter und die folgenden gültigen Werte, um offene Shards, für die möglicherweise neue Leases erforderlich sind, zu identifizieren und eine Momentaufnahme zurückzugeben:

    • AT_TRIM_HORIZON – Die Antwort umfasst alle Shards, die am TRIM_HORIZON geöffnet waren.

    • AT_LATEST – Die Antwort enthält nur die aktuell geöffneten Shards des Datenstroms.

    • AT_TIMESTAMP – Die Antwort umfasst alle Shards, deren Startzeitstempel kleiner oder gleich dem angegebenen Zeitstempel sind und deren Endzeitstempel größer oder gleich dem angegebenen Zeitstempel ist oder solche, die noch offen sind.

    ShardFilter wird verwendet, wenn Leases für eine leere Leasetabelle erstellt werden, um Leases für einen Snapshot von Shards zu initialisieren, die unter KinesisClientLibConfiguration#initialPositionInStreamExtended angegeben sind.

    Mehr über ShardFilter erfahren Sie unter https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • Anstatt dass alle Worker die lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard Synchronisation durchführen.

  • KCL1.14 verwendet den ChildShards Rückgabeparameter von GetRecords und, SubscribeToShard APIs um die Leasing-/Shard-Synchronisierung durchzuführen, die bei geschlossenen Shards stattfindet, sodass ein KCL Worker nur Leases SHARD_END für die untergeordneten Shards des Shards erstellen kann, dessen Verarbeitung abgeschlossen ist. GetRecordsWeitere ChildShardInformationen finden Sie unter und.

  • Mit den oben genannten Änderungen KCL geht das Verhalten von weg von dem Modell, dass alle Arbeitnehmer über alle vorhandenen Scherben Bescheid wissen, hin zu dem Modell, dass die Arbeitnehmer nur über die untergeordneten Scherben der Scherben, die jeder Arbeiter besitzt, lernen. Daher werden neben der Synchronisation, die beim Bootstraping von Verbraucheranwendungen und bei Reshard-Ereignissen stattfindet, KCL jetzt auch zusätzliche regelmäßige Shard-/Lease-Scans durchgeführt, um potenzielle Lücken in der Leasetabelle zu identifizieren (mit anderen Worten, um mehr über alle neuen Shards zu erfahren), um sicherzustellen, dass der gesamte Hash-Bereich des Datenstroms verarbeitet wird, und um bei Bedarf Leases für sie zu erstellen. PeriodicShardSyncManagerist die Komponente, die für die regelmäßige Ausführung von Lease-/Shard-Scans verantwortlich ist.

    Wenn KinesisClientLibConfiguration#shardSyncStrategyType auf ShardSyncStrategyType.SHARD_END gesetzt ist, wird PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold verwendet, um den Schwellenwert für die Anzahl der aufeinanderfolgenden Scans mit Lücken in der Leasetabelle zu bestimmen, nach dem eine Shard-Synchronisierung erzwungen werden soll. Wenn KinesisClientLibConfiguration#shardSyncStrategyType auf ShardSyncStrategyType.PERIODIC gesetzt ist, wird leasesRecoveryAuditorInconsistencyConfidenceThreshold ignoriert.

    Weitere Informationen zu PeriodicShardSyncManager Version KCL 1.14 finden Sie unter https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java #L987 -L999.

    In KCL 1.14 steht eine neue Konfigurationsoption zur Konfiguration zur Verfügung in: PeriodicShardSyncManager LeaseManagementConfig

    Name Standardwert Beschreibung
    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    Konfidenzschwellenwert für die regelmäßige Auditortätigkeit, um festzustellen, ob die Leases für einen Datenstrom in der Leasetabelle inkonsistent sind. Wenn der Auditor für diesen Datenstrom mehrmals hintereinander dieselben Inkonsistenzen feststellt, würde er eine Shard-Synchronisierung auslösen.

    Neue CloudWatch Metriken werden jetzt auch ausgegeben, um den Zustand von zu überwachen. PeriodicShardSyncManager Weitere Informationen finden Sie unter PeriodicShardSyncManager.

  • KCL1.14 unterstützt jetzt auch die verzögerte Leasingbereinigung. Leases werden asynchron von LeaseCleanupManager bei Erreichen von SHARD_END gelöscht, wenn ein Shard entweder die Aufbewahrungsfrist des Datenstroms überschritten hat oder wenn er aufgrund eines Resharding-Vorgangs geschlossen wurde.

    Es stehen neue Konfigurationsoptionen zur Verfügung, um LeaseCleanupManager zu konfigurieren.

    Name Standardwert Beschreibung
    leaseCleanupIntervalMillis

    1 Minute

    Intervall, in dem der Lease-Cleanup-Thread ausgeführt werden soll.

    completedLeaseCleanupIntervalMillis 5 Minuten

    Intervall, in dem überprüft werden soll, ob ein Lease abgeschlossen ist oder nicht.

    garbageLeaseCleanupIntervalMillis 30 Minuten

    Intervall, in dem geprüft werden soll, ob ein Lease Datenmüll ist (d. h. über die Aufbewahrungsfrist des Datenstroms hinaus abgeschnitten wurde) oder nicht.

  • Einschließlich einer Optimierung auf KinesisShardSyncer, um nur Leases für eine Shard-Ebene zu erstellen.

Verarbeiten Sie mehrere Datenströme mit derselben KCL 2.x für Java-Consumer-Anwendung

In diesem Abschnitt werden die folgenden Änderungen in KCL 2.x für Java beschrieben, mit denen Sie KCL Verbraucheranwendungen erstellen können, die mehr als einen Datenstrom gleichzeitig verarbeiten können.

Wichtig

Die Multistream-Verarbeitung wird nur in KCL 2.x für Java unterstützt, beginnend mit KCL 2.3 für Java und höher.

Die Multistream-Verarbeitung wird für alle anderen Sprachen NOT unterstützt, in denen KCL 2.x implementiert werden kann.

Die Multistream-Verarbeitung wird in allen Versionen von 1.x NOT unterstützt. KCL

  • MultistreamTracker Schnittstelle

    Um eine Verbraucheranwendung zu erstellen, die mehrere Streams gleichzeitig verarbeiten kann, müssen Sie eine neue Schnittstelle namens implementieren MultistreamTracker. Diese Schnittstelle enthält die streamConfigList Methode, die die Liste der Datenströme und ihrer Konfigurationen zurückgibt, die von der KCL Verbraucheranwendung verarbeitet werden sollen. Beachten Sie, dass die Datenströme, die verarbeitet werden, während der Laufzeit der Verbraucheranwendung geändert werden können. streamConfigListwird regelmäßig von der aufgerufenKCL, um mehr über die Änderungen der zu verarbeitenden Datenströme zu erfahren.

    Die streamConfigList Methode füllt die StreamConfigListe auf.

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

    Beachten Sie, dass es sich bei den Feldern StreamIdentifier und InitialPositionInStreamExtended um Pflichtfelder handelt, während consumerArn optional ist. Sie müssen das consumerArn nur angeben, wenn Sie KCL 2.x verwenden, um eine erweiterte Fan-Out-Consumer-Anwendung zu implementieren.

    Weitere Informationen StreamIdentifier dazu finden Sie unter https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java #L129. Um eine zu erstellenStreamIdentifier, empfehlen wir, dass Sie eine Multistream-Instanz aus der streamArn und der erstellenstreamCreationEpoch, die in Version 2.5.0 und höher verfügbar ist. Erstellen Sie in KCL Version 2.3 und Version 2.4, die dies nicht unterstützenstreamArm, eine Multistream-Instance mithilfe des folgenden Formats. account-id:StreamName:streamCreationTimestamp Dieses Format ist veraltet und wird ab der nächsten Hauptversion nicht mehr unterstützt.

    MultistreamTracker beinhaltet auch eine Strategie zum Löschen von Leases alter Streams in der Leasetabelle (formerStreamsLeasesDeletionStrategy). Beachten Sie, dass die Strategie CANNOT während der Laufzeit der Verbraucheranwendung geändert wird. Weitere Informationen finden Sie unter https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b//amazon-kinesis-client.java src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy

  • ConfigsBuilderist eine anwendungsweite Klasse, mit der Sie alle 2.x-Konfigurationseinstellungen angeben können, die beim Erstellen Ihrer Verbraucheranwendung verwendet werden sollen. KCL KCL ConfigsBuilderDie Klasse unterstützt jetzt die Schnittstelle. MultistreamTracker Sie können ConfigsBuilder entweder mit dem Namen des einen Datenstroms initialisieren, aus dem Datensätze abgerufen werden sollen:

    /** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }

    Oder Sie können ConfigsBuilder mit initialisieren, MultiStreamTracker wenn Sie eine KCL Verbraucheranwendung implementieren möchten, die mehrere Streams gleichzeitig verarbeitet.

    * Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • Da die Multistream-Unterstützung für Ihre KCL Consumer-Anwendung implementiert ist, enthält jede Zeile der Leasetabelle der Anwendung jetzt die Shard-ID und den Stream-Namen der mehreren Datenströme, die diese Anwendung verarbeitet.

  • Wenn die Multistream-Unterstützung für Ihre KCL Verbraucheranwendung implementiert ist, hat sie die leaseKey folgende Struktur:. account-id:StreamName:streamCreationTimestamp:ShardId Beispiel, 111111111:multiStreamTest-1:12345:shardId-000000000336.

    Wichtig

    Wenn Ihre bestehende KCL Verbraucheranwendung so konfiguriert ist, dass sie nur einen Datenstrom verarbeitet, ist der leaseKey (der Hash-Schlüssel für die Leasing-Tabelle) die Shard-ID. Wenn Sie diese bestehende KCL Verbraucheranwendung so umkonfigurieren, dass sie mehrere Datenströme verarbeitet, wird Ihre Leasing-Tabelle beschädigt, da bei Multistream-Unterstützung die leaseKey Struktur wie folgt aussehen muss:. account-id:StreamName:StreamCreationTimestamp:ShardId

Verwenden Sie das KCL mit der Schemaregistrierung AWS Glue

Sie können Ihre Kinesis-Datenströme in die AWS Glue Schema Registry integrieren. Mit der AWS Glue Schema Registry können Sie Schemas zentral erkennen, steuern und weiterentwickeln und gleichzeitig sicherstellen, dass die erstellten Daten kontinuierlich anhand eines registrierten Schemas validiert werden. Ein Schema definiert die Struktur und das Format eines Datensatzes. Ein Schema ist eine versionierte Spezifikation für zuverlässige Datenveröffentlichung, -nutzung oder -speicherung. Mit der AWS Glue Schema Registry können Sie die end-to-end Datenqualität und die Datenverwaltung in Ihren Streaming-Anwendungen verbessern. Weitere Informationen finden Sie unter AWS Glue Schema Registry. Eine Möglichkeit, diese Integration einzurichten, ist die KCL in Java.

Wichtig

Derzeit wird die Integration von Kinesis Data Streams und AWS Glue Schema Registry nur für Kinesis-Datenstreams unterstützt, die in Java implementierte KCL 2.3-Consumer verwenden. Mehrsprachige Unterstützung wird nicht bereitgestellt. KCL1.0-Verbraucher werden nicht unterstützt. KCL2.x-Verbraucher vor KCL 2.3 werden nicht unterstützt.

Detaillierte Anweisungen zum Einrichten der Integration von Kinesis Data Streams mit Schema Registry mithilfe von finden Sie im Abschnitt „Interaktion mit Daten mithilfe der KPL KCL /-Bibliotheken“ unter Anwendungsfall: Integration von Amazon Kinesis Data Streams mit der AWS Glue Schema Registry. KCL