Verarbeiten von Amazon Kinesis Data Streams-Datensätzen mit Lambda - AWS Lambda

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verarbeiten von Amazon Kinesis Data Streams-Datensätzen mit Lambda

Um Amazon Kinesis Data Streams-Datensätze mit Lambda zu verarbeiten, erstellen Sie einen Verbraucher für Ihren Stream und erstellen Sie dann eine Lambda-Zuordnung von Ereignisquellen.

Konfigurieren Ihres Daten-Streams und Ihrer Funktion

Ihre Lambda-Funktion ist eine Konsumentenanwendung für Ihren Daten-Stream. Sie verarbeitet jeweils einen Batch Datensätzen aus jedem Shard. Sie können eine Lambda-Funktion zu einem Konsumenten mit gemeinsam genutztem Durchsatz (Standard-Iterator) oder zu einem Konsumenten mit dediziertem Durchsatz mit erweitertem Rundsenden zuweisen.

  • Standard-Iterator: Lambda fragt jeden Shard in Ihrem Kinesis-Stream mit einer Basisrate von einmal pro Sekunde nach Datensätzen ab. Wenn mehr Datensätze verfügbar sind, verarbeitet Lambda Batches, bis die Funktion mit dem Stream gleichzieht. Die Ereignisquellenzuordnung teilt den Lesedurchsatz mit anderen Konsumenten des Shards zusammen.

  • Erweitertes Rundsenden: Um die Latenz zu minimieren und den Lesedurchsatz zu maximieren, erstellen Sie einen Daten-Stream-Konsumenten mit erweitertem Rundsenden. Stream-Konsumenten mit erweitertem Rundsenden erhalten eine dedizierte Verbindung für jeden Shard, der keine Auswirkungen auf andere Anwendungen hat, die aus dem Stream lesen. Stream-Konsumenten verwenden HTTP/2, um die Latenz zu reduzieren, indem Datensätze über eine langlebige Verbindung an Lambda übertragen und Anforderungs-Header komprimiert werden. Sie können einen Stream-Konsumenten mit der Kinesis-RegisterStreamConsumer-API erstellen.

aws kinesis register-stream-consumer \ --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

Die Ausgabe sollte folgendermaßen aussehen:

{
    "Consumer": {
        "ConsumerName": "con1",
        "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608",
        "ConsumerStatus": "CREATING",
        "ConsumerCreationTimestamp": 1540591608.0
    }
}

Um die Geschwindigkeit zu erhöhen, mit der Ihre Funktion Datensätze verarbeitet, fügen Sie Ihrem Datenstrom Shards hinzu. Lambda verarbeitet Datensätze in jedem Shard in der Reihenfolge. Es beendet die Verarbeitung zusätzlicher Datensätze in einem Shard, wenn Ihre Funktion einen Fehler zurückgibt. Mehr Shards bedeutet, dass mehr Stapel verarbeitet und gleichzeitig die Auswirkungen von Fehlern auf die Nebenläufigkeit verringert werden.

Wenn Ihre Funktion nicht hochskalieren kann, um alle gleichzeitigen Stapel zu verarbeiten, fordern Sie eine Kontingenterhöhung an oder reservieren Sie Gleichzeitigkeit für Ihre Funktion.

Erstellen einer Zuordnung von Ereignisquellen, um eine Lambda-Funktion aufzurufen

Um Ihre Lambda-Funktion mit Datensätzen aus Ihrem Datenstrom aufzurufen, erstellen Sie eine Zuordnung von Ereignisquellen. Sie können mehrere Ereignisquellenzuordnungen erstellen, um gleiche Daten mit mehreren Lambda-Funktionen oder Elemente aus mehreren Daten-Streams mit nur einer Funktion zu verarbeiten. Bei der Verarbeitung von Elementen aus mehreren Datenströmen enthält jeder Batch Datensätze aus nur einem einzigen Shard oder Stream.

