Step Functions Express ワークフローSQSを使用して Amazon からの大量のメッセージを処理 - AWS Step Functions

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Step Functions Express ワークフローSQSを使用して Amazon からの大量のメッセージを処理

このサンプルプロジェクトでは、 の使用方法を示します。 AWS Step Functions Express ワークフローは、Amazon Simple Queue Service (Amazon ) などの大量のイベントソースからのメッセージまたはデータを処理しますSQS。Express ワークフローは非常に高いレートで開始できるため、大容量のイベント処理やストリーミングデータワークロードに最適です。

イベントソースからステートマシンを実行するために一般的に使用される 2 つの方法は次のとおりです。

このサンプルプロジェクトでは、Amazon SQSキューがメッセージを送信するたびに 2 番目のメソッドを使用して実行を開始します。同様の設定を使用して、Amazon Simple Storage Service (Amazon S3)、Amazon DynamoDB、Amazon Kinesis などの他のイベントソースから Express ワークフローの実行をトリガーできます。

Express ワークフローと Step Functions サービス統合の詳細については、以下を参照してください。

ステップ 1: ステートマシンを作成する

  1. Step Functions コンソールを開き、[ステートマシンの作成] を選択します。

  2. Process high-volume messages from SQS 検索ボックスに「」と入力し、返された検索結果から「 からの大量のメッセージを処理するSQS」を選択します。

  3. [次へ] を選択して続行します。

  4. デモの実行 を選択して読み取り専用と ready-to-deploy ワークフローを作成するか、ビルド を選択して編集可能なステートマシン定義を作成し、それをビルドして後でデプロイできます。

    このサンプルプロジェクトは、以下のリソースをデプロイします。

    • 4 つの Lambda 関数

    • Amazon SQSキュー

    • An AWS Step Functions ステートマシン

    • 関連 AWS Identity and Access Management (IAM) ロール

    次の図は、サンプルプロジェクトからの大量のメッセージを処理するSQSワークフローグラフを示しています。

    サンプルプロジェクトからの大量のメッセージを処理するSQSのワークフローグラフ。
  5. [テンプレートの使用] を選択して選択を続行します。

次のステップは、前の選択肢によって異なります。

  1. デモの実行 – によってデプロイされたリソースを使用して読み取り専用プロジェクトを作成する前に、ステートマシンを確認できます。 AWS CloudFormation を に AWS アカウント.

    ステートマシンの定義を表示でき、準備ができたら、デプロイと実行を選択してプロジェクトをデプロイし、リソースを作成します。

    デプロイでは、リソースとアクセス許可の作成に最大 10 分かかる場合があります。スタック ID リンクを使用して、 の進行状況をモニタリングできます。 AWS CloudFormation.

    デプロイが完了すると、コンソールに新しいステートマシンが表示されます。

  2. 構築 – ワークフロー定義を確認して編集できます。カスタムワークフローの実行を試みる前に、サンプルプロジェクトのプレースホルダーの値を設定する必要がある場合があります。

注記

アカウントにデプロイされたサービスには、標準料金が適用される場合があります。

ステップ 2: ステートマシンの実行をトリガーする

  1. Amazon SQSコンソール を開きます。

  2. サンプルプロジェクトで作成されたキューを選択します。

    名前は Example-SQSQueue- wJalrXUtnFEMIに似ています。

  3. [Queue Actions] (キュー操作) リストで、[Send a Message] (メッセージの送信) を選択します。

  4. コピーボタンを使用して次のメッセージをコピーし、[Send a Message] (メッセージの送信) ウィンドウに入力して、[Send Message] (メッセージの送信) を選択します。

    注記

    このサンプルメッセージでは、input: 行がページに合わせて改行されています。コピーボタンを使用するか、改行なしの 1 行として入力されていることを確認します。

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. [Close] (閉じる) を選択します。

  6. Step Functions コンソールを開きます。

  7. Amazon CloudWatch Logs ロググループに移動し、ログを検査します。ロググループの名前は example-ExpressLogGroup- wJalrXUtnFEMIのようになります。

Lambda 関数のコードの例

以下は、Lambda 関数の開始によって を使用してステートマシンの実行を開始する方法を示す Lambda 関数コードです。 AWS SDK.

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

ステートマシンのコード例

このサンプルプロジェクトの Express ワークフローは、テキスト処理用の Lambda 関数のセットで構成されています。

方法の詳細については、 AWS Step Functions は他の を制御できます AWS サービスについては、「」を参照してくださいサービスと Step Functions の統合

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAM 例

この例では AWS Identity and Access Management サンプルプロジェクトによって生成された (IAM) ポリシーには、ステートマシンおよび関連リソースの実行に必要な最小特権が含まれています。IAM ポリシーに必要なアクセス許可のみを含めることをお勧めします。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

次のポリシーは、 CloudWatch ログに十分なアクセス許可があることを保証します。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

他の で Step Functions を使用するIAMときに を設定する方法の詳細については、「」を参照してください。 AWS サービスについては、「」を参照してくださいStep Functions が統合サービスのIAMポリシーを生成する方法