タスクを並列化する -

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

タスクを並列化する

パフォーマンスを最適化するには、データのロードと変換のタスクを並列化することが重要です。Apache Spark の主要なトピックで説明したように、並列処理の程度を決定するため、回復力のある分散データセット (RDD) パーティションの数は重要です。Spark が作成する各タスクは、1:1 ベースで RDD パーティションに対応します。最高のパフォーマンスを実現するには、RDD パーティションの数の決定方法と、その数の最適化方法を理解する必要があります。

十分な並列処理がない場合、次の症状は CloudWatch メトリクスと Spark UI に記録されます。

CloudWatch メトリクス

CPUロードメモリ使用率を確認します。一部のエグゼキュターがジョブのフェーズで処理されていない場合は、並列処理を改善するのが適切です。この場合、視覚化された期間中に、エグゼキュター 1 はタスクを実行していましたが、残りのエグゼキュター (2、3、4) は実行しませんでした。これらのエグゼキュターには Spark ドライバーによってタスクが割り当てられていないと推測できます。

ドライバーとエグゼキュターが 1 つだけ表示されているグラフ。

Spark UI

Spark UI のステージタブで、ステージ内のタスク の数を確認できます。 この場合、Spark は 1 つのタスクのみを実行しています。

""

さらに、イベントタイムラインには、Executor 1 が 1 つのタスクを処理することが表示されます。つまり、このステージの作業は 1 つのエグゼキュターで完全に実行され、他のエグゼキュターはアイドル状態でした。

1 つのタスクのみを表示するイベントタイムライン。

これらの症状に気付いた場合は、データソースごとに次のソリューションを試してください。

Amazon S3 からのデータロードを並列化する

Amazon S3 からのデータロードを並列化するには、まずパーティションのデフォルト数を確認します。その後、パーティションのターゲット数を手動で決定できますが、パーティションが多すぎないように注意してください。

パーティションのデフォルト数を決定する

Amazon S3 の場合、Spark RDD パーティションの初期数 (それぞれが Spark タスクに対応) は、Amazon S3 データセットの特徴 (形式、圧縮、サイズなど) によって決まります。Amazon S3 に保存されている CSV オブジェクトから an AWS Glue DynamicFrame または Spark DataFrame を作成する場合、RDD パーティションの初期数 (NumPartitions) は、ほぼ次のように計算できます。

  • オブジェクトサイズ <= 64 MB: NumPartitions = Number of Objects

  • オブジェクトサイズ > 64 MB: NumPartitions = Total Object Size / 64 MB

  • 分割不可 (gzip): NumPartitions = Number of Objects

「データスキャン量を減らす」セクションで説明したように、Spark は大きな S3 オブジェクトを分割して並列処理できます。オブジェクトが分割サイズより大きい場合、Spark はオブジェクトを分割し、分割ごとに RDD パーティション (およびタスク) を作成します。Spark の分割サイズはデータ形式とランタイム環境に基づいていますが、これは妥当な開始近似値です。一部のオブジェクトは gzip などの分割不可能な圧縮形式を使用して圧縮されるため、Spark はそれらを分割できません。

NumPartitions 値は、データ形式、圧縮、 AWS Glue バージョン、 AWS Glue ワーカー数、Spark 設定によって異なる場合があります。

例えば、Spark DataFrame を使用して単一の 10 GB csv.gz オブジェクトをロードすると、gzip は分割できないため、Spark ドライバーは 1 つの RDD パーティション (NumPartitions=1) のみを作成します。これにより、次の図に示すように、特定の Spark エグゼキュターに負荷がかかり、残りのエグゼキュターにタスクが割り当てられません。

Spark Web UI ステージタブでステージの実際のタスク数 (NumPartitions) を確認するか、コードdf.rdd.getNumPartitions()で を実行して並列処理を確認します。

10 GB の gzip ファイルが発生した場合は、そのファイルを生成するシステムが分割可能な形式で生成できるかどうかを調べます。これがオプションでない場合は、ファイルを処理するためにクラスター容量をスケーリングする必要がある場合があります。ロードしたデータに対して変換を効率的に実行するには、再パーティションを使用してクラスター内のワーカー間で RDD を再調整する必要があります。

