Traitez de gros volumes de messages provenant d'Amazon SQS avec les flux de travail Step Functions Express - AWS Step Functions

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Traitez de gros volumes de messages provenant d'Amazon SQS avec les flux de travail Step Functions Express

Cet exemple de projet montre comment utiliser un AWS Step Functions Express Workflow pour traiter des messages ou des données provenant d'une source d'événements à volume élevé, telle qu'Amazon Simple Queue Service (AmazonSQS). Comme Express Workflows peut être démarré à un rythme très élevé, il convient parfaitement pour le traitement d'événements à volume élevé ou les charges de travail de données en continu.

Voici deux méthodes couramment utilisées pour exécuter votre machine d'état à partir d'une source d'événement :

  • Configurez une règle Amazon CloudWatch Events pour démarrer l'exécution d'une machine à états chaque fois que la source d'événements émet un événement. Pour plus d'informations, consultez la section Création d'une règle d' CloudWatch événements déclenchant un événement.

  • Mappez la source d'événement à une fonction Lambda et écrivez le code de fonction pour exécuter votre machine d'état. Le AWS Lambda la fonction est invoquée chaque fois que votre source d'événement émet un événement, ce qui lance à son tour une exécution par machine à états. Pour plus d'informations, voir Utilisation AWS Lambda avec Amazon SQS.

Cet exemple de projet utilise la deuxième méthode pour démarrer une exécution chaque fois que la SQS file d'attente Amazon envoie un message. Vous pouvez utiliser une configuration similaire pour déclencher l'exécution d'Express Workflows à partir d'autres sources d'événements, telles qu'Amazon Simple Storage Service (Amazon S3), Amazon DynamoDB et Amazon Kinesis.

Pour plus d'informations sur les intégrations des services Express Workflows et Step Functions, consultez les rubriques suivantes :

Étape 1 : Création de la machine à états

  1. Ouvrez la console Step Functions et choisissez Create state machine.

  2. Tapez Process high-volume messages from SQS dans la zone de recherche, puis choisissez Traiter les messages volumineux à SQS partir des résultats de recherche renvoyés.

  3. Choisissez Next (Suivant) pour continuer.

  4. Choisissez Exécuter une démo pour créer un ready-to-deploy flux de travail et un mode de travail en lecture seule, ou choisissez Construire dessus pour créer une définition de machine à états modifiable sur laquelle vous pourrez vous appuyer pour la déployer ultérieurement.

    Cet exemple de projet déploie les ressources suivantes :

    • Fonction Four Lambda

    • Une SQS file d'attente Amazon

    • Un AWS Step Functions machine d'état

    • Relié AWS Identity and Access Management (IAM) rôles

    L'image suivante montre le graphique du flux de travail pour le projet Traiter les messages volumineux à partir d'SQSun exemple de projet :

    Graphique du flux de travail du traitement des messages volumineux provenant d'SQSun exemple de projet.
  5. Choisissez Utiliser le modèle pour poursuivre votre sélection.

Les prochaines étapes dépendent de votre choix précédent :

  1. Exécuter une démonstration : vous pouvez passer en revue la machine d'état avant de créer un projet en lecture seule avec des ressources déployées par AWS CloudFormation à votre Compte AWS.

    Vous pouvez consulter la définition de la machine d'état et, lorsque vous êtes prêt, choisissez Déployer et exécuter pour déployer le projet et créer les ressources.

    Le déploiement peut prendre jusqu'à 10 minutes pour créer des ressources et des autorisations. Vous pouvez utiliser le lien Stack ID pour suivre les progrès dans AWS CloudFormation.

    Une fois le déploiement terminé, vous devriez voir votre nouvelle machine d'état dans la console.

  2. Tirez parti de cette information : vous pouvez revoir et modifier la définition du flux de travail. Vous devrez peut-être définir des valeurs pour les espaces réservés dans l'exemple de projet avant de tenter d'exécuter votre flux de travail personnalisé.

Note

Des frais standard peuvent s'appliquer pour les services déployés sur votre compte.

Étape 2 : Déclencher l'exécution de la machine à états

  1. Ouvrez la SQSconsole Amazon.

  2. Sélectionnez la file d'attente qui a été créée par l'exemple de projet.

    Le nom sera similaire à Example- SQSQueue - wJalr XUtnFEMI.

  3. Dans la liste Actions en file d'attente, sélectionnez Envoyer un message.

  4. Utilisez le bouton Copier pour copier le message suivant et, dans la fenêtre Envoyer un message saisissez-le et sélectionnez le bouton Envoyer un message .

    Note

    Dans cet exemple de message, la ligne input: a été formatée avec des sauts de ligne pour s'adapter à la page. Utilisez le bouton Copier ou assurez-vous qu'il est entré sous la forme d'une seule ligne sans interruption.

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. Choisissez Close (Fermer).

  6. Ouvrez la console Step Functions.

  7. Accédez à votre groupe de CloudWatch journaux Amazon Logs et inspectez les journaux. Le nom du groupe de journaux ressemblera à example- ExpressLogGroup - wJalr XUtnFEMI.

Exemple de code de fonction Lambda

Le code de fonction Lambda ci-dessous montre comment la fonction Lambda initiatrice démarre une exécution par machine à états à l'aide du AWS SDK.

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

Exemple de code de machine d'état

Le workflow express de cet exemple de projet consiste en un ensemble de fonctions Lambda pour le traitement de texte.

Pour plus d'informations sur la façon dont AWS Step Functions peut contrôler d'autres AWS services, voirIntégrer les services avec Step Functions.

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAMExemple

Cet exemple AWS Identity and Access Management (IAM) la politique générée par l'exemple de projet inclut le moindre privilège nécessaire pour exécuter la machine à états et les ressources associées. Nous vous recommandons de n'inclure que les autorisations nécessaires dans vos IAM politiques.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

La politique suivante garantit que les autorisations sont suffisantes pour les CloudWatch journaux.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

Pour plus d'informations sur la configuration IAM lors de l'utilisation de Step Functions avec d'autres AWS services, voirComment Step Functions génère IAM des politiques pour les services intégrés.