Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Arbeiten mit einem Hudi-Datensatz
Hudi unterstützt das Einfügen, Aktualisieren und Löschen von Daten in Hudi-Datensätzen über Spark. Weitere Informationen finden Sie unter Hudi-Tabellen schreiben
In den folgenden Beispielen wird veranschaulicht, wie Sie die interaktive Spark-Shell starten und die Spark-Übermittlung und Amazon EMR Notebooks verwenden, um mit Hudi in Amazon EMR zu arbeiten. Sie können auch das DeltaStreamer Hudi-Hilfsprogramm oder andere Tools verwenden, um in einen Datensatz zu schreiben. Die Beispiele in diesem Abschnitt veranschaulichen das Arbeiten mit Datasets unter Verwendung der Spark-Shell, während eine Verbindung mit dem Master-Knoten mittels SSH als Standard-hadoop
-Benutzer vorhanden ist.
Wenn Sie spark-shell
, spark-submit
oder spark-sql
mit Amazon EMR 6.7.0 oder höher ausführen, übergeben Sie die folgenden Befehle.
Anmerkung
Amazon EMR 6.7.0 verwendet Apache Hudi
So öffnen Sie die Spark-Shell auf dem Primärknoten
-
Verbinden Sie sich dem Primärknoten über SSH. Weitere Informationen finden Sie unter Stellen Sie über SSH eine Verbindung zum Primärknoten her im Verwaltungshandbuch für Amazon EMR.
-
Geben Sie den folgenden Befehl ein, um die Spark-Shell zu starten. Um die PySpark Shell zu verwenden, ersetzen Sie
spark-shell
durchpyspark
.spark-shell
--jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
Wenn Sie spark-shell
, spark-submit
, oder spark-sql
mit Amazon EMR 6.6.x oder früher ausführen, übergeben Sie die folgenden Befehle.
Anmerkung
-
Amazon EMR 6.2 und 5.31 und höher (Hudi 0.6.x und höher) können
spark-avro.jar
in der Konfiguration weglassen. -
Amazon EMR 6.5 und 5.35 und höher (Hudi 0.9.x und höher) können
spark.sql.hive.convertMetastoreParquet=false
in der Konfiguration weglassen. -
Amazon EMR 6.6 und 5.36 und höher (Hudi 0.10.x und höher) müssen die
HoodieSparkSessionExtension
Konfiguration enthalten, wie sie im Spark-Leitfaden zu Version: 0.10.0 beschrieben ist: --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
So öffnen Sie die Spark-Shell auf dem Primärknoten
-
Verbinden Sie sich dem Primärknoten über SSH. Weitere Informationen finden Sie unter Stellen Sie über SSH eine Verbindung zum Primärknoten her im Verwaltungshandbuch für Amazon EMR.
-
Geben Sie den folgenden Befehl ein, um die Spark-Shell zu starten. Um die PySpark Shell zu verwenden,
spark-shell
ersetzen Sie durchpyspark
.spark-shell
\ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
Um Hudi mit Amazon EMR Notebooks verwenden zu können, müssen Sie zuerst die Hudi Jar-Dateien aus dem lokalen Dateisystem auf HDFS auf dem Hauptknoten des Notebook-Clusters kopieren. Anschließend verwenden Sie den Notebook-Editor, um Ihr EMR Notebook für die Verwendung von Hudi zu konfigurieren.
Wie Sie Hudi mit Amazon EMR Notebooks verwenden
-
Erstellen und starten Sie einen Cluster für Amazon EMR Notebooks. Weitere Informationen finden Sie unter Erstellen von Amazon-EMR-Clustern für Notebooks im Verwaltungshandbuch für Amazon EMR.
-
Verbinden Sie sich über SSH mit dem Master-Knoten des Clusters und kopieren Sie dann die Jar-Dateien aus dem lokalen Dateisystem auf HDFS, wie in den folgenden Beispielen veranschaulicht. In diesem Beispiel erstellen wir ein Verzeichnis in HDFS zur Veranschaulichung der Dateiverwaltung. Sie können Ihr eigenes Ziel in HDFS wählen, falls gewünscht.
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
-
Öffnen Sie den Notebook-Editor, geben Sie den Code aus dem folgenden Beispiel ein und führen Sie ihn aus.
%%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}
Um Hudi mit Amazon EMR Notebooks verwenden zu können, müssen Sie zuerst die Hudi Jar-Dateien aus dem lokalen Dateisystem auf HDFS auf dem Hauptknoten des Notebook-Clusters kopieren. Anschließend verwenden Sie den Notebook-Editor, um Ihr EMR Notebook für die Verwendung von Hudi zu konfigurieren.
Wie Sie Hudi mit Amazon EMR Notebooks verwenden
-
Erstellen und starten Sie einen Cluster für Amazon EMR Notebooks. Weitere Informationen finden Sie unter Erstellen von Amazon-EMR-Clustern für Notebooks im Verwaltungshandbuch für Amazon EMR.
-
Verbinden Sie sich über SSH mit dem Master-Knoten des Clusters und kopieren Sie dann die Jar-Dateien aus dem lokalen Dateisystem auf HDFS, wie in den folgenden Beispielen veranschaulicht. In diesem Beispiel erstellen wir ein Verzeichnis in HDFS zur Veranschaulichung der Dateiverwaltung. Sie können Ihr eigenes Ziel in HDFS wählen, falls gewünscht.
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
-
Öffnen Sie den Notebook-Editor, geben Sie den Code aus dem folgenden Beispiel ein und führen Sie ihn aus.
{ "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}
Initialisieren Sie eine Spark-Sitzung für Hudi
Wenn Sie Scala verwenden, müssen Sie die folgenden Klassen in Ihre Spark-Sitzung importieren. Dies muss einmal pro Spark-Sitzung erfolgen.
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig
Schreiben Sie in einen Hudi-Datensatz
Die folgenden Beispiele zeigen, wie ein Hudi-Datensatz erstellt DataFrame und als Hudi-Datensatz geschrieben wird.
Anmerkung
Um Codebeispiele in die Spark-Shell einzufügen, geben Sie an der Eingabeaufforderung :paste
ein, fügen das Beispiel ein und drücken dann CTRL
+ D
.
Jedes Mal, wenn Sie einen DataFrame in einen Hudi-Datensatz schreiben, müssen Sie Folgendes angeben. DataSourceWriteOptions
Viele dieser Optionen sind unter den Schreiboperationen wahrscheinlich identisch. Im folgenden Beispiel werden allgemeine Optionen unter Verwendung der Variablen
angegeben, die von nachfolgenden Beispielen verwendet wird.hudiOptions
Anmerkung
Amazon EMR 6.7.0 verwendet Apache Hudi
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName
", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': '
tableName
', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName
', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/
')
Anmerkung
Möglicherweise sehen Sie in Codebeispielen und Benachrichtigungen „hoodie“ anstelle von Hudi. In der Hudi-Codebasis wird häufig die alte Schreibweise „hoodie“ verwendet.
Option | Beschreibung |
---|---|
TABLE_NAME |
Der Tabellenname, unter dem das Dataset registriert werden soll. |
TABLE_TYPE_OPT_KEY |
Optional. Gibt an, ob das Dataset als |
RECORDKEY_FIELD_OPT_KEY |
Das Datensatzschlüsselfeld, dessen Wert als die |
PARTITIONPATH_FIELD_OPT_KEY |
Das Partitionspfadfeld, dessen Wert als die Komponente |
PRECOMBINE_FIELD_OPT_KEY |
Das Feld, das in der Vorab-Kombination vor dem tatsächlichen Schreiben verwendet wird. Wenn zwei Datensätze denselben Schlüsselwert haben, wählt Hudi den Datensatz mit dem größten Wert für das Vorab-Kombinationsfeld wie von |
Die folgenden Optionen sind nur erforderlich, um die Hudi-Datensatz-Tabelle in Ihrem Metastore zu registrieren. Wenn Sie Ihr Hudi-Datensatz nicht als Tabelle im Hive-Metastore registrieren, sind diese Optionen nicht erforderlich.
Option | Beschreibung |
---|---|
HIVE_DATABASE_OPT_KEY |
Die Hive-Datenbank, mit der synchronisiert werden soll. Der Standardwert ist |
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY |
Die Klasse, mit der Partitionsfeldwerte in Hive-Partitionsspalten extrahiert werden. |
HIVE_PARTITION_FIELDS_OPT_KEY |
Das Feld im Dataset, anhand dessen Hive-Partitionsspalten bestimmt werden sollen. |
HIVE_SYNC_ENABLED_OPT_KEY |
Wenn diese Option auf |
HIVE_TABLE_OPT_KEY |
Erforderlich Der Name der Tabelle in Hive, mit der synchronisiert werden soll. Beispiel, |
HIVE_USER_OPT_KEY |
Optional. Der Hive-Benutzername, der bei der Synchronisierung verwendet werden soll. Beispiel, |
HIVE_PASS_OPT_KEY |
Optional. Das von |
HIVE_URL_OPT_KEY |
Die Hive-Metastore-URL. |
Upsert Daten
Das folgende Beispiel zeigt, wie Daten durch das Schreiben von A verändert werden. DataFrame Im Gegensatz zum vorherigen Einfügebeispiel wird der Wert OPERATION_OPT_KEY
auf UPSERT_OPERATION_OPT_VAL
eingestellt. Darüber hinaus wird mit .mode(SaveMode.Append)
angegeben, dass der Datensatz angehängt werden soll.
Anmerkung
Amazon EMR 6.7.0 verwendet Apache Hudi
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("
new_value
")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('
new_value
')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/
')
Einen Datensatz löschen
Um einen Datensatz dauerhaft zu löschen, können Sie eine leere Datenlast einfügen. In diesem Fall gibt die Option PAYLOAD_CLASS_OPT_KEY
die Klasse EmptyHoodieRecordPayload
an. Im Beispiel wird derselbe DataFrame,, verwendetupdateDF
, der im Upsert-Beispiel verwendet wurde, um denselben Datensatz anzugeben.
Anmerkung
Amazon EMR 6.7.0 verwendet Apache Hudi
(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('
s3://amzn-s3-demo-bucket/myhudidataset/
')
Sie können Daten auch dauerhaft löschen, indem Sie OPERATION_OPT_KEY
auf DELETE_OPERATION_OPT_VAL
einstellen, um alle Datensätze in dem von Ihnen eingereichten Datensatz zu entfernen. Anweisungen zur Durchführung von „weichen Löschungen“ und weitere Informationen zum Löschen von Daten, die in Hudi-Tabellen gespeichert sind, finden Sie unter Löschen
Aus einem Hudi-Datensatz lesen
Um Daten zum aktuellen Zeitpunkt abzurufen, führt Hudi standardmäßig Snapshot-Abfragen durch. Im Folgenden finden Sie ein Beispiel für die Abfrage des in S3 geschriebenen Datensatzes in Schreiben Sie in einen Hudi-Datensatz. s3://amzn-s3-demo-bucket/myhudidataset
Ersetzen Sie es durch Ihren Tabellenpfad und fügen Sie Platzhalter-Sternchen für jede Partitionsebene sowie ein zusätzliches Sternchen hinzu. In diesem Beispiel gibt es eine Partitionsebene, daher haben wir zwei Platzhaltersymbole hinzugefügt.
Anmerkung
Amazon EMR 6.7.0 verwendet Apache Hudi
val snapshotQueryDF = spark.read .format("hudi") .load(
"s3://amzn-s3-demo-bucket/myhudidataset"
) .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("
s3://amzn-s3-demo-bucket/myhudidataset
" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('
s3://amzn-s3-demo-bucket/myhudidataset
' + '/*/*') snapshotQueryDF.show()
Inkrementelle Abfragen
Sie können mit Hudi auch inkrementelle Abfragen durchführen, um einen Stream von Datensätzen abzurufen, die sich seit einem bestimmten Commit-Zeitstempel geändert haben. Setzen Sie dazu das Feld QUERY_TYPE_OPT_KEY
auf QUERY_TYPE_INCREMENTAL_OPT_VAL
. Fügen Sie dann einen Wert für BEGIN_INSTANTTIME_OPT_KEY
hinzu, um alle Datensätze abzurufen, die seit dem angegebenen Zeitpunkt geschrieben wurden. Inkrementelle Abfragen sind in der Regel zehnmal effizienter als ihre Gegenstücke im Batch-Modus, da sie nur geänderte Datensätze verarbeiten.
Wenn Sie inkrementelle Abfragen ausführen, verwenden Sie den Pfad der Stammtabelle (Basistabelle) ohne die für Snapshot-Abfragen verwendeten Platzhaltersterchen.
Anmerkung
Presto unterstützt keine inkrementellen Abfragen.
(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
<beginInstantTime>
) .load("s3://amzn-s3-demo-bucket/myhudidataset
" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime':
<beginInstantTime>
, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://amzn-s3-demo-bucket/myhudidataset
') incQueryDF.show()
Weitere Informationen zum Lesen von Hudi-Datensätzen finden Sie unter Abfragen von Hudi-Tabellen