Drei AWS Glue ETL-Auftragstypen für die Konvertierung von Daten in Apache Parquet - AWS Prescriptive Guidance

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Drei AWS Glue ETL-Auftragstypen für die Konvertierung von Daten in Apache Parquet

Erstellt von Adnan Alvee (AWS), Karthikeyan Ramachandran und Nith Govindasivan (AWS)

Umgebung: PoC oder Pilotprojekt

Technologien: Analytik

Arbeitslast: Alle anderen Workloads

AWS-Services: AWS Glue

Übersicht

In der Amazon Web Services (AWS) Cloud ist AWS Glue ein vollständig verwalteter Service zum Extrahieren, Transformieren und Laden (ETL). Mit AWS Glue können Sie Ihre Daten kostengünstig kategorisieren, bereinigen, anreichern und zuverlässig zwischen verschiedenen Datenspeichern und Datenströmen verschieben.

Dieses Muster bietet verschiedene Auftragstypen in AWS Glue und verwendet drei verschiedene Skripts, um die Erstellung von ETL-Jobs zu demonstrieren.

Sie können AWS Glue verwenden, um ETL-Jobs in einer Python-Shell-Umgebung zu schreiben. Sie können auch Batch- und Streaming-ETL-Jobs mithilfe von Python (PySpark) oder Scala in einer verwalteten Apache Spark-Umgebung erstellen. Um Ihnen den Einstieg in die Erstellung von ETL-Jobs zu erleichtern, konzentriert sich dieses Muster auf Batch-ETL-Jobs mit Python-Shell und Scala. PySpark Python-Shell-Jobs sind für Workloads gedacht, die weniger Rechenleistung benötigen. Die verwaltete Apache Spark-Umgebung ist für Workloads gedacht, die eine hohe Rechenleistung erfordern.

Apache Parquet wurde entwickelt, um effiziente Komprimierungs- und Kodierungsschemata zu unterstützen. Es kann Ihre Analytics-Workloads beschleunigen, da es Daten spaltenweise speichert. Durch die Konvertierung von Daten in Parquet können Sie auf längere Sicht Speicherplatz, Kosten und Zeit sparen. Weitere Informationen zu Parquet finden Sie im Blogbeitrag Apache Parquet: How to be a hero with the open-source columnar data format.

Voraussetzungen und Einschränkungen

Voraussetzungen

  • Rolle AWS Identity and Access Management (IAM) (Wenn Sie noch keine Rolle haben, lesen Sie den Abschnitt Zusätzliche Informationen.)

Architektur

Zieltechnologie-Stack

  • AWS Glue

  • Amazon-Simple-Storage-Service (Amazon-S3)

  • Apache Parquet

Automatisierung und Skalierung

  • AWS Glue Glue-Workflows unterstützen die vollständige Automatisierung einer ETL-Pipeline.

  • Sie können die Anzahl der Datenverarbeitungseinheiten (DPUs) oder Workertypen ändern, um sie horizontal und vertikal zu skalieren.

Tools

AWS-Services

  • Amazon Simple Storage Service (Amazon S3) ist ein cloudbasierter Objektspeicherservice, der Sie beim Speichern, Schützen und Abrufen beliebiger Datenmengen unterstützt.

  • AWS Glue ist ein vollständig verwalteter ETL-Service zum Kategorisieren, Bereinigen, Anreichern und Verschieben Ihrer Daten zwischen verschiedenen Datenspeichern und Datenströmen.

Andere Tools

  • Apache Parquet ist ein spaltenorientiertes Open-Source-Datendateiformat, das zum Speichern und Abrufen entwickelt wurde.

Konfiguration

