EMRFS S3 向けに最適化されたコミッターの要件 - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

EMRFS S3 向けに最適化されたコミッターの要件

以下の条件が満たされる場合に、EMRFS S3 向けに最適化されたコミッターが使用されます。

  • Spark、DataFrames、または Datasets を使用して Amazon S3 にファイルを書き込む Spark ジョブを実行します。Amazon EMR 6.4.0 以降では、Parquet、ORC、テキストベースの形式 (CSV と JSON を含む) など、一般的なあらゆる形式にこのコミッターを使用できます。Amazon EMR 6.4.0 より前のリリースでは、Parquet 形式のみがサポートされています。

  • Amazon EMR でマルチパートアップロードが有効になっている。これがデフォルトです。詳細については、「EMRFS S3 向けに最適化されたコミッターとマルチパートアップロード」を参照してください。

  • Spark の組み込みファイル形式のサポートが使用されます。組み込みファイル形式のサポートは以下の状況で使用されます。

    • Hive メタストアテーブルの場合、Parquet テーブルに対して spark.sql.hive.convertMetastoreParquettrue に設定される場合、または、Amazon EMR 6.4.0 以降の Orc テーブルに対して spark.sql.hive.convertMetastoreOrctrue に設定される場合。これらはデフォルトの設定です。

    • ジョブによってファイル形式のデータソースまたはテーブルに書き込まれる場合。例えば、ターゲットテーブルが USING parquet 句で作成される場合などです。

    • ジョブでパーティション分割されていない Hive メタストア Parquet テーブルに書き込む場合。Spark の組み込み Parquet サポートはパーティション分割された Hive テーブルをサポートしていません。これは既知の制限です。詳細については、「Apache Spark、DataFrames、データセットガイド」の「Hive メタストア Parquet テーブル変換」を参照してください。

  • デフォルトのパーティションの場所 (${table_location}/k1=v1/k2=v2/ など) に書き込む Spark ジョブオペレーションでコミッターが使用される。ジョブオペレーションによってカスタムのパーティション場所に書き込まれる場合、例えば、カスタムのパーティション場所が ALTER TABLE SQL コマンドを使用して設定されている場合、コミッターは使用されません。

  • Spark で以下の値を使用する。

    • spark.sql.parquet.fs.optimized.committer.optimization-enabled プロパティを true に設定する必要があります。これは、Amazon EMR 5.20.0 以降でのデフォルト設定です。Amazon EMR 5.19.0 では、デフォルト値は false です。この値の設定については、「Amazon EMR 5.19.0 の EMRFS S3 向けに最適化されたコミッターを有効にする」を参照してください。

    • パーティション化されていない Hive メタストアテーブルに書き込む場合は、Parquet と Orc のファイル形式のみがサポートされます。パーティション化されていない Parquet Hive メタストアテーブルに書き込む場合は、spark.sql.hive.convertMetastoreParquettrue に設定する必要があります。パーティション化されていない Orc Hive メタストアテーブルに書き込む場合は、spark.sql.hive.convertMetastoreOrctrue に設定する必要があります。これらはデフォルトの設定です。

    • spark.sql.parquet.output.committer.classcom.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter に設定する必要があります。これはデフォルトの設定です。

    • spark.sql.sources.commitProtocolClassorg.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol または org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol に設定する必要があります。Amazon EMR 5.x シリーズバージョン 5.30.0 以降、および Amazon EMR 6.x シリーズバージョン 6.2.0 以降の場合、org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol がデフォルト設定です。それよりも前の Amazon EMR バージョンの場合、org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol がデフォルト設定です。

    • Spark ジョブでパーティション分割された Parquet データセットを動的パーティション列で上書きする場合は、partitionOverwriteMode 書き込みオプションと spark.sql.sources.partitionOverwriteModestatic に設定する必要があります。これはデフォルトの設定です。

      注記

      partitionOverwriteMode 書き込みオプションは Spark 2.4.0 で導入されました。Amazon EMR リリース 5.19.0 に含まれている Spark バージョン 2.3.2 では、spark.sql.sources.partitionOverwriteMode プロパティを設定します。

EMRFS S3 用に最適化されたコミッターが使用されていない場合

一般的に、EMRFS S3 用に最適化されたコミッターは次の状況では、使用されません。