パーティションのターゲット数を手動で決定する

データのプロパティと Spark による特定の機能の実装によっては、基礎となる作業を引き続き並列化できる場合でも、低いNumPartitions値になる場合があります。NumPartitions が小さすぎる場合は、 を実行してパーティションの数df.repartition(N)を増やし、処理を複数の Spark エグゼキュターに分散できるようにします。

この場合、 を実行するdf.repartition(100)と 1 NumPartitionsから 100 に増加し、データのパーティションが 100 個作成されます。各パーティションには、他のエグゼキュターに割り当てることができるタスクがあります。

オペレーションは、データ全体を均等にrepartition(N)分割し (10 GB/100 パーティション = 100 MB/パーティション)、特定のパーティションへのデータスキューを回避します。

注記

などのシャッフルオペレーションjoinを実行すると、パーティションの数は spark.sql.shuffle.partitionsまたは の値に応じて動的に増減しますspark.default.parallelism。これにより、Spark エグゼキュター間のデータのより効率的な交換が容易になります。詳細については、Spark ドキュメントを参照してください。

パーティションのターゲット数を決定する際の目標は、プロビジョニングされた AWS Glue ワーカーの使用を最大化することです。 AWS Glue ワーカーの数と Spark タスクの数は、vCPUs の数によって関連しています。Spark は vCPU コアごとに 1 つのタスクをサポートします。 AWS Glue バージョン 3.0 以降では、次の式を使用してパーティションのターゲット数を計算できます。

# Calculate NumPartitions by WorkerType numExecutors = (NumberOfWorkers - 1) numSlotsPerExecutor = 4 if WorkerType is G.1X 8 if WorkerType is G.2X 16 if WorkerType is G.4X 32 if WorkerType is G.8X NumPartitions = numSlotsPerExecutor * numExecutors # Example: Glue 4.0 / G.1X / 10 Workers numExecutors = ( 10 - 1 ) = 9 # 1 Worker reserved on Spark Driver numSlotsPerExecutor = 4 # G.1X has 4 vCpu core ( Glue 3.0 or later ) NumPartitions = 9 * 4 = 36

この例では、各 G.1X ワーカーは Spark エグゼキューター () に 4 つの vCPU コアを提供しますspark.executor.cores = 4。Spark は vCPU Core ごとに 1 つのタスクをサポートするため、G.1X Spark エグゼキュターは 4 つのタスクを同時に実行できます (numSlotPerExecutor)。この数のパーティションは、タスクに同じ時間がかかった場合にクラスターを最大限に活用します。ただし、一部のタスクは他のタスクよりも時間がかかり、アイドルコアが作成されます。この場合、2 または 3 numPartitionsを掛けて、ボトルネックタスクを分割して効率的にスケジュールすることを検討してください。

パーティションが多すぎる

パーティションの数が多すぎると、タスクの数が多すぎます。これにより、管理タスクや Spark エグゼキュター間のデータ交換など、分散処理に関連するオーバーヘッドにより、Spark ドライバーに負荷がかかります。

ジョブ内のパーティション数がターゲットパーティション数よりも大幅に大きい場合は、パーティション数を減らすことを検討してください。以下のオプションを使用してパーティションを減らすことができます。

  • ファイルサイズが非常に小さい場合は、 AWS Glue groupFiles を使用します。Apache Spark タスクの起動による過剰な並列処理を減らして、各ファイルを処理できます。

  • coalesce(N) を使用してパーティションをマージします。これは低コストのプロセスです。パーティションの数を減らす場合、 coalesce(N)は よりも優先されます。 repartition(N)repartition(N)はシャッフルを実行して各パーティションのレコード量を均等に分散するためです。これにより、コストと管理オーバーヘッドが増加します。

  • Spark 3.x アダプティブクエリ実行を使用します。Apache Spark セクションの「キートピック」で説明したように、アダプティブクエリ実行はパーティションの数を自動的に結合する関数を提供します。このアプローチは、実行するまでパーティションの数がわからない場合に使用できます。

