使用 Athena 按順序並 parallel 執行查詢 - AWS Step Functions

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Athena 按順序並 parallel 執行查詢

此範例專案示範如何連續執行 Athena 查詢,然後 parallel 處理錯誤,然後根據查詢成功還是失敗傳送 Amazon SNS 通知。

在這個專案中,Step Functions 會使用狀態機器同步執行 Athena 查詢。傳回查詢結果後,請輸入 parallel 狀態,並同時執行兩個 Athena 查詢。然後,它會等待任務成功或失敗,並傳送 Amazon SNS 主題,其中包含有關任務是成功還是失敗的訊息。

步驟 1:建立狀態機

  1. 開啟 Step Functions 主控台,然後選擇建立狀態機器

  2. Execute multiple queries在搜尋方塊中輸入,然後從傳回的搜尋結果中選擇 [執行多個查詢]。

  3. 選擇 Next (下一步) 繼續。

  4. 選擇 [執行示範] 以建立唯讀和 ready-to-deploy 工作流程,或選擇 [在其上建置] 建立可編輯的狀態機定義,您可以在其上建置並稍後進行部署。

    此範例專案會部署下列資源:

    • Amazon Athena queries

    • 同時 Amazon SNS 主題

    • 同時 AWS Step Functions 狀態機器

    • 相關 AWS Identity and Access Management (IAM) 角色

    下圖展示了「執行多個查詢」範例專案的工作流程圖形:

    執行多個查詢範例專案的工作流程圖形。
  5. 選擇「使用範本」繼續進行選取。

接下來的步驟取決於您之前的選擇:

  1. 執行示範 — 您可以先檢閱狀態機器,然後再建立唯讀專案,其中包含部署的資源 AWS CloudFormation 到您的 AWS 帳戶.

    您可以檢視狀態機器定義,當您準備就緒時,請選擇 [部署並執行] 以部署專案並建立資源。

    部署最多可能需要 10 分鐘的時間來建立資源和權限。您可以使用「堆疊 ID」連結來監控進度 AWS CloudFormation.

    部署完成後,您應該會在控制台中看到新的狀態機器。

  2. 建立在其上 — 您可以檢閱和編輯工作流程定義。您可能需要在範例專案中設定預留位置的值,然後才能嘗試執行自訂工作流程。

注意

部署到您帳戶的服務可能需要支付標準費用。

步驟 2:運行狀態機

  1. 在 [狀態電腦] 頁面上,選擇您的範例專案。

  2. 在範例專案頁面上,選擇 [開始執行]。

  3. 在 [開始執行] 對話方塊中,執行下列動作:

    1. (選擇性) 輸入自訂執行名稱,以覆寫產生的預設值。

      非ASCII名稱和記錄

      Step Functions 接受包含非ASCII字元的狀態機器、執行項目、活動和標籤的名稱。由於此類字元不適用於 Amazon CloudWatch,因此我們建議您僅使用ASCII字元,以便在中追蹤指標 CloudWatch。

    2. (選擇性) 在「入」方塊中,將輸入值輸入為JSON。如果您正在運行演示,則可以跳過此步驟。

    3. 選擇 Start execution (開始執行)

    「Step Functions」主控台會將您導向「執行詳細資訊」頁面,您可以在其中選擇「圖形」檢視中的狀態,以瀏覽步驟詳情窗格中的相關資訊。

範例狀態機器程式碼

此範例專案中的狀態機器會直接將參數傳遞至這些資源,與 Amazon Athena 和 Amazon SNS 整合。

瀏覽此範例狀態機器,瞭解 Step Functions 如何透SNS過連線至Resource欄位中的 Amazon 資源名稱 (ARN) 並傳遞Parameters至服務來控制 Amazon Athena 和 Amazon API。

有關如何進一步了解 AWS Step Functions 可以控制其他 AWS 服務,請參閱整合服務與 Step Functions

