シャッフルの最適化 -

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

シャッフルの最適化

join() や などの特定のオペレーションではgroupByKey()、Spark でシャッフルを実行する必要があります。シャッフルは、データがRDDパーティション間で異なるようにグループ化されるように、Spark がデータを再分散するメカニズムです。シャッフルは、パフォーマンスのボトルネックを修正するのに役立ちます。ただし、シャッフルには通常 Spark エグゼキュター間でのデータのコピーが含まれるため、シャッフルは複雑でコストがかかる操作です。例えば、シャッフルでは次のコストが発生します。

  • ディスク I/O:

    • ディスク上に多数の中間ファイルを生成します。

  • ネットワーク I/O:

    • 多くのネットワーク接続が必要です (接続数 = Mapper × Reducer)。

    • レコードは別の Spark エグゼキュターでホストされている可能性のある新しいRDDパーティションに集約されるため、データセットの大部分がネットワーク経由で Spark エグゼキュター間で移動する可能性があります。

  • CPU およびメモリ負荷:

    • 値をソートし、データセットをマージします。これらのオペレーションはエグゼキュターで計画され、エグゼキュターに負荷がかかります。

シャッフルは、Spark アプリケーションのパフォーマンス低下における最も重要な要因の 1 つです。中間データの保存中に、エグゼキュターのローカルディスクの領域が枯渇し、Spark ジョブが失敗する可能性があります。

シャッフルパフォーマンスは、 CloudWatch メトリクスと Spark UI で評価できます。

CloudWatch メトリクス

シャッフルバイト書き込み値がシャッフルバイト読み取り と比較して高い場合、Spark ジョブは join()や などのシャッフルオペレーションを使用することがありますgroupByKey()

書き込まれたシャッフルバイトのスパイクを示すエグゼキュター間のデータシャッフル (バイト) グラフ。

Spark UI

Spark UI のステージタブで、シャッフル読み取りサイズ/レコードの値を確認できます。エグゼキュタータブでも確認できます。

次のスクリーンショットでは、各エグゼキュターはシャッフルプロセスで約 18.6 GB/4020000 レコードを交換でき、合計シャッフル読み取りサイズは約 75 GB) です。

Shuffle Spill (Disk) 列には大量のデータスピルメモリがディスクに表示され、ディスクがいっぱいになったり、パフォーマンスの問題が発生したりする可能性があります。

""

これらの症状を観察し、パフォーマンス目標と比較してステージに時間がかかりすぎる場合、または Out Of Memoryまたは No space left on device エラーで失敗する場合は、次の解決策を検討してください。

結合の最適化

テーブルを結合する join()オペレーションは、最も一般的に使用されるシャッフルオペレーションですが、多くの場合、パフォーマンスのボトルネックです。結合はコストのかかる操作であるため、ビジネス要件に不可欠な場合を除き、使用しないことをおすすめします。次の質問をして、データパイプラインを効率的に使用していることを再確認します。

  • 再利用できる他のジョブでも実行される結合を再計算していますか?

  • 外部キーを出力のコンシューマーが使用していない値に解決するために参加していますか?

参加オペレーションがビジネス要件に不可欠であることを確認したら、要件を満たす方法で参加を最適化するための以下のオプションを参照してください。

参加前にプッシュダウンを使用する

結合を実行する DataFrame 前に、 で不要な行と列を除外します。これには次の利点があります。

  • シャッフル中のデータ転送量を削減

  • Spark エグゼキュターの処理量を削減します。

  • データスキャンの量を削減

# Default df_joined = df1.join(df2, ["product_id"]) # Use Pushdown df1_select = df1.select("product_id","product_title","star_rating").filter(col("star_rating")>=4.0) df2_select = df2.select("product_id","category_id") df_joined = df1_select.join(df2_select, ["product_id"])

Join DataFrame を使用する

RDD API または DynamicFrame 結合の代わりに Spark 、、データセットなどの Spark の高レベルAPIの使用を試してください。SQL DataFrameなどのメソッド呼び出し DataFrame を使用して DynamicFrame に変換できますdyf.toDF()Apache Spark の「キートピック」セクションで説明したように、これらの結合オペレーションは、Catalyst オプティマイザによるクエリ最適化を内部的に活用します。

ハッシュ結合とヒントのシャッフルとブロードキャスト

Spark は、シャッフル結合とブロードキャストハッシュ結合の 2 種類の結合をサポートしています。ブロードキャストハッシュ結合にはシャッフルは不要で、シャッフル結合よりも処理が少なくて済みます。ただし、小さなテーブルを大きなテーブルに結合する場合にのみ適用されます。単一の Spark エグゼキュターのメモリに収まるテーブルを結合する場合は、ブロードキャストハッシュ結合の使用を検討してください。

次の図は、ブロードキャストハッシュ結合とシャッフル結合の大まかな構造とステップを示しています。

