Bekerja dengan set data Hudi - Amazon EMR

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Bekerja dengan set data Hudi

Hudi mendukung penyisipan, perbaruan, dan penghapusan data dalam set data Hudi melalui Spark. Untuk informasi selengkapnya, lihat Menulis tabel Hudi dalam dokumentasi Apache Hudi.

Contoh berikut menunjukkan cara meluncurkan shell Spark interaktif, menggunakan Spark submit, atau menggunakan EMR Notebook Amazon untuk bekerja dengan Hudi di Amazon. EMR Anda juga dapat menggunakan DeltaStreamer utilitas Hudi atau alat lain untuk menulis ke kumpulan data. Sepanjang bagian ini, contoh menunjukkan bekerja dengan dataset menggunakan Spark shell saat terhubung ke master node menggunakan SSH sebagai pengguna default. hadoop

Saat menjalankanspark-shell,spark-submit, atau spark-sql menggunakan Amazon EMR 6.7.0 atau yang lebih baru, teruskan perintah berikut.

catatan

Amazon EMR 6.7.0 menggunakan Apache Hudi 0.11.0-amzn-0, yang berisi peningkatan signifikan dibandingkan versi Hudi sebelumnya. Untuk informasi selengkapnya, lihat Panduan Migrasi Apache Hudi 0.11.0. Contoh pada tab ini mencerminkan perubahan ini.

Untuk membuka shell Spark pada node utama
  1. Connect ke node utama menggunakanSSH. Untuk informasi selengkapnya, lihat Connect ke node utama menggunakan SSH Panduan EMR Manajemen Amazon.

  2. Masukkan perintah berikut untuk meluncurkan shell Spark. Untuk menggunakan PySpark shell, ganti spark-shell dengan pyspark.

    spark-shell --jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"

Saat menjalankanspark-shell,spark-submit, atau spark-sql menggunakan Amazon EMR 6.6.x atau yang lebih lama, teruskan perintah berikut.

catatan
Untuk membuka shell Spark pada node utama
  1. Connect ke node utama menggunakanSSH. Untuk informasi selengkapnya, lihat Connect ke node utama menggunakan SSH Panduan EMR Manajemen Amazon.

  2. Masukkan perintah berikut untuk meluncurkan shell Spark. Untuk menggunakan PySpark shell, ganti spark-shell dengan pyspark.

    spark-shell \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

Untuk menggunakan Hudi dengan EMR Notebook Amazon, Anda harus terlebih dahulu menyalin file jar Hudi dari sistem file lokal ke HDFS node master cluster notebook. Anda kemudian menggunakan editor notebook untuk mengkonfigurasi EMR notebook Anda untuk menggunakan Hudi.

Untuk menggunakan Hudi dengan Notebook Amazon EMR
  1. Buat dan luncurkan cluster untuk EMR Notebook Amazon. Untuk informasi selengkapnya, lihat Membuat EMR klaster Amazon untuk buku catatan di Panduan EMRManajemen Amazon.

  2. Connect ke master node dari cluster menggunakan SSH dan kemudian menyalin file jar dari filesystem lokal HDFS seperti yang ditunjukkan dalam contoh berikut. Dalam contoh, kami membuat direktori HDFS untuk kejelasan manajemen file. Anda dapat memilih tujuan Anda sendiri diHDFS, jika diinginkan.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
  3. Buka editor notebook, masukkan kode dari contoh berikut, dan jalankan.

    %%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}

Untuk menggunakan Hudi dengan EMR Notebook Amazon, Anda harus terlebih dahulu menyalin file jar Hudi dari sistem file lokal ke HDFS node master cluster notebook. Anda kemudian menggunakan editor notebook untuk mengkonfigurasi EMR notebook Anda untuk menggunakan Hudi.

Untuk menggunakan Hudi dengan Notebook Amazon EMR
  1. Buat dan luncurkan cluster untuk EMR Notebook Amazon. Untuk informasi selengkapnya, lihat Membuat EMR klaster Amazon untuk buku catatan di Panduan EMRManajemen Amazon.

  2. Connect ke master node dari cluster menggunakan SSH dan kemudian menyalin file jar dari filesystem lokal HDFS seperti yang ditunjukkan dalam contoh berikut. Dalam contoh, kami membuat direktori HDFS untuk kejelasan manajemen file. Anda dapat memilih tujuan Anda sendiri diHDFS, jika diinginkan.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
    hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
  3. Buka editor notebook, masukkan kode dari contoh berikut, dan jalankan.

    { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}