Verwenden Sie die folgenden Einstellungen für die Konfiguration der Rechenleistung von AWS Glue ETL. Um die Kosten zu senken, sollten Sie die minimalen Einstellungen verwenden, wenn Sie den Workload ausführen, der in diesem Muster bereitgestellt wird. 

  • Python-Shell — Sie können 1 DPU verwenden, um 16 GB Speicher zu nutzen, oder 0,0625 DPU, um 1 GB Speicher zu nutzen. Dieses Muster verwendet 0,0625 DPU, was die Standardeinstellung in der AWS Glue Glue-Konsole ist.

  • Python oder Scala für Spark — Wenn Sie die Spark-bezogenen Jobtypen in der Konsole auswählen, verwendet AWS Glue standardmäßig 10 Worker und den G.1X-Workertyp. Bei diesem Muster werden zwei Worker verwendet, was der zulässigen Mindestanzahl entspricht. Der Standard-Worker-Typ ist ausreichend und kostengünstig.

In der folgenden Tabelle sind die verschiedenen AWS Glue Glue-Worker-Typen für die Apache Spark-Umgebung aufgeführt. Da ein Python-Shell-Job die Apache Spark-Umgebung nicht zum Ausführen von Python verwendet, ist er nicht in der Tabelle enthalten.

Standard

G.1X

G.2X

vCPU

4

4

8

Arbeitsspeicher

16 GB

16 GB

32 GB

Festplattenkapazität

50 GB

64 GB

128 GB

Testamentsvollstrecker pro Arbeiter

2

1

Code

Den Code, der in diesem Muster verwendet wird, einschließlich der IAM-Rolle und der Parameterkonfiguration, finden Sie im Abschnitt Zusätzliche Informationen.

Epen

AufgabeBeschreibungErforderliche Fähigkeiten

Laden Sie die Daten in einen neuen oder vorhandenen S3-Bucket hoch.

Erstellen oder verwenden Sie einen vorhandenen S3-Bucket in Ihrem Konto. Laden Sie die Datei sample_data.csv aus dem Bereich Anlagen hoch und notieren Sie sich den S3-Bucket und den Speicherort des Präfixes.

Allgemeines AWS
AufgabeBeschreibungErforderliche Fähigkeiten

Erstellen Sie den AWS Glue Glue-Job.

Fügen Sie im ETL-Bereich der AWS Glue Glue-Konsole einen AWS Glue Glue-Job hinzu. Wählen Sie den entsprechenden Jobtyp, die AWS Glue Glue-Version und den entsprechenden DPU-/Worker-Typ und die Anzahl der Worker aus. Einzelheiten finden Sie im Abschnitt Konfiguration.

Entwickler, Cloud oder Daten

Ändern Sie die Eingabe- und Ausgabeorte.

Kopieren Sie den Code, der Ihrem AWS Glue Glue-Job entspricht, und ändern Sie den Eingabe- und Ausgabeort, den Sie im Abschnitt Daten hochladen notiert haben.

Entwickler, Cloud oder Daten

Konfigurieren Sie die Parameter.

Sie können die im Abschnitt Zusätzliche Informationen bereitgestellten Codefragmente verwenden, um Parameter für Ihren ETL-Job festzulegen. AWS Glue verwendet intern vier Argumentnamen:

  • --conf

  • --debug

  • --mode

  • --JOB_NAME

Der --JOB_NAME Parameter muss explizit in der AWS Glue Glue-Konsole eingegeben werden. Wählen Sie Jobs, Job bearbeiten, Sicherheitskonfiguration, Skriptbibliotheken und Jobparameter (optional). Geben Sie --JOB_NAME den Schlüssel ein und geben Sie einen Wert ein. Sie können diesen Parameter auch über die AWS-Befehlszeilenschnittstelle (AWS CLI) oder die AWS Glue Glue-API festlegen. Der --JOB_NAME Parameter wird von Spark verwendet und wird in einem Python-Shell-Umgebungsjob nicht benötigt.

Sie müssen -- vor jedem Parameter einen Namen hinzufügen, sonst funktioniert der Code nicht. Für die Codefragmente müssen die Standortparameter beispielsweise mit und aufgerufen werden. --input_loc --output_loc

