Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
KCLInformationen zu 1.x und 2.x
Anmerkung
Die Versionen 1.x und 2.x der Kinesis Client Library (KCL) sind veraltet. Wir empfehlen die Migration auf KCLVersion 3.x, die eine verbesserte Leistung und neue Funktionen bietet. Die aktuelle KCL Dokumentation und den Migrationsleitfaden finden Sie unter. Verwenden Sie die Kinesis-Clientbibliothek
Eine der Methoden zur Entwicklung benutzerdefinierter Verbraucheranwendungen, die Daten aus KDS Datenströmen verarbeiten können, ist die Verwendung der Kinesis Client Library (KCL).
Themen
- Über uns KCL (frühere Versionen)
- Frühere KCL-Versionen
- KCLKonzepte (frühere Versionen)
- Verwenden Sie eine Leasetabelle, um nachzuverfolgen, welche Shards von der Consumer-Anwendung verarbeitet wurden KCL
- Verarbeiten Sie mehrere Datenströme mit derselben KCL 2.x für Java-Consumer-Anwendung
- Verwenden Sie das KCL mit der Schemaregistrierung AWS Glue
Anmerkung
Sowohl für KCL 1.x als auch für KCL 2.x wird empfohlen, je nach Nutzungsszenario ein Upgrade auf die neueste Version KCL KCL 1.x oder 2.x durchzuführen. Sowohl KCL 1.x als auch KCL 2.x werden regelmäßig mit neueren Versionen aktualisiert, die die neuesten Abhängigkeits- und Sicherheitspatches, Bugfixes und abwärtskompatible neue Funktionen enthalten. Weitere Informationen finden Sie unter /releases. https://github.com/awslabs/ amazon-kinesis-client
Über uns KCL (frühere Versionen)
KCLhilft Ihnen dabei, Daten aus einem Kinesis-Datenstrom zu nutzen und zu verarbeiten, indem es sich um viele der komplexen Aufgaben kümmert, die mit verteilter Datenverarbeitung verbunden sind. Dazu gehören der Lastenausgleich über mehrere Anwendungs-Instances hinweg, die Reaktion auf Ausfälle von Verbraucheranwendungs-Instances, die Überprüfung verarbeiteter Datensätze und die Reaktion auf Resharding. Der KCL kümmert sich um all diese Unteraufgaben, sodass Sie sich darauf konzentrieren können, Ihre benutzerdefinierte Logik für die Datensatzverarbeitung zu schreiben.
Das KCL unterscheidet sich von den Kinesis Data StreamsAPIs, die in der AWS SDKs verfügbar sind. Die Kinesis Data Streams APIs helfen Ihnen bei der Verwaltung vieler Aspekte von Kinesis Data Streams, darunter das Erstellen von Streams, das Resharding sowie das Einfügen und Abrufen von Datensätzen. Das KCL bietet eine Abstraktionsebene für all diese Unteraufgaben, sodass Sie sich auf die benutzerdefinierte Datenverarbeitungslogik Ihrer Verbraucheranwendung konzentrieren können. Informationen zu den Kinesis Data Streams API finden Sie in der Amazon Kinesis Kinesis-Referenz API.
Wichtig
Das KCL ist eine Java-Bibliothek. Die Support für andere Sprachen als Java wird über eine mehrsprachige Schnittstelle namens bereitgestellt. MultiLangDaemon Dieser Daemon basiert auf Java und wird im Hintergrund ausgeführt, wenn Sie eine andere KCL Sprache als Java verwenden. Wenn Sie beispielsweise das KCL für Python installieren und Ihre Verbraucheranwendung vollständig in Python schreiben, müssen Sie trotzdem Java auf Ihrem System installieren, da MultiLangDaemon Darüber hinaus MultiLangDaemon verfügt es über einige Standardeinstellungen, die Sie möglicherweise an Ihren Anwendungsfall anpassen müssen, z. B. an die AWS Region, mit der eine Verbindung hergestellt wird. Weitere Informationen dazu finden Sie MultiLangDaemon GitHub unter KCL MultiLangDaemon Projekt
Der KCL fungiert als Vermittler zwischen Ihrer Datensatzverarbeitungslogik und Kinesis Data Streams.
Frühere KCL-Versionen
Derzeit können Sie eine der folgenden unterstützten Versionen von verwenden, um Ihre benutzerdefinierten Anwendungen für Privatanwender KCL zu erstellen:
-
KCL1.x
Weitere Informationen finden Sie unter Entwickeln Sie KCL 1.x-Verbraucher
-
KCL2.x
Weitere Informationen finden Sie unter Entwickeln Sie KCL 2.x-Verbraucher
Sie können entweder KCL 1.x oder KCL 2.x verwenden, um Verbraucheranwendungen zu erstellen, die einen gemeinsamen Durchsatz verwenden. Weitere Informationen finden Sie unter Entwickeln Sie benutzerdefinierte Verbraucher mit gemeinsamem Durchsatz mithilfe von KCL.
Um Verbraucheranwendungen zu erstellen, die einen dedizierten Durchsatz verwenden (erweiterte Fan-Out-Verbraucher), können Sie nur 2.x verwenden. KCL Weitere Informationen finden Sie unter Entwickeln Sie verbesserte Fan-Out-Verbraucher mit dediziertem Durchsatz.
Informationen zu den Unterschieden zwischen KCL 1.x und KCL 2.x sowie Anweisungen zur Migration von KCL 1.x zu 2.x finden Sie unter. KCL Migrieren Sie Verbraucher von KCL 1.x auf 2.x KCL
KCLKonzepte (frühere Versionen)
-
KCLAnwendung für Privatanwender — eine Anwendung, die speziell für das Lesen KCL und Verarbeiten von Datensätzen aus Datenströmen entwickelt wurde.
-
Anwendungsinstanz für KCL Privatanwender — Anwendungen für Privatanwender sind in der Regel verteilt, wobei eine oder mehrere Anwendungsinstanzen gleichzeitig ausgeführt werden, um Fehler zu koordinieren und einen dynamischen Lastenausgleich bei der Verarbeitung von Datensätzen vorzunehmen.
-
Worker — eine übergeordnete Klasse, die eine Anwendungsinstanz für KCL Privatanwender verwendet, um mit der Datenverarbeitung zu beginnen.
Wichtig
Jede Instanz einer KCL Verbraucheranwendung hat einen Worker.
Der Worker initialisiert und überwacht verschiedene Aufgaben, darunter das Synchronisieren von Shard- und Leasing-Informationen, das Verfolgen von Shard-Zuweisungen und das Verarbeiten von Daten aus den Shards. Ein Worker stellt KCL die Konfigurationsinformationen für die Verbraucheranwendung bereit, z. B. den Namen des Datenstroms, dessen Datensätze diese KCL Verbraucheranwendung verarbeiten wird, und die AWS Anmeldeinformationen, die für den Zugriff auf diesen Datenstrom erforderlich sind. Der Worker startet außerdem die spezifische Instanz der KCL Verbraucheranwendung, um Datensätze aus dem Datenstrom an die Datensatzprozessoren weiterzuleiten.
Wichtig
In KCL 1.x heißt diese Klasse Worker. Weitere Informationen (dies sind die KCL Java-Repositorys) finden Sie unter https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker
.java. In KCL 2.x heißt diese Klasse Scheduler. Der Zweck von Scheduler in KCL 2.x ist identisch mit dem Zweck von Worker in 1.x. KCL Weitere Informationen zur Scheduler-Klasse in KCL 2.x finden Sie unter/.java. https://github.com/awslabs/ amazon-kinesis-client blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler -
Lease – Daten, die die Bindung zwischen einem Worker und einem Shard definieren. Verteilte KCL Verbraucheranwendungen verwenden Leases, um die Verarbeitung von Datensätzen auf eine ganze Flotte von Mitarbeitern zu verteilen. Zu einem bestimmten Zeitpunkt ist jede Datenmenge durch einen durch die Variable identifizierten Leasingvertrag an einen bestimmten Mitarbeiter gebunden. leaseKey
Standardmäßig kann ein Arbeitnehmer einen oder mehrere Mietverträge (abhängig vom Wert der Variablen maxLeasesForWorker) gleichzeitig abschließen.
Wichtig
Jeder Worker versucht, alle verfügbaren Leases für alle verfügbaren Shards in einem Datenstrom zu halten. Aber es kann nur jeweils ein Worker jeden Lease zu einem bestimmten Zeitpunkt erfolgreich halten.
Wenn Sie beispielsweise eine Konsumentenanwendungs-Instance A mit Worker A haben, die einen Datenstrom mit 4 Shards verarbeitet, kann Worker A Leases für die Shards 1, 2, 3 und 4 gleichzeitig halten. Wenn Sie jedoch zwei Konsumentenanwendungs-Instances haben: A und B mit Worker A und Worker B und diese Instances einen Datenstrom mit 4 Shards verarbeiten, können Worker A und Worker B nicht gleichzeitig den Lease für Shard 1 halten. Ein Worker hält den Lease für einen bestimmten Shard, bis er bereit ist, die Verarbeitung der Datensätze dieses Shards zu beenden, oder bis er ausfällt. Wenn ein Worker den Lease beendet, nimmt ein anderer Worker den Lease auf und hält ihn.
Weitere Informationen (dies sind die KCL Java-Repositorys) finden Sie unter https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java für KCL 1.x und https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease
.java für 2.x. KCL -
Leasing-Tabelle — eine einzigartige Amazon DynamoDB-Tabelle, die verwendet wird, um die Shards in einem KDS Datenstrom zu verfolgen, die von den Mitarbeitern der Verbraucheranwendung geleast und verarbeitet werden. KCL Die Leasing-Tabelle muss (innerhalb eines Workers und für alle Mitarbeiter) mit den neuesten Shard-Informationen aus dem Datenstrom synchron bleiben, während die Consumer-Anwendung ausgeführt wirdKCL. Weitere Informationen finden Sie unter Verwenden Sie eine Leasetabelle, um nachzuverfolgen, welche Shards von der Consumer-Anwendung verarbeitet wurden KCL.
-
Record Processor — die Logik, die definiert, wie Ihre KCL Verbraucheranwendung die Daten verarbeitet, die sie aus den Datenströmen erhält. Zur Laufzeit instanziiert eine KCL Consumer-Anwendungsinstanz einen Worker, und dieser Worker instanziiert einen Datensatzprozessor für jeden Shard, für den er eine Lease hält.
Verwenden Sie eine Leasetabelle, um nachzuverfolgen, welche Shards von der Consumer-Anwendung verarbeitet wurden KCL
Themen
Was ist eine Leasing-Tabelle
KCLVerwendet für jede Amazon Kinesis Data Streams Streams-Anwendung eine eindeutige Leasetabelle (gespeichert in einer Amazon DynamoDB-Tabelle), um den Überblick über die Shards in einem KDS Datenstream zu behalten, die von den Mitarbeitern der Verbraucheranwendung geleast und verarbeitet werden. KCL
Wichtig
KCLverwendet den Namen der Verbraucheranwendung, um den Namen der Leasetabelle zu erstellen, die diese Consumer-Anwendung verwendet. Daher muss jeder Consumer-Anwendungsname eindeutig sein.
Sie können die Leasetabelle mit der Amazon-DynamoDB-Konsole anzeigen, während die Konsumentenanwendung ausgeführt wird.
Wenn die Leasetabelle für Ihre KCL Consumer-Anwendung beim Start der Anwendung nicht vorhanden ist, erstellt einer der Worker die Leasingtabelle für diese Anwendung.
Wichtig
Ihr Konto wird neben den Kosten für Kinesis Data Streams mit den Kosten belastet, die für die DynamoDB-Tabelle anfallen.
Jede Zeile in der Leasetabelle stellt einen Shard dar, der von den Workern Ihrer Konsumentenanwendung verarbeitet wird. Wenn Ihre KCL Consumer-Anwendung nur einen Datenstrom verarbeitetleaseKey
, dann ist der Hash-Schlüssel für die Leasing-Tabelle die Shard-ID. Wenn jaVerarbeiten Sie mehrere Datenströme mit derselben KCL 2.x für Java-Consumer-Anwendung, dann leaseKey sieht die Struktur von so aus:account-id:StreamName:streamCreationTimestamp:ShardId
. Beispiel, 111111111:multiStreamTest-1:12345:shardId-000000000336
.
Neben der Shard-ID enthält jede Zeile noch folgende Daten:
-
checkpoint: Die letzte Prüfpunkt-Sequenznummer des Shards. Dieser Wert ist für alle Shards im Datenstrom eindeutig.
-
checkpointSubSequenceNummer: Wenn Sie die Aggregationsfunktion der Kinesis Producer Library verwenden, ist dies eine Erweiterung des Checkpoints, mit der einzelne Benutzerdatensätze innerhalb des Kinesis-Datensatzes verfolgt werden.
-
leaseCounter: Wird für die Versionierung von Leasing-Verträgen verwendet, sodass Mitarbeiter erkennen können, dass ihr Leasing von einem anderen Mitarbeiter angenommen wurde.
-
leaseKey: Eine eindeutige Kennung für einen Mietvertrag. Jeder Lease gilt für einen bestimmten Shard im Datenstrom und wird immer nur von einem Worker gehalten.
-
leaseOwner: Der Arbeitnehmer, der diesen Mietvertrag innehat.
-
ownerSwitchesSinceCheckpoint: Wie oft hat dieser Mietvertrag die Mitarbeiter gewechselt, seit ein Checkpoint das letzte Mal geschrieben wurde.
-
parentShardId: Wird verwendet, um sicherzustellen, dass der übergeordnete Shard vollständig verarbeitet ist, bevor die Verarbeitung der untergeordneten Shards beginnt. So wird sichergestellt, dass Datensätze in der gleichen Reihenfolge verarbeitet werden, in der sie in den Stream eingespeist wurden.
-
hashrange: Wird von
PeriodicShardSyncManager
verwendet, um regelmäßige Synchronisierungen durchzuführen, um fehlende Shards in der Leasetabelle zu finden und bei Bedarf Leases für sie zu erstellen.Anmerkung
Diese Daten sind in der Leasetabelle für jeden Shard enthalten, der mit KCL 1.14 und 2.3 beginnt. KCL Weitere Hinweise zu
PeriodicShardSyncManager
und die periodische Synchronisierung zwischen Leases und Shards finden Sie unter So wird eine Leasing-Tabelle mit den Shards in einem Kinesis-Datenstrom synchronisiert. -
childshards: Wird von
LeaseCleanupManager
verwendet, um den Verarbeitungsstatus des untergeordneten Shards zu überprüfen und zu entscheiden, ob der übergeordnete Shard aus der Leasetabelle gelöscht werden kann.Anmerkung
Diese Daten sind in der Leasing-Tabelle für jeden Shard enthalten, der mit KCL 1.14 und 2.3 beginnt. KCL
-
shardID: Die ID des Shards.
Anmerkung
Diese Daten sind nur dann in der Leasetabelle enthalten, wenn Sie Verarbeiten Sie mehrere Datenströme mit derselben KCL 2.x für Java-Consumer-Anwendung sind. Dies wird nur in KCL 2.x für Java unterstützt, beginnend mit KCL 2.3 für Java und höher.
-
Streamname Die Kennung des Datenstroms im folgenden Format:
account-id:StreamName:streamCreationTimestamp
.Anmerkung
Diese Daten sind nur dann in der Leasetabelle enthalten, wenn Sie Verarbeiten Sie mehrere Datenströme mit derselben KCL 2.x für Java-Consumer-Anwendung sind. Dies wird nur in KCL 2.x für Java unterstützt, beginnend mit KCL 2.3 für Java und höher.
Durchsatz
Wenn Ihre Amazon Kinesis Data Streams-Anwendung Ausnahmen für den bereitgestellten Durchsatz erhält, sollten Sie den bereitgestellten Durchsatz für die DynamoDB-Tabelle erhöhen. Das KCL erstellt die Tabelle mit einem bereitgestellten Durchsatz von 10 Lesevorgängen pro Sekunde und 10 Schreibvorgängen pro Sekunde, was für Ihre Anwendung jedoch möglicherweise nicht ausreichend ist. Beispiel: Wenn Ihre Amazon Kinesis Data Streams-Anwendung häufig Prüfpunkte setzt oder einen Stream verarbeitet, der aus vielen Shards besteht, müssen Sie den Durchsatz möglicherweise erhöhen.
Weitere Informationen zum bereitgestellten Durchsatz in DynamoDB finden Sie unter Lese-/Schreibkapazitätsmodus und Arbeiten mit Tabellen und Daten im Amazon-DynamoDB-Entwicklerhandbuch.
So wird eine Leasing-Tabelle mit den Shards in einem Kinesis-Datenstrom synchronisiert
Mitarbeiter in KCL Verbraucheranwendungen verwenden Leases, um Shards aus einem bestimmten Datenstrom zu verarbeiten. Die Informationen darüber, welcher Worker zu einem bestimmten Zeitpunkt welchen Shard least, werden in einer Leasetabelle gespeichert. Die Leasetabelle muss mit den neuesten Shard-Informationen aus dem Datenstrom synchron bleiben, während die KCL Verbraucheranwendung ausgeführt wird. KCLsynchronisiert die Leasetabelle mit den Shard-Informationen, die vom Kinesis Data Streams Streams-Dienst während des Bootstrapings der Consumer-Anwendung (entweder wenn die Consumer-Anwendung initialisiert oder neu gestartet wird) und auch immer dann, wenn ein Shard, der gerade verarbeitet wird, ein Ende erreicht (Resharding), erhält. Mit anderen Worten, die Worker oder eine KCL Verbraucheranwendung werden mit dem Datenstrom synchronisiert, den sie beim ersten Bootstrap der Verbraucheranwendung und immer dann verarbeiten, wenn die Verbraucheranwendung auf ein Datenstrom-Reshard-Ereignis trifft.
Themen
Synchronisation in KCL 1.0-1.13 und 2.0-2.2 KCL
In KCL 1.0-1.13 und KCL 2.0-2.2 wird beim Bootstraping der Verbraucheranwendung und auch bei jedem Reshard-Ereignis für den Datenstrom die Leasetabelle mit den Shard-Informationen KCL synchronisiert, die vom Kinesis Data Streams Streams-Dienst abgerufen wurden, indem die oder die Erkennung aufgerufen wird. ListShards
DescribeStream
APIs In allen oben aufgeführten KCL Versionen führt jeder Worker einer KCL Consumer-Anwendung die folgenden Schritte aus, um den Lease-/Shard-Synchronisierungsprozess während des Bootstrappings der Consumer-Anwendung und bei jedem Stream-Reshard-Ereignis durchzuführen:
-
Ruft alle Shards für Daten des Streams ab, der gerade verarbeitet wird
-
Ruft alle Shard-Leases aus der Leasetabelle ab
-
Filtert jeden offenen Shard heraus, für den es in der Leasetabelle keinen Lease gibt
-
Iteriert über alle gefundenen offenen Shards und für jeden offenen Shard ohne offenes übergeordnetes Element:
-
Durchläuft den Hierarchiebaum über den Pfad der Vorgänger, um festzustellen, ob der Shard ein Nachfolger ist. Ein Shard wird als untergeordnetes Element betrachtet, wenn gerade ein Vorgänger-Shard verarbeitet wird (der Leaseeintrag für den Vorgänger-Shard ist in der Leasetabelle vorhanden) oder wenn ein Vorgänger-Shard verarbeitet werden soll (wenn die Anfangsposition beispielsweise
TRIM_HORIZON
oderAT_TIMESTAMP
ist) -
Wenn es sich bei dem offenen Shard im Kontext um ein abgeleitetes Objekt handelt, KCL überprüft es den Shard anhand seiner ursprünglichen Position und erstellt, falls erforderlich, Leasingverträge für seine übergeordneten Elemente
-
Synchronisation in KCL 2.x, beginnend mit 2.3 und höher KCL
Beginnend mit den neuesten unterstützten Versionen von KCL 2.x (KCL2.3) und höher unterstützt die Bibliothek jetzt die folgenden Änderungen am Synchronisationsprozess. Diese Änderungen an der Lease-/Shard-Synchronisierung reduzieren die Anzahl der API Aufrufe von KCL Verbraucheranwendungen an den Kinesis Data Streams Streams-Service erheblich und optimieren das Lease-Management in Ihrer Verbraucheranwendung. KCL
-
Wenn die Leasetabelle beim Bootstraping der Anwendung leer ist, KCL verwendet sie die Filteroption
ListShard
API von (denShardFilter
optionalen Anforderungsparameter), um Leases nur für einen Snapshot von Shards abzurufen und zu erstellen, die zu dem durch den Parameter angegebenen Zeitpunkt geöffnet sind.ShardFilter
Mit demShardFilter
Parameter können Sie die Antwort von herausfiltern.ListShards
API Die einzige erforderliche Eigenschaft desShardFilter
-Parameters istType
. KCLverwendet die EigenschaftType
filter und die folgenden gültigen Werte, um offene Shards, für die möglicherweise neue Leases erforderlich sind, zu identifizieren und eine Momentaufnahme zurückzugeben:-
AT_TRIM_HORIZON
– Die Antwort umfasst alle Shards, die amTRIM_HORIZON
geöffnet waren. -
AT_LATEST
– Die Antwort enthält nur die aktuell geöffneten Shards des Datenstroms. -
AT_TIMESTAMP
– Die Antwort umfasst alle Shards, deren Startzeitstempel kleiner oder gleich dem angegebenen Zeitstempel sind und deren Endzeitstempel größer oder gleich dem angegebenen Zeitstempel ist oder solche, die noch offen sind.
ShardFilter
wird verwendet, wenn Leases für eine leere Leasetabelle erstellt werden, um Leases für einen Snapshot von Shards zu initialisieren, die unterRetrievalConfig#initialPositionInStreamExtended
angegeben sind.Mehr über
ShardFilter
erfahren Sie unter https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html. -
-
Anstatt dass alle Worker die lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard Synchronisation durchführen.
-
KCL2.3 verwendet den
ChildShards
Rückgabeparameter vonGetRecords
und,SubscribeToShard
APIs um die Leasing-/Shard-Synchronisierung durchzuführen, die beiSHARD_END
geschlossenen Shards stattfindet, sodass ein KCL Worker nur Leases für die untergeordneten Shards des Shards erstellen kann, dessen Verarbeitung abgeschlossen ist. Diese Optimierung der Leasing-/Shard-Synchronisierung verwendet bei der gemeinsamen Nutzung von Consumer-Anwendungen den Parameter von.ChildShards
GetRecords
API Für Verbraucheranwendungen mit dediziertem Durchsatz (erweiterter Fan-Out) verwendet diese Optimierung der Lease-/Shard-Synchronisierung den Parameter von.ChildShards
SubscribeToShard
API Weitere Informationen finden Sie unter GetRecords, und. SubscribeToShardsChildShard -
Mit den oben genannten Änderungen geht das Verhalten von von KCL weg von dem Modell, dass alle Mitarbeiter über alle vorhandenen Shards Bescheid wissen, hin zu dem Modell, dass Arbeitnehmer nur über die untergeordneten Scherben der Shards, die jeder Arbeiter besitzt, lernen. Daher werden neben der Synchronisation, die beim Bootstraping von Verbraucheranwendungen und bei Reshard-Ereignissen stattfindet, KCL jetzt auch zusätzliche regelmäßige Shard-/Lease-Scans durchgeführt, um potenzielle Lücken in der Leasetabelle zu identifizieren (mit anderen Worten, um mehr über alle neuen Shards zu erfahren), um sicherzustellen, dass der gesamte Hash-Bereich des Datenstroms verarbeitet wird, und um bei Bedarf Leases für sie zu erstellen.
PeriodicShardSyncManager
ist die Komponente, die für die regelmäßige Ausführung von Lease-/Shard-Scans verantwortlich ist.Weitere Informationen zu
PeriodicShardSyncManager
Version KCL 2.3 finden Sie unter https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201 -L213. In KCL 2.3 stehen neue Konfigurationsoptionen zur Konfiguration zur Verfügung in:
PeriodicShardSyncManager
LeaseManagementConfig
Name Standardwert Beschreibung leasesRecoveryAuditorExecutionFrequencyMillis 120 000 (2 Minuten)
Häufigkeit (in Millionen), mit der der Auditor in der Leasetabelle nach teilweisen Leases sucht. Wenn der Auditor eine Lücke in den Leases für einen Stream feststellt, würde er die Shard-Synchronisierung auf der Grundlage von
leasesRecoveryAuditorInconsistencyConfidenceThreshold
auslösen.leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
Konfidenzschwellenwert für die regelmäßige Auditortätigkeit, um festzustellen, ob die Leases für einen Datenstrom in der Leasetabelle inkonsistent sind. Wenn der Auditor für diesen Datenstrom mehrmals hintereinander dieselben Inkonsistenzen feststellt, würde er eine Shard-Synchronisierung auslösen.
Außerdem werden jetzt neue CloudWatch Messwerte ausgegeben, um den Zustand von zu überwachen
PeriodicShardSyncManager
. Weitere Informationen finden Sie unter PeriodicShardSyncManager. -
Einschließlich einer Optimierung auf
HierarchicalShardSyncer
, um nur Leases für eine Shard-Ebene zu erstellen.
Synchronisation in KCL 1.x, beginnend mit KCL 1.14 und höher
Beginnend mit den neuesten unterstützten Versionen von KCL 1.x (KCL1.14) und höher unterstützt die Bibliothek jetzt die folgenden Änderungen am Synchronisationsprozess. Diese Änderungen an der Lease-/Shard-Synchronisierung reduzieren die Anzahl der API Aufrufe von KCL Verbraucheranwendungen an den Kinesis Data Streams Streams-Service erheblich und optimieren das Lease-Management in Ihrer Verbraucheranwendung. KCL
-
Wenn die Leasetabelle beim Bootstraping der Anwendung leer ist, KCL verwendet sie die Filteroption
ListShard
API von (denShardFilter
optionalen Anforderungsparameter), um Leases nur für einen Snapshot von Shards abzurufen und zu erstellen, die zu dem durch den Parameter angegebenen Zeitpunkt geöffnet sind.ShardFilter
Mit demShardFilter
Parameter können Sie die Antwort von herausfiltern.ListShards
API Die einzige erforderliche Eigenschaft desShardFilter
-Parameters istType
. KCLverwendet die EigenschaftType
filter und die folgenden gültigen Werte, um offene Shards, für die möglicherweise neue Leases erforderlich sind, zu identifizieren und eine Momentaufnahme zurückzugeben:-
AT_TRIM_HORIZON
– Die Antwort umfasst alle Shards, die amTRIM_HORIZON
geöffnet waren. -
AT_LATEST
– Die Antwort enthält nur die aktuell geöffneten Shards des Datenstroms. -
AT_TIMESTAMP
– Die Antwort umfasst alle Shards, deren Startzeitstempel kleiner oder gleich dem angegebenen Zeitstempel sind und deren Endzeitstempel größer oder gleich dem angegebenen Zeitstempel ist oder solche, die noch offen sind.
ShardFilter
wird verwendet, wenn Leases für eine leere Leasetabelle erstellt werden, um Leases für einen Snapshot von Shards zu initialisieren, die unterKinesisClientLibConfiguration#initialPositionInStreamExtended
angegeben sind.Mehr über
ShardFilter
erfahren Sie unter https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html. -
-
Anstatt dass alle Worker die lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard Synchronisation durchführen.
-
KCL1.14 verwendet den
ChildShards
Rückgabeparameter vonGetRecords
und,SubscribeToShard
APIs um die Leasing-/Shard-Synchronisierung durchzuführen, die bei geschlossenen Shards stattfindet, sodass ein KCL Worker nur LeasesSHARD_END
für die untergeordneten Shards des Shards erstellen kann, dessen Verarbeitung abgeschlossen ist. GetRecordsWeitere ChildShardInformationen finden Sie unter und. -
Mit den oben genannten Änderungen KCL geht das Verhalten von weg von dem Modell, dass alle Arbeitnehmer über alle vorhandenen Scherben Bescheid wissen, hin zu dem Modell, dass die Arbeitnehmer nur über die untergeordneten Scherben der Scherben, die jeder Arbeiter besitzt, lernen. Daher werden neben der Synchronisation, die beim Bootstraping von Verbraucheranwendungen und bei Reshard-Ereignissen stattfindet, KCL jetzt auch zusätzliche regelmäßige Shard-/Lease-Scans durchgeführt, um potenzielle Lücken in der Leasetabelle zu identifizieren (mit anderen Worten, um mehr über alle neuen Shards zu erfahren), um sicherzustellen, dass der gesamte Hash-Bereich des Datenstroms verarbeitet wird, und um bei Bedarf Leases für sie zu erstellen.
PeriodicShardSyncManager
ist die Komponente, die für die regelmäßige Ausführung von Lease-/Shard-Scans verantwortlich ist.Wenn
KinesisClientLibConfiguration#shardSyncStrategyType
aufShardSyncStrategyType.SHARD_END
gesetzt ist, wirdPeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold
verwendet, um den Schwellenwert für die Anzahl der aufeinanderfolgenden Scans mit Lücken in der Leasetabelle zu bestimmen, nach dem eine Shard-Synchronisierung erzwungen werden soll. WennKinesisClientLibConfiguration#shardSyncStrategyType
aufShardSyncStrategyType.PERIODIC
gesetzt ist, wirdleasesRecoveryAuditorInconsistencyConfidenceThreshold
ignoriert.Weitere Informationen zu
PeriodicShardSyncManager
Version KCL 1.14 finden Sie unter https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java #L987 -L999. In KCL 1.14 steht eine neue Konfigurationsoption zur Konfiguration zur Verfügung in:
PeriodicShardSyncManager
LeaseManagementConfig
Name Standardwert Beschreibung leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
Konfidenzschwellenwert für die regelmäßige Auditortätigkeit, um festzustellen, ob die Leases für einen Datenstrom in der Leasetabelle inkonsistent sind. Wenn der Auditor für diesen Datenstrom mehrmals hintereinander dieselben Inkonsistenzen feststellt, würde er eine Shard-Synchronisierung auslösen.
Neue CloudWatch Metriken werden jetzt auch ausgegeben, um den Zustand von zu überwachen.
PeriodicShardSyncManager
Weitere Informationen finden Sie unter PeriodicShardSyncManager. -
KCL1.14 unterstützt jetzt auch die verzögerte Leasingbereinigung. Leases werden asynchron von
LeaseCleanupManager
bei Erreichen vonSHARD_END
gelöscht, wenn ein Shard entweder die Aufbewahrungsfrist des Datenstroms überschritten hat oder wenn er aufgrund eines Resharding-Vorgangs geschlossen wurde.Es stehen neue Konfigurationsoptionen zur Verfügung, um
LeaseCleanupManager
zu konfigurieren.Name Standardwert Beschreibung leaseCleanupIntervalMillis 1 Minute
Intervall, in dem der Lease-Cleanup-Thread ausgeführt werden soll.
completedLeaseCleanupIntervalMillis 5 Minuten Intervall, in dem überprüft werden soll, ob ein Lease abgeschlossen ist oder nicht.
garbageLeaseCleanupIntervalMillis 30 Minuten Intervall, in dem geprüft werden soll, ob ein Lease Datenmüll ist (d. h. über die Aufbewahrungsfrist des Datenstroms hinaus abgeschnitten wurde) oder nicht.
-
Einschließlich einer Optimierung auf
KinesisShardSyncer
, um nur Leases für eine Shard-Ebene zu erstellen.
Verarbeiten Sie mehrere Datenströme mit derselben KCL 2.x für Java-Consumer-Anwendung
In diesem Abschnitt werden die folgenden Änderungen in KCL 2.x für Java beschrieben, mit denen Sie KCL Verbraucheranwendungen erstellen können, die mehr als einen Datenstrom gleichzeitig verarbeiten können.
Wichtig
Die Multistream-Verarbeitung wird nur in KCL 2.x für Java unterstützt, beginnend mit KCL 2.3 für Java und höher.
Die Multistream-Verarbeitung wird für alle anderen Sprachen NOT unterstützt, in denen KCL 2.x implementiert werden kann.
Die Multistream-Verarbeitung wird in allen Versionen von 1.x NOT unterstützt. KCL
-
MultistreamTracker Schnittstelle
Um eine Verbraucheranwendung zu erstellen, die mehrere Streams gleichzeitig verarbeiten kann, müssen Sie eine neue Schnittstelle namens implementieren MultistreamTracker
. Diese Schnittstelle enthält die streamConfigList
Methode, die die Liste der Datenströme und ihrer Konfigurationen zurückgibt, die von der KCL Verbraucheranwendung verarbeitet werden sollen. Beachten Sie, dass die Datenströme, die verarbeitet werden, während der Laufzeit der Verbraucheranwendung geändert werden können.streamConfigList
wird regelmäßig von der aufgerufenKCL, um mehr über die Änderungen der zu verarbeitenden Datenströme zu erfahren.Die
streamConfigList
Methode füllt die StreamConfigListe auf. package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }
Beachten Sie, dass es sich bei den Feldern
StreamIdentifier
undInitialPositionInStreamExtended
um Pflichtfelder handelt, währendconsumerArn
optional ist. Sie müssen dasconsumerArn
nur angeben, wenn Sie KCL 2.x verwenden, um eine erweiterte Fan-Out-Consumer-Anwendung zu implementieren.Weitere Informationen
StreamIdentifier
dazu finden Sie unter https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java #L129.Um eine zu erstellen StreamIdentifier
, empfehlen wir, dass Sie eine Multistream-Instanz aus derstreamArn
und der erstellenstreamCreationEpoch
, die in Version 2.5.0 und höher verfügbar ist. Erstellen Sie in KCL Version 2.3 und Version 2.4, die dies nicht unterstützenstreamArm
, eine Multistream-Instance mithilfe des folgenden Formats.account-id:StreamName:streamCreationTimestamp
Dieses Format ist veraltet und wird ab der nächsten Hauptversion nicht mehr unterstützt.MultistreamTracker
beinhaltet auch eine Strategie zum Löschen von Leases alter Streams in der Leasetabelle (formerStreamsLeasesDeletionStrategy
). Beachten Sie, dass die Strategie CANNOT während der Laufzeit der Verbraucheranwendung geändert wird. Weitere Informationen finden Sie unter https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b//amazon-kinesis-client.java src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy -
ConfigsBuilder
ist eine anwendungsweite Klasse, mit der Sie alle 2.x-Konfigurationseinstellungen angeben können, die beim Erstellen Ihrer Verbraucheranwendung verwendet werden sollen. KCL KCL ConfigsBuilder
Die Klasse unterstützt jetzt die Schnittstelle.MultistreamTracker
Sie können ConfigsBuilder entweder mit dem Namen des einen Datenstroms initialisieren, aus dem Datensätze abgerufen werden sollen:/** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
Oder Sie können ConfigsBuilder mit initialisieren,
MultiStreamTracker
wenn Sie eine KCL Verbraucheranwendung implementieren möchten, die mehrere Streams gleichzeitig verarbeitet.* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
Da die Multistream-Unterstützung für Ihre KCL Consumer-Anwendung implementiert ist, enthält jede Zeile der Leasetabelle der Anwendung jetzt die Shard-ID und den Stream-Namen der mehreren Datenströme, die diese Anwendung verarbeitet.
-
Wenn die Multistream-Unterstützung für Ihre KCL Verbraucheranwendung implementiert ist, hat sie die leaseKey folgende Struktur:.
account-id:StreamName:streamCreationTimestamp:ShardId
Beispiel,111111111:multiStreamTest-1:12345:shardId-000000000336
.Wichtig
Wenn Ihre bestehende KCL Verbraucheranwendung so konfiguriert ist, dass sie nur einen Datenstrom verarbeitet, ist der leaseKey (der Hash-Schlüssel für die Leasing-Tabelle) die Shard-ID. Wenn Sie diese bestehende KCL Verbraucheranwendung so umkonfigurieren, dass sie mehrere Datenströme verarbeitet, wird Ihre Leasing-Tabelle beschädigt, da bei Multistream-Unterstützung die leaseKey Struktur wie folgt aussehen muss:.
account-id:StreamName:StreamCreationTimestamp:ShardId
Verwenden Sie das KCL mit der Schemaregistrierung AWS Glue
Sie können Ihre Kinesis-Datenströme in die AWS Glue Schema Registry integrieren. Mit der AWS Glue Schema Registry können Sie Schemas zentral erkennen, steuern und weiterentwickeln und gleichzeitig sicherstellen, dass die erstellten Daten kontinuierlich anhand eines registrierten Schemas validiert werden. Ein Schema definiert die Struktur und das Format eines Datensatzes. Ein Schema ist eine versionierte Spezifikation für zuverlässige Datenveröffentlichung, -nutzung oder -speicherung. Mit der AWS Glue Schema Registry können Sie die end-to-end Datenqualität und die Datenverwaltung in Ihren Streaming-Anwendungen verbessern. Weitere Informationen finden Sie unter AWS Glue Schema Registry. Eine Möglichkeit, diese Integration einzurichten, ist die KCL in Java.
Wichtig
Derzeit wird die Integration von Kinesis Data Streams und AWS Glue Schema Registry nur für Kinesis-Datenstreams unterstützt, die in Java implementierte KCL 2.3-Consumer verwenden. Mehrsprachige Unterstützung wird nicht bereitgestellt. KCL1.0-Verbraucher werden nicht unterstützt. KCL2.x-Verbraucher vor KCL 2.3 werden nicht unterstützt.
Detaillierte Anweisungen zum Einrichten der Integration von Kinesis Data Streams mit Schema Registry mithilfe von finden Sie im Abschnitt „Interaktion mit Daten mithilfe der KPL KCL /-Bibliotheken“ unter Anwendungsfall: Integration von Amazon Kinesis Data Streams mit der AWS Glue Schema Registry. KCL