Apache Spark の主なトピック -

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Apache Spark の主なトピック

このセクションでは、Apache Spark のパフォーマンスを調整するための Apache AWS Glue Spark の基本概念と主要なトピックについて説明します。実際の調整戦略について説明する前に、これらの概念とトピックを理解することが重要です。

アーキテクチャ

Spark ドライバーは主に、Spark アプリケーションを個々のワーカーで達成できるタスクに分割する役割を担います。Spark ドライバーには以下の責任があります。

  • コードmain()での実行

  • 実行プランの生成

  • クラスター上のリソースを管理するクラスターマネージャーと連動した Spark エグゼキュターのプロビジョニング

  • Spark エグゼキュターのタスクのスケジュール設定とタスクのリクエスト

  • タスクの進行状況と復旧の管理

SparkContext オブジェクトを使用して、ジョブ実行用の Spark ドライバーを操作します。

Spark エグゼキュターは、Spark ドライバーから渡されるデータを保持し、タスクを実行するワーカーです。Spark エグゼキュターの数は、クラスターのサイズに応じて増減します。

Spark ドライバー、クラスターマネージャー、およびワーカーノード内の JVM エグゼキュターとのワーカーノード接続。
注記

Spark エグゼキュターには複数のスロットがあるため、複数のタスクを並行して処理できます。Spark は、デフォルトで仮想 CPU (vCPU) コアごとに 1 つのタスクをサポートします。例えば、エグゼキュターに 4 つの CPU コアがある場合、4 つの同時タスクを実行できます。

回復力のある分散データセット

Spark は、Spark エグゼキュター全体で大規模なデータセットを保存および追跡する複雑なジョブを実行します。Spark ジョブのコードを記述するときは、ストレージの詳細について考える必要はありません。Spark は、回復力のある分散データセット (RDD) 抽象化を提供します。RDD 抽象化は、並列で操作でき、クラスターの Spark エグゼキュター間でパーティション化できる要素のコレクションです。

次の図は、Python スクリプトが一般的な環境で実行されているときと、Spark フレームワーク () で実行されているときに、データをメモリに保存する方法の違いを示していますPySpark

Python val [1,2,3 N]、Apache Spark rdd = sc.parallelize[1,2,3 N]。
  • Python – Python スクリプトval = [1,2,3...N]に書き込むと、コードが実行されている単一マシンのメモリにデータが保持されます。

  • PySpark – Spark は、複数の Spark エグゼキュターのメモリ全体に分散されたデータをロードして処理するための RDD データ構造を提供します。などのコードで RDD を生成rdd = sc.parallelize[1,2,3...N]できます。Spark は複数の Spark エグゼキュター間でデータをメモリに自動的に分散して保持できます。

    多くの AWS Glue ジョブでは、 DynamicFramesと Spark を介して AWS Glue RDDs を使用しますDataFrames。これらは、RDD 内のデータのスキーマを定義し、その追加情報を使用して上位レベルのタスクを実行できるようにする抽象化です。RDDs は内部的に使用されるため、データは透過的に分散され、次のコードの複数のノードにロードされます。

    • DynamicFrame

      dyf= glueContext.create_dynamic_frame.from_options( 's3', {"paths": [ "s3://<YourBucket>/<Prefix>/"]}, format="parquet", transformation_ctx="dyf" )
    • DataFrame

      df = spark.read.format("parquet") .load("s3://<YourBucket>/<Prefix>")

RDD には次の機能があります。

  • RDDsは、パーティション と呼ばれる複数のパートに分割されたデータで構成されます。各 Spark エグゼキュターは 1 つ以上のパーティションをメモリに保存し、データは複数のエグゼキュターに分散されます。

  • RDDs はイミュータブルな です。つまり、作成後に変更することはできません。を変更するには DataFrame、次のセクションで定義されている変換 を使用できます。

  • RDDs利用可能なノード間でデータをレプリケートするため、ノード障害から自動的に復旧できます。

遅延評価

RDDs、既存のデータセットから新しいデータセットを作成する変換 と、データセットで計算を実行した後にドライバープログラムに値を返すアクション の 2 種類のオペレーションをサポートします。

  • 変換 — RDDsはイミュータブルであるため、変換を使用してのみ変更できます。

    例えば、 mapは各データセット要素を関数に渡し、結果を表す新しい RDD を返す変換です。map メソッドは出力を返さないことに注意してください。Spark は、結果を操作するのではなく、将来の抽象変換を保存します。Spark は、 アクションを呼び出すまで変換を処理しません。

  • アクション — 変換を使用して、論理的な変換計画を構築します。計算を開始するには、、write、、 count showなどのアクションを実行しますcollect

    Spark のすべての変換は、すぐに結果を計算しないという点で遅延しています。代わりに、Spark は Amazon Simple Storage Service (Amazon S3) オブジェクトなど、一部のベースデータセットに適用された一連の変換を記憶します。変換は、アクションで結果をドライバーに返す必要がある場合にのみ計算されます。この設計により、Spark をより効率的に実行できます。例えば、map変換によって作成されたデータセットが、 などの行数を大幅に減らす変換によってのみ消費される状況を考えてみましょうreduce。その後、マッピングされた大きなデータセットを渡す代わりに、両方の変換を行った小さなデータセットをドライバーに渡すことができます。

Spark アプリケーションの用語

