本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 Hudi 資料集
Hudi 支援透過 Spark 插入、更新和刪除 Hudi 資料集中的資料。如需詳細資訊,請參閱 Apache Hudi 文件中的寫入 Hudi 資料表
下列範例示範如何啟動互動式 Spark shell、使用 Spark 提交或使用 Amazon EMR Notebooks 在 Amazon 上使用 HudiEMR。您也可以使用 Hudi DeltaStreamer 公用程式或其他工具來寫入資料集。在本節中,範例示範使用 做為預設hadoop
使用者的 連線到主節點時,使用 Spark shell SSH的資料集。
spark-sql
使用 Amazon 6.7.0 或更新版本執行 spark-shell
spark-submit
、 EMR 或 時,請傳遞下列命令。
注意
Amazon EMR 6.7.0 使用 Apache Hudi
在主節點上開啟 Spark Shell
-
使用 連線至主要節點SSH。如需詳細資訊,請參閱《Amazon EMR管理指南》中的使用 連線至主要節點SSH。
-
輸入以下命令啟動 Spark shell。若要使用 PySpark Shell,請將 取代
spark-shell
為pyspark
。spark-shell
--jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
spark-sql
使用 Amazon 6.6.x 或更早版本執行 spark-shell
spark-submit
、 EMR 或 時,請傳遞下列命令。
注意
-
Amazon EMR 6.2 和 5.31 及更新版本 (Hudi 0.6.x 及更新版本) 可以從
spark-avro.jar
組態中省略 。 -
Amazon EMR 6.5 和 5.35 及更新版本 (Hudi 0.9.x 及更新版本) 可以從
spark.sql.hive.convertMetastoreParquet=false
組態中省略。 -
Amazon EMR 6.6 和 5.36 及更新版本
HoodieSparkSessionExtension
(Hudi 0.10.x 及更新版本) 必須包含組態,如版本:0.10.0 Spark 指南中所述: --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
在主節點上開啟 Spark Shell
-
使用 連線至主要節點SSH。如需詳細資訊,請參閱《Amazon EMR管理指南》中的使用 連線至主要節點SSH。
-
輸入以下命令啟動 Spark shell。若要使用 PySpark Shell,請將 取代
spark-shell
為pyspark
。spark-shell
\ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
若要將 Hudi 與 Amazon EMR Notebooks 搭配使用,您必須先將 Hudi jar 檔案從本機檔案系統複製到筆記本叢集的主節點HDFS上的 。然後,您可以使用筆記本編輯器來設定EMR筆記本以使用 Hudi。
將 Hudi 與 Amazon EMR Notebooks 搭配使用
-
建立和啟動 Amazon EMR Notebooks 的叢集。如需詳細資訊,請參閱《Amazon 管理指南》中的為筆記本建立 Amazon EMR叢集。 EMR
-
使用 連線到叢集的主節點,SSH然後將 jar 檔案從本機檔案系統複製到 ,HDFS如下列範例所示。在此範例中,我們會在 中建立目錄HDFS,以清楚說明檔案管理。如有需要HDFS,您可以在 中選擇自己的目的地。
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
-
開啟筆記本編輯器,輸入下列範例中的程式碼,然後執行程式碼。
%%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}
若要將 Hudi 與 Amazon EMR Notebooks 搭配使用,您必須先將 Hudi jar 檔案從本機檔案系統複製到筆記本叢集的主節點HDFS上的 。然後,您可以使用筆記本編輯器來設定EMR筆記本以使用 Hudi。
將 Hudi 與 Amazon EMR Notebooks 搭配使用
-
建立和啟動 Amazon EMR Notebooks 的叢集。如需詳細資訊,請參閱《Amazon 管理指南》中的為筆記本建立 Amazon EMR叢集。 EMR
-
使用 連線到叢集的主節點,SSH然後將 jar 檔案從本機檔案系統複製到 ,HDFS如下列範例所示。在此範例中,我們會在 中建立目錄HDFS,以清楚說明檔案管理。如有需要HDFS,您可以在 中選擇自己的目的地。
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
-
開啟筆記本編輯器,輸入下列範例中的程式碼,然後執行程式碼。
{ "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}
初始化 Hudi 的 Spark 工作階段
使用 Scala 時,您必須在 Spark 工作階段中匯入下列類別。這需要在每個 Spark 工作階段都各自完成一次。
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig
寫入 Hudi 資料集
下列範例示範如何建立 DataFrame 並將其寫入 Hudi 資料集。
注意
若要將程式碼範例貼到 Spark shell 中,請在提示字元中輸入 :paste
、貼上範例,然後按 CTRL
+ D
。
每次將 DataFrame 寫入 Hudi 資料集時,您必須指定 DataSourceWriteOptions
。這些選項有許多在各個寫入操作之間可能是相同的。以下範例會使用
變數指定常用選項,後續範例會使用這些變數。hudiOptions
注意
Amazon EMR 6.7.0 使用 Apache Hudi
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName
", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': '
tableName
', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName
', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/
')
注意
您可能會在程式碼範例和通知中看到 "hoodie" 而不是 Hudi。Hudi 程式碼庫廣泛使用舊的 "hoodie" 拼寫。
選項 | 描述 |
---|---|
TABLE_NAME |
在其下註冊資料集的表名。 |
TABLE_TYPE_OPT_KEY |
選用。指定要將資料集建立為 |
RECORDKEY_FIELD_OPT_KEY |
記錄金鑰欄位,其值將用作 |
PARTITIONPATH_FIELD_OPT_KEY |
分割區路徑欄位,其值將用作 |
PRECOMBINE_FIELD_OPT_KEY |
在實際寫入之前,會在預先結合中使用此欄位。當兩個記錄具有相同的金鑰值時,Hudi 選取 (由 |
只有在您的中繼存放區中註冊 Hudi 資料集資料表時才需要下列選項。如果您未將 Hudi 資料集註冊為 Hive 中繼存放區中的資料表,則不需要這些選項。
選項 | 描述 |
---|---|
HIVE_DATABASE_OPT_KEY |
要同步的 Hive 資料庫。預設值為 |
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY |
用於將分割區欄位值擷取至 Hive 分割區欄的類別。 |
HIVE_PARTITION_FIELDS_OPT_KEY |
資料集中用於判斷 Hive 分割區欄的欄位。 |
HIVE_SYNC_ENABLED_OPT_KEY |
當設定為 |
HIVE_TABLE_OPT_KEY |
必要。Hive 中要同步的資料表的名稱。例如: |
HIVE_USER_OPT_KEY |
選用。同步時要使用的 Hive 使用者名稱。例如: |
HIVE_PASS_OPT_KEY |
選用。由 |
HIVE_URL_OPT_KEY |
Hive 中繼存放區 URL。 |
Upsert 資料
下列範例示範如何撰寫 來提升資料 DataFrame。與先前插入範例不同,OPERATION_OPT_KEY
值設定為 UPSERT_OPERATION_OPT_VAL
。此外,指定 .mode(SaveMode.Append)
以指示應附加記錄。
注意
Amazon EMR 6.7.0 使用 Apache Hudi
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("
new_value
")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('
new_value
')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/
')
刪除記錄
若要硬刪除記錄,您可以 upsert 空的承載。在此情況下,PAYLOAD_CLASS_OPT_KEY
選項會指定 EmptyHoodieRecordPayload
類別。此範例使用 upsert 範例中使用的相同 DataFrame、 updateDF
來指定相同的記錄。
注意
Amazon EMR 6.7.0 使用 Apache Hudi
(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('
s3://amzn-s3-demo-bucket/myhudidataset/
')
您還可以透過以下方式硬刪除資料:將 OPERATION_OPT_KEY
設定為 DELETE_OPERATION_OPT_VAL
,來刪除您提交的資料集中的所有記錄。如需有關執行軟刪除的指示,以及有關刪除儲存在 Hudi 資料表中的資料的詳細資訊,請參閱 Apache Hudi 文件中的 Deletes
從 Hudi 資料集讀取
為在目前時間點擷取資料,Hudi 依預設執行快照查詢。以下是查詢在 寫入 Hudi 資料集 中寫入至 S3 的資料集的範例。s3://amzn-s3-demo-bucket/myhudidataset
以資料表路徑取代 ,並為每個分割區層級新增萬用字元星號,加上一個額外的星號。在此範例中,有一個分割區層級,因此我們新增了兩個萬用字符號。
注意
Amazon EMR 6.7.0 使用 Apache Hudi
val snapshotQueryDF = spark.read .format("hudi") .load(
"s3://amzn-s3-demo-bucket/myhudidataset"
) .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("
s3://amzn-s3-demo-bucket/myhudidataset
" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('
s3://amzn-s3-demo-bucket/myhudidataset
' + '/*/*') snapshotQueryDF.show()
增量查詢
您還可以使用 Hudi 執行增量查詢,以取得自提供遞交時間戳記以來已變更的記錄串流。若要這麼做,請將 QUERY_TYPE_OPT_KEY
欄位設定為 QUERY_TYPE_INCREMENTAL_OPT_VAL
。然後,為 BEGIN_INSTANTTIME_OPT_KEY
新增一個值,以取得自指定時間以來寫入的所有記錄。增量查詢的效率通常是批次處理查詢的十倍,因為它們僅處理變更的記錄。
在執行增量查詢時,請使用根 (基本) 資料表路徑,而不需要用於快照查詢的萬用字元星號。
注意
Presto 不支援增量查詢。
(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
<beginInstantTime>
) .load("s3://amzn-s3-demo-bucket/myhudidataset
" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime':
<beginInstantTime>
, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://amzn-s3-demo-bucket/myhudidataset
') incQueryDF.show()
如需有關從 Hudi 資料集讀取的詳細資訊,請參閱 Apache Hudi 文件中的查詢 Hudi 資料表