KCL 1.x 和 2.x 資訊 - Amazon Kinesis Data Streams

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

KCL 1.x 和 2.x 資訊

注意

Kinesis Client Library (KCL) 1.x 和 2.x 版已過期。我們建議您遷移至 KCL 3.x 版,這可提供更佳的效能和新功能。如需最新的KCL文件和遷移指南,請參閱 使用 Kinesis 用戶端程式庫

開發可處理KDS資料串流資料的自訂消費者應用程式的方法之一,就是使用 Kinesis Client Library (KCL)。

注意

對於 KCL 1.x 和 KCL 2.x,建議您升級至最新的 KCL 1.x 版本或 KCL 2.x 版本,視您的使用案例而定。1.x KCL 和 KCL 2.x 都會定期更新為較新的版本,其中包含最新的相依性和安全性修補程式、錯誤修正和回溯相容的新功能。如需詳細資訊,請參閱 https://github.com/awslabs/amazon-kinesis-client/ 版本

關於 KCL(先前版本)

KCL 透過處理與分散式運算相關聯的許多複雜任務,協助您使用和處理來自 Kinesis 資料串流的資料。這其中包含跨多個取用者應用程式執行個體的負載平衡、對取用者應用程式執行個體失敗的回應、檢查點處理記錄,以及對重新分片的反應。KCL 負責所有這些子任務,讓您可以專注於撰寫自訂記錄處理邏輯。

與 中APIs提供的 Kinesis Data Streams KCL不同 AWS SDKs。Kinesis Data Streams APIs可協助您管理 Kinesis Data Streams 的許多層面,包括建立串流、重新分片,以及放置和取得記錄。為所有這些子任務KCL提供抽象層,特別是 ,以便您可以專注於消費者應用程式的自訂資料處理邏輯。如需 Kinesis Data Streams 的相關資訊API,請參閱 Amazon Kinesis API參考

重要

KCL 是 Java 程式庫。使用稱為 的多語言界面來支援 Java 以外的語言 MultiLangDaemon。此協助程式以 Java 為基礎,當您使用 Java 以外的KCL語言時,會在背景執行。例如,如果您安裝 KCL for Python 並完全在 Python 中寫入取用者應用程式,則由於 ,您仍需要在系統上安裝 Java MultiLangDaemon。此外, MultiLangDaemon 具有一些預設設定,您可能需要針對您的使用案例自訂這些設定,例如,其連線 AWS 的區域。如需 MultiLangDaemon 上的詳細資訊 GitHub,請參閱 KCL MultiLangDaemon 專案

充KCL當記錄處理邏輯和 Kinesis Data Streams 之間的媒介。

KCL 舊版本

目前,您可以使用下列其中一個支援的 版本KCL來建置自訂消費者應用程式:

您可以使用 KCL 1.x 或 KCL 2.x 來建置使用共用輸送量的消費者應用程式。如需詳細資訊,請參閱使用 開發具有共用輸送量的自訂消費者 KCL

若要建置使用專用輸送量的消費者應用程式 (增強型扇出消費者),您只能使用 KCL 2.x。如需詳細資訊,請參閱開發具有專用輸送量的增強型扇出消費者

如需 1.x KCL 與 KCL 2.x 之間的差異資訊,以及如何從 1.x KCL 遷移至 KCL 2.x 的指示,請參閱 將消費者從 KCL 1.x 遷移至 KCL 2.x

