Membangun pipeline data untuk menyerap, mengubah, dan menganalisis data Google Analytics menggunakan AWS DataOps Development Kit - AWS Prescriptive Guidance

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Membangun pipeline data untuk menyerap, mengubah, dan menganalisis data Google Analytics menggunakan AWS DataOps Development Kit

Anton Kukushkin dan Rudy Puig, Amazon Web Services

Ringkasan

Pola ini menjelaskan cara membuat pipeline data untuk menyerap, mengubah, dan menganalisis data Google Analytics dengan menggunakan AWS DataOps Development Kit (AWS DDK) dan lainnya. Layanan AWS AWS DDK adalah kerangka kerja pengembangan sumber terbuka yang membantu Anda membangun alur kerja data dan arsitektur data modern. AWS Salah satu tujuan utama AWS DDK adalah untuk menghemat waktu dan upaya yang biasanya dikhususkan untuk tugas-tugas pipa data padat karya, seperti mengatur jaringan pipa, membangun infrastruktur, dan menciptakan infrastruktur di balik itu. DevOps Anda dapat menurunkan tugas-tugas padat karya ini ke AWS DDK sehingga Anda dapat fokus pada penulisan kode dan aktivitas bernilai tinggi lainnya.

Prasyarat dan batasan

Prasyarat

Versi produk

  • Python 3.7 atau yang lebih baru

  • pip 9.0.3 atau yang lebih baru

Arsitektur

Tumpukan teknologi

  • Amazon AppFlow

  • Amazon Athena

  • Amazon CloudWatch

  • Amazon EventBridge

  • Amazon Simple Storage Service (Amazon S3)

  • Amazon Simple Queue Service (Amazon SQS)

  • AWS DataOps Kit Pengembangan (AWS DDK)

  • AWS Lambda

Arsitektur target

Diagram berikut menunjukkan proses berbasis peristiwa yang menyerap, mengubah, dan menganalisis data Google Analytics.

Menyerap, mengubah, dan menganalisis data Google Analytics dengan layanan AWS.

Diagram menunjukkan alur kerja berikut:

  1. Aturan acara CloudWatch terjadwal Amazon memanggil Amazon AppFlow.

  2. Amazon AppFlow menyerap data Google Analytics ke dalam bucket S3.

  3. Setelah data dicerna oleh bucket S3, notifikasi peristiwa akan EventBridge dibuat, ditangkap oleh aturan CloudWatch Peristiwa, dan kemudian dimasukkan ke dalam antrean Amazon SQS.

  4. Fungsi Lambda mengkonsumsi peristiwa dari antrian Amazon SQS, membaca objek S3 masing-masing, mengubah objek ke format Apache Parquet, menulis objek yang diubah ke bucket S3, dan kemudian membuat atau memperbarui definisi tabel. AWS Glue Data Catalog

  5. Kueri Athena berjalan melawan tabel.

Alat

AWS alat-alat

  • Amazon AppFlow adalah layanan integrasi yang dikelola sepenuhnya yang memungkinkan Anda bertukar data dengan aman antara aplikasi perangkat lunak sebagai layanan (SaaS).

  • Amazon Athena adalah layanan kueri interaktif yang membantu Anda menganalisis data secara langsung di Amazon S3 dengan menggunakan SQL standar.

  • Amazon CloudWatch membantu Anda memantau metrik sumber AWS daya Anda dan aplikasi yang Anda jalankan AWS secara real time.

  • Amazon EventBridge adalah layanan bus acara tanpa server yang membantu Anda menghubungkan aplikasi Anda dengan data waktu nyata dari berbagai sumber. Misalnya, AWS Lambda fungsi, titik akhir pemanggilan HTTP menggunakan tujuan API, atau bus acara di tempat lain. Akun AWS

  • Amazon Simple Storage Service (Amazon S3) adalah layanan penyimpanan objek berbasis cloud yang membantu Anda menyimpan, melindungi, dan mengambil sejumlah data.

  • Amazon Simple Queue Service (Amazon Simple Queue Service) menyediakan antrian host yang aman, tahan lama, dan tersedia yang membantu Anda mengintegrasikan dan memisahkan sistem dan komponen perangkat lunak terdistribusi.

  • AWS Lambdaadalah layanan komputasi yang membantu Anda menjalankan kode tanpa perlu menyediakan atau mengelola server. Ini menjalankan kode Anda hanya bila diperlukan dan skala secara otomatis, jadi Anda hanya membayar untuk waktu komputasi yang Anda gunakan.

  • AWS Cloud Development Kit (AWS CDK)adalah kerangka kerja untuk mendefinisikan infrastruktur cloud dalam kode dan menyediakannya. AWS CloudFormation

  • AWS DataOps Development Kit (AWS DDK) adalah kerangka kerja pengembangan sumber terbuka untuk membantu Anda membangun alur kerja data dan arsitektur data modern. AWS

Kode

Kode untuk pola ini tersedia di GitHub AWS DataOps Development Kit (AWS DDK) dan Menganalisis data Google Analytics dengan repositori Amazon AppFlow, Amazon Athena, AWS DataOps dan Development Kit.

