Processamento de dados em lote com uma função Lambda em Step Functions - AWS Step Functions

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Processamento de dados em lote com uma função Lambda em Step Functions

Neste tutorial, você usa o campo ItemBatcher (Mapa) do estado Mapa distribuído para processar um lote inteiro de itens dentro de uma função do Lambda. Cada lote contém até três itens. O estado Mapa distribuído inicia quatro execuções secundárias do fluxo de trabalho, em que cada execução processa três itens, enquanto uma execução processa um só item. A execução de cada fluxo de trabalho secundário invoca uma função do Lambda que itera os itens individuais presentes no lote.

Você vai criar uma máquina de estado que executa a multiplicação em uma matriz de números inteiros. Digamos que a matriz de números inteiros que você fornece como entrada é [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] e o fator de multiplicação é 7. Então, a matriz resultante formada após a multiplicação desses números inteiros por um fator de sete será [7, 14, 21, 28, 35, 42, 49, 56, 63, 70].

Etapa 1: Criar a máquina de estado

Nesta etapa, você cria o protótipo de fluxo de trabalho da máquina de estado que passa um lote inteiro de dados para a função do Lambda que você vai criar na Etapa 2.

  • Use a definição a seguir para criar uma máquina de estado usando o console do Step Functions. Para obter informações sobre como criar uma máquina de estado, consulte Etapa 1: Criar o protótipo do fluxo de trabalho no tutorial Introdução ao uso do estado Mapa Distribuído.

    Nessa máquina de estado, você define um estado Mapa Distribuído que aceita uma matriz de 10 números inteiros como entrada e passa essa matriz para uma função do Lambda em lotes do 3. A função do Lambda itera os itens individuais presentes no lote e retorna uma matriz de saída chamada multiplied. A matriz de saída contém o resultado da multiplicação realizada nos itens passados na matriz de entrada.

    Importante

    Certifique-se de substituir o Amazon Resource Name (ARN) da função Lambda no código a seguir pelo ARN da função que você criará na Etapa 2.

    { "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:functionName" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems" } } }

Etapa 2: Criar a função do Lambda

Nesta etapa, você vai criar a função do Lambda que processa todos os itens passados no lote.

Importante

Certifique-se de que sua função Lambda esteja sob a mesma Região da AWS como sua máquina estatal.

Para criar a função do Lambda
  1. Use o console Lambda para criar uma função Python Lambda chamada. ProcessEntireBatch Para obter informações sobre como criar uma função do Lambda, consulte Etapa 4: Configurar a função do Lambda no tutorial Introdução ao uso do estado Mapa Distribuído.

  2. Copie o código a seguir para a função do Lambda e cole-o na seção Origem do código da função do Lambda.

    import json def lambda_handler(event, context): multiplication_factor = event['BatchInput']['MyMultiplicationFactor'] items = event['Items'] results = [multiplication_factor * item for item in items] return { 'statusCode': 200, 'multiplied': results }
  3. Depois de criar sua função Lambda, copie a função ARN exibida no canto superior direito da página. A seguir está um exemploARN, onde nome da função é o nome da função Lambda (nesse caso,ProcessEntireBatch):

    arn:aws:lambda:us-east-1:123456789012:function:function-name

    Você precisará fornecer a função ARN na máquina de estado criada na Etapa 1.

  4. Escolha Implantar para implantar essas alterações.

Etapa 3: Executar a máquina de estado

Quando você executa a máquina de estado, o estado Mapa Distribuído inicia quatro execuções secundárias do fluxo de trabalho, em que cada execução processa três itens, enquanto uma execução processa um só item.

O exemplo a seguir mostra os dados que uma das execuções do fluxo de trabalho secundário passa para a função do ProcessEntireBatch.

{ "BatchInput": { "MyMultiplicationFactor": 7 }, "Items": [1, 2, 3] }

Com essa entrada, o exemplo a seguir mostra a matriz de saída chamada multiplied que a função do Lambda retorna.

{ "statusCode": 200, "multiplied": [7, 14, 21] }

A máquina de estado retorna a seguinte saída que contém quatro matrizes chamadas multiplied para as quatro execuções do fluxo de trabalho secundário. Essas matrizes contêm os resultados da multiplicação dos itens de entrada individuais.

[ { "statusCode": 200, "multiplied": [7, 14, 21] }, { "statusCode": 200, "multiplied": [28, 35, 42] }, { "statusCode": 200, "multiplied": [49, 56, 63] }, { "statusCode": 200, "multiplied": [70] } ]

Para combinar todos os itens da matriz retornados em uma só matriz de saída, use o campo ResultSelector. Defina esse campo dentro do estado Mapa Distribuído para encontrar todas as matrizes multiplied, extrair todos os itens dentro dessas matrizes e combiná-los em uma só matriz de saída.

Para usar o campo ResultSelector, atualize a definição da máquina de estado como mostra o exemplo a seguir.

{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied[*]" } } } }

A máquina de estado atualizada retorna uma matriz de saída consolidada, como mostra o exemplo a seguir.

{ "multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] }