Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Tre tipi di job ETL di AWS Glue per la conversione dei dati in Apache Parquet
Creato da Adnan Alvee (AWS), Karthikeyan Ramachandran e Nith Govindasivan (AWS)
Ambiente: PoC o pilota | Tecnologie: analisi | Carico di lavoro: tutti gli altri carichi di lavoro |
Servizi AWS: AWS Glue |
Riepilogo
Sul cloud Amazon Web Services (AWS), AWS Glue è un servizio di estrazione, trasformazione e caricamento (ETL) completamente gestito. AWS Glue rende conveniente classificare i dati, pulirli, arricchirli e spostarli in modo affidabile tra vari archivi e flussi di dati.
Questo modello fornisce diversi tipi di lavoro in AWS Glue e utilizza tre diversi script per dimostrare la creazione di lavori ETL.
Puoi usare AWS Glue per scrivere lavori ETL in un ambiente shell Python. Puoi anche creare lavori ETL in batch e in streaming utilizzando Python PySpark () o Scala in un ambiente Apache Spark gestito. Per iniziare a creare lavori ETL, questo modello si concentra sui lavori ETL in batch che utilizzano la shell Python e Scala. PySpark I job in Python shell sono pensati per carichi di lavoro che richiedono una potenza di calcolo inferiore. L'ambiente Apache Spark gestito è pensato per carichi di lavoro che richiedono un'elevata potenza di calcolo.
Apache Parquet è progettato per supportare schemi di compressione e codifica efficienti. Può velocizzare i carichi di lavoro di analisi perché archivia i dati in modo colonnare. La conversione dei dati in Parquet può far risparmiare spazio, costi e tempo di archiviazione a lungo termine. Per saperne di più su Parquet, consulta il post del blog Apache Parquet: How to be a hero with the open source columnar
Prerequisiti e limitazioni
Prerequisiti
Ruolo AWS Identity and Access Management (IAM) (se non hai un ruolo, consulta la sezione Informazioni aggiuntive).
Architettura
Stack tecnologico Target
AWS Glue
Amazon Simple Storage Service (Amazon S3)
Apache Parquet
Automazione e scalabilità
I flussi di lavoro AWS Glue supportano l'automazione completa di una pipeline ETL.
Puoi modificare il numero di unità di elaborazione dati (DPU) o i tipi di worker per scalare orizzontalmente e verticalmente.
Strumenti
Servizi AWS
Amazon Simple Storage Service (Amazon S3) è un servizio di archiviazione degli oggetti basato sul cloud che consente di archiviare, proteggere e recuperare qualsiasi quantità di dati.
AWS Glue è un servizio ETL completamente gestito per la categorizzazione, la pulizia, l'arricchimento e lo spostamento dei dati tra vari archivi e flussi di dati.
Altri strumenti
Apache Parquet
è un formato di file di dati open source orientato alle colonne progettato per l'archiviazione e il recupero.
Configurazione
Utilizza le seguenti impostazioni per configurare la potenza di calcolo di AWS Glue ETL. Per ridurre i costi, utilizza le impostazioni minime quando esegui il carico di lavoro fornito in questo schema.
Shell Python: è possibile utilizzare 1 DPU per utilizzare 16 GB di memoria o 0,0625 DPU per utilizzare 1 GB di memoria. Questo modello utilizza 0,0625 DPU, che è l'impostazione predefinita nella console AWS Glue.
Python o Scala per Spark: se scegli i tipi di lavoro relativi a Spark nella console, AWS Glue per impostazione predefinita utilizza 10 worker e il tipo di worker G.1X. Questo modello utilizza due lavoratori, che è il numero minimo consentito, con il tipo di lavoratore standard, che è sufficiente ed economico.
La tabella seguente mostra i diversi tipi di worker AWS Glue per l'ambiente Apache Spark. Poiché un job della shell Python non utilizza l'ambiente Apache Spark per eseguire Python, non è incluso nella tabella.
Standard | G.1X | G.2X | |
---|---|---|---|
VPCU | 4 | 4 | 8 |
Memoria | 16 GB | 16 GB | 32 GB |
Spazio su disco | 50 GB | 64 GB | 128 GB |
Esecutore per lavoratore | 2 | 1 | 1 |
Codice
Per il codice utilizzato in questo modello, inclusa la configurazione del ruolo e dei parametri IAM, consulta la sezione Informazioni aggiuntive.
Epiche
Attività | Descrizione | Competenze richieste |
---|---|---|
Carica i dati in un bucket S3 nuovo o esistente. | Crea o usa un bucket S3 esistente nel tuo account. Carica il file sample_data.csv dalla sezione Allegati e annota la posizione del bucket S3 e del prefisso. | Informazioni generali su AWS |
Attività | Descrizione | Competenze richieste |
---|---|---|
Crea il job AWS Glue. | Nella sezione ETL della console AWS Glue, aggiungi un job AWS Glue. Seleziona il tipo di lavoro appropriato, la versione di AWS Glue e il tipo di DPU/Worker corrispondente e il numero di lavoratori. Per i dettagli, consulta la sezione Configurazione. | Sviluppatore, cloud o dati |
Modifica le posizioni di input e output. | Copia il codice corrispondente al tuo job AWS Glue e modifica la posizione di input e output che hai annotato nell'epopea Upload the data. | Sviluppatore, cloud o dati |
Configura i parametri. | È possibile utilizzare gli snippet forniti nella sezione Informazioni aggiuntive per impostare i parametri per il job ETL. AWS Glue utilizza internamente quattro nomi di argomenti:
Il È necessario aggiungere | Sviluppatore, cloud o dati |
Esegui il job ETL. | Esegui il tuo lavoro e controlla l'output. Nota quanto spazio è stato ridotto rispetto al file originale. | Sviluppatore, cloud o dati |
Risorse correlate
Riferimenti
Tutorial e video
Informazioni aggiuntive
Ruolo IAM
Quando crei i job AWS Glue, puoi utilizzare un ruolo IAM esistente con le autorizzazioni mostrate nel seguente frammento di codice o un nuovo ruolo.
Per creare un nuovo ruolo, usa il seguente codice YAML.
# (c) 2022 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. This AWS Content is provided subject to the terms of the AWS Customer # Agreement available at https://aws.amazon.com/agreement/ or other written agreement between Customer and Amazon Web Services, Inc. AWSTemplateFormatVersion: "2010-09-09" Description: This template will setup IAM role for AWS Glue service. Resources: rGlueRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "glue.amazonaws.com" Action: - "sts:AssumeRole" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole Policies: - PolicyName: !Sub "${AWS::StackName}-s3-limited-read-write-inline-policy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "s3:PutObject" - "s3:GetObject" Resource: "arn:aws:s3:::*/*" Tags: - Key : "Name" Value : !Sub "${AWS::StackName}" Outputs: oGlueRoleName: Description: AWS Glue IAM role Value: Ref: rGlueRole Export: Name: !Join [ ":", [ !Ref "AWS::StackName", rGlueRole ] ]
Shell Python di AWS Glue
Il codice Python utilizza Pandas e le PyArrow librerie per convertire i dati in Parquet. La libreria Pandas è già disponibile. La PyArrow libreria viene scaricata quando si esegue il pattern, perché viene eseguita una sola volta. È possibile utilizzare i file wheel PyArrow per convertirli in una libreria e fornire il file come pacchetto di libreria. Per ulteriori informazioni sulla creazione di pacchetti di file wheel, consultate Fornire la propria libreria Python.
Parametri della shell AWS Glue Python
from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["input_loc", "output_loc"])
Codice shell AWS Glue Python
from io import BytesIO import pandas as pd import boto3 import os import io import site from importlib import reload from setuptools.command import easy_install install_path = os.environ['GLUE_INSTALLATION'] easy_install.main( ["--install-dir", install_path, "pyarrow"] ) reload(site) import pyarrow input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" input_bucket = input_loc.split('/', 1)[0] object_key = input_loc.split('/', 1)[1] output_loc_bucket = output_loc.split('/', 1)[0] output_loc_prefix = output_loc.split('/', 1)[1] s3 = boto3.client('s3') obj = s3.get_object(Bucket=input_bucket, Key=object_key) df = pd.read_csv(io.BytesIO(obj['Body'].read())) parquet_buffer = BytesIO() s3_resource = boto3.resource('s3') df.to_parquet(parquet_buffer, index=False) s3_resource.Object(output_loc_bucket, output_loc_prefix + 'data' + '.parquet').put(Body=parquet_buffer.getvalue())
Processo AWS Glue Spark con Python
Per utilizzare un tipo di lavoro AWS Glue Spark con Python, scegli Spark come tipo di lavoro. Scegli Spark 3.1, Python 3 con tempi di avvio del processo migliorati (Glue versione 3.0) come versione AWS Glue.
Parametri di AWS Glue Python
from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_loc", "output_loc"])
Processo AWS Glue Spark con codice Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * from awsglue.dynamicframe import DynamicFrame from awsglue.utils import getResolvedOptions from awsglue.job import Job sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options(\ connection_type = "s3", \ connection_options = { "paths": [input_loc]}, \ format = "csv", format_options={ "withHeader": True, "separator": "," }) outputDF = glueContext.write_dynamic_frame.from_options(\ frame = inputDyf, \ connection_type = "s3", \ connection_options = {"path": output_loc \ }, format = "parquet")
Per un gran numero di file compressi di grandi dimensioni (ad esempio, 1.000 file di circa 3 MB ciascuno), usa il compressionType
parametro con il recurse
parametro per leggere tutti i file disponibili all'interno del prefisso, come mostrato nel codice seguente.
input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", }, format = "csv", format_options={"withHeader": True,"separator": ","} )
Per un numero elevato di file compressi di piccole dimensioni (ad esempio 1.000 file di circa 133 KB ciascuno), utilizzate il groupFiles
parametro insieme ai parametri compressionType
e. recurse
Il groupFiles
parametro raggruppa file di piccole dimensioni in più file di grandi dimensioni e controlla il raggruppamento alla dimensione specificata in byte (ad esempio, 1 MB). groupSize
Il seguente frammento di codice fornisce un esempio di utilizzo di questi parametri all'interno del codice.
input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", "groupFiles" :"inPartition", "groupSize" :"1048576", }, format = "csv", format_options={"withHeader": True,"separator": ","} )
Senza alcuna modifica nei nodi di lavoro, queste impostazioni consentono al job AWS Glue di leggere più file (grandi o piccoli, con o senza compressione) e di scriverli sulla destinazione in formato Parquet.
Lavoro in AWS Glue Spark con Scala
Per utilizzare un tipo di lavoro AWS Glue Spark con Scala, scegli Spark come tipo di lavoro e Language come Scala. Scegli Spark 3.1, Scala 2 con tempi di avvio del lavoro migliorati (Glue versione 3.0) come versione AWS Glue. Per risparmiare spazio di archiviazione, anche il seguente esempio di AWS Glue with Scala utilizza la applyMapping
funzionalità per convertire i tipi di dati.
Parametri di AWS Glue Scala
import com.amazonaws.services.glue.util.GlueArgParser val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "inputLoc", "outputLoc").toArray)
Job AWS Glue Spark con codice Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.DynamicFrame import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueScalaApp { def main(sysArgs: Array[String]) { @transient val spark: SparkContext = SparkContext.getOrCreate() val glueContext: GlueContext = new GlueContext(spark) val inputLoc = "s3://bucket-name/prefix/sample_data.csv" val outputLoc = "s3://bucket-name/prefix/" val readCSV = glueContext.getSource("csv", JsonOptions(Map("paths" -> Set(inputLoc)))).getDynamicFrame() val applyMapping = readCSV.applyMapping(mappings = Seq(("_c0", "string", "date", "string"), ("_c1", "string", "sales", "long"), ("_c2", "string", "profit", "double")), caseSensitive = false) val formatPartition = applyMapping.toDF().coalesce(1) val dynamicFrame = DynamicFrame(formatPartition, glueContext) val dataSink = glueContext.getSinkWithFormat( connectionType = "s3", options = JsonOptions(Map("path" -> outputLoc )), transformationContext = "dataSink", format = "parquet").writeDynamicFrame(dynamicFrame) } }