Apache Spark の主要なトピック -

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Apache Spark の主要なトピック

このセクションでは、Apache Spark のパフォーマンスを調整するための AWS Glue Apache Spark の基本概念と主要なトピックについて説明します。実際の調整戦略について議論する前に、これらの概念とトピックを理解することが重要です。

アーキテクチャ

Spark ドライバーは、主に Spark アプリケーションを個々のワーカーで達成できるタスクに分割します。Spark ドライバーには次の責任があります。

  • コードmain()で を実行する

  • 実行プランの生成

  • クラスター上のリソースを管理するクラスターマネージャーと連動した Spark エグゼキュターのプロビジョニング

  • Spark エグゼキュターのタスクのスケジュールとリクエスト

  • タスクの進行状況と復旧の管理

SparkContext オブジェクトを使用して、ジョブ実行の Spark ドライバーを操作します。

Spark エグゼキュターは、Spark ドライバーから渡されたデータを保持し、タスクを実行するワーカーです。Spark エグゼキュターの数は、クラスターのサイズに応じて増減します。

Spark ドライバー、クラスターマネージャー、およびワーカーノード内の JVM エグゼキュターとのワーカーノード接続。
注記

Spark エグゼキュターには複数のスロットがあるため、複数のタスクを並行して処理できます。Spark は、デフォルトで仮想 CPU (vCPU) コアごとに 1 つのタスクをサポートします。例えば、エグゼキュターに 4 つの CPU コアがある場合、4 つの同時タスクを実行できます。

回復力のある分散データセット

Spark は、Spark エグゼキュター全体で大規模なデータセットを保存および追跡する複雑なジョブを実行します。Spark ジョブのコードを記述するときは、ストレージの詳細について考える必要はありません。Spark は、回復力のある分散データセット (RDD) 抽象化を提供します。RDD 抽象化は、並列で操作でき、クラスターの Spark エグゼキュター間でパーティション化できる要素のコレクションです。

次の図は、Python スクリプトが一般的な環境で実行されたときと、Spark フレームワーク (PySpark) で実行されたときに、データをメモリに保存する方法の違いを示しています。

Python val [1,2,3 N]、Apache Spark rdd = sc.parallelize[1,2,3 N]。
  • Python – Python スクリプトval = [1,2,3...N]に書き込むと、コードが実行されている単一のマシンのメモリにデータが保持されます。

  • PySpark – Spark は、複数の Spark エグゼキュターのメモリに分散されたデータをロードして処理するための RDD データ構造を提供します。などのコードで RDD を生成できrdd = sc.parallelize[1,2,3...N]、Spark は複数の Spark エグゼキュター間でデータをメモリに自動的に分散して保持できます。

    多くの AWS Glue ジョブでは、DynamicFrames と Spark DataFrames を介して AWS Glue RDDs を使用します。これらは、RDD でデータのスキーマを定義し、その追加情報を使用して上位レベルのタスクを実行できるようにする抽象化です。内部的に RDDs を使用するため、データは透過的に分散され、次のコードで複数のノードにロードされます。

    • DynamicFrame

      dyf= glueContext.create_dynamic_frame.from_options( 's3', {"paths": [ "s3://<YourBucket>/<Prefix>/"]}, format="parquet", transformation_ctx="dyf" )
    • DataFrame

      df = spark.read.format("parquet") .load("s3://<YourBucket>/<Prefix>")

RDD には次の機能があります。

  • RDDsは、パーティションと呼ばれる複数のパートに分割されたデータで構成されます。各 Spark エグゼキュターは 1 つ以上のパーティションをメモリに保存し、データは複数のエグゼキュターに分散されます。

  • RDDs はイミュータブルです。つまり、作成後に変更することはできません。DataFrame を変更するには、次のセクションで定義されている変換を使用できます。

  • RDDs使用可能なノード間でデータをレプリケートするため、ノード障害から自動的に復旧できます。

遅延評価

RDDsは、既存のものから新しいデータセットを作成する変換と、データセットで計算を実行した後にドライバープログラムに値を返すアクションの 2 種類のオペレーションをサポートしています。

  • 変換 – RDDs はイミュータブルであるため、変換を使用してのみ変更できます。

    例えば、 mapは各データセット要素を関数に渡して、結果を表す新しい RDD を返す変換です。map メソッドは出力を返さないことに注意してください。Spark は、結果を操作できるようにするのではなく、将来の抽象変換を保存します。Spark は、アクションを呼び出すまで変換に対して動作しません。

  • アクション – 変換を使用して、論理的な変換プランを構築します。計算を開始するには、write、、countshowなどのアクションを実行しますcollect

    Spark のすべての変換は、すぐに結果を計算しないという点で遅延します。代わりに、Spark は Amazon Simple Storage Service (Amazon S3) オブジェクトなど、一部の基本データセットに適用された一連の変換を記憶します。変換は、アクションが結果をドライバーに返す必要がある場合にのみ計算されます。この設計により、Spark をより効率的に実行できます。例えば、map変換によって作成されたデータセットが、 などの行数を大幅に減らす変換によってのみ消費される状況を考えてみましょうreduce。その後、マッピングされた大きなデータセットを渡す代わりに、両方の変換を行った小さなデータセットをドライバーに渡すことができます。

Spark アプリケーションの用語

