AWS DataOps 개발 키트를 사용하여 Google 애널리틱스 데이터를 수집, 변환, 분석할 수 있는 데이터 파이프라인을 구축하세요. - AWS 권장 가이드

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

AWS DataOps 개발 키트를 사용하여 Google 애널리틱스 데이터를 수집, 변환, 분석할 수 있는 데이터 파이프라인을 구축하세요.

작성자: 안톤 쿠쿠스킨 (AWS) 과 루디 푸이그 () AWS

코드 리포지토리: AWS DDK예제 - Amazon AppFlow, Amazon Athena, 개발 키트를 사용한 구글 애널리틱스 데이터 분석 AWS DataOps

환경: PoC 또는 파일럿

기술: DataLakes; 애널리틱스 DevOps; 인프라

워크로드: 오픈 소스

AWS서비스: 아마존 AppFlow, 아마존 아테나, AWS CDK 람다AWS, 아마존 S3

요약

이 패턴은 AWS DataOps Development Kit (DDK) 및 기타 서비스를 사용하여 Google 애널리틱스 데이터를 수집, 변환 및 분석하기 위한 데이터 파이프라인을 구축하는 방법을 설명합니다. AWS 데이터 워크플로와 최신 데이터 아키텍처를 구축하는 데 도움이 되는 오픈 소스 개발 AWS DDK 프레임워크입니다. AWS 의 주요 목표 중 하나는 파이프라인 조정, 인프라 구축, 인프라 구축과 같은 노동 집약적인 데이터 파이프라인 작업에 일반적으로 소요되는 시간과 노력을 절약하는 것입니다. AWS DDK DevOps 이러한 노동 집약적인 작업을 AWS DDK 오프로드하여 코드 작성 및 기타 고부가가치 활동에 집중할 수 있습니다.

사전 조건 및 제한 사항

사전 조건 

  • 활성 계정 AWS

  • 구글 애널리틱스용 Amazon AppFlow 커넥터, 구성

  • Pythonpip(Python의 패키지 관리자)

  • Git, 설치 및 구성됨

  • AWS명령줄 인터페이스 (AWSCLI), 설치구성

  • AWSCloud Development Kit (AWSCDK), 설치됨

제품 버전

  • Python 3.7 이상

  • pip 9.0.3 이상

아키텍처

기술 스택

  • 아마존 AppFlow

  • Amazon Athena

  • 아마존 CloudWatch

  • 아마존 EventBridge

  • Amazon Simple Storage Service(S3)

  • 아마존 심플 큐 서비스 (아마존SQS)

  • AWS DataOps 개발 키트 (DDK)

  • AWS람다

대상 아키텍처 

다음 다이어그램은 Google Analytics 데이터를 수집, 변환, 분석하는 이벤트 기반 프로세스를 보여줍니다.

아키텍처 다이어그램

이 다이어그램은 다음 워크플로를 보여줍니다.

  1. 아마존 CloudWatch 예약 이벤트 규칙은 Amazon을 호출합니다. AppFlow

  2. Amazon은 AppFlow 구글 애널리틱스 데이터를 S3 버킷으로 수집합니다.

  3. S3 버킷에서 데이터를 수집한 후 이벤트 알림이 EventBridge 생성되고 CloudWatch Events 규칙에 의해 캡처된 다음 Amazon SQS 대기열에 추가됩니다.

  4. Lambda 함수는 SQS Amazon 대기열에서 이벤트를 사용하고, 각 S3 객체를 읽고, 객체를 Apache Parquet 형식으로 변환하고, 변환된 객체를 S3 버킷에 쓴 다음 Glue Data Catalog 테이블 정의를 생성하거나 업데이트합니다. AWS

  5. Athena 쿼리는 테이블에 대해 실행됩니다.

도구

AWS도구

  • AppFlowAmazon은 SaaS (서비스형 소프트웨어) 애플리케이션 간에 데이터를 안전하게 교환할 수 있는 완전 관리형 통합 서비스입니다.

  • Amazon Athena는 표준을 사용하여 Amazon S3에서 직접 데이터를 분석할 수 있도록 지원하는 대화형 쿼리 서비스입니다. SQL

  • Amazon은 실행 중인 AWS 리소스 및 애플리케이션의 지표를 AWS 실시간으로 모니터링할 수 있도록 CloudWatch 도와줍니다.

  • EventBridgeAmazon은 다양한 소스의 실시간 데이터와 애플리케이션을 연결하는 데 도움이 되는 서버리스 이벤트 버스 서비스입니다. AWSLambda 함수HTTP, 대상을 API 사용하는 호출 엔드포인트 또는 다른 계정의 이벤트 버스를 예로 들 수 있습니다. AWS

  • Amazon Simple Storage Service(S3)는 원하는 양의 데이터를 저장, 보호 및 검색하는 데 도움이 되는 클라우드 기반 객체 스토리지 서비스입니다.

  • Amazon Simple Queue Service (AmazonSQS) 는 분산 소프트웨어 시스템 및 구성 요소를 통합하고 분리하는 데 도움이 되는 안전하고 안정적이며 사용 가능한 호스팅 대기열을 제공합니다.

  • AWSLambda는 서버를 프로비저닝하거나 관리할 필요 없이 코드를 실행할 수 있는 컴퓨팅 서비스입니다. 필요할 때만 코드를 실행하며 자동으로 확장이 가능하므로 사용한 컴퓨팅 시간만큼만 비용을 지불합니다.

  • AWSCloud Development Kit (CDK) 는 코드로 클라우드 인프라를 정의하고 이를 통해 AWS CloudFormation 프로비저닝하기 위한 프레임워크입니다.

  • AWS DataOps Development Kit (DDK) 는 데이터 워크플로와 최신 데이터 아키텍처를 구축하는 데 도움이 되는 오픈 소스 개발 프레임워크입니다. AWS

