Tres tipos de ETL trabajos de AWS Glue para convertir datos a Apache Parquet - Recomendaciones de AWS

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Tres tipos de ETL trabajos de AWS Glue para convertir datos a Apache Parquet

Creado por Adnan Alvee (AWS), Karthikeyan Ramachandran y Nith Govindasivan () AWS

Entorno: PoC o piloto

Tecnologías: análisis

Carga de trabajo: todas las demás cargas de trabajo

AWSservicios: AWS Glue

Resumen

En la nube de Amazon Web Services (AWS), AWS Glue es un servicio de extracción, transformación y carga (ETL) totalmente gestionado. AWSGlue hace que sea rentable clasificar los datos, limpiarlos, enriquecerlos y moverlos de forma fiable entre varios almacenes de datos y flujos de datos.

Este patrón proporciona diferentes tipos de trabajos en AWS Glue y utiliza tres scripts diferentes para mostrar los ETL trabajos de creación.

Puedes usar AWS Glue para escribir ETL trabajos en un entorno de shell de Python. También puede crear ETL trabajos por lotes y de streaming mediante Python (PySpark) o Scala en un entorno gestionado de Apache Spark. Para empezar con la creación de ETL trabajos, este patrón se centra en los ETL trabajos por lotes que utilizan Python shell y Scala. PySpark Los trabajos de intérprete de comandos de Python están pensados para cargas de trabajo que requieren menos potencia de cálculo. El entorno gestionado de Apache Spark está diseñado para cargas de trabajo que requieren una gran potencia de cálculo.

Apache Parquet está diseñado para admitir esquemas de compresión y codificación eficientes. Puede acelerar sus cargas de trabajo de análisis porque almacena los datos en forma de columnas. La conversión de datos a Parquet puede ahorrarle espacio de almacenamiento, costos y tiempo a largo plazo. Para obtener más información sobre Parquet, consulte la entrada del blog Apache Parquet: cómo ser un héroe con el formato de datos en columnas de código abierto.

Requisitos previos y limitaciones

Requisitos previos 

  • AWSFunción Identity and Access Management (IAM) (si no tiene ninguna función, consulte la sección Información adicional).

Arquitectura

Pila de tecnología de destino

  • AWS Glue

  • Amazon Simple Storage Service (Amazon S3)

  • Apache Parquet

Automatizar y escalar

  • AWSLos flujos de trabajo de Glue permiten la automatización total de una ETL canalización.

  • Puede cambiar el número de unidades de procesamiento de datos (DPUs), o los tipos de trabajadores, para escalar horizontal y verticalmente.

Herramientas

AWSservicios

  • Amazon Simple Storage Service (Amazon S3) es un servicio de almacenamiento de objetos basado en la nube que le ayuda a almacenar, proteger y recuperar cualquier cantidad de datos.

  • AWSGlue es un ETL servicio totalmente gestionado para categorizar, limpiar, enriquecer y mover sus datos entre varios almacenes de datos y flujos de datos.

Otras herramientas

  • Apache Parquet es un formato de archivo de datos de código abierto orientado por columnas diseñado para el almacenamiento y la recuperación.

Configuración

Utilice los siguientes ajustes para configurar la potencia de cálculo de AWS GlueETL. Para reducir los costos, utilice la configuración mínima cuando ejecute la carga de trabajo que se proporciona en este patrón. 

  • Shell de Python: puede usar 1 DPU para utilizar 16 GB de memoria o 0.0625 DPU para utilizar 1 GB de memoria. Este patrón usa 0.0625DPU, que es el valor predeterminado en la consola de AWS Glue.

  • Python o Scala para Spark: si eliges los tipos de trabajo relacionados con Spark en la consola, AWS Glue usa de forma predeterminada 10 trabajadores y el tipo de trabajo G.1X. Este patrón utiliza dos trabajadores, que es el número mínimo permitido, y el tipo de trabajador estándar es suficiente y rentable.

La siguiente tabla muestra los distintos tipos de trabajadores de AWS Glue para el entorno Apache Spark. Como un trabajo de intérprete de comandos de Python no utiliza el entorno Apache Spark para ejecutar Python, no se incluye en la tabla.

Estándar

G.1 X

G.2X

v CPU

4

4

8

Memoria

16 GB

16 GB

32 GB

Espacio en disco

50 GB

64 GB

128 GB

Ejecutor por trabajo

2.

1

Código

Para ver el código que se utiliza en este patrón, incluida la configuración del IAM rol y el parámetro, consulte la sección de información adicional.

