使用查询大型数据集 AWS Glue 爬网程序 - AWS Step Functions

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用查询大型数据集 AWS Glue 爬网程序

此示例项目演示了如何在 Amazon S3 中提取大型数据集并将其分区 AWS Glue 抓取工具,然后对该分区执行 Amazon Athena 查询。

在这个项目中,Step Functions 状态机调用 AWS Glue 在 Amazon S3 中对大型数据集进行分区的爬虫程序。曾经 AWS Glue Crawler 返回成功消息,工作流程会对该分区执行 Athena 查询。查询执行成功完成后,亚马逊将向亚马逊SNS主题发送SNS通知。

第 1 步:创建状态机

  1. 打开 Step Functions 控制台,然后选择创建状态机

  2. 在搜索框中键入 Query large datasets,然后从返回的搜索结果中选择查询大型数据集

  3. 选择下一步以继续。

  4. 选择 “运行演示” 以创建只读和 ready-to-deploy 工作流程,或者选择 “在其上构建” 以创建可编辑的状态机定义,您可以在此基础上构建并稍后部署。

    该示例项目部署了以下资源:

    • 网络 ACL 和安全组都允许 (因此可到达您的实例) 的发起 ping 的 Amazon S3 桶

    • 网络 ACL 和安全组都允许 (因此可到达您的实例) 的发起 ping 的 AWS Glue 爬网程序

    • 网络 ACL 和安全组都允许 (因此可到达您的实例) 的发起 ping 的 Amazon SNS topic

    • 网络 ACL 和安全组都允许 (因此可到达您的实例) 的发起 ping 的 AWS Step Functions 状态机

    • 相关 AWS Identity and Access Management (IAM) 角色

    下图显示了查询大型数据集示例项目的工作流图:

    查询大型数据集示例项目的工作流图。
  5. 选择使用模板继续进行选择。

后续步骤取决于您之前的选择:

  1. 运行演示 — 您可以先查看状态机,然后再使用部署的资源创建只读项目 AWS CloudFormation 给你的 AWS 账户.

    您可以查看状态机定义,准备就绪后,选择部署并运行以部署项目并创建资源。

    部署最多可能需要 10 分钟才能创建资源和权限。您可以使用堆栈 ID 链接来监控进度 AWS CloudFormation.

    部署完成后,您应该会在控制台中看到您的新状态机。

  2. 在此基础上再接再厉 — 您可以查看和编辑工作流程定义。在尝试运行自定义工作流程之前,您可能需要为示例项目中的占位符设置值。

注意

部署到您的账户的服务可能会收取标准费用。

第 2 步:运行状态机

  1. 状态机页面上,选择您的示例项目。

  2. 在示例项目页面上,选择启动执行

  3. 启动执行对话框中,执行以下操作:

    1. (可选)输入自定义执行名称以覆盖生成的默认值。

      非ASCII姓名和日志

      Step Functions 接受状态机、执行、活动和包含非ASCII字符的标签的名称。由于此类字符不适用于亚马逊 CloudWatch,因此我们建议您仅使用ASCII字符,以便您可以跟踪中的指标 CloudWatch。

    2. (可选)在输入框中,将输入值输入为JSON。如果您正在运行演示,则可以跳过此步骤。

    3. 选择启动执行

    Step Functions 控制台将引导您进入执行详情页面,您可以在图表视图中选择状态以浏览步骤详细信息窗格中的相关信息。

示例状态机代码

此示例项目中的状态机与 Amazon S3 集成, AWS Glue、Amazon Athena 和 A SNS mazon,方法是将参数直接传递给这些资源。

浏览这个示例状态机,了解 Step Functions 如何控制 Amazon S3, AWS Glue、Amazon Athena 和SNS亚马逊,方法是连接到字段Resource中的亚马逊资源ARN名称 (),然后传递Parameters到该服务。API

有关如何做的更多信息 AWS Step Functions 可以控制其他 AWS 服务,请参阅将服务与 Step Functions 集成

