設計図プロジェクト例 - AWS Glue

設計図プロジェクト例

データ形式の変換は、抽出、変換、ロード (ETL) で頻繁に行われるユースケースです。一般的な分析ワークロードでは、CSV や JSON などのテキスト形式よりも、Parquet や ORC などの列ベースのファイル形式の使用が好まれます。このサンプル設計図を使用すると、Amazon S3 上のファイルで、CSV/JSON/その他のデータを Parquet に変換できます。

この設計図では、設計図パラメータで定義された S3 パスのリストを取得し、そのデータを Parquet 形式に変換した上で、別の設計図パラメータで指定された S3 のロケーションに書き込んでいます。このレイアウトスクリプトは、パスごとにクローラとジョブを作成します。また、このレイアウトスクリプトでは、Conversion.py 内の ETL スクリプトを、別の設計図パラメータで指定された S3 バケットにアップロードしています。その後、このレイアウトスクリプトは、アップロードされたスクリプトを各ジョブの ETL スクリプトとして指定します。プロジェクト用の ZIP アーカイブには、レイアウトスクリプト、ETL スクリプト、および設計図の設定ファイルが含まれています。

その他の設計図プロジェクトの例については、「設計図の例」を参照してください。

以下は、Layout.py ファイル内にあるレイアウトスクリプトです。

from awsglue.blueprint.workflow import * from awsglue.blueprint.job import * from awsglue.blueprint.crawler import * import boto3 s3_client = boto3.client('s3') # Ingesting all the S3 paths as Glue table in parquet format def generate_layout(user_params, system_params): #Always give the full path for the file with open("ConversionBlueprint/Conversion.py", "rb") as f: s3_client.upload_fileobj(f, user_params['ScriptsBucket'], "Conversion.py") etlScriptLocation = "s3://{}/Conversion.py".format(user_params['ScriptsBucket']) crawlers = [] jobs = [] workflowName = user_params['WorkflowName'] for path in user_params['S3Paths']: tablePrefix = "source_" crawler = Crawler(Name="{}_crawler".format(workflowName), Role=user_params['PassRole'], DatabaseName=user_params['TargetDatabase'], TablePrefix=tablePrefix, Targets= {"S3Targets": [{"Path": path}]}) crawlers.append(crawler) transform_job = Job(Name="{}_transform_job".format(workflowName), Command={"Name": "glueetl", "ScriptLocation": etlScriptLocation, "PythonVersion": "3"}, Role=user_params['PassRole'], DefaultArguments={"--database_name": user_params['TargetDatabase'], "--table_prefix": tablePrefix, "--region_name": system_params['region'], "--output_path": user_params['TargetS3Location']}, DependsOn={crawler: "SUCCEEDED"}, WaitForDependencies="AND") jobs.append(transform_job) conversion_workflow = Workflow(Name=workflowName, Entities=Entities(Jobs=jobs, Crawlers=crawlers)) return conversion_workflow

以下は、対応する設計図の設定ファイル blueprint.cfg です。

{ "layoutGenerator": "ConversionBlueprint.Layout.generate_layout", "parameterSpec" : { "WorkflowName" : { "type": "String", "collection": false, "description": "Name for the workflow." }, "S3Paths" : { "type": "S3Uri", "collection": true, "description": "List of Amazon S3 paths for data ingestion." }, "PassRole" : { "type": "IAMRoleName", "collection": false, "description": "Choose an IAM role to be used in running the job/crawler" }, "TargetDatabase": { "type": "String", "collection" : false, "description": "Choose a database in the Data Catalog." }, "TargetS3Location": { "type": "S3Uri", "collection" : false, "description": "Choose an Amazon S3 output path: ex:s3://<target_path>/." }, "ScriptsBucket": { "type": "S3Bucket", "collection": false, "description": "Provide an S3 bucket name(in the same AWS Region) to store the scripts." } } }

以下に示す、ファイル Conversion.py 内のスクリプトは、アップロードされた ETL スクリプトです。ここでは、変換中にパーティション分割スキームが保持されることに注意してください。

import sys from pyspark.sql.functions import * from pyspark.context import SparkContext from awsglue.transforms import * from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions import boto3 args = getResolvedOptions(sys.argv, [ 'JOB_NAME', 'region_name', 'database_name', 'table_prefix', 'output_path']) databaseName = args['database_name'] tablePrefix = args['table_prefix'] outputPath = args['output_path'] glue = boto3.client('glue', region_name=args['region_name']) glue_context = GlueContext(SparkContext.getOrCreate()) spark = glue_context.spark_session job = Job(glue_context) job.init(args['JOB_NAME'], args) def get_tables(database_name, table_prefix): tables = [] paginator = glue.get_paginator('get_tables') for page in paginator.paginate(DatabaseName=database_name, Expression=table_prefix+"*"): tables.extend(page['TableList']) return tables for table in get_tables(databaseName, tablePrefix): tableName = table['Name'] partitionList = table['PartitionKeys'] partitionKeys = [] for partition in partitionList: partitionKeys.append(partition['Name']) # Create DynamicFrame from Catalog dyf = glue_context.create_dynamic_frame.from_catalog( name_space=databaseName, table_name=tableName, additional_options={ 'useS3ListImplementation': True }, transformation_ctx='dyf' ) # Resolve choice type with make_struct dyf = ResolveChoice.apply( frame=dyf, choice='make_struct', transformation_ctx='resolvechoice_' + tableName ) # Drop null fields dyf = DropNullFields.apply( frame=dyf, transformation_ctx="dropnullfields_" + tableName ) # Write DynamicFrame to S3 in glueparquet sink = glue_context.getSink( connection_type="s3", path=outputPath, enableUpdateCatalog=True, partitionKeys=partitionKeys ) sink.setFormat("glueparquet") sink.setCatalogInfo( catalogDatabase=databaseName, catalogTableName=tableName[len(tablePrefix):] ) sink.writeFrame(dyf) job.commit()
注記

このサンプルの設計図に対し、入力として提供できる Amazon S3 パスは 2 つだけです。これは、AWS Glue トリガーが呼び出すことができるクローラアクションが、2 つだけに制限されているためです。