JDBC からのデータロードを並列化する

Spark RDD パーティションの数は設定によって決まります。デフォルトでは、SELECTクエリを通じてソースデータセット全体をスキャンするために実行されるタスクは 1 つだけであることに注意してください。

AWS Glue DynamicFrames と Spark DataFrames はどちらも、複数のタスクにわたる並列 JDBC データロードをサポートしています。これは、where述語を使用して 1 つのSELECTクエリを複数のクエリに分割することによって行われます。JDBC からの読み取りを並列化するには、次のオプションを設定します。

  • For AWS Glue DynamicFrame の場合は、 hashfield (または hashexpression)hashpartition。詳細については、JDBC テーブルからの並列読み取り」を参照してください。

    connection_mysql8_options = { "url": "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test", "dbtable": "medicare_tb", "user": "test", "password": "XXXXXXXXX", "hashexpression":"id", "hashpartitions":"10" } datasource0 = glueContext.create_dynamic_frame.from_options( 'mysql', connection_options=connection_mysql8_options, transformation_ctx= "datasource0" )
  • Spark DataFrame の場合は、numPartitionspartitionColumnlowerBound、および を設定しますupperBound。詳細については、JDBC to Other Databases」を参照してください。

    df = spark.read \ .format("jdbc") \ .option("url", "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test") \ .option("dbtable", "medicare_tb") \ .option("user", "test") \ .option("password", "XXXXXXXXXX") \ .option("partitionColumn", "id") \ .option("numPartitions", "10") \ .option("lowerBound", "0") \ .option("upperBound", "1141455") \ .load() df.write.format("json").save("s3://bucket_name/Tests/sparkjdbc/with_parallel/")

ETL コネクタの使用時に DynamoDB からのデータロードを並列化する

Spark RDD パーティションの数は、 dynamodb.splitsパラメータによって決まります。Amazon DynamoDB からの読み取りを並列化するには、次のオプションを設定します。

Kinesis Data Streams からのデータロードを並列化する

Spark RDD パーティションの数は、ソース Amazon Kinesis Data Streams データストリーム内のシャードの数によって決まります。データストリームにシャードがわずかしかない場合、Spark タスクはわずかしかありません。これにより、ダウンストリームプロセスでの並列処理が低下する可能性があります。Kinesis Data Streams からの読み取りを並列化するには、次のオプションを設定します。

  • Kinesis Data Streams からデータをロードするときに、シャードの数を増やして並列性を高めます。

  • マイクロバッチのロジックが十分に複雑な場合は、不要な列を削除した後、バッチの最初にデータを再パーティションすることを検討してください。

詳細については、AWS Glue 「ストリーミング ETL ジョブのコストとパフォーマンスを最適化するためのベストプラクティス」を参照してください。

データロード後にタスクを並列化する

データロード後にタスクを並列化するには、次のオプションを使用して RDD パーティションの数を増やします。

  • データを再パーティションして、特にロード自体を並列化できなかった場合は、初期ロードの直後に、より多くのパーティションを生成します。

    on DynamicFrame または DataFrame repartition()を呼び出し、パーティションの数を指定します。適切な経験則は、使用可能なコア数の 2~3 倍です。

    ただし、パーティション化されたテーブルを記述すると、ファイルが爆発する可能性があります (各パーティションは、各テーブルパーティションにファイルを生成する可能性があります)。これを回避するには、列ごとに DataFrame を再パーティションできます。これにより、テーブルパーティション列が使用され、書き込み前にデータが整理されます。テーブルパーティションで小さなファイルを取得せずに、より多くのパーティションを指定できます。ただし、データスキューが発生しないように注意してください。データスキューでは、一部のパーティション値がほとんどのデータになり、タスクの完了が遅延します。

  • シャッフルがある場合は、spark.sql.shuffle.partitions値を増やします。これは、シャッフル時のメモリの問題にも役立ちます。

    2,001 個を超えるシャッフルパーティションがある場合、Spark は圧縮メモリ形式を使用します。それに近い数値がある場合は、その制限よりもspark.sql.shuffle.partitions値を設定して、より効率的な表現を得ることもできます。