选择您的 Cookie 首选项

我们使用必要 Cookie 和类似工具提供我们的网站和服务。我们使用性能 Cookie 收集匿名统计数据,以便我们可以了解客户如何使用我们的网站并进行改进。必要 Cookie 无法停用,但您可以单击“自定义”或“拒绝”来拒绝性能 Cookie。

如果您同意,AWS 和经批准的第三方还将使用 Cookie 提供有用的网站功能、记住您的首选项并显示相关内容,包括相关广告。要接受或拒绝所有非必要 Cookie,请单击“接受”或“拒绝”。要做出更详细的选择,请单击“自定义”。

在 AWS Glue 中使用 Hudi 框架

聚焦模式
在 AWS Glue 中使用 Hudi 框架 - AWS Glue

AWS Glue 3.0 及更高版本支持数据湖的 Apache Hudi 框架。Hudi 是一个开源数据湖存储框架,简化增量数据处理和数据管道开发。本主题涵盖了在 Hudi 表中传输或存储数据时,在 AWS Glue 中使用数据的可用功能。要了解有关 Hudi 的更多信息,请参阅 Apache Hudi 官方文档

您可以使用 AWS Glue 对 Amazon S3 中的 Hudi 表执行读写操作,也可以使用 AWS Glue 数据目录处理 Hudi 表。还支持其他操作,包括插入、更新和所有 Apache Spark 操作

注意

Apache Hudi 0.10.1 for AWS Glue 3.0 不支持 Read(MoR)表上的 Hudi Merge。

下表列出了 AWS Glue 每个版本中包含的 Hudi 版本。

AWS Glue 版本 支持的 Hudi 版本
4.0 0.12.1
3.0 0.10.1

要了解有关 AWS Glue 支持的数据湖框架的更多信息,请参阅在 AWS Glue ETL 任务中使用数据湖框架

启用 Hudi

要为 AWS Glue 启用 Hudi,请完成以下任务:

  • 指定 hudi 作为 --datalake-formats 作业参数的值。有关更多信息,请参阅 在 AWS Glue 作业中使用作业参数

  • --conf 为 Glue 作业创建一个名为 AWS 的密钥,并将其设置为以下值。或者,您可以在脚本中使用 SparkConf 设置以下配置。这些设置有助于 Apache Spark 正确处理 Hudi 表。

    spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
  • AWS Glue 4.0 默认为 Hudi 表启用了 Lake Formation 权限支持。无需额外配置即可读取/写入注册到 Lake Formation 的 Hudi 表。AWS Glue 作业 IAM 角色必须具有 SELECT 权限才能读取已注册的 Hudi 表。AWS Glue 作业 IAM 角色必须具有 SUPER 权限才能写入已注册的 Hudi 表。要了解有关管理 Lake Formation 权限的更多信息,请参阅 Granting and revoking permissions on Data Catalog resources

使用不同的 Hudi 版本

要使用 AWS Glue 不支持的 Hudi 版本,请使用 --extra-jars 作业参数指定您自己的 Hudi JAR 文件。请勿使用 hudi 作为 --datalake-formats 作业参数的值。

示例:将 Hudi 表写入 Amazon S3 并将其注册到 AWS Glue 数据目录中

以下示例脚本脚本演示了如何将 Hudi 表写入 Amazon S3,并将该表注册到 AWS Glue 数据目录。该示例使用 Hudi Hive 同步工具来注册该表。

注意

此示例要求您设置 --enable-glue-datacatalog 作业参数,才能将 AWS Glue Data Catalog 用作 Apache Spark Hive 元存储。要了解更多信息,请参阅 在 AWS Glue 作业中使用作业参数

Python
# Example: Create a Hudi table from a DataFrame # and register the table to Glue Data Catalog additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms", "path": "s3://<s3Path/>" } dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save()
Scala
// Example: Example: Create a Hudi table from a DataFrame // and register the table to Glue Data Catalog val additionalOptions = Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms", "path" -> "s3://<s3Path/>") dataFrame.write.format("hudi") .options(additionalOptions) .mode("append") .save()
# Example: Create a Hudi table from a DataFrame # and register the table to Glue Data Catalog additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms", "path": "s3://<s3Path/>" } dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save()

示例:使用 AWS Glue Data Catalog 从 Amazon S3 读取 Hudi 表

此示例从 Amazon S3 读取您在 示例:将 Hudi 表写入 Amazon S3 并将其注册到 AWS Glue 数据目录中 中创建的 Hudi 表。

注意

此示例要求您设置 --enable-glue-datacatalog 任务参数,才能将 AWS Glue Data Catalog 用作 Apache Spark Hive 元存储。要了解更多信息,请参阅 在 AWS Glue 作业中使用作业参数

Python

在本示例中,使用 GlueContext.create_data_frame.from_catalog() 方法。

# Example: Read a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) dataFrame = glueContext.create_data_frame.from_catalog( database = "<your_database_name>", table_name = "<your_table_name>" )
Scala

在本示例中,使用 getCatalogSource 方法。

// Example: Read a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import org.apache.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame() } }

在本示例中,使用 GlueContext.create_data_frame.from_catalog() 方法。

# Example: Read a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) dataFrame = glueContext.create_data_frame.from_catalog( database = "<your_database_name>", table_name = "<your_table_name>" )

示例:更新 DataFrame 并将其插入到 Amazon S3 的 Hudi 表中

此示例使用 AWS Glue Data Catalog 将 DataFrame 插入到您在 示例:将 Hudi 表写入 Amazon S3 并将其注册到 AWS Glue 数据目录中 中创建的 Hudi 表中。

注意

