Uma conversão de formato de dados é um caso de uso frequente de extração, transformação e carregamento (ETL). Em workloads analíticas típicas, formatos de arquivo baseados em colunas, como Parquet ou ORC, são preferidos em relação a formatos de texto, como CSV ou JSON. Esse blueprint de exemplo permite converter dados de CSV/JSON/etc. em Parquet para arquivos no Amazon S3.
Esse blueprint obtém uma lista de caminhos do S3 definidos por um parâmetro de blueprint, converte os dados no formato Parquet e os grava no local do S3 especificado por outro parâmetro de blueprint. O script de layout cria um crawler e um trabalho para cada caminho. O script de layout também carrega o script de ETL no Conversion.py
para um bucket do S3 especificado por outro parâmetro do blueprint. Em seguida, o script de layout especifica o script carregado como o script de ETL para cada trabalho. O arquivo ZIP do projeto contém o script de layout, o script de ETL e o arquivo de configuração do blueprint.
Para obter mais informações sobre exemplos de projetos de blueprint, consulte Esquemas de exemplo.
A seguir está o script de layout, no arquivo Layout.py
.
from awsglue.blueprint.workflow import *
from awsglue.blueprint.job import *
from awsglue.blueprint.crawler import *
import boto3
s3_client = boto3.client('s3')
# Ingesting all the S3 paths as Glue table in parquet format
def generate_layout(user_params, system_params):
#Always give the full path for the file
with open("ConversionBlueprint/Conversion.py", "rb") as f:
s3_client.upload_fileobj(f, user_params['ScriptsBucket'], "Conversion.py")
etlScriptLocation = "s3://{}/Conversion.py".format(user_params['ScriptsBucket'])
crawlers = []
jobs = []
workflowName = user_params['WorkflowName']
for path in user_params['S3Paths']:
tablePrefix = "source_"
crawler = Crawler(Name="{}_crawler".format(workflowName),
Role=user_params['PassRole'],
DatabaseName=user_params['TargetDatabase'],
TablePrefix=tablePrefix,
Targets= {"S3Targets": [{"Path": path}]})
crawlers.append(crawler)
transform_job = Job(Name="{}_transform_job".format(workflowName),
Command={"Name": "glueetl",
"ScriptLocation": etlScriptLocation,
"PythonVersion": "3"},
Role=user_params['PassRole'],
DefaultArguments={"--database_name": user_params['TargetDatabase'],
"--table_prefix": tablePrefix,
"--region_name": system_params['region'],
"--output_path": user_params['TargetS3Location']},
DependsOn={crawler: "SUCCEEDED"},
WaitForDependencies="AND")
jobs.append(transform_job)
conversion_workflow = Workflow(Name=workflowName, Entities=Entities(Jobs=jobs, Crawlers=crawlers))
return conversion_workflow
A seguir está o arquivo de configuração do blueprint correspondente blueprint.cfg
.
{
"layoutGenerator": "ConversionBlueprint.Layout.generate_layout",
"parameterSpec" : {
"WorkflowName" : {
"type": "String",
"collection": false,
"description": "Name for the workflow."
},
"S3Paths" : {
"type": "S3Uri",
"collection": true,
"description": "List of Amazon S3 paths for data ingestion."
},
"PassRole" : {
"type": "IAMRoleName",
"collection": false,
"description": "Choose an IAM role to be used in running the job/crawler"
},
"TargetDatabase": {
"type": "String",
"collection" : false,
"description": "Choose a database in the Data Catalog."
},
"TargetS3Location": {
"type": "S3Uri",
"collection" : false,
"description": "Choose an Amazon S3 output path: ex:s3://<target_path>/."
},
"ScriptsBucket": {
"type": "S3Bucket",
"collection": false,
"description": "Provide an S3 bucket name(in the same AWS Region) to store the scripts."
}
}
}
O script a seguir no arquivo Conversion.py
é o script de ETL carregado. Observe que ele preserva o esquema de particionamento durante a conversão.
import sys
from pyspark.sql.functions import *
from pyspark.context import SparkContext
from awsglue.transforms import *
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
import boto3
args = getResolvedOptions(sys.argv, [
'JOB_NAME',
'region_name',
'database_name',
'table_prefix',
'output_path'])
databaseName = args['database_name']
tablePrefix = args['table_prefix']
outputPath = args['output_path']
glue = boto3.client('glue', region_name=args['region_name'])
glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session
job = Job(glue_context)
job.init(args['JOB_NAME'], args)
def get_tables(database_name, table_prefix):
tables = []
paginator = glue.get_paginator('get_tables')
for page in paginator.paginate(DatabaseName=database_name, Expression=table_prefix+"*"):
tables.extend(page['TableList'])
return tables
for table in get_tables(databaseName, tablePrefix):
tableName = table['Name']
partitionList = table['PartitionKeys']
partitionKeys = []
for partition in partitionList:
partitionKeys.append(partition['Name'])
# Create DynamicFrame from Catalog
dyf = glue_context.create_dynamic_frame.from_catalog(
name_space=databaseName,
table_name=tableName,
additional_options={
'useS3ListImplementation': True
},
transformation_ctx='dyf'
)
# Resolve choice type with make_struct
dyf = ResolveChoice.apply(
frame=dyf,
choice='make_struct',
transformation_ctx='resolvechoice_' + tableName
)
# Drop null fields
dyf = DropNullFields.apply(
frame=dyf,
transformation_ctx="dropnullfields_" + tableName
)
# Write DynamicFrame to S3 in glueparquet
sink = glue_context.getSink(
connection_type="s3",
path=outputPath,
enableUpdateCatalog=True,
partitionKeys=partitionKeys
)
sink.setFormat("glueparquet")
sink.setCatalogInfo(
catalogDatabase=databaseName,
catalogTableName=tableName[len(tablePrefix):]
)
sink.writeFrame(dyf)
job.commit()
nota
Somente dois caminhos do Amazon S3 podem ser fornecidos como uma entrada para o blueprint de exemplo. Isso porque os acionadores do AWS Glue podem invocar apenas duas ações de crawler.