{ "Comment": "An example demonstrates how to ingest a large data set in Amazon S3 and partition it through aws Glue Crawlers, then execute Amazon Athena queries against that partition.", "StartAt": "Start Crawler", "States": { "Start Crawler": { "Type": "Task", "Next": "Get Crawler status", "Parameters": { "Name": "<GLUE_CRAWLER_NAME>" }, "Resource": "arn:aws:states:::aws-sdk:glue:startCrawler" }, "Get Crawler status": { "Type": "Task", "Parameters": { "Name": "<GLUE_CRAWLER_NAME>" }, "Resource": "arn:aws:arn:aws:states:::aws-sdk:glue:getCrawler", "Next": "Check Crawler status" }, "Check Crawler status": { "Type": "Choice", "Choices": [ { "Variable": "$.Crawler.State", "StringEquals": "RUNNING", "Next": "Wait" } ], "Default": "Start an Athena query" }, "Wait": { "Type": "Wait", "Seconds": 30, "Next": "Get Crawler status" }, "Start an Athena query": { "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "<ATHENA_QUERYSTRING>", "WorkGroup": "<ATHENA_WORKGROUP>" }, "Type": "Task", "Next": "Get query results" }, "Get query results": { "Resource": "arn:aws:states:::athena:getQueryResults", "Parameters": { "QueryExecutionId.$": "$.QueryExecution.QueryExecutionId" }, "Type": "Task", "Next": "Send query results" }, "Send query results": { "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "<SNS_TOPIC_ARN>", "Message": { "Input.$": "$.ResultSet.Rows" } }, "Type": "Task", "End": true } } }

IAM示例

这些例子 AWS Identity and Access Management (IAM) 示例项目生成的策略包括执行状态机和相关资源所需的最低权限。我们建议您在IAM策略中仅包含必要的权限。

AthenaGetQueryResults

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "athena:getQueryResults" ], "Resource": [ "arn:aws:athena:us-east-2:123456789012:workgroup/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::*" ] } ] }
AthenaStartQueryExecution

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "athena:startQueryExecution", "athena:stopQueryExecution", "athena:getQueryExecution", "athena:getDataCatalog" ], "Resource": [ "arn:aws:athena:us-east-2:123456789012:workgroup/stepfunctions-athena-sample-project-workgroup-8v7bshiv70", "arn:aws:athena:us-east-2:123456789012:datacatalog/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:ListMultipartUploadParts", "s3:AbortMultipartUpload", "s3:CreateBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::*" ] }, { "Effect": "Allow", "Action": [ "glue:CreateDatabase", "glue:GetDatabase", "glue:GetDatabases", "glue:UpdateDatabase", "glue:DeleteDatabase", "glue:CreateTable", "glue:UpdateTable", "glue:GetTable", "glue:GetTables", "glue:DeleteTable", "glue:BatchDeleteTable", "glue:BatchCreatePartition", "glue:CreatePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions", "glue:BatchGetPartition", "glue:DeletePartition", "glue:BatchDeletePartition" ], "Resource": [ "arn:aws:glue:us-east-2:123456789012:catalog", "arn:aws:glue:us-east-2:123456789012:database/*", "arn:aws:glue:us-east-2:123456789012:table/*", "arn:aws:glue:us-east-2:123456789012:userDefinedFunction/*" ] }, { "Effect": "Allow", "Action": [ "lakeformation:GetDataAccess" ], "Resource": [ "*" ] } ] }
SNSPublish

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "sns:Publish" ], "Resource": [ "arn:aws:sns:us-east-2:123456789012:StepFunctionsSample-AthenaIngestLargeDataset92bc4949-abf8-4a1e-9236-5b7c81b3efa3-SNSTopic-8Y5ZLI5AASXV" ] } ] }

有关在将 Step Functions 与其他功能一起使用IAM时如何进行配置的信息 AWS 服务,请参阅Step Functions 如何为集成服务生成IAM策略