Verarbeiten Sie umfangreiche Nachrichten von Amazon SQS mit Step Functions Express-Workflows - AWS Step Functions

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verarbeiten Sie umfangreiche Nachrichten von Amazon SQS mit Step Functions Express-Workflows

Dieses Beispielprojekt zeigt die Verwendung eines AWS Step Functions Express-Workflow zur Verarbeitung von Nachrichten oder Daten aus einer Ereignisquelle mit hohem Volumen, z. B. Amazon Simple Queue Service (AmazonSQS). Da Express-Workflows mit sehr hoher Rate gestartet werden können, eignen sie sich ideal für die Verarbeitung von hochvolumigen Ereignissen oder für Streamingdaten-Workloads.

Im Folgenden finden Sie zwei häufig verwendete Methoden für die Ausführung Ihres Zustandsautomaten über eine Ereignisquelle:

  • Konfigurieren Sie eine Amazon CloudWatch Events-Regel, um eine State-Machine-Ausführung zu starten, wenn die Ereignisquelle ein Ereignis ausgibt. Weitere Informationen finden Sie unter Erstellen einer CloudWatch Ereignisregel, die bei einem Ereignis ausgelöst wird.

  • Ordnen Sie die Ereignisquelle einer Lambda-Funktion zu und schreiben Sie Funktionscode zur Ausführung Ihres Zustandsautomaten. Das Tool AWS Lambda Die Funktion wird jedes Mal aufgerufen, wenn Ihre Ereignisquelle ein Ereignis ausgibt, wodurch wiederum eine State-Machine-Ausführung gestartet wird. Weitere Informationen finden Sie unter Verwenden AWS Lambda mit Amazon SQS.

Dieses Beispielprojekt verwendet die zweite Methode, um jedes Mal eine Ausführung zu starten, wenn die SQS Amazon-Warteschlange eine Nachricht sendet. Sie können eine ähnliche Konfiguration verwenden, um die Ausführung von Express Workflows aus anderen Ereignisquellen wie Amazon Simple Storage Service (Amazon S3), Amazon DynamoDB und Amazon Kinesis auszulösen.

Weitere Informationen zu Express Workflows- und Step Functions Functions-Dienstintegrationen finden Sie im Folgenden:

Schritt 1: Erstellen Sie die Zustandsmaschine

  1. Öffnen Sie die Step Functions Functions-Konsole und wählen Sie Create State Machine.

  2. Geben Sie Process high-volume messages from SQS etwas in das Suchfeld ein, und wählen Sie dann in den zurückgegebenen Suchergebnissen die Option Nachrichten mit hohem Volumen verarbeiten SQS aus.

  3. Wählen Sie Next (Weiter), um fortzufahren.

  4. Wählen Sie „Demo ausführen“, um eine schreibgeschützte Datei und einen ready-to-deploy Workflow zu erstellen, oder wählen Sie „Darauf aufbauen“, um eine bearbeitbare Zustandsmaschinen-Definition zu erstellen, auf der Sie aufbauen und diese später bereitstellen können.

    Dieses Beispielprojekt stellt die folgenden Ressourcen bereit:

    • Vier Lambda-Funktionen

    • Eine SQS Amazon-Warteschlange

    • Importieren in &S3; AWS Step Functions Zustandsautomat

    • Verwandt AWS Identity and Access Management (IAM) Rollen

    Die folgende Abbildung zeigt das Workflow-Diagramm für das SQS Beispielprojekt „Nachrichten mit hohem Volumen verarbeiten“:

    Workflow-Diagramm des SQS Beispielprojekts Nachrichten mit hohem Volumen verarbeiten.
  5. Wählen Sie Vorlage verwenden, um mit Ihrer Auswahl fortzufahren.

