使用AWS DataOps 开发套件构建数据管道,以提取、转换和分析 Google Analytics(分析)数据 - AWS Prescriptive Guidance

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用AWS DataOps 开发套件构建数据管道,以提取、转换和分析 Google Analytics(分析)数据

由 Anton Kukushkin (AWS) 和 Rudy Puig () 创作 AWS

代码库:AWSDDK示例-使用亚马逊 AppFlow、亚马逊 Athena 和开发套件分析谷歌分析数据 AWS DataOps

环境:PoC 或试点

技术: DataLakes; 分析; DevOps; 基础架构

工作负载:开源

AWS服务:亚马逊 AppFlow;亚马逊 Athena;;AWSLambda AWSCDK;亚马逊 S3

Summary

此模式描述了如何使用AWS DataOps 开发套件 (DDK) 和其他AWS服务构建数据管道来提取、转换和分析 Google Analytics(分析)数据。AWSDDK是一个开源开发框架,可帮助您在上面构建数据工作流程和现代数据架构AWS。的主要目标之一AWSDDK是为您节省通常用于劳动密集型数据管道任务的时间和精力,例如协调管道、构建基础设施和创建基础架构。 DevOps 您可以将这些劳动密集型任务转移到,AWSDDK这样您就可以专注于编写代码和其他高价值的活动。

先决条件和限制

先决条件

  • 一个活跃的AWS账户

  • 配置用于谷歌分析的 Amazon AppFlow 连接器

  • Pythonpip(Python 的包管理器)

  • Git,已安装和配置

  • AWS命令行界面 (AWSCLI),已安装配置

  • AWSCloud Development Kit (AWSCDK),已安装

产品版本

  • Python 3.7 或更高版本

  • pip 9.0.3 或更高版本

架构

技术堆栈

  • Amazon AppFlow

  • Amazon Athena

  • Amazon CloudWatch

  • Amazon EventBridge

  • Amazon Simple Storage Service (Amazon S3)

  • 亚马逊简单队列服务(亚马逊SQS)

  • AWS DataOps 开发套件 (DDK)

  • AWSLambda

目标架构

下图显示了摄取、转换和分析 Google Analytics 数据的事件驱动流程。

架构示意图

图表显示了以下工作流:

  1. 亚马逊 CloudWatch 计划的事件规则会调用亚马逊。 AppFlow

  2. 亚马逊将谷 AppFlow 歌分析数据提取到 S3 存储桶中。

  3. 在 S3 存储桶提取数据后,系统会生成中的 EventBridge 事件通知,由 CloudWatch 事件规则捕获,然后将其放入 Amazon SQS 队列中。

  4. Lambda 函数使用来自亚马逊SQS队列的事件,读取相应的 S3 对象,将对象转换为 Apache Parquet 格式,将转换后的对象写入 S3 存储桶,然后创建或更新 Glue 数据目录表定义AWS。

  5. Athena 查询针对此表运行。

工具

AWS工具

  • Amazon AppFlow 是一项完全托管的集成服务,使您能够在软件即服务 (SaaS) 应用程序之间安全地交换数据。

  • Amazon Athena 是一项交互式查询服务,可帮助您使用标准直接在 Amazon S3 中分析数据。SQL

  • Amazon CloudWatch 可帮助您实时监控您的AWS资源和运行的应用程序AWS的指标。

  • Amazon EventBridge 是一项无服务器事件总线服务,可帮助您将应用程序与来自各种来源的实时数据连接起来。例如,AWSLambda 函数、使用API目标的HTTP调用终端节点或其他账户中的事件总线。AWS

  • Amazon Simple Storage Service (Amazon S3) 是一项基于云的对象存储服务,可帮助您存储、保护和检索任意数量的数据。

  • Amazon Simple Queue Service (AmazonSQS) 提供安全、耐用且可用的托管队列,可帮助您集成和分离分布式软件系统和组件。

  • AWSLambda 是一项计算服务,可帮助您运行代码,而无需预置或管理服务器。它仅在需要时运行您的代码,并且能自动扩缩,因此您只需为使用的计算时间付费。

  • AWSCloud Development Kit (CDK) 是一个框架,用于在代码中定义云基础架构并通过它进行配置AWS CloudFormation。

  • AWS DataOps Developmen@@ t Kit (DDK) 是一个开源开发框架,可帮助你在其上构建数据工作流程和现代数据架构AWS。

代码

此模式的代码可在 GitHub AWS DataOps 开发套件 (DDK)使用亚马逊 AppFlow、Amazon Athena 和开发套件存储库分析谷歌分析数据中找到。AWS DataOps