テーブルと結合テーブル間の直接接続によるブロードキャスト結合、およびテーブルと結合テーブル間の 2 つのシャッフルフェーズによるシャッフル結合。

各結合の詳細は次のとおりです。

  • シャッフル結合:

    • シャッフルハッシュ結合は、ソートせずに 2 つのテーブルを結合し、2 つのテーブル間に結合を分散します。Spark エグゼキューターのメモリに保存できる小さなテーブルの結合に適しています。

    • ソートマージ結合は、結合する 2 つのテーブルをキーで分散し、結合する前にソートします。大きなテーブルの結合に適しています。

  • ブロードキャストハッシュ結合:

    • ブロードキャストハッシュ結合は、小さい RDDまたは テーブルを各ワーカーノードにプッシュします。次に、大きな RDDまたはテーブルの各パーティションとマップ側の結合を行います。

      これは、 RDDsまたは テーブルのいずれかがメモリに収まるか、メモリに収まるように作成できる結合に適しています。ブロードキャストハッシュ結合はシャッフルを必要としないため、可能な限り行うことをお勧めします。結合ヒントを使用して、次のように Spark からブロードキャスト結合をリクエストできます。

      # DataFrame from pySpark.sql.functions import broadcast df_joined= df_big.join(broadcast(df_small), right_df[key] == left_df[key], how='inner') -- SparkSQL SELECT /*+ BROADCAST(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;

      結合ヒントの詳細については、「結合ヒント」を参照してください。

AWS Glue 3.0 以降では、アダプティブクエリの実行と追加のパラメータを有効にすることで、ブロードキャストハッシュ結合を自動的に利用できます。アダプティブクエリの実行は、いずれかの結合側のランタイム統計がアダプティブブロードキャストハッシュ結合しきい値よりも小さい場合に、ソートマージ結合をブロードキャストハッシュ結合に変換します。

AWS Glue 3.0 では、 を設定することで、アダプティブクエリの実行を有効にできますspark.sql.adaptive.enabled=true。アダプティブクエリの実行は、 Glue AWS 4.0 でデフォルトで有効になっています。

シャッフルとブロードキャストハッシュ結合に関連する追加のパラメータを設定できます。

  • spark.sql.adaptive.localShuffleReader.enabled

  • spark.sql.adaptive.autoBroadcastJoinThreshold

関連するパラメータの詳細については、「ソートマージ結合をブロードキャスト結合に変換する」を参照してください。

AWS Glue 3.0 以降では、シャッフルに他の結合ヒントを使用して動作を調整できます。

