本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
優化任務復原和擴展操作的作業重新啟動時間
當任務失敗或發生擴展操作時,Flink 會嘗試從最後一個完成的檢查點重新執行任務。根據檢查點狀態的大小和平行任務數量,重新啟動程序可能需要一分鐘或更長的時間才能執行。在重新啟動期間,作業的積壓任務可能會累積。不過,Flink 有一些方法可以優化執行圖表的復原和重新啟動速度,以提高作業穩定性。
本頁說明 Amazon EMR Flink 在任務復原或擴展操作期間縮短作業重新啟動時間的一些方法。
任務本機復原
注意
Amazon EMR 6.0.0 及更高版本支援任務本機復原。
透過 Flink 檢查點,每個任務皆會產生其狀態的快照,Flink 會將其寫入分散式儲存體 (如 Amazon S3)。在復原的情況下,任務會從分散式儲存體還原其狀態。分散式儲存提供容錯能力,並且由於所有節點皆可存取分散式儲存體,因此可以在重新擴展期間重新分配狀態。
但是,遠端分散式存放區也有一個缺點:所有任務均須透過網路從遠端位置讀取其狀態。這可能會導致任務復原或擴展操作期間大型狀態的復原時間較長。
您可透過任務本機復原來解決復原時間較長的問題。任務會將檢查點上的狀態寫入任務本機的次要儲存體,例如本機磁碟上。同時還會將其狀態存放在主要儲存體中 (在此例中為 Amazon S3)。在復原期間,排程器會在較早執行任務的相同 Task Manager 上排定任務,以便其可以從本機狀態存放區復原,而不是從遠端狀態存放區讀取。如需詳細資訊,請參閱《Apache Flink 文件》中的任務本機復原
我們對範例作業的基準測試指出,啟用任務本機復原後,復原時間已從幾分鐘縮短至幾秒鐘。
若要啟用任務本機復原,請在 flink-conf.yaml
檔案中設定下列組態。指定檢查點間隔值 (以毫秒為單位)。
state.backend.local-recovery: true
state.backend: hasmap or rocksdb
state.checkpoints.dir: s3://storage-location-bucket-path
/checkpoint
execution.checkpointing.interval: 15000
一般日誌型增量檢查點
注意
Amazon EMR 6.10.0 及更高版本支援一般日誌型增量檢查點。
Flink 1.16 中新增了一般日誌型增量檢查點,以提高檢查點的速度。較快的檢查點間隔通常可減少復原工作,因為復原後需要重新處理的事件較少。如需詳細資訊,請參閱 Apache Flink 部落格上的 Improving speed and stability of checkpointing with generic log-based incremental checkpoints
我們對範例作業的基準測試指出,使用一般日誌型增量檢查點時,檢查點時間已從幾分鐘縮短至幾秒鐘。
若要啟用一般日誌型增量檢查點,請在 flink-conf.yaml
檔案中設定下列組態。指定檢查點間隔值 (以毫秒為單位)。
state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem dstl.dfs.base-path: s3://
bucket-path
/changelog state.backend.local-recovery: true state.backend: rocksdb state.checkpoints.dir: s3://bucket-path
/checkpoint execution.checkpointing.interval:15000
精細復原
注意
Amazon EMR 6.0.0 及更高版本提供對預設排程器的精細復原支援。Amazon EMR 6.15.0 及更高版本提供調整式排程器中的精細復原支援。
當任務在執行過程中失敗時,Flink 會重設整個執行圖表,並從最後一個完成的檢查點觸發完整的重新執行。相較於僅重新執行失敗的任務,此方式的成本更高。精細復原僅會重新啟動失敗任務的管道連接元件。在下列範例中,作業圖表有 5 個頂點 (A
至 E
)。頂點之間的所有連接均採用逐點分佈的管道方式,且作業的 parallelism.default
設定為 2
。
A → B → C → D → E
在此範例中,總共有 10 個任務正在執行。第一個管道 (a1
至 e1
) 在 TaskManager (TM1
) 上執行,而第二個管道 (a2
至 e2
) 在另一個 TaskManager (TM2
) 上執行。
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
有兩個管道連接的元件:a1 → e1
和 a2 →
e2
。如果 TM1
或 TM2
失敗,則失敗僅會影響正在執行 TaskManager 之管道中的 5 個任務。重新啟動策略只會啟動受影響的管道元件。
精細復原僅適用於完全平行的 Flink 作業。keyBy()
或 redistribute()
操作不支援。如需詳細資訊,請參閱 Flink Improvement Proposal Jira 專案中的 FLIP-1: Fine Grained Recovery from Task Failures
若要啟用精細復原,請在 flink-conf.yaml
檔案中設定下列組態。
jobmanager.execution.failover-strategy: region
restart-strategy: exponential-delay or fixed-delay
調整式排程器中的合併重新啟動機制
注意
Amazon EMR 6.15.0 及更高版本支援調整式排程器中的合併重新啟動機制。
調整式排程器可根據可用的插槽調整作業的並行度。如果沒有足夠的插槽以符合設定的作業並行度,則會自動降低並行度。如果有新的插槽可用,作業就會再次縱向擴展至設定的作業並行度。在沒有足夠的可用資源時,調整式排程器可以避免作業停機。這是 Flink Autoscaler 支援的排程器。基於這些原因,我們建議使用 Amazon EMR Flink 搭配調整式排程器。但是,調整式排程器可能會在短時間內進行多次重新啟動,每新增一個新資源就會重新啟動一次。這可能會導致作業效能下降。
在 Amazon EMR 6.15.0 及更高版本中,Flink 在調整式排程器中具有合併重新啟動機制,該機制會在新增第一個資源時開啟重新啟動時段,然後等到設定的預設 1 分鐘時段間隔為止。當有足夠的資源可使用設定的並行度執行作業時,或當間隔逾時,其會執行單次重新啟動。
我們對範例作業的基準測試指出,當您使用調整式排程器和 Flink 自動擴展器時,此功能比預設行為多處理 10% 的記錄。
若要啟用合併重新啟動機制,請在 flink-conf.yaml
檔案中設定下列組態。
jobmanager.adaptive-scheduler.combined-restart.enabled: true
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m