Elaborazione di dati batch con una funzione Lambda in Step Functions - AWS Step Functions

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Elaborazione di dati batch con una funzione Lambda in Step Functions

In questo tutorial, usi il ItemBatcher (Mappa) campo dello stato della mappa distribuita per elaborare un intero batch di elementi all'interno di una funzione Lambda. Ogni batch contiene un massimo di tre articoli. Lo stato Distributed Map avvia quattro esecuzioni di workflow secondarie, in cui ogni esecuzione elabora tre elementi, mentre un'esecuzione elabora un singolo elemento. Ogni esecuzione del workflow secondario richiama una funzione Lambda che esegue un'iterazione sui singoli elementi presenti nel batch.

Creerai una macchina a stati che esegue la moltiplicazione su una matrice di numeri interi. Supponiamo che l'array di numeri interi fornito come input sia [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] e che il fattore di moltiplicazione sia. 7 Quindi, l'array risultante formato dopo aver moltiplicato questi numeri interi per un fattore 7, sarà. [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]

Fase 1: Creare la macchina a stati

In questo passaggio, crei il prototipo di flusso di lavoro della macchina a stati che passa un intero batch di dati alla funzione Lambda che creerai nel passaggio 2.

  • Usa la seguente definizione per creare una macchina a stati utilizzando la console Step Functions. Per informazioni sulla creazione di una macchina a stati, Fase 1: Creare il prototipo del flusso di lavoro consultate il tutorial Guida introduttiva all'uso di Distributed Map state tutorial.

    In questa macchina a stati, si definisce uno stato della mappa distribuita che accetta un array di 10 numeri interi come input e lo passa a una funzione Lambda in batch di. 3 La funzione Lambda esegue un'iterazione sui singoli elementi presenti nel batch e restituisce un array di output denominato. multiplied L'array di output contiene il risultato della moltiplicazione eseguita sugli elementi passati nell'array di input.

    Importante

    Assicurati di sostituire Amazon Resource Name (ARN) della funzione Lambda nel codice seguente con quello ARN della funzione che creerai nello Step 2.

    { "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:functionName" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems" } } }

Fase 2: Creare la funzione Lambda

In questo passaggio, si crea la funzione Lambda che elabora tutti gli elementi passati nel batch.

Importante

Assicurati che la tua funzione Lambda sia sotto la stessa Regione AWS come macchina a stati.

Creazione della funzione Lambda
  1. Usa la console Lambda per creare una funzione Python Lambda denominata. ProcessEntireBatch Per informazioni sulla creazione di una funzione Lambda, consulta Fase 4: Configurazione della funzione Lambda nel tutorial Guida introduttiva all'uso di Distributed Map state.

  2. Copia il codice seguente per la funzione Lambda e incollalo nella sezione Codice sorgente della tua funzione Lambda.

    import json def lambda_handler(event, context): multiplication_factor = event['BatchInput']['MyMultiplicationFactor'] items = event['Items'] results = [multiplication_factor * item for item in items] return { 'statusCode': 200, 'multiplied': results }
  3. Dopo aver creato la funzione Lambda, copia la funzione ARN visualizzata nell'angolo superiore destro della pagina. Di seguito è riportato un esempio, in cui ARN nome della funzione è il nome della funzione Lambda (in questo caso,ProcessEntireBatch):

    arn:aws:lambda:us-east-1:123456789012:function:function-name

    Dovrai fornire la funzione ARN nella macchina a stati creata nel passaggio 1.

  4. Scegli Deploy per distribuire le modifiche.

Passaggio 3: Esegui la macchina a stati

Quando si esegue la macchina a stati, lo stato Distributed Map avvia quattro esecuzioni di workflow secondarie, in cui ogni esecuzione elabora tre elementi, mentre un'esecuzione elabora un singolo elemento.

L'esempio seguente mostra i dati passati alla ProcessEntireBatchfunzione da una delle esecuzioni secondarie del flusso di lavoro.

{ "BatchInput": { "MyMultiplicationFactor": 7 }, "Items": [1, 2, 3] }

Dato questo input, l'esempio seguente mostra l'array di output denominato multiplied restituito dalla funzione Lambda.

{ "statusCode": 200, "multiplied": [7, 14, 21] }

La macchina a stati restituisce il seguente output che contiene quattro array denominati multiplied per le quattro esecuzioni dei flussi di lavoro secondari. Questi array contengono i risultati della moltiplicazione dei singoli elementi di input.

[ { "statusCode": 200, "multiplied": [7, 14, 21] }, { "statusCode": 200, "multiplied": [28, 35, 42] }, { "statusCode": 200, "multiplied": [49, 56, 63] }, { "statusCode": 200, "multiplied": [70] } ]

Per combinare tutti gli elementi dell'array restituiti in un unico array di output, puoi usare il campo. ResultSelector Definisci questo campo all'interno dello stato della mappa distribuita per trovare tutti gli multiplied array, estrarre tutti gli elementi all'interno di questi array e quindi combinarli in un unico array di output.

Per utilizzare il ResultSelector campo, aggiorna la definizione della macchina a stati come mostrato nell'esempio seguente.

{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied[*]" } } } }

La macchina a stati aggiornata restituisce un array di output consolidato come mostrato nell'esempio seguente.

{ "multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] }