Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Persyaratan untuk komitter yang EMRFS dioptimalkan S3
Komitter yang EMRFS dioptimalkan S3 digunakan ketika kondisi berikut terpenuhi:
-
Anda menjalankan pekerjaan Spark yang menggunakan Spark, DataFrames, atau Datasets untuk menulis file ke Amazon S3. Dimulai dengan Amazon EMR 6.4.0, committer ini dapat digunakan untuk semua format umum termasuk parket,ORC, dan format berbasis teks (termasuk dan). CSV JSON Untuk rilis sebelum Amazon EMR 6.4.0, hanya format Parket yang didukung.
-
Unggahan multibagian diaktifkan di Amazon. EMR Ini adalah opsi default. Untuk informasi selengkapnya, lihat Komitter yang EMRFS dioptimalkan S3 dan unggahan multipart.
-
Dukungan format file bawaan Spark digunakan. Dukungan format file bawaan digunakan dalam keadaan berikut:
-
Untuk tabel metastore Hive, kapan
spark.sql.hive.convertMetastoreParquet
diatur ketrue
untuk tabel Parket, atauspark.sql.hive.convertMetastoreOrc
diatur ke untuk tabel Orc dengan Amazontrue
EMR 6.4.0 atau lebih tinggi. Ini adalah pengaturan default. -
Saat pekerjaan menulis ke sumber data format file atau tabel — misalnya, tabel target dibuat dengan klausa.
USING parquet
-
Ketika pekerjaan menulis ke tabel Parket non-dipartisi Hive metastore. built-in dukungan Parket Spark tidak mendukung tabel Hive dipartisi, yang merupakan keterbatasan diketahui. Untuk informasi lebih lanjut, lihat Konversi tabel Parket metastore Hive
di Apache Spark, and Datasets Guide. DataFrames
-
-
Spark pekerjaan operasi yang menulis ke lokasi partisi default — misalnya,
${table_location}/k1=v1/k2=v2/
—gunakan committer. Committer tidak digunakan jika operasi pekerjaan menulis ke lokasi partisi kustom—misalnya, jika lokasi partisi kustom disetel menggunakanALTER TABLE SQL
perintah. -
Nilai berikut untuk Spark mesti digunakan:
-
Parameter
spark.sql.parquet.fs.optimized.committer.optimization-enabled
properti harus diatur ketrue
. Ini adalah pengaturan default dengan Amazon EMR 5.20.0 dan yang lebih baru. Dengan Amazon EMR 5.19.0, nilai defaultnya adalah.false
Untuk informasi tentang mengonfigurasi retensi, lihat Aktifkan committer yang EMRFS dioptimalkan S3 untuk Amazon 5.19.0 EMR. -
Jika menulis ke tabel metastore Hive yang tidak dipartisi, hanya format file Parket dan Orc yang didukung.
spark.sql.hive.convertMetastoreParquet
harus diatur ketrue
jika menulis ke tabel metastore Parquet Hive yang tidak dipartisi.spark.sql.hive.convertMetastoreOrc
harus disetel ketrue
if writing ke tabel metastore Orc Hive yang tidak dipartisi. Ini adalah pengaturan default. -
spark.sql.parquet.output.committer.class
harus diatur kecom.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
. Ini adalah pengaturan default. -
spark.sql.sources.commitProtocolClass
harus diatur keorg.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol
atauorg.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
.org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol
adalah pengaturan default untuk seri Amazon EMR 5.x versi 5.30.0 dan lebih tinggi, dan untuk seri Amazon EMR 6.x versi 6.2.0 dan lebih tinggi.org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
adalah pengaturan default untuk EMR versi Amazon sebelumnya. -
Jika pekerjaan Spark menimpa dataset Parket dipartisi dengan kolom partisi dinamis, maka
partitionOverwriteMode
tulis opsi danspark.sql.sources.partitionOverwriteMode
harus diatur kestatic
. Ini adalah pengaturan default.catatan
Parameter
partitionOverwriteMode
menulis opsi diperkenalkan di Spark 2.4.0. Untuk Spark versi 2.3.2, disertakan dengan EMR rilis Amazon 5.19.0, atur properti.spark.sql.sources.partitionOverwriteMode
-
Kesempatan ketika committer yang EMRFS dioptimalkan S3 tidak digunakan
Umumnya, committer yang EMRFS dioptimalkan S3 tidak digunakan dalam situasi berikut.
Situasi | Mengapa committer tidak digunakan |
---|---|
Ketika Anda menulis ke HDFS | Komitter hanya mendukung penulisan ke Amazon EMRFS S3 menggunakan. |
Saat Anda menggunakan sistem file S3A | Komitter hanya mendukungEMRFS. |
Saat Anda menggunakan MapReduce atau Spark RDD API | Committer hanya mendukung penggunaan SparkSQL, DataFrame, atau Dataset. APIs |
Contoh Scala berikut menunjukkan beberapa situasi tambahan yang mencegah committer yang EMRFS dioptimalkan S3 digunakan secara keseluruhan (contoh pertama) dan sebagian (contoh kedua).
contoh — Mode penimpaan partisi dinamis
Contoh Scala berikut menginstruksikan Spark untuk menggunakan algoritma komit yang berbeda, yang mencegah penggunaan committer yang dioptimalkan EMRFS S3 sama sekali. Kode menetapkan partitionOverwriteMode
properti dynamic
untuk menimpa hanya partisi yang Anda tulis data. Kemudian, kolom partisi dinamis ditentukan olehpartitionBy
, dan mode tulis diatur keoverwrite
.
val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://
amzn-s3-demo-bucket1
/output")
Anda harus mengonfigurasi ketiga pengaturan untuk menghindari penggunaan committer yang EMRFS dioptimalkan S3. Ketika Anda melakukannya, Spark mengeksekusi algoritma komit berbeda yang ditentukan dalam protokol komit Spark. Untuk rilis Amazon EMR 5.x lebih awal dari 5.30.0 dan untuk rilis Amazon EMR 6.x lebih awal dari 6.2.0, protokol komit menggunakan direktori pementasan Spark, yang merupakan direktori sementara yang dibuat di bawah lokasi keluaran yang dimulai dengan. .spark-staging
Algoritma secara berurutan mengganti nama direktori partisi, yang dapat berdampak negatif pada kinerja. Untuk informasi selengkapnya tentang Amazon EMR merilis 5.30.0 dan yang lebih baru dan 6.2.0 dan yang lebih baru, lihat. Gunakan protokol komit yang EMRFS dioptimalkan S3
Algoritma di Spark 2.4.0 mengikuti langkah-langkah berikut:
-
Upaya tugas menulis output mereka ke partisi direktori di bawah direktori pementasan Spark — misalnya,
${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/
. -
Untuk setiap partisi yang ditulis, percobaan tugas terus melacak path partisi relatif—misalnya,
k1=v1/k2=v2
. -
Ketika suatu tugas berhasil diselesaikan, tugas terkait menyediakan semua jalur partisi secara relatif yang dilacaknya kepada driver.
-
Setelah semua tugas selesai, pekerjaan commit fase mengumpulkan semua direktori partisi yang berhasil tugas mencoba menulis di bawah direktori pementasan Spark ini. Spark berurutan mengganti nama masing-masing direktori ini ke lokasi output akhir menggunakan pohon direktori mengubah nama operasi.
-
Direktori pementasan dihapus sebelum pekerjaan komit fase selesai.
contoh - Lokasi partisi kustom
Dalam contoh ini, kode Scala menyisipkan dalam dua partisi. Satu partisi memiliki lokasi partisi kustom. Partisi lain menggunakan lokasi partisi default. Komitter yang EMRFS dioptimalkan S3 hanya digunakan untuk menulis output tugas ke partisi yang menggunakan lokasi partisi default.
val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)
Kode Scala menciptakan objek Amazon S3 berikut:
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$
Ketika menulis ke partisi di lokasi kustom, Spark menggunakan algoritma komit mirip dengan contoh sebelumnya, yang diuraikan di bawah ini. Seperti contoh sebelumnya, algoritme menghasilkan penggantian nama berurutan, yang dapat berdampak negatif pada kinerja.
-
Ketika menulis output ke partisi di lokasi kustom, tugas menulis ke file di bawah direktori pementasan Spark ini, yang dibuat di bawah lokasi output akhir. Nama file termasuk acak UUID untuk melindungi terhadap tabrakan file. Upaya tugas melacak setiap file bersama dengan path output akhir yang diinginkan.
-
Ketika tugas selesai berhasil, menyediakan driver dengan file dan akhir yang diinginkan output jalan mereka.
-
Setelah semua tugas selesai, pekerjaan commit fase berurutan mengganti nama semua file yang ditulis untuk partisi di lokasi kustom ke jalur output akhir mereka.
-
Direktori pementasan dihapus sebelum pekerjaan komit fase selesai.