KCL 概念 (先前版本)

  • KCL 取用者應用程式 – 使用 自訂建置的應用程式,KCL旨在從資料串流讀取和處理記錄。

  • 取用者應用程式執行個體 - 取用KCL者應用程式通常會分散,其中一或多個應用程式執行個體會同時執行,以便協調故障和動態載入平衡資料記錄處理。

  • 工作者 – KCL取用者應用程式執行個體用來開始處理資料的高階類別。

    重要

    每個KCL取用者應用程式執行個體都有一個工作者。

    工作者會初始化並監督各種任務,包括同步處理碎片和租用資訊、追蹤碎片指派,以及處理來自碎片的資料。工作者KCL會提供取用者應用程式的組態資訊,例如資料串流的名稱,其資料記錄此KCL取用者應用程式將要處理的資料,以及存取此資料串流所需的 AWS 登入資料。工作者也會啟動該特定KCL消費者應用程式執行個體,將資料串流中的資料記錄交付到記錄處理器。

    重要

    在 KCL 1.x 中,此類別稱為工作者。如需詳細資訊,(這些是 Java KCL 儲存庫),請參閱 https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java。在 KCL 2.x 中,此類別稱為 Scheduler。排程器在 KCL 2.x 中的目的與 1.x KCL 中工作者的目的相同。如需 2.x KCL 中排程器類別的詳細資訊,請參閱 https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java

  • 租用 – 定義工作者與碎片之間繫結的資料。分散式KCL消費者應用程式使用租用來分割整個工作者機群的資料記錄處理。在任何指定時間,資料記錄的每個碎片都由leaseKey變數識別的租用繫結至特定工作者。

    根據預設,工作者可以同時保留一或多個租用 (取決於maxLeasesFor工作者變數的值)。

    重要

    每個工作者都將爭奪保留資料串流中,所有可用碎片的所有可用租用。但是,只有一名工作者可以在任何時間成功持有每個租用。

    例如,如果您有一個含有工作者 A 的取用者應用程式執行個體 A 正在處理具有 4 個碎片的資料串流,則工作者 A 可以同時持有對碎片 1、2、3 和 4 的租用。但是,如果您有兩個取用者應用程式執行個體:A 和 B 具有工作者 A 和工作者 B,而且這些執行個體正在處理具有 4 個碎片的資料串流,則工作者 A 和工作者 B 無法同時持有對碎片 1 的租用。一個工作者會持有特定碎片的租用,直到準備好停止處理此碎片的資料記錄,或直到失敗為止。當一名工作者停止持有租用時,另一名工作者佔用並持有租用。

    如需詳細資訊 (這些是 Java KCL 儲存庫),請參閱 https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java for KCL 1.x 和 https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java for KCL 2.x。

  • 租用資料表 - 唯一的 Amazon DynamoDB 資料表,用於追蹤資料串流中的碎片,該KDS資料串流由KCL消費者應用程式的工作者租用和處理。租用資料表必須與KCL消費者應用程式執行時來自資料串流的最新碎片資訊保持同步 (在工作者內和所有工作者之間)。如需詳細資訊,請參閱使用租用資料表追蹤KCL消費者應用程式處理的碎片

  • 記錄處理器 – 定義KCL消費者應用程式如何處理從資料串流取得之資料的邏輯。在執行時間,KCL消費者應用程式執行個體會執行個體化工作者,而此工作者會執行個體化其持有租用的每個碎片的一個記錄處理器。

使用租用資料表追蹤KCL消費者應用程式處理的碎片

什麼是租用資料表

對於每個 Amazon Kinesis Data Streams 應用程式, KCL會使用唯一的租用資料表 (存放在 Amazon DynamoDB 資料表中) 來追蹤KCL消費者應用程式工作者正在租用和處理KDS的資料串流中的碎片。

重要

KCL 使用取用者應用程式的名稱來建立此取用者應用程式使用的租用資料表名稱,因此每個取用者應用程式名稱都必須是唯一的。

您可以在取用者應用程式執行時使用 Amazon DynamoDB 主控台檢視其租用資料表。

如果應用程式啟動時,KCL消費者應用程式的租用資料表不存在,則其中一個工作者會為此應用程式建立租用資料表。

重要

您的帳戶除須支付 Kinesis Data Streams 本身的相關費用外,另將收取與 DynamoDB 資料表關聯的費用。

租用資料表內的每一列代表您的取用者應用程式的工作者所處理的某個碎片。如果您的KCL取用者應用程式僅處理一個資料串流,leaseKey則租用資料表的雜湊金鑰為碎片 ID。如果您是 使用相同的 KCL 2.x for Java 取用者應用程式處理多個資料串流,則 的結構 leaseKey 如下所示:account-id:StreamName:streamCreationTimestamp:ShardId。例如:111111111:multiStreamTest-1:12345:shardId-000000000336