Epics

TareaDescripciónHabilidades requeridas

Cargue los datos en un bucket de S3 nuevo o en un bucket de S3 ya existente.

Cree utilice un bucket de S3 existente en su cuenta. Cargue el archivo sample_data.csv de la sección de Adjuntos y anote el bucket de S3 y la ubicación del prefijo.

General AWS
TareaDescripciónHabilidades requeridas

Crea el trabajo de AWS Glue.

En la ETL sección de la consola de AWS Glue, añade un trabajo de AWS Glue. Seleccione el tipo de trabajo adecuado, la versión de AWS Glue y el tipo DPU /Worker y el número de trabajadores correspondientes. Para más información, consulte la sección Configuración.

Desarrollador, nube o datos

Cambie las ubicaciones de entrada y salida.

Copia el código correspondiente a tu trabajo de AWS Glue y cambia la ubicación de entrada y salida que indicaste en la epopeya Carga los datos.

Desarrollador, nube o datos

Configure los parámetros.

Puedes usar los fragmentos que se proporcionan en la sección de información adicional para configurar los parámetros de tu ETL trabajo. AWSGlue usa cuatro nombres de argumentos internamente:

  • --conf

  • --debug

  • --mode

  • --JOB_NAME

El --JOB_NAME parámetro debe introducirse de forma explícita en la consola de AWS Glue. Elija Trabajos, Edit Job (Editar trabajo), Security configuration, script libraries, and job parameters (optional) (Configuración de seguridad, bibliotecas de scripts y parámetros de trabajo (opcional)). Introduzca --JOB_NAME como clave y proporcione un valor. También puede utilizar la interfaz de línea de AWS comandos (AWSCLI) o AWS Glue API para configurar este parámetro. Spark usa el parámetro --JOB_NAME y no es necesario en un trabajo de entorno de intérprete de comandos de Python.

Debe añadir -- antes del nombre de cada parámetro; de lo contrario, el código no funcionará. Por ejemplo, en el caso de los fragmentos de código, los parámetros de ubicación deben invocarse mediante --input_loc y --output_loc.

Desarrollador, nube o datos

Ejecute el ETL trabajo.

Ejecute su trabajo y compruebe el resultado. Observe cuánto espacio se ha reducido con respecto al archivo original.

Desarrollador, nube o datos

Recursos relacionados

Referencias

Tutoriales y videos

Información adicional

IAMpapel

Al crear los trabajos de AWS Glue, puede usar un IAM rol existente que tenga los permisos que se muestran en el siguiente fragmento de código o un rol nuevo.

Para crear un rol nuevo, usa el siguiente YAML código.

# (c) 2022 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. This AWS Content is provided subject to the terms of the AWS Customer # Agreement available at https://aws.amazon.com/agreement/ or other written agreement between Customer and Amazon Web Services, Inc. AWSTemplateFormatVersion: "2010-09-09" Description: This template will setup IAM role for AWS Glue service. Resources: rGlueRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "glue.amazonaws.com" Action: - "sts:AssumeRole" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole Policies: - PolicyName: !Sub "${AWS::StackName}-s3-limited-read-write-inline-policy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "s3:PutObject" - "s3:GetObject" Resource: "arn:aws:s3:::*/*" Tags: - Key : "Name" Value : !Sub "${AWS::StackName}" Outputs: oGlueRoleName: Description: AWS Glue IAM role Value: Ref: rGlueRole Export: Name: !Join [ ":", [ !Ref "AWS::StackName", rGlueRole ] ]

AWSCáscara Glue Python

El código Python usa los Pandas y PyArrow las bibliotecas para convertir los datos a Parquet. La biblioteca Pandas ya está disponible. La PyArrow biblioteca se descarga al ejecutar el patrón, ya que se ejecuta una sola vez. Puede usar archivos de rueda PyArrow para convertirlos en una biblioteca y proporcionar el archivo como un paquete de biblioteca. Para obtener más información, consulte Proporcionar su propia biblioteca de Python

AWSParámetros de shell de Glue Python

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["input_loc", "output_loc"])

AWSGlue Python: código shell

