優化任務復原和擴展操作的作業重新啟動時間 - Amazon EMR

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

優化任務復原和擴展操作的作業重新啟動時間

當任務失敗或發生擴展操作時,Flink 會嘗試從最後一個完成的檢查點重新執行任務。根據檢查點狀態的大小和平行任務數量,重新啟動程序可能需要一分鐘或更長的時間才能執行。在重新啟動期間,作業的積壓任務可能會累積。不過,Flink 有一些方法可以優化執行圖表的復原和重新啟動速度,以提高作業穩定性。

本頁說明 Amazon EMR Flink 在任務復原或擴展操作期間縮短作業重新啟動時間的一些方法。

注意

Amazon EMR 6.0.0 及更高版本支援任務本機復原。

透過 Flink 檢查點,每個任務皆會產生其狀態的快照,Flink 會將其寫入分散式儲存體 (如 Amazon S3)。在復原的情況下,任務會從分散式儲存體還原其狀態。分散式儲存提供容錯能力,並且由於所有節點皆可存取分散式儲存體,因此可以在重新擴展期間重新分配狀態。

但是,遠端分散式存放區也有一個缺點:所有任務均須透過網路從遠端位置讀取其狀態。這可能會導致任務復原或擴展操作期間大型狀態的復原時間較長。

您可透過任務本機復原來解決復原時間較長的問題。任務會將檢查點上的狀態寫入任務本機的次要儲存體,例如本機磁碟上。同時還會將其狀態存放在主要儲存體中 (在此例中為 Amazon S3)。在復原期間,排程器會在較早執行任務的相同 Task Manager 上排定任務,以便其可以從本機狀態存放區復原,而不是從遠端狀態存放區讀取。如需詳細資訊,請參閱《Apache Flink 文件》中的任務本機復原

我們對範例作業的基準測試指出,啟用任務本機復原後,復原時間已從幾分鐘縮短至幾秒鐘。

若要啟用任務本機復原,請在 flink-conf.yaml 檔案中設定下列組態。指定檢查點間隔值 (以毫秒為單位)。

state.backend.local-recovery: true state.backend: hasmap or rocksdb state.checkpoints.dir: s3://storage-location-bucket-path/checkpoint execution.checkpointing.interval: 15000
注意

Amazon EMR 6.10.0 及更高版本支援一般日誌型增量檢查點。

Flink 1.16 中新增了一般日誌型增量檢查點,以提高檢查點的速度。較快的檢查點間隔通常可減少復原工作,因為復原後需要重新處理的事件較少。如需詳細資訊,請參閱 Apache Flink 部落格上的 Improving speed and stability of checkpointing with generic log-based incremental checkpoints

我們對範例作業的基準測試指出,使用一般日誌型增量檢查點時,檢查點時間已從幾分鐘縮短至幾秒鐘。

若要啟用一般日誌型增量檢查點,請在 flink-conf.yaml 檔案中設定下列組態。指定檢查點間隔值 (以毫秒為單位)。

state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem dstl.dfs.base-path: s3://bucket-path/changelog state.backend.local-recovery: true state.backend: rocksdb state.checkpoints.dir: s3://bucket-path/checkpoint execution.checkpointing.interval: 15000
注意

Amazon EMR 6.0.0 及更高版本提供對預設排程器的精細復原支援。Amazon EMR 6.15.0 及更高版本提供調整式排程器中的精細復原支援。

當任務在執行過程中失敗時,Flink 會重設整個執行圖表,並從最後一個完成的檢查點觸發完整的重新執行。相較於僅重新執行失敗的任務,此方式的成本更高。精細復原僅會重新啟動失敗任務的管道連接元件。在下列範例中,作業圖表有 5 個頂點 (AE)。頂點之間的所有連接均採用逐點分佈的管道方式,且作業的 parallelism.default 設定為 2

A → B → C → D → E

在此範例中,總共有 10 個任務正在執行。第一個管道 (a1e1) 在 TaskManager (TM1) 上執行,而第二個管道 (a2e2) 在另一個 TaskManager (TM2) 上執行。

a1 → b1 → c1 → d1 → e1 a2 → b2 → c2 → d2 → e2

有兩個管道連接的元件:a1 → e1a2 → e2。如果 TM1TM2 失敗,則失敗僅會影響正在執行 TaskManager 之管道中的 5 個任務。重新啟動策略只會啟動受影響的管道元件。

精細復原僅適用於完全平行的 Flink 作業。keyBy()redistribute() 操作不支援。如需詳細資訊,請參閱 Flink Improvement Proposal Jira 專案中的 FLIP-1: Fine Grained Recovery from Task Failures

若要啟用精細復原,請在 flink-conf.yaml 檔案中設定下列組態。

jobmanager.execution.failover-strategy: region restart-strategy: exponential-delay or fixed-delay
注意

Amazon EMR 6.15.0 及更高版本支援調整式排程器中的合併重新啟動機制。

調整式排程器可根據可用的插槽調整作業的並行度。如果沒有足夠的插槽以符合設定的作業並行度,則會自動降低並行度。如果有新的插槽可用,作業就會再次縱向擴展至設定的作業並行度。在沒有足夠的可用資源時,調整式排程器可以避免作業停機。這是 Flink Autoscaler 支援的排程器。基於這些原因,我們建議使用 Amazon EMR Flink 搭配調整式排程器。但是,調整式排程器可能會在短時間內進行多次重新啟動,每新增一個新資源就會重新啟動一次。這可能會導致作業效能下降。

在 Amazon EMR 6.15.0 及更高版本中,Flink 在調整式排程器中具有合併重新啟動機制,該機制會在新增第一個資源時開啟重新啟動時段,然後等到設定的預設 1 分鐘時段間隔為止。當有足夠的資源可使用設定的並行度執行作業時,或當間隔逾時,其會執行單次重新啟動。

我們對範例作業的基準測試指出,當您使用調整式排程器和 Flink 自動擴展器時,此功能比預設行為多處理 10% 的記錄。

若要啟用合併重新啟動機制,請在 flink-conf.yaml 檔案中設定下列組態。

jobmanager.adaptive-scheduler.combined-restart.enabled: true jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m