Lavorare con un set di dati Hudi - Amazon EMR

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Lavorare con un set di dati Hudi

Hudi supporta l'inserimento, l'aggiornamento e l'eliminazione di dati nei set di dati Hudi tramite Spark. Per ulteriori informazioni, consulta Scrittura delle tabelle Hudi nella documentazione di Apache Hudi.

Negli esempi seguenti viene illustrato come avviare la shell interattiva Spark, utilizzare Spark submit o utilizzare Amazon EMR Notebooks per lavorare con Hudi su Amazon EMR. È inoltre possibile utilizzare l' DeltaStreamer utilità Hudi o altri strumenti per scrivere su un set di dati. In questa sezione, gli esempi illustrano l'utilizzo di set di dati con la shell Spark connessi al nodo master utilizzando SSH come utente hadoop predefinito.

Durante l'esecuzione di spark-shell, spark-submit o spark-sql utilizzando Amazon EMR versione 6.7.0 o successive, esegui i seguenti comandi.

Nota

Amazon EMR 6.7.0 impiega Apache Hudi 0.11.0-amzn-0, che contiene miglioramenti significativi rispetto alle versioni precedenti di Hudi. Per ulteriori informazioni, consulta la sezione Apache Hudi 0.11.0 Migration Guide (Guida alla migrazione di Apache Hudi 0.11.0). Gli esempi in questa scheda riflettono tali modifiche.

Per aprire la shell (interprete di comandi) Spark nel nodo primario
  1. Effettua la connessione al nodo primario tramite SSH. Per ulteriori informazioni, consulta la sezione Connect to the primary node using SSH (Connessione al nodo primario tramite SSH) nella Guida alla gestione di Amazon EMR.

  2. Immettere il seguente comando per avviare la shell Spark. Per usare la PySpark shell, spark-shell sostituiscila con. pyspark

    spark-shell --jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"

Durante l'esecuzione di spark-shell, spark-submit o spark-sql utilizzando Amazon EMR 6.6.x o rilasci precedenti, esegui i seguenti comandi.

Nota
  • Amazon EMR versioni 6.2 e 5.31 e successive (Hudi versione 0.6.x e successive) può omettere spark-avro.jar dalla configurazione.

  • Amazon EMR rilasci 6.5 e 5.35 e successivi (Hudi rilascio 0.9.x e successivi) può omettere spark.sql.hive.convertMetastoreParquet=false dalla configurazione.

  • Amazon EMR versioni 6.6 e 5.36 e successive (Hudi versione 0.10.x e successive) deve includere la configurazione HoodieSparkSessionExtension descritta in Version: 0.10.0 Spark Guide (Guida a Spark versione 0.10.0):

    --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
Per aprire la shell (interprete di comandi) Spark nel nodo primario
  1. Effettua la connessione al nodo primario tramite SSH. Per ulteriori informazioni, consulta la sezione Connect to the primary node using SSH (Connessione al nodo primario tramite SSH) nella Guida alla gestione di Amazon EMR.

  2. Immettere il seguente comando per avviare la shell Spark. Per usare la PySpark shell, sostituiscila spark-shell conpyspark.

    spark-shell \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

Per utilizzare Hudi con Amazon EMR Notebooks, è necessario innanzitutto copiare i file jar Hudi dal file system locale in HDFS sul nodo master del cluster di notebook. È quindi possibile utilizzare l'editor del notebook per configurare EMR Notebooks per l'utilizzo di Hudi.

Utilizzo di Hudi con Amazon EMR Notebooks
  1. Crea e avvia un cluster per Amazon EMR Notebooks. Per ulteriori informazioni, consulta Creazione di cluster per i notebook Amazon EMR nella Guida alla gestione di Amazon EMR.

  2. Connettersi al nodo master del cluster utilizzando SSH e quindi copiare i file jar dal filesystem locale in HDFS, come illustrato negli esempi seguenti. Nell'esempio, creiamo una directory in HDFS per chiarezza nella gestione dei file. È possibile scegliere la propria destinazione in HDFS, se lo si desidera.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
  3. Aprire l'editor del notebook, immettere il codice dall'esempio seguente ed eseguirlo.

    %%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}

Per utilizzare Hudi con Amazon EMR Notebooks, è necessario innanzitutto copiare i file jar Hudi dal file system locale in HDFS sul nodo master del cluster di notebook. È quindi possibile utilizzare l'editor del notebook per configurare EMR Notebooks per l'utilizzo di Hudi.

