Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Bekerja dengan set data Hudi
Hudi mendukung penyisipan, perbaruan, dan penghapusan data dalam set data Hudi melalui Spark. Untuk informasi selengkapnya, lihat Menulis tabel Hudi
Contoh berikut menunjukkan cara meluncurkan shell Spark interaktif, menggunakan Spark submit, atau menggunakan EMR Notebook Amazon untuk bekerja dengan Hudi di Amazon. EMR Anda juga dapat menggunakan DeltaStreamer utilitas Hudi atau alat lain untuk menulis ke kumpulan data. Sepanjang bagian ini, contoh menunjukkan bekerja dengan dataset menggunakan Spark shell saat terhubung ke master node menggunakan SSH sebagai pengguna default. hadoop
Saat menjalankanspark-shell
,spark-submit
, atau spark-sql
menggunakan Amazon EMR 6.7.0 atau yang lebih baru, teruskan perintah berikut.
catatan
Amazon EMR 6.7.0 menggunakan Apache Hudi
Untuk membuka shell Spark pada node utama
-
Connect ke node utama menggunakanSSH. Untuk informasi selengkapnya, lihat Connect ke node utama menggunakan SSH Panduan EMR Manajemen Amazon.
-
Masukkan perintah berikut untuk meluncurkan shell Spark. Untuk menggunakan PySpark shell, ganti
spark-shell
denganpyspark
.spark-shell
--jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
Saat menjalankanspark-shell
,spark-submit
, atau spark-sql
menggunakan Amazon EMR 6.6.x atau yang lebih lama, teruskan perintah berikut.
catatan
-
Amazon EMR 6.2 dan 5.31 dan yang lebih baru (Hudi 0.6.x dan yang lebih baru) dapat menghilangkan dari konfigurasi.
spark-avro.jar
-
Amazon EMR 6.5 dan 5.35 dan yang lebih baru (Hudi 0.9.x dan yang lebih baru) dapat menghilangkan dari konfigurasi.
spark.sql.hive.convertMetastoreParquet=false
-
--conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
Untuk membuka shell Spark pada node utama
-
Connect ke node utama menggunakanSSH. Untuk informasi selengkapnya, lihat Connect ke node utama menggunakan SSH Panduan EMR Manajemen Amazon.
-
Masukkan perintah berikut untuk meluncurkan shell Spark. Untuk menggunakan PySpark shell, ganti
spark-shell
denganpyspark
.spark-shell
\ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
Untuk menggunakan Hudi dengan EMR Notebook Amazon, Anda harus terlebih dahulu menyalin file jar Hudi dari sistem file lokal ke HDFS node master cluster notebook. Anda kemudian menggunakan editor notebook untuk mengkonfigurasi EMR notebook Anda untuk menggunakan Hudi.
Untuk menggunakan Hudi dengan Notebook Amazon EMR
-
Buat dan luncurkan cluster untuk EMR Notebook Amazon. Untuk informasi selengkapnya, lihat Membuat EMR klaster Amazon untuk buku catatan di Panduan EMRManajemen Amazon.
-
Connect ke master node dari cluster menggunakan SSH dan kemudian menyalin file jar dari filesystem lokal HDFS seperti yang ditunjukkan dalam contoh berikut. Dalam contoh, kami membuat direktori HDFS untuk kejelasan manajemen file. Anda dapat memilih tujuan Anda sendiri diHDFS, jika diinginkan.
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
-
Buka editor notebook, masukkan kode dari contoh berikut, dan jalankan.
%%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}
Untuk menggunakan Hudi dengan EMR Notebook Amazon, Anda harus terlebih dahulu menyalin file jar Hudi dari sistem file lokal ke HDFS node master cluster notebook. Anda kemudian menggunakan editor notebook untuk mengkonfigurasi EMR notebook Anda untuk menggunakan Hudi.
Untuk menggunakan Hudi dengan Notebook Amazon EMR
-
Buat dan luncurkan cluster untuk EMR Notebook Amazon. Untuk informasi selengkapnya, lihat Membuat EMR klaster Amazon untuk buku catatan di Panduan EMRManajemen Amazon.
-
Connect ke master node dari cluster menggunakan SSH dan kemudian menyalin file jar dari filesystem lokal HDFS seperti yang ditunjukkan dalam contoh berikut. Dalam contoh, kami membuat direktori HDFS untuk kejelasan manajemen file. Anda dapat memilih tujuan Anda sendiri diHDFS, jika diinginkan.
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
-
Buka editor notebook, masukkan kode dari contoh berikut, dan jalankan.
{ "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}
Inisialisasi sesi Spark untuk Hudi
Bila Anda menggunakan Scala, Anda harus mengimpor kelas berikut di sesi Spark Anda. Hal ini perlu dilakukan sekali per sesi Spark.
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig
Tulis ke set data Hudi
Contoh berikut menunjukkan cara membuat DataFrame dan menulisnya sebagai kumpulan data Hudi.
catatan
Untuk menyisipkan contoh kode ke shell Spark, ketik :paste
pada prompt, tempel contoh, dan kemudian tekan CTRL
+ D
.
Setiap kali Anda menulis DataFrame ke kumpulan data Hudi, Anda harus menentukan. DataSourceWriteOptions
Banyak dari opsi ini cenderung identik antara operasi tulis. Contoh berikut menentukan pilihan umum menggunakan
variabel, yang digunakan contoh berikutnya.hudiOptions
catatan
Amazon EMR 6.7.0 menggunakan Apache Hudi
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName
", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': '
tableName
', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName
', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/
')
catatan
Anda mungkin melihat "hoodie" alih-alih Hudi dalam contoh kode dan pemberitahuan. Basis kode Hudi secara luas menggunakan ejaan "hoodie" lama.
Opsi | Deskripsi |
---|---|
TABLE_NAME |
Nama tabel untuk mendaftarkan set data. |
TABLE_TYPE_OPT_KEY |
Tidak wajib. Menentukan apakah set data dibuat sebagai |
RECORDKEY_FIELD_OPT_KEY |
Bidang kunci rekam yang nilainya akan digunakan sebagai |
PARTITIONPATH_FIELD_OPT_KEY |
Bidang jalur partisi yang nilainya akan digunakan sebagai |
PRECOMBINE_FIELD_OPT_KEY |
Bidang yang digunakan dalam pra-menggabungkan sebelum penulisan yang sebenarnya. Ketika dua catatan memiliki nilai kunci yang sama, Hudi memilih tyang memiliki nilai terbesar untuk bidang penggabungan sebagaimana ditentukan oleh |
Opsi berikut diperlukan hanya untuk mendaftarkan tabel set data Hudi metastore Anda. Jika Anda tidak mendaftar set data Hudi Anda sebagai tabel di metastore Hive, pilihan ini tidak diperlukan.
Opsi | Deskripsi |
---|---|
HIVE_DATABASE_OPT_KEY |
Basis data Hive untuk disinkronkan ke. Default-nya adalah |
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY |
Kelas yang digunakan untuk mengekstrak nilai bidang ke kolom partisi Hive. |
HIVE_PARTITION_FIELDS_OPT_KEY |
Bidang dalam set data yang digunakan untuk menentukan kolom partisi Hive. |
HIVE_SYNC_ENABLED_OPT_KEY |
Ketika diatur ke |
HIVE_TABLE_OPT_KEY |
Wajib. Nama tabel di Hive untuk disinkronkan ke. Misalnya, |
HIVE_USER_OPT_KEY |
Tidak wajib. Nama pengguna Hive untuk digunakan saat sinkronisasi. Misalnya, |
HIVE_PASS_OPT_KEY |
Tidak wajib. Kata sandi Hive untuk pengguna yang ditentukan oleh |
HIVE_URL_OPT_KEY |
URLMetastore Sarang. |
Tambahkan data
Contoh berikut menunjukkan bagaimana untuk meningkatkan data dengan menulis. DataFrame Berbeda dengan contoh penyisipan sebelumnya, nilai OPERATION_OPT_KEY
diatur ke UPSERT_OPERATION_OPT_VAL
. Selain itu, .mode(SaveMode.Append)
ditentukan untuk menunjukkan bahwa catatan harus ditambahkan.
catatan
Amazon EMR 6.7.0 menggunakan Apache Hudi
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("
new_value
")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('
new_value
')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/
')
Hapus catatan
Untuk menghapus catatan, Anda dapat menambahkan muatan kosong. Dalam hal ini, pilihan PAYLOAD_CLASS_OPT_KEY
menentukan EmptyHoodieRecordPayload
kelas. Contoh menggunakan yang sama DataFrame,updateDF
, digunakan dalam contoh upsert untuk menentukan catatan yang sama.
catatan
Amazon EMR 6.7.0 menggunakan Apache Hudi
(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("
s3://amzn-s3-demo-bucket/myhudidataset/
"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('
s3://amzn-s3-demo-bucket/myhudidataset/
')
Anda juga dapat menghapus data dengan menyetel OPERATION_OPT_KEY
ke DELETE_OPERATION_OPT_VAL
untuk menghapus semua catatan dalam set data yang Anda kirimkan. Untuk petunjuk tentang melakukan penghapusan secara halus, dan untuk informasi lebih lanjut tentang menghapus data yang disimpan dalam tabel Hudi, lihat Menghapus
Baca dari set data Hudi
Untuk mengambil data pada saat ini, Hudi melakukan kueri snapshot secara default. Berikut ini adalah contoh mengkueri set data yang ditulis ke S3 di Tulis ke set data Hudi. Ganti s3://amzn-s3-demo-bucket/myhudidataset
dengan jalur tabel Anda, dan tambahkan tanda bintang wildcard untuk setiap tingkat partisi, ditambah satu tanda bintang tambahan. Dalam contoh ini, ada satu tingkat partisi, jadi kami telah menambahkan dua simbol wildcard.
catatan
Amazon EMR 6.7.0 menggunakan Apache Hudi
val snapshotQueryDF = spark.read .format("hudi") .load(
"s3://amzn-s3-demo-bucket/myhudidataset"
) .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("
s3://amzn-s3-demo-bucket/myhudidataset
" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('
s3://amzn-s3-demo-bucket/myhudidataset
' + '/*/*') snapshotQueryDF.show()
Kueri tambahan
Anda juga dapat melakukan kueri tambahan dengan Hudi untuk mendapatkan aliran catatan yang telah berubah sejak stempel waktu komit diberikan. Untuk melakukannya, atur QUERY_TYPE_OPT_KEY
bidang ke QUERY_TYPE_INCREMENTAL_OPT_VAL
. Kemudian, tambahkan nilai BEGIN_INSTANTTIME_OPT_KEY
untuk mendapatkan semua catatan yang ditulis sejak waktu yang ditentukan. Permintaan tambahan biasanya sepuluh kali lebih efisien daripada rekan batch mereka karena hanya memproses catatan yang berubah.
Ketika Anda melakukan kueri tambahan, gunakan jalur tabel akar (dasar) tanpa tanda bintang wildcard yang digunakan untuk kueri Snapshot.
catatan
Presto tidak mendukung kueri tambahan.
(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
<beginInstantTime>
) .load("s3://amzn-s3-demo-bucket/myhudidataset
" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime':
<beginInstantTime>
, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://amzn-s3-demo-bucket/myhudidataset
') incQueryDF.show()
Untuk informasi lebih lanjut tentang membaca dari set data Hudi, lihat Mengkueri tabel Hudi