除了碎片 ID 外,每一列還包含以下資料:

  • checkpoint:碎片的最新檢查點序號。資料串流中所有碎片的此值皆為獨一無二。

  • checkpointSubSequence號碼:使用 Kinesis Producer Library 的彙總功能時,這是檢查點的延伸,可追蹤 Kinesis 記錄中的個別使用者記錄。

  • leaseCounter:用於租用版本控制,以便工作者可以偵測他們的租用已由其他工作者所取得。

  • leaseKey:租用的唯一識別符。每項租用特屬於資料串流中的某個碎片,一次由一個工作者所持有。

  • leaseOwner:保留此租用的工作者。

  • ownerSwitchesSince檢查點:自上次撰寫檢查點以來,此租用變更工作者的次數。

  • parentShardId:用於確保在子碎片上開始處理之前,父碎片已完全處理。這可確保按照記錄放入串流中的相同順序處理記錄。

  • hashrange:PeriodicShardSyncManager 用於執行週期性同步以尋找租用資料表中遺失的碎片,並在需要時為其建立租用。

    注意

    每個以 1.14 和 KCL2.3 KCL 開頭的碎片,此資料都會出現在租用資料表中。如需有關 PeriodicShardSyncManager 和租用與碎片之間的定期同步的詳細資訊,請參閱 租用資料表如何與 Kinesis 資料串流中的碎片同步

  • childshards:LeaseCleanupManager 用於檢閱子碎片的處理狀態,並決定是否可以從租用資料表中刪除父碎片。

    注意

    每個以 1.14 和 KCL2.3 KCL 開頭的碎片,此資料都會出現在租用資料表中。

  • shardID:碎片的 ID。

    注意

    如果您是 使用相同的 KCL 2.x for Java 取用者應用程式處理多個資料串流,則此資料僅存在於租用資料表中。這僅在適用於 Java 的 KCL 2.x 中受支援,從適用於 Java 的 KCL 2.3 和更新版本開始。

  • 串流名稱資料串流的識別碼,格式如下:account-id:StreamName:streamCreationTimestamp

    注意

    如果您是 使用相同的 KCL 2.x for Java 取用者應用程式處理多個資料串流,則此資料僅存在於租用資料表中。這僅在適用於 Java 的 KCL 2.x 中受支援,從適用於 Java 的 KCL 2.3 和更新版本開始。

輸送量

如果您的 Amazon Kinesis Data Streams 應用程式收到佈建輸送量例外狀況,則您即應提升 DynamoDB 資料表的佈建輸送量。KCL 建立的資料表其佈建傳輸量為每秒 10 次讀取和每秒 10 次寫入,但這對您的應用程式而言可能不夠。例如,若您的 Amazon Kinesis Data Streams 經常執行檢查點作業或對由多個碎片構成的串流進行操作,您可能就需要更多的輸送量。

如需 DynamoDB 中佈建輸送量的相關資訊,請參閱《Amazon DynamoDB 開發人員指南》中的讀取/寫入容量模式使用資料表和資料

租用資料表如何與 Kinesis 資料串流中的碎片同步

KCL 消費者應用程式中的工作者使用租用來處理來自指定資料串流的碎片。在任何給定時間,哪個工作者正在租用哪個碎片的資訊存儲在租用資料表中。在KCL取用者應用程式執行時,租用資料表必須與資料串流的最新碎片資訊保持同步。 KCL 會將租用資料表與取用者應用程式引導期間 (初始化或重新啟動取用者應用程式時) 向 Kinesis Data Streams 服務取得的碎片資訊同步,以及每當處理中的碎片達到結束 (重新分片) 時同步。換言之,工作者或KCL取用者應用程式會與其在初始取用者應用程式引導期間以及每當取用者應用程式遇到資料串流碎片事件時所處理的資料串流同步。

1.0 - 1.13 KCL 和 KCL 2.0 - 2.2 中的同步

在 KCL 1.0 - 1.13 和 KCL 2.0 - 2.2 中,在消費者應用程式的引導期間,以及在每個資料串流碎片事件期間, 會透過叫用 ListShardsDescribeStream 探索,將租用資料表與從 Kinesis Data Streams 服務取得的碎片資訊KCL同步APIs。在上述所有KCL版本中,KCL消費者應用程式的每位工作者都會完成下列步驟,以在消費者應用程式的引導期間和每個串流重新碎片事件執行租用/碎片同步程序:

  • 擷取正在處理的資料串流的所有碎片

  • 從租用資料表中擷取所有碎片租用

  • 篩選出租用資料表中沒有租用的每個開放碎片

  • 逐一查看所有找到的開放碎片以及每個沒有開放父級的開放碎片:

    • 透過其祖先路徑遍歷樹狀結構,以確定碎片是否為子代。如果正在處理祖系碎片 (租用資料表中存在祖系碎片的租用項目),或者應處理祖系碎片 (例如,如果初始位置為 TRIM_HORIZONAT_TIMESTAMP),則碎片即視為子代

    • 如果內容中開啟的碎片是子系, 會根據初始位置對碎片進行KCL檢查點,並視需要為其父系建立租用

