Crea una pipeline di dati per importare, trasformare e analizzare i dati di Google Analytics utilizzando il AWS DataOps Development Kit - Prontuario AWS

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Crea una pipeline di dati per importare, trasformare e analizzare i dati di Google Analytics utilizzando il AWS DataOps Development Kit

Creato da Anton Kukushkin (AWS) e Rudy Puig () AWS

Archivio di codice: AWS DDKesempi - Analisi dei dati di Google Analytics con Amazon AppFlow, Amazon Athena e Development Kit AWS DataOps

Ambiente: PoC o pilota

Tecnologie: DataLakes; Analisi DevOps; Infrastruttura

Carico di lavoro: open source

AWSservizi: Amazon AppFlow; Amazon Athena; AWS CDK AWS Lambda; Amazon S3

Riepilogo

Questo modello descrive come creare una pipeline di dati per importare, trasformare e analizzare i dati di Google Analytics utilizzando il AWS DataOps Development Kit () DDK e altri servizi. AWS AWSDDKÈ un framework di sviluppo open source che ti aiuta a creare flussi di lavoro di dati e un'architettura di dati moderna su. AWS Uno degli obiettivi principali di AWS DDK è quello di farti risparmiare tempo e fatica tipicamente dedicati alle attività di pipeline di dati ad alta intensità di manodopera, come l'orchestrazione delle pipeline, la creazione di infrastrutture e la creazione dell'infrastruttura alla base di tale infrastruttura. DevOps Puoi delegare queste attività ad alta intensità di lavoro in AWS DDK modo da poterti concentrare sulla scrittura di codice e su altre attività di alto valore.

Prerequisiti e limitazioni

Prerequisiti

Versioni del prodotto

  • Python 3.7 o versioni successive

  • pip 9.0.3 o versioni successive

Architettura

stack tecnologico

  • Amazon AppFlow

  • Amazon Athena

  • Amazon CloudWatch

  • Amazon EventBridge

  • Amazon Simple Storage Service (Amazon S3)

  • Servizio Amazon Simple Queue (AmazonSQS)

  • AWS DataOps Kit di sviluppo () DDK

  • AWSLambda

Architettura Target

Il diagramma seguente mostra il processo basato sugli eventi che acquisisce, trasforma e analizza i dati di Google Analytics.

Diagramma di architettura

Il diagramma mostra il flusso di lavoro seguente:

  1. Una regola per gli eventi CloudWatch pianificati di Amazon richiama Amazon. AppFlow

  2. Amazon AppFlow inserisce i dati di Google Analytics in un bucket S3.

  3. Dopo che i dati sono stati inseriti dal bucket S3, EventBridge vengono generate notifiche di eventi, acquisite da una regola CloudWatch Events e quindi inserite in una coda Amazon. SQS

  4. Una funzione Lambda consuma gli eventi dalla SQS coda Amazon, legge i rispettivi oggetti S3, trasforma gli oggetti in formato Apache Parquet, scrive gli oggetti trasformati nel bucket S3 e quindi crea o aggiorna la definizione della tabella Glue Data Catalog. AWS

  5. Una query Athena viene eseguita sulla tabella.

Strumenti

AWSstrumenti

  • Amazon AppFlow è un servizio di integrazione completamente gestito che consente di scambiare dati in modo sicuro tra applicazioni SaaS (Software as a Service).

  • Amazon Athena è un servizio di query interattivo che ti aiuta ad analizzare i dati direttamente in Amazon S3 utilizzando standard. SQL

  • Amazon ti CloudWatch aiuta a monitorare i parametri delle tue AWS risorse e delle applicazioni su cui esegui AWS in tempo reale.

  • Amazon EventBridge è un servizio di bus eventi senza server che ti aiuta a connettere le tue applicazioni con dati in tempo reale provenienti da una varietà di fonti. Ad esempio, funzioni AWS Lambda, endpoint di HTTP invocazione che utilizzano API destinazioni o bus di eventi in altri account. AWS

  • Amazon Simple Storage Service (Amazon S3) è un servizio di archiviazione degli oggetti basato sul cloud che consente di archiviare, proteggere e recuperare qualsiasi quantità di dati.

  • Amazon Simple Queue Service (AmazonSQS) fornisce una coda ospitata sicura, durevole e disponibile che ti aiuta a integrare e disaccoppiare sistemi e componenti software distribuiti.

  • AWSLambda è un servizio di elaborazione che ti aiuta a eseguire il codice senza dover effettuare il provisioning o gestire i server. Esegue il codice solo quando necessario e si ridimensiona automaticamente, quindi paghi solo per il tempo di elaborazione che utilizzi.

  • AWSCloud Development Kit (CDK) è un framework per definire l'infrastruttura cloud in codice e fornirla tramite AWS CloudFormation.

  • AWS DataOps Development Kit (DDK) è un framework di sviluppo open source che ti aiuta a creare flussi di lavoro di dati e un'architettura di dati moderna su. AWS

Codice

