Cree una canalización de datos para incorporar, transformar y analizar los datos de Google Analytics con el kit de AWS DataOps desarrollo - Recomendaciones de AWS

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Cree una canalización de datos para incorporar, transformar y analizar los datos de Google Analytics con el kit de AWS DataOps desarrollo

Creada por Anton Kukushkin (AWS) y Rudy Puig () AWS

Repositorio de código: AWS DDK ejemplos: análisis de datos de Google Analytics con Amazon AppFlow, Amazon Athena y AWS DataOps Development Kit

Entorno: PoC o piloto

Tecnologías: DataLakes análisis e DevOps infraestructura

Carga de trabajo: código abierto

AWSservicios: Amazon AppFlow; Amazon Athena; AWS CDK AWS Lambda; Amazon S3

Resumen

Este patrón describe cómo crear una canalización de datos para ingerir, transformar y analizar los datos de Google Analytics mediante el kit de AWS DataOps desarrollo (DDK) y otros servicios. AWS AWSDDKSe trata de un marco de desarrollo de código abierto que te ayuda a crear flujos de trabajo de datos y una arquitectura de datos moderna. AWS Uno de sus principales objetivos AWS DDK es ahorrarle el tiempo y el esfuerzo que normalmente se dedican a tareas de canalización de datos que requieren mucha mano de obra, como organizar las canalizaciones, construir infraestructuras y crear las bases de esa infraestructura. DevOps Puedes prescindir de estas tareas que requieren mucha mano de obra para AWS DDK poder centrarte en escribir código y otras actividades de gran valor.

Requisitos previos y limitaciones

Requisitos previos 

Versiones de producto

  • Python 3.7 o posterior

  • pip 9.0.3 o posterior

Arquitectura

Pila de tecnología

  • Amazon AppFlow

  • Amazon Athena

  • Amazon CloudWatch

  • Amazon EventBridge

  • Amazon Simple Storage Service (Amazon S3)

  • Amazon Simple Queue Service (AmazonSQS)

  • AWS DataOps Kit de desarrollo () DDK

  • AWSLambda

Arquitectura de destino

El siguiente diagrama muestra el proceso basado en eventos que incorpora, transforma y analiza los datos de Google Analytics.

Diagrama de arquitectura

En el diagrama, se muestra el siguiente flujo de trabajo:

  1. Una regla de eventos CloudWatch programados de Amazon invoca a Amazon AppFlow.

  2. Amazon AppFlow ingiere los datos de Google Analytics en un bucket de S3.

  3. Una vez que el bucket de S3 ingiere los datos, EventBridge se generan las notificaciones de eventos, las captura una regla de CloudWatch eventos y, a continuación, las coloca en una SQS cola de Amazon.

  4. Una función Lambda consume los eventos de la SQS cola de Amazon, lee los objetos S3 respectivos, transforma los objetos al formato Apache Parquet, escribe los objetos transformados en el bucket de S3 y, a continuación, crea o actualiza la definición de la tabla del catálogo de datos de AWS Glue.

  5. Una consulta de Athena realiza comparaciones con la tabla.

Herramientas

AWSherramientas

  • Amazon AppFlow es un servicio de integración totalmente gestionado que le permite intercambiar datos de forma segura entre aplicaciones de software como servicio (SaaS).

  • Amazon Athena es un servicio de consultas interactivo que le ayuda a analizar los datos directamente en Amazon S3 mediante el uso estándar. SQL

  • Amazon le CloudWatch ayuda a supervisar las métricas de sus AWS recursos y las aplicaciones en las que se ejecuta AWS en tiempo real.

  • Amazon EventBridge es un servicio de bus de eventos sin servidor que le ayuda a conectar sus aplicaciones con datos en tiempo real de diversas fuentes. Por ejemplo, funciones AWS Lambda, puntos finales de HTTP invocación que utilizan API destinos o buses de eventos en otras cuentas. AWS

  • Amazon Simple Storage Service (Amazon S3) es un servicio de almacenamiento de objetos basado en la nube que le ayuda a almacenar, proteger y recuperar cualquier cantidad de datos.

  • Amazon Simple Queue Service (AmazonSQS) proporciona una cola alojada segura, duradera y disponible que le ayuda a integrar y desacoplar sistemas y componentes de software distribuidos.

  • AWSLambda es un servicio informático que le ayuda a ejecutar código sin necesidad de aprovisionar o administrar servidores. Ejecuta el código solo cuando es necesario y amplía la capacidad de manera automática, por lo que solo pagará por el tiempo de procesamiento que utilice.

  • AWSCloud Development Kit (CDK) es un marco para definir la infraestructura de nube en el código y aprovisionarla mediante AWS CloudFormation ella.

  • AWS DataOps El kit de desarrollo (DDK) es un marco de desarrollo de código abierto que le ayuda a crear flujos de trabajo de datos y una arquitectura de datos moderna. AWS

Código