Inisialisasi sesi Spark untuk Hudi

Bila Anda menggunakan Scala, Anda harus mengimpor kelas berikut di sesi Spark Anda. Hal ini perlu dilakukan sekali per sesi Spark.

import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig

Tulis ke set data Hudi

Contoh berikut menunjukkan cara membuat DataFrame dan menulisnya sebagai kumpulan data Hudi.

catatan

Untuk menyisipkan contoh kode ke shell Spark, ketik :paste pada prompt, tempel contoh, dan kemudian tekan CTRL + D.

Setiap kali Anda menulis DataFrame ke kumpulan data Hudi, Anda harus menentukan. DataSourceWriteOptions Banyak dari opsi ini cenderung identik antara operasi tulis. Contoh berikut menentukan pilihan umum menggunakan hudiOptions variabel, yang digunakan contoh berikutnya.

catatan

Amazon EMR 6.7.0 menggunakan Apache Hudi 0.11.0-amzn-0, yang berisi peningkatan signifikan dibandingkan versi Hudi sebelumnya. Untuk informasi selengkapnya, lihat Panduan Migrasi Apache Hudi 0.11.0. Contoh pada tab ini mencerminkan perubahan ini.

// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "tableName", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "tableName", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': 'tableName', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')
catatan

Anda mungkin melihat "hoodie" alih-alih Hudi dalam contoh kode dan pemberitahuan. Basis kode Hudi secara luas menggunakan ejaan "hoodie" lama.

DataSourceWriteOptions Referensi untuk Hudi
Opsi Deskripsi

TABLE_NAME

Nama tabel untuk mendaftarkan set data.

TABLE_TYPE_OPT_KEY

Tidak wajib. Menentukan apakah set data dibuat sebagai "COPY_ON_WRITE" atau "MERGE_ON_READ". Default-nya adalah "COPY_ON_WRITE".

RECORDKEY_FIELD_OPT_KEY

Bidang kunci rekam yang nilainya akan digunakan sebagai recordKey komponenHoodieKey. Nilai aktual akan diperoleh dengan meminta .toString() pada nilai bidang. Bidang bersarang dapat ditentukan menggunakan notasi titik, misalnya,. a.b.c

PARTITIONPATH_FIELD_OPT_KEY

Bidang jalur partisi yang nilainya akan digunakan sebagai partitionPath komponen dariHoodieKey. Nilai aktual akan diperoleh dengan meminta .toString() pada nilai bidang.

PRECOMBINE_FIELD_OPT_KEY

Bidang yang digunakan dalam pra-menggabungkan sebelum penulisan yang sebenarnya. Ketika dua catatan memiliki nilai kunci yang sama, Hudi memilih tyang memiliki nilai terbesar untuk bidang penggabungan sebagaimana ditentukan oleh Object.compareTo(..).

Opsi berikut diperlukan hanya untuk mendaftarkan tabel set data Hudi metastore Anda. Jika Anda tidak mendaftar set data Hudi Anda sebagai tabel di metastore Hive, pilihan ini tidak diperlukan.

DataSourceWriteOptions referensi untuk Hive
Opsi Deskripsi

HIVE_DATABASE_OPT_KEY

Basis data Hive untuk disinkronkan ke. Default-nya adalah "default".

HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY

Kelas yang digunakan untuk mengekstrak nilai bidang ke kolom partisi Hive.

HIVE_PARTITION_FIELDS_OPT_KEY

Bidang dalam set data yang digunakan untuk menentukan kolom partisi Hive.

HIVE_SYNC_ENABLED_OPT_KEY

Ketika diatur ke "true", daftarkan set data dengan metastore Apache Hive. Default-nya adalah "false".

HIVE_TABLE_OPT_KEY

Wajib. Nama tabel di Hive untuk disinkronkan ke. Misalnya, "my_hudi_table_cow".

HIVE_USER_OPT_KEY

Tidak wajib. Nama pengguna Hive untuk digunakan saat sinkronisasi. Misalnya, "hadoop".

HIVE_PASS_OPT_KEY

Tidak wajib. Kata sandi Hive untuk pengguna yang ditentukan oleh HIVE_USER_OPT_KEY.

