翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Step Functions Express ワークフローSQSを使用して Amazon からの大量のメッセージを処理
このサンプルプロジェクトでは、 の使用方法を示します。 AWS Step Functions Express ワークフローは、Amazon Simple Queue Service (Amazon ) などの大量のイベントソースからのメッセージまたはデータを処理しますSQS。Express ワークフローは非常に高いレートで開始できるため、大容量のイベント処理やストリーミングデータワークロードに最適です。
イベントソースからステートマシンを実行するために一般的に使用される 2 つの方法は次のとおりです。
-
イベントソースが CloudWatch イベントを発行するたびにステートマシンの実行を開始するように Amazon Events ルールを設定します。詳細については、 CloudWatch 「イベント でトリガーするイベントルールの作成」を参照してください。
-
イベントソースを Lambda 関数にマッピングし、ステートマシンを実行する関数コードを記述します。- AWS Lambda 関数は、イベントソースがイベントを発行するたびに呼び出され、ステートマシンの実行が開始されます。詳細については、「 の使用」を参照してください。 AWS Lambda Amazon でSQS。
このサンプルプロジェクトでは、Amazon SQSキューがメッセージを送信するたびに 2 番目のメソッドを使用して実行を開始します。同様の設定を使用して、Amazon Simple Storage Service (Amazon S3)、Amazon DynamoDB、Amazon Kinesis などの他のイベントソースから Express ワークフローの実行をトリガーできます。
Express ワークフローと Step Functions サービス統合の詳細については、以下を参照してください。
ステップ 1: ステートマシンを作成する
-
Step Functions コンソール
を開き、[ステートマシンの作成] を選択します。 -
Process high-volume messages from SQS
検索ボックスに「」と入力し、返された検索結果から「 からの大量のメッセージを処理するSQS」を選択します。 -
[次へ] を選択して続行します。
-
デモの実行 を選択して読み取り専用と ready-to-deploy ワークフローを作成するか、ビルド を選択して編集可能なステートマシン定義を作成し、それをビルドして後でデプロイできます。
このサンプルプロジェクトは、以下のリソースをデプロイします。
-
4 つの Lambda 関数
-
Amazon SQSキュー
-
An AWS Step Functions ステートマシン
-
関連 AWS Identity and Access Management (IAM) ロール
次の図は、サンプルプロジェクトからの大量のメッセージを処理するSQSワークフローグラフを示しています。
-
-
[テンプレートの使用] を選択して選択を続行します。
次のステップは、前の選択肢によって異なります。
-
デモの実行 – によってデプロイされたリソースを使用して読み取り専用プロジェクトを作成する前に、ステートマシンを確認できます。 AWS CloudFormation を に AWS アカウント.
ステートマシンの定義を表示でき、準備ができたら、デプロイと実行を選択してプロジェクトをデプロイし、リソースを作成します。
デプロイでは、リソースとアクセス許可の作成に最大 10 分かかる場合があります。スタック ID リンクを使用して、 の進行状況をモニタリングできます。 AWS CloudFormation.
デプロイが完了すると、コンソールに新しいステートマシンが表示されます。
-
構築 – ワークフロー定義を確認して編集できます。カスタムワークフローの実行を試みる前に、サンプルプロジェクトのプレースホルダーの値を設定する必要がある場合があります。
注記
アカウントにデプロイされたサービスには、標準料金が適用される場合があります。
ステップ 2: ステートマシンの実行をトリガーする
-
Amazon SQSコンソール
を開きます。 -
サンプルプロジェクトで作成されたキューを選択します。
名前は Example-SQSQueue- wJalrXUtnFEMIに似ています。
-
[Queue Actions] (キュー操作) リストで、[Send a Message] (メッセージの送信) を選択します。
-
コピーボタンを使用して次のメッセージをコピーし、[Send a Message] (メッセージの送信) ウィンドウに入力して、[Send Message] (メッセージの送信) を選択します。
注記
このサンプルメッセージでは、
input:
行がページに合わせて改行されています。コピーボタンを使用するか、改行なしの 1 行として入力されていることを確認します。{ "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW
-
[Close] (閉じる) を選択します。
-
Step Functions コンソールを開きます。
-
Amazon CloudWatch Logs ロググループ
に移動し、ログを検査します。ロググループの名前は example-ExpressLogGroup- wJalrXUtnFEMIのようになります。
Lambda 関数のコードの例
以下は、Lambda 関数の開始によって を使用してステートマシンの実行を開始する方法を示す Lambda 関数コードです。 AWS SDK.
import boto3
def lambda_handler(event, context):
message_body = event['Records'][0]['body']
client = boto3.client('stepfunctions')
response = client.start_execution(
stateMachineArn='${ExpressStateMachineArn}',
input=message_body
)
ステートマシンのコード例
このサンプルプロジェクトの Express ワークフローは、テキスト処理用の Lambda 関数のセットで構成されています。
方法の詳細については、 AWS Step Functions は他の を制御できます AWS サービスについては、「」を参照してくださいサービスと Step Functions の統合。
{
"Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.",
"StartAt": "Decode base64 string",
"States": {
"Decode base64 string": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"Next": "Generate statistics"
},
"Generate statistics": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"Next": "Remove special characters"
},
"Remove special characters": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"Next": "Tokenize and count"
},
"Tokenize and count": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"End": true
}
}
}
IAM 例
この例では AWS Identity and Access Management サンプルプロジェクトによって生成された (IAM) ポリシーには、ステートマシンおよび関連リソースの実行に必要な最小特権が含まれています。IAM ポリシーに必要なアクセス許可のみを含めることをお勧めします。
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI",
"arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF",
"arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI",
"arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF"
],
"Effect": "Allow"
}
]
}
次のポリシーは、 CloudWatch ログに十分なアクセス許可があることを保証します。
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"logs:CreateLogDelivery",
"logs:GetLogDelivery",
"logs:UpdateLogDelivery",
"logs:DeleteLogDelivery",
"logs:ListLogDeliveries",
"logs:PutResourcePolicy",
"logs:DescribeResourcePolicies",
"logs:DescribeLogGroups"
],
"Resource": [
"*"
],
"Effect": "Allow"
}
]
}
他の で Step Functions を使用するIAMときに を設定する方法の詳細については、「」を参照してください。 AWS サービスについては、「」を参照してくださいStep Functions が統合サービスのIAMポリシーを生成する方法。