Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Membangun pipeline data untuk menyerap, mengubah, dan menganalisis data Google Analytics menggunakan AWS DataOps Development Kit
Anton Kukushkin dan Rudy Puig, Amazon Web Services
Ringkasan
Pola ini menjelaskan cara membuat pipeline data untuk menyerap, mengubah, dan menganalisis data Google Analytics dengan menggunakan AWS DataOps Development Kit (AWS DDK) dan lainnya. Layanan AWS AWS DDK adalah kerangka kerja pengembangan sumber terbuka yang membantu Anda membangun alur kerja data dan arsitektur data modern. AWS Salah satu tujuan utama AWS DDK adalah untuk menghemat waktu dan upaya yang biasanya dikhususkan untuk tugas-tugas pipa data padat karya, seperti mengatur jaringan pipa, membangun infrastruktur, dan menciptakan infrastruktur di balik itu. DevOps Anda dapat menurunkan tugas-tugas padat karya ini ke AWS DDK sehingga Anda dapat fokus pada penulisan kode dan aktivitas bernilai tinggi lainnya.
Prasyarat dan batasan
Prasyarat
Aktif Akun AWS
AppFlow Konektor Amazon untuk Google Analytics, dikonfigurasi
Python
dan pip (manajer paket Python) Git, diinstal dan dikonfigurasi
AWS Command Line Interface (AWS CLI), diinstal dan dikonfigurasi
AWS Cloud Development Kit (AWS CDK), dipasang
Versi produk
Python 3.7 atau yang lebih baru
pip 9.0.3 atau yang lebih baru
Arsitektur
Tumpukan teknologi
Amazon AppFlow
Amazon Athena
Amazon CloudWatch
Amazon EventBridge
Amazon Simple Storage Service (Amazon S3)
Amazon Simple Queue Service (Amazon SQS)
AWS DataOps Kit Pengembangan (AWS DDK)
AWS Lambda
Arsitektur target
Diagram berikut menunjukkan proses berbasis peristiwa yang menyerap, mengubah, dan menganalisis data Google Analytics.