このセクションでは、Spark アプリケーションの用語について説明します。Spark ドライバーは実行プランを作成し、いくつかの抽象化でアプリケーションの動作を制御します。以下の用語は、Spark UI での開発、デバッグ、パフォーマンスチューニングに重要です。

  • アプリケーション – Spark セッション (Spark コンテキスト) に基づきます。などの一意の ID で識別されます<application_XXX>

  • ジョブ – RDD 用に 作成されたアクションに基づきます。ジョブは 1 つ以上のステージで構成されます。

  • ステージ – RDD 用に作成されたシャッフル に基づきます。ステージは 1 つ以上のタスクで構成されます。シャッフルは、RDD パーティション間で異なるグループにデータを再分散するための Spark のメカニズムです。など、特定の変換にはシャッフルjoin()が必要です。シャッフルの詳細については、「シャッフルの最適化」のチューニング方法を参照してください。

  • タスク – タスクは、Spark によってスケジュールされた処理の最小単位です。タスクは RDD パーティションごとに作成され、タスクの数はステージ内の同時実行の最大数です。

ジョブ、ステージ、シャッフル、タスクを含む実行計画。
注記

タスクは、並列処理を最適化する際に考慮すべき最も重要なものです。RDD の数でスケールするタスクの数

並列処理

Spark は、データをロードおよび変換するためのタスクを並列化します。

Amazon S3 でアクセスログファイル ( という名前accesslog1 ... accesslogN) の分散処理を実行する例を考えてみましょう。次の図は、分散処理フローを示しています。

""
  1. Spark ドライバーは、多くの Spark エグゼキュターに分散処理の実行計画を作成します。

  2. Spark ドライバーは、実行プランに基づいて各エグゼキュターにタスクを割り当てます。デフォルトでは、Spark ドライバーは S3 オブジェクト () ごとに RDD パーティション (それぞれ Spark タスクに対応) を作成しますPart1 ... N。次に、Spark ドライバーは各エグゼキュターにタスクを割り当てます。

  3. 各 Spark タスクは、割り当てられた S3 オブジェクトをダウンロードし、RDD パーティションのメモリに保存します。このようにして、複数の Spark エグゼキュターが割り当てられたタスクを並行してダウンロードして処理します。

パーティションと最適化の初期数の詳細については、「タスクの並列化」セクションを参照してください。

Catalyst オプティマイザ

内部的には、Spark は Catalyst オプティマイザと呼ばれるエンジンを使用して実行プランを最適化します。Catalyst には、次の図に示すように、Spark SQL、DataFrameDataFrame 、データセットなどの高レベルの Spark APIs を実行するときに使用できるクエリオプティマイザがあります。

論理プランは Catalyst オプティマイザを通過し、最適化されたプランを出力して RDDsに送信されます。

Catalyst オプティマイザは RDD API と直接連携しないため、高レベル APIsは一般的に低レベル RDD API よりも高速です。複雑な結合の場合、Catalyst オプティマイザはジョブ実行計画を最適化することでパフォーマンスを大幅に向上させることができます。Spark ジョブの最適化された計画は、Spark UI の SQL タブで確認できます。

アダプティブクエリの実行

Catalyst オプティマイザは、アダプティブクエリ実行と呼ばれるプロセスを通じてランタイム最適化を実行します。アダプティブクエリの実行では、ランタイム統計を使用して、ジョブの実行中にクエリの実行プランを再最適化します。アダプティブクエリの実行では、以下のセクションで説明するように、シャッフル後のパーティションの結合、ソートマージ結合のブロードキャスト結合への変換、スキュー結合の最適化など、パフォーマンスの課題に対するいくつかのソリューションが提供されます。

アダプティブクエリ実行は AWS Glue 3.0 以降で使用でき、デフォルトでは AWS Glue 4.0 (Spark 3.3.0) 以降で有効になっています。コードspark.conf.set("spark.sql.adaptive.enabled", "true")で を使用すると、アダプティブクエリの実行をオンまたはオフにできます。

シャッフル後のパーティションの結合

この機能は、map出力統計に基づいて、各シャッフル後の RDD パーティション (結合) を減らします。クエリを実行する際のシャッフルパーティション番号のチューニングが簡素化されます。データセットに合わせてシャッフルパーティション番号を設定する必要はありません。Spark は、シャッフルパーティションの初期数が十分多い場合、実行時に適切なシャッフルパーティション番号を選択できます。

シャッフル後のパーティションの結合は、 spark.sql.adaptive.enabledと の両方spark.sql.adaptive.coalescePartitions.enabledが true に設定されている場合に有効になります。詳細については、Apache Spark のドキュメントを参照してください。

ソートマージ結合をブロードキャスト結合に変換する

この機能は、サイズが大きく異なる 2 つのデータセットを結合するときに を認識し、その情報に基づいてより効率的な結合アルゴリズムを採用します。詳細については、Apache Spark のドキュメントを参照してください。結合戦略については、「シャッフルの最適化」セクションで説明します。

スキュー結合の最適化

データスキューは、Spark ジョブの最も一般的なボトルネックの 1 つです。特定の RDD パーティション (ひいては特定のタスク) にデータが偏り、アプリケーションの全体的な処理時間が遅れる状況について説明します。これにより、多くの場合、結合オペレーションのパフォーマンスがダウングレードされます。スキュー結合の最適化機能は、スキューされたタスクをほぼ均等なサイズのタスクに分割 (および必要に応じてレプリケート) することで、ソートマージ結合のスキューを動的に処理します。

この機能は、 spark.sql.adaptive.skewJoin.enabledが true に設定されている場合に有効になります。詳細については、Apache Spark のドキュメントを参照してください。データスキューについては、「シャッフルの最適化」セクションで詳しく説明します。