Persyaratan untuk komitter yang EMRFS dioptimalkan S3 - Amazon EMR

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Persyaratan untuk komitter yang EMRFS dioptimalkan S3

Komitter yang EMRFS dioptimalkan S3 digunakan ketika kondisi berikut terpenuhi:

  • Anda menjalankan pekerjaan Spark yang menggunakan Spark, DataFrames, atau Datasets untuk menulis file ke Amazon S3. Dimulai dengan Amazon EMR 6.4.0, committer ini dapat digunakan untuk semua format umum termasuk parket,ORC, dan format berbasis teks (termasuk dan). CSV JSON Untuk rilis sebelum Amazon EMR 6.4.0, hanya format Parket yang didukung.

  • Unggahan multibagian diaktifkan di Amazon. EMR Ini adalah opsi default. Untuk informasi selengkapnya, lihat Komitter yang EMRFS dioptimalkan S3 dan unggahan multipart.

  • Dukungan format file bawaan Spark digunakan. Dukungan format file bawaan digunakan dalam keadaan berikut:

    • Untuk tabel metastore Hive, kapan spark.sql.hive.convertMetastoreParquet diatur ke true untuk tabel Parket, atau spark.sql.hive.convertMetastoreOrc diatur ke untuk tabel Orc dengan Amazon true EMR 6.4.0 atau lebih tinggi. Ini adalah pengaturan default.

    • Saat pekerjaan menulis ke sumber data format file atau tabel — misalnya, tabel target dibuat dengan klausa. USING parquet

    • Ketika pekerjaan menulis ke tabel Parket non-dipartisi Hive metastore. built-in dukungan Parket Spark tidak mendukung tabel Hive dipartisi, yang merupakan keterbatasan diketahui. Untuk informasi lebih lanjut, lihat Konversi tabel Parket metastore Hive di Apache Spark, and Datasets Guide. DataFrames

  • Spark pekerjaan operasi yang menulis ke lokasi partisi default — misalnya, ${table_location}/k1=v1/k2=v2/—gunakan committer. Committer tidak digunakan jika operasi pekerjaan menulis ke lokasi partisi kustom—misalnya, jika lokasi partisi kustom disetel menggunakan ALTER TABLE SQL perintah.

  • Nilai berikut untuk Spark mesti digunakan:

    • Parameter spark.sql.parquet.fs.optimized.committer.optimization-enabled properti harus diatur ke true. Ini adalah pengaturan default dengan Amazon EMR 5.20.0 dan yang lebih baru. Dengan Amazon EMR 5.19.0, nilai defaultnya adalah. false Untuk informasi tentang mengonfigurasi retensi, lihat Aktifkan committer yang EMRFS dioptimalkan S3 untuk Amazon 5.19.0 EMR.

    • Jika menulis ke tabel metastore Hive yang tidak dipartisi, hanya format file Parket dan Orc yang didukung. spark.sql.hive.convertMetastoreParquetharus diatur ke true jika menulis ke tabel metastore Parquet Hive yang tidak dipartisi. spark.sql.hive.convertMetastoreOrcharus disetel ke true if writing ke tabel metastore Orc Hive yang tidak dipartisi. Ini adalah pengaturan default.

    • spark.sql.parquet.output.committer.class harus diatur ke com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter. Ini adalah pengaturan default.

    • spark.sql.sources.commitProtocolClassharus diatur ke org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol atauorg.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol. org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocoladalah pengaturan default untuk seri Amazon EMR 5.x versi 5.30.0 dan lebih tinggi, dan untuk seri Amazon EMR 6.x versi 6.2.0 dan lebih tinggi. org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocoladalah pengaturan default untuk EMR versi Amazon sebelumnya.

    • Jika pekerjaan Spark menimpa dataset Parket dipartisi dengan kolom partisi dinamis, maka partitionOverwriteMode tulis opsi dan spark.sql.sources.partitionOverwriteMode harus diatur ke static. Ini adalah pengaturan default.

      catatan

      Parameter partitionOverwriteMode menulis opsi diperkenalkan di Spark 2.4.0. Untuk Spark versi 2.3.2, disertakan dengan EMR rilis Amazon 5.19.0, atur properti. spark.sql.sources.partitionOverwriteMode

Kesempatan ketika committer yang EMRFS dioptimalkan S3 tidak digunakan

Umumnya, committer yang EMRFS dioptimalkan S3 tidak digunakan dalam situasi berikut.

