優化 Spark 效能 - Amazon EMR

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

優化 Spark 效能

Amazon 為 Spark EMR提供多種效能最佳化功能。本主題將詳細說明每個最佳化功能。

如需如何設定 Spark 組態的詳細資訊,請參閱 設定 Spark

自適應查詢執行

自適應查詢執行是一種依據執行期統計資料對查詢計畫進行重新優化的框架。從 Amazon EMR 5.30.0 開始,下列 Apache Spark 3 的適應性查詢執行最佳化可在 Apache Amazon EMR Runtime for Spark 2 上使用。

  • 自適應聯結轉換

  • 隨機分割區的自適應合併

自適應聯結轉換

自適應聯結轉換根據查詢階段的執行期大小將 broadcast-hash-joins操作轉換為 sort-merge-join操作,藉此改善查詢效能。 Broadcast-hash-joins當聯結的一側小到可有效率地在所有執行器中廣播其輸出時, 往往會更好,因此不需要隨機交換和排序聯結的兩側。自適應聯結轉換可擴大 Spark 自動執行 時的案例範圍 broadcast-hash-joins。

此功能預設為啟用。它可透過將 spark.sql.adaptive.enabled 設為 false 停用,但也會停用自適應查詢執行框架。當其中一個聯結端的執行期大小統計資料不超過 時,Spark 決定將 sort-merge-join broadcast-hash-join轉換為 spark.sql.autoBroadcastJoinThreshold,其預設為 10,485,760 位元組 (10 MiB。

隨機分割區的自適應合併

隨機分割區的自適應合併小型接續隨機分割區,以避免過多小型任務產生額外負荷,從而提升查詢的效能。這可讓您預先設定較高數量的初始隨機分割區,然後在執行期縮減為目標大小,提高使分散式隨機分割區變得更均勻的機率。

除非明確設定 spark.sql.shuffle.partitions,否則此功能預設為啟用。它可透過將 spark.sql.adaptive.coalescePartitions.enabled 設為 true 啟用。使用 spark.sql.adaptive.coalescePartitions.minPartitionNumspark.sql.adaptive.advisoryPartitionSizeInBytes 屬性,可分別調整隨機分割區的初始數量和目標分割區大小。請見下表,了解此功能相關 Spark 屬性的更詳細資訊。

Spark 自適應合併分割區屬性
屬性 預設值 描述

spark.sql.adaptive.coalescePartitions.enabled

true,除非明確設定 spark.sql.shuffle.partitions

當為 true 且 spark.sql.adaptive.enabled 也設為 true 時,Spark 會依據目標大小合併接續的隨機分割區 (由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定),以避免有過多小型任務。

spark.sql.adaptive.advisoryPartitionSizeInBytes

64 MB

合併時隨機分割區的建議大小 (以位元組為單位)。此組態僅在 spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 均為 true 時有影響。

spark.sql.adaptive.coalescePartitions.minPartitionNum

25

合併後隨機分割區的最小數量。此組態僅在 spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 均為 true 時有影響。

spark.sql.adaptive.coalescePartitions.initialPartitionNum

1000

合併前隨機分割區的初始數量。此組態僅在 spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 均為 true 時有影響。

動態分割區剔除

動態分割區清除可透過更準確選取表格中需要供特定查詢讀取和處理的特定分割區,來改善任務效能。透過降低讀取和處理的資料量,即可省下任務執行所用的大量時間。使用 Amazon EMR 5.26.0 時,此功能預設為啟用。透過 Amazon EMR 5.24.0 和 5.25.0,您可以在 Spark spark.sql.dynamicPartitionPruning.enabled內或在建立叢集時設定 Spark 屬性,以啟用此功能。

Spark 動態分割區剔除分割區屬性
屬性 預設值 描述

spark.sql.dynamicPartitionPruning.enabled

true

如果為 true,啟用動態分割區剔除。

spark.sql.optimizer.dynamicPartitionPruning.enforceBroadcastReuse

true

若為 true,Spark 會在查詢執行前執行防禦性檢查,以確保動態剔除篩選條件中廣播交換的重複使用不會被後續的準備規則 (例如使用者定義的單欄規則) 中斷。當重複使用被中斷且此組態為 true,Spark 會移除受影響的動態剔除篩選條件,以防範發生效能與正確性問題。當動態剔除篩選條件的廣播交換產生與對應聯結操作的廣播交換不同或不一致的結果時,即可能發生正確性問題。將此組態設為 false 時應保持謹慎;它支援規避一些情形,例如重複使用被使用者定義的單欄規則中斷。啟用「自適應查詢執行」時,始終強制執行廣播重複使用。

此最佳化可改善 Spark 2.4.1 現有功能,此版本僅支援下推可在計劃時間解析的靜態述詞。

以下是在 Spark 2.4.2 中下推的靜態述詞範例。

partition_col = 5 partition_col IN (1,3,5) partition_col between 1 and 3 partition_col = 1 + 3

動態分割區清除允許 Spark 引擎在執行時間動態推斷需要讀取哪些分割區,以及可安全消除哪些分割區。例如,以下查詢包含兩個表格:store_sales 表格,其中包含所有商店的所有總銷售,並依區域分區,而 store_regions 表格包含每個國家的區域對應。此表格包含分佈於世界各地的商店相關資料,但我們只會查詢北美洲的資料。

select ss.quarter, ss.region, ss.store, ss.total_sales from store_sales ss, store_regions sr where ss.region = sr.region and sr.country = 'North America'

在沒有使用動態分割區清除時,這個查詢會讀取所有區域,再篩選符合子查詢結果的區域子集。使用動態分割區清除時,這個查詢只會為在子查詢中傳回的區域讀取和處理分割區。這可透過讀取較少儲存區的資料和處理較少記錄來節省時間和資源。

扁平化純量子查詢

此最佳化可改善在相同表格具備純量子查詢的查詢效能。使用 Amazon EMR 5.26.0 時,此功能預設為啟用。透過 Amazon EMR 5.24.0 和 5.25.0,您可以在 Spark spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled內或在建立叢集時設定 Spark 屬性來啟用它。當此屬性設為 true,如有可能,此查詢最佳化工具會將使用相同關係的彙總純量子查詢扁平化。純量子查詢的扁平化是透過將子查詢中存在的任何述詞推送至彙總函數,然後再使用每個關係的所有彙總函數來執行一個彙總。

以下是將受益於此最佳化的查詢範本。

select (select avg(age) from students /* Subquery 1 */ where age between 5 and 10) as group1, (select avg(age) from students /* Subquery 2 */ where age between 10 and 15) as group2, (select avg(age) from students /* Subquery 3 */ where age between 15 and 20) as group3

此最佳化會透過以下方式重寫先前的查詢:

select c1 as group1, c2 as group2, c3 as group3 from (select avg (if(age between 5 and 10, age, null)) as c1, avg (if(age between 10 and 15, age, null)) as c2, avg (if(age between 15 and 20, age, null)) as c3 from students);

請注意,重寫查詢只會讀取一次學生的表格,三個子查詢的述詞會被推送至該 avg 函數。

DISTINCT 之前 INTERSECT

使用 時,此最佳化會最佳化聯結INTERSECT。使用 Amazon EMR 5.26.0 時,此功能預設為啟用。透過 Amazon EMR 5.24.0 和 5.25.0,您可以在 Spark spark.sql.optimizer.distinctBeforeIntersect.enabled內或建立叢集時設定 Spark 屬性來啟用它。使用 的查詢INTERSECT會自動轉換為使用左半聯結。當此屬性設為 true 時,INTERSECT如果查詢最佳化工具偵測到DISTINCT運算子可以讓左半聯結 BroadcastHashJoin 而不是 ,則會將DISTINCT運算子推送至 的子伺服器 SortMergeJoin。

以下是將受益於此最佳化的查詢範本。

(select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) intersect (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id)

沒有啟用此屬性 spark.sql.optimizer.distinctBeforeIntersect.enabled 時,查詢的重寫方式如下。

select distinct brand from (select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

啟用此屬性 spark.sql.optimizer.distinctBeforeIntersect.enabled 時,查詢的重寫方式如下。

select brand from (select distinct item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select distinct item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

Bloom 篩選條件聯結

此最佳化可以使用從聯結另一側之值產生的 Bloom 篩選條件,預先篩選聯結的一側,進而提升某些聯結的效能。使用 Amazon EMR 5.26.0 時,此功能預設為啟用。使用 Amazon EMR 5.25.0,您可以透過在 Spark 內將 Spark 屬性設定為 true 或建立叢集時spark.sql.bloomFilterJoin.enabled,來啟用此功能。

以下是受益於 Bloom 篩選條件的範例查詢。

select count(*) from sales, item where sales.item_id = item.id and item.category in (1, 10, 16)

啟用此功能時,Bloom 篩選條件是以其類別落於正接受查詢之類別集合的所有項目 ID 所建立。掃描銷售資料表時,Bloom 篩選條件用於判定哪些銷售是肯定不在 Bloom Filter 所定義之集合中的項目。因此能夠盡早篩選掉這些已識別的銷售。

優化的聯結重新排序

此最佳化可透過篩選條件重新排序涉及資料表的聯結,以提升查詢效能。使用 Amazon EMR 5.26.0 時,此功能預設為啟用。使用 Amazon EMR 5.25.0,您可以透過將 Spark 組態參數spark.sql.optimizer.sizeBasedJoinReorder.enabled設定為 true 來啟用此功能。如在查詢中所列出,在 Spark 中的預設行為是由左至右聯結資料表。此策略會以篩選條件跳過執行較小聯結的機會,以受益於之後成本較高的聯結。

以下範例查詢回報一國內所有商店的全部退貨項目。若無最佳化的聯結重新排序功能,Spark 會先聯結兩張大型資料表 store_salesstore_returns,然後將這些資料表與 store 聯結,最後與 item 聯結。

select ss.item_value, sr.return_date, s.name, i.desc, from store_sales ss, store_returns sr, store s, item i where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id and s.country = 'USA'

有了最佳化的聯結重新排序功能,由於 store 有篩選條件且小於 store_returnsbroadcastable,因此 Spark 會先將 store_salesstore 聯結。然後,Spark 與 store_returns 聯結,最後與 item 聯結。如果 item 有篩選條件且是 broadcastable,它也會符合重新排序的資格,因而讓 store_salesstore 聯結,然後與 item 聯結,最後再與 store_returns 聯結。