Sample blueprint project
Data format conversion is a frequent extract, transform, and load (ETL) use case. In typical analytic workloads, column-based file formats like Parquet or ORC are preferred over text formats like CSV or JSON. This sample blueprint enables you to convert data from CSV/JSON/etc. into Parquet for files on Amazon S3.
This blueprint takes a list of S3 paths defined by a blueprint parameter, converts the
data to Parquet format, and writes it to the S3 location specified by another blueprint
parameter. The layout script creates a crawler and job for each path. The layout script also
uploads the ETL script in Conversion.py
to an S3 bucket specified by
another blueprint parameter. The layout script then specifies the uploaded script as the ETL
script for each job. The ZIP archive for the project contains the layout script, the ETL
script, and the blueprint configuration file.
For information about more sample blueprint projects, see Blueprint samples.
The following is the layout script, in the file Layout.py
.
from awsglue.blueprint.workflow import * from awsglue.blueprint.job import * from awsglue.blueprint.crawler import * import boto3 s3_client = boto3.client('s3') # Ingesting all the S3 paths as Glue table in parquet format def generate_layout(user_params, system_params): #Always give the full path for the file with open("ConversionBlueprint/Conversion.py", "rb") as f: s3_client.upload_fileobj(f, user_params['ScriptsBucket'], "Conversion.py") etlScriptLocation = "s3://{}/Conversion.py".format(user_params['ScriptsBucket']) crawlers = [] jobs = [] workflowName = user_params['WorkflowName'] for path in user_params['S3Paths']: tablePrefix = "source_" crawler = Crawler(Name="{}_crawler".format(workflowName), Role=user_params['PassRole'], DatabaseName=user_params['TargetDatabase'], TablePrefix=tablePrefix, Targets= {"S3Targets": [{"Path": path}]}) crawlers.append(crawler) transform_job = Job(Name="{}_transform_job".format(workflowName), Command={"Name": "glueetl", "ScriptLocation": etlScriptLocation, "PythonVersion": "3"}, Role=user_params['PassRole'], DefaultArguments={"--database_name": user_params['TargetDatabase'], "--table_prefix": tablePrefix, "--region_name": system_params['region'], "--output_path": user_params['TargetS3Location']}, DependsOn={crawler: "SUCCEEDED"}, WaitForDependencies="AND") jobs.append(transform_job) conversion_workflow = Workflow(Name=workflowName, Entities=Entities(Jobs=jobs, Crawlers=crawlers)) return conversion_workflow
The following is the corresponding blueprint configuration file
blueprint.cfg
.
{ "layoutGenerator": "ConversionBlueprint.Layout.generate_layout", "parameterSpec" : { "WorkflowName" : { "type": "String", "collection": false, "description": "Name for the workflow." }, "S3Paths" : { "type": "S3Uri", "collection": true, "description": "List of Amazon S3 paths for data ingestion." }, "PassRole" : { "type": "IAMRoleName", "collection": false, "description": "Choose an IAM role to be used in running the job/crawler" }, "TargetDatabase": { "type": "String", "collection" : false, "description": "Choose a database in the Data Catalog." }, "TargetS3Location": { "type": "S3Uri", "collection" : false, "description": "Choose an Amazon S3 output path: ex:s3://<target_path>/." }, "ScriptsBucket": { "type": "S3Bucket", "collection": false, "description": "Provide an S3 bucket name(in the same AWS Region) to store the scripts." } } }
The following script in the file Conversion.py
is the uploaded ETL
script. Note that it preserves the partitioning scheme during conversion.
import sys from pyspark.sql.functions import * from pyspark.context import SparkContext from awsglue.transforms import * from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions import boto3 args = getResolvedOptions(sys.argv, [ 'JOB_NAME', 'region_name', 'database_name', 'table_prefix', 'output_path']) databaseName = args['database_name'] tablePrefix = args['table_prefix'] outputPath = args['output_path'] glue = boto3.client('glue', region_name=args['region_name']) glue_context = GlueContext(SparkContext.getOrCreate()) spark = glue_context.spark_session job = Job(glue_context) job.init(args['JOB_NAME'], args) def get_tables(database_name, table_prefix): tables = [] paginator = glue.get_paginator('get_tables') for page in paginator.paginate(DatabaseName=database_name, Expression=table_prefix+"*"): tables.extend(page['TableList']) return tables for table in get_tables(databaseName, tablePrefix): tableName = table['Name'] partitionList = table['PartitionKeys'] partitionKeys = [] for partition in partitionList: partitionKeys.append(partition['Name']) # Create DynamicFrame from Catalog dyf = glue_context.create_dynamic_frame.from_catalog( name_space=databaseName, table_name=tableName, additional_options={ 'useS3ListImplementation': True }, transformation_ctx='dyf' ) # Resolve choice type with make_struct dyf = ResolveChoice.apply( frame=dyf, choice='make_struct', transformation_ctx='resolvechoice_' + tableName ) # Drop null fields dyf = DropNullFields.apply( frame=dyf, transformation_ctx="dropnullfields_" + tableName ) # Write DynamicFrame to S3 in glueparquet sink = glue_context.getSink( connection_type="s3", path=outputPath, enableUpdateCatalog=True, partitionKeys=partitionKeys ) sink.setFormat("glueparquet") sink.setCatalogInfo( catalogDatabase=databaseName, catalogTableName=tableName[len(tablePrefix):] ) sink.writeFrame(dyf) job.commit()
Note
Only two Amazon S3 paths can be supplied as an input to the sample blueprint. This is because AWS Glue triggers are limited to invoking only two crawler actions.