{ "Comment": "An example of using Athena to execute queries in sequence and parallel, with error handling and notifications.", "StartAt": "Generate Example Data", "States": { "Generate Example Data": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<ATHENA_FUNCTION_NAME>" }, "Next": "Load Data to Database" }, "Load Data to Database": { "Type": "Task", "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "<ATHENA_QUERYSTRING>", "WorkGroup": "<ATHENA_WORKGROUP>" }, "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Send query results" } ], "Next": "Map" }, "Map": { "Type": "Parallel", "ResultSelector": { "Query1Result.$": "$[0].ResultSet.Rows", "Query2Result.$": "$[1].ResultSet.Rows" }, "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Send query results" } ], "Branches": [ { "StartAt": "Start Athena query 1", "States": { "Start Athena query 1": { "Type": "Task", "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "<ATHENA_QUERYSTRING>", "WorkGroup": "<ATHENA_WORKGROUP>" }, "Next": "Get Athena query 1 results" }, "Get Athena query 1 results": { "Type": "Task", "Resource": "arn:aws:states:::athena:getQueryResults", "Parameters": { "QueryExecutionId.$": "$.QueryExecution.QueryExecutionId" }, "End": true } } }, { "StartAt": "Start Athena query 2", "States": { "Start Athena query 2": { "Type": "Task", "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "<ATHENA_QUERYSTRING>", "WorkGroup": "<ATHENA_WORKGROUP>" }, "Next": "Get Athena query 2 results" }, "Get Athena query 2 results": { "Type": "Task", "Resource": "arn:aws:states:::athena:getQueryResults", "Parameters": { "QueryExecutionId.$": "$.QueryExecution.QueryExecutionId" }, "End": true } } } ], "Next": "Send query results" }, "Send query results": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "Message.$": "$", "TopicArn": "<SNS_TOPIC_ARN>" }, "End": true } } }

IAM例子

此範例 AWS Identity and Access Management (IAM) 範例專案所產生的原則包含執行狀態機器及相關資源所需的最低權限。我們建議您僅在IAM原則中加入必要的權限。

AthenaStartQueryExecution

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "athena:startQueryExecution", "athena:stopQueryExecution", "athena:getQueryExecution", "athena:getDataCatalog" ], "Resource": [ "arn:aws:athena:us-east-2:123456789012:workgroup/stepfunctions-athena-sample-project-workgroup-ztuvu9yuix", "arn:aws:athena:us-east-2:123456789012:datacatalog/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:ListMultipartUploadParts", "s3:AbortMultipartUpload", "s3:CreateBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::*" ] }, { "Effect": "Allow", "Action": [ "glue:CreateDatabase", "glue:GetDatabase", "glue:GetDatabases", "glue:UpdateDatabase", "glue:DeleteDatabase", "glue:CreateTable", "glue:UpdateTable", "glue:GetTable", "glue:GetTables", "glue:DeleteTable", "glue:BatchDeleteTable", "glue:BatchCreatePartition", "glue:CreatePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions", "glue:BatchGetPartition", "glue:DeletePartition", "glue:BatchDeletePartition" ], "Resource": [ "arn:aws:glue:us-east-2:123456789012:catalog", "arn:aws:glue:us-east-2:123456789012:database/*", "arn:aws:glue:us-east-2:123456789012:table/*", "arn:aws:glue:us-east-2:123456789012:userDefinedFunction/*" ] }, { "Effect": "Allow", "Action": [ "lakeformation:GetDataAccess" ], "Resource": [ "*" ] } ] }
AthenaGetQueryResults

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "athena:getQueryResults" ], "Resource": [ "arn:aws:us-east-2:123456789012:workgroup/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::*" ] } ] }
SNSPublish

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "sns:Publish" ], "Resource": [ "arn:aws:sns:us-east-2:123456789012:StepFunctionsSample-AthenaMultipleQueriese1ec229b-5cbe-4754-a8a8-078474bac878-SNSTopic-9AID0HEJT7TH" ] } ] }
LambdaInvokeFunction

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-2:123456789012:function:StepFunctionsSample-Athen-LambdaForStringGeneratio-GQFQjN7mE9gl:*" ] }, { "Effect": "Allow", "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-2:123456789012:function:StepFunctionsSample-Athen-LambdaForStringGeneratio-GQFQjN7mE9gl" ] } ] }

有關如何在使用步驟函數與其他功能IAM時進行配置的資訊 AWS 服務,請參閱Step Functions 式如何為整合式服務產生IAM原則