Procesamiento de datos por lotes con una función Lambda en Step Functions - AWS Step Functions

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Procesamiento de datos por lotes con una función Lambda en Step Functions

En este tutorial, utilizará el campo ItemBatcher (Mapa) de estado Distributed Map para procesar un lote completo de elementos dentro de una función de Lambda. Cada lote contiene un máximo de tres elementos. El estado Distributed Map inicia cuatro ejecuciones de flujos de trabajo secundarios, en las que cada ejecución procesa tres elementos, mientras que una ejecución procesa un solo elemento. Cada ejecución del flujo de trabajo secundario invoca una función de Lambda que itera sobre los elementos individuales presentes en el lote.

Creará una máquina de estado que realiza la multiplicación de una matriz de números enteros. Supongamos que la matriz de enteros que proporciona como entrada es [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] y el factor de multiplicación es 7. Entonces, la matriz resultante formada después de multiplicar estos enteros por un factor de 7, será [7, 14, 21, 28, 35, 42, 49, 56, 63, 70].

Paso 1: Crear la máquina de estado

En este paso, creará el prototipo de flujo de trabajo de la máquina de estado que pasa un lote completo de datos a la función de Lambda que creará en el Paso 2.

  • Utilice la siguiente definición para crear una máquina de estado mediante la consola de Step Functions. Para obtener información sobre cómo crear una máquina de estado, consulte Paso 1: Crear el prototipo de flujo de trabajo en el tutorial Cómo empezar a usar el estado Distributed Map.

    En esta máquina de estado, se define un estado Distributed Map que acepta una matriz de 10 enteros como entrada y pasa esta matriz a una función de Lambda en lotes de 3. La función de Lambda itera sobre los elementos individuales presentes en el lote y devuelve una matriz de salida denominada multiplied. La matriz de salida contiene el resultado de la multiplicación realizada con los elementos pasados a la matriz de entrada.

    importante

    Asegúrese de sustituir el nombre de recurso de Amazon (ARN) de la función Lambda en el código siguiente por el ARN de la función que va a crear en el paso 2.

    { "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:functionName" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems" } } }

Paso 2: Crear la función de Lambda

En este paso creará la función de Lambda que procesa todos los elementos que se pasan en el lote.

importante

Asegúrese de que su función Lambda esté en el mismo nivel Región de AWS como tu máquina de estados.

Para crear la función de Lambda
  1. Utilice la consola Lambda para crear una función Lambda de Python denominada. ProcessEntireBatch Para obtener información sobre la creación de una función de Lambda, consulte el paso 4: Configurar la función de Lambda en el tutorial Cómo empezar a usar el estado Distributed Map.

  2. Copie el siguiente código para la función de Lambda y péguelo en la sección Código fuente de la función de Lambda.

    import json def lambda_handler(event, context): multiplication_factor = event['BatchInput']['MyMultiplicationFactor'] items = event['Items'] results = [multiplication_factor * item for item in items] return { 'statusCode': 200, 'multiplied': results }
  3. Después de crear la función Lambda, copia la función que ARN se muestra en la esquina superior derecha de la página. A continuación se muestra un ejemplo en el que ARN nombre de la función es el nombre de la función Lambda (en este caso,ProcessEntireBatch):

    arn:aws:lambda:us-east-1:123456789012:function:function-name

    Deberá proporcionar la función ARN en la máquina de estados que creó en el paso 1.

  4. Elija Implementar para implementar estos cambios.

Paso 3: Ejecutar la máquina de estado

Al ejecutar la máquina de estado, el estado Distributed Map inicia cuatro ejecuciones de flujo de trabajo secundarias, en las que cada ejecución procesa tres elementos, mientras que una ejecución procesa un solo elemento.

En el siguiente ejemplo, se muestran los datos transferidos a la función ProcessEntireBatch por una de las ejecuciones del flujo de trabajo secundario.

{ "BatchInput": { "MyMultiplicationFactor": 7 }, "Items": [1, 2, 3] }

Con esta entrada, el siguiente ejemplo muestra la matriz de salida llamada multiplied que devuelve la función de Lambda.

{ "statusCode": 200, "multiplied": [7, 14, 21] }

La máquina de estado devuelve el siguiente resultado, que contiene cuatro matrices denominadas multiplied para las cuatro ejecuciones del flujo de trabajo secundario. Estas matrices contienen los resultados de la multiplicación de los elementos de entrada individuales.

[ { "statusCode": 200, "multiplied": [7, 14, 21] }, { "statusCode": 200, "multiplied": [28, 35, 42] }, { "statusCode": 200, "multiplied": [49, 56, 63] }, { "statusCode": 200, "multiplied": [70] } ]

Para combinar todos los elementos de la matriz devueltos en una sola matriz de salida, puede usar el campo ResultSelector. Defina este campo dentro del estado Map Distributed para buscar todas las matrices multiplied, extraer todos los elementos de estas matrices y, a continuación, combinarlos en una sola matriz de salida.

Para utilizar el campo ResultSelector, actualice la definición de la máquina de estado como se muestra en el siguiente ejemplo.

{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied[*]" } } } }

La máquina de estado actualizado devuelve una matriz de salida consolidada, como se muestra en el siguiente ejemplo.

{ "multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] }