管理 AWS Glue 中用于 ETL 输出的分区 - AWS Glue

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

管理 AWS Glue 中用于 ETL 输出的分区

分区是用于组织数据集以便高效查询数据集的重要技术。它根据一个或多个列的不同值用分层目录结构来组织数据。

例如,您可以决定按日期(包括年、月和日)对 Amazon Simple Storage Service(Amazon S3)中的应用程序日志分区。然后,与某一天的数据相对应的文件将放置在前缀下,例如 s3://my_bucket/logs/year=2018/month=01/day=23/。Amazon Athena、Amazon Redshift Spectrum 和 AWS Glue 等系统可以使用这些分区按分区值筛选数据,而无需读取 Amazon S3 中的所有底层数据。

爬网程序不仅推断文件类型和架构,它们还会在填充 AWS Glue 数据目录时自动标识数据集的分区结构。生成的分区列可用于在 AWS Glue ETL 任务或 Amazon Athena 之类的查询引擎中进行查询。

对表进行网络爬取后,您可以查看爬网程序创建的分区。在 AWS Glue 控制台的左侧导航窗格中,选择 Tables (表)。选择爬网程序创建的表,然后选择 View Partitions (查看分区)

对于采用 key=val 样式的 Apache Hive 风格分区路径,爬网程序会使用键名自动填充列名称。否则,它使用默认名称,如 partition_0partition_1 等。您可以在控制台上更改默认名称。为此,请导航至该表格。检查索引选项卡下是否存在索引。如果是这样的话,您需要删除它们才能继续(之后您可以使用新的列名重新创建它们)。然后,选择编辑架构,并在那里修改分区列的名称。

然后,在您的 ETL 脚本中,便可以筛选分区列。因为分区信息存储于数据目录,所以使用 from_catalog API 调用包含 DynamicFrame 中的分区列。例如,使用 create_dynamic_frame.from_catalog 而不是 create_dynamic_frame.from_options

分区是一种可减少数据扫描量的优化技术。要详细了解确定何时适合使用这种技术的过程,请参阅《AWS 规范性指南》中“优化 AWS Glue for Apache Spark 作业性能的最佳实践”指南中的 减少数据扫描量

使用下推谓词进行预筛选

在许多情况下,您可以使用下推谓词来筛选分区,而不必列出并读取数据集中的所有文件。您可以直接对数据目录中的分区元数据应用筛选,而不是读取整个数据集,然后在 DynamicFrame 中筛选。这样,只需将您实际需要的内容列出和读取到 DynamicFrame 中即可。

例如,在 Python 中,您可以写入以下内容。

glue_context.create_dynamic_frame.from_catalog( database = "my_S3_data_set", table_name = "catalog_data_table", push_down_predicate = my_partition_predicate)

这会创建一个 DynamicFrame,它仅在数据目录中加载满足谓词表达式的分区。根据您要加载的数据子集的规模,这样可以节省大量处理时间。

谓词表达式可以是 Spark SQL 支持的任何布尔表达式。您可以在 Spark SQL 查询的 WHERE 子句中放置的任何内容都可以使用。例如,谓词表达式 pushDownPredicate = "(year=='2017' and month=='04')" 仅加载数据目录中 year 等于 2017 并且 month 等于 04 的分区。有关更多信息,请参阅 Apache Spark SQL 文档,尤其是 Scala SQL 函数参考

使用目录分区谓词进行服务器端筛选

push_down_predicate 选项将在列出目录中的所有分区之后以及列出 Amazon S3 中针对这些分区的文件之前应用。如果您有大量表分区,则目录分区列表仍然会产生额外的时间开销。为解决这一开销,您可以结合使用服务器端分区修剪和 catalogPartitionPredicate 选项,该选项使用 AWS Glue 数据目录中的分区索引。当您在一个表中有数百万个分区时,这样可以提高分区筛选速度。如果您的 catalogPartitionPredicate 需要目录分区索引尚不支持的谓词语法,您可以结合使用 push_down_predicateadditional_options 中的 catalogPartitionPredicate

Python:

dynamic_frame = glueContext.create_dynamic_frame.from_catalog( database=dbname, table_name=tablename, transformation_ctx="datasource0", push_down_predicate="day>=10 and customer_id like '10%'", additional_options={"catalogPartitionPredicate":"year='2021' and month='06'"} )

Scala:

val dynamicFrame = glueContext.getCatalogSource( database = dbname, tableName = tablename, transformationContext = "datasource0", pushDownPredicate="day>=10 and customer_id like '10%'", additionalOptions = JsonOptions("""{ "catalogPartitionPredicate": "year='2021' and month='06'"}""") ).getDynamicFrame()
注意

push_down_predicatecatalogPartitionPredicate 使用不同的语法。前者使用 Spark SQL 标准语法,后者使用 JSQL 解析器。

写入分区

默认情况下,DynamicFrame 在写入时不分区。所有输出文件都写入指定输出路径的顶级。直到最近,将 DynamicFrame 写入分区的唯一途径是在写入之前将其转换为 Spark SQL DataFrame。

但是,DynamicFrames 现在支持您在创建接收器时使用 partitionKeys 选项通过密钥序列进行本机分区。例如,以下 Python 代码将数据集以 Parquet 格式写出到 Amazon S3 中,写入到类型字段分区的目录中。然后,您可以使用其他系统(如 Amazon Athena)处理这些分区。

glue_context.write_dynamic_frame.from_options( frame = projectedEvents, connection_type = "s3", connection_options = {"path": "$outpath", "partitionKeys": ["type"]}, format = "parquet")