Créez un pipeline de données pour ingérer, transformer et analyser les données de Google Analytics à l'aide du kit de AWS DataOps développement - Recommandations AWS

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Créez un pipeline de données pour ingérer, transformer et analyser les données de Google Analytics à l'aide du kit de AWS DataOps développement

Créé par Anton Kukushkin (AWS) et Rudy Puig () AWS

Référentiel de code : AWS DDKexemples - Analyse des données Google Analytics avec Amazon AppFlow, Amazon Athena et Development Kit AWS DataOps

Environnement : PoC ou pilote

Technologies : DataLakes ; Analytique DevOps ; Infrastructure

Charge de travail : Open source

AWSservices : Amazon AppFlow ; Amazon Athena ; AWS CDK AWS Lambda ; Amazon S3

Récapitulatif

Ce modèle décrit comment créer un pipeline de données pour ingérer, transformer et analyser les données Google Analytics à l'aide du kit de AWS DataOps développement (DDK) et d'autres AWS services. AWSDDKIl s'agit d'un framework de développement open source qui vous aide à créer des flux de travail de données et une architecture de données moderne. AWS L'un des principaux objectifs du AWS DDK est de vous faire économiser le temps et les efforts généralement consacrés aux tâches de pipeline de données à forte intensité de main-d'œuvre, telles que l'orchestration de pipelines, la création d'infrastructures et la création de l'infrastructure DevOps sous-jacente. Vous pouvez vous décharger de ces tâches fastidieuses AWS DDK afin de vous concentrer sur l'écriture de code et d'autres activités à forte valeur ajoutée.

Conditions préalables et limitations

Prérequis

Versions du produit

  • Python 3.7 ou version ultérieure

  • pip 9.0.3 ou version ultérieure

Architecture

Pile technologique

  • Amazon AppFlow

  • Amazon Athena

  • Amazon CloudWatch

  • Amazon EventBridge

  • Amazon Simple Storage Service (Amazon S3)

  • Service de file d'attente Amazon Simple (AmazonSQS)

  • AWS DataOps Kit de développement (DDK)

  • AWSLambda

Architecture cible

Le schéma suivant montre le processus piloté par les événements qui ingère, transforme et analyse les données de Google Analytics.

Schéma d'architecture

Le schéma suivant illustre le flux de travail suivant :

  1. Une règle relative aux événements CloudWatch planifiés d'Amazon invoque Amazon AppFlow.

  2. Amazon AppFlow ingère les données de Google Analytics dans un compartiment S3.

  3. Une fois les données ingérées par le compartiment S3, les notifications d'événements EventBridge sont générées, capturées par une règle CloudWatch Events, puis placées dans une SQS file d'attente Amazon.

  4. Une fonction Lambda consomme les événements de la SQS file d'attente Amazon, lit les objets S3 respectifs, transforme les objets au format Apache Parquet, écrit les objets transformés dans le compartiment S3, puis crée ou met à jour la définition de la table AWS Glue Data Catalog.

  5. Une requête Athena s'exécute sur la table.

Outils

AWSoutils

  • Amazon AppFlow est un service d'intégration entièrement géré qui vous permet d'échanger des données en toute sécurité entre des applications SaaS (Software as a Service).

  • Amazon Athena est un service de requête interactif qui vous permet d'analyser les données directement dans Amazon S3 en utilisant la norme. SQL

  • Amazon vous CloudWatch aide à surveiller les indicateurs de vos AWS ressources et des applications que vous utilisez AWS en temps réel.

  • Amazon EventBridge est un service de bus d'événements sans serveur qui vous permet de connecter vos applications à des données en temps réel provenant de diverses sources. Par exemple, les fonctions AWS Lambda, les points de terminaison HTTP d'invocation utilisant des API destinations ou les bus d'événements dans d'autres comptes. AWS

  • Amazon Simple Storage Service (Amazon S3) est un service de stockage d'objets basé sur le cloud qui vous permet de stocker, de protéger et de récupérer n'importe quel volume de données.

  • Amazon Simple Queue Service (AmazonSQS) fournit une file d'attente hébergée sécurisée, durable et disponible qui vous aide à intégrer et à dissocier les systèmes et composants logiciels distribués.

  • AWSLambda est un service de calcul qui vous permet d'exécuter du code sans avoir à approvisionner ou à gérer des serveurs. Il exécute votre code uniquement lorsque cela est nécessaire et évolue automatiquement, de sorte que vous ne payez que pour le temps de calcul que vous utilisez.

  • AWSCloud Development Kit (CDK) est un framework permettant de définir l'infrastructure cloud dans le code et de la provisionner via AWS CloudFormation ce dernier.

  • AWS DataOps Development Kit (DDK) est un framework de développement open source qui vous aide à créer des flux de travail de données et une architecture de données moderne surAWS.

Code

Le code de ce modèle est disponible dans les GitHub AWS DataOps référentiels Development Kit (DDK) et Analyzing Google Analytics with Amazon AppFlow, Amazon Athena et AWS DataOps Development Kit.