Die nächsten Schritte hängen von Ihrer vorherigen Auswahl ab:

  1. Führen Sie eine Demo durch — Sie können den Status Machine überprüfen, bevor Sie ein schreibgeschütztes Projekt mit Ressourcen erstellen, die bereitgestellt werden von AWS CloudFormation zu Ihrem AWS-Konto.

    Sie können sich die State-Machine-Definition ansehen. Wenn Sie bereit sind, wählen Sie Deploy and run, um das Projekt bereitzustellen und die Ressourcen zu erstellen.

    Die Bereitstellung kann bis zu 10 Minuten dauern, bis Ressourcen und Berechtigungen erstellt sind. Sie können den Stack-ID-Link verwenden, um den Fortschritt zu überwachen in AWS CloudFormation.

    Nach Abschluss der Bereitstellung sollte Ihre neue Zustandsmaschine in der Konsole angezeigt werden.

  2. Darauf aufbauen — Sie können die Workflow-Definition überprüfen und bearbeiten. Möglicherweise müssen Sie Werte für Platzhalter im Beispielprojekt festlegen, bevor Sie versuchen, Ihren benutzerdefinierten Workflow auszuführen.

Anmerkung

Für Dienste, die für Ihr Konto bereitgestellt werden, können Standardgebühren anfallen.

Schritt 2: Auslösen der State-Machine-Ausführung

  1. Öffnen Sie die SQSAmazon-Konsole.

  2. Wählen Sie die Warteschlange aus, die vom Beispielprojekt erstellt wurde.

    Der Name wird Example- SQSQueue - ähneln wJalrXUtnFEMI.

  3. Wählen Sie in der Liste Queue Actions (Warteschlangenaktionen) die Option Send a Message (Eine Nachricht senden) aus.

  4. Verwenden Sie die Schaltfläche zum Kopieren, um die folgende Nachricht zu kopieren. Geben Sie sie dann im Fenster Send a Message (Eine Nachricht senden) ein und wählen Sie Send Message (Nachricht senden) aus.

    Anmerkung

    In dieser Beispielmeldung wurde die Zeile input: mit Zeilenumbrüchen formatiert, um sie an die Seite anzupassen. Verwenden Sie die Schaltfläche „Copy (Kopieren)“ oder stellen Sie anderweitig sicher, dass sie als einzelne Zeile ohne Umbrüche eingegeben wird.

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. Klicken Sie auf Close (Schließen).

  6. Öffnen Sie die Step Functions Functions-Konsole.

  7. Gehen Sie zu Ihrer Amazon CloudWatch Logs-Protokollgruppe und überprüfen Sie die Protokolle. Der Name der Protokollgruppe wird wie Beispiel- ExpressLogGroup - aussehen wJalrXUtnFEMI.

Beispiel für einen Lambda-Funktionscode

Der folgende Lambda-Funktionscode zeigt, wie die initiierende Lambda-Funktion eine State-Machine-Ausführung mit dem AWS SDK.

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

Code des Zustandsautomaten aus diesem Beispiel

Der Express-Workflow in diesem Beispielprojekt besteht aus einer Reihe von Lambda-Funktionen für die Textverarbeitung.

Weitere Informationen darüber, wie AWS Step Functions kann andere kontrollieren AWS Dienstleistungen, sieheIntegration von Diensten mit Step Functions.

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAMBeispiel

Dieses Beispiel AWS Identity and Access Management (IAM) Die vom Beispielprojekt generierte Richtlinie beinhaltet die geringsten Rechte, die für die Ausführung des Zustandsmaschinen und der zugehörigen Ressourcen erforderlich sind. Wir empfehlen, dass Sie nur die Berechtigungen in Ihre IAM Richtlinien aufnehmen, die erforderlich sind.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

Die folgende Richtlinie stellt sicher, dass genügend Berechtigungen für CloudWatch Protokolle vorhanden sind.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

Informationen zur Konfiguration IAM bei der Verwendung von Step Functions mit anderen AWS Dienste finden Sie unterSo generiert Step Functions IAM Richtlinien für integrierte Dienste.