Entwickler, Cloud oder Daten

Führen Sie den ETL-Job aus.

Führen Sie Ihren Job aus und überprüfen Sie die Ausgabe. Beachten Sie, wie viel Speicherplatz gegenüber der Originaldatei reduziert wurde.

Entwickler, Cloud oder Daten

Zugehörige Ressourcen

Referenzen

Tutorials und Videos

Zusätzliche Informationen

IAM role (IAM-Rolle)

Wenn Sie die AWS Glue Glue-Jobs erstellen, können Sie entweder eine vorhandene IAM-Rolle mit den im folgenden Codeausschnitt angegebenen Berechtigungen oder eine neue Rolle verwenden.

Verwenden Sie den folgenden YAML-Code, um eine neue Rolle zu erstellen.

# (c) 2022 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. This AWS Content is provided subject to the terms of the AWS Customer # Agreement available at https://aws.amazon.com/agreement/ or other written agreement between Customer and Amazon Web Services, Inc. AWSTemplateFormatVersion: "2010-09-09" Description: This template will setup IAM role for AWS Glue service. Resources: rGlueRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "glue.amazonaws.com" Action: - "sts:AssumeRole" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole Policies: - PolicyName: !Sub "${AWS::StackName}-s3-limited-read-write-inline-policy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "s3:PutObject" - "s3:GetObject" Resource: "arn:aws:s3:::*/*" Tags: - Key : "Name" Value : !Sub "${AWS::StackName}" Outputs: oGlueRoleName: Description: AWS Glue IAM role Value: Ref: rGlueRole Export: Name: !Join [ ":", [ !Ref "AWS::StackName", rGlueRole ] ]

AWS Glue Python-Shell

Der Python-Code verwendet die Pandas und PyArrow Bibliotheken, um Daten in Parquet zu konvertieren. Die Pandas-Bibliothek ist bereits verfügbar. Die PyArrow Bibliothek wird heruntergeladen, wenn Sie das Pattern ausführen, da es sich um eine einmalige Ausführung handelt. Sie können Raddateien verwenden, um sie in eine Bibliothek PyArrow zu konvertieren und die Datei als Bibliothekspaket bereitzustellen. Weitere Informationen zum Verpacken von Raddateien finden Sie unter Bereitstellen einer eigenen Python-Bibliothek.

Python-Shell-Parameter von AWS Glue

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["input_loc", "output_loc"])

Python-Shellcode von AWS Glue

from io import BytesIO import pandas as pd import boto3 import os import io import site from importlib import reload from setuptools.command import easy_install install_path = os.environ['GLUE_INSTALLATION'] easy_install.main( ["--install-dir", install_path, "pyarrow"] ) reload(site) import pyarrow input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" input_bucket = input_loc.split('/', 1)[0] object_key = input_loc.split('/', 1)[1] output_loc_bucket = output_loc.split('/', 1)[0] output_loc_prefix = output_loc.split('/', 1)[1]  s3 = boto3.client('s3') obj = s3.get_object(Bucket=input_bucket, Key=object_key) df = pd.read_csv(io.BytesIO(obj['Body'].read())) parquet_buffer = BytesIO() s3_resource = boto3.resource('s3') df.to_parquet(parquet_buffer, index=False)  s3_resource.Object(output_loc_bucket, output_loc_prefix +  'data' + '.parquet').put(Body=parquet_buffer.getvalue())

AWS Glue Spark-Auftrag mit Python

Um einen AWS Glue Spark-Auftragstyp mit Python zu verwenden, wählen Sie Spark als Auftragstyp. Wählen Sie Spark 3.1, Python 3 mit verbesserter Jobstartzeit (Glue Version 3.0) als AWS Glue Glue-Version.

Python-Parameter von AWS Glue

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_loc", "output_loc"])

