本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
三种用于将数据转换为 Apache Parq AWS uet 的 Glue ETL 作业类型
由 Adnan Alvee (AWS)、Karthikeyan Ramachandran 和 Nith Govindasivan () 创作 AWS
环境:PoC 或试点 | 技术:分析 | 工作负载:所有其他工作负载 |
AWS服务:AWSGlue |
Summary
在亚马逊 Web Services (AWS) 云上AWS,Glue 是一项完全托管的提取、转换和加载 (ETL) 服务。AWSGlue 可以经济高效地对数据进行分类、清理、丰富数据,并在各种数据存储和数据流之间可靠地移动数据。
此模式在 Glue 中提供了不同的作业类型AWS,并使用三种不同的脚本来演示创作作作ETL业。
你可以使用 AWS Glue 在 Python 外壳环境中写ETL作业。您还可以在托管 Apache Spark 环境中使用 Python (PySpark) 或 Scala 创建批处理和流式处理ETL作业。为了让你开始创作作ETL业,这种模式侧重于使用 Python shell PySpark、和 Scala 的批处理ETL作业。Python Shell 作业适用于需要较低计算能力的工作负载。托管 Apache Spark 环境适用于需要高计算能力的工作负载。
Apache Parquet 旨在支持高效的压缩和编码方案。它可以加快分析工作负载的速度,因为它以列式方式存储数据。从长远来看,将数据转换为 Parquet 可以为您节省存储空间、成本和时间。要了解有关 Parquet 的更多信息,请参阅博客文章 Apache Parquet:如何使用开源列式数据格式成为英雄
先决条件和限制
先决条件
AWSIdentity and Access Management (IAM) 角色(如果您没有角色,请参阅 “其他信息” 部分。)
架构
目标技术堆栈
AWSGlue
Amazon Simple Storage Service (Amazon S3)
Apache Parquet
自动化和扩缩
AWSGlue 工作流程支持ETL管道的完全自动化。
您可以将数据处理单元的数量 (DPUs) 或工作器类型更改为水平和垂直扩展。
工具
AWS服务
Amazon Simple Storage Service (Amazon S3) 是一项基于云的对象存储服务,可帮助您存储、保护和检索任意数量的数据。
AWSGlu e 是一项完全托管的ETL服务,用于在各种数据存储和数据流之间对数据进行分类、清理、丰富和移动。
其他工具
Apache Parquet
是一种专为存储和检索而设计的开源列式数据文件格式。
配置
使用以下设置来配置 Glue 的计算AWS能力ETL。要降低成本,请在运行此模式中提供的工作负载时使用最低设置。
Python shell — 你可以使用 1 DPU 来利用 16 GB 的内存,也可以使用 0.0625 DPU 来使用 1 GB 的内存。此模式使用 0.0625DPU,这是 Glue 控制台中的默认模式AWS。
Python 或 Scala for Spark — 如果你在控制台中选择与 Spark 相关的作业类型,Gl AWS ue 默认使用 10 个工作人员和 G.1X 工作人员类型。这种模式使用两个 Worker(这是允许的最小数量),标准 Worker 类型足够且具有成本效益。
下表显示了 Apache Spark 环境的不同 AWS Glue 工作器类型。由于 Python Shell 作业不使用 Apache Spark 环境来运行 Python,因此它未包含在表中。
Standard | G.1X | G.2X | |
---|---|---|---|
v CPU | 4 | 4 | 8 |
内存 | 16 GB | 16 GB | 32 GB |
磁盘空间 | 50 GB | 64 GB | 128 GB |
每个 Worker 的执行程序 | 2 | 1 | 1 |
代码
有关此模式中使用的代码,包括IAM角色和参数配置,请参阅 “其他信息” 部分。
操作说明
任务 | 描述 | 所需技能 |
---|---|---|
将数据上传到新的或现有 S3 存储桶。 | 在账户中创建 S3 存储桶或使用现有 S3 存储桶。从附件部分上传 sample_data.csv 文件,并记下 S3 存储桶和前缀位置。 | 将军 AWS |
任务 | 描述 | 所需技能 |
---|---|---|
创建 AWS Glue 作业。 | 在 AWS Glue 控制台的ETL部分下,添加一个 AWS Glue 作业。选择适当的作业类型、AWS Glue 版本以及相应的 DPU /Worker 类型和工作人员人数。有关详细信息,请参阅配置部分。 | 开发人员、云或数据 |
更改输入和输出位置。 | 复制与你的 Glue AWS 任务对应的代码,然后更改你在 “上传数据” 长篇故事中记下的输入和输出位置。 | 开发人员、云或数据 |
配置参数。 | 您可以使用 “其他信息” 部分中提供的片段为您的ETL任务设置参数。AWSGlue 在内部使用四个参数名称:
必须在 Glue 控制台上明确AWS输入该 必须在每个参数名称之前添加 | 开发人员、云或数据 |
运行作ETL业。 | 运行作业并检查输出。注意与原始文件相比减少了多少空间。 | 开发人员、云或数据 |
相关资源
参考
教程和视频
其他信息
IAM角色
创建 AWS Glue 作业时,您可以使用具有以下代码片段所示权限的现有IAM角色或新角色。
要创建新角色,请使用以下YAML代码。
# (c) 2022 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. This AWS Content is provided subject to the terms of the AWS Customer # Agreement available at https://aws.amazon.com/agreement/ or other written agreement between Customer and Amazon Web Services, Inc. AWSTemplateFormatVersion: "2010-09-09" Description: This template will setup IAM role for AWS Glue service. Resources: rGlueRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "glue.amazonaws.com" Action: - "sts:AssumeRole" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole Policies: - PolicyName: !Sub "${AWS::StackName}-s3-limited-read-write-inline-policy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "s3:PutObject" - "s3:GetObject" Resource: "arn:aws:s3:::*/*" Tags: - Key : "Name" Value : !Sub "${AWS::StackName}" Outputs: oGlueRoleName: Description: AWS Glue IAM role Value: Ref: rGlueRole Export: Name: !Join [ ":", [ !Ref "AWS::StackName", rGlueRole ] ]
AWSGlue Python 外壳
Python 代码使用 Pandas 和 PyArrow 库将数据转换为 Parquet。Pandas 库已经可用。该 PyArrow 库是在你运行模式时下载的,因为这是一次性运行。您可以使用 wheel 文件 PyArrow 转换为库并将该文件作为库包提供。有关打包 Wheel 文件的更多信息,请参阅提供您自己的 Python 库。
AWSGlue Python 外壳参数
from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["input_loc", "output_loc"])
AWSGlue Python 外壳代码
from io import BytesIO import pandas as pd import boto3 import os import io import site from importlib import reload from setuptools.command import easy_install install_path = os.environ['GLUE_INSTALLATION'] easy_install.main( ["--install-dir", install_path, "pyarrow"] ) reload(site) import pyarrow input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" input_bucket = input_loc.split('/', 1)[0] object_key = input_loc.split('/', 1)[1] output_loc_bucket = output_loc.split('/', 1)[0] output_loc_prefix = output_loc.split('/', 1)[1] s3 = boto3.client('s3') obj = s3.get_object(Bucket=input_bucket, Key=object_key) df = pd.read_csv(io.BytesIO(obj['Body'].read())) parquet_buffer = BytesIO() s3_resource = boto3.resource('s3') df.to_parquet(parquet_buffer, index=False) s3_resource.Object(output_loc_bucket, output_loc_prefix + 'data' + '.parquet').put(Body=parquet_buffer.getvalue())
AWS使用 Python 进行 Glue Spark 工作
要在 Pyth AWS on 中使用 Glue Spark 作业类型,请选择 Spark 作为作业类型。选择作业启动时间缩短的 Spark 3.1、Python 3(Glue 版本 3.0)作为 AWS Glue 版本。
AWSGlue Python 参数
from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_loc", "output_loc"])
AWS使用 Python 代码进行 Glue Spark 作业
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * from awsglue.dynamicframe import DynamicFrame from awsglue.utils import getResolvedOptions from awsglue.job import Job sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options(\ connection_type = "s3", \ connection_options = { "paths": [input_loc]}, \ format = "csv", format_options={ "withHeader": True, "separator": "," }) outputDF = glueContext.write_dynamic_frame.from_options(\ frame = inputDyf, \ connection_type = "s3", \ connection_options = {"path": output_loc \ }, format = "parquet")
对于大量压缩的大文件(例如,1,000 个文件,每个文件约为 3 MB),请使用带有 recurse
参数的 compressionType
参数读取前缀内的所有可用文件,如以下代码所示。
input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", }, format = "csv", format_options={"withHeader": True,"separator": ","} )
对于大量压缩的小文件(例如,1,000 个文件,每个文件大小约为 133 KB),请使用 groupFiles
参数以及 compressionType
和 recurse
参数。groupFiles
参数将小文件分组为多个大文件,groupSize
参数将分组控制为以字节为单位的指定大小(例如,1 MB)。以下代码片段提供了在代码中使用这些参数的示例。
input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", "groupFiles" :"inPartition", "groupSize" :"1048576", }, format = "csv", format_options={"withHeader": True,"separator": ","} )
这些设置使得 AWS Glue 作业能够读取多个文件(大或小,有或没有压缩),然后以 Parquet 格式将它们写入目标,而无需对工作节点进行任何更改。
AWSGlue Spark 在 Scala 工作
要在 Scala AWS 中使用 Glue Spark 作业类型,请选择 Spark 作为作业类型,选择 “语言” 作为 Scala。选择作业启动时间缩短的 Spark 3.1、Scala 2(Glue 版本 3.0)作为 AWS Glue 版本。为了节省存储空间,以下 AWS Glue with Scala 示例还使用该applyMapping
功能来转换数据类型。
AWSGlue Scala 参数
import com.amazonaws.services.glue.util.GlueArgParser val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "inputLoc", "outputLoc").toArray)
AWS使用 Scala 代码进行 Glue Spark 作业
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.DynamicFrame import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueScalaApp { def main(sysArgs: Array[String]) { @transient val spark: SparkContext = SparkContext.getOrCreate() val glueContext: GlueContext = new GlueContext(spark) val inputLoc = "s3://bucket-name/prefix/sample_data.csv" val outputLoc = "s3://bucket-name/prefix/" val readCSV = glueContext.getSource("csv", JsonOptions(Map("paths" -> Set(inputLoc)))).getDynamicFrame() val applyMapping = readCSV.applyMapping(mappings = Seq(("_c0", "string", "date", "string"), ("_c1", "string", "sales", "long"), ("_c2", "string", "profit", "double")), caseSensitive = false) val formatPartition = applyMapping.toDF().coalesce(1) val dynamicFrame = DynamicFrame(formatPartition, glueContext) val dataSink = glueContext.getSinkWithFormat( connectionType = "s3", options = JsonOptions(Map("path" -> outputLoc )), transformationContext = "dataSink", format = "parquet").writeDynamicFrame(dynamicFrame) } }
附件
要访问与此文档相关联的其他内容,请解压以下文件:attachment.zip