使用 Hudi 資料集 - Amazon EMR

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Hudi 資料集

Hudi 支援透過 Spark 插入、更新和刪除 Hudi 資料集中的資料。如需詳細資訊,請參閱 Apache Hudi 文件中的寫入 Hudi 資料表

下列範例示範如何啟動互動式 Spark shell、使用 Spark 提交或使用 Amazon EMR Notebooks 在 Amazon 上使用 HudiEMR。您也可以使用 Hudi DeltaStreamer 公用程式或其他工具來寫入資料集。在本節中,範例示範使用 做為預設hadoop使用者的 連線到主節點時,使用 Spark shell SSH的資料集。

spark-sql 使用 Amazon 6.7.0 或更新版本執行 spark-shellspark-submit、 EMR 或 時,請傳遞下列命令。

注意

Amazon EMR 6.7.0 使用 Apache Hudi 0.11.0-amzn-0,相較於先前的 Hudi 版本,它包含了重大的改善。如需詳細資訊,請參閱 Apache Hudi 0.11.0 遷移指南。此標籤上的範例反映了這些變更。

在主節點上開啟 Spark Shell
  1. 使用 連線至主要節點SSH。如需詳細資訊,請參閱《Amazon EMR管理指南》中的使用 連線至主要節點SSH

  2. 輸入以下命令啟動 Spark shell。若要使用 PySpark Shell,請將 取代spark-shellpyspark

    spark-shell --jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"

spark-sql 使用 Amazon 6.6.x 或更早版本執行 spark-shellspark-submit、 EMR 或 時,請傳遞下列命令。

注意
  • Amazon EMR 6.2 和 5.31 及更新版本 (Hudi 0.6.x 及更新版本) 可以從spark-avro.jar組態中省略 。

  • Amazon EMR 6.5 和 5.35 及更新版本 (Hudi 0.9.x 及更新版本) 可以從spark.sql.hive.convertMetastoreParquet=false組態中省略。

  • Amazon EMR 6.6 和 5.36 及更新版本 HoodieSparkSessionExtension (Hudi 0.10.x 及更新版本) 必須包含組態,如版本:0.10.0 Spark 指南中所述:

    --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
在主節點上開啟 Spark Shell
  1. 使用 連線至主要節點SSH。如需詳細資訊,請參閱《Amazon EMR管理指南》中的使用 連線至主要節點SSH

  2. 輸入以下命令啟動 Spark shell。若要使用 PySpark Shell,請將 取代spark-shellpyspark

    spark-shell \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

若要將 Hudi 與 Amazon EMR Notebooks 搭配使用,您必須先將 Hudi jar 檔案從本機檔案系統複製到筆記本叢集的主節點HDFS上的 。然後,您可以使用筆記本編輯器來設定EMR筆記本以使用 Hudi。

將 Hudi 與 Amazon EMR Notebooks 搭配使用
  1. 建立和啟動 Amazon EMR Notebooks 的叢集。如需詳細資訊,請參閱《Amazon 管理指南》中的為筆記本建立 Amazon EMR叢集 EMR

  2. 使用 連線到叢集的主節點,SSH然後將 jar 檔案從本機檔案系統複製到 ,HDFS如下列範例所示。在此範例中,我們會在 中建立目錄HDFS,以清楚說明檔案管理。如有需要HDFS,您可以在 中選擇自己的目的地。

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
  3. 開啟筆記本編輯器,輸入下列範例中的程式碼,然後執行程式碼。

    %%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}

若要將 Hudi 與 Amazon EMR Notebooks 搭配使用,您必須先將 Hudi jar 檔案從本機檔案系統複製到筆記本叢集的主節點HDFS上的 。然後,您可以使用筆記本編輯器來設定EMR筆記本以使用 Hudi。

將 Hudi 與 Amazon EMR Notebooks 搭配使用
  1. 建立和啟動 Amazon EMR Notebooks 的叢集。如需詳細資訊,請參閱《Amazon 管理指南》中的為筆記本建立 Amazon EMR叢集 EMR

  2. 使用 連線到叢集的主節點,SSH然後將 jar 檔案從本機檔案系統複製到 ,HDFS如下列範例所示。在此範例中,我們會在 中建立目錄HDFS,以清楚說明檔案管理。如有需要HDFS,您可以在 中選擇自己的目的地。

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
    hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
  3. 開啟筆記本編輯器,輸入下列範例中的程式碼,然後執行程式碼。

    { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}

初始化 Hudi 的 Spark 工作階段

使用 Scala 時,您必須在 Spark 工作階段中匯入下列類別。這需要在每個 Spark 工作階段都各自完成一次。

import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig

寫入 Hudi 資料集

下列範例示範如何建立 DataFrame 並將其寫入 Hudi 資料集。

注意

若要將程式碼範例貼到 Spark shell 中,請在提示字元中輸入 :paste、貼上範例,然後按 CTRL + D

每次將 DataFrame 寫入 Hudi 資料集時,您必須指定 DataSourceWriteOptions。這些選項有許多在各個寫入操作之間可能是相同的。以下範例會使用 hudiOptions 變數指定常用選項,後續範例會使用這些變數。

注意

Amazon EMR 6.7.0 使用 Apache Hudi 0.11.0-amzn-0,相較於先前的 Hudi 版本,它包含了重大的改善。如需詳細資訊,請參閱 Apache Hudi 0.11.0 遷移指南。此標籤上的範例反映了這些變更。

// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "tableName", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "tableName", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': 'tableName', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')
注意

您可能會在程式碼範例和通知中看到 "hoodie" 而不是 Hudi。Hudi 程式碼庫廣泛使用舊的 "hoodie" 拼寫。

DataSourceWriteOptions Hudi 的參考
選項 描述

