Progetto di schema di esempio - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Progetto di schema di esempio

La conversione del formato dei dati è un caso d'uso frequente di estrazione, trasformazione e caricamento (ETL). Nei carichi di lavoro analitici tipici, i formati di file basati su colonne come Parquet o ORC sono preferiti rispetto ai formati di testo come CSV o JSON. Questo piano di esempio consente di convertire i dati da CSV/JSON/ecc. in Parquet per i file su Amazon S3.

Questo piano accetta un elenco di percorsi S3 definiti da un parametro del piano, converte i dati in formato Parquet e li scrive nella posizione S3 specificata da un altro parametro del piano. Lo script di layout crea un crawler e un processo per ogni percorso. Lo script di layout carica anche lo script ETL in Conversion.py in un bucket S3 specificato da un altro parametro del piano. Lo script di layout specifica quindi lo script caricato come script ETL per ogni processo. L'archivio ZIP del progetto contiene lo script di layout, lo script ETL e il file di configurazione del piano.

Per ulteriori informazioni su altri piani di esempio, consulta Esempi di schema.

Di seguito è riportato lo script di layout, nel file Layout.py.

from awsglue.blueprint.workflow import * from awsglue.blueprint.job import * from awsglue.blueprint.crawler import * import boto3 s3_client = boto3.client('s3') # Ingesting all the S3 paths as Glue table in parquet format def generate_layout(user_params, system_params): #Always give the full path for the file with open("ConversionBlueprint/Conversion.py", "rb") as f: s3_client.upload_fileobj(f, user_params['ScriptsBucket'], "Conversion.py") etlScriptLocation = "s3://{}/Conversion.py".format(user_params['ScriptsBucket']) crawlers = [] jobs = [] workflowName = user_params['WorkflowName'] for path in user_params['S3Paths']: tablePrefix = "source_" crawler = Crawler(Name="{}_crawler".format(workflowName), Role=user_params['PassRole'], DatabaseName=user_params['TargetDatabase'], TablePrefix=tablePrefix, Targets= {"S3Targets": [{"Path": path}]}) crawlers.append(crawler) transform_job = Job(Name="{}_transform_job".format(workflowName), Command={"Name": "glueetl", "ScriptLocation": etlScriptLocation, "PythonVersion": "3"}, Role=user_params['PassRole'], DefaultArguments={"--database_name": user_params['TargetDatabase'], "--table_prefix": tablePrefix, "--region_name": system_params['region'], "--output_path": user_params['TargetS3Location']}, DependsOn={crawler: "SUCCEEDED"}, WaitForDependencies="AND") jobs.append(transform_job) conversion_workflow = Workflow(Name=workflowName, Entities=Entities(Jobs=jobs, Crawlers=crawlers)) return conversion_workflow

Di seguito è riportato il corrispondente file blueprint.cfg di configurazione del piano.

{ "layoutGenerator": "ConversionBlueprint.Layout.generate_layout", "parameterSpec" : { "WorkflowName" : { "type": "String", "collection": false, "description": "Name for the workflow." }, "S3Paths" : { "type": "S3Uri", "collection": true, "description": "List of Amazon S3 paths for data ingestion." }, "PassRole" : { "type": "IAMRoleName", "collection": false, "description": "Choose an IAM role to be used in running the job/crawler" }, "TargetDatabase": { "type": "String", "collection" : false, "description": "Choose a database in the Data Catalog." }, "TargetS3Location": { "type": "S3Uri", "collection" : false, "description": "Choose an Amazon S3 output path: ex:s3://<target_path>/." }, "ScriptsBucket": { "type": "S3Bucket", "collection": false, "description": "Provide an S3 bucket name(in the same AWS Region) to store the scripts." } } }

Il seguente script nel file Conversion.py è lo script ETL caricato. Nota che durante la conversione mantiene lo schema di partizionamento.

import sys from pyspark.sql.functions import * from pyspark.context import SparkContext from awsglue.transforms import * from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions import boto3 args = getResolvedOptions(sys.argv, [ 'JOB_NAME', 'region_name', 'database_name', 'table_prefix', 'output_path']) databaseName = args['database_name'] tablePrefix = args['table_prefix'] outputPath = args['output_path'] glue = boto3.client('glue', region_name=args['region_name']) glue_context = GlueContext(SparkContext.getOrCreate()) spark = glue_context.spark_session job = Job(glue_context) job.init(args['JOB_NAME'], args) def get_tables(database_name, table_prefix): tables = [] paginator = glue.get_paginator('get_tables') for page in paginator.paginate(DatabaseName=database_name, Expression=table_prefix+"*"): tables.extend(page['TableList']) return tables for table in get_tables(databaseName, tablePrefix): tableName = table['Name'] partitionList = table['PartitionKeys'] partitionKeys = [] for partition in partitionList: partitionKeys.append(partition['Name']) # Create DynamicFrame from Catalog dyf = glue_context.create_dynamic_frame.from_catalog( name_space=databaseName, table_name=tableName, additional_options={ 'useS3ListImplementation': True }, transformation_ctx='dyf' ) # Resolve choice type with make_struct dyf = ResolveChoice.apply( frame=dyf, choice='make_struct', transformation_ctx='resolvechoice_' + tableName ) # Drop null fields dyf = DropNullFields.apply( frame=dyf, transformation_ctx="dropnullfields_" + tableName ) # Write DynamicFrame to S3 in glueparquet sink = glue_context.getSink( connection_type="s3", path=outputPath, enableUpdateCatalog=True, partitionKeys=partitionKeys ) sink.setFormat("glueparquet") sink.setCatalogInfo( catalogDatabase=databaseName, catalogTableName=tableName[len(tablePrefix):] ) sink.writeFrame(dyf) job.commit()
Nota

Solo due percorsi Amazon S3 possono essere forniti come input per il piano di esempio. Questo perché le attivazioni di AWS Glue possono richiamare solo due operazioni crawler.