SQS使用 Step Functions 快速工作流程處理來自 Amazon 的大量訊息 - AWS Step Functions

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

SQS使用 Step Functions 快速工作流程處理來自 Amazon 的大量訊息

此範例專案示範如何使用 AWS Step Functions 用於處理來自大量事件來源的訊息或資料的快速工作流程,例如 Amazon 簡單佇列服務 (AmazonSQS)。由於快速工作流程能以非常高的速率啟動,因此非常適用於大量事件處理或串流資料工作負載。

以下是從事件來源執行狀態機器的兩種常用方法:

  • 設定 Amazon CloudWatch 事件規則,以便在事件來源發出事件時啟動狀態機器執行。如需詳細資訊,請參閱建立在 CloudWatch 事件上觸發的事件規則

  • 將事件來源映射至 Lambda 函數,然後撰寫函數程式碼以執行您的狀態機器。所以此 AWS Lambda 每次事件源發出事件時,都會調用函數,從而啟動狀態機執行。如需詳細資訊,請參閱使 AWS Lambda 與 Amazon SQS

此範例專案會在每次 Amazon SQS 佇列傳送訊息時使用第二種方法來啟動執行。您可以使用類似的組態,從其他事件來源觸發快速工作流程執行,例如 Amazon 簡單儲存服務 (Amazon S3)、Amazon DynamoDB 和 Amazon Kinesis。

如需有關 Express 工作流程和 Step Functions 服務整合的詳細資訊,請參閱下列內容:

步驟 1:建立狀態機

  1. 開啟 Step Functions 主控台,然後選擇建立狀態機器

  2. Process high-volume messages from SQS在搜尋方塊中輸入,然後從傳回的搜尋結果SQS中選擇 [處理大量郵件]。

  3. 選擇 Next (下一步) 繼續。

  4. 選擇 [執行示範] 以建立唯讀和 ready-to-deploy 工作流程,或選擇 [在其上建置] 建立可編輯的狀態機定義,您可以在其上建置並稍後進行部署。

    此範例專案會部署下列資源:

    • 四 Lambda 数

    • Amazon SQS 隊列

    • 同時 AWS Step Functions 狀態機器

    • 相關 AWS Identity and Access Management (IAM) 角色

    下列影像顯示「處理SQS範例專案中的大量訊息」的工作流程圖形:

    處理SQS範例專案中大量訊息的工作流程圖。
  5. 選擇「使用範本」繼續進行選取。

接下來的步驟取決於您之前的選擇:

  1. 執行示範 — 您可以先檢閱狀態機器,然後再建立唯讀專案,其中包含部署的資源 AWS CloudFormation 到您的 AWS 帳戶.

    您可以檢視狀態機器定義,當您準備就緒時,請選擇 [部署並執行] 以部署專案並建立資源。

    部署最多可能需要 10 分鐘的時間來建立資源和權限。您可以使用「堆疊 ID」連結來監控進度 AWS CloudFormation.

    部署完成後,您應該會在控制台中看到新的狀態機器。

  2. 建立在其上 — 您可以檢閱和編輯工作流程定義。您可能需要在範例專案中設定預留位置的值,然後才能嘗試執行自訂工作流程。

注意

部署到您帳戶的服務可能需要支付標準費用。

步驟 2:觸發狀態機執行

  1. 打開 Amazon SQS 控制台

  2. 選取由範例專案建立的佇列。

    該名稱將類似於示例-SQSQueue-wJalr XUtnFEMI

  3. Queue Actions (佇列動作) 清單中,選取 Send a Message (傳送訊息)

  4. 使用複製按鈕來複製以下訊息,然後在 Send a Message (傳送訊息) 視窗中,輸入該訊息,然後選擇 Send Message (傳送訊息)

    注意

    在此範例訊息中,input: 行已使用換行符號進行格式化以符合頁面。使用複製按鈕,或確保將該行輸入為不換行的單一行。

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. 選擇關閉

  6. 開啟「Step Functions」主控台

  7. 轉到您的 Amazon CloudWatch 日誌日誌組並檢查日誌。記錄群組的名稱看起來像 example-ExpressLogGroup-wJalr XUtnFEMI

範例 Lambda 函數代碼

以下是 Lambda 函數程式碼,顯示起始 Lambda 函數如何使用 AWS SDK.

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

範例狀態機器程式碼

此範例專案中的快速工作流程,包含了一組用於文字處理的 Lambda 函數。

有關如何進一步了解 AWS Step Functions 可以控制其他 AWS 服務,請參閱整合服務與 Step Functions

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAM範例

此範例 AWS Identity and Access Management (IAM) 範例專案所產生的原則包含執行狀態機器及相關資源所需的最低權限。我們建議您僅在IAM原則中加入必要的權限。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

下列原則可確保 CloudWatch 記錄檔擁有足夠的權限。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

有關如何在使用步驟函數與其他功能IAM時進行配置的資訊 AWS 服務,請參閱Step Functions 式如何為整合式服務產生IAM原則