AWS Glue Spark-Job mit Python-Code

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * from awsglue.dynamicframe import DynamicFrame from awsglue.utils import getResolvedOptions from awsglue.job import Job sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options(\     connection_type = "s3", \     connection_options = {          "paths": [input_loc]}, \     format = "csv",     format_options={         "withHeader": True,         "separator": ","     }) outputDF = glueContext.write_dynamic_frame.from_options(\     frame = inputDyf, \     connection_type = "s3", \     connection_options = {"path": output_loc \         }, format = "parquet")    

Verwenden Sie für eine große Anzahl komprimierter großer Dateien (z. B. 1.000 Dateien mit einer Größe von jeweils etwa 3 MB) den compressionType Parameter mit dem recurse Parameter, um alle Dateien zu lesen, die innerhalb des Präfixes verfügbar sind, wie im folgenden Code gezeigt.

input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", }, format = "csv", format_options={"withHeader": True,"separator": ","} )

Verwenden Sie für eine große Anzahl komprimierter kleiner Dateien (z. B. 1.000 Dateien mit jeweils etwa 133 KB) den groupFiles Parameter zusammen mit den recurse Parametern compressionType und. Der groupFiles Parameter gruppiert kleine Dateien in mehrere große Dateien, und der groupSize Parameter steuert die Gruppierung auf die angegebene Größe in Byte (z. B. 1 MB). Der folgende Codeausschnitt bietet ein Beispiel für die Verwendung dieser Parameter im Code.

input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", "groupFiles" :"inPartition", "groupSize" :"1048576", }, format = "csv", format_options={"withHeader": True,"separator": ","} )

Ohne Änderung der Worker-Knoten ermöglichen diese Einstellungen dem AWS Glue Glue-Job, mehrere Dateien (groß oder klein, mit oder ohne Komprimierung) zu lesen und sie im Parquet-Format auf das Ziel zu schreiben.

AWS Glue Spark-Auftrag mit Scala

Um einen AWS Glue Spark-Auftragstyp mit Scala zu verwenden, wählen Sie Spark als Auftragstyp und Sprache als Scala. Wählen Sie Spark 3.1, Scala 2 mit verbesserter Jobstartzeit (Glue Version 3.0) als AWS Glue Glue-Version. Um Speicherplatz zu sparen, verwendet das folgende Beispiel für AWS Glue mit Scala die applyMapping Funktion auch zum Konvertieren von Datentypen.

AWS Glue Scala-Parameter

import com.amazonaws.services.glue.util.GlueArgParser val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "inputLoc", "outputLoc").toArray)

AWS Glue Spark-Job mit Scala-Code

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.DynamicFrame import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueScalaApp {   def main(sysArgs: Array[String]) {          @transient val spark: SparkContext = SparkContext.getOrCreate()     val glueContext: GlueContext = new GlueContext(spark)     val inputLoc = "s3://bucket-name/prefix/sample_data.csv"     val outputLoc = "s3://bucket-name/prefix/"     val readCSV = glueContext.getSource("csv", JsonOptions(Map("paths" -> Set(inputLoc)))).getDynamicFrame()     val applyMapping = readCSV.applyMapping(mappings = Seq(("_c0", "string", "date", "string"), ("_c1", "string", "sales", "long"),     ("_c2", "string", "profit", "double")), caseSensitive = false)     val formatPartition = applyMapping.toDF().coalesce(1)     val dynamicFrame = DynamicFrame(formatPartition, glueContext)     val dataSink = glueContext.getSinkWithFormat(         connectionType = "s3",          options = JsonOptions(Map("path" -> outputLoc )),         transformationContext = "dataSink", format = "parquet").writeDynamicFrame(dynamicFrame)   } }

Anlagen

Um auf zusätzliche Inhalte zuzugreifen, die mit diesem Dokument verknüpft sind, entpacken Sie die folgende Datei: attachment.zip