Epik

TugasDeskripsiKeterampilan yang dibutuhkan

Kloning kode sumber.

Untuk mengkloning kode sumber, jalankan perintah berikut:

git clone https://github.com/aws-samples/aws-ddk-examples.git
DevOps insinyur

Buat lingkungan virtual.

Arahkan ke direktori kode sumber, lalu jalankan perintah berikut untuk membuat lingkungan virtual:

cd google-analytics-data-using-appflow/python && python3 -m venv .venv
DevOps insinyur

Instal dependensi.

Untuk mengaktifkan lingkungan virtual dan menginstal dependensi, jalankan perintah berikut:

source .venv/bin/activate && pip install -r requirements.txt
DevOps insinyur
TugasDeskripsiKeterampilan yang dibutuhkan

Bootstrap lingkungan.

  1. Konfirmasikan bahwa AWS CLI sudah diatur dengan kredensil yang valid untuk Anda. Akun AWS Untuk informasi selengkapnya, lihat Menggunakan profil bernama dalam AWS CLI dokumentasi.

  2. Jalankan perintah cdk bootstrap --profile [AWS_PROFILE].

DevOps insinyur

Menyebarkan data.

Untuk menyebarkan pipa data, jalankan cdk deploy --profile [AWS_PROFILE] perintah.

DevOps insinyur
TugasDeskripsiKeterampilan yang dibutuhkan

Validasi status tumpukan.

  1. Buka konsol AWS CloudFormation.

  2. Pada halaman Stacks, konfirmasikan bahwa status tumpukan DdkAppflowAthenaStack adalahCREATE_COMPLETE.

DevOps insinyur

Pemecahan Masalah

IsuSolusi

Penerapan gagal selama pembuatan AWS::AppFlow::Flow sumber daya dan Anda menerima kesalahan berikut: Connector Profile with name ga-connection does not exist

Konfirmasikan bahwa Anda membuat AppFlow konektor Amazon untuk Google Analytics dan menamakannyaga-connection.

Untuk petunjuk, lihat Google Analytics di AppFlow dokumentasi Amazon.

Sumber daya terkait

Informasi tambahan

AWS Pipa data DDK terdiri dari satu atau banyak tahapan. Dalam contoh kode berikut, Anda gunakan AppFlowIngestionStage untuk menyerap data dari Google Analytics, SqsToLambdaStage menangani transformasi data, dan AthenaSQLStage menjalankan kueri Athena.

Pertama, transformasi data dan tahap konsumsi dibuat, seperti contoh kode berikut menunjukkan:

appflow_stage = AppFlowIngestionStage( self, id="appflow-stage", flow_name=flow.flow_name, ) sqs_lambda_stage = SqsToLambdaStage( self, id="lambda-stage", lambda_function_props={ "code": Code.from_asset("./ddk_app/lambda_handlers"), "handler": "handler.lambda_handler", "layers": [ LayerVersion.from_layer_version_arn( self, id="layer", layer_version_arn=f"arn:aws:lambda:{self.region}:336392948345:layer:AWSDataWrangler-Python39:1", ) ], "runtime": Runtime.PYTHON_3_9, }, ) # Grant lambda function S3 read & write permissions bucket.grant_read_write(sqs_lambda_stage.function) # Grant Glue database & table permissions sqs_lambda_stage.function.add_to_role_policy( self._get_glue_db_iam_policy(database_name=database.database_name) ) athena_stage = AthenaSQLStage( self, id="athena-sql", query_string=[ ( "SELECT year, month, day, device, count(user_count) as cnt " f"FROM {database.database_name}.ga_sample " "GROUP BY year, month, day, device " "ORDER BY cnt DESC " "LIMIT 10; " ) ], output_location=Location( bucket_name=bucket.bucket_name, object_key="query-results/" ), additional_role_policy_statements=[ self._get_glue_db_iam_policy(database_name=database.database_name) ], )

Selanjutnya, DataPipeline konstruksi digunakan untuk “menghubungkan” tahapan bersama-sama dengan menggunakan EventBridge aturan, seperti contoh kode berikut menunjukkan:

( DataPipeline(self, id="ingestion-pipeline") .add_stage( stage=appflow_stage, override_rule=Rule( self, "schedule-rule", schedule=Schedule.rate(Duration.hours(1)), targets=appflow_stage.targets, ), ) .add_stage( stage=sqs_lambda_stage, # By default, AppFlowIngestionStage stage emits an event after the flow run finishes successfully # Override rule below changes that behavior to call the the stage when data lands in the bucket instead override_rule=Rule( self, "s3-object-created-rule", event_pattern=EventPattern( source=["aws.s3"], detail={ "bucket": {"name": [bucket.bucket_name]}, "object": {"key": [{"prefix": "ga-data"}]}, }, detail_type=["Object Created"], ), targets=sqs_lambda_stage.targets, ), ) .add_stage(stage=athena_stage) )

Untuk contoh kode lainnya, lihat GitHub Menganalisis data Google Analytics dengan Amazon AppFlow, Amazon Athena, dan repositori AWS DataOps Development Kit.