本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
EMRFS S3-optimized遞交者的需求
當符合下列條件時,會使用 EMRFS S3-optimized遞交器:
-
您可以執行使用 Spark SQL DataFrames、 或資料集將檔案寫入 Amazon S3 的 Spark 任務。從 Amazon EMR 6.4.0 開始,此遞交器可用於所有常用格式ORC,包括 parquet、 和文字型格式 (包括 CSV和 JSON)。對於 Amazon EMR 6.4.0 之前的版本,僅支援 Parquet 格式。
-
在 Amazon EMR 中啟用分段上傳。此為預設值。如需詳細資訊,請參閱EMRFS S3-optimized遞交者和分段上傳。
-
使用 Spark 的內置檔案格式支援。在以下情況會使用內建的檔案格式支援:
-
對於 Hive 中繼存放區資料表,當
spark.sql.hive.convertMetastoreParquet
設為true
適用於 Parquet 資料表,或spark.sql.hive.convertMetastoreOrc
設為true
適用於具有 Amazon 6.4.0 EMR 或更高版本的 Orc 資料表。這些是預設設定。 -
當作業寫入至檔案格式資料來源或資料表時,例如使用
USING parquet
子句建立目標資料表時。 -
當任務寫入至未分割 Hive 中繼存放區 Parquet 資料表時。已知限制是 Spark 的內建 Parquet 支援並不支援已分割的 Hive 資料表。如需詳細資訊,請參閱 Apache Spark SQL DataFrames 和資料集指南中的 Hive 中繼存放區 Parquet 資料表轉換
。
-
-
寫入至預設分割區位置 Spark 作業操作,例如
${table_location}/k1=v1/k2=v2/
,使用遞交者。如果作業操作寫入至自訂分割區位置,則不使用遞交者,例如使用ALTER TABLE SQL
命令設定自訂分割區位置。 -
必須使用下列用於 Spark 的值:
-
spark.sql.parquet.fs.optimized.committer.optimization-enabled
屬性必須設為true
。這是 Amazon EMR 5.20.0 及更新版本的預設設定。使用 Amazon EMR 5.19.0 時,預設值為false
。如需如何設定此值的詳細資訊,請參閱 為 Amazon 5.19.0 啟用 EMRFS S3-optimized遞交者 EMR 。 -
如果寫入至非分割的 Hive 中繼存放區資料表,則僅支援 Parquet 和 Orc 檔案格式。
true
如果寫入至非分割的 Parquet Hive 中繼存放區資料表,spark.sql.hive.convertMetastoreParquet
則必須設定為 。true
如果寫入至非分割的 Orc Hive 中繼存放區資料表,spark.sql.hive.convertMetastoreOrc
則必須設定為 。這些是預設設定。 -
spark.sql.parquet.output.committer.class
必須設定為com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
。這是預設設定。 -
spark.sql.sources.commitProtocolClass
必須設定為org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol
或org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
。org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol
是 Amazon EMR 5.x 系列 5.30.0 版和更新版本的預設設定,而 Amazon EMR 6.x 系列 6.2.0 版和更新版本則預設設定。org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
是先前 Amazon EMR版本的預設設定。 -
如果 Spark 任務使用動態分割區欄覆寫分割的 Parquet 資料集,則
partitionOverwriteMode
寫入選項和spark.sql.sources.partitionOverwriteMode
必須設為static
。這是預設設定。注意
partitionOverwriteMode
寫入選項已導入至 Spark 2.4.0。對於 Amazon 5.19.0 版隨附的 Spark 2.3.2 EMR版,設定spark.sql.sources.partitionOverwriteMode
屬性。
-
不使用 EMRFS S3-optimized遞交者的情況
一般而言,EMRFSS3-optimized遞交者不會在下列情況中使用。
情形 | 為什麼不使用遞交者 |
---|---|
當您寫入 時 HDFS | 遞交者僅支援使用 寫入 Amazon S3EMRFS。 |
當您使用 S3A 檔案系統時 | 遞交者僅支援 EMRFS。 |
當您使用 MapReduce 或 Spark 的 RDD API | 遞交者僅支援使用 Spark SQL DataFrame、 或資料集 APIs。 |
下列 Scala 範例示範一些其他情況,這些情況會阻止 EMRFS S3-optimized遞交者完全使用 (第一個範例) 和部分使用 (第二個範例)。
範例 – 動態分割區覆寫模式
下列 Scala 範例會指示 Spark 使用不同的遞交演算法,以防止完全使用 EMRFS S3-optimized遞交者。程式碼將 partitionOverwriteMode
屬性設為 dynamic
,僅覆寫您要寫入資料的分割區。然後,由 partitionBy
指定動態分割區資料欄,且寫入模式被設為 overwrite
。
val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://
EXAMPLE-DOC-BUCKET
/output")
您必須設定所有三個設定,以避免使用 EMRFS S3-optimized遞交者。當您這樣做時,Spark 會執行在 Spark 遞交通訊協定中指定的其他遞交演算法。對於 5.30.0 之前的 Amazon EMR 5.x 版本,以及 6.2.0 EMR 之前的 Amazon 6.x 版本,遞交通訊協定會使用 Spark 的暫存目錄,這是在以 開頭的輸出位置下建立的暫存目錄.spark-staging
。該演算法會按順序重新命名分割區目錄,這可能對效能產生負面影響。如需 Amazon 5.30.0 及更新EMR版本以及 6.2.0 及更新版本的詳細資訊,請參閱 使用 EMRFS S3-optimized遞交通訊協定。
Spark 2.4.0 中的演算法遵循以下步驟:
-
任務會試圖將輸出寫入 Spark 暫存目錄下的分割區目錄,例如
${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/
。 -
針對每個寫入的分割區,該任務會試圖保持相對分割區路徑的追蹤,例如
k1=v1/k2=v2
。 -
在任務成功完成後,它會將所有追蹤的相對分割區路徑提供給驅動程式。
-
完成所有任務後,該任務遞交階段將收集在 Spark 臨時目錄下,所有成功任務嘗試寫入的分割區目錄。Spark 使用目錄樹狀圖重新命名操作,按順序將每個目錄重新命名為其最終輸出位置。
-
在任務遞交階段完成之前刪除臨時目錄。
範例 – 自訂分割區位置
在此範例中,該 Scala 程式碼將插入至兩個分割區。其中一個分割區有自訂的分割區位置。另一個分割區使用預設分割區位置。EMRFS S3-optimized遞交器僅用於將任務輸出寫入使用預設分割區位置的分割區。
val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)
Scala 程式碼會建立以下 Amazon S3 物件:
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$
在寫入至自訂位置的分割區時,Spark 會使用一個和上一個範例相似的遞交演算法,如下所述。如之前的範例所示,該演算法會導致順序重新命名,這可能會對效能產生負面影響。
-
在將輸出寫入自訂位置的分割區時,任務會寫入至 Spark 臨時目錄下的檔案 (該目錄建立在最終輸出位置下)。檔案的名稱包含隨機UUID,以防止檔案衝突。該任務會嘗試追蹤每個檔案以及最終所要的輸出路徑。
-
在任務成功完成後,它會將檔案和其最終所要之輸出路徑提供給驅動程式。
-
完成所有任務後,任務遞交階段會依序將所有為分割區寫入至自訂位置的檔案,重新命名為其最終輸出路徑。
-
在任務遞交階段完成之前刪除臨時目錄。