操作说明

任务描述所需技能

克隆源代码。

要克隆源代码,请运行以下命令:

git clone https://github.com/aws-samples/aws-ddk-examples.git
DevOps 工程师

创建虚拟环境。

导航到源代码目录,然后运行以下命令创建虚拟环境:

cd google-analytics-data-using-appflow/python && python3 -m venv .venv
DevOps 工程师

安装依赖项。

要激活虚拟环境并安装依赖项,请运行以下命令:

source .venv/bin/activate && pip install -r requirements.txt
DevOps 工程师
任务描述所需技能

引导环境。

  1. 确认已使用您的AWS账户的有效凭证进行设置。AWS CLI有关更多信息,请参阅文档中的使用命名配置AWSCLI文件。

  2. 运行 cdk bootstrap --profile [AWS_PROFILE] 命令。

DevOps 工程师

部署数据。

要部署数据管线,请运行 cdk deploy --profile [AWS_PROFILE] 命令。

DevOps 工程师
任务描述所需技能

验证堆栈状态。

  1. 打开控制AWS CloudFormation 台

  2. 堆栈页面,确认堆栈 DdkAppflowAthenaStack 的状态为 CREATE_COMPLETE

DevOps 工程师

故障排除

问题解决方案

如果在创建 AWS::AppFlow::Flow 资源期间部署失败,您会收到以下错误:Connector Profile with name ga-connection does not exist

确认您已为 Google Analytics(分析)创建了亚马逊 AppFlow 连接器并将其命名ga-connection

有关说明,请参阅 Amazon AppFlow 文档中的 Google 分析

相关资源

其他信息

AWSDDK数据管道由一个或多个阶段组成。在以下代码示例中,您使用 AppFlowIngestionStage 从 Google Analytics 摄取数据,使用 SqsToLambdaStage 处理数据转换,使用 AthenaSQLStage 运行 Athena 查询。

首先,创建数据转换和摄取阶段,如以下代码示例所示:

appflow_stage = AppFlowIngestionStage( self, id="appflow-stage", flow_name=flow.flow_name, ) sqs_lambda_stage = SqsToLambdaStage( self, id="lambda-stage", lambda_function_props={ "code": Code.from_asset("./ddk_app/lambda_handlers"), "handler": "handler.lambda_handler", "layers": [ LayerVersion.from_layer_version_arn( self, id="layer", layer_version_arn=f"arn:aws:lambda:{self.region}:336392948345:layer:AWSDataWrangler-Python39:1", ) ], "runtime": Runtime.PYTHON_3_9, }, ) # Grant lambda function S3 read & write permissions bucket.grant_read_write(sqs_lambda_stage.function) # Grant Glue database & table permissions sqs_lambda_stage.function.add_to_role_policy( self._get_glue_db_iam_policy(database_name=database.database_name) ) athena_stage = AthenaSQLStage( self, id="athena-sql", query_string=[ ( "SELECT year, month, day, device, count(user_count) as cnt " f"FROM {database.database_name}.ga_sample " "GROUP BY year, month, day, device " "ORDER BY cnt DESC " "LIMIT 10; " ) ], output_location=Location( bucket_name=bucket.bucket_name, object_key="query-results/" ), additional_role_policy_statements=[ self._get_glue_db_iam_policy(database_name=database.database_name) ], )

接下来,使用该DataPipeline构造通过使用 EventBridge 规则将各个阶段 “连接” 在一起,如以下代码示例所示:

( DataPipeline(self, id="ingestion-pipeline") .add_stage( stage=appflow_stage, override_rule=Rule( self, "schedule-rule", schedule=Schedule.rate(Duration.hours(1)), targets=appflow_stage.targets, ), ) .add_stage( stage=sqs_lambda_stage, # By default, AppFlowIngestionStage stage emits an event after the flow run finishes successfully # Override rule below changes that behavior to call the the stage when data lands in the bucket instead override_rule=Rule( self, "s3-object-created-rule", event_pattern=EventPattern( source=["aws.s3"], detail={ "bucket": {"name": [bucket.bucket_name]}, "object": {"key": [{"prefix": "ga-data"}]}, }, detail_type=["Object Created"], ), targets=sqs_lambda_stage.targets, ), ) .add_stage(stage=athena_stage) )

有关更多代码示例,请参阅使用亚马逊、Amazon AppFlow Athena AWS DataOps 和开发套件 GitHub 分析谷歌分析数据存储库