Elabora messaggi ad alto volume da Amazon SQS con i flussi di lavoro Step Functions Express - AWS Step Functions

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Elabora messaggi ad alto volume da Amazon SQS con i flussi di lavoro Step Functions Express

Questo progetto di esempio dimostra come utilizzare un AWS Step Functions Express Workflow per elaborare messaggi o dati da una fonte di eventi ad alto volume, come Amazon Simple Queue Service (AmazonSQS). Dal momento che i flussi di lavoro Express possono essere avviati a una velocità molto elevata, sono ideali per i carichi di lavoro di elaborazione eventi o di streaming dei dati a volume elevato.

Di seguito sono indicati due metodi utilizzati comunemente per eseguire la macchina a stati da un'origine evento:

  • Configura una regola Amazon CloudWatch Events per avviare l'esecuzione di una macchina a stati ogni volta che l'origine dell'evento emette un evento. Per ulteriori informazioni, consulta Creazione di una regola per CloudWatch gli eventi che si attiva su un evento.

  • Mappare l'origine evento a una funzione Lambda e scrivere codice di funzione per eseguire la macchina a stati. Il AWS Lambda la funzione viene richiamata ogni volta che l'origine dell'evento emette un evento, avviando a sua volta l'esecuzione di una macchina a stati. Per ulteriori informazioni, vedere Utilizzo AWS Lambda con Amazon SQS.

Questo progetto di esempio utilizza il secondo metodo per avviare un'esecuzione ogni volta che la SQS coda Amazon invia un messaggio. Puoi utilizzare una configurazione simile per attivare l'esecuzione di Express Workflows da altre fonti di eventi, come Amazon Simple Storage Service (Amazon S3), Amazon DynamoDB e Amazon Kinesis.

Per ulteriori informazioni sulle integrazioni dei servizi Express Workflows e Step Functions, consulta quanto segue:

Fase 1: Creare la macchina a stati

  1. Apri la console Step Functions e scegli Crea macchina a stati.

  2. Digita Process high-volume messages from SQS nella casella di ricerca, quindi scegli Elabora messaggi ad alto volume tra SQS i risultati di ricerca restituiti.

  3. Seleziona Successivo per continuare.

  4. Scegli Esegui una demo per creare un ready-to-deploy flusso di lavoro di sola lettura o scegli Crea su di esso per creare una definizione di macchina a stati modificabile da utilizzare e distribuire in un secondo momento.

    Questo progetto di esempio utilizza le seguenti risorse:

    • Quattro funzioni Lambda

    • Una SQS coda Amazon

    • Un record AWS Step Functions macchina a stati

    • Correlato AWS Identity and Access Management (IAM) ruoli

    L'immagine seguente mostra il grafico del flusso di lavoro per l'elaborazione di messaggi ad alto volume dal progetto SQS di esempio:

    Grafico del flusso di lavoro del processo di elaborazione dei messaggi ad alto volume dal progetto di SQS esempio.
  5. Scegli Usa modello per continuare con la selezione.

I passaggi successivi dipendono dalla scelta precedente:

  1. Esegui una demo: puoi esaminare la macchina a stati prima di creare un progetto di sola lettura con risorse distribuite da AWS CloudFormation al tuo Account AWS.

    Puoi visualizzare la definizione della macchina a stati e, quando sei pronto, scegli Implementa ed esegui per distribuire il progetto e creare le risorse.

    La creazione di risorse e autorizzazioni può richiedere fino a 10 minuti per la distribuzione. Puoi utilizzare il link Stack ID per monitorare i progressi in AWS CloudFormation.

    Una volta completata la distribuzione, dovresti vedere la tua nuova macchina a stati nella console.

  2. Sviluppala: puoi rivedere e modificare la definizione del flusso di lavoro. Potrebbe essere necessario impostare i valori per i segnaposto nel progetto di esempio prima di provare a eseguire il flusso di lavoro personalizzato.

Nota

Potrebbero essere applicati costi standard per i servizi distribuiti sul tuo account.

Fase 2: Attivare l'esecuzione della macchina a stati

  1. Apri la SQSconsole Amazon.

  2. Selezionare la coda creata dal progetto di esempio.

    Il nome sarà simile a Example- SQSQueue - wJalr XUtnFEMI.

  3. Dall'elenco Queue Actions (Azioni coda), selezionare Send a Message (Invia un messaggio).

  4. Utilizzare il pulsante di copia per copiare il seguente messaggio, quindi immetterlo nella finestra Send a Message (Invia un messaggio) e scegliere Send Message (Invia messaggio).

    Nota

    In questo messaggio di esempio sono state inserite interruzioni di riga nella riga input: per adattarla alla pagina. Utilizzare il pulsante di copia o assicurarsi in altro modo che venga immesso come singola riga senza interruzioni.

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. Scegli Chiudi.

  6. Apri la console Step Functions.

  7. Vai al tuo gruppo di CloudWatch log Amazon Logs e ispeziona i log. Il nome del gruppo di log sarà simile a example - -. ExpressLogGroup wJalr XUtnFEMI

Esempio di codice di funzione Lambda

Di seguito è riportato il codice della funzione Lambda che mostra come la funzione Lambda di avvio avvia l'esecuzione di una macchina a stati utilizzando AWS SDK.

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

Esempio di codice della macchina a stati

Il flusso di lavoro Express in questo progetto di esempio consiste in un insieme di funzioni Lambda per l'elaborazione del testo.

Per ulteriori informazioni su come AWS Step Functions può controllarne altri AWS servizi, vediIntegrazione dei servizi con Step Functions.

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAMEsempio

In questo esempio AWS Identity and Access Management (IAM) la politica generata dal progetto di esempio include il minimo privilegio necessario per eseguire la macchina a stati e le risorse correlate. Ti consigliamo di includere solo le autorizzazioni necessarie nelle tue IAM politiche.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

La seguente politica garantisce che vi siano autorizzazioni sufficienti per i CloudWatch registri.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

Per informazioni su come configurare l'IAMutilizzo di Step Functions con altri AWS servizi, vediIn che modo Step Functions genera IAM politiche per servizi integrati.