Verarbeitung von Batchdaten mit einer Lambda-Funktion in Step Functions - AWS Step Functions

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verarbeitung von Batchdaten mit einer Lambda-Funktion in Step Functions

In diesem Tutorial verwenden Sie das ItemBatcher (Karte) Feld des Distributed Map-Status, um einen ganzen Stapel von Elementen innerhalb einer Lambda-Funktion zu verarbeiten. Jeder Stapel enthält maximal drei Elemente. Der Status Distributed Map startet vier untergeordnete Workflow-Ausführungen, wobei jede Ausführung drei Elemente verarbeitet, während eine Ausführung ein einzelnes Element verarbeitet. Jede untergeordnete Workflow-Ausführung ruft eine Lambda-Funktion auf, die über die einzelnen im Batch vorhandenen Elemente iteriert.

Sie erstellen eine Zustandsmaschine, die eine Multiplikation mit einer Reihe von ganzen Zahlen durchführt. Angenommen, das Integer-Array, das Sie als Eingabe angeben, ist [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] und der Multiplikationsfaktor ist. 7 Dann wird das resultierende Array, das nach der Multiplikation dieser ganzen Zahlen mit dem Faktor 7 gebildet wird, sein. [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]

Schritt 1: Erstellen Sie die Zustandsmaschine

In diesem Schritt erstellen Sie den Workflow-Prototyp der Zustandsmaschine, die einen ganzen Datenstapel an die Lambda-Funktion übergibt, die Sie in Schritt 2 erstellen werden.

  • Verwenden Sie die folgende Definition, um mithilfe der Step Functions Functions-Konsole eine Zustandsmaschine zu erstellen. Informationen zum Erstellen einer Zustandsmaschine finden Sie Schritt 1: Erstellen Sie den Workflow-Prototyp im Tutorial Erste Schritte mit der Verwendung von Distributed Map State.

    In dieser Zustandsmaschine definieren Sie einen Distributed Map-Status, der ein Array von 10 Ganzzahlen als Eingabe akzeptiert und dieses Array stapelweise an eine Lambda-Funktion weitergibt. 3 Die Lambda-Funktion iteriert über die einzelnen im Batch vorhandenen Elemente und gibt ein Ausgabearray mit dem Namen zurück. multiplied Das Ausgabearray enthält das Ergebnis der Multiplikation, die mit den im Eingabearray übergebenen Elementen durchgeführt wurde.

    Wichtig

    Achten Sie darauf, den Amazon-Ressourcennamen (ARN) der Lambda-Funktion im folgenden Code durch den Namen ARN der Funktion zu ersetzen, die Sie in Schritt 2 erstellen werden.

    { "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:functionName" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems" } } }

Schritt 2: Lambda-Funktion erstellen

In diesem Schritt erstellen Sie die Lambda-Funktion, die alle im Stapel übergebenen Elemente verarbeitet.

Wichtig

Stellen Sie sicher, dass Ihre Lambda-Funktion unter derselben steht AWS-Region als Ihre Zustandsmaschine.

So erstellen Sie die Lambda-Funktion:
  1. Verwenden Sie die Lambda-Konsole, um eine Python-Lambda-Funktion mit dem Namen zu erstellen. ProcessEntireBatch Informationen zum Erstellen einer Lambda-Funktion finden Sie unter Schritt 4: Lambda-Funktion konfigurieren im Tutorial Erste Schritte mit der Verwendung von Distributed Map state.

  2. Kopieren Sie den folgenden Code für die Lambda-Funktion und fügen Sie ihn in den Abschnitt Codequelle Ihrer Lambda-Funktion ein.

    import json def lambda_handler(event, context): multiplication_factor = event['BatchInput']['MyMultiplicationFactor'] items = event['Items'] results = [multiplication_factor * item for item in items] return { 'statusCode': 200, 'multiplied': results }
  3. Nachdem Sie Ihre Lambda-Funktion erstellt haben, kopieren Sie die in der oberen rechten Ecke der Seite ARN angezeigte Funktion. Das Folgende ist ein Beispiel, wo ARN Funktionsname ist der Name der Lambda-Funktion (in diesem FallProcessEntireBatch):

    arn:aws:lambda:us-east-1:123456789012:function:function-name

    Sie müssen die Funktion ARN in der Zustandsmaschine bereitstellen, die Sie in Schritt 1 erstellt haben.

  4. Wählen Sie Deploy, um die Änderungen bereitzustellen.

Schritt 3: Führen Sie die Zustandsmaschine aus

Wenn Sie den Zustandsmaschine ausführen, startet der Status Distributed Map vier untergeordnete Workflow-Ausführungen, wobei jede Ausführung drei Elemente verarbeitet, während eine Ausführung ein einzelnes Element verarbeitet.

Das folgende Beispiel zeigt die Daten, die von einer der untergeordneten Workflow-Ausführungen an die ProcessEntireBatchFunktion übergeben wurden.

{ "BatchInput": { "MyMultiplicationFactor": 7 }, "Items": [1, 2, 3] }

Angesichts dieser Eingabe zeigt das folgende Beispiel das angegebene Ausgabe-Arraymultiplied, das von der Lambda-Funktion zurückgegeben wird.

{ "statusCode": 200, "multiplied": [7, 14, 21] }

Die Zustandsmaschine gibt die folgende Ausgabe zurück, die vier Arrays enthält, die multiplied nach den vier untergeordneten Workflow-Ausführungen benannt sind. Diese Arrays enthalten die Multiplikationsergebnisse der einzelnen Eingabeelemente.

[ { "statusCode": 200, "multiplied": [7, 14, 21] }, { "statusCode": 200, "multiplied": [28, 35, 42] }, { "statusCode": 200, "multiplied": [49, 56, 63] }, { "statusCode": 200, "multiplied": [70] } ]

Um alle zurückgegebenen Array-Elemente zu einem einzigen Ausgabe-Array zu kombinieren, können Sie das ResultSelector Feld verwenden. Definieren Sie dieses Feld im Status Distributed Map, um alle multiplied Arrays zu finden, alle Elemente in diesen Arrays zu extrahieren und sie dann zu einem einzigen Ausgabe-Array zu kombinieren.

Um das ResultSelector Feld zu verwenden, aktualisieren Sie Ihre State-Machine-Definition wie im folgenden Beispiel gezeigt.

{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied[*]" } } } }

Die aktualisierte Zustandsmaschine gibt ein konsolidiertes Ausgabe-Array zurück, wie im folgenden Beispiel gezeigt.

{ "multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] }