使用 AWS Step Functions 编排 ETL 管道,包含验证、转换和分区 - AWS Prescriptive Guidance

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 AWS Step Functions 编排 ETL 管道,包含验证、转换和分区

由 Sandip Gangapadhyay (AWS) 创建

代码存储库:aws-step-functions-etl-pipeline-pat tern

环境:生产

技术:分析;大数据;数据湖; DevOps;无服务器

Amazon Web Services:Amazon Athena、AWS Glue、AWS Lambda、AWS Step Functions

Summary

此示例介绍了如何构建无服务器提取、转换、加载(ETL)管道,以验证、转换、压缩和分区大型 CSV 数据集,从而实现性能和成本优化。该管道由 AWS Step Functions 编排,包含错误处理、自动重试和用户通知功能。

将 CSV 文件上传至 Amazon Simple Storage Service (Amazon S3) 存储桶文件夹,ETL 管道开始运行。该管道验证源 CSV 文件的内容和架构,将 CSV 文件转换为压缩 Apache Parquet 格式,按年、月和日对数据集进行分区,并将其存储在单独的文件夹中供分析工具处理。

自动执行此模式的代码可在带有 AWS Step Functi GitHub ons 存储库的 ETL Pipelin e 中找到。

先决条件和限制

先决条件

  • 一个有效的 Amazon Web Services account。

  • AWS 命令行接口 (AWS CLI) Line Interface 已使用您的 AWS 账户进行安装和配置,因此您可以通过部署 AWS 堆栈来创建 AW CloudFormation S 资源。建议使用 AWS CLI 第 2 版。有关安装说明,请参阅 AWS CLI 文档中的安装、更新和卸载 AWS CLI 版本 2。有关 AWS CLI 配置说明,请参阅 AWS CLI 文档中的配置和凭证文件设置

  • Amazon S3 存储桶。

  • 具有正确架构的 CSV 数据集。(此模式中包含的代码存储库提供了示例 CSV 文件,其中包含您可以使用的正确架构和数据类型。)

  • 支持与 AWS 管理控制台 配合使用的 Web 浏览器。(请参阅支持的浏览器列表。)

  • AWS Glue 控制台访问权限。

  • AWS Step Functions 控制台访问权限。

限制

  • 在 AWS Step Functions 中,保存历史日志最大限制为 90 天。有关更多信息,请参阅 AWS Step Functions 文档中的配额标准工作流配额

产品版本

  • 适用于 AWS Lambda 的 Python 3.11

  • AWS Glue 版本 2.0

架构

从 S3 源存储桶通过 Step Functions、AWS Glue 和 Amazon SNS 进行 ETL 处理,分为 10 个步骤。

图中所示的工作流包括以下高级步骤:

  1. 用户将 CSV 文件上传至 Amazon S3 中的源文件夹。

  2. Amazon S3 通知事件会启动 AWS Lambda 函数,该函数启动 Step Functions 状态机。

  3. Lambda 函数验证原始 CSV 文件架构和数据类型。

  4. 根据验证结果:

    1. 如源文件验证成功,则文件将移至舞台文件夹进行进一步处理。

    2. 如果验证失败,文件将移至错误文件夹,并由 Amazon Simple Notification Service (Amazon SNS) 发送错误通知。

  5. AWS Glue 爬网程序从 Amazon S3 的阶段文件夹中创建原始文件架构。

  6. AWS Glue 作业将原始文件转换、压缩并分区为 Parquet 格式。

  7. AWS Glue 任务还会将文件移动至 Amazon S3 中的转换文件夹。

  8. AWS Glue 爬网程序会根据转换后的文件创建架构。生成的架构用于任何分析作业。您还可以使用 Amazon Athena 中运行临时查询。

  9. 如果管道在无错误的情况下完成,则架构文件将移至存档文件夹。如遇到任何错误,则会将文件移至错误文件夹。

  10. Amazon SNS 会根据管道完成状态发送通知,指示成功或者失败。