2.x KCL 中的同步,從 2.3 KCL 及更新版本開始

從最新支援的 KCL 2.x (KCL 2.3) 和更新版本開始,程式庫現在支援同步程序的下列變更。這些租用/碎片同步變更可大幅減少KCL消費者應用程式對 Kinesis Data Streams 服務的API呼叫次數,並最佳化KCL消費者應用程式中的租用管理。

  • 在應用程式的引導期間,如果租用資料表是空的, KCL會使用 ListShard API的篩選選項 (ShardFilter選用的請求參數) 來擷取和建立租用,僅限在 ShardFilter 參數指定的時間開啟的碎片快照。ShardFilter 參數可讓您篩選掉 ListShards 的回應API。ShardFilter 參數的唯一必要屬性是 Type。 KCL使用Type篩選條件屬性及其下列有效值來識別和傳回可能需要新租用的開啟碎片快照:

    • AT_TRIM_HORIZON - 回應包括所有在 TRIM_HORIZON 打開的碎片。

    • AT_LATEST - 回應僅包含目前開放的資料串流碎片。

    • AT_TIMESTAMP - 回應包含開始時間戳記小於或等於指定時間戳記,且結束時間戳記大於或等於指定時間戳記或仍處於開放狀態的所有碎片。

    ShardFilter 用於為空租用資料表建立租用,以針對在 RetrievalConfig#initialPositionInStreamExtended 指定之碎片的快照初始化租用。

    如需有關 ShardFilter 的詳細資訊,請參閱 https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html

  • 而不是所有執行lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard同步的工作者。

  • KCL 2.3 使用 GetRecords和 的ChildShards傳回參數SubscribeToShardAPIs,對封閉碎片執行SHARD_END在 發生的租用/碎片同步,允許KCL工作者僅為已完成處理之碎片的子碎片建立租用。對於在整個取用者應用程式中共用,此租用/碎片同步的最佳化會使用 GetRecordsChildShards 參數API。對於專用輸送量 (增強的扇出) 消費者應用程式,此租用/碎片同步最佳化會使用 SubscribeToShardChildShards 參數API。如需詳細資訊,請參閱GetRecordsSubscribeToShardsChildShard

  • 隨著上述變更, 的行為KCL會從學習所有現有碎片的所有員工模型,轉移到僅學習每個工作者所擁有碎片子碎片的工作者模型。因此,除了在消費者應用程式引導和重新碎片事件期間發生的同步之外, KCL 現在還會執行額外的定期碎片/碎片掃描,以識別租用資料表中的任何潛在孔洞 (換言之,了解所有新碎片),以確保資料處理串流的完整雜湊範圍,並視需要為其建立租用。 PeriodicShardSyncManager 是負責執行定期租用/碎片掃描的元件。

    如需 2.3 KCL PeriodicShardSyncManager中的詳細資訊,請參閱 https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213

    在 KCL 2.3 版中,新的組態選項可用於在 PeriodicShardSyncManager中設定LeaseManagementConfig

    名稱 預設值 描述
    leasesRecoveryAuditorExecutionFrequencyMillis

    120000 (2 分鐘)

    稽核人員工作掃描租用資料表中部分租用的頻率 (以毫秒為單位)。如果稽核人員偵測到串流租用中的任何漏洞,則會根據 leasesRecoveryAuditorInconsistencyConfidenceThreshold 觸發碎片同步處理。

    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    定期稽核人員工作的信賴閾值,用於確定租用資料表中資料串流的租用是否不一致。如果稽核人員多次連續發現同一組資料串流的不一致,則會觸發碎片同步處理。

    現在也會發出新的 CloudWatch 指標來監控 的運作狀態PeriodicShardSyncManager。如需詳細資訊,請參閱PeriodicShardSyncManager

  • 包括 HierarchicalShardSyncer 的最佳化,以僅為一層碎片建立租用。

1.x KCL 中的同步,從 1.14 KCL 及更新版本開始