코드

이 패턴의 코드는 GitHub AWS DataOps 개발 키트 (DDK) 및 Amazon AppFlow, Amazon Athena 및 개발 키트 리포지토리를 사용한 Google 애널리틱스 데이터 분석에서 확인할 수 있습니다. AWS DataOps

에픽

작업설명필요한 기술

소스 코드를 복제합니다.

다음 명령을 실행하여 소스 코드를 빌드합니다.

git clone https://github.com/aws-samples/aws-ddk-examples.git
DevOps 엔지니어

가상 환경을 생성합니다.

소스 코드 디렉터리로 이동한 후, 다음 명령을 실행하여 가상 환경을 생성합니다.

cd google-analytics-data-using-appflow/python && python3 -m venv .venv
DevOps 엔지니어

종속성을 설치합니다.

가상 환경을 활성화하고 종속성을 설치하려면 다음 명령을 실행합니다.

source .venv/bin/activate && pip install -r requirements.txt
DevOps 엔지니어
작업설명필요한 기술

환경을 부트스트랩합니다.

  1. 이 (가) 사용자 AWS 계정의 유효한 자격 증명으로 AWS CLI 설정되어 있는지 확인하십시오. 자세한 내용은 AWS CLI 설명서의 명명된 프로필 사용을 참조하십시오.

  2. cdk bootstrap --profile [AWS_PROFILE] 명령을 실행합니다.

DevOps 엔지니어

데이터를 배포합니다.

데이터 파이프라인을 배포하려면 cdk deploy --profile [AWS_PROFILE] 명령을 실행합니다.

DevOps 엔지니어
작업설명필요한 기술

스택 상태를 확인합니다.

  1. AWS CloudFormation 콘솔을 여세요.

  2. 스택 페이지에서 스택 DdkAppflowAthenaStack 상태가 CREATE_COMPLETE인지 확인합니다.

DevOps 엔지니어

문제 해결

문제Solution

AWS::AppFlow::Flow 리소스 생성 중에 배포 실패 및 Connector Profile with name ga-connection does not exist 오류가 나타납니다.

Google 애널리틱스용 Amazon AppFlow 커넥터를 만들고 이름을 지정했는지 확인하십시오ga-connection.

지침은 Amazon AppFlow 설명서의 Google 애널리틱스를 참조하십시오.

관련 리소스

추가 정보

AWSDDK데이터 파이프라인은 하나 이상의 단계로 구성됩니다. 다음 코드 예제에서는 AppFlowIngestionStage를 사용하여 Google Analytics에서 데이터를 수집하고, SqsToLambdaStage를 사용하여 데이터 변환을 처리하고, AthenaSQLStage를 사용하여 Athena 쿼리를 실행합니다.

먼저 다음과 같은 코드 예제에서 볼 수 있듯이 데이터 변환 및 수집 단계가 생성됩니다.

appflow_stage = AppFlowIngestionStage( self, id="appflow-stage", flow_name=flow.flow_name, ) sqs_lambda_stage = SqsToLambdaStage( self, id="lambda-stage", lambda_function_props={ "code": Code.from_asset("./ddk_app/lambda_handlers"), "handler": "handler.lambda_handler", "layers": [ LayerVersion.from_layer_version_arn( self, id="layer", layer_version_arn=f"arn:aws:lambda:{self.region}:336392948345:layer:AWSDataWrangler-Python39:1", ) ], "runtime": Runtime.PYTHON_3_9, }, ) # Grant lambda function S3 read & write permissions bucket.grant_read_write(sqs_lambda_stage.function) # Grant Glue database & table permissions sqs_lambda_stage.function.add_to_role_policy( self._get_glue_db_iam_policy(database_name=database.database_name) ) athena_stage = AthenaSQLStage( self, id="athena-sql", query_string=[ ( "SELECT year, month, day, device, count(user_count) as cnt " f"FROM {database.database_name}.ga_sample " "GROUP BY year, month, day, device " "ORDER BY cnt DESC " "LIMIT 10; " ) ], output_location=Location( bucket_name=bucket.bucket_name, object_key="query-results/" ), additional_role_policy_statements=[ self._get_glue_db_iam_policy(database_name=database.database_name) ], )

다음으로, 다음 코드 예제에서 볼 수 있듯이 DataPipeline 구문을 사용하여 EventBridge 규칙을 사용하여 스테이지를 “연결”합니다.

( DataPipeline(self, id="ingestion-pipeline") .add_stage( stage=appflow_stage, override_rule=Rule( self, "schedule-rule", schedule=Schedule.rate(Duration.hours(1)), targets=appflow_stage.targets, ), ) .add_stage( stage=sqs_lambda_stage, # By default, AppFlowIngestionStage stage emits an event after the flow run finishes successfully # Override rule below changes that behavior to call the the stage when data lands in the bucket instead override_rule=Rule( self, "s3-object-created-rule", event_pattern=EventPattern( source=["aws.s3"], detail={ "bucket": {"name": [bucket.bucket_name]}, "object": {"key": [{"prefix": "ga-data"}]}, }, detail_type=["Object Created"], ), targets=sqs_lambda_stage.targets, ), ) .add_stage(stage=athena_stage) )

더 많은 코드 예제는 Amazon AppFlow, Amazon Athena 및 AWS DataOps 개발 키트 리포지토리를 GitHub 사용한 Google 애널리틱스 데이터 분석을 참조하십시오.