翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Development AWS DataOps Kit を使用して Google Analytics データを取り込み、変換、分析するためのデータパイプラインを構築する
作成者: Anton Kukushkin (AWS) と Rudy Puig (AWS)
環境:PoC またはパイロット | テクノロジー: DataLakes、分析 DevOps、インフラストラクチャ | |
ワークロード: オープンソース | AWS サービス: Amazon AppFlow、Amazon Athena 、AWSCDK、AWSLambda、Amazon S3 |
[概要]
このパターンでは、 AWS DataOps 開発キット (DDK) やその他の AWSサービスを使用して、Google Analytics データを取り込み、変換、分析するためのデータパイプラインを構築する方法について説明します。AWS DDK は、 でデータワークフローと最新のデータアーキテクチャを構築するのに役立つオープンソースの開発フレームワークですAWS。の主な目的の 1 AWSDDKつは、パイプラインのオーケストレーション、インフラストラクチャの構築、そのインフラストラクチャの DevOps 背後にある の作成など、通常、労働集約的なデータパイプラインタスクに専念する時間と労力を節約することです。これらの労力のかかるタスクを にオフロードAWSDDKして、コードやその他の価値の高いアクティビティの作成に集中できます。
前提条件と制限
前提条件
製品バージョン
Python 3.7 以降
pip 9.0.3 以降
アーキテクチャ
テクノロジースタック
Amazon AppFlow
Amazon Athena
Amazon CloudWatch
Amazon EventBridge
Amazon Simple Storage Service (Amazon S3)
Amazon Simple Queue Service (Amazon SQS)
AWS DataOps 開発キット (DDK)
AWS Lambda
ターゲット アーキテクチャ
次の図は、Google アナリティクスのデータを取り込み、変換、分析するイベント駆動型のプロセスを示しています。
この図表は、次のワークフローを示しています:
Amazon スケジュール CloudWatch されたイベントルールは、Amazon を呼び出します AppFlow。
Amazon は Google Analytics データを S3 バケットに AppFlow 取り込みます。
データが S3 バケットによって取り込まれると、 のイベント通知 EventBridge が生成され、 CloudWatch イベントルールによってキャプチャされ、Amazon SQSキューに入れられます。
Lambda 関数は、Amazon SQSキューからイベントを消費し、それぞれの S3 オブジェクトを読み取り、オブジェクトを Apache Parquet 形式に変換し、変換されたオブジェクトを S3 バケットに書き込み、Glue Data Catalog AWS テーブル定義を作成または更新します。
Athena クエリがテーブルに対して実行されます。
ツール
AWS ツール
Amazon AppFlow は、Software as a Service (SaaS) アプリケーション間でデータを安全に交換できるフルマネージド統合サービスです。
Amazon Athena は、標準の を使用して Amazon S3 でデータを直接分析するのに役立つインタラクティブなクエリサービスですSQL。
Amazon CloudWatch は、AWSリソースのメトリクスと、 で実行しているアプリケーションのAWSメトリクスをリアルタイムでモニタリングするのに役立ちます。
Amazon EventBridge は、アプリケーションをさまざまなソースからのリアルタイムデータに接続するのに役立つサーバーレスイベントバスサービスです。例えば、AWSLambda 関数、送信API先を使用したHTTP呼び出しエンドポイント、または他のAWSアカウントのイベントバスなどです。
Amazon Simple Storage Service (Amazon S3) は、どのようなデータ量であっても、データを保存、保護、取得することを支援するクラウドベースのオブジェクトストレージサービスです。
Amazon Simple Queue Service (Amazon SQS) は、安全で耐久性があり、利用可能なホストキューを提供し、分散ソフトウェアシステムとコンポーネントの統合と分離に役立ちます。
AWS Lambda は、サーバーのプロビジョニングや管理を必要とせずにコードを実行するのに役立つコンピューティングサービスです。必要に応じてコードを実行し、自動的にスケーリングするため、課金は実際に使用したコンピューティング時間に対してのみ発生します。
AWS Cloud Development Kit (CDK) は、コードでクラウドインフラストラクチャを定義し、 を通じてプロビジョニングするためのフレームワークですAWS CloudFormation。
AWS DataOps Development Kit (DDK)
は、 でデータワークフローと最新のデータアーキテクチャを構築するのに役立つオープンソースの開発フレームワークですAWS。
コード
このパターンのコードは、 GitHub AWS DataOps 開発キット (DDK)
エピック
タスク | 説明 | 必要なスキル |
---|---|---|
ソースコードを複製します。 | ソースコードのクローンを作成するには、次のコマンドを実行します。
| DevOps エンジニア |
仮想環境を作成します。 | ソースコードディレクトリに移動し、以下のコマンドを実行して仮想環境を作成します。
| DevOps エンジニア |
SDK の依存関係をインストールします。 | 次のコマンドを実行して、仮想環境を有効にし、依存関係をインストールします。
| DevOps エンジニア |
タスク | 説明 | 必要なスキル |
---|---|---|
環境を起動します。 |
| DevOps エンジニア |
データをデプロイします。 | データパイプラインをデプロイするには、 | DevOps エンジニア |
タスク | 説明 | 必要なスキル |
---|---|---|
スタックのステータスを検証します。 |
| DevOps エンジニア |
トラブルシューティング
問題 | ソリューション |
---|---|
| Google Analytics 用の Amazon AppFlow コネクタを作成し、名前を付けたことを確認します 手順については、Amazon AppFlow ドキュメントの「Google Analytics」を参照してください。 |
関連リソース
AWS DataOps 開発キット (DDK)
(GitHub) AWS DDK 例
(GitHub)
追加情報
AWS DDK データパイプラインは、1 つ以上のステージで構成されます。次のコード例では、AppFlowIngestionStage
を使用して Google Analytics からデータを取り込み、SqsToLambdaStage
を使用してデータ変換を処理し、AthenaSQLStage
を使用して Athena クエリを実行します。
まず、次のコード例に示すように、データ変換ステージと取り込みステージを作成します。
appflow_stage = AppFlowIngestionStage( self, id="appflow-stage", flow_name=flow.flow_name, ) sqs_lambda_stage = SqsToLambdaStage( self, id="lambda-stage", lambda_function_props={ "code": Code.from_asset("./ddk_app/lambda_handlers"), "handler": "handler.lambda_handler", "layers": [ LayerVersion.from_layer_version_arn( self, id="layer", layer_version_arn=f"arn:aws:lambda:{self.region}:336392948345:layer:AWSDataWrangler-Python39:1", ) ], "runtime": Runtime.PYTHON_3_9, }, ) # Grant lambda function S3 read & write permissions bucket.grant_read_write(sqs_lambda_stage.function) # Grant Glue database & table permissions sqs_lambda_stage.function.add_to_role_policy( self._get_glue_db_iam_policy(database_name=database.database_name) ) athena_stage = AthenaSQLStage( self, id="athena-sql", query_string=[ ( "SELECT year, month, day, device, count(user_count) as cnt " f"FROM {database.database_name}.ga_sample " "GROUP BY year, month, day, device " "ORDER BY cnt DESC " "LIMIT 10; " ) ], output_location=Location( bucket_name=bucket.bucket_name, object_key="query-results/" ), additional_role_policy_statements=[ self._get_glue_db_iam_policy(database_name=database.database_name) ], )
次に、次のコード例に示すように、 DataPipeline
コンストラクトを使用して、 EventBridge ルールを使用してステージをまとめて「ワイヤ」します。
( DataPipeline(self, id="ingestion-pipeline") .add_stage( stage=appflow_stage, override_rule=Rule( self, "schedule-rule", schedule=Schedule.rate(Duration.hours(1)), targets=appflow_stage.targets, ), ) .add_stage( stage=sqs_lambda_stage, # By default, AppFlowIngestionStage stage emits an event after the flow run finishes successfully # Override rule below changes that behavior to call the the stage when data lands in the bucket instead override_rule=Rule( self, "s3-object-created-rule", event_pattern=EventPattern( source=["aws.s3"], detail={ "bucket": {"name": [bucket.bucket_name]}, "object": {"key": [{"prefix": "ga-data"}]}, }, detail_type=["Object Created"], ), targets=sqs_lambda_stage.targets, ), ) .add_stage(stage=athena_stage) )
コード例の詳細については、「Amazon 、 GitHub Amazon Athena AppFlow、および AWS DataOps Development Kit リポジトリを使用した Google Analytics データの分析