從 1.x KCL (KCL 1.14) 及更新版本的最新支援版本開始,程式庫現在支援同步程序的下列變更。這些租用/碎片同步變更可大幅減少KCL消費者應用程式對 Kinesis Data Streams 服務的API呼叫次數,並最佳化KCL消費者應用程式中的租用管理。

  • 在應用程式的引導期間,如果租用資料表是空的, KCL會使用 ListShard API的篩選選項 (ShardFilter選用的請求參數) 來擷取和建立租用,僅限在 ShardFilter 參數指定的時間開啟的碎片快照。ShardFilter 參數可讓您篩選掉 ListShards 的回應API。ShardFilter 參數的唯一必要屬性是 Type。 KCL使用Type篩選條件屬性及其下列有效值來識別和傳回可能需要新租用的開啟碎片快照:

    • AT_TRIM_HORIZON - 回應包括所有在 TRIM_HORIZON 打開的碎片。

    • AT_LATEST - 回應僅包含目前開放的資料串流碎片。

    • AT_TIMESTAMP - 回應包含開始時間戳記小於或等於指定時間戳記,且結束時間戳記大於或等於指定時間戳記或仍處於開放狀態的所有碎片。

    ShardFilter 用於為空租用資料表建立租用,以針對在 KinesisClientLibConfiguration#initialPositionInStreamExtended 指定之碎片的快照初始化租用。

    如需有關 ShardFilter 的詳細資訊,請參閱 https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html

  • 而不是所有執行lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard同步的工作者。

  • KCL 1.14 使用 GetRecords和 的ChildShards傳回參數SubscribeToShardAPIs,對封閉碎片執行SHARD_END在 發生的租用/碎片同步,允許KCL工作者僅為已完成處理之碎片的子碎片建立租用。如需詳細資訊,請參閱 GetRecordsChildShard

  • 隨著上述變更, 的行為KCL會從學習所有現有碎片的所有員工模型,轉移到僅學習每個工作者所擁有碎片子碎片的工作者模型。因此,除了在消費者應用程式引導和重新碎片事件期間發生的同步之外, KCL 現在還會執行額外的定期碎片/碎片掃描,以識別租用資料表中的任何潛在孔洞 (換言之,了解所有新碎片),以確保資料處理串流的完整雜湊範圍,並視需要為其建立租用。 PeriodicShardSyncManager 是負責執行定期租用/碎片掃描的元件。

    KinesisClientLibConfiguration#shardSyncStrategyType 設定為 ShardSyncStrategyType.SHARD_END 時,PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold 用於確定包含租用資料表中漏洞的連續掃描數目臨界值,之後強制執行碎片同步化。當 KinesisClientLibConfiguration#shardSyncStrategyType 設定為 ShardSyncStrategyType.PERIODIC 時,會忽略 leasesRecoveryAuditorInconsistencyConfidenceThreshold

    如需 KCL 1.14 PeriodicShardSyncManager中的詳細資訊,請參閱 https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999

    在 KCL 1.14 版中,新的組態選項可用於在 PeriodicShardSyncManager中設定 LeaseManagementConfig

    名稱 預設值 描述
    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    定期稽核人員工作的信賴閾值,用於確定租用資料表中資料串流的租用是否不一致。如果稽核人員多次連續發現同一組資料串流的不一致,則會觸發碎片同步處理。

    現在也會發出新的 CloudWatch 指標來監控 的運作狀態PeriodicShardSyncManager。如需詳細資訊,請參閱PeriodicShardSyncManager

  • KCL 1.14 現在也支援延遲租用清理。當碎片超過資料串流的保留期限或因重新分片操作而關閉時,達到 SHARD_END 時,LeaseCleanupManager 會以非同步方式刪除租用。

    新組態選項可用於設定 LeaseCleanupManager

    名稱 預設值 描述
    leaseCleanupInterval米利斯

    1 分鐘

    執行租用清除執行緒的間隔。

    completedLeaseCleanupIntervalMillis 5 分鐘

    檢查租用是否完成的間隔。

    garbageLeaseCleanupIntervalMillis 30 分鐘

    檢查租用是否為垃圾 (亦即超過資料串流保留期所進行的修剪) 的間隔。

  • 包括 KinesisShardSyncer 的最佳化,以僅為一層碎片建立租用。

使用相同的 KCL 2.x for Java 取用者應用程式處理多個資料串流

本節說明 2.x for Java KCL 中的下列變更,可讓您建立可同時處理多個資料串流的KCL取用者應用程式。