此示例要求您设置 --enable-glue-datacatalog 任务参数,才能将 AWS Glue Data Catalog 用作 Apache Spark Hive 元存储。要了解更多信息,请参阅 在 AWS Glue 作业中使用作业参数

Python

在本示例中,使用 GlueContext.write_data_frame.from_catalog() 方法。

# Example: Upsert a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) glueContext.write_data_frame.from_catalog( frame = dataFrame, database = "<your_database_name>", table_name = "<your_table_name>", additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms" } )
Scala

在本示例中,使用 getCatalogSink 方法。

// Example: Upsert a Hudi table from Glue Data Catalog import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.JsonOptions import org.apacke.spark.SparkContext object GlueApp { def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext) } }

在本示例中,使用 GlueContext.write_data_frame.from_catalog() 方法。

# Example: Upsert a Hudi table from Glue Data Catalog from awsglue.context import GlueContext from pyspark.context import SparkContext sc = SparkContext() glueContext = GlueContext(sc) glueContext.write_data_frame.from_catalog( frame = dataFrame, database = "<your_database_name>", table_name = "<your_table_name>", additional_options={ "hoodie.table.name": "<your_table_name>", "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.recordkey.field": "<your_recordkey_field>", "hoodie.datasource.write.precombine.field": "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field": "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.database": "<your_database_name>", "hoodie.datasource.hive_sync.table": "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms" } )

示例:使用 Spark 从 Amazon S3 读取 Hudi 表

此示例使用 Spark DataFrame API 从 Amazon S3 读取 Hudi 表。

Python
# Example: Read a Hudi table from S3 using a Spark DataFrame dataFrame = spark.read.format("hudi").load("s3://<s3path/>")
Scala
// Example: Read a Hudi table from S3 using a Spark DataFrame val dataFrame = spark.read.format("hudi").load("s3://<s3path/>")
# Example: Read a Hudi table from S3 using a Spark DataFrame dataFrame = spark.read.format("hudi").load("s3://<s3path/>")

示例:使用 Spark 向 Amazon S3 写入 Hudi 表

示例:使用 Spark 向 Amazon S3 写入 Hudi 表

Python
# Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save("s3://<s3Path/>)
Scala
// Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") .options(additionalOptions) .mode("overwrite") .save("s3://<s3path/>")
# Example: Write a Hudi table to S3 using a Spark DataFrame dataFrame.write.format("hudi") \ .options(**additional_options) \ .mode("overwrite") \ .save("s3://<s3Path/>)

示例:读取和写入具有 Lake Formation 权限控制的 Hudi 表

此示例将读取和写入一个具有 Lake Formation 权限控制的 Hudi 表。

  1. 创建一个 Hudi 表并将其注册到 Lake Formation。

    1. 要启用 Lake Formation 权限控制,您首先需要将表的 Amazon S3 路径注册到 Lake Formation。有关更多信息,请参阅 Registering an Amazon S3 location(注册 Amazon S3 位置)。您可以通过 Lake Formation 控制台或使用 AWS CLI 进行注册:

      aws lakeformation register-resource --resource-arn arn:aws:s3:::<s3-bucket>/<s3-folder> --use-service-linked-role --region <REGION>

      注册了 Amazon S3 位置后,对于任何指向该位置(或其任何子位置)的 AWS Glue 表,GetTable 调用中的 IsRegisteredWithLakeFormation 参数都将返回值 true。

    2. 创建一个指向通过 Spark dataframe API 注册的 Amazon S3 路径的 Hudi 表:

      hudi_options = { 'hoodie.table.name': table_name, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name': table_name, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'path': <S3_TABLE_LOCATION>, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database': database_name, 'hoodie.datasource.hive_sync.table': table_name, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms' } df_products.write.format("hudi") \ .options(**hudi_options) \ .mode("overwrite") \ .save()
  2. 向 AWS Glue 作业 IAM 角色授予 Lake Formation 权限。您可以通过 Lake Formation 控制台授予权限,也可以使用 AWS CLI 授予权限。有关更多信息,请参阅 Granting table permissions using the Lake Formation console and the named resource method

  3. 读取注册到 Lake Formation 的 Hudi 表。代码与读取未注册的 Hudi 表相同。请注意,AWS Glue 作业 IAM 角色需要具有 SELECT 权限才能成功读取。

    val dataFrame = glueContext.getCatalogSource( database = "<your_database_name>", tableName = "<your_table_name>" ).getDataFrame()
  4. 写入注册到 Lake Formation 的 Hudi 表。代码与写入未注册的 Hudi 表相同。请注意,AWS Glue 作业 IAM 角色需要具有 SUPER 权限才能成功写入。

    glueContext.getCatalogSink("<your_database_name>", "<your_table_name>", additionalOptions = JsonOptions(Map( "hoodie.table.name" -> "<your_table_name>", "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE", "hoodie.datasource.write.operation" -> "<write_operation>", "hoodie.datasource.write.recordkey.field" -> "<your_recordkey_field>", "hoodie.datasource.write.precombine.field" -> "<your_precombine_field>", "hoodie.datasource.write.partitionpath.field" -> "<your_partitionkey_field>", "hoodie.datasource.write.hive_style_partitioning" -> "true", "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.database" -> "<your_database_name>", "hoodie.datasource.hive_sync.table" -> "<your_table_name>", "hoodie.datasource.hive_sync.partition_fields" -> "<your_partitionkey_field>", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.use_jdbc" -> "false", "hoodie.datasource.hive_sync.mode" -> "hms" ))) .writeDataFrame(dataFrame, glueContext)

下一主题:

Delta Lake

上一主题:

限制
隐私网站条款Cookie 首选项
© 2025, Amazon Web Services, Inc. 或其附属公司。保留所有权利。