在 Apache Flink 的受管理服務中實作容錯能力 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Apache Flink 的受管理服務中實作容錯能力

檢查點是用於在 Amazon Managed Service for Apache Flink 中實作容錯能力的方法。檢查點是執行中應用程式的 up-to-date 備份,可用來從意外的應用程式中斷或容錯移轉中立即復原。

如需 Apache Flink 應用程式中檢查點的詳細資訊,請參閱 Apache Flink 文件中的檢查點

快照是手動建立和管理的應用程式狀態備份。快照可讓您透過呼叫 UpdateApplication 將應用程式還原到先前的狀態。如需詳細資訊,請參閱使用快照管理應用程式備

如果應用程式已啟用檢查點,則服務會在意外的應用程式重新啟動時建立並載入應用程式資料的備份,藉此提供容錯能力。這些意外的應用程式重新啟動可能是由於意外的作業重新啟動、執行個體失敗等原因造成。這為應用程式提供了與在這些重新啟動期間無故障執行時相同的語義。

如果已為應用程式啟用快照,並使用應用程式進行設定 ApplicationRestoreConfiguration,則服務會在應用程式更新期間,或在與服務相關的調整或維護期間提供僅一次的處理語意。

在 Apache Flink 的受管理服務中設定檢查點

您可以設定應用程式的檢查點行為。您可以定義應用程式是否保持檢查點狀態、應用程式將狀態儲存至檢查點的頻率,以及某個檢查點操作結束與另一個檢查點操作開始之間的最小間隔。

您可以使用CreateApplicationUpdateApplicationAPI作業來設定下列設定:

  • CheckpointingEnabled:指示是否已在應用程式中啟用檢查點。

  • CheckpointInterval:包含檢查點 (持續性) 作業之間的時間 (毫秒)。

  • ConfigurationType:將此值設定為 DEFAULT 以使用預設檢查點行為。將此值設定為 CUSTOM 以設定其他值。

    注意

    預設的檢查點行為如下:

    • CheckpointingEnabled:

    • CheckpointInterval: 60000

    • MinPauseBetweenCheckpoints: 5000

    如果ConfigurationType設定為DEFAULT,則會使用前面的值,即使使用,或透過在應用程式碼中設定值設定為其他值。 AWS Command Line Interface

    注意

    對於 Flink 1.15 以上版本,Managed Service for Apache Flink 將在自動建立快照期間 (也就是應用程式更新、擴展或停止時) 使用 stop-with-savepoint

  • MinPauseBetweenCheckpoints:檢查點操作結束與另一個檢查點操作開始之間的最短時間 (毫秒)。設定此值可防止在檢查點操作時間超過 CheckpointInterval 時,應用程式持續執行檢查點。

檢閱檢查點範例 API

本節包括設定應用程式檢查點的API動作要求範例。若要取得有關如何使用JSON檔案進API行動作輸入的資訊,請參閱〈〉適用於阿帕奇 Flink API 範例程式碼的託管服務

設定新應用程式的檢查點

CreateApplication 動作的下列請求範例會在您建立應用程式時設定檢查點:

{ "ApplicationName": "MyApplication", "RuntimeEnvironment":"FLINK-1_19", "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole", "ApplicationConfiguration": { "ApplicationCodeConfiguration":{ "CodeContent":{ "S3ContentLocation":{ "BucketARN":"arn:aws:s3:::mybucket", "FileKey":"myflink.jar", "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345" } }, "FlinkApplicationConfiguration": { "CheckpointConfiguration": { "CheckpointingEnabled": "true", "CheckpointInterval": 20000, "ConfigurationType": "CUSTOM", "MinPauseBetweenCheckpoints": 10000 } } }

停用新應用程式的檢查點

CreateApplication 動作的下列請求範例會在您建立應用程式時停用檢查點:

{ "ApplicationName": "MyApplication", "RuntimeEnvironment":"FLINK-1_19", "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole", "ApplicationConfiguration": { "ApplicationCodeConfiguration":{ "CodeContent":{ "S3ContentLocation":{ "BucketARN":"arn:aws:s3:::mybucket", "FileKey":"myflink.jar", "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345" } }, "FlinkApplicationConfiguration": { "CheckpointConfiguration": { "CheckpointingEnabled": "false" } } }

設定現有應用程式的檢查點

UpdateApplication 動作的下列範例請求會為現有的應用程式設定檢查點:

{ "ApplicationName": "MyApplication", "ApplicationConfigurationUpdate": { "FlinkApplicationConfigurationUpdate": { "CheckpointConfigurationUpdate": { "CheckpointingEnabledUpdate": true, "CheckpointIntervalUpdate": 20000, "ConfigurationTypeUpdate": "CUSTOM", "MinPauseBetweenCheckpointsUpdate": 10000 } } } }

停用現有應用程式的檢查點

UpdateApplication 動作的下列範例請求會為現有的應用程式停用檢查點:

{ "ApplicationName": "MyApplication", "ApplicationConfigurationUpdate": { "FlinkApplicationConfigurationUpdate": { "CheckpointConfigurationUpdate": { "CheckpointingEnabledUpdate": false, "CheckpointIntervalUpdate": 20000, "ConfigurationTypeUpdate": "CUSTOM", "MinPauseBetweenCheckpointsUpdate": 10000 } } } }