优化任务恢复和扩展操作的作业重启时间 - Amazon EMR

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

优化任务恢复和扩展操作的作业重启时间

当任务失败或发生扩展操作时,Flink 会尝试从上一次完成的检查点重新执行任务。重启过程可能需要一分钟或更长时间才能执行,具体取决于检查点状态的大小以及并行任务的数量。重启期间,可以累积作业的积压任务。但是,Flink 可以通过一些方法来优化执行图的恢复和重启速度,从而提高作业稳定性。

本页介绍了 Amazon EMR Flink 可以在任务恢复或扩展操作期间缩短作业的重启时间的一些方法。

注意

Amazon EMR 6.0.0 及更高版本都支持任务的本地恢复。

使用 Flink 检查点,每个任务都会生成其状态的快照,Flink 会将该快照写入分布式存储(如 Amazon S3)。在恢复的情况下,任务会从分布式存储中恢复其状态。分布式存储提供容错能力,并且可以在重新扩展期间重新分配状态,因为它可供所有节点访问。

但远程分布式存储也有一个缺点:所有任务都必须通过网络从远程位置读取其状态。在任务恢复或扩展操作期间,这可能会导致大规模状态的恢复时间很长。

通过任务本地恢复可以解决恢复时间长这一问题。任务将其在检查点上的状态写入任务本地的辅助存储(例如本地磁盘)。它们还将状态存储在主存储中,或者存储在 Amazon S3 中(在本例中)。恢复期间,计划程序将任务计划在任务之前运行所在的同一个任务管理器上,这样它们就可以从本地状态存储中恢复,而不是从远程状态存储中读取。有关更多信息,请参阅 Apache Flink 文档中的任务本地恢复

我们对示例作业进行的基准测试表明,启用任务本地恢复后,恢复时间已从几分钟缩短到几秒。

要启用任务本地恢复,请在 flink-conf.yaml 文件中设置以下配置。指定检查点间隔值,以毫秒为单位。

state.backend.local-recovery: true state.backend: hasmap or rocksdb state.checkpoints.dir: s3://storage-location-bucket-path/checkpoint execution.checkpointing.interval: 15000
注意

Amazon EMR 6.10.0 及更高版本支持基于日志的通用增量检查点。

Flink 1.16 中添加了基于日志的通用增量检查点功能,以提高检查点的速度。较快的检查点间隔通常会导致恢复工作减少,因为恢复后需要重新处理的事件较少。有关更多信息,请参阅 Apache Flink 博客上的使用基于日志的通用增量检查点提高检查点的速度和稳定性

对于示例作业,我们的基准测试表明,使用基于日志的通用增量检查点时,检查点时间从几分钟缩短到几秒。

要启用基于日志的通用增量检查点,请在 flink-conf.yaml 文件中设置以下配置。指定检查点间隔值,以毫秒为单位。

state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem dstl.dfs.base-path: s3://bucket-path/changelog state.backend.local-recovery: true state.backend: rocksdb state.checkpoints.dir: s3://bucket-path/checkpoint execution.checkpointing.interval: 15000
注意

Amazon EMR 6.0.0 及更高版本提供默认计划程序的精细恢复支持。Amazon EMR 6.15.0 及更高版本提供自适应计划程序的精细恢复支持。

当任务在执行过程中失败时,Flink 会重置整个执行图,并从上次完成的检查点触发完整的重新执行。这比仅重新执行失败的任务更昂贵。精细恢复仅重新启动失败的任务与管道连接的组件。在以下示例中,作业图有 5 个顶点(AE)。顶点之间的所有连接都使用逐点分布进行管道化处理,作业的 parallelism.default 设置为 2

A → B → C → D → E

在本示例中,总共有 10 个任务在运行。第一个管道(a1e1)在 TaskManager(TM1)上运行,第二个管道(a2e2)在另一个 TaskManager(TM2)上运行。

a1 → b1 → c1 → d1 → e1 a2 → b2 → c2 → d2 → e2

有两个管道连接的组件:a1 → e1a2 → e2。如果 TM1TM2 其中一个失败,则故障仅影响 TaskManager 正在其中运行的管道中的 5 个任务。重启策略仅会启动受影响的管道化组件。

精细恢复仅适用于完全并行的 Flink 作业。keyBy()redistribute() 操作不支持。有关更多信息,请参阅 Flink 改进提案 Jira 项目中的 FLIP-1:从任务失败中进行精细恢复

要启用精细恢复,请在 flink-conf.yaml 文件中设置以下配置。

jobmanager.execution.failover-strategy: region restart-strategy: exponential-delay or fixed-delay
注意

Amazon EMR 6.15.0 及更高版本支持自适应计划程序中的组合重启机制。

自适应计划程序可以根据可用插槽调整作业的并行度。如果没有足够的插槽来适应配置的作业并行度,它将自动降低并行度。如果有新的插槽可用,则任务将再次纵向扩展到配置的作业并行度。当没有足够的可用资源时,自适应计划程序将避免作业停机。这是 Flink Autoscaler 支持的计划程序。出于这些原因,我们建议在 Amazon EMR Flink 中使用自适应计划程序。但是,自适应计划程序可能会在短时间内进行多次重启,每添加一个新资源就会重启一次。这可能导致作业性能下降。

在 Amazon EMR 6.15.0 及更高版本中,Flink 在自适应计划程序中具有组合重启机制,可在添加第一个资源时打开一个重启窗口,然后等到配置的默认 1 分钟窗口间隔时。当有足够的资源可用来运行具有配置并行性的作业时,或者当间隔超时时,它会执行一次重启。

对于示例作业,我们的基准测试表明,当您使用自适应计划程序和 Flink Autoscaler 时,此功能处理的记录比默认行为多 10%。

要启用组合重启机制,请在 flink-conf.yaml 文件中设置以下配置。

jobmanager.adaptive-scheduler.combined-restart.enabled: true jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m