三種 AWS Glue ETL 任務類型,可將資料轉換為 Apache 實木地板 - AWS 方案指引

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

三種 AWS Glue ETL 任務類型,可將資料轉換為 Apache 實木地板

由阿德南阿爾維(AWS),卡爾蒂安·拉馬干德蘭和尼特·戈文達西文(AWS)創建

環境:PoC 或試點

技術:分析

工作負載:所有其他工作

AWS 服務:AWS AWS Glue

Summary

在 Amazon Web Services (AWS) 雲端上,AWS Glue 是全受管的擷取、轉換和載入 (ETL) 服務。AWS Glue 讓您能夠以符合成本效益的方式,將資料分類、清理、豐富資料,以及在各種資料存放區和資料串流之間可靠地移動資料。

此模式在 AWS Glue 中提供不同的任務類型,並使用三種不同的指令碼來示範編寫 ETL 任務。

您可以使用 AWS Glue 在 Python 殼層環境中撰寫 ETL 任務。您也可以在託管的 Apache 星火環境中使用 Python(PySpark)或斯卡拉創建批處理和流 ETL 任務。為了讓您開始編寫 ETL 作業,此模式著重於使用 Python 殼層和 Scala 的批次 ETL 作業。 PySparkPython 殼層作業適用於需要較低運算能力的工作負載。受管理的 Apache Spark 環境適用於需要高運算能力的工作負載。

阿帕奇實木複合地板是建立支持高效的壓縮和編碼方案。它可以加速您的分析工作負載,因為它以單欄式方式儲存資料。在長期運行中將數據轉換為 Parquet 可以節省您的存儲空間,成本和時間。要了解有關鑲木地板的更多信息,請參閱博客文章 Apache Parquet:如何使用開源柱狀數據格式成為英雄

先決條件和限制

先決條件

  • AWS Identity and Access Management (IAM) 角色 (如果您沒有角色,請參閱其他資訊一節)。

架構

目標技術堆疊

  • AWS Glue

  • Amazon Simple Storage Service (Amazon S3)

  • Apache Parquet

自動化和規模

  • AWS Glue 工作流程支援 ETL 管道的完全自動化。

  • 您可以變更資料處理單位 (DPU) 或 Worker 類型的數目,以水平和垂直縮放。

工具

AWS 服務

  • Amazon Simple Storage Service (Amazon S3) 是一種雲端型物件儲存服務,可協助您儲存、保護和擷取任何數量的資料。

  • AWS Glue 是全受管 ETL 服務,可在各種資料存放區和資料串流之間分類、清理、豐富和移動資料。

其他工具

組態

使用下列設定來設定 AWS Glue ETL 的運算能力。若要降低成本,請在執行此模式中提供的工作負載時使用最小設定。 

  • Python 外殼 — 您可以使用 1 個 DPU 來利用 16 GB 的記憶體,或使用 0.0625 個 DPU 來利用 1 GB 的記憶體。此模式使用 0.0625 DPU,這是 AWS Glue 主控台中的預設值。

  • 適用於 Spark 的 Python 或 Scala — 如果您在主控台中選擇與火花相關的任務類型,AWS Glue 預設會使用 10 個工作程式和 G.1X 工作者類型。此模式使用兩個 Worker,這是允許的最小數目,具有標準 Worker 類型,這足夠且具有成本效益。

下表顯示 Apache Spark 環境的不同 AWS AWS Glue 工作者類型。因為 Python 殼層作業不使用 Apache 星火環境來執行 Python,所以它不會包含在資料表中。

標準

G.1X

G.2X

vCPU

4

4

8

記憶體

16 GB

16 GB

32 GB

磁碟空間

50 GB

64 GB

128 GB

每名工人的執行人

2

1

Code

如需此模式中使用的程式碼 (包括 IAM 角色和參數設定),請參閱其他資訊一節。

史诗

任務描述所需技能

將資料上傳到新的或現有的 S3 儲存貯體。

在您的帳戶中建立或使用現有的 S3 儲存貯體。從「附件」區段上傳 sample_data.csv 檔案,並記下 S3 儲存貯體和前置碼的位置。

