在 Apache Flink 的托管服务中实现容错能力 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Apache Flink 的托管服务中实现容错能力

检查点是用于在 Amazon Managed Service for Apache Flink 中实施容错功能的方法。检查点是正在运行的应用程序的 up-to-date 备份,用于从意外的应用程序中断或故障转移中立即恢复。

有关 Apache Flink 应用程序中检查点操作的详细信息,请参阅 Apache Flink 文档中的检查点

快照 是手动创建和管理的应用程序状态备份。通过使用快照,您可以调用 UpdateApplication 以将应用程序还原到以前的状态。有关更多信息,请参阅 使用快照管理应用程序备份

如果为应用程序启用了检查点,该服务将创建应用程序数据备份,并在应用程序意外重新启动时加载该备份以提供容错功能。这些意外的应用程序重新启动可能是由意外的任务重新启动、实例故障等引起的。这会在这些重新启动期间为应用程序提供与无故障执行相同的语义。

如果为应用程序启用了快照并使用应用程序的快照进行配置 ApplicationRestoreConfiguration,则该服务将在应用程序更新期间或与服务相关的扩展或维护期间提供一次性处理语义。

在 Apache Flink 的托管服务中配置检查点

您可以配置应用程序的检查点行为。您可以定义它是否永久保存检查点状态、将其状态保存到检查点的频率以及一个检查点操作结束到另一个检查点操作开始之间的最小间隔。

您可以使用CreateApplicationUpdateApplicationAPI操作配置以下设置:

  • CheckpointingEnabled — 指示是否在应用程序中启用了检查点。

  • CheckpointInterval — 包含检查点(持久性)操作之间的时间(以毫秒为单位)。

  • ConfigurationType — 可以将该值设置为 DEFAULT 以使用默认检查点行为;将该值设置为 CUSTOM 以配置其他值。

    注意

    默认检查点行为如下所示:

    • CheckpointingEnabled: 真的

    • CheckpointInterval: 6000 0

    • MinPauseBetweenCheckpoints: 5000

    如果设置ConfigurationTypeDEFAULT,则将使用前面的值,即使使用或通过在应用程序代码中设置值将它们设置为其他值。 AWS Command Line Interface

    注意

    对于 Flink 1.15 及更高版本,Managed Service for Apache Flink 将在自动创建快照stop-with-savepoint期间使用,即应用程序更新、缩放或停止。

  • MinPauseBetweenCheckpoints — 从一个检查点操作结束到另一个检查点操作开始之间的最短时间(以毫秒为单位)。如果设置该值,则可以防止应用程序在检查点操作所花的时间超过 CheckpointInterval 时继续执行检查点操作。

查看检查点示例 API

本节包括为应用程序配置检查点API操作的示例请求。有关如何使用JSON文件作为API操作输入的信息,请参见适用于 Apache 的托管服务 Flink API 示例代码

为新应用程序配置检查点

CreateApplication 操作的以下示例请求在您创建应用程序时配置检查点:

{ "ApplicationName": "MyApplication", "RuntimeEnvironment":"FLINK-1_19", "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole", "ApplicationConfiguration": { "ApplicationCodeConfiguration":{ "CodeContent":{ "S3ContentLocation":{ "BucketARN":"arn:aws:s3:::mybucket", "FileKey":"myflink.jar", "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345" } }, "FlinkApplicationConfiguration": { "CheckpointConfiguration": { "CheckpointingEnabled": "true", "CheckpointInterval": 20000, "ConfigurationType": "CUSTOM", "MinPauseBetweenCheckpoints": 10000 } } }

为新应用程序禁用检查点功能

CreateApplication 操作的以下示例请求在您创建应用程序时禁用检查点:

{ "ApplicationName": "MyApplication", "RuntimeEnvironment":"FLINK-1_19", "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole", "ApplicationConfiguration": { "ApplicationCodeConfiguration":{ "CodeContent":{ "S3ContentLocation":{ "BucketARN":"arn:aws:s3:::mybucket", "FileKey":"myflink.jar", "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345" } }, "FlinkApplicationConfiguration": { "CheckpointConfiguration": { "CheckpointingEnabled": "false" } } }

为现有应用程序配置检查点

UpdateApplication 操作的以下示例请求为现有应用程序配置检查点:

{ "ApplicationName": "MyApplication", "ApplicationConfigurationUpdate": { "FlinkApplicationConfigurationUpdate": { "CheckpointConfigurationUpdate": { "CheckpointingEnabledUpdate": true, "CheckpointIntervalUpdate": 20000, "ConfigurationTypeUpdate": "CUSTOM", "MinPauseBetweenCheckpointsUpdate": 10000 } } } }

对现有应用程序禁用检查点功能

UpdateApplication 操作的以下示例请求为现有应用程序禁用检查点:

{ "ApplicationName": "MyApplication", "ApplicationConfigurationUpdate": { "FlinkApplicationConfigurationUpdate": { "CheckpointConfigurationUpdate": { "CheckpointingEnabledUpdate": false, "CheckpointIntervalUpdate": 20000, "ConfigurationTypeUpdate": "CUSTOM", "MinPauseBetweenCheckpointsUpdate": 10000 } } } }