Procesa mensajes de gran volumen de Amazon SQS con los flujos de trabajo de Step Functions Express - AWS Step Functions

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Procesa mensajes de gran volumen de Amazon SQS con los flujos de trabajo de Step Functions Express

En este proyecto de ejemplo se muestra cómo utilizar un AWS Step Functions Flujo de trabajo rápido para procesar mensajes o datos de una fuente de eventos de gran volumen, como Amazon Simple Queue Service (AmazonSQS). Dado que los flujos de trabajo rápidos se pueden iniciar a una velocidad muy elevada, son ideales para las cargas de trabajo de datos de streaming o procesamiento de eventos de un volumen elevado.

A continuación se muestran dos métodos utilizados con frecuencia para ejecutar la máquina de estado desde un origen de eventos:

  • Configure una regla de Amazon CloudWatch Events para iniciar la ejecución de una máquina de estados siempre que la fuente del evento emita un evento. Para obtener más información, consulte Creación de una regla de CloudWatch eventos que se active en un evento.

  • Asigne el origen de eventos a una función Lambda y escriba el código de función para ejecutar la máquina de estado. La AWS Lambda la función se invoca cada vez que la fuente de eventos emite un evento, lo que a su vez inicia una ejecución en una máquina de estados. Para obtener más información, consulte Uso AWS Lambda con Amazon SQS.

Este proyecto de ejemplo usa el segundo método para iniciar una ejecución cada vez que la SQS cola de Amazon envía un mensaje. Puede usar una configuración similar para activar la ejecución de flujos de trabajo rápidos desde otros orígenes de eventos, como Amazon Simple Storage Service (Amazon S3), Amazon DynamoDB y Amazon Kinesis.

Para obtener más información acerca de los flujos de trabajo rápidos y las integraciones de servicios de Step Functions, consulte los siguientes temas:

Paso 1: Crear la máquina de estado

  1. Abra la consola de Step Functions y seleccione Crear máquina de estado.

  2. Escribe Process high-volume messages from SQS en el cuadro de búsqueda y, a continuación, selecciona Procesar mensajes de gran volumen SQS procedentes de los resultados de búsqueda que se devuelven.

  3. Elija Siguiente para continuar.

  4. Elija Ejecutar una demostración para crear un ready-to-deploy flujo de trabajo y de solo lectura, o elija Crear a partir de ella para crear una definición de máquina de estados editable sobre la que pueda crear e implementar posteriormente.

    En este proyecto de muestra se implementan los siguientes recursos:

    • Cuatro funciones de Lambda

    • Una SQS cola de Amazon

    • Un registro AWS Step Functions máquina de estado

    • ¿Relacionado AWS Identity and Access Management (IAM) roles

    La siguiente imagen muestra el gráfico del flujo de trabajo del proyecto de SQS ejemplo Procesar mensajes de gran volumen:

    Gráfico de flujo de trabajo del proyecto de SQS ejemplo para procesar mensajes de gran volumen.
  5. Elija Utilizar plantilla para continuar con la selección.

Los siguientes pasos dependen de la elección anterior:

  1. Realice una demostración: puede revisar la máquina de estados antes de crear un proyecto de solo lectura con los recursos desplegados por AWS CloudFormation a tu Cuenta de AWS.

    Puede ver la definición de la máquina de estados y, cuando esté listo, elija Implementar y ejecutar para implementar el proyecto y crear los recursos.

    La creación de recursos y permisos puede tardar hasta 10 minutos en implementarse. Puede utilizar el enlace Stack ID para supervisar el progreso en AWS CloudFormation.

    Una vez completada la implementación, deberías ver tu nueva máquina de estados en la consola.

  2. Concéntrese en él: puede revisar y editar la definición del flujo de trabajo. Es posible que tengas que establecer valores para los marcadores de posición en el proyecto de ejemplo antes de intentar ejecutar tu flujo de trabajo personalizado.

nota

Es posible que se apliquen cargos estándar por los servicios implementados en tu cuenta.

Paso 2: Activar la ejecución de la máquina de estado

  1. Abre la SQSconsola de Amazon.

  2. Seleccione la cola que ha creado el proyecto de ejemplo.

    El nombre será similar a Example- SQSQueue - wJalr XUtnFEMI.

  3. En la lista Acciones de cola, seleccione Enviar un mensaje.

  4. Utilice el botón de copiar para copiar el siguiente mensaje y, en la ventana Enviar un mensaje, escríbalo y seleccione el botón Enviar mensaje.

    nota

    En este mensaje de ejemplo, la línea de input: tiene un formato con saltos de línea para ajustarse a la página. Utilice el botón de copiar o asegúrese de que se introduzca como una línea única sin saltos de línea.

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. Elija Close.

  6. Abra la consola de Step Functions.

  7. Ve a tu grupo de CloudWatch registros de Amazon Logs e inspecciona los registros. El nombre del grupo de registros tendrá el siguiente aspecto: ExpressLogGroup - wJalr XUtnFEMI.

Código de función de Lambda de ejemplo

A continuación se muestra el código de una función Lambda que muestra cómo la función Lambda iniciadora inicia la ejecución de una máquina de estados mediante AWS SDK.

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

Código de la máquina de estado de ejemplo

El flujo de trabajo rápido de este proyecto de ejemplo está compuesto por un conjunto de funciones Lambda para procesar textos.

Para obtener más información sobre cómo AWS Step Functions puede controlar otros AWS servicios, consulteIntegración de servicios con Step Functions.

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAMEjemplo

Este ejemplo AWS Identity and Access Management La política (IAM) generada por el proyecto de muestra incluye el mínimo de privilegios necesarios para ejecutar la máquina de estados y los recursos relacionados. Le recomendamos que incluya solo los permisos que sean necesarios en sus IAM políticas.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

La siguiente política garantiza que haya permisos suficientes para los CloudWatch registros.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

Para obtener información sobre cómo configurar el uso IAM de Step Functions con otros AWS servicios, consulteCómo Step Functions genera IAM políticas para servicios integrados.