-- Join Hints for shuffle sort merge join SELECT /*+ SHUFFLE_MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGEJOIN(t2) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle hash join SELECT /*+ SHUFFLE_HASH(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle-and-replicate nested loop join SELECT /*+ SHUFFLE_REPLICATE_NL(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;

バケット化を使用する

ソートマージ結合には、シャッフルとソートの 2 つのフェーズが必要で、その後マージします。これらの 2 つのフェーズでは、Spark エグゼキュターが過負荷になり、一部のエグゼキュターがマージしていて、他のエグゼキュターが同時にソートされている場合に および OOMのパフォーマンスの問題が発生する可能性があります。このような場合、バケット を使用して効率的に結合できる場合があります。バケット化は、結合キーの入力を事前にシャッフルして事前にソートし、ソートされたデータを中間テーブルに書き込みます。ソートされた中間テーブルを事前に定義することで、大きなテーブルを結合する場合のシャッフルステップとソートステップのコストを削減できます。

ソートマージ結合には、追加のシャッフルステップとソートステップがあります。

バケット化されたテーブルは、次の場合に便利です。

  • などの同じキーで頻繁に結合されるデータ account_id

  • 共通列にバケット化できるベーステーブルやデルタテーブルなどの日次累積テーブルをロードする

次のコードを使用して、バケット化されたテーブルを作成できます。

df.write.bucketBy(50, "account_id").sortBy("age").saveAsTable("bucketed_table")

結合前の結合キー DataFrames の再パーティション

結合の前に結合キー DataFrames で 2 つの を再パーティションするには、次のステートメントを使用します。

df1_repartitioned = df1.repartition(N,"join_key") df2_repartitioned = df2.repartition(N,"join_key") df_joined = df1_repartitioned.join(df2_repartitioned,"product_id")

これにより、結合を開始する前に、結合キーRDDsで 2 つ (まだ分離されています) がパーティション化されます。2 つの RDDsが同じパーティション化コードを持つ同じキーでパーティション化されている場合、結合する計画RDDが結合のシャッフル前に同じワーカーにコロケーションされる可能性が高くなります。これにより、結合中のネットワークアクティビティとデータスキューが減少し、パフォーマンスが向上する可能性があります。

データスキューを克服する

データスキューは、Spark ジョブのボトルネックの最も一般的な原因の 1 つです。これは、データがRDDパーティション間で均一に分散されていない場合に発生します。これにより、そのパーティションのタスクが他のパーティションよりもはるかに時間がかかり、アプリケーションの全体的な処理時間が遅延します。

データスキューを特定するには、Spark UI で次のメトリクスを評価します。

  • Spark UI のステージタブで、イベントタイムラインページを確認します。次のスクリーンショットでは、タスクの不均等な分布を確認できます。不均等に分散されているタスクや実行に時間がかかりすぎるタスクは、データスキューを示している可能性があります。

    エグゼキュターのコンピューティング時間は、あるタスクでは他のタスクよりもはるかに長くなります。
  • もう 1 つの重要なページは、Spark タスクの統計を表示する概要メトリクス です。次のスクリーンショットは、期間 GC 時間 、スピル (メモリ)、スピル (ディスク) などのパーセンタイルを持つメトリクスを示しています。

    期間行が強調表示されたサマリーメトリクステーブル。

    タスクが均等に分散されると、すべてのパーセンタイルに同様の数値が表示されます。データスキューがあると、各パーセンタイルに非常にバイアスされた値が表示されます。この例では、最小 、25 番目のパーセンタイル 中央値 、および 75 番目のパーセンタイル のタスク期間は 13 秒未満です。 Max タスクは 75 パーセンタイル の 100 倍多くのデータを処理しましたが、6.4 分の期間は約 30 倍長くなります。つまり、少なくとも 1 つのタスク (またはタスクの 25% まで) が残りのタスクよりもはるかに長くかかったということです。

データスキューが表示された場合は、以下を試してください。

  • AWS Glue 3.0 を使用する場合は、 を設定してアダプティブクエリの実行を有効にしますspark.sql.adaptive.enabled=true。アダプティブクエリの実行は AWS Glue 4.0 でデフォルトで有効になっています。

    次の関連パラメータを設定することで、結合によって導入されたデータスキューに Adaptive Query Execution を使用することもできます。

    • spark.sql.adaptive.skewJoin.skewedPartitionFactor

    • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

    • spark.sql.adaptive.advisoryPartitionSizeInBytes=128m (128 mebibytes or larger should be good)

    • spark.sql.adaptive.coalescePartitions.enabled=true (when you want to coalesce partitions)

    詳細については、「Apache Spark ドキュメント」を参照してください。

  • 結合キーには、幅広い値を持つキーを使用します。シャッフル結合では、パーティションはキーのハッシュ値ごとに決定されます。結合キーのカーディナリティが低すぎる場合、ハッシュ関数はパーティション間でデータを分散する不正なジョブを実行する可能性が高くなります。したがって、アプリケーションとビジネスロジックでサポートされている場合は、より高い基数キーまたは複合キーを使用することを検討してください。

    # Use Single Primary Key df_joined = df1_select.join(df2_select, ["primary_key"]) # Use Composite Key df_joined = df1_select.join(df2_select, ["primary_key","secondary_key"])

キャッシュを使用する

反復的な を使用する場合は DataFrames、 または を使用して計算結果を各 Spark エグゼキューターのメモリとディスクにdf.persist()キャッシュすることで、追加のシャッフルdf.cache()や計算を避けます。Spark は、ディスクRDDsでの保持または複数のノード (ストレージレベル ) でのレプリケーションもサポートしています。

例えば、 DataFrames を追加して を保持できますdf.persist()。キャッシュが不要になった場合は、 unpersistを使用してキャッシュされたデータを破棄できます。

df = spark.read.parquet("s3://<Bucket>/parquet/product_category=Books/") df_high_rate = df.filter(col("star_rating")>=4.0) df_high_rate.persist() df_joined1 = df_high_rate.join(<Table1>, ["key"]) df_joined2 = df_high_rate.join(<Table2>, ["key"]) df_joined3 = df_high_rate.join(<Table3>, ["key"]) ... df_high_rate.unpersist()

不要な Spark アクションを削除する

count、、showまたは などの不要なアクションを実行することは避けてくださいcollect「Apache Spark の主要トピック」セクションで説明したように、Spark は遅延しています。変換された各 は、アクションを実行するたびに再計算RDDされる場合があります。Spark アクションを多数使用すると、アクションごとに複数のソースアクセス、タスク計算、シャッフル実行が呼び出されます。

商用環境で collect()やその他のアクションが必要ない場合は、それらを削除することを検討してください。

注記

Spark collect()は商用環境でできるだけ使用しないでください。collect() アクションは、Spark エグゼキュターの計算のすべての結果を Spark ドライバーに返します。これにより、Spark ドライバーがOOMエラーを返す可能性があります。OOM エラーを回避するために、Spark spark.driver.maxResultSize = 1GBはデフォルトで を設定します。これにより、Spark ドライバーに返される最大データサイズが 1 GB に制限されます。