状況 コミッターが使われない理由
HDFS に書き込む場合 コミッターは、EMRFS を使用した Amazon S3 への書き込みのみをサポートします。
S3A ファイルシステムを使用する場合 コミッターは EMRFS のみをサポートします。
MapReduce または Spark の RDD API を使用する場合 コミッターは SparkSQL、DataFrame、またはデータセット API の使用のみをサポートします。

以下の Scala の例では、いくつかの追加の状況を示しています。EMRFS S3 向けに最適化されたコミッターを全体に使用しないもの (最初の例) と、部分的に使用しないもの (2 番目の例) です。

例 - 動的パーティション上書きモード

以下の Scala の例では、別のコミットアルゴリズムを使用するように Spark に指示しています。これでは、EMRFS S3 向けに最適化されたコミッターはまったく使用されません。このコードは、データを書き込むパーティションのみを上書きするように partitionOverwriteMode プロパティを dynamic に設定します。次に、動的パーティション列が partitionBy で指定され、書き込みモードが overwrite に設定されます。

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://amzn-s3-demo-bucket1/output")

EMRFS S3 向けに最適化されたコミッターが使用されないようにするには、3 つの設定をすべて構成する必要があります。これを行うと、Spark は Spark のコミットプロトコルで指定されている別のコミットアルゴリズムを実行します。5.30.0 より前の Amazon EMR 5.x リリースと 6.2.0 より前の Amazon EMR 6.x リリースの場合、コミットプロトコルは Spark のステージングディレクトリを使用します。これは、.spark-staging で始まる出力場所に作成された一時ディレクトリです。このアルゴリズムではパーティションディレクトリの名前が順番に変更されるため、パフォーマンスが低下する可能性があります。Amazon EMR リリース 5.30.0 以降および 6.2.0 以降の詳細については、「EMRFS S3 向けに最適化されたコミットプロトコルを使用する」を参照してください。

Spark 2.4.0 のアルゴリズムは以下の手順に従います。

  1. タスク試行により、Spark のステージングディレクトリ (${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/ など) の下のパーティションディレクトリに出力が書き込まれます。

  2. 書き込まれたパーティションごとに、タスク試行は相対パーティションパス (k1=v1/k2=v2 など) を管理します。

  3. タスクが正常に完了すると、追跡されたすべての相対パーティションパスがドライバーに渡されます。

  4. すべてのタスクが完了した後、ジョブのコミットフェーズでは、成功したタスクの試行によって Spark のステージングディレクトリに書き込まれたすべてのパーティションディレクトリが収集されます。ディレクトリツリーの名前変更オペレーションを使用して、これらの各ディレクトリの名前が最終的な出力場所に順番に変更されます。

  5. ステージングディレクトリがジョブのコミットフェーズの完了前に削除されます。

例 - カスタムのパーティション場所

この例では、Scala コードは 2 つのパーティションを挿入します。1 つのパーティションはカスタムのパーティション場所を使用します。もう 1 つのパーティションはデフォルトのパーティション場所を使用します。EMRFS S3 向けに最適化されたコミッターは、デフォルトのパーティション場所を使用するパーティションへのタスク出力の書き込みにのみ使用されます。

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

Scala コードは以下の Amazon S3 オブジェクトを作成します。

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$

カスタムのパーティション場所に書き込むとき、先ほどの例と同様のコミットアルゴリズムが使用されます。先ほどの例と同様、このアルゴリズムでは名前が順番に変更されるため、パフォーマンスが低下する可能性があります。

  1. カスタムのパーティション場所に出力を書き込むとき、タスクでは最終的な出力場所に作成される Spark のステージングディレクトリにファイルを書き込みます。ファイルの名前には、ファイルの競合から保護するためのランダムな UUID が含まれます。タスクの試行によって各ファイルが最終的な出力パスと共に追跡されます。

  2. タスクが正常に完了すると、ドライバーにファイルとそれらの最終的な出力パスが渡されます。

  3. すべてのタスクが完了した後、ジョブのコミットフェーズでは、カスタムのパーティション場所に書き込まれたすべてのファイルの名前が、最終的な出力パスに順番に変更されます。

  4. ステージングディレクトリがジョブのコミットフェーズの完了前に削除されます。