Utilizzo di Hudi con Amazon EMR Notebooks
  1. Crea e avvia un cluster per Amazon EMR Notebooks. Per ulteriori informazioni, consulta Creazione di cluster per i notebook Amazon EMR nella Guida alla gestione di Amazon EMR.

  2. Connettersi al nodo master del cluster utilizzando SSH e quindi copiare i file jar dal filesystem locale in HDFS, come illustrato negli esempi seguenti. Nell'esempio, creiamo una directory in HDFS per chiarezza nella gestione dei file. È possibile scegliere la propria destinazione in HDFS, se lo si desidera.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
    hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
  3. Aprire l'editor del notebook, immettere il codice dall'esempio seguente ed eseguirlo.

    { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}

Inizializzazione di una sessione Spark per Hudi

Quando si utilizza Scala, è necessario importare le seguenti classi nella sessione Spark. Questo deve essere fatto una volta per sessione Spark.

import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig

Scrittura in un set di dati Hudi

Gli esempi seguenti mostrano come creare un set di dati Hudi DataFrame e scriverlo come set di dati.

Nota

Per incollare gli esempi di codice nella shell Spark, digitare :paste al prompt, incollare l'esempio e premere CTRL + D.

Ogni volta che si scrive un su un set di dati Hudi, è DataFrame necessario specificare. DataSourceWriteOptions Molte di queste opzioni sono probabilmente identiche tra le operazioni di scrittura. L'esempio seguente specifica le opzioni comuni utilizzando la variabile hudiOptions, che gli esempi successivi utilizzano.

Nota

Amazon EMR 6.7.0 impiega Apache Hudi 0.11.0-amzn-0, che contiene miglioramenti significativi rispetto alle versioni precedenti di Hudi. Per ulteriori informazioni, consulta la sezione Apache Hudi 0.11.0 Migration Guide (Guida alla migrazione di Apache Hudi 0.11.0). Gli esempi in questa scheda riflettono tali modifiche.

// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "tableName", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "tableName", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': 'tableName', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')
Nota

Potresti visualizzare "hoodie" invece di Hudi negli esempi di codice e nelle notifiche. La base di codice Hudi utilizza ampiamente la vecchia ortografia "hoodie".

DataSourceWriteOptions riferimento per Hudi
Opzione Descrizione

TABLE_NAME

Il nome della tabella in cui registrare il set di dati.

TABLE_TYPE_OPT_KEY

Facoltativo. Specificare se il set di dati viene creato come "COPY_ON_WRITE" o "MERGE_ON_READ". Il valore predefinito è "COPY_ON_WRITE".

RECORDKEY_FIELD_OPT_KEY

Il campo chiave record il cui valore verrà utilizzato come componente recordKey di HoodieKey. Il valore effettivo sarà ottenuto richiamando .toString() sul valore del campo. I campi nidificati possono essere specificati utilizzando la notazione con punti, ad esempio a.b.c.

PARTITIONPATH_FIELD_OPT_KEY

Il campo del percorso di partizione il cui valore verrà utilizzato come componente partitionPath di HoodieKey. Il valore effettivo sarà ottenuto richiamando .toString() sul valore del campo.

PRECOMBINE_FIELD_OPT_KEY

Il campo utilizzato nella pre-combinazione prima della scrittura effettiva. Quando due registri hanno lo stesso valore chiave, Hudi seleziona quello con il valore più grande per il campo di precombinazione come determinato da Object.compareTo(..).

Le seguenti opzioni sono necessarie solo per registrare la tabella del set di dati Hudi nel metastore. Se non si registra il set di dati Hudi come tabella nel metastore Hive, queste opzioni non sono necessarie.

DataSourceWriteOptions riferimento per Hive
Opzione Descrizione

HIVE_DATABASE_OPT_KEY

Il database Hive da sincronizzare. Il valore predefinito è "default".

HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY

La classe utilizzata per estrarre i valori dei campi delle partizioni nelle colonne delle partizioni Hive.

HIVE_PARTITION_FIELDS_OPT_KEY

Campo nel set di dati da utilizzare per determinare le colonne di partizione Hive.

HIVE_SYNC_ENABLED_OPT_KEY

Quando impostato su "true", registra il set di dati con il metastore Apache Hive. Il valore predefinito è "false".

HIVE_TABLE_OPT_KEY

Obbligatorio. Il nome della tabella in Hive da sincronizzare. Ad esempio "my_hudi_table_cow".

HIVE_USER_OPT_KEY

Facoltativo. Il nome utente Hive da utilizzare durante la sincronizzazione. Ad esempio "hadoop".