Épopées

TâcheDescriptionCompétences requises

Clonez le code source.

Pour cloner le code source, exécutez la commande suivante :

git clone https://github.com/aws-samples/aws-ddk-examples.git
DevOps ingénieur

Créez un environnement virtuel.

Accédez au répertoire du code source, puis exécutez la commande suivante pour créer un environnement virtuel :

cd google-analytics-data-using-appflow/python && python3 -m venv .venv
DevOps ingénieur

Installez les dépendances.

Pour activer l'environnement virtuel et installer les dépendances, exécutez la commande suivante :

source .venv/bin/activate && pip install -r requirements.txt
DevOps ingénieur
TâcheDescriptionCompétences requises

Démarrez l'environnement.

  1. Vérifiez que le AWS CLI est configuré avec des informations d'identification valides pour votre AWS compte. Pour plus d'informations, consultez la section Utilisation de profils nommés dans la AWS CLI documentation.

  2. Exécutez la commande cdk bootstrap --profile [AWS_PROFILE].

DevOps ingénieur

Déployez les données.

Pour déployer le pipeline de données, exécutez la cdk deploy --profile [AWS_PROFILE] commande.

DevOps ingénieur
TâcheDescriptionCompétences requises

Validez l'état de la pile.

  1. Ouvrez la AWS CloudFormation console.

  2. Sur la page Stacks, vérifiez que le statut de la pile DdkAppflowAthenaStack estCREATE_COMPLETE.

DevOps ingénieur

Résolution des problèmes

ProblèmeSolution

Le déploiement échoue lors de la création d'une AWS::AppFlow::Flow ressource et le message d'erreur suivant s'affiche : Connector Profile with name ga-connection does not exist

Confirmez que vous avez créé un AppFlow connecteur Amazon pour Google Analytics et que vous l'avez nomméga-connection.

Pour obtenir des instructions, consultez Google Analytics dans la AppFlow documentation Amazon.

Ressources connexes

Informations supplémentaires

AWSDDKles pipelines de données sont composés d'une ou de plusieurs étapes. Dans les exemples de code suivants, vous les utilisez AppFlowIngestionStage pour ingérer des données provenant de Google Analytics, SqsToLambdaStage pour gérer la transformation des données et AthenaSQLStage pour exécuter la requête Athena.

Tout d'abord, les étapes de transformation et d'ingestion des données sont créées, comme le montre l'exemple de code suivant :

appflow_stage = AppFlowIngestionStage( self, id="appflow-stage", flow_name=flow.flow_name, ) sqs_lambda_stage = SqsToLambdaStage( self, id="lambda-stage", lambda_function_props={ "code": Code.from_asset("./ddk_app/lambda_handlers"), "handler": "handler.lambda_handler", "layers": [ LayerVersion.from_layer_version_arn( self, id="layer", layer_version_arn=f"arn:aws:lambda:{self.region}:336392948345:layer:AWSDataWrangler-Python39:1", ) ], "runtime": Runtime.PYTHON_3_9, }, ) # Grant lambda function S3 read & write permissions bucket.grant_read_write(sqs_lambda_stage.function) # Grant Glue database & table permissions sqs_lambda_stage.function.add_to_role_policy( self._get_glue_db_iam_policy(database_name=database.database_name) ) athena_stage = AthenaSQLStage( self, id="athena-sql", query_string=[ ( "SELECT year, month, day, device, count(user_count) as cnt " f"FROM {database.database_name}.ga_sample " "GROUP BY year, month, day, device " "ORDER BY cnt DESC " "LIMIT 10; " ) ], output_location=Location( bucket_name=bucket.bucket_name, object_key="query-results/" ), additional_role_policy_statements=[ self._get_glue_db_iam_policy(database_name=database.database_name) ], )

Ensuite, la DataPipeline construction est utilisée pour « relier » les étapes entre elles en utilisant des EventBridge règles, comme le montre l'exemple de code suivant :

( DataPipeline(self, id="ingestion-pipeline") .add_stage( stage=appflow_stage, override_rule=Rule( self, "schedule-rule", schedule=Schedule.rate(Duration.hours(1)), targets=appflow_stage.targets, ), ) .add_stage( stage=sqs_lambda_stage, # By default, AppFlowIngestionStage stage emits an event after the flow run finishes successfully # Override rule below changes that behavior to call the the stage when data lands in the bucket instead override_rule=Rule( self, "s3-object-created-rule", event_pattern=EventPattern( source=["aws.s3"], detail={ "bucket": {"name": [bucket.bucket_name]}, "object": {"key": [{"prefix": "ga-data"}]}, }, detail_type=["Object Created"], ), targets=sqs_lambda_stage.targets, ), ) .add_stage(stage=athena_stage) )

Pour d'autres exemples de code, consultez le GitHub référentiel Analyse des données Google Analytics avec Amazon AppFlow, Amazon Athena et AWS DataOps le référentiel Development Kit.