Crie um pipeline de dados para ingerir, transformar e analisar dados do Google Analytics usando o AWS DataOps Development Kit - Recomendações da AWS

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Crie um pipeline de dados para ingerir, transformar e analisar dados do Google Analytics usando o AWS DataOps Development Kit

Criado por Anton Kukushkin (AWS) e Rudy Puig () AWS

Repositório de código: AWS DDKexemplos - Análise de dados do Google Analytics com Amazon AppFlow, Amazon Athena e Development Kit AWS DataOps

Ambiente: PoC ou piloto

Tecnologias: DataLakes; Análise DevOps; Infraestrutura

Workload: código aberto

AWSserviços: Amazon AppFlow; Amazon Athena; AWS CDK AWS Lambda; Amazon S3

Resumo

Esse padrão descreve como criar um pipeline de dados para ingerir, transformar e analisar dados do Google Analytics usando o AWS DataOps Development Kit (DDK) e outros AWS serviços. AWSDDKÉ uma estrutura de desenvolvimento de código aberto que ajuda você a criar fluxos de trabalho de dados e uma arquitetura de dados moderna em. AWS Um dos principais objetivos do AWS DDK é economizar o tempo e o esforço que normalmente são dedicados às tarefas trabalhosas do pipeline de dados, como orquestrar pipelines, construir infraestrutura e criar a base dessa infraestrutura. DevOps Você pode transferir essas tarefas trabalhosas para AWS DDK se concentrar em escrever código e outras atividades de alto valor.

Pré-requisitos e limitações

Pré-requisitos

Versões do produto

  • Python 3.7 ou superior

  • pip 9.0.3 ou superior

Arquitetura

Pilha de tecnologia

  • Amazon AppFlow

  • Amazon Athena

  • Amazon CloudWatch

  • Amazon EventBridge

  • Amazon Simple Storage Service (Amazon S3)

  • Amazon Simple Queue Service (AmazonSQS)

  • AWS DataOps Kit de desenvolvimento (DDK)

  • AWSLambda

Arquitetura de destino

O diagrama a seguir mostra o processo orientado por eventos que ingere, transforma e analisa os dados do Google Analytics.

Diagrama de arquitetura

O diagrama mostra o seguinte fluxo de trabalho:

  1. Uma regra de evento CloudWatch agendado da Amazon invoca a Amazon. AppFlow

  2. A Amazon AppFlow ingere dados do Google Analytics em um bucket do S3.

  3. Depois que os dados são ingeridos pelo bucket do S3, as notificações de eventos EventBridge são geradas, capturadas por uma regra de CloudWatch eventos e colocadas em uma fila da AmazonSQS.

  4. Uma função Lambda consome eventos da SQS fila da Amazon, lê os respectivos objetos do S3, transforma os objetos no formato Apache Parquet, grava os objetos transformados no bucket do S3 e, em seguida, cria ou atualiza a definição da tabela do Glue Data Catalog. AWS

  5. Uma consulta do Athena é executada na tabela.

Ferramentas

AWSferramentas

  • AppFlowA Amazon é um serviço de integração totalmente gerenciado que permite que você troque dados com segurança entre aplicativos de software como serviço (SaaS).

  • O Amazon Athena é um serviço de consulta interativo que ajuda você a analisar dados diretamente no Amazon S3 usando o padrão. SQL

  • CloudWatchA Amazon ajuda você a monitorar as métricas dos seus AWS recursos e dos aplicativos em que você executa AWS em tempo real.

  • EventBridgeA Amazon é um serviço de ônibus de eventos sem servidor que ajuda você a conectar seus aplicativos com dados em tempo real de várias fontes. Por exemplo, funções AWS Lambda, endpoints de HTTP invocação usando API destinos ou barramentos de eventos em outras contas. AWS

  • O Amazon Simple Storage Service (Amazon S3) é um serviço de armazenamento de objetos baseado na nuvem que ajuda você a armazenar, proteger e recuperar qualquer quantidade de dados.

  • O Amazon Simple Queue Service (AmazonSQS) fornece uma fila hospedada segura, durável e disponível que ajuda você a integrar e desacoplar sistemas e componentes de software distribuídos.

  • AWSO Lambda é um serviço de computação que ajuda você a executar código sem precisar provisionar ou gerenciar servidores. Ele executa o código somente quando necessário e dimensiona automaticamente, assim, você paga apenas pelo tempo de computação usado.

  • AWSO Cloud Development Kit (CDK) é uma estrutura para definir a infraestrutura de nuvem em código e provisioná-la por meio dela. AWS CloudFormation

  • AWS DataOps O Development Kit (DDK) é uma estrutura de desenvolvimento de código aberto para ajudá-lo a criar fluxos de trabalho de dados e uma arquitetura de dados moderna. AWS

Código

