View a markdown version of this page

Flink 2.2 升級的狀態相容性指南 - Managed Service for Apache Flink

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Flink 2.2 升級的狀態相容性指南

從 Flink 1.x 升級到 Flink 2.2 時,狀態相容性問題可能會使您的應用程式無法從快照還原。本指南可協助您識別潛在的相容性問題,並提供遷移策略。

了解狀態相容性變更

Amazon Managed Service for Apache Flink 2.2 引進數個會影響狀態相容性的序列化變更。以下是主要項目:

  • Kryo 版本升級:Apache Flink 2.2 會將綁定的 Kryo 序列化程式從第 2 版升級到第 5 版。由於 Kryo v5 使用與 Kryo v2 不同的二進位編碼格式,因此無法在 Flink 2.2 中還原透過 Kryo 在 Flink 1.x 儲存點中序列化的任何運算子狀態。

  • Java 集合序列化:在 Flink 1.x 中,POJOs 內的 Java 集合 (例如 ArrayListHashMapHashSet) 使用 Kryo 序列化。Flink 2.2 推出集合特定的最佳化序列化程式,其與來自 1.x 的 Kryo 序列化狀態不相容。在 1.x 中使用 Java 集合搭配 POJO 或 Kryo 序列化程式的應用程式無法在 Flink 2.2 中還原此狀態。如需資料類型和序列化的詳細資訊,請參閱 Flink 文件

  • Kinesis 連接器相容性:低於 5.0 的 Kinesis Data Streams (KDS) 連接器版本會維持與 Flink 2.2 Kinesis 連接器 6.0 版不相容的狀態。您必須先遷移至連接器 5.0 版或更新版本,才能升級。

序列化相容性參考

檢閱應用程式中的所有狀態宣告,並將序列化類型與下表比對。如果有任何狀態類型不相容,請先參閱 狀態遷移一節,再繼續升級。

序列化相容性參考
序列化類型 相容? 詳細資訊
Avro (SpecificRecordGenericRecord) 使用自己的二進位格式,獨立於 Kryo。確保您使用的是 Flink 的原生 Avro 類型資訊,而不是註冊為 Kryo 序列化程式的 Avro。
Protobuf 獨立於 Kryo 使用自己的二進位編碼。驗證結構描述變更遵循回溯相容的演變規則。
沒有集合POJOs 由 Flink 的 POJO 序列化程式處理 - 但前提是類別符合所有 POJO 條件:公有類別、公有無錯誤建構函數、所有公開或可透過 getter/setters 存取的欄位,以及所有欄位類型本身可透過 Flink 序列化。違反上述任何一項的 POJO 無提示地落回 Kryo,並變得不相容。
自訂 TypeSerializers 只有在您的序列化程式未在內部委派給 Kryo 時才相容。
SQL 和資料表 API 狀態 是 (具有警告) 使用 Flink 的內部序列化程式。不過,Apache Flink 不保證資料表 API 應用程式的主要版本之間的狀態相容性。首先在非生產環境中進行測試。
具有 Java 集合的 POJOs (HashMapArrayListHashSet) 在 Flink 1.x 中,POJOs 內的集合是透過 Kryo v2 序列化。Flink 2.2 推出專用集合序列化程式,其二進位格式與 Kryo v2 格式不相容。
Scala 案例類別 在 Flink 1.x 中透過 Kryo 序列化。Kryo v2 到 v5 升級會變更二進位格式。
Java 記錄 通常回到 Flink 1.x 中的 Kryo 序列化。透過使用 進行測試進行驗證disableGenericTypes()
第三方程式庫類型 沒有已註冊自訂序列化程式的類型會回到 Kryo。Kryo v2 到 v5 二進位格式變更會中斷相容性。
使用 Kryo 備用的任何類型 如果 Flink 無法處理具有內建或已註冊序列化程式的類型,則會回到 Kryo。來自 1.x 的所有 Kryo 序列化狀態與 2.2 不相容。

診斷方法

您可以在 UpdateApplication API 操作後查看應用程式日誌或檢查日誌,以主動識別狀態相容性問題。

識別應用程式中的 Kryo 備用

您可以在日誌中使用下列 regex 模式來識別應用程式中的 Kryo 備用:

Class class (?<className>[^\s]+) cannot be used as a POJO type

範例日誌:

Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.

如果使用 UpdateApplication API 升級失敗,下列例外狀況可能表示您遇到序列化程式型狀態不相容:

IndexOutOfBoundsException

Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1 at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source) at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source) at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source) at java.base/java.util.Objects.checkIndex(Unknown Source) at java.base/java.util.ArrayList.get(Unknown Source) at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77) at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923) ... 23 more

StateMigrationException (POJOSerializer)

Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be incompatible with the old state serializer (org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).

升級前檢查清單

  • 檢閱應用程式中的所有狀態宣告

  • 使用集合 (HashMapArrayListHashSet) 檢查 POJOs

  • 驗證每種狀態類型的序列化方法

  • 在此複本上使用 UpdateApplication API 建立生產複本應用程式並測試狀態相容性

  • 如果狀態不相容,請從 選取策略 狀態遷移

  • 在生產 Flink 應用程式組態中啟用自動轉返

狀態遷移

重建完成狀態

最適合可以從來源資料重建狀態的應用程式。

如果您的應用程式可以從來源資料重建狀態:

  1. 停止 Flink 1.x 應用程式

  2. 使用更新的程式碼升級至 Flink 2.x

  3. 從 開始 SKIP_RESTORE_FROM_SNAPSHOT

  4. 允許應用程式重建狀態

aws kinesisanalyticsv2 start-application \ --application-name MyApplication \ --run-configuration '{ "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT" } }'

最佳實務

  1. 一律將 Avro 或 Protobuf 用於複雜狀態 — 這些可提供結構描述演變,且與 Kryo 無關

  2. 避免 POJOs中的集合 — 請MapState改用 Flink 的原生 ListState和 。

  3. 在本機測試狀態還原 - 在生產升級之前,使用實際快照進行測試

  4. 經常拍攝快照 - 特別是在主要版本升級之前

  5. 啟用自動轉返 — 將您的 MSF 應用程式設定為在失敗時自動轉返

  6. 記錄您的狀態類型 — 維護所有狀態類型及其序列化方法的文件

  7. 監控檢查點大小 — 不斷增長的檢查點大小可能表示序列化問題

後續步驟

規劃升級:請參閱 升級到 Flink 2.2:完成指南

如需遷移期間的問題,請參閱 Managed Service for Apache Flink 故障診斷或聯絡 AWS Support。