Situasi Mengapa committer tidak digunakan
Ketika Anda menulis ke HDFS Komitter hanya mendukung penulisan ke Amazon EMRFS S3 menggunakan.
Saat Anda menggunakan sistem file S3A Komitter hanya mendukungEMRFS.
Saat Anda menggunakan MapReduce atau Spark RDD API Committer hanya mendukung penggunaan SparkSQL, DataFrame, atau Dataset. APIs

Contoh Scala berikut menunjukkan beberapa situasi tambahan yang mencegah committer yang EMRFS dioptimalkan S3 digunakan secara keseluruhan (contoh pertama) dan sebagian (contoh kedua).

contoh — Mode penimpaan partisi dinamis

Contoh Scala berikut menginstruksikan Spark untuk menggunakan algoritma komit yang berbeda, yang mencegah penggunaan committer yang dioptimalkan EMRFS S3 sama sekali. Kode menetapkan partitionOverwriteMode properti dynamic untuk menimpa hanya partisi yang Anda tulis data. Kemudian, kolom partisi dinamis ditentukan olehpartitionBy, dan mode tulis diatur keoverwrite.

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://amzn-s3-demo-bucket1/output")

Anda harus mengonfigurasi ketiga pengaturan untuk menghindari penggunaan committer yang EMRFS dioptimalkan S3. Ketika Anda melakukannya, Spark mengeksekusi algoritma komit berbeda yang ditentukan dalam protokol komit Spark. Untuk rilis Amazon EMR 5.x lebih awal dari 5.30.0 dan untuk rilis Amazon EMR 6.x lebih awal dari 6.2.0, protokol komit menggunakan direktori pementasan Spark, yang merupakan direktori sementara yang dibuat di bawah lokasi keluaran yang dimulai dengan. .spark-staging Algoritma secara berurutan mengganti nama direktori partisi, yang dapat berdampak negatif pada kinerja. Untuk informasi selengkapnya tentang Amazon EMR merilis 5.30.0 dan yang lebih baru dan 6.2.0 dan yang lebih baru, lihat. Gunakan protokol komit yang EMRFS dioptimalkan S3

Algoritma di Spark 2.4.0 mengikuti langkah-langkah berikut:

  1. Upaya tugas menulis output mereka ke partisi direktori di bawah direktori pementasan Spark — misalnya, ${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/.

  2. Untuk setiap partisi yang ditulis, percobaan tugas terus melacak path partisi relatif—misalnya, k1=v1/k2=v2.

  3. Ketika suatu tugas berhasil diselesaikan, tugas terkait menyediakan semua jalur partisi secara relatif yang dilacaknya kepada driver.

  4. Setelah semua tugas selesai, pekerjaan commit fase mengumpulkan semua direktori partisi yang berhasil tugas mencoba menulis di bawah direktori pementasan Spark ini. Spark berurutan mengganti nama masing-masing direktori ini ke lokasi output akhir menggunakan pohon direktori mengubah nama operasi.

  5. Direktori pementasan dihapus sebelum pekerjaan komit fase selesai.

contoh - Lokasi partisi kustom

Dalam contoh ini, kode Scala menyisipkan dalam dua partisi. Satu partisi memiliki lokasi partisi kustom. Partisi lain menggunakan lokasi partisi default. Komitter yang EMRFS dioptimalkan S3 hanya digunakan untuk menulis output tugas ke partisi yang menggunakan lokasi partisi default.

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

Kode Scala menciptakan objek Amazon S3 berikut:

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$

Ketika menulis ke partisi di lokasi kustom, Spark menggunakan algoritma komit mirip dengan contoh sebelumnya, yang diuraikan di bawah ini. Seperti contoh sebelumnya, algoritme menghasilkan penggantian nama berurutan, yang dapat berdampak negatif pada kinerja.

  1. Ketika menulis output ke partisi di lokasi kustom, tugas menulis ke file di bawah direktori pementasan Spark ini, yang dibuat di bawah lokasi output akhir. Nama file termasuk acak UUID untuk melindungi terhadap tabrakan file. Upaya tugas melacak setiap file bersama dengan path output akhir yang diinginkan.

  2. Ketika tugas selesai berhasil, menyediakan driver dengan file dan akhir yang diinginkan output jalan mereka.

  3. Setelah semua tugas selesai, pekerjaan commit fase berurutan mengganti nama semua file yang ditulis untuk partisi di lokasi kustom ke jalur output akhir mereka.

  4. Direktori pementasan dihapus sebelum pekerjaan komit fase selesai.