Entwickeln Sie einen Kinesis Client Library-Consumer in Node.js - Amazon-Kinesis-Data-Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Entwickeln Sie einen Kinesis Client Library-Consumer in Node.js

Sie können die Kinesis Client Library (KCL) verwenden, um Anwendungen zu erstellen, die Daten aus Ihren Kinesis-Datenströmen verarbeiten. Die Kinesis Client Library ist in mehreren Sprachen verfügbar. In diesem Thema wird Node.js behandelt.

Die KCL ist eine Java-Bibliothek. Unterstützung für andere Sprachen als Java wird über eine mehrsprachige Schnittstelle bereitgestellt, die als. MultiLangDaemon Dieser Daemon basiert auf Java und wird im Hintergrund ausgeführt, wenn Sie eine andere KCL Sprache als Java verwenden. Wenn Sie also KCL for Node.js installieren und Ihre Consumer-App vollständig in Node.js schreiben, muss Java aufgrund von trotzdem auf Ihrem System installiert sein. MultiLangDaemon Darüber hinaus MultiLangDaemon verfügt es über einige Standardeinstellungen, die Sie möglicherweise an Ihren Anwendungsfall anpassen müssen, z. B. an die AWS Region, mit der eine Verbindung hergestellt wird. Weitere Informationen dazu finden Sie MultiLangDaemon auf GitHub der KCL MultiLangDaemon Projektseite.

Gehen Sie zur Kinesis Client Library (Node.js) GitHub, um die Datei Node.js KCL von herunterzuladen.

Downloads von Beispiel-Code

In Node.js sind zwei Codebeispiele verfügbar: KCL

  • basic-sample

    Wird in den folgenden Abschnitten verwendet, um die Grundlagen der Erstellung einer KCL Verbraucheranwendung in Node.js zu veranschaulichen.

  • click-stream-sample

    Etwas komplexere und verwendet ein reales Szenario, nachdem Sie sich mit dem grundlegenden Beispiel-Code vertraut gemacht haben. Dieses Beispiel wird hier nicht behandelt, enthält aber eine README Datei mit weiteren Informationen.

Bei der Implementierung einer KCL Verbraucheranwendung in Node.js müssen Sie die folgenden Aufgaben ausführen:

Implementieren Sie den Record Processor

Der einfachste Benutzer, der KCL for Node.js verwendet, muss eine recordProcessor Funktion implementieren, die wiederum die Funktionen initializeprocessRecords, und enthältshutdown. Das Beispiel zeigt eine Implementierung, die Sie als Ausgangspunkt verwenden können (siehe sample_kcl_app.js).

