在 Managed Service for Apache Flink 中實作容錯能力 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Managed Service for Apache Flink 中實作容錯能力

檢查點是用於在 Amazon Managed Service for Apache Flink 中實作容錯能力的方法。檢查點是 up-to-date執行中應用程式的備份,用於從非預期的應用程式中斷或容錯移轉中立即復原。

如需在 Apache Flink 應用程式中進行檢查點的詳細資訊,請參閱 Apache Flink 文件中的檢查點

快照是手動建立和管理的應用程式狀態備份。快照可讓您透過呼叫 UpdateApplication 將應用程式還原到先前的狀態。如需詳細資訊,請參閱使用快照管理應用程式備

如果應用程式已啟用檢查點,則服務會在意外的應用程式重新啟動時建立並載入應用程式資料的備份,藉此提供容錯能力。這些意外的應用程式重新啟動可能是由於意外的作業重新啟動、執行個體失敗等原因造成。這為應用程式提供了與在這些重新啟動期間無故障執行時相同的語義。

如果為應用程式啟用快照,並使用應用程式的 進行設定ApplicationRestoreConfiguration,則服務會在應用程式更新期間或服務相關的擴展或維護期間提供完全一次的處理語義。

在 Managed Service for Apache Flink 中設定檢查點

您可以設定應用程式的檢查點行為。您可以定義應用程式是否保持檢查點狀態、應用程式將狀態儲存至檢查點的頻率,以及某個檢查點操作結束與另一個檢查點操作開始之間的最小間隔。

您可以使用 CreateApplicationUpdateApplicationAPI操作設定下列設定:

  • CheckpointingEnabled:指示是否已在應用程式中啟用檢查點。

  • CheckpointInterval:包含檢查點 (持續性) 作業之間的時間 (毫秒)。

  • ConfigurationType:將此值設定為 DEFAULT 以使用預設檢查點行為。將此值設定為 CUSTOM 以設定其他值。

    注意

    預設的檢查點行為如下:

    • CheckpointingEnabled: true

    • CheckpointInterval:60000

    • MinPauseBetweenCheckpoints:5000

    如果 ConfigurationType 設定為 DEFAULT,即使使用 或應用程式程式碼中的值,將上述值設定為其他值 AWS Command Line Interface,也會使用上述值。

    注意

    對於 Flink 1.15 以上版本,Managed Service for Apache Flink 將在自動建立快照期間 (也就是應用程式更新、擴展或停止時) 使用 stop-with-savepoint

  • MinPauseBetweenCheckpoints:檢查點操作結束與另一個檢查點操作開始之間的最短時間 (毫秒)。設定此值可防止在檢查點操作時間超過 CheckpointInterval 時,應用程式持續執行檢查點。

檢閱檢查點API範例

本節包含設定應用程式檢查點API的動作請求範例。如需如何使用 JSON 檔案輸入 API 動作的資訊,請參閱 Managed Service for Apache Flink API範例程式碼

設定新應用程式的檢查點

CreateApplication 動作的下列請求範例會在您建立應用程式時設定檢查點:

{ "ApplicationName": "MyApplication", "RuntimeEnvironment":"FLINK-1_19", "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole", "ApplicationConfiguration": { "ApplicationCodeConfiguration":{ "CodeContent":{ "S3ContentLocation":{ "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket", "FileKey":"myflink.jar", "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345" } }, "FlinkApplicationConfiguration": { "CheckpointConfiguration": { "CheckpointingEnabled": "true", "CheckpointInterval": 20000, "ConfigurationType": "CUSTOM", "MinPauseBetweenCheckpoints": 10000 } } }

停用新應用程式的檢查點

CreateApplication 動作的下列請求範例會在您建立應用程式時停用檢查點:

{ "ApplicationName": "MyApplication", "RuntimeEnvironment":"FLINK-1_19", "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole", "ApplicationConfiguration": { "ApplicationCodeConfiguration":{ "CodeContent":{ "S3ContentLocation":{ "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket", "FileKey":"myflink.jar", "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345" } }, "FlinkApplicationConfiguration": { "CheckpointConfiguration": { "CheckpointingEnabled": "false" } } }

設定現有應用程式的檢查點

UpdateApplication 動作的下列範例請求會為現有的應用程式設定檢查點:

{ "ApplicationName": "MyApplication", "ApplicationConfigurationUpdate": { "FlinkApplicationConfigurationUpdate": { "CheckpointConfigurationUpdate": { "CheckpointingEnabledUpdate": true, "CheckpointIntervalUpdate": 20000, "ConfigurationTypeUpdate": "CUSTOM", "MinPauseBetweenCheckpointsUpdate": 10000 } } } }

停用現有應用程式的檢查點

UpdateApplication 動作的下列範例請求會為現有的應用程式停用檢查點:

{ "ApplicationName": "MyApplication", "ApplicationConfigurationUpdate": { "FlinkApplicationConfigurationUpdate": { "CheckpointConfigurationUpdate": { "CheckpointingEnabledUpdate": false, "CheckpointIntervalUpdate": 20000, "ConfigurationTypeUpdate": "CUSTOM", "MinPauseBetweenCheckpointsUpdate": 10000 } } } }