一般 AWS
任務描述所需技能

建立 AWS AWS Glue 任務。

在 AWS Glue 主控台的 ETL 區段下,新增 AWS AWS Glue 任務。選取適當的任務類型、AWS Glue 版本,以及對應的 DPU/ 工作者類型和工作者數目。如需詳細資訊,請參閱「組態」一節。

開發人員、雲端或資料

更改輸入和輸出位置。

複製與 AWS Glue 任務對應的程式碼,然後變更您在上傳資料史詩中記下的輸入和輸出位置。

開發人員、雲端或資料

設定參數。

您可以使用「其他資訊」區段中提供的片段來設定 ETL 工作的參數。AWS Glue 在內部使用四個引數名稱:

  • --conf

  • --debug

  • --mode

  • --JOB_NAME

必須在 AWS Glue 主控台上明確輸入--JOB_NAME參數。選擇 「Job」、「編輯作」、「安全性組態」、「命令檔程式庫」和「工作參數」(選 輸入--JOB_NAME作為鍵並提供一個值。您也可以使用 AWS Command Line Interface (AWS CLI) (AWS CLI) 或 AWS Glue API 來設定此參數。該--JOB_NAME參數由星火使用,並且不需要在 Python 外殼環境作業。

您必須--在每個參數名稱之前加入;否則,程式碼將無法運作。例如,對於程式碼片段,位置參數必須由--input_loc和叫用--output_loc

開發人員、雲端或資料

執行 ETL 工作。

運行您的作業並檢查輸出。請注意原始檔案減少了多少空間。

開發人員、雲端或資料

相關資源

參考

教學課程和影片

其他資訊

IAM 角色

建立 AWS Glue 任務時,您可以使用具有下列程式碼片段中顯示許可的現有 IAM 角色,或使用新角色。

若要建立新角色,請使用下列 YAML 程式碼。

# (c) 2022 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. This AWS Content is provided subject to the terms of the AWS Customer # Agreement available at https://aws.amazon.com/agreement/ or other written agreement between Customer and Amazon Web Services, Inc. AWSTemplateFormatVersion: "2010-09-09" Description: This template will setup IAM role for AWS Glue service. Resources: rGlueRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "glue.amazonaws.com" Action: - "sts:AssumeRole" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole Policies: - PolicyName: !Sub "${AWS::StackName}-s3-limited-read-write-inline-policy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "s3:PutObject" - "s3:GetObject" Resource: "arn:aws:s3:::*/*" Tags: - Key : "Name" Value : !Sub "${AWS::StackName}" Outputs: oGlueRoleName: Description: AWS Glue IAM role Value: Ref: rGlueRole Export: Name: !Join [ ":", [ !Ref "AWS::StackName", rGlueRole ] ]

AWS AWS Glue Python 殼

Python 代碼使用熊貓和 PyArrow 庫將數據轉換為實木複合地板。熊貓圖書館已經可用。當您執行病毒碼時,會下載程式 PyArrow 庫,因為這是一次性執行。您可以使用 wheel 檔案轉換 PyArrow 為資源庫,並將檔案做為資源庫套件提供。如需有關封裝輪子檔案的詳細資訊,請參閱提供您自己的 Python 程式庫

AWS Glue Python 外殼參數

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["input_loc", "output_loc"])

AWS Glue Python 殼代碼

from io import BytesIO import pandas as pd import boto3 import os import io import site from importlib import reload from setuptools.command import easy_install install_path = os.environ['GLUE_INSTALLATION'] easy_install.main( ["--install-dir", install_path, "pyarrow"] ) reload(site) import pyarrow input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" input_bucket = input_loc.split('/', 1)[0] object_key = input_loc.split('/', 1)[1] output_loc_bucket = output_loc.split('/', 1)[0] output_loc_prefix = output_loc.split('/', 1)[1]  s3 = boto3.client('s3') obj = s3.get_object(Bucket=input_bucket, Key=object_key) df = pd.read_csv(io.BytesIO(obj['Body'].read())) parquet_buffer = BytesIO() s3_resource = boto3.resource('s3') df.to_parquet(parquet_buffer, index=False)  s3_resource.Object(output_loc_bucket, output_loc_prefix +  'data' + '.parquet').put(Body=parquet_buffer.getvalue())