HIVE_PASS_OPT_KEY

Facoltativo. La password Hive per l'utente specificato da HIVE_USER_OPT_KEY.

HIVE_URL_OPT_KEY

L'URL del metastore Hive.

Upsert dei dati

L'esempio seguente mostra come modificare i dati scrivendo un. DataFrame A differenza dell'esempio di inserimento precedente, il valore OPERATION_OPT_KEY è impostato su UPSERT_OPERATION_OPT_VAL. Inoltre, .mode(SaveMode.Append) è specificato per indicare che il record deve essere aggiunto.

Nota

Amazon EMR 6.7.0 impiega Apache Hudi 0.11.0-amzn-0, che contiene miglioramenti significativi rispetto alle versioni precedenti di Hudi. Per ulteriori informazioni, consulta la sezione Apache Hudi 0.11.0 Migration Guide (Guida alla migrazione di Apache Hudi 0.11.0). Gli esempi in questa scheda riflettono tali modifiche.

// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

Eliminazione di un registro

Per eliminare un registro, è possibile eseguire l'upsert di un payload vuoto. In questo caso, l'opzione PAYLOAD_CLASS_OPT_KEY specifica la classe EmptyHoodieRecordPayload. L'esempio utilizza lo stesso DataFrameupdateDF, usato nell'esempio upsert per specificare lo stesso record.

Nota

Amazon EMR 6.7.0 impiega Apache Hudi 0.11.0-amzn-0, che contiene miglioramenti significativi rispetto alle versioni precedenti di Hudi. Per ulteriori informazioni, consulta la sezione Apache Hudi 0.11.0 Migration Guide (Guida alla migrazione di Apache Hudi 0.11.0). Gli esempi in questa scheda riflettono tali modifiche.

(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

È inoltre possibile eliminare i dati impostando OPERATION_OPT_KEY su DELETE_OPERATION_OPT_VAL per rimuovere tutti i registri nel set di dati inviato. Per istruzioni sull'esecuzione delle eliminazioni soft e per ulteriori informazioni sull'eliminazione dei dati archiviati nelle tabelle Hudi, consulta Deletes (Eliminazioni) nella documentazione di Apache Hudi.

Lettura da un set di dati Hudi

Per recuperare i dati al momento attuale, Hudi esegue query degli snapshot per impostazione predefinita. Di seguito è riportato un esempio di interrogazione del set di dati scritto su S3 in Scrittura in un set di dati Hudi. Sostituisci s3://amzn-s3-demo-bucket/myhudidataset con il percorso della tabella e aggiungi asterischi con caratteri jolly per ogni livello di partizione, più un asterisco aggiuntivo. Dal momento che questo esempio mostra un livello di partizione, sono stati aggiunti due simboli jolly.

Nota

Amazon EMR 6.7.0 impiega Apache Hudi 0.11.0-amzn-0, che contiene miglioramenti significativi rispetto alle versioni precedenti di Hudi. Per ulteriori informazioni, consulta la sezione Apache Hudi 0.11.0 Migration Guide (Guida alla migrazione di Apache Hudi 0.11.0). Gli esempi in questa scheda riflettono tali modifiche.

val snapshotQueryDF = spark.read .format("hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset") .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('s3://amzn-s3-demo-bucket/myhudidataset' + '/*/*') snapshotQueryDF.show()

Query incrementali

È inoltre possibile eseguire query incrementali con Hudi per ottenere un flusso di registri che hanno subito modifiche a partire da un dato timestamp di commit. Per fare questo, imposta il campo QUERY_TYPE_OPT_KEY su QUERY_TYPE_INCREMENTAL_OPT_VAL. Quindi, aggiungi un valore per BEGIN_INSTANTTIME_OPT_KEY per ottenere tutti i registri scritti dall'ora specificata. Le query incrementali sono in genere dieci volte più efficienti rispetto alle controparti batch poiché elaborano solo i registri modificati.

Quando si eseguono query incrementali, utilizza il percorso della tabella radice (di base) senza gli asterischi jolly utilizzati per le query degli snapshot.

Nota

Presto non supporta le query incrementali.

(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>) .load("s3://amzn-s3-demo-bucket/myhudidataset" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime': <beginInstantTime>, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://amzn-s3-demo-bucket/myhudidataset') incQueryDF.show()

Per ulteriori informazioni sulla lettura dai set di dati Hudi, consulta Esecuzione di query sulle tabelle Hudi nella documentazione di Apache Hudi.