此模式中使用的所有 AWS 资源均为无服务器。没有需要管理的服务器。

工具

Amazon Web Services

  • AWS Glue – AWS Glue 是一项完全托管的 ETL 服务,可让客户轻松准备和加载数据以进行分析。

  • AWS Step Functions - AWS Step Functions 是一项无服务器编排服务,可让您搭配使用 AWS Lambda 函数和其他 Amazon Web Services 来构建业务关键型应用程序。通过 AWS Step Functions 图形控制台,您可将应用程序的工作流视为一系列事件驱动的步骤。

  • Amazon S3 – Amazon Simple Storage Service(Amazon S3)是一种对象存储服务,提供行业领先的可扩展性、数据可用性、安全性和性能。

  • Amazon SNS – Amazon Simple Notification Service (Amazon SNS) 是一项高度可用、耐用、安全、完全托管的 pub/sub 消息服务,可让您分离微服务、分布式系统和无服务器应用程序。

  • AWS Lambda - AWS Lambda 是一项计算服务,可帮助您运行代码,无需预置或管理服务器。只有在需要时 AWS Lambda 才运行您的代码,并且能自动扩缩,从每天几个请求扩展到每秒数千个请求。

代码

此模式的代码可在带有 AWS Step Fun GitHub ctions 存储库的 ETL Pipelin e 中找到。代码存储库包含以下文件和文件夹:

  • template.yml— 用于使用 AWS Step Functions 创建 ETL 管道的 AWS CloudFormation 模板。

  • parameter.json — 包含所有参数和参数值。您可更新此文件以更改参数值,如操作说明部分所述。

  • myLayer/python 文件夹 — 包含为此项目创建所需 AWS Lambda 层所需 Python 包。

  • lambda 文件夹 - 包含以下 Lambda 函数:

    • move_file.py — 将源数据集移动到存档、转换或错误文件夹。

    • check_crawler.py — 在 AWS Glue 爬网程序发送失败消息之前,根据 RETRYLIMIT  环境变量的配置多次检查其状态。

    • start_crawler.py — 启动 AWS Glue 爬网程序。

    • start_step_function.py — 启动 AWS Step Functions。

    • start_codebuild.py— 启动 AWS CodeBuild 项目。

    • validation.py — 验证输入的原始数据集。

    • s3object.py — 在 S3 存储桶内创建所需目录结构。

    • notification.py — 在管道结束时发送成功或错误通知。

若要使用示例代码,请按照操作部分的说明执行。

操作说明

任务描述所需技能

克隆示例代码存储库。

  1. 打开ETL Pipeline with AWS Step Functions 存储库。

  2. 在主存储库页面的文件列表上方选择代码,然后复制使用 HTTPS 克隆 下列出的 URL。

  3. 将工作目录更改为要存储示例文件的位置。

  4. 在终端或命令提示符处,输入命令:

    git clone <repoURL>

    其中,<repoURL> 是指您在步骤 2 中复制的 URL。

开发人员

更新参数值。

在存储库本地副本中,编辑 parameter.json 文件并更新默认参数值,如下所示:

  • pS3BucketName ─ 用于存储数据集的 S3 存储桶的名称。此模板将为您创建此存储桶。存储桶名称必须全局唯一。

  • pSourceFolder ─ S3 存储桶内用于上传源 CSV 文件的文件夹的名称。

  • pStageFolder ─ S3 存储桶内将在该过程中用作暂存区的文件夹的名称。

  • pTransformFolder ─ S3 存储桶内用于存储已转换和分区数据集的文件夹的名称。

  • pErrorFolder ─ S3 存储桶内的文件夹,如果无法验证源 CSV 文件,则该文件将被移至该文件夹。

  • pArchiveFolder  ─ S3 存储桶内用于存档源 CSV 文件的文件夹的名称。

  • pEmailforNotification ─ 用于接收成功/错误通知的有效电子邮件地址。

  • pPrefix─ AWS Glue 抓取工具名称中将使用的前缀字符串。

  • pDatasetSchema ─ 将对源文件进行验证的数据集架构。Cerberus Python 数据包用于源数据集验证。有关更多信息,请参阅 Cerberus 网站。

