EMRFS S3-optimized遞交者的需求 - Amazon EMR

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

EMRFS S3-optimized遞交者的需求

當符合下列條件時,會使用 EMRFS S3-optimized遞交器:

  • 您可以執行使用 Spark SQL DataFrames、 或資料集將檔案寫入 Amazon S3 的 Spark 任務。從 Amazon EMR 6.4.0 開始,此遞交器可用於所有常用格式ORC,包括 parquet、 和文字型格式 (包括 CSV和 JSON)。對於 Amazon EMR 6.4.0 之前的版本,僅支援 Parquet 格式。

  • 在 Amazon EMR 中啟用分段上傳。此為預設值。如需詳細資訊,請參閱EMRFS S3-optimized遞交者和分段上傳

  • 使用 Spark 的內置檔案格式支援。在以下情況會使用內建的檔案格式支援:

    • 對於 Hive 中繼存放區資料表,當 spark.sql.hive.convertMetastoreParquet 設為true適用於 Parquet 資料表,或 spark.sql.hive.convertMetastoreOrc 設為true適用於具有 Amazon 6.4.0 EMR 或更高版本的 Orc 資料表。這些是預設設定。

    • 當作業寫入至檔案格式資料來源或資料表時,例如使用 USING parquet 子句建立目標資料表時。

    • 當任務寫入至未分割 Hive 中繼存放區 Parquet 資料表時。已知限制是 Spark 的內建 Parquet 支援並不支援已分割的 Hive 資料表。如需詳細資訊,請參閱 Apache Spark SQL DataFrames 和資料集指南中的 Hive 中繼存放區 Parquet 資料表轉換

  • 寫入至預設分割區位置 Spark 作業操作,例如 ${table_location}/k1=v1/k2=v2/,使用遞交者。如果作業操作寫入至自訂分割區位置,則不使用遞交者,例如使用 ALTER TABLE SQL 命令設定自訂分割區位置。

  • 必須使用下列用於 Spark 的值:

    • spark.sql.parquet.fs.optimized.committer.optimization-enabled 屬性必須設為 true。這是 Amazon EMR 5.20.0 及更新版本的預設設定。使用 Amazon EMR 5.19.0 時,預設值為 false。如需如何設定此值的詳細資訊,請參閱 為 Amazon 5.19.0 啟用 EMRFS S3-optimized遞交者 EMR

    • 如果寫入至非分割的 Hive 中繼存放區資料表,則僅支援 Parquet 和 Orc 檔案格式。true如果寫入至非分割的 Parquet Hive 中繼存放區資料表,spark.sql.hive.convertMetastoreParquet則必須設定為 。true如果寫入至非分割的 Orc Hive 中繼存放區資料表,spark.sql.hive.convertMetastoreOrc則必須設定為 。這些是預設設定。

    • spark.sql.parquet.output.committer.class 必須設定為 com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter。這是預設設定。

    • spark.sql.sources.commitProtocolClass 必須設定為 org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocolorg.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocolorg.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol是 Amazon EMR 5.x 系列 5.30.0 版和更新版本的預設設定,而 Amazon EMR 6.x 系列 6.2.0 版和更新版本則預設設定。 org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol是先前 Amazon EMR版本的預設設定。

    • 如果 Spark 任務使用動態分割區欄覆寫分割的 Parquet 資料集,則 partitionOverwriteMode 寫入選項和 spark.sql.sources.partitionOverwriteMode 必須設為 static。這是預設設定。

      注意

      partitionOverwriteMode 寫入選項已導入至 Spark 2.4.0。對於 Amazon 5.19.0 版隨附的 Spark 2.3.2 EMR版,設定 spark.sql.sources.partitionOverwriteMode 屬性。

不使用 EMRFS S3-optimized遞交者的情況

一般而言,EMRFSS3-optimized遞交者不會在下列情況中使用。

情形 為什麼不使用遞交者
當您寫入 時 HDFS 遞交者僅支援使用 寫入 Amazon S3EMRFS。
當您使用 S3A 檔案系統時 遞交者僅支援 EMRFS。
當您使用 MapReduce 或 Spark 的 RDD API 遞交者僅支援使用 Spark SQL DataFrame、 或資料集 APIs。

下列 Scala 範例示範一些其他情況,這些情況會阻止 EMRFS S3-optimized遞交者完全使用 (第一個範例) 和部分使用 (第二個範例)。

範例 – 動態分割區覆寫模式

下列 Scala 範例會指示 Spark 使用不同的遞交演算法,以防止完全使用 EMRFS S3-optimized遞交者。程式碼將 partitionOverwriteMode 屬性設為 dynamic,僅覆寫您要寫入資料的分割區。然後,由 partitionBy 指定動態分割區資料欄,且寫入模式被設為 overwrite

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://EXAMPLE-DOC-BUCKET/output")

您必須設定所有三個設定,以避免使用 EMRFS S3-optimized遞交者。當您這樣做時,Spark 會執行在 Spark 遞交通訊協定中指定的其他遞交演算法。對於 5.30.0 之前的 Amazon EMR 5.x 版本,以及 6.2.0 EMR 之前的 Amazon 6.x 版本,遞交通訊協定會使用 Spark 的暫存目錄,這是在以 開頭的輸出位置下建立的暫存目錄.spark-staging。該演算法會按順序重新命名分割區目錄,這可能對效能產生負面影響。如需 Amazon 5.30.0 及更新EMR版本以及 6.2.0 及更新版本的詳細資訊,請參閱 使用 EMRFS S3-optimized遞交通訊協定

Spark 2.4.0 中的演算法遵循以下步驟:

  1. 任務會試圖將輸出寫入 Spark 暫存目錄下的分割區目錄,例如 ${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/

  2. 針對每個寫入的分割區,該任務會試圖保持相對分割區路徑的追蹤,例如 k1=v1/k2=v2

  3. 在任務成功完成後,它會將所有追蹤的相對分割區路徑提供給驅動程式。

  4. 完成所有任務後,該任務遞交階段將收集在 Spark 臨時目錄下,所有成功任務嘗試寫入的分割區目錄。Spark 使用目錄樹狀圖重新命名操作,按順序將每個目錄重新命名為其最終輸出位置。

  5. 在任務遞交階段完成之前刪除臨時目錄。

範例 – 自訂分割區位置

在此範例中,該 Scala 程式碼將插入至兩個分割區。其中一個分割區有自訂的分割區位置。另一個分割區使用預設分割區位置。EMRFS S3-optimized遞交器僅用於將任務輸出寫入使用預設分割區位置的分割區。

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

Scala 程式碼會建立以下 Amazon S3 物件:

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$

在寫入至自訂位置的分割區時,Spark 會使用一個和上一個範例相似的遞交演算法,如下所述。如之前的範例所示,該演算法會導致順序重新命名,這可能會對效能產生負面影響。

  1. 在將輸出寫入自訂位置的分割區時,任務會寫入至 Spark 臨時目錄下的檔案 (該目錄建立在最終輸出位置下)。檔案的名稱包含隨機UUID,以防止檔案衝突。該任務會嘗試追蹤每個檔案以及最終所要的輸出路徑。

  2. 在任務成功完成後,它會將檔案和其最終所要之輸出路徑提供給驅動程式。

  3. 完成所有任務後,任務遞交階段會依序將所有為分割區寫入至自訂位置的檔案,重新命名為其最終輸出路徑。

  4. 在任務遞交階段完成之前刪除臨時目錄。