使用 Python 的 AWS AWS Glue 火花任務

若要搭配 Python 使用 AWS AWS Glue 星火工作類型,請選擇星火做為任務類型。選擇火花 3.1、Python 3,並改善任務啟動時間 (Glue 3.0 版) 做為 AWS AWS Glue 版本。

AWS Glue Python 參數

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_loc", "output_loc"])

使用 Python 代碼進行 AWS AWS Glue 火花任務

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * from awsglue.dynamicframe import DynamicFrame from awsglue.utils import getResolvedOptions from awsglue.job import Job sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options(\     connection_type = "s3", \     connection_options = {          "paths": [input_loc]}, \     format = "csv",     format_options={         "withHeader": True,         "separator": ","     }) outputDF = glueContext.write_dynamic_frame.from_options(\     frame = inputDyf, \     connection_type = "s3", \     connection_options = {"path": output_loc \         }, format = "parquet")    

對於大量壓縮的大型檔案 (例如,1,000 個檔案大約 3 MB),請使用compressionType參數搭配recurse參數來讀取前置碼內可用的所有檔案,如下列程式碼所示。

input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", }, format = "csv", format_options={"withHeader": True,"separator": ","} )

對於大量壓縮的小檔案 (例如,1,000 個檔案大約 133 KB),請使用groupFiles參數以compressionType及和recurse參數。groupFiles參數會將小型檔案群組成多個大檔案,而groupSize參數則控制群組為以位元組為單位的指定大小 (例如 1 MB)。下列程式碼片段提供在程式碼中使用這些參數的範例。

input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", "groupFiles" :"inPartition", "groupSize" :"1048576", }, format = "csv", format_options={"withHeader": True,"separator": ","} )

這些設定可讓 AWS Glue 任務讀取多個檔案 (大或小,無論壓縮或不含壓縮),並以 Parquet 格式將檔案寫入目標中,無論是否有任何變更,都可以使用 Parquet 格式將檔案寫入目標。

AWS AWS Glue 火花與斯卡拉的任務

若要搭配 Scala 使用 AWS AWS Glue 星火工作類型,請選擇 S park 作為任務類型,選擇語言Scala。選擇 S park 3.1,斯卡拉 2 改善任務啟動時間 (Glue 版本 3.0) 做為 AWS AWS Glue 版本。為了節省儲存空間,下列 AWS Glue 與 Scala 範例也使用此applyMapping功能來轉換資料類型。

AWS AWS Glue 斯卡拉參數

import com.amazonaws.services.glue.util.GlueArgParser val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "inputLoc", "outputLoc").toArray)

AWS AWS Glue 星火任務與斯卡拉代碼

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.DynamicFrame import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueScalaApp {   def main(sysArgs: Array[String]) {          @transient val spark: SparkContext = SparkContext.getOrCreate()     val glueContext: GlueContext = new GlueContext(spark)     val inputLoc = "s3://bucket-name/prefix/sample_data.csv"     val outputLoc = "s3://bucket-name/prefix/"     val readCSV = glueContext.getSource("csv", JsonOptions(Map("paths" -> Set(inputLoc)))).getDynamicFrame()     val applyMapping = readCSV.applyMapping(mappings = Seq(("_c0", "string", "date", "string"), ("_c1", "string", "sales", "long"),     ("_c2", "string", "profit", "double")), caseSensitive = false)     val formatPartition = applyMapping.toDF().coalesce(1)     val dynamicFrame = DynamicFrame(formatPartition, glueContext)     val dataSink = glueContext.getSinkWithFormat(         connectionType = "s3",          options = JsonOptions(Map("path" -> outputLoc )),         transformationContext = "dataSink", format = "parquet").writeDynamicFrame(dynamicFrame)   } }

附件

若要存取與此文件相關聯的其他內容,請解壓縮下列檔案:attachment.zip