TABLE_NAME

在其下註冊資料集的表名。

TABLE_TYPE_OPT_KEY

選用。指定要將資料集建立為 "COPY_ON_WRITE""MERGE_ON_READ"。預設值為 "COPY_ON_WRITE"

RECORDKEY_FIELD_OPT_KEY

記錄金鑰欄位,其值將用作 HoodieKeyrecordKey 元件。實際值將藉由叫用欄位值的 .toString() 來取得。您可以使用點符號來指定巢狀欄位,例如 a.b.c

PARTITIONPATH_FIELD_OPT_KEY

分割區路徑欄位,其值將用作 HoodieKeypartitionPath 元件。實際值將藉由叫用欄位值的 .toString() 來取得。

PRECOMBINE_FIELD_OPT_KEY

在實際寫入之前,會在預先結合中使用此欄位。當兩個記錄具有相同的金鑰值時,Hudi 選取 (由 Object.compareTo(..) 決定之) 預先組合欄位中值最大的記錄。

只有在您的中繼存放區中註冊 Hudi 資料集資料表時才需要下列選項。如果您未將 Hudi 資料集註冊為 Hive 中繼存放區中的資料表,則不需要這些選項。

DataSourceWriteOptions Hive 的參考
選項 描述

HIVE_DATABASE_OPT_KEY

要同步的 Hive 資料庫。預設值為 "default"

HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY

用於將分割區欄位值擷取至 Hive 分割區欄的類別。

HIVE_PARTITION_FIELDS_OPT_KEY

資料集中用於判斷 Hive 分割區欄的欄位。

HIVE_SYNC_ENABLED_OPT_KEY

當設定為 "true" 時,將資料集註冊至 Apache Hive 中繼存放區。預設值為 "false"

HIVE_TABLE_OPT_KEY

必要。Hive 中要同步的資料表的名稱。例如:"my_hudi_table_cow"

HIVE_USER_OPT_KEY

選用。同步時要使用的 Hive 使用者名稱。例如:"hadoop"

HIVE_PASS_OPT_KEY

選用。由 HIVE_USER_OPT_KEY 指定之使用者的 Hive 密碼。

HIVE_URL_OPT_KEY

Hive 中繼存放區 URL。

Upsert 資料

下列範例示範如何撰寫 來提升資料 DataFrame。與先前插入範例不同,OPERATION_OPT_KEY 值設定為 UPSERT_OPERATION_OPT_VAL。此外,指定 .mode(SaveMode.Append) 以指示應附加記錄。

注意

Amazon EMR 6.7.0 使用 Apache Hudi 0.11.0-amzn-0,相較於先前的 Hudi 版本,它包含了重大的改善。如需詳細資訊,請參閱 Apache Hudi 0.11.0 遷移指南。此標籤上的範例反映了這些變更。

// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

刪除記錄

若要硬刪除記錄,您可以 upsert 空的承載。在此情況下,PAYLOAD_CLASS_OPT_KEY 選項會指定 EmptyHoodieRecordPayload 類別。此範例使用 upsert 範例中使用的相同 DataFrame、 updateDF來指定相同的記錄。

注意

Amazon EMR 6.7.0 使用 Apache Hudi 0.11.0-amzn-0,相較於先前的 Hudi 版本,它包含了重大的改善。如需詳細資訊,請參閱 Apache Hudi 0.11.0 遷移指南。此標籤上的範例反映了這些變更。

(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

您還可以透過以下方式硬刪除資料:將 OPERATION_OPT_KEY 設定為 DELETE_OPERATION_OPT_VAL,來刪除您提交的資料集中的所有記錄。如需有關執行軟刪除的指示,以及有關刪除儲存在 Hudi 資料表中的資料的詳細資訊,請參閱 Apache Hudi 文件中的 Deletes

從 Hudi 資料集讀取

為在目前時間點擷取資料,Hudi 依預設執行快照查詢。以下是查詢在 寫入 Hudi 資料集 中寫入至 S3 的資料集的範例。s3://amzn-s3-demo-bucket/myhudidataset 以資料表路徑取代 ,並為每個分割區層級新增萬用字元星號,加上一個額外的星號。在此範例中,有一個分割區層級,因此我們新增了兩個萬用字符號。

注意

Amazon EMR 6.7.0 使用 Apache Hudi 0.11.0-amzn-0,相較於先前的 Hudi 版本,它包含了重大的改善。如需詳細資訊,請參閱 Apache Hudi 0.11.0 遷移指南。此標籤上的範例反映了這些變更。

val snapshotQueryDF = spark.read .format("hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset") .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('s3://amzn-s3-demo-bucket/myhudidataset' + '/*/*') snapshotQueryDF.show()

增量查詢

您還可以使用 Hudi 執行增量查詢,以取得自提供遞交時間戳記以來已變更的記錄串流。若要這麼做,請將 QUERY_TYPE_OPT_KEY 欄位設定為 QUERY_TYPE_INCREMENTAL_OPT_VAL。然後,為 BEGIN_INSTANTTIME_OPT_KEY 新增一個值,以取得自指定時間以來寫入的所有記錄。增量查詢的效率通常是批次處理查詢的十倍,因為它們僅處理變更的記錄。

在執行增量查詢時,請使用根 (基本) 資料表路徑,而不需要用於快照查詢的萬用字元星號。

注意

Presto 不支援增量查詢。

(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>) .load("s3://amzn-s3-demo-bucket/myhudidataset" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime': <beginInstantTime>, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://amzn-s3-demo-bucket/myhudidataset') incQueryDF.show()

如需有關從 Hudi 資料集讀取的詳細資訊,請參閱 Apache Hudi 文件中的查詢 Hudi 資料表