O código desse padrão está disponível nos repositórios GitHub AWS DataOps Development Kit (DDK) e Analisando dados do Google Analytics com Amazon AppFlow, Amazon Athena e AWS DataOps Development Kit.

Épicos

TarefaDescriçãoHabilidades necessárias

Clone o código-fonte.

Para clonar o código-fonte, execute o seguinte comando:

git clone https://github.com/aws-samples/aws-ddk-examples.git
DevOps engenheiro

Crie um ambiente virtual.

Navegue até o diretório do código-fonte e execute o seguinte comando para criar um ambiente virtual:

cd google-analytics-data-using-appflow/python && python3 -m venv .venv
DevOps engenheiro

Instale as dependências.

Para ativar o ambiente virtual e instalar as dependências, execute o seguinte comando:

source .venv/bin/activate && pip install -r requirements.txt
DevOps engenheiro
TarefaDescriçãoHabilidades necessárias

Faça o bootstrap do ambiente.

  1. Confirme se o AWS CLI está configurado com credenciais válidas para sua AWS conta. Para obter mais informações, consulte Usando perfis nomeados na AWS CLI documentação.

  2. Execute o comando cdk bootstrap --profile [AWS_PROFILE].

DevOps engenheiro

Implante os dados.

Para implantar o pipeline de dados, execute o comando cdk deploy --profile [AWS_PROFILE].

DevOps engenheiro
TarefaDescriçãoHabilidades necessárias

Valide o status da pilha.

  1. Abra o AWS CloudFormation console.

  2. Na página Stacks, confirme se o status da pilha DdkAppflowAthenaStack é CREATE_COMPLETE.

DevOps engenheiro

Solução de problemas

ProblemaSolução

A implantação falha durante a criação de um recurso AWS::AppFlow::Flow e você recebe o seguinte erro: Connector Profile with name ga-connection does not exist

Confirme se você criou um AppFlow conector Amazon para o Google Analytics e o nomeouga-connection.

Para obter instruções, consulte o Google Analytics na AppFlow documentação da Amazon.

Recursos relacionados

Mais informações

AWSDDKos pipelines de dados são compostos por um ou vários estágios. Nos exemplos de código a seguir, você usa AppFlowIngestionStage para ingerir dados do Google Analytics, SqsToLambdaStage lidar com a transformação de dados e AthenaSQLStage para executar a consulta do Athena.

Primeiro, os estágios de transformação e ingestão de dados são criados, conforme mostra o exemplo de código a seguir:

appflow_stage = AppFlowIngestionStage( self, id="appflow-stage", flow_name=flow.flow_name, ) sqs_lambda_stage = SqsToLambdaStage( self, id="lambda-stage", lambda_function_props={ "code": Code.from_asset("./ddk_app/lambda_handlers"), "handler": "handler.lambda_handler", "layers": [ LayerVersion.from_layer_version_arn( self, id="layer", layer_version_arn=f"arn:aws:lambda:{self.region}:336392948345:layer:AWSDataWrangler-Python39:1", ) ], "runtime": Runtime.PYTHON_3_9, }, ) # Grant lambda function S3 read & write permissions bucket.grant_read_write(sqs_lambda_stage.function) # Grant Glue database & table permissions sqs_lambda_stage.function.add_to_role_policy( self._get_glue_db_iam_policy(database_name=database.database_name) ) athena_stage = AthenaSQLStage( self, id="athena-sql", query_string=[ ( "SELECT year, month, day, device, count(user_count) as cnt " f"FROM {database.database_name}.ga_sample " "GROUP BY year, month, day, device " "ORDER BY cnt DESC " "LIMIT 10; " ) ], output_location=Location( bucket_name=bucket.bucket_name, object_key="query-results/" ), additional_role_policy_statements=[ self._get_glue_db_iam_policy(database_name=database.database_name) ], )

Em seguida, a DataPipeline construção é usada para “conectar” os estágios usando EventBridge regras, como mostra o exemplo de código a seguir:

( DataPipeline(self, id="ingestion-pipeline") .add_stage( stage=appflow_stage, override_rule=Rule( self, "schedule-rule", schedule=Schedule.rate(Duration.hours(1)), targets=appflow_stage.targets, ), ) .add_stage( stage=sqs_lambda_stage, # By default, AppFlowIngestionStage stage emits an event after the flow run finishes successfully # Override rule below changes that behavior to call the the stage when data lands in the bucket instead override_rule=Rule( self, "s3-object-created-rule", event_pattern=EventPattern( source=["aws.s3"], detail={ "bucket": {"name": [bucket.bucket_name]}, "object": {"key": [{"prefix": "ga-data"}]}, }, detail_type=["Object Created"], ), targets=sqs_lambda_stage.targets, ), ) .add_stage(stage=athena_stage) )

Para ver mais exemplos de código, consulte o repositório GitHub Analisando dados do Google Analytics com Amazon AppFlow, Amazon Athena e AWS DataOps Development Kit.