Implementierung der statusbehafteten Kinesis Data Streams Streams-Verarbeitung in Lambda - AWS Lambda

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Implementierung der statusbehafteten Kinesis Data Streams Streams-Verarbeitung in Lambda

Lambda-Funktionen können kontinuierliche Stream-Verarbeitungsanwendungen ausführen. Ein Stream entspricht einer unbegrenzten Menge von Daten, die kontinuierlich durch Ihre Anwendung fließen. Um Informationen aus dieser sich ständig aktualisierenden Eingabe zu analysieren, können Sie die enthaltenen Datensätze mithilfe eines zeitlich definierten Fensters binden.

Rollierende Fenster sind unterschiedliche Zeitfenster, die sich in regelmäßigen Abständen öffnen und schließen. Standardmäßig sind Lambda-Aufrufe zustandslos – Sie können sie nicht für die Verarbeitung von Daten über mehrere kontinuierliche Aufrufe hinweg ohne eine externe Datenbank verwenden. Mit rollierenden Fenstern können Sie jedoch Ihren Status über Aufrufe hinweg beibehalten. Dieser Zustand enthält das Gesamtergebnis der Nachrichten, die zuvor für das aktuelle Fenster verarbeitet wurden. Ihr Zustand kann maximal 1 MB pro Shard betragen. Wenn er diese Größe überschreitet, wird Lambda das Fenster vorzeitig beenden.

Jeder Datensatz in einem Stream gehört zu einem bestimmten Fenster. Lambda verarbeitet jeden Datensatz mindestens einmal, garantiert jedoch nicht, dass jeder Datensatz nur einmal verarbeitet wird. In seltenen Fällen, etwa bei der Fehlerbehandlung, werden einige Datensätze möglicherweise mehrmals verarbeitet. Datensätze werden beim ersten Mal immer in der richtigen Reihenfolge verarbeitet. Wenn Datensätze mehr als einmal verarbeitet werden, werden sie nicht in der richtigen Reihenfolge verarbeitet.

Aggregation und Verarbeitung

Ihre benutzerverwaltete Funktion wird sowohl zur Aggregation als auch zur Verarbeitung der Endergebnisse dieser Aggregation aufgerufen. Lambda aggregiert alle im Fenster empfangenen Datensätze. Sie können diese Datensätze in mehreren Stapeln erhalten, jeweils als ein separater Aufruf. Jeder Aufruf erhält einen Zustand. Wenn Sie also rollierende Fenster verwenden, muss Ihre Lambda-Funktionsantwort eine state-Eigenschaft enthalten. Wenn die Antwort keine state-Eigenschaft enthält, betrachtet Lambda dies als fehlgeschlagenen Aufruf. Um diese Bedingung zu erfüllen, kann Ihre Funktion ein TimeWindowEventResponse-Objekt zurückgeben, das die folgende JSON-Form aufweist:

Beispiel TimeWindowEventResponse-Werte
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
Anmerkung

Für Java-Funktionen empfehlen wir, eine Map<String, String> zu verwenden, um den Status darzustellen.

Am Ende des Fensters wird das Flag isFinalInvokeForWindow auf true gesetzt, um anzugeben, dass es sich um den Endzustand handelt und dass es für die Verarbeitung bereit ist. Nach der Verarbeitung werden das Fenster und Ihr endgültiger Aufruf wird abgeschlossen, und dann wird der Zustand gelöscht.

Am Ende Ihres Fensters verwendet Lambda die endgültige Verarbeitung für Aktionen an den Aggregationsergebnissen. Ihre endgültige Verarbeitung wird synchron aufgerufen. Nach erfolgreichem Aufruf zeigt Ihre Funktion auf die Sequenznummer und die Stream-Verarbeitung wird fortgesetzt. Wenn der Aufruf nicht erfolgreich ist, unterbricht Ihre Lambda-Funktion die weitere Verarbeitung bis zu einem erfolgreichen Aufruf.

Beispiel KinesisTimeWindowEvent
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1607497475.000 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-1", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" } ], "window": { "start": "2020-12-09T07:04:00Z", "end": "2020-12-09T07:06:00Z" }, "state": { "1": 282, "2": 715 }, "shardId": "shardId-000000000006", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

Konfiguration

Sie können rollierende Fenster konfigurieren, wenn Sie eine Ereignisquellenzuordnung erstellen oder aktualisieren. Um ein Tumbling-Fenster zu konfigurieren, geben Sie das Fenster in Sekunden an (). TumblingWindowInSeconds Der folgende Beispielbefehl AWS Command Line Interface (AWS CLI) erstellt eine Quellenzuordnung für Streaming-Ereignisse mit einem Wechselfenster von 120 Sekunden. Die für Aggregation und Verarbeitung definierte Lambda-Funktion wird tumbling-window-example-function genannt.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream \ --function-name tumbling-window-example-function \ --starting-position TRIM_HORIZON \ --tumbling-window-in-seconds 120

Lambda bestimmt die rollierenden Fenstergrenzen basierend auf dem Zeitpunkt, zu dem Datensätze in den Stream eingefügt wurden. Für alle Datensätze steht ein ungefährer Zeitstempel zur Verfügung, den Lambda in Grenzbestimmungen verwendet.

Rollierende Fensteraggregationen unterstützen kein Resharding. Wenn ein Shard endet, betrachtet Lambda das aktuelle Fenster als geschlossen, und alle untergeordneten Shards beginnen ihr eigenes Fenster in einem neuen Zustand. Wenn dem aktuellen Fenster keine neuen Datensätze hinzugefügt werden, wartet Lambda bis zu 2 Minuten, bevor es annimmt, dass das Fenster vorbei ist. Dadurch wird sichergestellt, dass die Funktion alle Datensätze im aktuellen Fenster liest, auch wenn die Datensätze zeitweise hinzugefügt werden.

Rollierende Fenster unterstützen vollständig die bestehenden Wiederholungsrichtlinien maxRetryAttempts und maxRecordAge.

Beispiel Handler.py – Aggregation und Verarbeitung

Die folgende Python-Funktion veranschaulicht, wie Sie Ihren Endzustand aggregieren und dann verarbeiten:

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1 print('Returning state: ', state) return {'state': state}