優化洗牌 -

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

優化洗牌

某些作業 (例如join()groupByKey()) 需要 Spark 才能執行隨機播放。隨機播放是 Spark 重新分配資料的機制,以便在RDD分割區之間以不同的方式分組資料。洗牌可協助修正效能瓶頸。但是,由於洗牌通常涉及在 Spark 執行程序之間複製數據,因此隨機播放是一個複雜且昂貴的操作。例如,隨機播放會產生下列成本:

  • 磁碟 I/O:

    • 在磁碟上產生大量的中繼檔案。

  • 網路 I/O:

    • 需要許多網絡連接(連接數 =Mapper × Reducer)。

    • 由於記錄彙總到可能託管在不同 Spark 執行程序上的新分RDD區,因此您的數據集中很大一部分可能會在網絡上的 Spark 執行程序之間移動。

  • CPU和內存負載:

    • 排序值並合併資料集。這些操作計劃在執行人身上,對執行人造成沉重的負荷。

隨機播放是 Spark 應用程式效能下降的最重要因素之一。在存儲中間數據時,它可能會耗盡執行程序的本地磁盤上的空間,這會導致 Spark 作業失敗。

您可以在 CloudWatch 指標和 Spark UI 中評估隨機播放效能。

CloudWatch 度量

如果隨機播放字節寫入值與隨機字節讀取相比較高,您的 Spark 作業可能會使用隨機播放操作,例如或。join() groupByKey()

跨執行器(字節)圖形的數據混洗顯示寫入的隨機字節尖峰。

Spark UI

在 Spark UI 的「舞台」索引標籤上,您可以檢查「隨機播放讀取大小/記錄」值。您也可以在「執行者」選項卡上看到它。

在下面的屏幕截圖中,每個執行者與隨機播放過程交換大約 18.6GB/4020000 條記錄,總隨機播放讀取大小約為 75 GB)。

隨機溢出 (磁碟) 欄會顯示大量的資料溢滿記憶體至磁碟,這可能會造成磁碟已滿或效能問題。

""

如果您觀察到這些症狀,而且階段與您的績效目標相比需要太長的時間,或者失敗Out Of MemoryNo space left on device錯誤,請考慮下列解決方案。

優化加入

連接資料表的join()作業是最常用的隨機播放作業,但通常是效能瓶頸。由於 join 是一項昂貴的操作,因此我們建議您不要使用它,除非它對您的業務需求至關重要。請提出以下問題,仔細檢查您是否有效地使用資料管線:

  • 您是否正在重新計算也在其他可重複使用的工作中執行的連接?

  • 您是否加入以將外鍵解析為輸出消費者未使用的值?

確認加入作業對您的業務需求至關重要之後,請參閱下列選項,以符合您需求的方式最佳化您的聯結。

加入前使用下推

在執行聯結 DataFrame 之前,請先篩選出中不必要的列和欄。這具有以下優點:

  • 減少隨機播放期間的資料傳輸量

  • 減少了 Spark 執行程序處理的量

  • 減少資料掃描量

# Default df_joined = df1.join(df2, ["product_id"]) # Use Pushdown df1_select = df1.select("product_id","product_title","star_rating").filter(col("star_rating")>=4.0) df2_select = df2.select("product_id","category_id") df_joined = df1_select.join(df2_select, ["product_id"])

使用 DataFrame 加入

嘗試使用 Spark 高級別,API例如 Spark SQL DataFrame,和數據集,而不是RDDAPI或 DynamicFrame 加入。您可以 DataFrame 使 DynamicFrame 用方法調用轉換為dyf.toDF()。如 Apache Spark 中「關鍵主題」一節所述,這些聯結作業會在內部利用 Catalyst 最佳化工具的查詢最佳化功能。

隨機播放和廣播雜湊連接和提示

星火支持兩種類型的連接:隨機連接和廣播哈希加入。廣播哈希連接不需要混洗,並且與隨機連接相比,它可能需要更少的處理。但是,僅在將小桌子連接到大桌子時才適用。當加入可容納單個 Spark 執行程序內存的表時,請考慮使用廣播哈希聯接。

下圖顯示了廣播哈希聯接和隨機連接的高級結構和步驟。

