Processe mensagens de alto volume da Amazon SQS com fluxos de trabalho do Step Functions Express - AWS Step Functions

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Processe mensagens de alto volume da Amazon SQS com fluxos de trabalho do Step Functions Express

Este exemplo de projeto demonstra como usar um AWS Step Functions Express Workflow para processar mensagens ou dados de uma fonte de eventos de alto volume, como o Amazon Simple Queue Service (AmazonSQS). Como os fluxos de trabalho expressos podem ser iniciados em uma taxa muito alta, eles são ideais para processamento de eventos de alto volume ou cargas de trabalho de dados em streaming.

Aqui estão dois métodos comumente usados para executar sua máquina de estado de uma fonte de evento:

  • Configure uma regra do Amazon CloudWatch Events para iniciar a execução de uma máquina de estado sempre que a fonte do evento emitir um evento. Para obter mais informações, consulte Criação de uma regra de CloudWatch eventos que é acionada em um evento.

  • Mapeie a origem do evento para uma função do Lambda e escreva o código da função para executar sua máquina de estado. A ferramenta AWS Lambda a função é invocada toda vez que sua fonte de eventos emite um evento, iniciando, por sua vez, a execução de uma máquina de estado. Para obter mais informações, consulte Usando AWS Lambda com a Amazon SQS.

Esse projeto de amostra usa o segundo método para iniciar uma execução sempre que a SQS fila da Amazon envia uma mensagem. Você pode usar uma configuração semelhante para acionar a execução do fluxos de trabalho expressos a partir de outras fontes de eventos, como o Amazon Simple Storage Service (Amazon S3), o Amazon DynamoDB e o Amazon Kinesis.

Para obter mais informações sobre o fluxos de trabalho expressos e as integrações de serviço do Step Functions, consulte o seguinte:

Etapa 1: Criar a máquina de estado

  1. Abra o console do Step Functions e clique em Criar máquina de estado.

  2. Digite Process high-volume messages from SQS na caixa de pesquisa e escolha Processar mensagens de alto volume a SQS partir dos resultados da pesquisa que são retornados.

  3. Escolha Próximo para continuar.

  4. Escolha Executar uma demonstração para criar um ready-to-deploy fluxo de trabalho e somente leitura, ou escolha Criar nele para criar uma definição de máquina de estado editável na qual você possa criar e implantar posteriormente.

    ‎Este projeto de exemplo implementa os recursos a seguir.

    • Quatro funções do Lambda

    • Uma SQS fila da Amazon

    • Uma AWS Step Functions máquina de estado

    • Relacionado AWS Identity and Access Management (IAM) funções

    A imagem a seguir mostra o gráfico do fluxo de trabalho do Processar mensagens de alto volume do projeto de SQS amostra:

    Gráfico do fluxo de trabalho do processo de mensagens de alto volume do projeto de SQS amostra.
  5. Escolha Usar modelo para continuar com a seleção.

As próximas etapas dependem da sua escolha anterior:

  1. Execute uma demonstração — Você pode revisar a máquina de estado antes de criar um projeto somente para leitura com recursos implantados pelo AWS CloudFormation para o seu Conta da AWS.

    Você pode visualizar a definição da máquina de estado e, quando estiver pronto, escolher Implantar e executar para implantar o projeto e criar os recursos.

    A implantação pode levar até 10 minutos para criar recursos e permissões. Você pode usar o link Stack ID para monitorar o progresso no AWS CloudFormation.

    Após a conclusão da implantação, você deverá ver sua nova máquina de estado no console.

  2. Desenvolva com base nisso — você pode revisar e editar a definição do fluxo de trabalho. Talvez seja necessário definir valores para espaços reservados no projeto de amostra antes de tentar executar seu fluxo de trabalho personalizado.

nota

Cobranças padrão podem ser aplicadas aos serviços implantados em sua conta.

Etapa 2: Acionar a execução da máquina de estado

  1. Abra o SQSconsole da Amazon.

  2. Selecione a fila que foi criada pelo projeto de exemplo.

    O nome será semelhante a Example- SQSQueue - wJalr XUtnFEMI.

  3. Na lista Queue Actions (Ações da fila), selecione Send a Message (Enviar uma mensagem).

  4. Use o botão de cópia para copiar a seguinte mensagem e, na janela Send a Message (Enviar uma mensagem), insira-a e escolha Send Message (Enviar mensagem) .

    nota

    Nesta mensagem de exemplo, a linha input: foi formatada com quebras de linha para caber na página. Use o botão Copiar ou certifique-se de que ele seja inserido como uma única linha sem quebras.

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. Escolha Fechar.

  6. Abra o console do Step Functions.

  7. Acesse seu grupo de CloudWatch registros do Amazon Logs e inspecione os registros. O nome do grupo de registros será semelhante ao exemplo- ExpressLogGroup - wJalr XUtnFEMI.

Exemplo de código da função do Lambda

A seguir está o código da função Lambda que mostra como a função Lambda iniciante inicia a execução de uma máquina de estado usando o AWS SDK.

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

Exemplo de código da máquina de estado

O fluxo de trabalho expresso neste projeto de exemplo consiste em um conjunto de funções do Lambda para processamento de texto.

Para obter mais informações sobre como AWS Step Functions pode controlar outros AWS serviços, vejaIntegrando serviços com Step Functions.

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAMExemplo

Esse exemplo AWS Identity and Access Management (IAM) a política gerada pelo projeto de amostra inclui o menor privilégio necessário para executar a máquina de estado e os recursos relacionados. Recomendamos que você inclua somente as permissões necessárias em suas IAM políticas.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

A política a seguir garante que haja permissões suficientes para CloudWatch os registros.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

Para obter informações sobre como configurar IAM ao usar Step Functions com outros AWS serviços, vejaComo o Step Functions gera IAM políticas para serviços integrados.