El código de este patrón está disponible en los GitHub AWS DataOps repositorios Development Kit (DDK) y Analyzing Google Analytics data with Amazon AppFlow, Amazon Athena y AWS DataOps Development Kit.

Epics

TareaDescripciónHabilidades requeridas

Clone el código fuente.

Para clonar el código fuente, ejecute el siguiente comando:

git clone https://github.com/aws-samples/aws-ddk-examples.git
DevOps ingeniero

Cree un entorno virtual.

Navegue hasta el directorio de código fuente y, a continuación, ejecute el siguiente comando para crear un entorno virtual:

cd google-analytics-data-using-appflow/python && python3 -m venv .venv
DevOps ingeniero

Instalar las dependencias.

Para activar el entorno virtual e instalar las dependencias, ejecute el siguiente comando:

source .venv/bin/activate && pip install -r requirements.txt
DevOps ingeniero
TareaDescripciónHabilidades requeridas

Inicie el entorno.

  1. Confirma que AWS CLI está configurada con credenciales válidas para tu AWS cuenta. Para obtener más información, consulte Uso de perfiles con nombre en la AWS CLI documentación.

  2. Ejecute el comando cdk bootstrap --profile [AWS_PROFILE].

DevOps ingeniero

Implemente los datos.

Para implementar el proceso de datos, ejecute el comando cdk deploy --profile [AWS_PROFILE].

DevOps ingeniero
TareaDescripciónHabilidades requeridas

Valide el estado de la pila.

  1. Abre la AWS CloudFormation consola.

  2. En la página Pilas, confirme que el estado de la pila DdkAppflowAthenaStack es CREATE_COMPLETE.

DevOps ingeniero

Resolución de problemas

ProblemaSolución

La implementación falla durante la creación de un recurso AWS::AppFlow::Flow y recibe el siguiente error: Connector Profile with name ga-connection does not exist

Confirma que has creado un AppFlow conector de Amazon para Google Analytics y le has dado un nombrega-connection.

Para obtener instrucciones, consulta Google Analytics en la AppFlow documentación de Amazon.

Recursos relacionados

Información adicional

AWSDDKlas canalizaciones de datos se componen de una o varias etapas. Los siguientes ejemplos de código emplean AppFlowIngestionStage para incorporar datos de Google Analytics, SqsToLambdaStage para gestionar la transformación de datos y AthenaSQLStage para ejecutar la consulta de Athena.

En primer lugar, se crean las etapas de transformación e incorporación de datos, como se muestra en el siguiente ejemplo de código:

appflow_stage = AppFlowIngestionStage( self, id="appflow-stage", flow_name=flow.flow_name, ) sqs_lambda_stage = SqsToLambdaStage( self, id="lambda-stage", lambda_function_props={ "code": Code.from_asset("./ddk_app/lambda_handlers"), "handler": "handler.lambda_handler", "layers": [ LayerVersion.from_layer_version_arn( self, id="layer", layer_version_arn=f"arn:aws:lambda:{self.region}:336392948345:layer:AWSDataWrangler-Python39:1", ) ], "runtime": Runtime.PYTHON_3_9, }, ) # Grant lambda function S3 read & write permissions bucket.grant_read_write(sqs_lambda_stage.function) # Grant Glue database & table permissions sqs_lambda_stage.function.add_to_role_policy( self._get_glue_db_iam_policy(database_name=database.database_name) ) athena_stage = AthenaSQLStage( self, id="athena-sql", query_string=[ ( "SELECT year, month, day, device, count(user_count) as cnt " f"FROM {database.database_name}.ga_sample " "GROUP BY year, month, day, device " "ORDER BY cnt DESC " "LIMIT 10; " ) ], output_location=Location( bucket_name=bucket.bucket_name, object_key="query-results/" ), additional_role_policy_statements=[ self._get_glue_db_iam_policy(database_name=database.database_name) ], )

A continuación, la DataPipeline construcción se utiliza para «conectar» las etapas mediante EventBridge reglas, como se muestra en el siguiente ejemplo de código:

( DataPipeline(self, id="ingestion-pipeline") .add_stage( stage=appflow_stage, override_rule=Rule( self, "schedule-rule", schedule=Schedule.rate(Duration.hours(1)), targets=appflow_stage.targets, ), ) .add_stage( stage=sqs_lambda_stage, # By default, AppFlowIngestionStage stage emits an event after the flow run finishes successfully # Override rule below changes that behavior to call the the stage when data lands in the bucket instead override_rule=Rule( self, "s3-object-created-rule", event_pattern=EventPattern( source=["aws.s3"], detail={ "bucket": {"name": [bucket.bucket_name]}, "object": {"key": [{"prefix": "ga-data"}]}, }, detail_type=["Object Created"], ), targets=sqs_lambda_stage.targets, ), ) .add_stage(stage=athena_stage) )

Para ver más ejemplos de código, consulta el GitHub repositorio Análisis de datos de Google Analytics con Amazon AppFlow, Amazon Athena y AWS DataOps Development Kit.