廣播連接表和連接表之間的直接連接,以及與表和連接表之間的兩個洗牌階段隨機連接。

每個聯結的詳細資訊如下:

  • 隨機加入:

    • 洗牌哈希連接連接兩個表,而不排序和分配兩個表之間的連接。它適用於可以存儲在 Spark 執行程序內存中的小表的連接。

    • 排序合併連接分配兩個表通過鍵連接和加入之前對它們進行排序。它適用於大型表格的連接。

  • 廣播哈希加入:

    • 廣播雜湊聯結會將較小的RDD或資料表推送至每個工作節點。然後它會將地圖側與較大RDD或表格的每個分區結合在一起。

      當您的RDDs或表中的一個可以放入內存或可以設置為適合內存時,它適用於連接。盡可能進行廣播哈希連接是有益的,因為它不需要隨機播放。您可以使用連接提示從 Spark 請求廣播加入,如下所示。

      # DataFrame from pySpark.sql.functions import broadcast df_joined= df_big.join(broadcast(df_small), right_df[key] == left_df[key], how='inner') -- SparkSQL SELECT /*+ BROADCAST(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;

      如需有關聯結提示的詳細資訊,請參閱聯結提示

在 AWS Glue 3.0 及更新版本中,您可以啟用「調適性查詢執行」和其他參數,以自動利用廣播雜湊聯結。當任一聯結端的執行階段統計資料小於調適性廣播雜湊聯結臨界值時,調適性查詢執行會將排序合併聯結轉換為廣播雜湊聯結。

在 AWS Glue 3.0 中,您可以通過設置啟用自適應查詢執行spark.sql.adaptive.enabled=true。在 AWS Glue 4.0 中預設會啟用調適性查詢執行。

您可以設置與洗牌和廣播哈希連接相關的其他參數:

  • spark.sql.adaptive.localShuffleReader.enabled

  • spark.sql.adaptive.autoBroadcastJoinThreshold

如需相關參數的詳細資訊,請參閱將排序合併聯結轉換為廣播聯結

在 AWS Glue 3.0 和更高版本中,您可以使用其他連接提示進行隨機播放來調整您的行為。

