翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Spark パフォーマンスの最適化
Amazon EMRは Spark に複数のパフォーマンス最適化機能を提供します。このトピックでは、それぞれの最適化機能について詳しく説明します。
Spark の設定方法に関する詳細については、「Spark の設定」を参照してください。
アダプティブクエリ実行
アダプティブクエリ実行は、ランタイム統計に基づいてクエリプランを再最適化するためのフレームワークです。Amazon EMR 5.30.0 以降、Apache Spark 3 からの次のアダプティブクエリ実行最適化は、Spark 2 の Apache Amazon EMR Runtime で利用できます。
-
アダプティブ結合変換
-
シャッフルパーティションのアダプティブ結合
アダプティブ結合変換
アダプティブ結合変換は、クエリステージのランタイムサイズに基づいてオペレーションを broadcast-hash-joinsオペレーションに変換 sort-merge-joinすることで、クエリのパフォーマンスを向上させます。 Broadcast-hash-joins は、結合の片側がすべてのエグゼキューターに効率的に出力をブロードキャストできるほど小さい場合、パフォーマンスが向上する傾向があるため、交換をシャッフルして結合の両側をソートする必要はありません。適応結合変換は、Spark が自動的に を実行する場合のケースの範囲を広げます broadcast-hash-joins。
この機能は、デフォルトでご利用になれます。これは、spark.sql.adaptive.enabled
を false
に設定することで無効にできます。これにより、アダプティブクエリ実行フレームワークも無効になります。Spark は、結合側の 1 つのランタイムサイズの統計が を超えない場合spark.sql.autoBroadcastJoinThreshold
、 sort-merge-joinを broadcast-hash-joinに変換することを決定します。デフォルトでは 10,485,760 バイト (10 MiB) です。
シャッフルパーティションのアダプティブ結合
シャッフルパーティションのアダプティブ結合により、小さな連続するシャッフルパーティションが結合され、小さなタスクが多すぎることによるオーバーヘッドを回避することで、クエリのパフォーマンスが向上します。これにより、事前に初期シャッフルパーティションの数をより高く設定することができ、ランタイムでターゲットサイズまで削減されるため、シャッフルパーティションがより均等に分散される可能性が高まります。
この機能は、spark.sql.shuffle.partitions
が明示的に設定されていない限り、デフォルトで有効になっています。これは、spark.sql.adaptive.coalescePartitions.enabled
を true
に設定することで有効にできます。シャッフルパーティションの初期数とターゲットパーティションのサイズは、それぞれ spark.sql.adaptive.coalescePartitions.minPartitionNum
プロパティと spark.sql.adaptive.advisoryPartitionSizeInBytes
プロパティを使用して調整できます。この機能に関連する Spark プロパティの詳細については、以下の表を参照してください。
プロパティ | デフォルト値 | 説明 |
---|---|---|
|
|
true であり、かつ、spark.sql.adaptive.enabled が true の場合、Spark はターゲットサイズ ( |
|
64MB |
結合時のシャッフルパーティションのアドバイザリサイズ (バイト単位)。この設定は、 |
|
25 |
結合後のシャッフルパーティションの最小数。この設定は、 |
|
1,000 |
結合前のシャッフルパーティションの初期数。この設定は、 |
ダイナミックパーティションプルーニング
ダイナミックパーティションプルーニングでは、特定のパーティションを特定のクエリを読み取って処理するテーブル内でより正確に選択し、ジョブパフォーマンスを改善します。読み取って処理するデータ量を減らすと、ジョブの実行において大幅な時間の節約になります。Amazon 5.26.0 EMR では、この機能はデフォルトで有効になっています。Amazon 5.24.0 EMR および 5.25.0 では、Spark spark.sql.dynamicPartitionPruning.enabled
内またはクラスターの作成時に Spark プロパティを設定することで、この機能を有効にできます。
プロパティ | デフォルト値 | 説明 |
---|---|---|
|
|
true の場合、ダイナミックパーティションプルーニングが有効になります。 |
|
|
|
この最適化は、予定時間に解決される固定述語のプッシュダウンにのみ対応している Spark 2.4.2 の既存の機能を改良するものです。
Spark 2.4.2 での固定述語のプッシュダウンの例を以下に示します。
partition_col = 5 partition_col IN (1,3,5) partition_col between 1 and 3 partition_col = 1 + 3
ダイナミックパーティションプルーニングにより、Spark エンジンでランタイムに積極的に情報から推計でき、パーティションが読み取り、安全に削除できます。たとえば、以下のクエリには、すべてのストアの販売合計を含み、リージョンごとに分けられた store_sales
テーブルと、各国のリージョンのマッピングを含む store_regions
テーブルという、2 つのテーブルが含まれます。テーブルには世界中に分散しているストアに関するデータが含まれていますが、北米のデータのみをクエリします。
select ss.quarter, ss.region, ss.store, ss.total_sales from store_sales ss, store_regions sr where ss.region = sr.region and sr.country = 'North America'
ダイナミックパーティションプルーニングを使用しない場合、このクエリはサブクエリ結果に一致する地域のサブセットをフィルタリングする前に、すべての地域のデータを読み取ります。ダイナミックパーティションプルーニングを使用すると、このクエリはサブクエリで返された地域のパーティションのみを読み取り処理します。このため、ストレージのデータの読み取りや履歴の処理が少なくなり、時間とリソースが削減されます。
スカラサブクエリの平坦化
この最適化では、同じテーブルにスカラサブクエリのあるクエリのパフォーマンスを向上させます。Amazon 5.26.0 EMR では、この機能はデフォルトで有効になっています。Amazon 5.24.0 EMR および 5.25.0 では、Spark spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled
内またはクラスターの作成時に Spark プロパティを設定することで有効にできます。このプロパティを true に設定すると、可能な場合に、クエリオプティマイザが同じ関係式を使用する集約スカラサブクエリを平坦化します。スカラサブクエリは、サブクエリ内に存在する述語を集約関数にプッシュし、リレーションごとにすべての集約関数を使用して 1 つの集約を実行することによって平坦化されます。
以下は、この最適化を行うメリットがあるクエリの例です。
select (select avg(age) from students /* Subquery 1 */ where age between 5 and 10) as group1, (select avg(age) from students /* Subquery 2 */ where age between 10 and 15) as group2, (select avg(age) from students /* Subquery 3 */ where age between 15 and 20) as group3
最適化により、以前のクエリが次のように書き換えられます。
select c1 as group1, c2 as group2, c3 as group3 from (select avg (if(age between 5 and 10, age, null)) as c1, avg (if(age between 10 and 15, age, null)) as c2, avg (if(age between 15 and 20, age, null)) as c3 from students);
書き換えられたクエリは一度だけ student テーブルを読み取ります。また、3 つのサブクエリの述語が avg
関数にプッシュされます。
DISTINCT より前 INTERSECT
この最適化は、 を使用する際の結合を最適化しますINTERSECT。Amazon 5.26.0 EMR では、この機能はデフォルトで有効になっています。Amazon 5.24.0 EMR および 5.25.0 では、Spark spark.sql.optimizer.distinctBeforeIntersect.enabled
内またはクラスターの作成時に Spark プロパティを設定することで有効にできます。を使用するクエリINTERSECTは、左半結合を使用するように自動的に変換されます。このプロパティが true に設定されている場合、クエリオプティマイザは、DISTINCTオペレータが BroadcastHashJoin の代わりに左半結合を にできることを検出INTERSECTした場合、 の子にDISTINCTオペレータをプッシュします SortMergeJoin。
以下は、この最適化を行うメリットがあるクエリの例です。
(select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) intersect (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id)
このプロパティが有効になっていないと spark.sql.optimizer.distinctBeforeIntersect.enabled
、クエリは以下のように書き換えられます。
select distinct brand from (select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand
このプロパティを有効にすると spark.sql.optimizer.distinctBeforeIntersect.enabled
、クエリは以下のように書き換えられます。
select brand from (select distinct item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select distinct item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand
ブルームフィルター結合
この最適化では、結合の一方の側の値から生成されたBloom フィルターspark.sql.bloomFilterJoin.enabled
Spark 内true
またはクラスターの作成時に に設定することで、この機能を有効にできます。
以下は、Bloom フィルターを使用することによってメリットがあるクエリの例です。
select count(*) from sales, item where sales.item_id = item.id and item.category in (1, 10, 16)
この機能を有効にすると、Bloom フィルターが、クエリの対象となるカテゴリのセットに属するカテゴリを持つすべてのアイテム ID から構築されます。sales テーブルのスキャン中に Bloom フィルターを使用して、Bloom フィルターで定義されたセット内に確実に含まれていないアイテムの売上を洗い出します。こうすることで、特定されたこれらの売上をできるだけ早い段階で除外できます。
結合順序の最適化
この最適化では、フィルターを持つテーブルを含む結合の順序を変更することでクエリのパフォーマンスを向上させることができます。Amazon 5.26.0 EMR では、この機能はデフォルトで有効になっています。Amazon EMR5.25.0 では、Spark 設定パラメータを spark.sql.optimizer.sizeBasedJoinReorder.enabled
true に設定することで、この機能を有効にできます。Spark のデフォルトでは、クエリにリストされている順に、テーブルと左から右に結合するようになっています。この手法では、フィルターを含む小さい結合を最初に実行し、コストの高い結合を後で行うというチャンスを逃してしまいます。
このクエリの例では、ある国のすべての店舗に返品されたすべての商品が報告されます。結合順序を最適化しなかった場合、Spark は、大きなテーブル 2 つ (store_sales
と store_returns
) を最初に結合した後、これを store
と結合し、最後に item
と結合します。
select ss.item_value, sr.return_date, s.name, i.desc, from store_sales ss, store_returns sr, store s, item i where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id and s.country = 'USA'
結合順序を最適化した場合、Spark は、まず store_sales
をstore
と結合します。store
にはフィルターがあり、store_returns
および broadcastable
よりも小さいためです。次に store_returns
、最後に item
と結合します。item
にフィルターがあり、プロードキャストが可能な場合は、結合順序の変更の対象となるため、store_sales
が store
と結合され、続いてitem
、最後に store_returns
と結合されます。