开发人员

上传源代码到 S3 存储桶。

在部署自动执行 ETL 管道的 CloudFormation 模板之前,必须打包 CloudFormation 模板的源文件并将其上传到 S3 存储桶。为此,通过您预配置的配置文件运行以下 AWS CLI 命令:

aws cloudformation package --template-file template.yml --s3-bucket <bucket_name> --output-template-file packaged.template --profile <profile_name>

其中:

  • <bucket_name> 是要部署堆栈的 AWS 区域中现有 S3 存储桶的名称。此存储桶用于存储 CloudFormation 模板的源代码包。

  • <profile_name> 是您在设置 AWS CLI 时预先配置的有效 AWS CLI 配置文件。

开发人员
任务描述所需技能

部署 CloudFormation 模板。

要部署 CloudFormation 模板,请运行以下 AWS CLI 命令:

aws cloudformation deploy --stack-name <stack_name> --template-file packaged.template --parameter-overrides file://parameter.json --capabilities CAPABILITY_IAM --profile <profile_name>

其中:

  • <stack_name>是 CloudFormation 堆栈的唯一标识符。

  • <profile-name> 是您预先配置的 AWS CLI 配置文件。

开发人员

查看进度。

AWS CloudFormation 控制台上,查看堆栈开发进度。当状态为时 CREATE_COMPLETE,表示堆栈已成功部署。

开发人员

记下 AWS Glue 数据库名称。

堆栈的输出 选项卡显示 AWS Glue 数据库名称。键名称为 GlueDBOutput

开发人员
任务描述所需技能

启动 ETL 管道。

  1. 导航到 S3 存储桶内的源文件夹(source 或您在 parameter.json 文件中设置的文件夹名称)。

  2. 将示例 CSV 文件上传至此文件夹。(代码存储库提供了一个名为 Sample_Bank_Transaction_Raw_Dataset.csv 的示例文件以供您使用。) 上传文件后,将通过 Step Functions 启动 ETL 管道。

  3. Step Functions 控制台,检查 ETL 管道状态。

开发人员

查看分区数据集。

ETL 管道完成后,确认分区数据集在 Amazon S3 转换文件夹(transform 或您在 parameter.json 文件中设置的文件夹名称)中可用。

开发人员

检查已分区 AWS Glue 数据库。

  1. AWS Glue 控制台,选择堆栈创建的 AWS Glue 数据库(这是上文中标注的数据库)。

  2. 验证 AWS Glue Data Catalog 中的分区表是否可用。

开发人员

运行查询。

(可选)使用 Amazon Athena 对已分区和转换的数据库运行临时查询。有关说明,请参阅 AWS 文档中的 使用 Amazon Athena 运行 SQL 查询

数据库分析师

故障排除

问题解决方案

AWS Glue 任务和爬虫的 AWS 身份和访问管理 (IAM) 权限

如果您进一步自定义 AWS Glue 任务或爬虫,请务必在 AWS Glue 任务使用的 IAM 角色中授予相应的 IAM 权限,或者向 AWS Lake Formation 提供数据权限。有关更多信息,请参阅 AWS 文档

相关资源

Amazon Web Services 文档

其他信息

下图显示了 Step Functions Inspector 面板 面板中成功建立 ETL 管道的 AWS Step Functions 工作流。

Step Functions 工作流程用于验证输入的.csv、抓取数据和运行 AWS Glue 作业。

下图所示为不合格的 ETL 管道 AWS Step Functions 工作流,原因是 Step Functions 检查器面板显示其输入验证错误。

Step Functions 工作流程失败,因此文件移动到错误文件夹。