Diagram menunjukkan alur kerja berikut:
Aturan acara CloudWatch terjadwal Amazon memanggil Amazon AppFlow.
Amazon AppFlow menyerap data Google Analytics ke dalam bucket S3.
Setelah data dicerna oleh bucket S3, notifikasi peristiwa akan EventBridge dibuat, ditangkap oleh aturan CloudWatch Peristiwa, dan kemudian dimasukkan ke dalam antrean Amazon SQS.
Fungsi Lambda mengkonsumsi peristiwa dari antrian Amazon SQS, membaca objek S3 masing-masing, mengubah objek ke format Apache Parquet, menulis objek yang diubah ke bucket S3, dan kemudian membuat atau memperbarui definisi tabel. AWS Glue Data Catalog
Kueri Athena berjalan melawan tabel.
Alat
AWS alat-alat
Amazon AppFlow adalah layanan integrasi yang dikelola sepenuhnya yang memungkinkan Anda bertukar data dengan aman antara aplikasi perangkat lunak sebagai layanan (SaaS).
Amazon Athena adalah layanan kueri interaktif yang membantu Anda menganalisis data secara langsung di Amazon S3 dengan menggunakan SQL standar.
Amazon CloudWatch membantu Anda memantau metrik sumber AWS daya Anda dan aplikasi yang Anda jalankan AWS secara real time.
Amazon EventBridge adalah layanan bus acara tanpa server yang membantu Anda menghubungkan aplikasi Anda dengan data waktu nyata dari berbagai sumber. Misalnya, AWS Lambda fungsi, titik akhir pemanggilan HTTP menggunakan tujuan API, atau bus acara di tempat lain. Akun AWS
Amazon Simple Storage Service (Amazon S3) adalah layanan penyimpanan objek berbasis cloud yang membantu Anda menyimpan, melindungi, dan mengambil sejumlah data.
Amazon Simple Queue Service (Amazon Simple Queue Service) menyediakan antrian host yang aman, tahan lama, dan tersedia yang membantu Anda mengintegrasikan dan memisahkan sistem dan komponen perangkat lunak terdistribusi.
AWS Lambdaadalah layanan komputasi yang membantu Anda menjalankan kode tanpa perlu menyediakan atau mengelola server. Ini menjalankan kode Anda hanya bila diperlukan dan skala secara otomatis, jadi Anda hanya membayar untuk waktu komputasi yang Anda gunakan.
AWS Cloud Development Kit (AWS CDK)adalah kerangka kerja untuk mendefinisikan infrastruktur cloud dalam kode dan menyediakannya. AWS CloudFormation
AWS DataOps Development Kit (AWS DDK)
adalah kerangka kerja pengembangan sumber terbuka untuk membantu Anda membangun alur kerja data dan arsitektur data modern. AWS
Kode
Kode untuk pola ini tersedia di GitHub AWS DataOps Development Kit (AWS DDK)
Epik
Tugas | Deskripsi | Keterampilan yang dibutuhkan |
---|---|---|
Kloning kode sumber. | Untuk mengkloning kode sumber, jalankan perintah berikut:
| DevOps insinyur |
Buat lingkungan virtual. | Arahkan ke direktori kode sumber, lalu jalankan perintah berikut untuk membuat lingkungan virtual:
| DevOps insinyur |
Instal dependensi. | Untuk mengaktifkan lingkungan virtual dan menginstal dependensi, jalankan perintah berikut:
| DevOps insinyur |
Tugas | Deskripsi | Keterampilan yang dibutuhkan |
---|---|---|
Bootstrap lingkungan. |
| DevOps insinyur |
Menyebarkan data. | Untuk menyebarkan pipa data, jalankan | DevOps insinyur |
Tugas | Deskripsi | Keterampilan yang dibutuhkan |
---|---|---|
Validasi status tumpukan. |
| DevOps insinyur |
Pemecahan Masalah
Isu | Solusi |
---|---|
Penerapan gagal selama pembuatan | Konfirmasikan bahwa Anda membuat AppFlow konektor Amazon untuk Google Analytics dan menamakannya Untuk petunjuk, lihat Google Analytics di AppFlow dokumentasi Amazon. |
Sumber daya terkait
Informasi tambahan
AWS Pipa data DDK terdiri dari satu atau banyak tahapan. Dalam contoh kode berikut, Anda gunakan AppFlowIngestionStage
untuk menyerap data dari Google Analytics, SqsToLambdaStage
menangani transformasi data, dan AthenaSQLStage
menjalankan kueri Athena.
Pertama, transformasi data dan tahap konsumsi dibuat, seperti contoh kode berikut menunjukkan:
appflow_stage = AppFlowIngestionStage( self, id="appflow-stage", flow_name=flow.flow_name, ) sqs_lambda_stage = SqsToLambdaStage( self, id="lambda-stage", lambda_function_props={ "code": Code.from_asset("./ddk_app/lambda_handlers"), "handler": "handler.lambda_handler", "layers": [ LayerVersion.from_layer_version_arn( self, id="layer", layer_version_arn=f"arn:aws:lambda:{self.region}:336392948345:layer:AWSDataWrangler-Python39:1", ) ], "runtime": Runtime.PYTHON_3_9, }, ) # Grant lambda function S3 read & write permissions bucket.grant_read_write(sqs_lambda_stage.function) # Grant Glue database & table permissions sqs_lambda_stage.function.add_to_role_policy( self._get_glue_db_iam_policy(database_name=database.database_name) ) athena_stage = AthenaSQLStage( self, id="athena-sql", query_string=[ ( "SELECT year, month, day, device, count(user_count) as cnt " f"FROM {database.database_name}.ga_sample " "GROUP BY year, month, day, device " "ORDER BY cnt DESC " "LIMIT 10; " ) ], output_location=Location( bucket_name=bucket.bucket_name, object_key="query-results/" ), additional_role_policy_statements=[ self._get_glue_db_iam_policy(database_name=database.database_name) ], )
Selanjutnya, DataPipeline
konstruksi digunakan untuk “menghubungkan” tahapan bersama-sama dengan menggunakan EventBridge aturan, seperti contoh kode berikut menunjukkan:
( DataPipeline(self, id="ingestion-pipeline") .add_stage( stage=appflow_stage, override_rule=Rule( self, "schedule-rule", schedule=Schedule.rate(Duration.hours(1)), targets=appflow_stage.targets, ), ) .add_stage( stage=sqs_lambda_stage, # By default, AppFlowIngestionStage stage emits an event after the flow run finishes successfully # Override rule below changes that behavior to call the the stage when data lands in the bucket instead override_rule=Rule( self, "s3-object-created-rule", event_pattern=EventPattern( source=["aws.s3"], detail={ "bucket": {"name": [bucket.bucket_name]}, "object": {"key": [{"prefix": "ga-data"}]}, }, detail_type=["Object Created"], ), targets=sqs_lambda_stage.targets, ), ) .add_stage(stage=athena_stage) )
Untuk contoh kode lainnya, lihat GitHub Menganalisis data Google Analytics dengan Amazon AppFlow, Amazon Athena, dan repositori AWS DataOps Development Kit