處理重複的記錄 - Amazon Kinesis Data Streams

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

處理重複的記錄

有兩個主要原因可能會導致多次將記錄交付至您的 Amazon Kinesis Data Streams:生產者重試和取用者重試。您的應用程式必須預料並妥善因應多次處理個別記錄的問題。

生產者重試

試想有一個生產者,其呼叫 PutRecord 後但仍未能收到 Amazon Kinesis Data Streams 的確認便遇到了與網路相關的逾時情況。此生產者無法確定記錄是否已交付至 Kinesis Data Streams。假設每一筆記錄對應用程式都很重要,生產者即會撰寫成使用相同的資料重試呼叫。如果就相同的資料呼叫兩次 PutRecord 均已成功遞交至 Kinesis Data Streams,則將會有兩筆 Kinesis Data Streams 記錄。儘管這兩筆記錄具有相同的資料,但其序號各不相同。需要嚴格保證的應用程式應於記錄中嵌入主索引鍵,以便稍後進行處理時移除重複項目。請注意,因生產者重試而造成的重複項目數通常會比因消費者重試而造成的重複項目數來得少。

注意

如果您使用 AWS SDKPutRecord,請在 SDK 和工具使用者指南中AWS 瞭解 SDK 重試行為

消費者重試

消費者 (資料處理應用程式) 重試是在記錄處理器重新啟動時發生。同一碎片的記錄處理器將於以下情況重新啟動:

  1. 工作者意外終止

  2. 新增或移除工作者執行個體

  3. 碎片合併或分割

  4. 部署應用程式

在所有這些情況下, shards-to-worker-to-record 處理器映射會持續更新為負載平衡處理。已遷移至其他執行個體的碎片處理器將從最後一個檢查點重新啟動處理記錄。這會導致重複的記錄處理,如以下範例所示。如需負載平衡的詳細資訊,請參閱重新分片、擴展和平行處理

範例:消費者重試導致再次交付記錄

本範例中的應用程式將持續從串流讀取記錄、彙整記錄至本機檔案,然後上傳該檔案到 Amazon S3。為求簡化,假設只有 1 個碎片並由 1 個工作者處理此碎片。試想以下發生的一系列範例事件,假設最後一個檢查點位於記錄編號 10000 處:

  1. 工作者從碎片讀取下一批次的記錄,即記錄 10001 到 20000。

  2. 工作者隨後將該批次記錄傳遞至關聯的記錄處理器。

  3. 記錄處理器彙總資料、建立 Amazon S3 檔案並成功將該檔案上傳到 Amazon S3。

  4. 工作者在新的檢查點到達之前意外終止。

  5. 應用程式、工作者和記錄處理器重新啟動。

  6. 工作者現在開始從上次成功的檢查點 (本例中為 10001) 進行讀取。

因此,記錄 10001-20000 取用了一次以上。

彈性因應消費者重試

即使記錄可能經過多次處理,您的應用程式也許希望能體現副作用,猶如只處理一次記錄那樣 (等冪處理)。此問題的解決方法因複雜度與準確度而異。如果最終資料的目的地能夠妥善處理重複項目,建議您憑藉最終目的地以實現等冪處理。例如,透過 Opensearch,您可以組合運用版本控制和唯一 ID 避免重複進行處理。

回顧前一節的範例應用程式,其持續從串流讀取記錄、彙整記錄至本機檔案,然後上傳該檔案到 Amazon S3。如該節所示,記錄 10001-20000 取用了一次以上,導致多個 Amazon S3 檔案具有相同的資料。化解該範例發生重複情況的一種方法是確保步驟 3 使用以下機制:

  1. 記錄處理器就每個 Amazon S3 檔案使用固定數目的記錄,例如 5000。

  2. 檔案名稱使用以下結構描述:Amazon S3 字首、碎片 ID 和 First-Sequence-Num。在本例中,其可能類似於 sample-shard000001-10001

  3. 上傳 Amazon S3 檔案後,透過指定 Last-Sequence-Num 執行檢查點作業。本例應於記錄編號 15000 處執行檢查點作業。

利用上述機制,即使記錄經過多次處理,產生的 Amazon S3 檔案也會具有相同的名稱和相同的資料。重試只會導致多次將相同的資料寫入同一個檔案。

若發生重新分片操作的情況,留存於碎片中的記錄數目可能少於您所需要的固定數目。就本例而言,您必須透過 shutdown() 方法將檔案排清至 Amazon S3 並對最後一個序號執行檢查點作業。上述機制同樣相容於重新分片操作。