翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Apache Spark の主なトピック
このセクションでは、Apache Spark のパフォーマンスを調整するための Apache AWS Glue Spark の基本概念と主要なトピックについて説明します。実際の調整戦略について説明する前に、これらの概念とトピックを理解することが重要です。
アーキテクチャ
Spark ドライバーは主に、Spark アプリケーションを個々のワーカーで達成できるタスクに分割する役割を担います。Spark ドライバーには以下の責任があります。
-
コード
main()
での実行 -
実行プランの生成
-
クラスター上のリソースを管理するクラスターマネージャーと連動した Spark エグゼキュターのプロビジョニング
-
Spark エグゼキュターのタスクのスケジュール設定とタスクのリクエスト
-
タスクの進行状況と復旧の管理
SparkContext
オブジェクトを使用して、ジョブ実行用の Spark ドライバーを操作します。
Spark エグゼキュターは、Spark ドライバーから渡されるデータを保持し、タスクを実行するワーカーです。Spark エグゼキュターの数は、クラスターのサイズに応じて増減します。
注記
Spark エグゼキュターには複数のスロットがあるため、複数のタスクを並行して処理できます。Spark は、デフォルトで仮想 CPU (vCPU) コアごとに 1 つのタスクをサポートします。例えば、エグゼキュターに 4 つの CPU コアがある場合、4 つの同時タスクを実行できます。
回復力のある分散データセット
Spark は、Spark エグゼキュター全体で大規模なデータセットを保存および追跡する複雑なジョブを実行します。Spark ジョブのコードを記述するときは、ストレージの詳細について考える必要はありません。Spark は、回復力のある分散データセット (RDD) 抽象化を提供します。RDD 抽象化は、並列で操作でき、クラスターの Spark エグゼキュター間でパーティション化できる要素のコレクションです。
次の図は、Python スクリプトが一般的な環境で実行されているときと、Spark フレームワーク () で実行されているときに、データをメモリに保存する方法の違いを示していますPySpark。
-
Python – Python スクリプト
val = [1,2,3...N]
に書き込むと、コードが実行されている単一マシンのメモリにデータが保持されます。 -
PySpark – Spark は、複数の Spark エグゼキュターのメモリ全体に分散されたデータをロードして処理するための RDD データ構造を提供します。などのコードで RDD を生成
rdd = sc.parallelize[1,2,3...N]
できます。Spark は複数の Spark エグゼキュター間でデータをメモリに自動的に分散して保持できます。多くの AWS Glue ジョブでは、 DynamicFramesと Spark を介して AWS Glue RDDs を使用しますDataFrames。これらは、RDD 内のデータのスキーマを定義し、その追加情報を使用して上位レベルのタスクを実行できるようにする抽象化です。RDDs は内部的に使用されるため、データは透過的に分散され、次のコードの複数のノードにロードされます。
-
DynamicFrame
dyf= glueContext.create_dynamic_frame.from_options( 's3', {"paths": [ "s3://<YourBucket>/<Prefix>/"]}, format="parquet", transformation_ctx="dyf" )
-
DataFrame
df = spark.read.format("parquet") .load("s3://<YourBucket>/<Prefix>")
-
RDD には次の機能があります。
-
RDDsは、パーティション と呼ばれる複数のパートに分割されたデータで構成されます。各 Spark エグゼキュターは 1 つ以上のパーティションをメモリに保存し、データは複数のエグゼキュターに分散されます。
-
RDDs はイミュータブルな です。つまり、作成後に変更することはできません。を変更するには DataFrame、次のセクションで定義されている変換 を使用できます。
-
RDDs利用可能なノード間でデータをレプリケートするため、ノード障害から自動的に復旧できます。
遅延評価
RDDs、既存のデータセットから新しいデータセットを作成する変換 と、データセットで計算を実行した後にドライバープログラムに値を返すアクション の 2 種類のオペレーションをサポートします。
-
変換 — RDDsはイミュータブルであるため、変換を使用してのみ変更できます。
例えば、
map
は各データセット要素を関数に渡し、結果を表す新しい RDD を返す変換です。map
メソッドは出力を返さないことに注意してください。Spark は、結果を操作するのではなく、将来の抽象変換を保存します。Spark は、 アクションを呼び出すまで変換を処理しません。 -
アクション — 変換を使用して、論理的な変換計画を構築します。計算を開始するには、、
write
、、count
show
などのアクションを実行しますcollect
。Spark のすべての変換は、すぐに結果を計算しないという点で遅延しています。代わりに、Spark は Amazon Simple Storage Service (Amazon S3) オブジェクトなど、一部のベースデータセットに適用された一連の変換を記憶します。変換は、アクションで結果をドライバーに返す必要がある場合にのみ計算されます。この設計により、Spark をより効率的に実行できます。例えば、
map
変換によって作成されたデータセットが、 などの行数を大幅に減らす変換によってのみ消費される状況を考えてみましょうreduce
。その後、マッピングされた大きなデータセットを渡す代わりに、両方の変換を行った小さなデータセットをドライバーに渡すことができます。
Spark アプリケーションの用語
このセクションでは、Spark アプリケーションの用語について説明します。Spark ドライバーは実行プランを作成し、いくつかの抽象化でアプリケーションの動作を制御します。以下の用語は、Spark UI での開発、デバッグ、パフォーマンスチューニングに重要です。
-
アプリケーション — Spark セッション (Spark コンテキスト) に基づきます。などの一意の ID で識別されます
<application_XXX>
。 -
ジョブ – RDD 用に 作成されたアクションに基づきます。ジョブは 1 つ以上のステージ で構成されます。
-
ステージ – RDD 用に作成されたシャッフル に基づきます。ステージは 1 つ以上のタスクで構成されます。シャッフルは、データを RDD パーティション間で異なる方法でグループ化するように再分散するための Spark のメカニズムです。など、特定の変換にはシャッフル
join()
が必要です。シャッフルについては、シャッフルの最適化のチューニングの実践で詳しく説明します。 -
タスク – タスクは、Spark によってスケジュールされる処理の最小単位です。タスクは RDD パーティションごとに作成され、タスクの数はステージ内の同時実行の最大数です。
注記
タスクは並列処理を最適化する際に考慮すべき最も重要なものです。RDD の数に応じてスケーリングされるタスクの数
並列処理
Spark は、データをロードおよび変換するためのタスクを並列化します。
Amazon S3 でアクセスログファイル ( という名前accesslog1 ... accesslogN
) の分散処理を実行する例を考えてみましょう。次の図は、分散処理フローを示しています。
-
Spark ドライバーは、多くの Spark エグゼキュターに分散処理の実行計画を作成します。
-
Spark ドライバーは、実行プランに基づいて各エグゼキュターにタスクを割り当てます。デフォルトでは、Spark ドライバーは S3 オブジェクト () ごとに RDD パーティション (それぞれ Spark タスクに対応) を作成します
Part1 ... N
。次に、Spark ドライバーは各エグゼキュターにタスクを割り当てます。 -
各 Spark タスクは、割り当てられた S3 オブジェクトをダウンロードし、RDD パーティションのメモリに保存します。このようにして、複数の Spark エグゼキュターが割り当てられたタスクを並行してダウンロードして処理します。
パーティションの初期数と最適化の詳細については、「タスクの並列化」セクションを参照してください。
カタリストオプティマイザ
内部的には、Spark は Catalyst オプティマイザ
Catalyst オプティマイザは RDD API と直接連携しないため、高レベル APIs は一般的に低レベル RDD API よりも高速です。複雑な結合の場合、Catalyst オプティマイザはジョブ実行プランを最適化することでパフォーマンスを大幅に向上させることができます。Spark ジョブの最適化された計画は、Spark UI の SQL タブで確認できます。
アダプティブクエリの実行
Catalyst オプティマイザは、アダプティブクエリ実行 と呼ばれるプロセスを通じてランタイム最適化を実行します。アダプティブクエリ実行は、ランタイム統計を使用して、ジョブの実行中にクエリの実行プランを再最適化します。アダプティブクエリの実行には、次のセクションで説明するように、シャッフル後のパーティションの結合、ソートマージ結合のブロードキャスト結合への変換、スキュー結合の最適化など、パフォーマンスの課題に対するいくつかのソリューションが用意されています。
アダプティブクエリ実行は AWS Glue 3.0 以降で利用可能で、 AWS Glue 4.0 (Spark 3.3.0) 以降ではデフォルトで有効になっています。コードspark.conf.set("spark.sql.adaptive.enabled", "true")
で を使用すると、アダプティブクエリの実行をオンまたはオフにできます。
シャッフル後のパーティションの結合
この機能は、map
出力統計に基づいて、シャッフルするたびに RDD パーティション (合体) を減らします。これにより、クエリを実行する際のシャッフルパーティション番号のチューニングが簡素化されます。データセットに合わせてシャッフルパーティション番号を設定する必要はありません。Spark は、シャッフルパーティションの初期数が十分多い場合、実行時に適切なシャッフルパーティション番号を選択できます。
シャッフル後のパーティションの結合は、 spark.sql.adaptive.enabled
と の両方spark.sql.adaptive.coalescePartitions.enabled
が true に設定されている場合に有効になります。詳細については、「Apache Spark ドキュメント
ソートマージ結合をブロードキャスト結合に変換する
この機能は、実質的に異なるサイズの 2 つのデータセットを結合する場合を認識し、その情報に基づいてより効率的な結合アルゴリズムを採用します。詳細については、「Apache Spark ドキュメント
スキュー結合の最適化
データスキューは、Spark ジョブの最も一般的なボトルネックの 1 つです。これは、データが特定の RDD パーティション (ひいては特定のタスク) に偏り、アプリケーションの全体的な処理時間が遅れる状況を示しています。これにより、結合オペレーションのパフォーマンスがダウングレードされることがよくあります。スキュー結合最適化機能は、スキューされたタスクをほぼ同じサイズのタスクに分割 (および必要に応じてレプリケート) することで、ソートマージ結合のスキューを動的に処理します。
この機能は、 が true spark.sql.adaptive.skewJoin.enabled
に設定されている場合に有効になります。詳細については、「Apache Spark ドキュメント