from io import BytesIO import pandas as pd import boto3 import os import io import site from importlib import reload from setuptools.command import easy_install install_path = os.environ['GLUE_INSTALLATION'] easy_install.main( ["--install-dir", install_path, "pyarrow"] ) reload(site) import pyarrow input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" input_bucket = input_loc.split('/', 1)[0] object_key = input_loc.split('/', 1)[1] output_loc_bucket = output_loc.split('/', 1)[0] output_loc_prefix = output_loc.split('/', 1)[1]  s3 = boto3.client('s3') obj = s3.get_object(Bucket=input_bucket, Key=object_key) df = pd.read_csv(io.BytesIO(obj['Body'].read())) parquet_buffer = BytesIO() s3_resource = boto3.resource('s3') df.to_parquet(parquet_buffer, index=False)  s3_resource.Object(output_loc_bucket, output_loc_prefix +  'data' + '.parquet').put(Body=parquet_buffer.getvalue())

AWSTrabajo de Glue Spark con Python

Para usar un tipo de trabajo de AWS Glue Spark con Python, elige Spark como tipo de trabajo. Elija Spark 3.1, Python 3 con un tiempo de inicio de trabajo mejorado (Glue versión 3.0) como versión AWS Glue.

AWSParámetros de Glue Python

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_loc", "output_loc"])

AWSTrabajo de Glue Spark con código Python

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * from awsglue.dynamicframe import DynamicFrame from awsglue.utils import getResolvedOptions from awsglue.job import Job sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options(\     connection_type = "s3", \     connection_options = {          "paths": [input_loc]}, \     format = "csv",     format_options={         "withHeader": True,         "separator": ","     }) outputDF = glueContext.write_dynamic_frame.from_options(\     frame = inputDyf, \     connection_type = "s3", \     connection_options = {"path": output_loc \         }, format = "parquet")    

En el caso de un gran número de archivos comprimidos de gran tamaño (por ejemplo, 1000 archivos de aproximadamente 3 MB cada uno), utilice el parámetro compressionType junto con el parámetro recurse para leer todos los archivos disponibles en el prefijo, tal y como se muestra en el código siguiente.

input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", }, format = "csv", format_options={"withHeader": True,"separator": ","} )

Para un gran número de archivos pequeños comprimidos (por ejemplo, 1000 archivos de aproximadamente 133 KB cada uno), utilice el parámetrogroupFiles junto con los parámetros compressionType y recurse. El parámetro groupFiles agrupa los archivos pequeños en varios archivos grandes y el parámetro groupSize controla la agrupación según el tamaño especificado en bytes (por ejemplo, 1 MB). El siguiente fragmento de código proporciona un ejemplo del uso de estos parámetros en el código.

input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", "groupFiles" :"inPartition", "groupSize" :"1048576", }, format = "csv", format_options={"withHeader": True,"separator": ","} )

Sin ningún cambio en los nodos de trabajo, estos ajustes permiten al trabajo de AWS Glue leer varios archivos (grandes o pequeños, con o sin compresión) y escribirlos en el destino en formato Parquet.

AWSTrabaja con Glue Spark con Scala

Para usar un tipo de trabajo de AWS Glue Spark con Scala, elige Spark como tipo de trabajo e Idioma como Scala. Elija Spark 3.1, Scala 2 con un tiempo de inicio de trabajo mejorado (Glue Version 3.0) como versión AWS Glue. Para ahorrar espacio de almacenamiento, en el siguiente ejemplo de AWS Glue with Scala también se utiliza la applyMapping función para convertir tipos de datos.

AWSParámetros de Glue Scala

import com.amazonaws.services.glue.util.GlueArgParser val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "inputLoc", "outputLoc").toArray)

AWSTrabajo de Glue Spark con código Scala

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.DynamicFrame import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueScalaApp {   def main(sysArgs: Array[String]) {          @transient val spark: SparkContext = SparkContext.getOrCreate()     val glueContext: GlueContext = new GlueContext(spark)     val inputLoc = "s3://bucket-name/prefix/sample_data.csv"     val outputLoc = "s3://bucket-name/prefix/"     val readCSV = glueContext.getSource("csv", JsonOptions(Map("paths" -> Set(inputLoc)))).getDynamicFrame()     val applyMapping = readCSV.applyMapping(mappings = Seq(("_c0", "string", "date", "string"), ("_c1", "string", "sales", "long"),     ("_c2", "string", "profit", "double")), caseSensitive = false)     val formatPartition = applyMapping.toDF().coalesce(1)     val dynamicFrame = DynamicFrame(formatPartition, glueContext)     val dataSink = glueContext.getSinkWithFormat(         connectionType = "s3",          options = JsonOptions(Map("path" -> outputLoc )),         transformationContext = "dataSink", format = "parquet").writeDynamicFrame(dynamicFrame)   } }

Conexiones

Para acceder al contenido adicional asociado a este documento, descomprima el archivo: attachment.zip