このセクションでは、Spark アプリケーションの用語について説明します。Spark ドライバーは実行プランを作成し、いくつかの抽象化でアプリケーションの動作を制御します。以下の用語は、Spark UI での開発、デバッグ、パフォーマンスチューニングに重要です。

  • アプリケーション — Spark セッション (Spark コンテキスト) に基づきます。などの一意の ID で識別されます<application_XXX>

  • ジョブ – RDD 用に 作成されたアクションに基づきます。ジョブは 1 つ以上のステージ で構成されます。

  • ステージ – RDD 用に作成されたシャッフル に基づきます。ステージは 1 つ以上のタスクで構成されます。シャッフルは、データを RDD パーティション間で異なる方法でグループ化するように再分散するための Spark のメカニズムです。など、特定の変換にはシャッフルjoin()が必要です。シャッフルについては、シャッフルの最適化のチューニングの実践で詳しく説明します。

  • タスク – タスクは、Spark によってスケジュールされる処理の最小単位です。タスクは RDD パーティションごとに作成され、タスクの数はステージ内の同時実行の最大数です。

ジョブ、ステージ、シャッフル、タスクを含む実行プラン。
注記

タスクは並列処理を最適化する際に考慮すべき最も重要なものです。RDD の数に応じてスケーリングされるタスクの数

並列処理

Spark は、データをロードおよび変換するためのタスクを並列化します。

Amazon S3 でアクセスログファイル ( という名前accesslog1 ... accesslogN) の分散処理を実行する例を考えてみましょう。次の図は、分散処理フローを示しています。

""
  1. Spark ドライバーは、多くの Spark エグゼキュターに分散処理の実行計画を作成します。

  2. Spark ドライバーは、実行プランに基づいて各エグゼキュターにタスクを割り当てます。デフォルトでは、Spark ドライバーは S3 オブジェクト () ごとに RDD パーティション (それぞれ Spark タスクに対応) を作成しますPart1 ... N。次に、Spark ドライバーは各エグゼキュターにタスクを割り当てます。

  3. 各 Spark タスクは、割り当てられた S3 オブジェクトをダウンロードし、RDD パーティションのメモリに保存します。このようにして、複数の Spark エグゼキュターが割り当てられたタスクを並行してダウンロードして処理します。

パーティションの初期数と最適化の詳細については、「タスクの並列化」セクションを参照してください。

カタリストオプティマイザ

内部的には、Spark は Catalyst オプティマイザと呼ばれるエンジンを使用して実行プランを最適化します。Catalyst にはクエリオプティマイザがあり、次の図に示すように、Spark SQL 、、データセット などの高レベルの Spark APIs を実行するときに使用できます。 DataFrame

論理計画は Catalyst オプティマイザを通過し、RDDs。

Catalyst オプティマイザは RDD API と直接連携しないため、高レベル APIs は一般的に低レベル RDD API よりも高速です。複雑な結合の場合、Catalyst オプティマイザはジョブ実行プランを最適化することでパフォーマンスを大幅に向上させることができます。Spark ジョブの最適化された計画は、Spark UI の SQL タブで確認できます。

アダプティブクエリの実行

Catalyst オプティマイザは、アダプティブクエリ実行 と呼ばれるプロセスを通じてランタイム最適化を実行します。アダプティブクエリ実行は、ランタイム統計を使用して、ジョブの実行中にクエリの実行プランを再最適化します。アダプティブクエリの実行には、次のセクションで説明するように、シャッフル後のパーティションの結合、ソートマージ結合のブロードキャスト結合への変換、スキュー結合の最適化など、パフォーマンスの課題に対するいくつかのソリューションが用意されています。

アダプティブクエリ実行は AWS Glue 3.0 以降で利用可能で、 AWS Glue 4.0 (Spark 3.3.0) 以降ではデフォルトで有効になっています。コードspark.conf.set("spark.sql.adaptive.enabled", "true")で を使用すると、アダプティブクエリの実行をオンまたはオフにできます。

シャッフル後のパーティションの結合

この機能は、map出力統計に基づいて、シャッフルするたびに RDD パーティション (合体) を減らします。これにより、クエリを実行する際のシャッフルパーティション番号のチューニングが簡素化されます。データセットに合わせてシャッフルパーティション番号を設定する必要はありません。Spark は、シャッフルパーティションの初期数が十分多い場合、実行時に適切なシャッフルパーティション番号を選択できます。

シャッフル後のパーティションの結合は、 spark.sql.adaptive.enabledと の両方spark.sql.adaptive.coalescePartitions.enabledが true に設定されている場合に有効になります。詳細については、「Apache Spark ドキュメント」を参照してください。

ソートマージ結合をブロードキャスト結合に変換する

この機能は、実質的に異なるサイズの 2 つのデータセットを結合する場合を認識し、その情報に基づいてより効率的な結合アルゴリズムを採用します。詳細については、「Apache Spark ドキュメント」を参照してください。結合戦略については、シャッフルの最適化セクションで説明します。

スキュー結合の最適化

データスキューは、Spark ジョブの最も一般的なボトルネックの 1 つです。これは、データが特定の RDD パーティション (ひいては特定のタスク) に偏り、アプリケーションの全体的な処理時間が遅れる状況を示しています。これにより、結合オペレーションのパフォーマンスがダウングレードされることがよくあります。スキュー結合最適化機能は、スキューされたタスクをほぼ同じサイズのタスクに分割 (および必要に応じてレプリケート) することで、ソートマージ結合のスキューを動的に処理します。

この機能は、 が true spark.sql.adaptive.skewJoin.enabled に設定されている場合に有効になります。詳細については、「Apache Spark ドキュメント」を参照してください。データスキューについては、シャッフルの最適化セクションで詳しく説明します。