Sie können Zuordnungen von Ereignisquellen konfigurieren, um Datensätze aus einem Stream in einem anderen AWS-Konto zu verarbeiten. Weitere Informationen hierzu finden Sie unter Erstellen einer kontoübergreifenden Zuordnung von Ereignisquellen.

Bevor Sie ein Zuordnung von Ereignisquellen erstellen, müssen Sie Ihrer Lambda-Funktion die Berechtigung zum Lesen aus einem Kinesis-Datenstrom erteilen. Lambda benötigt die folgenden Berechtigungen zum Verwalten von Ressourcen, die zu Ihrem Kinesis-Datenstrom gehören:

Die AWS-verwaltete Richtlinie AWSLambdaKinesisExecutionRole enthält diese Berechtigungen. Fügen Sie diese verwaltete Richtlinie zu Ihrer Funktion hinzu, wie im folgenden Verfahren beschrieben.

AWS Management Console
So fügen Sie Kinesis-Berechtigungen zu Ihrer Funktion hinzu
  1. Öffnen Sie die Funktionsseite der Lambda-Konsole und wählen Sie Ihre Funktion aus.

  2. Klicken Sie in der Registerkarte Konfiguration auf die Option Berechtigungen.

  3. Wählen Sie im Bereich Ausführungsrolle unter Rollenname den Link zur Ausführungsrolle Ihrer Funktion aus. Dieser Link öffnet die Seite für diese Rolle in der IAM-Konsole.

  4. Unter Berechtigungsrichtlinien im Abschnitt Berechtigungen hinzufügen wählen Sie dann Richtlinien anfügen aus.

  5. Geben Sie im Suchfeld AWSLambdaKinesisExecutionRole ein.

  6. Aktivieren Sie das Kontrollkästchen neben der Richtlinie und wählen Sie Berechtigung hinzufügen.

AWS CLI
So fügen Sie Kinesis-Berechtigungen zu Ihrer Funktion hinzu
  • Führen Sie den folgenden CLI-Befehl aus, um die AWSLambdaKinesisExecutionRole-Richtlinie zur Ausführungsrolle Ihrer Funktion hinzuzufügen:

    aws iam attach-role-policy \ --role-name MyFunctionRole \ --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
AWS SAM
So fügen Sie Kinesis-Berechtigungen zu Ihrer Funktion hinzu
  • Fügen Sie die Policies-Eigenschaft wie im folgenden Beispiel gezeigt in der Definition Ihrer Funktion hinzu:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs22.x Policies: - AWSLambdaKinesisExecutionRole

Nachdem Sie die erforderlichen Berechtigungen konfiguriert haben, erstellen Sie die Zuordnung von Ereignisquellen.

AWS Management Console
Die Kinesis-Zuordnung von Ereignisquellen erstellen
  1. Öffnen Sie die Funktionsseite der Lambda-Konsole und wählen Sie Ihre Funktion aus.

  2. Wählen Sie im Bereich Function overview (Funktionsübersicht) die Option Add trigger (Auslöser hinzufügen).

  3. Wählen Sie unter Trigger-Konfiguration als Quelle die Option Kinesis aus.

  4. Wählen Sie den Kinesis-Stream, für den Sie die Zuordnung von Ereignisquellen erstellen möchten und optional einen Konsumenten Ihres Streams.

  5. (Optional) Bearbeiten Sie die Batch-Größe, die Startposition und das Batch-Fenster für Ihre Zuordnung von Ereignisquellen.

  6. Wählen Sie Hinzufügen aus.

Wenn Sie die Zuordnung von Ereignisquellen von der Konsole aus erstellen, muss Ihre IAM-Rolle über die Berechtigungen kinesis:ListStreams und kinesis:ListStreamConsumers verfügen.

