Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Tres tipos de ETL trabajos de AWS Glue para convertir datos a Apache Parquet
Creado por Adnan Alvee (AWS), Karthikeyan Ramachandran y Nith Govindasivan () AWS
Entorno: PoC o piloto | Tecnologías: análisis | Carga de trabajo: todas las demás cargas de trabajo |
AWSservicios: AWS Glue |
Resumen
En la nube de Amazon Web Services (AWS), AWS Glue es un servicio de extracción, transformación y carga (ETL) totalmente gestionado. AWSGlue hace que sea rentable clasificar los datos, limpiarlos, enriquecerlos y moverlos de forma fiable entre varios almacenes de datos y flujos de datos.
Este patrón proporciona diferentes tipos de trabajos en AWS Glue y utiliza tres scripts diferentes para mostrar los ETL trabajos de creación.
Puedes usar AWS Glue para escribir ETL trabajos en un entorno de shell de Python. También puede crear ETL trabajos por lotes y de streaming mediante Python (PySpark) o Scala en un entorno gestionado de Apache Spark. Para empezar con la creación de ETL trabajos, este patrón se centra en los ETL trabajos por lotes que utilizan Python shell y Scala. PySpark Los trabajos de intérprete de comandos de Python están pensados para cargas de trabajo que requieren menos potencia de cálculo. El entorno gestionado de Apache Spark está diseñado para cargas de trabajo que requieren una gran potencia de cálculo.
Apache Parquet está diseñado para admitir esquemas de compresión y codificación eficientes. Puede acelerar sus cargas de trabajo de análisis porque almacena los datos en forma de columnas. La conversión de datos a Parquet puede ahorrarle espacio de almacenamiento, costos y tiempo a largo plazo. Para obtener más información sobre Parquet, consulte la entrada del blog Apache Parquet: cómo ser un héroe con el formato de datos en columnas de código abierto
Requisitos previos y limitaciones
Requisitos previos
AWSFunción Identity and Access Management (IAM) (si no tiene ninguna función, consulte la sección Información adicional).
Arquitectura
Pila de tecnología de destino
AWS Glue
Amazon Simple Storage Service (Amazon S3)
Apache Parquet
Automatizar y escalar
AWSLos flujos de trabajo de Glue permiten la automatización total de una ETL canalización.
Puede cambiar el número de unidades de procesamiento de datos (DPUs), o los tipos de trabajadores, para escalar horizontal y verticalmente.
Herramientas
AWSservicios
Amazon Simple Storage Service (Amazon S3) es un servicio de almacenamiento de objetos basado en la nube que le ayuda a almacenar, proteger y recuperar cualquier cantidad de datos.
AWSGlue es un ETL servicio totalmente gestionado para categorizar, limpiar, enriquecer y mover sus datos entre varios almacenes de datos y flujos de datos.
Otras herramientas
Apache Parquet
es un formato de archivo de datos de código abierto orientado por columnas diseñado para el almacenamiento y la recuperación.
Configuración
Utilice los siguientes ajustes para configurar la potencia de cálculo de AWS GlueETL. Para reducir los costos, utilice la configuración mínima cuando ejecute la carga de trabajo que se proporciona en este patrón.
Shell de Python: puede usar 1 DPU para utilizar 16 GB de memoria o 0.0625 DPU para utilizar 1 GB de memoria. Este patrón usa 0.0625DPU, que es el valor predeterminado en la consola de AWS Glue.
Python o Scala para Spark: si eliges los tipos de trabajo relacionados con Spark en la consola, AWS Glue usa de forma predeterminada 10 trabajadores y el tipo de trabajo G.1X. Este patrón utiliza dos trabajadores, que es el número mínimo permitido, y el tipo de trabajador estándar es suficiente y rentable.
La siguiente tabla muestra los distintos tipos de trabajadores de AWS Glue para el entorno Apache Spark. Como un trabajo de intérprete de comandos de Python no utiliza el entorno Apache Spark para ejecutar Python, no se incluye en la tabla.
Estándar | G.1 X | G.2X | |
---|---|---|---|
v CPU | 4 | 4 | 8 |
Memoria | 16 GB | 16 GB | 32 GB |
Espacio en disco | 50 GB | 64 GB | 128 GB |
Ejecutor por trabajo | 2. | 1 | 1 |
Código
Para ver el código que se utiliza en este patrón, incluida la configuración del IAM rol y el parámetro, consulte la sección de información adicional.
Epics
Tarea | Descripción | Habilidades requeridas |
---|---|---|
Cargue los datos en un bucket de S3 nuevo o en un bucket de S3 ya existente. | Cree utilice un bucket de S3 existente en su cuenta. Cargue el archivo sample_data.csv de la sección de Adjuntos y anote el bucket de S3 y la ubicación del prefijo. | General AWS |
Tarea | Descripción | Habilidades requeridas |
---|---|---|
Crea el trabajo de AWS Glue. | En la ETL sección de la consola de AWS Glue, añade un trabajo de AWS Glue. Seleccione el tipo de trabajo adecuado, la versión de AWS Glue y el tipo DPU /Worker y el número de trabajadores correspondientes. Para más información, consulte la sección Configuración. | Desarrollador, nube o datos |
Cambie las ubicaciones de entrada y salida. | Copia el código correspondiente a tu trabajo de AWS Glue y cambia la ubicación de entrada y salida que indicaste en la epopeya Carga los datos. | Desarrollador, nube o datos |
Configure los parámetros. | Puedes usar los fragmentos que se proporcionan en la sección de información adicional para configurar los parámetros de tu ETL trabajo. AWSGlue usa cuatro nombres de argumentos internamente:
El Debe añadir | Desarrollador, nube o datos |
Ejecute el ETL trabajo. | Ejecute su trabajo y compruebe el resultado. Observe cuánto espacio se ha reducido con respecto al archivo original. | Desarrollador, nube o datos |
Recursos relacionados
Referencias
Tutoriales y videos
Información adicional
IAMpapel
Al crear los trabajos de AWS Glue, puede usar un IAM rol existente que tenga los permisos que se muestran en el siguiente fragmento de código o un rol nuevo.
Para crear un rol nuevo, usa el siguiente YAML código.
# (c) 2022 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. This AWS Content is provided subject to the terms of the AWS Customer # Agreement available at https://aws.amazon.com/agreement/ or other written agreement between Customer and Amazon Web Services, Inc. AWSTemplateFormatVersion: "2010-09-09" Description: This template will setup IAM role for AWS Glue service. Resources: rGlueRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "glue.amazonaws.com" Action: - "sts:AssumeRole" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole Policies: - PolicyName: !Sub "${AWS::StackName}-s3-limited-read-write-inline-policy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "s3:PutObject" - "s3:GetObject" Resource: "arn:aws:s3:::*/*" Tags: - Key : "Name" Value : !Sub "${AWS::StackName}" Outputs: oGlueRoleName: Description: AWS Glue IAM role Value: Ref: rGlueRole Export: Name: !Join [ ":", [ !Ref "AWS::StackName", rGlueRole ] ]
AWSCáscara Glue Python
El código Python usa los Pandas y PyArrow las bibliotecas para convertir los datos a Parquet. La biblioteca Pandas ya está disponible. La PyArrow biblioteca se descarga al ejecutar el patrón, ya que se ejecuta una sola vez. Puede usar archivos de rueda PyArrow para convertirlos en una biblioteca y proporcionar el archivo como un paquete de biblioteca. Para obtener más información, consulte Proporcionar su propia biblioteca de Python
AWSParámetros de shell de Glue Python
from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["input_loc", "output_loc"])
AWSGlue Python: código shell
from io import BytesIO import pandas as pd import boto3 import os import io import site from importlib import reload from setuptools.command import easy_install install_path = os.environ['GLUE_INSTALLATION'] easy_install.main( ["--install-dir", install_path, "pyarrow"] ) reload(site) import pyarrow input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" input_bucket = input_loc.split('/', 1)[0] object_key = input_loc.split('/', 1)[1] output_loc_bucket = output_loc.split('/', 1)[0] output_loc_prefix = output_loc.split('/', 1)[1] s3 = boto3.client('s3') obj = s3.get_object(Bucket=input_bucket, Key=object_key) df = pd.read_csv(io.BytesIO(obj['Body'].read())) parquet_buffer = BytesIO() s3_resource = boto3.resource('s3') df.to_parquet(parquet_buffer, index=False) s3_resource.Object(output_loc_bucket, output_loc_prefix + 'data' + '.parquet').put(Body=parquet_buffer.getvalue())
AWSTrabajo de Glue Spark con Python
Para usar un tipo de trabajo de AWS Glue Spark con Python, elige Spark como tipo de trabajo. Elija Spark 3.1, Python 3 con un tiempo de inicio de trabajo mejorado (Glue versión 3.0) como versión AWS Glue.
AWSParámetros de Glue Python
from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_loc", "output_loc"])
AWSTrabajo de Glue Spark con código Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * from awsglue.dynamicframe import DynamicFrame from awsglue.utils import getResolvedOptions from awsglue.job import Job sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options(\ connection_type = "s3", \ connection_options = { "paths": [input_loc]}, \ format = "csv", format_options={ "withHeader": True, "separator": "," }) outputDF = glueContext.write_dynamic_frame.from_options(\ frame = inputDyf, \ connection_type = "s3", \ connection_options = {"path": output_loc \ }, format = "parquet")
En el caso de un gran número de archivos comprimidos de gran tamaño (por ejemplo, 1000 archivos de aproximadamente 3 MB cada uno), utilice el parámetro compressionType
junto con el parámetro recurse
para leer todos los archivos disponibles en el prefijo, tal y como se muestra en el código siguiente.
input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", }, format = "csv", format_options={"withHeader": True,"separator": ","} )
Para un gran número de archivos pequeños comprimidos (por ejemplo, 1000 archivos de aproximadamente 133 KB cada uno), utilice el parámetrogroupFiles
junto con los parámetros compressionType
y recurse
. El parámetro groupFiles
agrupa los archivos pequeños en varios archivos grandes y el parámetro groupSize
controla la agrupación según el tamaño especificado en bytes (por ejemplo, 1 MB). El siguiente fragmento de código proporciona un ejemplo del uso de estos parámetros en el código.
input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", "groupFiles" :"inPartition", "groupSize" :"1048576", }, format = "csv", format_options={"withHeader": True,"separator": ","} )
Sin ningún cambio en los nodos de trabajo, estos ajustes permiten al trabajo de AWS Glue leer varios archivos (grandes o pequeños, con o sin compresión) y escribirlos en el destino en formato Parquet.
AWSTrabaja con Glue Spark con Scala
Para usar un tipo de trabajo de AWS Glue Spark con Scala, elige Spark como tipo de trabajo e Idioma como Scala. Elija Spark 3.1, Scala 2 con un tiempo de inicio de trabajo mejorado (Glue Version 3.0) como versión AWS Glue. Para ahorrar espacio de almacenamiento, en el siguiente ejemplo de AWS Glue with Scala también se utiliza la applyMapping
función para convertir tipos de datos.
AWSParámetros de Glue Scala
import com.amazonaws.services.glue.util.GlueArgParser val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "inputLoc", "outputLoc").toArray)
AWSTrabajo de Glue Spark con código Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.DynamicFrame import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueScalaApp { def main(sysArgs: Array[String]) { @transient val spark: SparkContext = SparkContext.getOrCreate() val glueContext: GlueContext = new GlueContext(spark) val inputLoc = "s3://bucket-name/prefix/sample_data.csv" val outputLoc = "s3://bucket-name/prefix/" val readCSV = glueContext.getSource("csv", JsonOptions(Map("paths" -> Set(inputLoc)))).getDynamicFrame() val applyMapping = readCSV.applyMapping(mappings = Seq(("_c0", "string", "date", "string"), ("_c1", "string", "sales", "long"), ("_c2", "string", "profit", "double")), caseSensitive = false) val formatPartition = applyMapping.toDF().coalesce(1) val dynamicFrame = DynamicFrame(formatPartition, glueContext) val dataSink = glueContext.getSinkWithFormat( connectionType = "s3", options = JsonOptions(Map("path" -> outputLoc )), transformationContext = "dataSink", format = "parquet").writeDynamicFrame(dynamicFrame) } }
Conexiones
Para acceder al contenido adicional asociado a este documento, descomprima el archivo: attachment.zip