function recordProcessor() { // return an object that implements initialize, processRecords and shutdown functions.}
initialize

Der KCL ruft die initialize Funktion auf, wenn der Recordprozessor gestartet wird. Dieser Datensatzverarbeiter verarbeitet nur die Shard-ID, die als initializeInput.shardId übergeben wird. In der Regel ist dies auch umgekehrt der Fall (diese Shard wird nur durch diesen Datensatverarbeiter verarbeitet). Ihr Konsument sollte jedoch die Möglichkeit berücksichtigen, dass ein Datensatz mehr als einmal verarbeitet werden könnte. Das liegt daran, dass Kinesis Data Streams eine Semantik nach dem Grundsatz mindestens einmal hat. Das bedeutet, dass jeder Datensatz aus einer Shard mindestens einmal von einem Worker in Ihrem Konsumenten verarbeitet wird. Weitere Informationen zu Fällen, in denen eine bestimmte Shard möglicherweise durch mehr als einen Auftragnehmer verarbeitet wird, finden Sie unter Verwenden Sie Resharding, Skalierung und Parallelverarbeitung, um die Anzahl der Shards zu ändern.

initialize: function(initializeInput, completeCallback)
processRecords

Der KCL ruft diese Funktion mit einer Eingabe auf, die eine Liste von Datensätzen aus dem für die initialize Funktion angegebenen Shard enthält. Der von Ihnen implementierte Datensatzverarbeiter verarbeitet die Daten in diesen Datensätzen entsprechend der Semantik Ihres Konsumenten. Beispielsweise kann der Auftragnehmer eine Transformation für die Daten ausführen und das Ergebnis dann in einem Amazon Simple Storage Service (Amazon S3)-Bucket speichern.

processRecords: function(processRecordsInput, completeCallback)

Zusätzlich zu den Daten enthält der Datensatz auch eine Sequenznummer und einen Partitionsschlüssel, die der Auftragnehmer beim Verarbeiten der Daten verwenden kann. Beispielsweise könnte der Auftragnehmer basierend auf dem Wert des Partitionsschlüssels den S3-Bucket wählen, in dem die Daten gespeichert werden sollen. Das record-Anmeldeverzeichnis stellt die folgenden Schlüssel-Wert-Paare für den Zugriff auf die Daten des Datensatzes, die Sequenznummer und den Partitionsschlüssel bereit:

record.data record.sequenceNumber record.partitionKey

Beachten Sie, dass die Daten Base64-kodiert sind.

Im einfachen Beispiel weist die Funktion processRecords Code auf, der zeigt, wie ein Auftragnehmer auf die Daten des Datensatzes, die Sequenznummer und den Partitionsschlüssel zugreifen kann.

Kinesis Data Streams erfordert, dass der Datensatzverarbeiter die Datensätze nachverfolgt, die bereits in einer Shard verarbeitet wurden. Der KCL kümmert sich um dieses Tracking für ein checkpointer Objekt, das als processRecordsInput.checkpointer übergeben wird. Ihr Datensatzprozessor ruft die checkpointer.checkpoint Funktion auf, um darüber zu informieren, KCL wie weit er bei der Verarbeitung der Datensätze im Shard fortgeschritten ist. Falls der Worker ausfällt, KCL verwendet er diese Informationen, wenn Sie die Verarbeitung des Shards erneut starten, sodass die Verarbeitung mit dem letzten bekannten verarbeiteten Datensatz fortgesetzt wird.

Bei einem Split- oder Merge-Vorgang beginnt der erst KCL mit der Verarbeitung der neuen Shards, wenn die Prozessoren für die ursprünglichen Shards aufgerufen haben, checkpoint um zu signalisieren, dass die gesamte Verarbeitung der ursprünglichen Shards abgeschlossen ist.

Wenn Sie die Sequenznummer nicht an die checkpoint Funktion übergeben, KCL wird davon ausgegangen, dass der Aufruf von checkpoint bedeutet, dass alle Datensätze verarbeitet wurden, bis zu dem letzten Datensatz, der an den Datensatzprozessor übergeben wurde. Daher sollte der Datensatzverarbeiter die Methode checkpoint erst aufrufen, wenn er alle Datensätze in der Liste, die ihm übergeben wurden, verarbeitet hat. Datensatzverarbeiter müssen checkpoint nicht bei jedem Aufruf von processRecords aufrufen. Ein Verarbeiter könnte beispielsweise checkpoint bei jedem dritten Aufruf oder beim Eintritt eines Ereignisses außerhalb Ihres Datensatzverarbeiters aufrufen, beispielsweise eines von Ihnen implementierten Verifizierung-/Validierungs-Service.

Sie können optional die exakte Sequenznummer eines Datensatzes als Parameter für checkpoint angeben. In diesem Fall KCL wird davon ausgegangen, dass alle Datensätze nur bis zu diesem Datensatz verarbeitet wurden.

Die einfache Beispielanwendung zeigt den einfachsten möglichen Aufruf der Funktion checkpointer.checkpoint. Sie können weitere Checkpoint-Logik hinzufügen, die Sie an diesem Punkt in der Funktion für Ihren Konsumenten benötigen.

shutdown

Der KCL ruft die shutdown Funktion entweder auf, wenn die Verarbeitung beendet shutdownInput.reason ist TERMINATE oder wenn der Worker nicht mehr reagiert (shutdownInput.reasonistZOMBIE).

shutdown: function(shutdownInput, completeCallback)

Die Verarbeitung endet, wenn der Datensatzverarbeiter keine weiteren Datensätze aus der Shard erhält, entweder weil die Shard geteilt oder zusammengeführt wurde oder weil der Stream gelöscht wurde.

Der übergibt KCL auch ein shutdownInput.checkpointer Objekt anshutdown. Wenn der Grund für das Herunterfahren TERMINATE ist, sollten Sie sicherstellen, dass der Datensatzverarbeiter die Verarbeitung aller Datensätze fertiggestellt hat, und dann die Funktion checkpoint in seiner Schnittstelle aufrufen.

Ändern Sie die Konfigurationseigenschaften

Das Beispiel zeigt Standardwerte für die Konfigurationseigenschaften. Sie können diese Eigenschaften mit eigenen Werten überschreiben (siehe sample.properties im einfachen Beispiel).

Anwendungsname

Das KCL erfordert eine Anwendung, die für Ihre Anwendungen und für Amazon DynamoDB-Tabellen in derselben Region einzigartig ist. Sie verwendet den Wert der Anwendungsnamenkonfiguration auf folgende Arten:

  • Für mit diesem Anwendungsnamen verknüpfte Auftragnehmer wird angenommen, dass sie gemeinsam im gleichen Stream arbeiten. Diese Auftragnehmer können auf mehrere Instances verteilt sein. Wenn Sie eine zusätzliche Instance desselben Anwendungscodes ausführen, jedoch mit einem anderen Anwendungsnamen, KCL behandelt das die zweite Instance als eine völlig separate Anwendung, die ebenfalls auf demselben Stream ausgeführt wird.

  • Das KCL erstellt eine DynamoDB-Tabelle mit dem Anwendungsnamen und verwendet die Tabelle, um Statusinformationen (wie Checkpoints und Worker-Shard-Mapping) für die Anwendung zu verwalten. Jede Anwendung verfügt über eine eigene DynamoDB-Tabelle. Weitere Informationen finden Sie unter Verwenden Sie eine Leasetabelle, um nachzuverfolgen, welche Shards von der Consumer-Anwendung verarbeitet wurden KCL.

Richten Sie Anmeldeinformationen ein

Sie müssen Ihre AWS Anmeldeinformationen einem der Anmeldeinformationsanbieter in der Kette der Standardanmeldedienstanbieter zur Verfügung stellen. Sie können die Eigenschaft AWSCredentialsProvider verwenden, um einen Anmeldeinformationsanbieter einzurichten. Die sample.properties-Datei muss Anmeldeinformationen einem der Anmeldeinformationsanbieter in der Anmeldeinformationsanbieter-Standardkette bereitstellen. Wenn Sie Ihren Consumer auf einer EC2 Amazon-Instance ausführen, empfehlen wir Ihnen, die Instance mit einer IAM Rolle zu konfigurieren. AWS Anmeldeinformationen, die die mit dieser IAM Rolle verknüpften Berechtigungen widerspiegeln, werden Anwendungen auf der Instance über ihre Instance-Metadaten zur Verfügung gestellt. Dies ist die sicherste Methode zur Verwaltung von Anmeldeinformationen für eine Verbraucheranwendung, die auf einer EC2 Instance ausgeführt wird.

Im folgenden Beispiel wird die Verarbeitung eines Kinesis-Datenstroms konfiguriertKCL, der kclnodejssample mithilfe des in bereitgestellten Datensatzprozessors benannt wird: sample_kcl_app.js

# The Node.js executable script executableName = node sample_kcl_app.js # The name of an Amazon Kinesis stream to process streamName = kclnodejssample # Unique KCL application name applicationName = kclnodejssample # Use default AWS credentials provider chain AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Read from the beginning of the stream initialPositionInStream = TRIM_HORIZON