Il codice per questo pattern è disponibile negli archivi GitHub AWS DataOps Development Kit (DDK) e Analyzing Google Analytics con Amazon AppFlow, Amazon Athena AWS DataOps e Development Kit.

Epiche

AttivitàDescrizioneCompetenze richieste

Clona il codice sorgente.

Per clonare il codice sorgente, esegui il seguente comando:

git clone https://github.com/aws-samples/aws-ddk-examples.git
DevOps ingegnere

Crea un ambiente virtuale.

Passa alla directory del codice sorgente, quindi esegui il comando seguente per creare un ambiente virtuale:

cd google-analytics-data-using-appflow/python && python3 -m venv .venv
DevOps ingegnere

Installa le dipendenze.

Per attivare l'ambiente virtuale e installare le dipendenze, esegui il seguente comando:

source .venv/bin/activate && pip install -r requirements.txt
DevOps ingegnere
AttivitàDescrizioneCompetenze richieste

Avvia l'ambiente.

  1. Conferma che AWS CLI sia configurato con credenziali valide per il tuo AWS account. Per ulteriori informazioni, consulta la sezione Utilizzo di profili denominati nella AWS CLI documentazione.

  2. Esegui il comando cdk bootstrap --profile [AWS_PROFILE].

DevOps ingegnere

Distribuisci i dati.

Per distribuire la pipeline di dati, esegui il comando. cdk deploy --profile [AWS_PROFILE]

DevOps ingegnere
AttivitàDescrizioneCompetenze richieste

Convalida lo stato dello stack.

  1. Apri la console. AWS CloudFormation

  2. Nella pagina Stacks, verifica che lo stato dello stack DdkAppflowAthenaStack sia. CREATE_COMPLETE

DevOps ingegnere

Risoluzione dei problemi

ProblemaSoluzione

La distribuzione non riesce durante la creazione di una AWS::AppFlow::Flow risorsa e viene visualizzato il seguente errore: Connector Profile with name ga-connection does not exist

Conferma di aver creato un AppFlow connettore Amazon per Google Analytics e di avergli dato un nomega-connection.

Per istruzioni, consulta Google Analytics nella AppFlow documentazione di Amazon.

Risorse correlate

Informazioni aggiuntive

AWSDDKle pipeline di dati sono composte da una o più fasi. Nei seguenti esempi di codice, li usi AppFlowIngestionStage per importare dati da Google Analytics, SqsToLambdaStage gestire la trasformazione dei dati ed AthenaSQLStage eseguire la query Athena.

Innanzitutto, vengono create le fasi di trasformazione e ingestione dei dati, come mostra il seguente esempio di codice:

appflow_stage = AppFlowIngestionStage( self, id="appflow-stage", flow_name=flow.flow_name, ) sqs_lambda_stage = SqsToLambdaStage( self, id="lambda-stage", lambda_function_props={ "code": Code.from_asset("./ddk_app/lambda_handlers"), "handler": "handler.lambda_handler", "layers": [ LayerVersion.from_layer_version_arn( self, id="layer", layer_version_arn=f"arn:aws:lambda:{self.region}:336392948345:layer:AWSDataWrangler-Python39:1", ) ], "runtime": Runtime.PYTHON_3_9, }, ) # Grant lambda function S3 read & write permissions bucket.grant_read_write(sqs_lambda_stage.function) # Grant Glue database & table permissions sqs_lambda_stage.function.add_to_role_policy( self._get_glue_db_iam_policy(database_name=database.database_name) ) athena_stage = AthenaSQLStage( self, id="athena-sql", query_string=[ ( "SELECT year, month, day, device, count(user_count) as cnt " f"FROM {database.database_name}.ga_sample " "GROUP BY year, month, day, device " "ORDER BY cnt DESC " "LIMIT 10; " ) ], output_location=Location( bucket_name=bucket.bucket_name, object_key="query-results/" ), additional_role_policy_statements=[ self._get_glue_db_iam_policy(database_name=database.database_name) ], )

Successivamente, il DataPipeline costrutto viene utilizzato per «collegare» gli stadi utilizzando EventBridge delle regole, come mostra il seguente esempio di codice:

( DataPipeline(self, id="ingestion-pipeline") .add_stage( stage=appflow_stage, override_rule=Rule( self, "schedule-rule", schedule=Schedule.rate(Duration.hours(1)), targets=appflow_stage.targets, ), ) .add_stage( stage=sqs_lambda_stage, # By default, AppFlowIngestionStage stage emits an event after the flow run finishes successfully # Override rule below changes that behavior to call the the stage when data lands in the bucket instead override_rule=Rule( self, "s3-object-created-rule", event_pattern=EventPattern( source=["aws.s3"], detail={ "bucket": {"name": [bucket.bucket_name]}, "object": {"key": [{"prefix": "ga-data"}]}, }, detail_type=["Object Created"], ), targets=sqs_lambda_stage.targets, ), ) .add_stage(stage=athena_stage) )

Per altri esempi di codice, consulta il GitHub repository Analisi dei dati di Google Analytics con Amazon AppFlow, Amazon Athena AWS DataOps e Development Kit.