-- Join Hints for shuffle sort merge join SELECT /*+ SHUFFLE_MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGEJOIN(t2) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle hash join SELECT /*+ SHUFFLE_HASH(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle-and-replicate nested loop join SELECT /*+ SHUFFLE_REPLICATE_NL(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;

使用分段設定

排序合併連接需要兩個階段,隨機排序和排序,然後合併。這兩個階段可能會超載 Spark 執行程序,並導致OOM和性能問題,當一些執行者正在合併和其他執行程序同時排序。在這種情況下,可能可以通過使用分來有效地加入。Bucketing 將在聯接鍵上預先洗牌和預先排序您的輸入,然後將排序的數據寫入中間表。透過預先定義已排序的中繼資料表,在連接大型資料表時,可以減少隨機排序和排序步驟的成本。

排序合併聯結具有額外的隨機播放和排序步驟。

分組表格對下列項目非常有用:

  • 經常透過同一個金鑰關連的資料,例如 account_id

  • 載入每日累計資料表,例如可在公用資料欄上分組的基礎和差異表格

您可以通過使用下面的代碼創建一個分組的表。

df.write.bucketBy(50, "account_id").sortBy("age").saveAsTable("bucketed_table")

在聯接之前 DataFrames 對聯接鍵進行重新分區

若要 DataFrames 在連接之前重新分割兩個連接鍵,請使用下列陳述式。

df1_repartitioned = df1.repartition(N,"join_key") df2_repartitioned = df2.repartition(N,"join_key") df_joined = df1_repartitioned.join(df2_repartitioned,"product_id")

這將RDDs在啟動聯接之前對連接鍵進行兩個分區(仍然是分開的)。如果兩者在具有相同分區代碼的同一個密鑰上進行分區,則您計劃聯接在一起的RDD記錄將RDDs很有可能在混洗聯接之前共同位於同一個 Worker 上。這可能會透過減少聯結期間的網路活動和資料偏差來改善效能。

克服資料偏斜

資料偏斜是 Spark 工作瓶頸的最常見原因之一。當數據不是跨RDD分區均勻分佈時,就會發生這種情況。這會導致該分區的任務比其他分區花費更長的時間,從而延遲應用程序的整體處理時間。

若要識別資料偏差,請在 Spark UI 中評估下列量度:

  • 在 Spark UI 的「舞台」索引標籤上,檢查「事件時間軸」頁面。您可以在下面的屏幕截圖中看到任務分佈不均勻。分佈不均或花費太長時間執行的工作可能表示資料偏斜。

    執行程序計算時間對於一個任務比其他任務長得多。
  • 另一個重要頁面是「摘要量度」,它會顯示 Spark 工作的統計資料。下列螢幕擷取畫面顯示「續時間」、「GC 時間」、「溢出 (記憶體)」、「溢出 (磁碟)」等百分位數的度量。

    反白顯示持續時間列的摘要測量結果表格。

    當任務均勻分佈時,您將在所有百分位數中看到相似的數字。當數據偏斜時,您將在每個百分位數中看到非常偏差的值。在此範例中,工作持續時間小於 13 秒,以最小值、第 25 個百位數、中位數和第 75 個百位數為單位。雖然「最大」工作處理的資料量是第 75 個百分位數的 100 倍,但其 6.4 分鐘的持續時間大約是 30 倍。這意味著至少有一個任務(或多達 25% 的任務)花費的時間遠遠超過其餘任務。

如果您看到資料偏斜,請嘗試下列動作:

  • 如果您使用 AWS Glue 3.0,請透過設定啟用調適性查詢執行spark.sql.adaptive.enabled=true。自適應查詢執行默認情況下在 AWS Glue 4.0 中啟用。

    您也可以透過設定下列相關參數,對聯結引入的資料偏斜使用調適性查詢執行:

    • spark.sql.adaptive.skewJoin.skewedPartitionFactor

    • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

    • spark.sql.adaptive.advisoryPartitionSizeInBytes=128m (128 mebibytes or larger should be good)

    • spark.sql.adaptive.coalescePartitions.enabled=true (when you want to coalesce partitions)

    如需詳細資訊,請參閱 Apache 星火文件

  • 使用具有大範圍值的鍵作為連接鍵。在隨機連接,分區是為一個鍵的每個哈希值確定。如果聯接鍵的基數太低,則散列函數更有可能在分區之間分配數據做了不好的工作。因此,如果您的應用程式和商務邏輯支援它,請考慮使用較高的基數索引鍵或複合索引鍵。

    # Use Single Primary Key df_joined = df1_select.join(df2_select, ["primary_key"]) # Use Composite Key df_joined = df1_select.join(df2_select, ["primary_key","secondary_key"])

使用快取

使用重複性時 DataFrames,請使用或df.persist()將計算結果緩存在每個 Spark 執行程序的內存和磁盤上,避免額外的隨機播放df.cache()或計算。Spark 還支持RDDs在磁盤上保存或跨多個節點(存儲級別)進行複製。

例如,您可以 DataFrames 通過添加來保留df.persist(). 當不再需要緩存時,您可以使unpersist用丟棄緩存的數據。

df = spark.read.parquet("s3://<Bucket>/parquet/product_category=Books/") df_high_rate = df.filter(col("star_rating")>=4.0) df_high_rate.persist() df_joined1 = df_high_rate.join(<Table1>, ["key"]) df_joined2 = df_high_rate.join(<Table2>, ["key"]) df_joined3 = df_high_rate.join(<Table3>, ["key"]) ... df_high_rate.unpersist()

移除不需要的火花動作

避免執行不必要的動作count,例如show、或collect。正如在 Apache 星火一節中的關鍵主題討論的那樣,星火是懶惰的。每次對其執行動作時,都RDD可能會重新計算每個轉換。當您使用許多 Spark 動作時,會呼叫每個動作的多個來源存取、工作計算和隨機播放執行。

如果您不需要collect()或在商業環境中執行其他動作,請考慮將其移除。

注意

盡可能避免collect()在商業環境中使用 Spark。collect()動作會將 Spark 執行程式中計算的所有結果傳回給 Spark 驅動程式,這可能會造成 Spark 驅動程式傳回錯OOM誤。為了避免錯OOM誤,Spark spark.driver.maxResultSize = 1GB 默認設置,這將返回給 Spark 驅動程序的最大數據大小限制為 1 GB。