AWS CLI
Die Kinesis-Zuordnung von Ereignisquellen erstellen
  • Führen Sie den folgenden CLI-Befehl aus, um eine Kinesis-Zuordnung von Ereignisquellen zu erstellen. Wählen Sie Ihre eigene Batch-Größe und Startposition entsprechend Ihrem Anwendungsfall.

    aws lambda create-event-source-mapping \ --function-name MyFunction \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --starting-position LATEST \ --batch-size 100

Um ein Batching-Fenster anzugeben, fügen Sie die --maximum-batching-window-in-seconds-Option hinzu. Weitere Informationen zur Verwendung dieses und anderer Parameter finden Sie unter create-event-source-mapping in der AWS CLI Befehlsreferenz.

AWS SAM
Die Kinesis-Zuordnung von Ereignisquellen erstellen
  • Fügen Sie die KinesisEvent-Eigenschaft wie im folgenden Beispiel gezeigt in der Definition Ihrer Funktion hinzu:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs22.x Policies: - AWSLambdaKinesisExecutionRole Events: KinesisEvent: Type: Kinesis Properties: Stream: !GetAtt MyKinesisStream.Arn StartingPosition: LATEST BatchSize: 100 MyKinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1

Weitere Informationen zum Erstellen einer Zuordnung von Ereignisquellen für Kinesis Data Streams in AWS SAM finden Sie im AWS Serverless Application Model Entwicklerhandbuch zu Kinesis.

Abfrage und Startposition des Streams

Beachten Sie, dass die Stream-Abfrage bei der Erstellung und Aktualisierung der Zuordnung von Ereignisquellen letztendlich konsistent ist.

  • Bei der Erstellung der Zuordnung von Ereignisquellen kann es mehrere Minuten dauern, bis mit der Abfrage von Ereignissen aus dem Stream begonnen wird.

  • Bei Aktualisierungen der Zuordnung von Ereignisquellen kann es mehrere Minuten dauern, bis die Abfrage von Ereignissen aus dem Stream gestoppt und neu gestartet wird.

Dieses Verhalten bedeutet, dass, wenn Sie LATEST als Startposition für den Stream angeben, die Zuordnung von Ereignisquellen bei der Erstellung oder Aktualisierung möglicherweise Ereignisse übersieht. Um sicherzustellen, dass keine Ereignisse übersehen werden, geben Sie die Startposition des Streams als TRIM_HORIZON oder AT_TIMESTAMP an.

Erstellen einer kontoübergreifenden Zuordnung von Ereignisquellen

Amazon Kinesis Data Streams unterstützt ressourcenbasierte Richtlinien. Aus diesem Grund können Sie Daten, die in einem AWS-Konto in einen Stream aufgenommen wurden, mit einer Lambda-Funktion in einem anderen Konto verarbeiten.

Um eine Zuordnung von Ereignisquellen für Ihre Lambda-Funktion zu erstellen, die einen Kinesis-Stream in einem anderen AWS-Konto verwendet, müssen Sie den Stream mithilfe einer ressourcenbasierten Richtlinie konfigurieren, um Ihrer Lambda-Funktion die Berechtigung zum Lesen von Elementen zu erteilen. Wie Sie Ihren Stream so konfigurieren, dass er kontoübergreifenden Zugriff zulässt, erfahren Sie unter Gemeinsamer Zugriff mit kontoübergreifenden AWS Lambda-Funktionen im Amazon Kinesis Streams Developer Guide.

Sobald Sie Ihren Stream mit einer ressourcenbasierten Richtlinie konfiguriert haben, die Ihrer Lambda-Funktion die erforderlichen Berechtigungen erteilt, erstellen Sie die Zuordnung von Ereignisquellen mit einer der im vorherigen Abschnitt beschriebenen Methoden.

Wenn Sie Ihre Zuordnung von Ereignisquellen über die Lambda-Konsole erstellen möchten, fügen Sie den ARN Ihres Streams direkt in das Eingabefeld ein. Wenn Sie einen Verbraucher für Ihren Stream angeben möchten, wird durch Einfügen der ARN des Verbrauchers automatisch das Stream-Feld ausgefüllt.