Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Lavorare con un set di dati Hudi
Hudi supporta l'inserimento, l'aggiornamento e l'eliminazione di dati nei set di dati Hudi tramite Spark. Per ulteriori informazioni, consulta Scrittura delle tabelle Hudi
Gli esempi seguenti mostrano come avviare la shell Spark interattiva, utilizzare Spark submit o utilizzare Amazon EMR Notebooks per lavorare con Hudi su Amazon. EMR Puoi anche usare l' DeltaStreamer utilità Hudi o altri strumenti per scrivere su un set di dati. In questa sezione, gli esempi dimostrano come lavorare con i set di dati utilizzando la shell Spark mentre si è connessi al nodo master utilizzando SSH come utente predefinito. hadoop
Durante l'esecuzione spark-shell
o spark-submit
l'spark-sql
utilizzo di Amazon EMR 6.7.0 o versioni successive, invia i seguenti comandi.
Nota
Amazon EMR 6.7.0 utilizza Apache Hudi
Per aprire la shell (interprete di comandi) Spark nel nodo primario
-
Connect al nodo primario utilizzandoSSH. Per ulteriori informazioni, consulta Connect al nodo primario utilizzando SSH nell'Amazon EMR Management Guide.
-
Immettere il seguente comando per avviare la shell Spark. Per usare la PySpark shell, sostituiscila
spark-shell
conpyspark
.spark-shell
--jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
Durante l'esecuzione spark-shell
o spark-submit
l'spark-sql
utilizzo di Amazon EMR 6.6.x o versioni precedenti, invia i seguenti comandi.
Nota
-
Amazon EMR 6.2 e 5.31 e versioni successive (Hudi 0.6.x e versioni successive) possono ometterlo dalla configurazione.
spark-avro.jar
-
Amazon EMR 6.5 e 5.35 e versioni successive (Hudi 0.9.x e versioni successive) possono essere omesse dalla configurazione.
spark.sql.hive.convertMetastoreParquet=false
-
--conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
Per aprire la shell (interprete di comandi) Spark nel nodo primario
-
Connect al nodo primario utilizzandoSSH. Per ulteriori informazioni, consulta Connect al nodo primario utilizzando SSH nell'Amazon EMR Management Guide.
-
Immettere il seguente comando per avviare la shell Spark. Per usare la PySpark shell, sostituiscila
spark-shell
conpyspark
.spark-shell
\ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
Per utilizzare Hudi con Amazon EMR Notebooks, devi prima copiare i file jar Hudi dal file system locale HDFS sul nodo master del cluster di notebook. Quindi utilizzi l'editor di notebook per configurare il notebook in modo che utilizzi EMR Hudi.
Per usare Hudi con Amazon EMR Notebooks
-
Crea e avvia un cluster per Amazon EMR Notebooks. Per ulteriori informazioni, consulta Creazione di EMR cluster Amazon per notebook nella Amazon Management Guide. EMR
-
Connect al nodo master del cluster utilizzando SSH e quindi copiando i file jar dal filesystem locale HDFS come mostrato negli esempi seguenti. Nell'esempio, creiamo una directory HDFS per facilitare la gestione dei file. Se lo desideriHDFS, puoi scegliere la tua destinazione in.
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
-
Aprire l'editor del notebook, immettere il codice dall'esempio seguente ed eseguirlo.
%%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}
Per utilizzare Hudi con Amazon EMR Notebooks, devi prima copiare i file jar Hudi dal file system locale HDFS sul nodo master del cluster di notebook. Quindi utilizzi l'editor di notebook per configurare il notebook in modo che utilizzi EMR Hudi.
Per usare Hudi con Amazon EMR Notebooks
-
Crea e avvia un cluster per Amazon EMR Notebooks. Per ulteriori informazioni, consulta Creazione di EMR cluster Amazon per notebook nella Amazon Management Guide. EMR
-
Connect al nodo master del cluster utilizzando SSH e quindi copiando i file jar dal filesystem locale HDFS come mostrato negli esempi seguenti. Nell'esempio, creiamo una directory HDFS per facilitare la gestione dei file. Se lo desideriHDFS, puoi scegliere la tua destinazione in.
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
-
Aprire l'editor del notebook, immettere il codice dall'esempio seguente ed eseguirlo.
{ "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}
Inizializzazione di una sessione Spark per Hudi
Quando si utilizza Scala, è necessario importare le seguenti classi nella sessione Spark. Questo deve essere fatto una volta per sessione Spark.
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig
Scrittura in un set di dati Hudi
Gli esempi seguenti mostrano come creare un set di dati Hudi DataFrame e scriverlo come set di dati.
Nota
Per incollare gli esempi di codice nella shell Spark, digitare :paste
al prompt, incollare l'esempio e premere CTRL
+ D
.
Ogni volta che si scrive un su un set di dati Hudi, è DataFrame necessario specificare. DataSourceWriteOptions
Molte di queste opzioni sono probabilmente identiche tra le operazioni di scrittura. L'esempio seguente specifica le opzioni comuni utilizzando la variabile
, che gli esempi successivi utilizzano.hudiOptions
Nota
Amazon EMR 6.7.0 utilizza Apache Hudi
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName
", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': '
tableName
', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName
', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/
')
Nota
Potresti visualizzare "hoodie" invece di Hudi negli esempi di codice e nelle notifiche. La base di codice Hudi utilizza ampiamente la vecchia ortografia "hoodie".
Opzione | Descrizione |
---|---|
TABLE_NAME |
Il nome della tabella in cui registrare il set di dati. |
TABLE_TYPE_OPT_KEY |
Facoltativo. Specificare se il set di dati viene creato come |
RECORDKEY_FIELD_OPT_KEY |
Il campo chiave record il cui valore verrà utilizzato come componente |
PARTITIONPATH_FIELD_OPT_KEY |
Il campo del percorso di partizione il cui valore verrà utilizzato come componente |
PRECOMBINE_FIELD_OPT_KEY |
Il campo utilizzato nella pre-combinazione prima della scrittura effettiva. Quando due registri hanno lo stesso valore chiave, Hudi seleziona quello con il valore più grande per il campo di precombinazione come determinato da |
Le seguenti opzioni sono necessarie solo per registrare la tabella del set di dati Hudi nel metastore. Se non si registra il set di dati Hudi come tabella nel metastore Hive, queste opzioni non sono necessarie.
Opzione | Descrizione |
---|---|
HIVE_DATABASE_OPT_KEY |
Il database Hive da sincronizzare. Il valore predefinito è |
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY |
La classe utilizzata per estrarre i valori dei campi delle partizioni nelle colonne delle partizioni Hive. |
HIVE_PARTITION_FIELDS_OPT_KEY |
Campo nel set di dati da utilizzare per determinare le colonne di partizione Hive. |
HIVE_SYNC_ENABLED_OPT_KEY |
Quando impostato su |
HIVE_TABLE_OPT_KEY |
Obbligatorio. Il nome della tabella in Hive da sincronizzare. Ad esempio |
HIVE_USER_OPT_KEY |
Facoltativo. Il nome utente Hive da utilizzare durante la sincronizzazione. Ad esempio |
HIVE_PASS_OPT_KEY |
Facoltativo. La password Hive per l'utente specificato da |
HIVE_URL_OPT_KEY |
Il metastore di Hive. URL |
Upsert dei dati
L'esempio seguente mostra come modificare i dati scrivendo un. DataFrame A differenza dell'esempio di inserimento precedente, il valore OPERATION_OPT_KEY
è impostato su UPSERT_OPERATION_OPT_VAL
. Inoltre, .mode(SaveMode.Append)
è specificato per indicare che il record deve essere aggiunto.
Nota
Amazon EMR 6.7.0 utilizza Apache Hudi
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("
new_value
")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('
new_value
')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/
')
Eliminazione di un registro
Per eliminare un registro, è possibile eseguire l'upsert di un payload vuoto. In questo caso, l'opzione PAYLOAD_CLASS_OPT_KEY
specifica la classe EmptyHoodieRecordPayload
. L'esempio utilizza lo stesso DataFrameupdateDF
, usato nell'esempio upsert per specificare lo stesso record.
Nota
Amazon EMR 6.7.0 utilizza Apache Hudi
(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('
s3://amzn-s3-demo-bucket/myhudidataset/
')
È inoltre possibile eliminare i dati impostando OPERATION_OPT_KEY
su DELETE_OPERATION_OPT_VAL
per rimuovere tutti i registri nel set di dati inviato. Per istruzioni sull'esecuzione delle eliminazioni soft e per ulteriori informazioni sull'eliminazione dei dati archiviati nelle tabelle Hudi, consulta Deletes (Eliminazioni)
Lettura da un set di dati Hudi
Per recuperare i dati al momento attuale, Hudi esegue query degli snapshot per impostazione predefinita. Di seguito è riportato un esempio di interrogazione del set di dati scritto su S3 in Scrittura in un set di dati Hudi. Sostituisci s3://amzn-s3-demo-bucket/myhudidataset
con il percorso della tabella e aggiungi asterischi con caratteri jolly per ogni livello di partizione, più un asterisco aggiuntivo. Dal momento che questo esempio mostra un livello di partizione, sono stati aggiunti due simboli jolly.
Nota
Amazon EMR 6.7.0 utilizza Apache Hudi
val snapshotQueryDF = spark.read .format("hudi") .load(
"s3://amzn-s3-demo-bucket/myhudidataset"
) .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("
s3://amzn-s3-demo-bucket/myhudidataset
" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('
s3://amzn-s3-demo-bucket/myhudidataset
' + '/*/*') snapshotQueryDF.show()
Query incrementali
È inoltre possibile eseguire query incrementali con Hudi per ottenere un flusso di registri che hanno subito modifiche a partire da un dato timestamp di commit. Per fare questo, imposta il campo QUERY_TYPE_OPT_KEY
su QUERY_TYPE_INCREMENTAL_OPT_VAL
. Quindi, aggiungi un valore per BEGIN_INSTANTTIME_OPT_KEY
per ottenere tutti i registri scritti dall'ora specificata. Le query incrementali sono in genere dieci volte più efficienti rispetto alle controparti batch poiché elaborano solo i registri modificati.
Quando si eseguono query incrementali, utilizza il percorso della tabella radice (di base) senza gli asterischi jolly utilizzati per le query degli snapshot.
Nota
Presto non supporta le query incrementali.
(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
<beginInstantTime>
) .load("s3://amzn-s3-demo-bucket/myhudidataset
" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime':
<beginInstantTime>
, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://amzn-s3-demo-bucket/myhudidataset
') incQueryDF.show()
Per ulteriori informazioni sulla lettura dai set di dati Hudi, consulta Esecuzione di query sulle tabelle Hudi