重要

2.x for Java KCL 僅支援多串流處理,從 KCL 2.3 for Java 和更新版本開始。

可實作 2.x KCL 的任何其他語言都NOT支援多串流處理。

任何 1.x KCL 版本都NOT支援多串流處理。

  • MultistreamTracker 介面

    若要建置可同時處理多個串流的消費者應用程式,您必須實作稱為 的新界面MultistreamTracker。此界面包含傳回資料串流及其組態清單streamConfigList的方法,以供KCL取用者應用程式處理。請注意,正在處理的資料串流可以在取用者應用程式執行階段期間變更。 streamConfigList會定期呼叫 KCL,以了解要處理的資料串流中的變更。

    streamConfigList 方法會填入StreamConfig清單。

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

    請注意,StreamIdentifierInitialPositionInStreamExtended 是必填欄位,而 consumerArn 是選填欄位。consumerArn 只有在您使用 KCL 2.x 實作增強型扇出消費者應用程式時,才必須提供 。

    如需 的詳細資訊StreamIdentifier,請參閱 https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129。若要建立 StreamIdentifier,我們建議您從 streamArnstreamCreationEpoch v2.5.0 及更新版本中提供的 建立多串流執行個體。在不支援 的 v2KCL.3 和 v2.4 中streamArm,使用 格式 建立多串流執行個體。 account-id:StreamName:streamCreationTimestamp從下一個主要版本開始,此格式將被取代,不再受支援。

    MultistreamTracker 還包括刪除租用資料表 (formerStreamsLeasesDeletionStrategy) 中舊串流的租用的策略。請注意,策略CANNOT會在取用者應用程式執行時間期間變更。如需詳細資訊,請參閱 https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java

  • ConfigsBuilder 是全應用程式類別,可用來指定建置KCL取用者應用程式時要使用的所有 KCL 2.x 組態設定。 ConfigsBuilder類別現在支援 MultistreamTracker 界面。您可以使用一個資料串流的名稱初始化 ConfigsBuilder其中一個 ,以取用來自下列項目的記錄:

    /** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }

    或者, ConfigsBuilder MultiStreamTracker如果您想要實作同時處理多個串流的KCL消費者應用程式,您可以使用 初始化 。

    * Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • 隨著針對KCL消費者應用程式實作多串流支援,應用程式租用資料表的每一列現在都包含碎片 ID 和此應用程式處理的多個資料串流的串流名稱。

  • 實作KCL消費者應用程式的多串流支援時, leaseKey 會採用下列結構:account-id:StreamName:streamCreationTimestamp:ShardId。例如:111111111:multiStreamTest-1:12345:shardId-000000000336

    重要

    當您現有的KCL取用者應用程式設定為僅處理一個資料串流時, leaseKey (這是租用資料表的雜湊金鑰) 是碎片 ID。如果您重新設定此現有KCL取用者應用程式來處理多個資料串流,它會中斷您的租用資料表,因為使用多串流支援, leaseKey 結構必須如下所示:account-id:StreamName:StreamCreationTimestamp:ShardId

KCL 搭配 AWS Glue 結構描述登錄檔使用

您可以將 Kinesis 資料串流與 AWS Glue 結構描述登錄檔整合。 AWS Glue 結構描述登錄檔可讓您集中探索、控制和發展結構描述,同時確保已註冊結構描述持續驗證產生的資料。結構描述定義資料記錄的結構和格式。結構描述是可靠的資料發佈、耗用或儲存的版本化規格。 AWS Glue結構描述登錄檔可讓您改善 end-to-end串流應用程式內的資料品質和資料控管。如需詳細資訊,請參閱 AWS Glue 結構描述登錄檔。設定此整合的其中一種方法是透過 Java KCL中的 。

重要

目前,Kinesis Data Streams 和 AWS Glue Schema Registry 整合僅支援使用在 Java 中實作KCL的 2.3 消費者的 Kinesis 資料串流。不提供多語言支援。 KCL 不支援 1.0 消費者。 KCL 不支援 2.3 之前的 KCL 2.x 消費者。

如需如何使用 設定 Kinesis Data Streams 與結構描述登錄檔整合的詳細資訊KCL,請參閱「使用 KPL/KCL 程式庫與資料互動」一節:將 Amazon Kinesis Data Streams 與 AWS Glue 結構描述登錄檔整合