HIVE_URL_OPT_KEY

URLMetastore Sarang.

Tambahkan data

Contoh berikut menunjukkan bagaimana untuk meningkatkan data dengan menulis. DataFrame Berbeda dengan contoh penyisipan sebelumnya, nilai OPERATION_OPT_KEY diatur ke UPSERT_OPERATION_OPT_VAL. Selain itu, .mode(SaveMode.Append) ditentukan untuk menunjukkan bahwa catatan harus ditambahkan.

catatan

Amazon EMR 6.7.0 menggunakan Apache Hudi 0.11.0-amzn-0, yang berisi peningkatan signifikan dibandingkan versi Hudi sebelumnya. Untuk informasi selengkapnya, lihat Panduan Migrasi Apache Hudi 0.11.0. Contoh pada tab ini mencerminkan perubahan ini.

// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

Hapus catatan

Untuk menghapus catatan, Anda dapat menambahkan muatan kosong. Dalam hal ini, pilihan PAYLOAD_CLASS_OPT_KEY menentukan EmptyHoodieRecordPayload kelas. Contoh menggunakan yang sama DataFrame,updateDF, digunakan dalam contoh upsert untuk menentukan catatan yang sama.

catatan

Amazon EMR 6.7.0 menggunakan Apache Hudi 0.11.0-amzn-0, yang berisi peningkatan signifikan dibandingkan versi Hudi sebelumnya. Untuk informasi selengkapnya, lihat Panduan Migrasi Apache Hudi 0.11.0. Contoh pada tab ini mencerminkan perubahan ini.

(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

Anda juga dapat menghapus data dengan menyetel OPERATION_OPT_KEY ke DELETE_OPERATION_OPT_VAL untuk menghapus semua catatan dalam set data yang Anda kirimkan. Untuk petunjuk tentang melakukan penghapusan secara halus, dan untuk informasi lebih lanjut tentang menghapus data yang disimpan dalam tabel Hudi, lihat Menghapus dalam dokumentasi Apache Hudi.

Baca dari set data Hudi

Untuk mengambil data pada saat ini, Hudi melakukan kueri snapshot secara default. Berikut ini adalah contoh mengkueri set data yang ditulis ke S3 di Tulis ke set data Hudi. Ganti s3://amzn-s3-demo-bucket/myhudidataset dengan jalur tabel Anda, dan tambahkan tanda bintang wildcard untuk setiap tingkat partisi, ditambah satu tanda bintang tambahan. Dalam contoh ini, ada satu tingkat partisi, jadi kami telah menambahkan dua simbol wildcard.

catatan

Amazon EMR 6.7.0 menggunakan Apache Hudi 0.11.0-amzn-0, yang berisi peningkatan signifikan dibandingkan versi Hudi sebelumnya. Untuk informasi selengkapnya, lihat Panduan Migrasi Apache Hudi 0.11.0. Contoh pada tab ini mencerminkan perubahan ini.

val snapshotQueryDF = spark.read .format("hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset") .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('s3://amzn-s3-demo-bucket/myhudidataset' + '/*/*') snapshotQueryDF.show()

Kueri tambahan

Anda juga dapat melakukan kueri tambahan dengan Hudi untuk mendapatkan aliran catatan yang telah berubah sejak stempel waktu komit diberikan. Untuk melakukannya, atur QUERY_TYPE_OPT_KEY bidang ke QUERY_TYPE_INCREMENTAL_OPT_VAL. Kemudian, tambahkan nilai BEGIN_INSTANTTIME_OPT_KEY untuk mendapatkan semua catatan yang ditulis sejak waktu yang ditentukan. Permintaan tambahan biasanya sepuluh kali lebih efisien daripada rekan batch mereka karena hanya memproses catatan yang berubah.

Ketika Anda melakukan kueri tambahan, gunakan jalur tabel akar (dasar) tanpa tanda bintang wildcard yang digunakan untuk kueri Snapshot.

catatan

Presto tidak mendukung kueri tambahan.

(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>) .load("s3://amzn-s3-demo-bucket/myhudidataset" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime': <beginInstantTime>, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://amzn-s3-demo-bucket/myhudidataset') incQueryDF.show()

Untuk informasi lebih lanjut tentang membaca dari set data Hudi, lihat Mengkueri tabel Hudi dalam dokumentasi Apache Hudi.