

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Kinesis 用戶端程式庫
<a name="kcl"></a>

## 什麼是 Kinesis Client Library？
<a name="kcl-library-what-is"></a>

Kinesis Client Library (KCL) 是獨立的 Java 軟體程式庫，旨在簡化從 Amazon Kinesis Data Streams 取用和處理資料的程序。KCL 會處理許多與分散式運算相關的複雜任務，讓開發人員專注於實作其商業邏輯以處理資料。它會管理各種活動，例如跨多個工作者的負載平衡、回應工作者故障、檢查點處理過的記錄，以及回應串流中碎片數量的變更。

KCL 經常更新，以納入基礎程式庫的較新版本、安全性改善和錯誤修正。我們建議您使用最新版本的 KCL，以避免已知問題並從所有最新的改進中獲益。若要尋找最新的 KCL 版本，請參閱 [ KCL Github](https://github.com/awslabs/amazon-kinesis-client)。

**重要**  
我們建議您使用最新的 KCL 版本，以避免已知的錯誤和問題。如果您使用的是 KCL 2.6.0 或更早版本，請升級至 KCL 2.6.1 或更新版本，以避免在串流容量變更時封鎖碎片處理的罕見情況。
KCL 是 Java 程式庫。使用名為 MultiLangDaemon 的 Java 型協助程式來支援 Java 以外的語言。MultiLangDaemon 透過 STDIN 和 STDOUT 與 KCL 應用程式互動。如需 GitHub 上 MultiLangDaemon 的詳細資訊，請參閱 [使用非 Java 語言開發具有 KCL 的消費者](develop-kcl-consumers-non-java.md)。
請勿將 2.27.19 到 2.27.23 適用於 Java 的 AWS SDK 版與 KCL 3.x 搭配使用。這些版本包含導致與 KCL DynamoDB 用量相關的例外狀況錯誤的問題。我們建議您使用 2 適用於 Java 的 AWS SDK .28.0 版或更新版本，以避免此問題。

## KCL 主要功能和優點
<a name="kcl-benefits"></a>

以下是 KCL 的主要功能和相關優點：
+ **可擴展性**：KCL 可將處理負載分散到多個工作者，讓應用程式能夠動態擴展。您可以手動或使用自動擴展來擴展應用程式，而無需擔心負載重新分佈。
+ **負載平衡**：KCL 會自動平衡可用工作者的處理負載，進而讓工作者的工作分佈均勻。
+ **檢查點**：KCL 管理已處理記錄的檢查點，讓應用程式能夠從上次成功處理的位置繼續處理。
+ **容錯**能力：KCL 提供內建容錯能力機制，確保即使個別工作者失敗，資料處理仍會繼續。KCL 也提供at-least-once的交付。
+ **處理串流層級變更**：KCL 會適應因資料磁碟區變更而可能發生的碎片分割和合併。它透過確保子碎片僅在其父碎片完成和檢查點之後才處理來維持排序。
+ **監控**：KCL 與 Amazon CloudWatch 整合，以進行消費者層級監控。
+ **多語言支援**：KCL 原生支援 Java，並透過 MultiLangDaemon 啟用多種非 Java 程式設計語言。

# KCL 概念
<a name="kcl-concepts"></a>

本節說明 Kinesis Client Library (KCL) 的核心概念和互動。這些概念是開發和管理 KCL 消費者應用程式的基礎。
+ **KCL 取用者應用程式** – 自訂建置的應用程式，旨在使用 Kinesis Client Library 從 Kinesis 資料串流讀取和處理記錄。
+ **工作者** – KCL 取用者應用程式通常會分散，並同時執行一或多個工作者。KCL 會協調工作者以分散式方式使用來自串流的資料，並在多個工作者之間平均平衡負載。
+ **排程器** – KCL 工作者用來開始處理資料的高階類別。每個 KCL 工作者都有一個排程器。排程器會初始化和監督各種任務，包括從 Kinesis 資料串流同步碎片資訊、追蹤工作者之間的碎片指派，以及根據指派給工作者的碎片處理串流中的資料。排程器可以採用各種會影響排程器行為的組態，例如要處理 和 AWS 憑證的串流名稱。排程器會啟動將資料記錄從串流交付至記錄處理器。
+ **記錄處理器** – 定義 KCL 取用者應用程式如何處理從資料串流接收之資料的邏輯。您必須在記錄處理器中實作自己的自訂資料處理邏輯。KCL 工作者會執行個體化排程器。排程器接著會針對其持有租用的每個碎片，執行個體化一個記錄處理器。工作者可以執行多個記錄處理器。
+ **租用** – 定義工作者與碎片之間的指派。KCL 取用者應用程式使用租用將資料記錄處理分散到多個工作者。每個碎片在任何指定時間只能透過租用繫結至一個工作者，而且每個工作者可以同時保留一或多個租用。當工作者因為停止或失敗而停止保留租用時，KCL 會指派另一個工作者來接受租用。若要進一步了解租用，請參閱 [Github 文件：租用生命週期](https://github.com/awslabs/amazon-kinesis-client/blob/master/docs/lease-lifecycle.md#lease-lifecycle)。
+ **租用資料表** – 是唯一的 Amazon DynamoDB 資料表，用於追蹤 KCL 取用者應用程式的所有租用。每個 KCL 取用者應用程式都會建立自己的租用資料表。租用資料表用於維護所有工作者的狀態，以協調資料處理。如需詳細資訊，請參閱[KCL 中的 DynamoDB 中繼資料表和負載平衡](kcl-dynamoDB.md)。
+ **檢查點** – 是將上次成功處理記錄的位置持續儲存在碎片中的程序。KCL 會管理檢查點，以確保在工作者失敗或應用程式重新啟動時，可以從最後一個檢查點位置繼續處理。檢查點會存放在 DynamoDB 租用資料表中，做為租用中繼資料的一部分。這可讓工作者從上一個工作者停止的位置繼續處理。

# KCL 中的 DynamoDB 中繼資料表和負載平衡
<a name="kcl-dynamoDB"></a>

KCL 會管理工作者的中繼資料，例如租用和 CPU 使用率指標。KCL 會使用 DynamoDB 資料表追蹤這些中繼資料。對於每個 Amazon Kinesis Data Streams 應用程式，KCL 會建立三個 DynamoDB 資料表來管理中繼資料：租用資料表、工作者指標資料表和協調器狀態資料表。

**注意**  
KCL 3.x 推出了兩個新的中繼資料表：*工作者指標*和*協調器狀態*表。

**重要**  
 您必須為 KCL 應用程式新增適當的許可，才能在 DynamoDB 中建立和管理中繼資料表。如需詳細資訊，請參閱[KCL 取用者應用程式所需的 IAM 許可](kcl-iam-permissions.md)。  
KCL 取用者應用程式不會自動移除這三個 DynamoDB 中繼資料資料表。當您停用取用者應用程式時，請務必移除 KCL 取用者應用程式建立的這些 DynamoDB 中繼資料表，以避免不必要的成本。

## 租用資料表
<a name="kcl-leasetable"></a>

租用資料表是唯一的 Amazon DynamoDB 資料表，用於追蹤 KCL 取用者應用程式的排程器所租用和處理的碎片。每個 KCL 取用者應用程式都會建立自己的租用資料表。根據預設，KCL 會使用取用者應用程式的名稱做為租用資料表的名稱。您可以使用組態設定自訂資料表名稱。KCL 也會使用 leaseOwner 的分割區索引鍵在租用資料表上建立[全域次要索引](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html)，以實現高效的租用探索。全域次要索引會從基本租用資料表鏡像 leaseKey 屬性。如果應用程式啟動時 KCL 取用者應用程式的租用資料表不存在，則其中一個工作者會為您的應用程式建立租用資料表。

您可以在取用者應用程式執行時使用 [Amazon DynamoDB 主控台](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html)檢視其租用資料表。

**重要**  
每個 KCL 取用者應用程式名稱都必須是唯一的，以防止重複的租用資料表名稱。
您的帳戶除須支付 Kinesis Data Streams 本身的相關費用外，另將收取與 DynamoDB 資料表關聯的費用。

租用資料表中的每一列都代表您取用者應用程式的排程器正在處理的碎片。金鑰欄位包括下列項目：
+ **leaseKey：**對於單一串流處理，這是碎片 ID。對於使用 KCL 的多串流處理，其結構為 `account-id:StreamName:streamCreationTimestamp:ShardId`。 leaseKey 是租用資料表的分割區索引鍵。如需多串流處理的詳細資訊，請參閱 [使用 KCL 進行多串流處理](kcl-multi-stream.md)。
+ **checkpoint：**碎片的最新檢查點序號。
+ **checkpointSubSequenceNumber：**使用 Kinesis Producer Library 的彙整功能時，此為 **checkpoint** 的延伸，將追蹤 Kinesis 記錄內的個別使用者記錄。
+ **leaseCounter：**用於檢查工作者目前是否正在主動處理租用。如果將租用所有權轉移給其他工作者，則 leaseCounter 會增加。
+ **leaseOwner：**目前持有此租用的工作者。
+ **ownerSwitchesSinceCheckpoint：**自上次檢查點以來，此租用變更工作者的次數。
+ **parentShardId：**此碎片父項的 ID。在子碎片上開始處理之前，請確定父碎片已完全處理，並維持正確的記錄處理順序。
+ **childShardId：**此碎片分割或合併所產生的子碎片 IDs 清單。用於在重新分片操作期間追蹤碎片歷程和管理處理順序。
+ **startingHashKey：**此碎片的雜湊金鑰範圍下限。
+ **endingHashKey：**此碎片的雜湊金鑰範圍上限。

如果您搭配 KCL 使用多串流處理，您會在租用資料表中看到下列兩個額外欄位。如需詳細資訊，請參閱[使用 KCL 進行多串流處理](kcl-multi-stream.md)。
+ **shardID：**碎片的 ID。
+ **streamName：**資料串流的識別符，格式如下：`account-id:StreamName:streamCreationTimestamp`。

## 工作者指標資料表
<a name="kcl-worker-metrics-table"></a>

工作者指標資料表是每個 KCL 應用程式的唯一 Amazon DynamoDB 資料表，用於記錄每個工作者的 CPU 使用率指標。KCL 將使用這些指標來執行有效的租用指派，以產生工作者之間的資源使用率平衡。KCL 預設會將 `KCLApplicationName-WorkerMetricStats`用於工作者指標表的名稱。

## 協調器狀態資料表
<a name="kcl-coordinator-state-table"></a>

協調器狀態資料表是每個 KCL 應用程式的唯一 Amazon DynamoDB 資料表，用於存放工作者的內部狀態資訊。例如，協調器狀態資料表會儲存有關領導者選擇的資料，或與 KCL 2.x 就地遷移至 KCL 3.x 相關聯的中繼資料。根據預設，KCL 會使用 `KCLApplicationName-CoordinatorState`做為協調器狀態資料表的名稱。

## KCL 所建立中繼資料資料表的 DynamoDB 容量模式
<a name="kcl-capacity-mode"></a>

根據預設，Kinesis Client Library (KCL) 會使用[隨需容量模式](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/on-demand-capacity-mode.html)建立 DynamoDB 中繼資料表，例如租用表、工作者指標表和協調器狀態表。此模式會自動擴展讀取和寫入容量，以容納流量，而不需要容量規劃。我們強烈建議您將容量模式保留為隨需模式，以便更有效率地操作這些中繼資料表。

如果您決定將租用資料表切換為[佈建容量模式](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html)，請遵循下列最佳實務：
+ 分析用量模式：
  + 使用 Amazon CloudWatch 指標監控應用程式的讀取和寫入模式和用量 (RCU、WCU)。
  + 了解尖峰和平均輸送量需求。
+ 計算所需的容量：
  + 根據您的分析估計讀取容量單位 RCUs) 和寫入容量單位 WCUs)。
  + 考慮碎片數量、檢查點頻率和工作者計數等因素。
+ 實作自動擴展：
  + 使用 [DynamoDB Auto Scaling](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html#ddb-autoscaling) 自動調整佈建容量，並設定適當的最小和最大容量限制。
  + DynamoDB Auto Scaling 有助於避免 KCL 中繼資料資料表達到容量限制並受到調節。
+ 定期監控和最佳化：
  + 持續監控 的 CloudWatch 指標`ThrottledRequests`。
  + 隨著工作負載隨著時間的變化調整容量。

如果您在 KCL 取用者應用程式的中繼資料 DynamoDB 資料表`ProvisionedThroughputExceededException`中遇到 ，則必須增加 DynamoDB 資料表的佈建輸送量。如果您在第一次建立取用者應用程式時設定特定層級的讀取容量單位 (RCU) 和寫入容量單位 (WCU)，則隨著用量的增加，可能還不夠。例如，如果您的 KCL 取用者應用程式經常檢查點，或在具有許多碎片的串流上操作，您可能需要更多容量單位。如需有關 DynamoDB 中佈建輸送量的資訊，請參閱《Amazon [DynamoDB 開發人員指南》中的 DynamoDB 輸送量容量](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/capacity-mode.html)和[更新資料表](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html#WorkingWithTables.Basics.UpdateTable)。 DynamoDB 

## KCL 如何指派租用給工作者並平衡負載
<a name="kcl-assign-leases"></a>

KCL 會持續從執行工作者的運算主機收集和監控 CPU 使用率指標，以確保工作負載分佈均勻。這些 CPU 使用率指標會存放在 DynamoDB 的工作者指標表中。如果 KCL 偵測到某些工作者的 CPU 使用率高於其他工作者，則會在工作者之間重新指派租用，以降低高度使用工作者的負載。目標是更平均地平衡整個取用者應用程式機群的工作負載，防止任何單一工作者超載。隨著 KCL 將 CPU 使用率分配到整個取用者應用程式機群，您可以選擇正確的工作者數量或使用自動擴展來有效管理運算容量，以實現更低的成本，從而調整取用者應用程式機群容量的大小。

**重要**  
只有在符合特定先決條件時，KCL 才能從工作者收集 CPU 使用率指標。如需詳細資訊，請參閱[先決條件](develop-kcl-consumers-java.md#develop-kcl-consumers-java-prerequisites)。如果 KCL 無法從工作者收集 CPU 使用率指標，KCL 將回復為使用每個工作者的輸送量來指派租用，並平衡機群中工作者之間的負載。KCL 會監控每個工作者在指定時間收到的輸送量，並重新指派租用，以確保每個工作者從其指派的租用中取得類似的總輸送量層級。

# 使用 KCL 開發消費者
<a name="develop-kcl-consumers"></a>

您可以使用 Kinesis Client Library (KCL) 來建置取用者應用程式，以處理來自 Kinesis 資料串流的資料。

KCL 提供多種語言。本主題涵蓋如何以 Java 和非 Java 語言開發 KCL 消費者。
+ 若要檢視 Kinesis Client Library Javadoc 參考，請參閱 [Amazon Kinesis Client Library Javadoc](https://javadoc.io/doc/software.amazon.kinesis/amazon-kinesis-client/latest/index.html)。
+ 若要從 GitHub 下載適用於 Java 的 KCL，請參閱適用於 [Java 的 Amazon Kinesis 用戶端程式庫](https://github.com/awslabs/amazon-kinesis-client)。
+ 若要在 Apache Maven 上尋找適用於 Java 的 KCL，請參閱 [KCL Maven 中央儲存庫](https://central.sonatype.com/artifact/software.amazon.kinesis/amazon-kinesis-client)。

**Topics**
+ [在 Java 中使用 KCL 開發消費者](develop-kcl-consumers-java.md)
+ [使用非 Java 語言開發具有 KCL 的消費者](develop-kcl-consumers-non-java.md)

# 在 Java 中使用 KCL 開發消費者
<a name="develop-kcl-consumers-java"></a>

## 先決條件
<a name="develop-kcl-consumers-java-prerequisites"></a>

開始使用 KCL 3.x 之前，請確定您有下列項目：
+ Java 開發套件 (JDK) 8 或更新版本
+ 適用於 Java 的 AWS SDK 2.x
+ Maven 或 Gradle 用於相依性管理

KCL 會從工作者正在執行的運算主機收集 CPU 使用率指標，例如 CPU 使用率，以平衡負載，實現工作者之間的平均資源使用率層級。若要讓 KCL 從工作者收集 CPU 使用率指標，您必須符合下列先決條件：

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ 您的作業系統必須是 Linux 作業系統。
+ 您必須在 EC[IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)2。 EC2 

 **Amazon EC2 上的 Amazon EC2)**
+ 您的作業系統必須是 Linux 作業系統。
+ 您必須啟用 [ECS 任務中繼資料端點第 4 版](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html)。
+ 您的 Amazon ECS 容器代理程式版本必須為 1.39.0 或更新版本。

 **上的 Amazon ECS AWS Fargate**
+ 您必須啟用 [Fargate 任務中繼資料端點第 4 版](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html)。如果您使用 Fargate 平台 1.4.0 版或更新版本，預設會啟用此功能。
+ Fargate 平台 1.4.0 版或更新版本。

 **Amazon EC2 上的 Amazon Elastic Kubernetes Service (Amazon EKS)** 
+ 您的作業系統必須是 Linux 作業系統。

 **上的 Amazon EKS AWS Fargate**
+ Fargate 平台 1.3.0 或更新版本。

**重要**  
如果 KCL 無法從工作者收集 CPU 使用率指標，KCL 會回復為使用每個工作者的輸送量來指派租用，並平衡機群中工作者的負載。如需詳細資訊，請參閱[KCL 如何指派租用給工作者並平衡負載](kcl-dynamoDB.md#kcl-assign-leases)。

## 安裝和新增相依性
<a name="develop-kcl-consumers-java-installation"></a>

如果您使用的是 Maven，請將下列相依性新增至 `pom.xml` 檔案。請確定您已將 3.x.x 取代為最新的 KCL 版本。

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

如果您使用的是 Gradle，請將以下內容新增至您的 `build.gradle` 檔案。請確定您已將 3.x.x 取代為最新的 KCL 版本。

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

您可以在 [Maven Central Repository](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client) 上檢查 KCL 的最新版本。

## 實作消費者
<a name="develop-kcl-consumers-java-implemetation"></a>

KCL 取用者應用程式包含下列重要元件：

**Topics**
+ [RecordProcessor](#implementation-recordprocessor)
+ [RecordProcessorFactory](#implementation-recordprocessorfactory)
+ [排程器](#implementation-scheduler)
+ [主要消費者應用程式](#implementation-main)

### RecordProcessor
<a name="implementation-recordprocessor"></a>

RecordProcessor 是處理 Kinesis 資料串流記錄之業務邏輯所在的核心元件。它定義您的應用程式如何處理從 Kinesis 串流接收到的資料。

主要責任：
+ 初始化碎片的處理
+ 處理來自 Kinesis 串流的記錄批次
+ 碎片的關閉處理 （例如，當碎片分割或合併時，或將租用移交給另一個主機時）
+ 處理檢查點以追蹤進度

以下顯示實作範例：

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.*;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class SampleRecordProcessor implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());
            processRecordsInput.records().forEach(r -> 
                log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())
            );
            
            // Checkpoint periodically
            processRecordsInput.checkpointer().checkpoint();
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}
```

以下是範例中使用的每個方法的詳細說明：

**initialize(InitializationInput initializationInput)**
+ 目的：設定處理記錄所需的任何資源或狀態。
+ 呼叫時：一次，當 KCL 將碎片指派給此記錄處理器時。
+ 重點：
  + `initializationInput.shardId()`：此處理器將處理的碎片 ID。
  + `initializationInput.extendedSequenceNumber()`：開始處理的序號。

**processRecords(ProcessRecordsInput processRecordsInput)**
+ 目的：處理傳入的記錄和選擇性檢查點進度。
+ 呼叫時：重複，只要記錄處理器保留碎片的租用即可。
+ 重點：
  + `processRecordsInput.records()`：要處理的記錄清單。
  + `processRecordsInput.checkpointer()`：用於檢查點進度。
  + 請務必在處理期間處理任何例外狀況，以防止 KCL 失敗。
  + 此方法應該是等冪的，因為在某些情況下，相同的記錄可能會處理多次，例如在意外工作者當機或重新啟動之前尚未檢查點的資料。
  + 在檢查點之前，請務必清除任何緩衝的資料，以確保資料一致性。

**leaseLost(LeaseLostInput leaseLostInput)**
+ 目的：清除處理此碎片的任何特定資源。
+ 呼叫時：當另一個排程器接管此碎片的租用時。
+ 重點：
  + 此方法不允許檢查點。

**shardEnded(ShardEndedInput shardEndedInput)**
+ 目的：完成此碎片和檢查點的處理。
+ 當呼叫時：當碎片分割或合併時，表示已處理此碎片的所有資料。
+ 重點：
  + `shardEndedInput.checkpointer()`：用來執行最終檢查點。
  + 若要完成處理，必須使用此方法中的檢查點。
  + 若未在此排清資料和檢查點，可能會在碎片重新開啟時導致資料遺失或重複處理。

**shutdownRequested(ShutdownRequestedInput shutdownRequestedInput)**
+ 目的：KCL 關閉時檢查和清除資源。
+ 呼叫時：KCL 關閉時，例如應用程式終止時）。
+ 重點：
  + `shutdownRequestedInput.checkpointer()`：用於在關機前執行檢查點。
  + 請務必在 方法中實作檢查點，以便在應用程式停止之前儲存進度。
  + 若未在此排清資料和檢查點，可能會導致應用程式重新啟動時資料遺失或重新處理記錄。

**重要**  
KCL 3.x 透過在前一個工作者關閉之前進行檢查點，確保將租用從一個工作者交付到另一個工作者時，資料重新處理次數更少。如果您未在 `shutdownRequested()`方法中實作檢查點邏輯，則不會看到此好處。請確定您已在 `shutdownRequested()`方法中實作檢查點邏輯。

### RecordProcessorFactory
<a name="implementation-recordprocessorfactory"></a>

RecordProcessorFactory 負責建立新的 RecordProcessor 執行個體。KCL 使用此工廠為應用程式需要處理的每個碎片建立新的 RecordProcessor。

主要責任：
+ 視需要建立新的 RecordProcessor 執行個體
+ 確定每個 RecordProcessor 都已正確初始化

以下是實作範例：

```
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new SampleRecordProcessor();
    }
}
```

在此範例中，每次呼叫 shardRecordProcessor() 時，工廠都會建立新的 SampleRecordProcessor。 shardRecordProcessor 您可以將此延伸到包含任何必要的初始化邏輯。

### 排程器
<a name="implementation-scheduler"></a>

排程器是一種高階元件，可協調 KCL 應用程式的所有活動。它負責資料處理的整體協調。

主要責任：
+ 管理 RecordProcessors 的生命週期
+ 處理碎片的租用管理
+ 座標檢查點
+ 在應用程式的多個工作者之間平衡碎片處理負載
+ 處理正常關機和應用程式終止訊號

排程器通常會在主要應用程式中建立和啟動。您可以在下一節主要消費者應用程式中查看排程器的實作範例。

### 主要消費者應用程式
<a name="implementation-main"></a>

主要消費者應用程式將所有元件連結在一起。它負責設定 KCL 取用者、建立必要的用戶端、設定排程器，以及管理應用程式的生命週期。

主要責任：
+ 設定 AWS 服務用戶端 (Kinesis、DynamoDB、CloudWatch)
+ 設定 KCL 應用程式
+ 建立和啟動排程器
+ 處理應用程式關閉

以下是實作範例：

```
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import java.util.UUID;

public class SampleConsumer {
    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    public SampleConsumer(String streamName, Region region) {
        this.streamName = streamName;
        this.region = region;
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    public void run() {
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamName, 
            streamName, 
            kinesisClient, 
            dynamoDbAsyncClient,
            cloudWatchClient, 
            UUID.randomUUID().toString(), 
            new SampleRecordProcessorFactory()
        );

        Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

    public static void main(String[] args) {
        String streamName = "your-stream-name"; // replace with your stream name
        Region region = Region.US_EAST_1; // replace with your region
        new SampleConsumer(streamName, region).run();
    }
}
```

 KCL 預設會建立具有專用輸送量的增強型廣發 (EFO) 取用者。如需增強型廣發功能的詳細資訊，請參閱[開發具有專用輸送量的增強型廣發消費者](enhanced-consumers.md)。如果您的取用者少於 2 個，或不需要 200 毫秒以下的讀取傳播延遲，您必須在排程器物件中設定下列組態，以使用共用輸送量取用者：

```
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
```

下列程式碼是建立使用共用輸送量取用者之排程器物件的範例：

**匯入**：

```
import software.amazon.kinesis.retrieval.polling.PollingConfig;
```

**程式碼**：

```
Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );/
```

# 使用非 Java 語言開發具有 KCL 的消費者
<a name="develop-kcl-consumers-non-java"></a>

本節涵蓋在 Python、Node.js、.NET 和 Ruby 中使用 Kinesis Client Library (KCL) 的消費者實作。

KCL 是 Java 程式庫。使用稱為 的多語言界面提供 Java 以外的語言支援`MultiLangDaemon`。此協助程式以 Java 為基礎，當您使用 KCL 搭配 Java 以外的語言時，會在背景執行。因此，如果您為非 Java 語言安裝 KCL，並完全以非 Java 語言撰寫消費者應用程式，則由於 ，您仍需要在系統上安裝 Java`MultiLangDaemon`。此外， `MultiLangDaemon` 有一些預設設定，您可能需要針對使用案例進行自訂 （例如，連線的 AWS 區域）。如需 GitHub `MultiLangDaemon`上 的詳細資訊，請參閱 [KCL MultiLangDaemon 專案](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)。

雖然核心概念在語言之間保持不變，但有一些特定語言的考量事項和實作。如需 KCL 消費者開發的核心概念，請參閱 [在 Java 中使用 KCL 開發消費者](develop-kcl-consumers-java.md)。如需如何在 Python、Node.js、.NET 和 Ruby 中開發 KCL 消費者以及最新更新的詳細資訊，請參閱下列 GitHub 儲存庫：
+ Python：[amazon-kinesis-client-python](https://github.com/awslabs/amazon-kinesis-client-python)
+ Node.js：[amazon-kinesis-client-nodejs](https://github.com/awslabs/amazon-kinesis-client-nodejs)
+ .NET：[amazon-kinesis-client-net](https://github.com/awslabs/amazon-kinesis-client-net)
+ Ruby：[amazon-kinesis-client-ruby](https://github.com/awslabs/amazon-kinesis-client-ruby)

**重要**  
如果您使用的是 JDK 8，請勿使用下列非 Java KCL 程式庫版本。這些版本包含與 JDK 8 不相容的相依性 （日誌）。  
KCL Python 3.0.2 和 2.2.0
KCL Node.js 2.3.0
KCL .NET 3.1.0
KCL Ruby 2.2.0
使用 JDK 8 時，建議您使用這些受影響版本之前或之後發行的版本。

# 使用 KCL 進行多串流處理
<a name="kcl-multi-stream"></a>

本節說明 KCL 中的必要變更，可讓您建立可同時處理多個資料串流的 KCL 取用者應用程式。
**重要**  
只有 KCL 2.3 或更新版本才支援多串流處理。
使用 執行的非 Java 語言撰寫的 KCL 取用者*不支援*多串流處理`multilangdaemon`。
任何 KCL 1.x 版本*都*不支援多串流處理。
+ **MultistreamTracker interface**
  + 若要建置可以同時處理多個串流的取用者應用程式，您必須實作名為 [MultiStreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java) 的新介面。此介面包含傳回資料串流清單及其組態的 `streamConfigList` 方法，以供 KCL 取用者應用程式處理。請注意，正在處理的資料串流可以在取用者應用程式執行時間期間變更。KCL `streamConfigList`會定期呼叫 ，以了解要處理的資料串流中的變更。
  + 會填入 `streamConfigList` [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23) 清單。

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```
  + `StreamIdentifier` 和 `InitialPositionInStreamExtended`是必要欄位，而 `consumerArn`是選用欄位。`consumerArn` 只有在您使用 KCL 實作增強型廣發消費者應用程式時，才必須提供 。
  + 如需 的詳細資訊`StreamIdentifier`，請參閱 https：//[https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java\$1L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129)。若要建立 `StreamIdentifier`，建議您從 `streamArn`和 KCL 2.5.0 或更新版本`streamCreationEpoch`提供的 建立多串流執行個體。在不支援 的 KCL v2.3 和 v2.4 中`streamArm`，使用 格式建立多串流執行個體`account-id:StreamName:streamCreationTimestamp`。從下一個主要版本開始，此格式將被取代，不再受支援。
  +  MultistreamTracker 也包含刪除租用資料表中舊串流租用的策略 (formerStreamsLeasesDeletionStrategy)。請注意，在取用者應用程式執行期，無法變更策略。如需詳細資訊，請參閱 https：//[https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java)。
+   [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java) 是全應用程式類別，可用來指定在為 KCL 2.x 版或更新版本建置 KCL 取用者應用程式時要使用的所有 KCL 組態設定。 `ConfigsBuilder`類別現在支援 `MultistreamTracker`界面。您可以使用一個資料串流的名稱初始化 ConfigsBuilder，以取用來自以下內容的記錄： 

  ```
  /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```  

或者，如果您想實作一個同時處理多個串流的 KCL 取用者應用程式，則可以使用 `MultiStreamTracker` 初始化 ConfigsBuilder。

```
* Constructor to initialize ConfigsBuilder with MultiStreamTracker
     * @param multiStreamTracker
     * @param applicationName
     * @param kinesisClient
     * @param dynamoDBClient
     * @param cloudWatchClient
     * @param workerIdentifier
     * @param shardRecordProcessorFactory
     */
    public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
            @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
            @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
            @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
        this.appStreamTracker = Either.left(multiStreamTracker);
        this.applicationName = applicationName;
        this.kinesisClient = kinesisClient;
        this.dynamoDBClient = dynamoDBClient;
        this.cloudWatchClient = cloudWatchClient;
        this.workerIdentifier = workerIdentifier;
        this.shardRecordProcessorFactory = shardRecordProcessorFactory;
    }
```
+ 透過針對 KCL 取用者應用程式實作的多串流支援，應用程式租用資料表的每一列現在都包含碎片 ID 和此應用程式處理之多個資料串流的串流名稱。
+ 實作 KCL 取用者應用程式的多串流支援時， leaseKey 會採用下列結構：`account-id:StreamName:streamCreationTimestamp:ShardId`。例如 `111111111:multiStreamTest-1:12345:shardId-000000000336`。

**重要**  
當您現有的 KCL 取用者應用程式設定為僅處理一個資料串流時， `leaseKey`（這是租用資料表的分割區索引鍵） 是碎片 ID。如果您重新設定現有的 KCL 取用者應用程式來處理多個資料串流，它會中斷您的租用資料表，因為`leaseKey`結構必須如下所示： `account-id:StreamName:StreamCreationTimestamp:ShardId`以支援多串流。

# 使用 AWS Glue 結構描述登錄檔搭配 KCL
<a name="kcl-glue-schema"></a>

您可以將 Kinesis Data Streams 與 AWS Glue 結構描述登錄檔整合。 AWS Glue 結構描述登錄檔可讓您集中探索、控制和發展結構描述，同時確保已註冊結構描述持續驗證產生的資料。結構描述定義資料記錄的結構和格式。結構描述是可靠的資料發佈、耗用或儲存的版本化規格。 AWS Glue 結構描述登錄檔可讓您改善串流應用程式中end-to-end資料品質和資料控管。如需詳細資訊，請參閱 [AWS Glue 結構描述登錄檔](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)。設定此整合的其中一種方法是透過適用於 Java 的 KCL。

**重要**  
AWS Glue Kinesis Data Streams 的結構描述登錄整合僅支援 KCL 2.3 或更新版本。
AWS Glue 使用 執行的非 Java 語言撰寫的 KCL 取用者*不支援* Kinesis Data Streams 的結構描述登錄整合`multilangdaemon`。
AWS Glue KCL 1.x 的任何版本*都*不支援 Kinesis Data Streams 的結構描述登錄整合。

如需如何使用 KCL 設定 Kinesis Data Streams 與 AWS Glue 結構描述登錄檔整合的詳細說明，請參閱「使用 KPL/KCL 程式庫與資料互動」一節[：將 Amazon Kinesis Data Streams 與 AWS Glue 結構描述登錄檔整合。](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)

# KCL 取用者應用程式所需的 IAM 許可
<a name="kcl-iam-permissions"></a>

 您必須將下列許可新增至與您的 KCL 取用者應用程式相關聯的 IAM 角色或使用者。

 指定 AWS 使用精細許可來控制對不同資源之存取的安全最佳實務。 AWS Identity and Access Management (IAM) 可讓您管理 中的使用者和使用者許可 AWS。IAM 政策將明確列出允許的動作以及各項動作所適用的資源。

下表顯示 KCL 取用者應用程式通常所需的最低 IAM 許可：


**KCL 取用者應用程式的最低 IAM 許可**  

| 服務 | 動作 | 資源 ARNs) | 用途 | 
| --- | --- | --- | --- | 
| Amazon Kinesis Data Streams |  `DescribeStream` `DescribeStreamSummary` `RegisterStreamConsumer`  |  KCL 應用程式將從中處理資料的 Kinesis 資料串流。`arn:aws:kinesis:region:account:stream/StreamName`  |  嘗試讀取記錄之前，消費者應先檢查資料串流是否存在、是否處於作用中狀態，以及資料串流中是否含有碎片。 將消費者註冊到碎片。  | 
| Amazon Kinesis Data Streams |  `GetRecords` `GetShardIterator` `ListShards`  | KCL 應用程式將從中處理資料的 Kinesis 資料串流。`arn:aws:kinesis:region:account:stream/StreamName` |  從碎片讀取記錄。  | 
| Amazon Kinesis Data Streams |  `SubscribeToShard` `DescribeStreamConsumer` |  KCL 應用程式將從中處理資料的 Kinesis 資料串流。只有在您使用增強型廣發 (EFO) 取用者時，才新增此動作。 `arn:aws:kinesis:region:account:stream/StreamName/consumer/*`  |  為增強型廣發 (EFO) 消費者訂閱碎片。  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `UpdateTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  租用資料表 (KCL 在 DynamoDB 中建立的中繼資料資料表。 `arn:aws:dynamodb:region:account:table/KCLApplicationName`  |  KCL 需要這些動作，才能管理在 DynamoDB 中建立的租用資料表。  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  工作者指標和協調器狀態資料表 (DynamoDB 中的中繼資料資料表），由 KCL 建立。 `arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats` `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`  |  KCL 需要這些動作來管理 DynamoDB 中的工作者指標和協調器狀態中繼資料表。  | 
| Amazon DynamoDB | `Query` |  租用資料表上的全域次要索引。 `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`  |  KCL 需要此動作，才能讀取在 DynamoDB 中建立之租用資料表的全域次要索引。  | 
| Amazon CloudWatch | `PutMetricData` |  \$1  |  將指標上傳至 CloudWatch，這對於監控應用程式非常有用。使用星號 (\$1) 是因為 CloudWatch 中沒有叫用`PutMetricData`動作的 spcific 資源。  | 

**注意**  
將 ARNs 中的 "region"、"account"、"StreamName" 和 "KCLApplicationName" 分別取代為您自己的 AWS 區域、 AWS 帳戶 number、Kinesis 資料串流名稱和 KCL 應用程式名稱。KCL 3.x 在 DynamoDB 中建立另外兩個中繼資料表。如需 KCL 建立之 DynamoDB 中繼資料資料表的詳細資訊，請參閱 [KCL 中的 DynamoDB 中繼資料表和負載平衡](kcl-dynamoDB.md)。如果您使用組態來自訂 KCL 建立的中繼資料表名稱，請使用這些指定的資料表名稱，而不是 KCL 應用程式名稱。

以下是 KCL 取用者應用程式的範例政策文件。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME/consumer/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:UpdateTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-WorkerMetricStats",
    "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-CoordinatorState"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:Query"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME/index/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        }
    ]
}
```

------

使用此範例政策之前，請檢查下列項目：
+ 將 REGION 取代為您的 AWS 區域 （例如 us-east-1)。
+ 以您的 AWS 帳戶 ID 取代 ACCOUNT\$1ID。
+ 以 Kinesis 資料串流的名稱取代 STREAM\$1NAME。
+ 以消費者的名稱取代 CONSUMER\$1NAME，通常是使用 KCL 時的應用程式名稱。
+ 將 KCL\$1APPLICATION\$1NAME 取代為您的 KCL 應用程式名稱。

# KCL 組態
<a name="kcl-configuration"></a>

您可以設定組態屬性來自訂 Kinesis Client Library 的功能，以符合您的特定需求。下表說明組態屬性和類別。

**重要**  
在 KCL 3.x 中，負載平衡演算法旨在實現工作者之間的 CPU 使用率，而不是每個工作者的相同租用數量。設定`maxLeasesForWorker`太低，您可能會限制 KCL 有效平衡工作負載的能力。如果您使用 `maxLeasesForWorker`組態，請考慮增加其值，以允許獲得最佳的負載分佈。


**此資料表顯示 KCL 的組態屬性**  

| 組態屬性 | 組態類別 | Description | 預設值 | 
| --- | --- | --- | --- | 
| applicationName | ConfigsBuilder | 此 KCL 應用程式的名稱。用做為 tableName 和 consumerName 的預設值。 | 不適用 | 
| tableName | ConfigsBuilder |  允許覆寫用於 Amazon DynamoDB 租用資料表的資料表名稱。  | 不適用 | 
| streamName | ConfigsBuilder |  此應用程式從中處理其記錄的串流名稱。  | 不適用 | 
| workerIdentifier | ConfigsBuilder |  代表應用程式處理器本項實例的唯一識別符。其必須獨一無二。  | 不適用 | 
| failoverTimeMillis | LeaseManagementConfig |  將租用擁有者視為失敗前必須經過的毫秒數。對於具有大量碎片的應用程式，這可以設定為較高的數字，以減少追蹤租用所需的 DynamoDB IOPS 數量。  | 10，000 (10 秒） | 
| shardSyncIntervalMillis | LeaseManagementConfig |  碎片同步呼叫的時間。  | 60，000 (60 秒） | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig |  設定時，只要已開始處理子租用就會隨即移除租用。  | TRUE | 
| ignoreUnexpectedChildShards | LeaseManagementConfig |  設定時，會忽略具有開放碎片的子碎片。此項主要用於 DynamoDB Streams。  | FALSE | 
| maxLeasesForWorker | LeaseManagementConfig |  單一工作者應接受的租用數量上限。如果工作者無法處理所有碎片，並導致工作者之間的租用指派不佳，則設定過低可能會導致資料遺失。設定碎片時，請考慮碎片總數、工作者數量和工作者處理容量。  | 無限制 | 
| maxLeaseRenewalThreads | LeaseManagementConfig |  控制租用續約執行緒集區的大小。應用程式可容納的租用數愈多，此集區就應該愈大。  | 20 | 
| billingMode | LeaseManagementConfig |  決定在 DynamoDB 中建立之租用資料表的容量模式。有兩種選項：隨需模式 (PAY\$1PER\$1REQUEST) 和佈建模式。我們建議您使用隨需模式的預設設定，因為它會自動擴展以容納您的工作負載，而不需要進行容量規劃。  | PAY\$1PER\$1REQUEST （隨需模式） | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | 當 Kinesis Client Library 需要建立具有佈建容量模式的新 DynamoDB 租用資料表時所使用的 DynamoDB DynamoDB 讀取容量。如果您在組態中使用預設隨需容量模式，您可以忽略此billingMode組態。 | 10 | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | 當 Kinesis Client Library 需要建立新的 DynamoDB 租用資料表時所使用的 DynamoDB 讀取容量。如果您在組態中使用預設隨需容量模式，您可以忽略此billingMode組態。 | 10 | 
| initialPositionInStreamExtended | LeaseManagementConfig |  應用程式應該在串流中開始的初始位置。這僅在初次建立租用時使用。  |  InitialPositionInStream.TRIM\$1HORIZON  | 
| reBalanceThresholdPercentage | LeaseManagementConfig |  決定負載平衡演算法何時應考慮在工作者之間重新指派碎片的百分比值。 這是 KCL 3.x 中引入的新組態。  | 10 | 
| dampeningPercentage | LeaseManagementConfig |  百分比值，用於抑制在單一重新平衡操作中從過載工作者移動的負載量。 這是 KCL 3.x 中引入的新組態。  | 60 | 
| allowThroughputOvershoot | LeaseManagementConfig |  決定是否需要從過載的工作者取得額外的租用，即使它導致租用總傳輸量超過所需的傳輸量。 這是 KCL 3.x 中引入的新組態。  | TRUE | 
| disableWorkerMetrics | LeaseManagementConfig |  決定 KCL 在重新指派租用和負載平衡時是否應忽略工作者的資源指標 （例如 CPU 使用率）。如果您想要防止 KCL 根據 CPU 使用率進行負載平衡，請將此設定為 TRUE。 這是 KCL 3.x 中引入的新組態。  | FALSE | 
| maxThroughputPerHostKBps | LeaseManagementConfig |  在租用指派期間指派給工作者的最大輸送量。 這是 KCL 3.x 中引入的新組態。  | 無限制 | 
| isGracefulLeaseHandoffEnabled | LeaseManagementConfig |  控制工作者之間租用交接的行為。設為 true 時，KCL 會嘗試透過讓碎片的 RecordProcessor 有足夠時間完成處理，再將租用移交給其他工作者，來正常轉移租用。這有助於確保資料完整性和順暢的轉換，但可能會增加交接時間。 設定為 false 時，將立即遞交租用，而無需等待 RecordProcessor 正常關閉。這可能會導致交接速度更快，但可能會有處理不完整的風險。 注意：檢查點必須在 RecordProcessor 的 shutdownRequested() 方法中實作，才能受益於正常的租賃交接功能。 這是 KCL 3.x 中引入的新組態。  | TRUE | 
| gracefulLeaseHandoffTimeoutMillis | LeaseManagementConfig |  指定等待目前碎片 RecordProcessor 正常關閉的最短時間 （以毫秒為單位），然後再將租約強制轉移給下一個擁有者。 如果您的 processRecords 方法執行時間通常超過預設值，請考慮增加此設定。這可確保 RecordProcessor 在租用轉移發生之前有足夠的時間完成處理。 這是 KCL 3.x 中引入的新組態。  | 30，000 (30 秒） | 
| maxRecords | PollingConfig |  允許設定 Kinesis 傳回的記錄數上限。  | 10,000 | 
| retryGetRecordsInSeconds | PollingConfig |  為故障設定 GetRecords 嘗試間的延遲。  | 無 | 
| maxGetRecordsThreadPool | PollingConfig |  用於 GetRecords 的執行緒集區大小。  | 無 | 
| idleTimeBetweenReadsInMillis | PollingConfig |  決定 KCL 在 GetRecords 呼叫之間等待多久從資料串流輪詢資料。單位為毫秒。  | 1,500 | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig |  設定時，即使 Kinesis 無提供任何記錄也會呼叫記錄處理器。  | FALSE | 
| parentShardPollIntervalMillis | CoordinatorConfig |  記錄處理器應該輪詢以檢查父碎片是否已完成的頻率。單位為毫秒。  | 10，000 (10 秒） | 
| skipShardSyncAtWorkerInitializationIfLeaseExist | CoordinatorConfig |  如果租用資料表包含現有的租用，即停用同步處理碎片資料。  |  FALSE  | 
| shardPrioritization | CoordinatorConfig |  要使用哪些碎片優先順序  |  NoOpShardPrioritization  | 
| ClientVersionConfig | CoordinatorConfig |  決定應用程式將在哪個 KCL 版本相容性模式中執行。此組態僅適用於從先前的 KCL 版本遷移。遷移至 3.x 時，您需要將此組態設定為 `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X`。您可以在完成遷移時移除此組態。  | CLIENT\$1VERSION\$1CONFIG\$13X | 
| taskBackoffTimeMillis | LifecycleConfig |  等待重試失敗 KCL 任務的時間。單位為毫秒。  | 500 (0.5 秒） | 
| logWarningForTaskAfterMillis | LifecycleConfig |  任務未完成的情況下要等待多久的時間才記錄警告。  | 無 | 
| listShardsBackoffTimeInMillis | RetrievalConfig | 呼叫 ListShards 發生錯誤時將等待的間隔毫秒數。單位為毫秒。 | 1，500 (1.5 秒） | 
| maxListShardsRetryAttempts | RetrievalConfig | ListShards 在放棄之前重試的次數上限。 | 50 | 
| metricsBufferTimeMillis | MetricsConfig |  指定在將指標發佈至 CloudWatch 之前緩衝指標的持續時間上限 （以毫秒為單位）。  | 10，000 (10 秒） | 
| metricsMaxQueueSize | MetricsConfig |  指定發佈至 CloudWatch 之前要緩衝的指標數目上限。  | 10,000 | 
| metricsLevel | MetricsConfig |  指定要啟用和發佈的 CloudWatch 指標精細程度。 可能的值：NONE、SUMMARY、DETAILED。  |  MetricsLevel.DETAILED  | 
| metricsEnabledDimensions | MetricsConfig |  控制 CloudWatch 指標的允許維度。  | 所有維度 | 

**KCL 3.x 中已停止的組態**

下列組態屬性會在 KCL 3.x 中停止：


**資料表顯示 KCL 3.x 的已停止組態屬性**  

| 組態屬性 | 組態類別 | Description | 
| --- | --- | --- | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig |  應用程式一次應該嘗試挪用的租用數上限。KCL 3.x 會忽略此組態，並根據工作者的資源使用率重新指派租用。  | 
| enablePriorityLeaseAssignment | LeaseManagementConfig |  控制工作者是否應優先採用非常過期的租用 （未續約 3 倍容錯移轉時間的租用） 和新的碎片租用，無論目標租用計數為何，但仍遵守最大租用限制。KCL 3.x 會忽略此組態，並一律將過期的租用分散給工作者。  | 

**重要**  
在從先前的 KCL 版本遷移到 KCL 3.x 期間，您仍然必須擁有未取代的組態屬性。在遷移期間，KCL 工作者會先從 KCL 2.x 相容模式開始，並在偵測到應用程式的所有 KCL 工作者都準備好執行 KCL 3.x 時切換到 KCL 3.x 功能模式。當 KCL 工作者執行 KCL 2.x 相容模式時，需要這些已停止的組態。

# KCL 版本生命週期政策
<a name="kcl-version-lifecycle-policy"></a>

本主題概述 Amazon Kinesis Client Library (KCL) 的版本生命週期政策。 會 AWS 定期提供 KCL 版本的新版本，以支援新功能和增強功能、錯誤修正、安全修補程式和相依性更新。我們建議您隨時掌握 KCL up-to-date以掌握最新功能、安全性更新和基礎相依性。**我們不**建議繼續使用不支援的 KCL 版本。

主要 KCL 版本的生命週期包含下列三個階段：
+ **一般可用性 (GA)** – 在此階段，完全支援主要版本。 AWS 提供一般次要和修補程式版本，包括對 Kinesis Data Streams 新功能或 API 更新的支援，以及錯誤和安全性修正。
+ **維護模式** – AWS 限制修補程式版本，以僅解決重大錯誤修正和安全問題。主要版本不會收到 Kinesis Data Streams 新功能或 APIs 的更新。
+ **End-of-support** – 主要版本將不再接收更新或版本。先前發佈的版本將繼續透過公有套件管理員提供，且程式碼將保留在 GitHub 上。使用者可自行決定是否使用已end-of-support版本。我們建議您升級至最新的主要版本。


| 主要版本 | 目前階段 | 版本日期 | 維護模式日期 | End-of-support日期 | 
| --- | --- | --- | --- | --- | 
| KCL 1.x | 維護模式 | 2013-12-19 | 2025-04-17 | 2026-01-30 | 
| KCL 2.x | 一般可用性 | 2018-08-02 | -- | -- | 
| KCL 3.x | 一般可用性 | 2024-11-06 | -- | -- | 

# 從先前的 KCL 版本遷移
<a name="kcl-migration-previous-versions"></a>

本主題說明如何從舊版的 Kinesis Client Library (KCL) 遷移。

## KCL 3.0 有哪些新功能？
<a name="kcl-migration-new-3-0"></a>

相較於舊版，Kinesis Client Library (KCL) 3.0 引入了幾項主要增強功能：
+  它會自動將工作從過度利用的工作者重新分配到取用者應用程式機群中未充分利用的工作者，以降低取用者應用程式的運算成本。這種新的負載平衡演算法可確保跨工作者平均分佈的 CPU 使用率，並消除過度佈建工作者的需求。
+  它透過最佳化租用資料表上的讀取操作來降低與 KCL 相關聯的 DynamoDB 成本。
+ 當租用重新指派給另一個工作者時，它可讓目前的工作者完成已處理記錄的檢查點，將資料的重新處理降至最低。
+  它使用 AWS SDK for Java 2.x 來改善效能和安全性功能，完全移除 適用於 Java 的 AWS SDK 1.x 上的相依性。

如需詳細資訊，請參閱 [KCL 3.0 版本備註。](https://github.com/awslabs/amazon-kinesis-client/blob/master/CHANGELOG.md)

**Topics**
+ [KCL 3.0 有哪些新功能？](#kcl-migration-new-3-0)
+ [從 KCL 2.x 遷移至 KCL 3.x](kcl-migration-from-2-3.md)
+ [轉返為先前的 KCL 版本](kcl-migration-rollback.md)
+ [轉返後向前復原至 KCL 3.x](kcl-migration-rollforward.md)
+ [具有佈建容量模式之租用資料表的最佳實務](kcl-migration-lease-table.md)
+ [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)

# 從 KCL 2.x 遷移至 KCL 3.x
<a name="kcl-migration-from-2-3"></a>

本主題提供step-by-step說明，將您的消費者從 KCL 2.x 遷移至 KCL 3.x。KCL 3.x 支援 KCL 2.x 消費者就地遷移。您可以繼續使用 Kinesis 資料串流中的資料，同時以滾動方式遷移工作者。

**重要**  
KCL 3.x 維護與 KCL 2.x 相同的界面和方法。因此，您不需要在遷移期間更新記錄處理程式碼。不過，您必須設定適當的組態，並檢查遷移所需的步驟。強烈建議您遵循下列遷移步驟，以獲得順暢的遷移體驗。

## 步驟 1：事前準備
<a name="kcl-migration-from-2-3-prerequisites"></a>

開始使用 KCL 3.x 之前，請確定您有下列項目：
+ Java 開發套件 (JDK) 8 或更新版本
+ 適用於 Java 的 AWS SDK 2.x
+ Maven 或 Gradle 用於相依性管理

**重要**  
請勿將 2.27.19 到 2.27.23 適用於 Java 的 AWS SDK 版與 KCL 3.x 搭配使用。這些版本包含導致與 KCL DynamoDB 用量相關的例外狀況錯誤的問題。我們建議您使用 2 適用於 Java 的 AWS SDK .28.0 版或更新版本，以避免此問題。

## 步驟 2：新增相依性
<a name="kcl-migration-from-2-3-dependencies"></a>

如果您使用的是 Maven，請將下列相依性新增至 `pom.xml` 檔案。請確定您已將 3.x.x 取代為最新的 KCL 版本。

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

如果您使用的是 Gradle，請將以下內容新增至您的 `build.gradle` 檔案。請確定您已將 3.x.x 取代為最新的 KCL 版本。

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

您可以在 [Maven Central Repository](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client) 上檢查 KCL 的最新版本。

## 步驟 3：設定與遷移相關的組態
<a name="kcl-migration-from-2-3-configuration"></a>

若要從 KCL 2.x 遷移至 KCL 3.x，您必須設定下列組態參數：
+ CoordinatorConfig.clientVersionConfig:此組態會決定應用程式將執行的 KCL 版本相容性模式。從 KCL 2.x 遷移至 3.x 時，您需要將此組態設定為 `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X`。若要設定此組態，請在建立排程器物件時新增以下行：

```
configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X)
```

以下是如何設定 `CoordinatorConfig.clientVersionConfig` 以從 KCL 2.x 遷移至 3.x 的範例。您可以根據您的特定需求，視需要調整其他組態：

```
Scheduler scheduler = new Scheduler(
    configsBuilder.checkpointConfig(),
    configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X),
    configsBuilder.leaseManagementConfig(),
    configsBuilder.lifecycleConfig(),
    configsBuilder.metricsConfig(),
    configsBuilder.processorConfig(),
    configsBuilder.retrievalConfig()
);
```

請務必讓消費者應用程式中的所有工作者在指定時間使用相同的負載平衡演算法，因為 KCL 2.x 和 3.x 使用不同的負載平衡演算法。執行具有不同負載平衡演算法的工作者可能會導致負載分佈不佳，因為這兩種演算法會獨立運作。

此 KCL 2.x 相容性設定可讓您的 KCL 3.x 應用程式在與 KCL 2.x 相容的模式下執行，並使用 KCL 2.x 的負載平衡演算法，直到取用者應用程式中的所有工作者升級至 KCL 3.x。遷移完成後，KCL 會自動切換到完整的 KCL 3.x 功能模式，並開始為所有執行中的工作者使用新的 KCL 3.x 負載平衡演算法。

**重要**  
如果您不是使用 ，`ConfigsBuilder`而是建立`LeaseManagementConfig`物件來設定組態，則必須在 KCL 3.x 版或更新版本`applicationName`中新增一個名為 的參數。如需詳細資訊，請參閱 [ LeaseManagementConfig 建構函數的編譯錯誤](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compiliation-error-leasemanagementconfig)。建議使用 `ConfigsBuilder` 來設定 KCL 組態。 `ConfigsBuilder` 提供更靈活且可維護的方式來設定 KCL 應用程式。

## 步驟 4：遵循 shutdownRequested() 方法實作的最佳實務
<a name="kcl-migration-from-2-3-best-practice"></a>

KCL 3.x 引入了一項稱為*正常租用交接*的功能，以在租用交付給另一個工作者時，將資料的重新處理降至最低，作為租用重新指派程序的一部分。這是透過在租用交接之前檢查租用資料表中最後一個處理的序號來實現的。為了確保正常的租用交接正常運作，您必須確定在 `RecordProcessor`類別的 `shutdownRequested`方法中叫用`checkpointer`物件。如果您未在 `shutdownRequested`方法中叫用`checkpointer`物件，您可以實作它，如下列範例所示。

**重要**  
下列實作範例是正常租用交接的最低需求。如有需要，您可以將其擴展為包含與檢查點相關的其他邏輯。如果您正在執行任何非同步處理，請確保在叫用檢查點之前已處理傳送至下游的所有記錄。
雖然正常的租用交接可大幅降低租用傳輸期間重新處理資料的可能性，但無法完全消除此可能性。為了保持資料完整性和一致性，請將下游消費者應用程式設計為等冪。這表示他們應該能夠處理潛在的重複記錄處理，而不會對整體系統造成負面影響。

```
/**
 * Invoked when either Scheduler has been requested to gracefully shutdown
 * or lease ownership is being transferred gracefully so the current owner
 * gets one last chance to checkpoint.
 *
 * Checkpoints and logs the data a final time.
 *
 * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
 *                               before the shutdown is completed.
 */
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
    try {
       // Ensure that all delivered records are processed 
       // and has been successfully flushed to the downstream before calling 
       // checkpoint
       // If you are performing any asynchronous processing or flushing to
       // downstream, you must wait for its completion before invoking
       // the below checkpoint method.
        log.info("Scheduler is shutting down, checkpointing.");
        shutdownRequestedInput.checkpointer().checkpoint();
    } catch (ShutdownException | InvalidStateException e) {
        log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
    } 
}
```

## 步驟 5：檢查收集工作者指標的 KCL 3.x 先決條件
<a name="kcl-migration-from-2-3-worker-metrics"></a>

KCL 3.x 會從工作者收集 CPU 使用率指標，例如 CPU 使用率，以平衡工作者之間的負載。消費者應用程式工作者可以在 Amazon EC2、Amazon ECS、Amazon EKS 或 上執行 AWS Fargate。只有在滿足下列先決條件時，KCL 3.x 才能從工作者收集 CPU 使用率指標：

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ 您的作業系統必須是 Linux 作業系統。
+ 您必須在 EC[IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)2。 EC2 

 **Amazon EC2 上的 Amazon EC2)**
+ 您的作業系統必須是 Linux 作業系統。
+ 您必須啟用 [ECS 任務中繼資料端點第 4 版](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html)。
+ 您的 Amazon ECS 容器代理程式版本必須為 1.39.0 或更新版本。

 **上的 Amazon ECS AWS Fargate**
+ 您必須啟用 [Fargate 任務中繼資料端點第 4 版](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html)。如果您使用 Fargate 平台 1.4.0 版或更新版本，預設會啟用此功能。
+ Fargate 平台 1.4.0 版或更新版本。

 **Amazon EC2 上的 Amazon Elastic Kubernetes Service (Amazon EKS)** 
+ 您的作業系統必須是 Linux 作業系統。

 **上的 Amazon EKS AWS Fargate**
+ Fargate 平台 1.3.0 或更新版本。

**重要**  
如果 KCL 3.x 因為不符合先決條件而無法從工作者收集 CPU 使用率指標，它會重新平衡每個租用的輸送量層級負載。此備用重新平衡機制將確保所有工作者將從指派給每個工作者的租用中獲得類似的總輸送量層級。如需詳細資訊，請參閱[KCL 如何指派租用給工作者並平衡負載](kcl-dynamoDB.md#kcl-assign-leases)。

## 步驟 6：更新 KCL 3.x 的 IAM 許可
<a name="kcl-migration-from-2-3-IAM-permissions"></a>

您必須將下列許可新增至與您的 KCL 3.x 取用者應用程式相關聯的 IAM 角色或政策。這包括更新 KCL 應用程式使用的現有 IAM 政策。如需詳細資訊，請參閱[KCL 取用者應用程式所需的 IAM 許可](kcl-iam-permissions.md)。

**重要**  
您現有的 KCL 應用程式可能沒有在 IAM 政策中新增下列 IAM 動作和資源，因為 KCL 2.x 中不需要這些動作和資源。在執行 KCL 3.x 應用程式之前，請確定您已新增它們：  
動作： `UpdateTable`  
資源 ARNs)： `arn:aws:dynamodb:region:account:table/KCLApplicationName`
動作： `Query`  
資源 ARNs)： `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`
動作：`CreateTable`、`DescribeTable`、`Scan`、`GetItem`、`PutItem`、`UpdateItem`、 `DeleteItem`  
資源 (ARNs)：`arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats`、 `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`
將 ARNs 中的「region」、「account」和「KCLApplicationName」分別取代為您自己的 AWS 區域、 AWS 帳戶 number 和 KCL 應用程式名稱。如果您使用組態來自訂 KCL 建立的中繼資料表名稱，請使用這些指定的資料表名稱，而不是 KCL 應用程式名稱。

## 步驟 7：將 KCL 3.x 程式碼部署到您的工作者
<a name="kcl-migration-from-2-3-IAM-deploy"></a>

設定遷移所需的組態並完成所有先前的遷移檢查清單之後，您就可以建置程式碼並將其部署至工作者。

**注意**  
如果您看到`LeaseManagementConfig`建構函數的編譯錯誤，請參閱 [ LeaseManagementConfig 建構函數的編譯錯誤](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compilation-error-leasemanagementconfig)，以取得疑難排解資訊。

## 步驟 8：完成遷移
<a name="kcl-migration-from-2-3-finish"></a>

在部署 KCL 3.x 程式碼期間，KCL 會繼續使用 KCL 2.x 的租用指派演算法。當您成功將 KCL 3.x 程式碼部署到所有工作者時，KCL 會自動偵測到此情況，並根據工作者的資源使用率切換到新的租用指派演算法。如需新租用指派演算法的詳細資訊，請參閱 [KCL 如何指派租用給工作者並平衡負載](kcl-dynamoDB.md#kcl-assign-leases)。

在部署期間，您可以使用傳送到 CloudWatch 的下列指標來監控遷移程序。您可以在 `Migration`操作下監控指標。所有指標都是per-KCL-application指標，並設定為`SUMMARY`指標層級。如果`CurrentState:3xWorker`指標的`Sum`統計資料符合 KCL 應用程式中的工作者總數，則表示遷移至 KCL 3.x 已成功完成。

**重要**  
 所有工作者準備好執行後，KCL 至少需要 10 分鐘才能切換到新的租用指派演算法。


**KCL 遷移程序的 CloudWatch 指標**  

| 指標 | Description | 
| --- | --- | 
| CurrentState:3xWorker |  成功遷移至 KCL 3.x 並執行新租用指派演算法的 KCL 工作者數量。如果此指標的`Sum`計數符合您工作者的總數，則表示遷移至 KCL 3.x 已成功完成。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| CurrentState:2xCompatibleWorker |  在遷移過程中以 KCL 2.x 相容模式執行的 KCL 工作者數量。此指標的非零值表示遷移仍在進行中。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| Fault |  在遷移過程中遇到的例外狀況數量。這些例外狀況大多是暫時性錯誤，KCL 3.x 會自動重試以完成遷移。如果您觀察到持久性`Fault`指標值，請檢閱遷移期間的日誌，以進一步進行故障診斷。如果問題持續發生，請聯絡 支援。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| GsiStatusReady |  在租用資料表上建立全域次要索引 (GSI) 的狀態。此指標指出是否已建立租用資料表上的 GSI，這是執行 KCL 3.x 的先決條件。值為 0 或 1，1 表示成功建立。在回復狀態期間，不會發出此指標。再次向前捲動後，您可以繼續監控此指標。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| workerMetricsReady |  工作者指標從所有工作者發出的狀態。指標指出是否所有工作者都會發出 CPU 使用率等指標。值為 0 或 1，其中 1 表示所有工作者都成功發出指標，並準備好使用新的租用指派演算法。在復原狀態期間，不會發出此指標。再次向前捲動後，您可以繼續監控此指標。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/streams/latest/dev/kcl-migration-from-2-3.html)  | 

KCL 在遷移期間提供復原功能至 2.x 相容模式。成功遷移至 KCL 3.x 後，`CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X`如果不再需要轉返，建議您移除 `CoordinatorConfig.clientVersionConfig`的設定。移除此組態會停止從 KCL 應用程式發出遷移相關指標。

**注意**  
我們建議您在遷移期間和完成遷移後監控應用程式的效能和穩定性一段時間。如果您發現任何問題，您可以使用 KCL [Migration Tool 復原工作者以使用 KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) 2.x 相容功能。

# 轉返為先前的 KCL 版本
<a name="kcl-migration-rollback"></a>

本主題說明將消費者復原至先前版本的步驟。當您需要轉返時，有兩個步驟的程序：

1. 執行 [KCL 移轉工具](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。

1. 重新部署先前的 KCL 版本代碼 （選用）。

## 步驟 1：執行 KCL 移轉工具
<a name="kcl-migration-rollback-tool"></a>

當您需要轉返至先前的 KCL 版本時，您必須執行 KCL 移轉工具。KCL Migration Tool 會執行兩項重要任務：
+ 移除 DynamoDB 租用資料表上，稱為工作者指標資料表和全域次要索引的中繼資料資料表。這兩個成品是由 KCL 3.x 建立，但當您回復到先前的版本時不需要。
+ 它可讓所有工作者在與 KCL 2.x 相容的模式下執行，並開始使用先前 KCL 版本中使用的負載平衡演算法。如果您使用 KCL 3.x 新負載平衡演算法時遇到問題，此步驟會立即緩解該問題。

**重要**  
DynamoDB 中的協調器狀態資料表必須存在，且不得在移轉、轉返及向前復原過程中刪除。

**注意**  
請務必讓取用者應用程式中的所有工作者，在指定時間使用相同的負載平衡演算法。KCL Migration Tool 可確保 KCL 3.x 取用者應用程式中的所有工作者都切換到 KCL 2.x 相容模式，以便所有工作者在轉返付款期間執行相同的負載平衡演算法，回到先前的 KCL 版本。

您可以在 [KCL GitHub 儲存庫](https://github.com/awslabs/amazon-kinesis-client/tree/master)的指令碼目錄中，下載 [KCL 移轉工具](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。指令碼可以從任何工作者或任何具有寫入協調器狀態資料表、刪除工作者指標資料表和更新租用資料表所需許可的主機執行。如需執行指令碼[KCL 取用者應用程式所需的 IAM 許可](kcl-iam-permissions.md)所需的 IAM 許可，請參閱 。每個 KCL 應用程式只能執行指令碼一次。您可以使用下列命令執行 KCL Migration Tool：

```
python3 ./KclMigrationTool.py --region <region> --mode rollback [--application_name <applicationName>] [--lease_table_name <leaseTableName>] [--coordinator_state_table_name <coordinatorStateTableName>] [--worker_metrics_table_name <workerMetricsTableName>]
```

**參數**
+ --region：將 取代`<region>`為 AWS 區域。
+ --application\$1name：如果您使用 DynamoDB 中繼資料資料表 （租用資料表、協調器狀態資料表和工作者指標資料表） 的預設名稱，則需要此參數。如果您已為這些資料表指定自訂名稱，可以省略此參數。`<applicationName>` 將 取代為您實際的 KCL 應用程式名稱。如果未提供自訂名稱，工具會使用此名稱衍生預設資料表名稱。
+ --lease\$1table\$1name （選用）：當您在 KCL 組態中設定租用資料表的自訂名稱時，需要此參數。如果您使用的是預設資料表名稱，可以省略此參數。`leaseTableName` 將 取代為您為租用資料表指定的自訂資料表名稱。
+ --coordinator\$1state\$1table\$1name （選用）：當您在 KCL 組態中為協調器狀態資料表設定自訂名稱時，需要此參數。如果您使用的是預設資料表名稱，可以省略此參數。`<coordinatorStateTableName>` 將 取代為您為協調器狀態資料表指定的自訂資料表名稱。
+ --worker\$1metrics\$1table\$1name （選用）：當您在 KCL 組態中為工作者指標資料表設定自訂名稱時，需要此參數。如果您使用的是預設資料表名稱，可以省略此參數。`<workerMetricsTableName>` 將 取代為您為工作者指標資料表指定的自訂資料表名稱。

## 步驟 2：使用先前的 KCL 版本重新部署程式碼 （選用）
<a name="kcl-migration-rollback-redeploy"></a>

 執行轉返 KCL 移轉工具後，您會看到以下其中一則訊息：
+ **訊息 1：**「轉返已完成。您的 KCL 應用程式正在執行 KCL 2.x 相容模式。如果您沒有看到任何迴歸的緩解措施，請使用先前的 KCL 版本部署程式碼，以回復到先前的應用程式二進位檔。」
  + **必要動作：**這表示您的工作者是在 KCL 2.x 相容模式下執行。如果問題仍然存在，請將具有先前 KCL 版本的程式碼重新部署到您的工作者。
+ **訊息 2：**「轉返已完成。您的 KCL 應用程式正在執行 KCL 3.x 功能模式。除非您在 5 分鐘內沒有看到問題的任何緩解措施，否則不需要轉返到先前的應用程式二進位檔。如果您仍然有問題，請使用先前的 KCL 版本部署程式碼，以轉返至先前的應用程式二進位檔。」
  + **必要動作：**這表示您的工作者是在 KCL 3.x 模式下執行，而 KCL Migration Tool 會將所有工作者切換到 KCL 2.x 相容模式。如果問題已解決，您不需要使用先前的 KCL 版本重新部署程式碼。如果問題仍然存在，請將具有先前 KCL 版本的程式碼重新部署到您的工作者。

 

# 轉返後向前復原至 KCL 3.x
<a name="kcl-migration-rollforward"></a>

本主題說明在復原後將消費者轉返至 KCL 3.x 的步驟。當您需要轉送時，必須經歷兩個步驟的程序：

1. 執行 [KCL 移轉工具](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。

1. 使用 KCL 3.x 部署程式碼。

## 步驟 1：執行 KCL 移轉工具
<a name="kcl-migration-rollback-tool"></a>

執行 KCL 移轉工具。具有下列命令的 KCL Migration Tool 可轉送至 KCL 3.x：

```
python3 ./KclMigrationTool.py --region <region> --mode rollforward [--application_name <applicationName>] [--coordinator_state_table_name <coordinatorStateTableName>]
```

**參數**
+ --region：將 取代`<region>`為 AWS 區域。
+ --application\$1name：如果您使用協調器狀態資料表的預設名稱，則需要此參數。如果您已指定協調器狀態資料表的自訂名稱，可以省略此參數。`<applicationName>` 將 取代為您實際的 KCL 應用程式名稱。如果未提供自訂名稱，工具會使用此名稱衍生預設資料表名稱。
+ --coordinator\$1state\$1table\$1name （選用）：當您在 KCL 組態中為協調器狀態資料表設定自訂名稱時，需要此參數。如果您使用的是預設資料表名稱，可以省略此參數。`<coordinatorStateTableName>` 將 取代為您為協調器狀態資料表指定的自訂資料表名稱。

以向前復原模式執行移轉工具後，KCL 會建立 KCL 3.x 所需的下列 DynamoDB 資源：
+ 租用資料表上的全域次要索引
+ 工作者指標資料表

## 步驟 2：使用 KCL 3.x 部署程式碼
<a name="kcl-migration-rollback-redeploy"></a>

執行向前復原的 KCL 移轉工具之後，請使用 KCL 3.x 將程式碼部署至工作者。遵循 [步驟 8：完成遷移](kcl-migration-from-2-3.md#kcl-migration-from-2-3-finish) 以完成遷移。

# 具有佈建容量模式之租用資料表的最佳實務
<a name="kcl-migration-lease-table"></a>

如果您 KCL 應用程式的租用資料表切換到佈建容量模式，KCL 3.x 會在租用資料表上建立全域次要索引，並使用佈建帳單模式和與基本租用資料表相同的讀取容量單位 (RCU) 和寫入容量單位 (WCU)。建立全域次要索引時，建議您監控 DynamoDB 主控台中全域次要索引的實際用量，並視需要調整容量單位。如需切換 KCL 建立之 DynamoDB 中繼資料資料表容量模式的更詳細指南，請參閱 [KCL 所建立中繼資料資料表的 DynamoDB 容量模式](kcl-dynamoDB.md#kcl-capacity-mode)。

**注意**  
根據預設，KCL 會使用隨需容量模式建立中繼資料表，例如租用資料表、工作者指標資料表和協調器狀態資料表，以及租用資料表上的全域次要索引。我們建議您使用隨需容量模式，根據您的用量變更自動調整容量。

# 從 KCL 1.x 移轉到 KCL 3.x
<a name="kcl-migration-1-3"></a>

本主題說明將消費者從 KCL 1.x 遷移至 KCL 3.x 的指示。相較於 KCL 2.x 和 KCL 3.x，KCL 1.x 使用不同的類別和界面。您必須先將記錄處理器、記錄處理器工廠和工作者類別遷移至 KCL 2.x/3.x 相容格式，並遵循 KCL 2.x 遷移至 KCL 3.x 的遷移步驟。您可以直接從 KCL 1.x 升級到 KCL 3.x。
+ **步驟 1：遷移記錄處理器**

  遵循將[消費者從 KCL 1.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) [遷移至 KCL 2.x 頁面中的遷移記錄處理器](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration)一節。
+ **步驟 2：遷移記錄處理器工廠**

  遵循將[消費者從 KCL 1.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) [遷移到 KCL 2.x 頁面中的遷移記錄處理器工廠](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-factory-migration)一節。
+ **步驟 3：遷移工作者**

  遵循將[消費者從 KCL 1.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) [遷移至 KCL 2.x 頁面中的遷移工作者](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#worker-migration)區段。
+ **步驟 4：遷移 KCL 1.x 組態 **

  請遵循將[消費者從](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) [KCL 1.x 遷移至 KCL 2.x 頁面中的設定 Amazon Kinesis 用戶端](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration)一節。
+ **步驟 5：檢查閒置時間移除和用戶端組態移除**

  遵循[從 KCL 1.x 遷移消費者到 KCL 2.x ](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration)頁面中的[閒置時間移除](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#idle-time-removal)和[用戶端組態移除](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration-removals)章節。
+ **步驟 6：遵循 KCL 2.x 到 KCL 3.x 遷移指南中的step-by-step說明**

  依照[從 KCL 2.x 遷移至 KCL 3.x](kcl-migration-from-2-3.md)頁面上的指示完成遷移。如果您需要轉返至先前的 KCL 版本或在轉返後轉返至 KCL 3.x，請參閱 [轉返為先前的 KCL 版本](kcl-migration-rollback.md)和 [轉返後向前復原至 KCL 3.x](kcl-migration-rollforward.md)。

**重要**  
請勿將 2.27.19 到 2.27.23 適用於 Java 的 AWS SDK 版與 KCL 3.x 搭配使用。這些版本包含導致與 KCL DynamoDB 用量相關的例外狀況錯誤的問題。我們建議您使用 2 適用於 Java 的 AWS SDK .28.0 版或更新版本，以避免此問題。

# 先前的 KCL 版本文件
<a name="kcl-archive"></a>

下列主題已封存。若要查看目前的 Kinesis Client Library 文件，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis Client Library 頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

**Topics**
+ [KCL 1.x 和 2.x 資訊](shared-throughput-kcl-consumers.md)
+ [開發具有共用輸送量的自訂消費者](shared-throughput-consumers.md)
+ [將消費者從 KCL 1.x 遷移至 KCL 2.x](kcl-migration.md)

# KCL 1.x 和 2.x 資訊
<a name="shared-throughput-kcl-consumers"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

開發自訂取用者應用程式以處理來自 KDS 資料串流之資料的其中一種方法，是使用 Kinesis Client Library (KCL)。

**Topics**
+ [關於 KCL （先前版本）](#shared-throughput-kcl-consumers-overview)
+ [KCL 舊版](#shared-throughput-kcl-consumers-versions)
+ [KCL 概念 （先前版本）](#shared-throughput-kcl-consumers-concepts)
+ [使用租用資料表來追蹤 KCL 取用者應用程式處理的碎片](#shared-throughput-kcl-consumers-leasetable)
+ [使用相同的 KCL 2.x for Java 取用者應用程式處理多個資料串流](#shared-throughput-kcl-multistream)
+ [將 KCL 與 AWS Glue 結構描述登錄檔搭配使用](#shared-throughput-kcl-consumers-glue-schema-registry)

**注意**  
對於 KCL 1.x 和 KCL 2.x，建議根據您的使用案例，升級至最新的 KCL 1.x 版或 KCL 2.x 版本。KCL 1.x 和 KCL 2.x 皆會定期更新為更新的版本，其中包含最新的相依性和安全修補程式、錯誤修正，以及向後相容的新功能。如需詳細資訊，請參閱 [https://github.com/awslabs/amazon-kinesis-client/releases](https://github.com/awslabs/amazon-kinesis-client/releases)。

## 關於 KCL （先前版本）
<a name="shared-throughput-kcl-consumers-overview"></a>

KCL 會處理與分散式運算相關的許多複雜任務，協助您取用和處理 Kinesis 資料串流中的資料。這其中包含跨多個取用者應用程式執行個體的負載平衡、對取用者應用程式執行個體失敗的回應、檢查點處理記錄，以及對重新分片的反應。KCL 會處理所有這些子任務，以便您可以專注在編寫自訂記錄處理邏輯上。

KCL 與 AWS SDK 中提供的 Kinesis Data Streams API 不同。Kinesis Data Streams API 可協助您管理 Kinesis Data Streams 的許多層面 (包括建立串流、重新分片、放入與取得記錄)。KCL 圍繞所有這些子任務提供了抽象層，特別是可讓您專注於取用者應用程式的自訂資料處理邏輯。如需 Kinesis Data Streams API 的相關資訊，請參閱 [Amazon Kinesis API 參考](https://docs.aws.amazon.com/kinesis/latest/APIReference/Welcome.html)。

**重要**  
KCL 是一種 Java 程式庫。使用稱為 MultiLangDaemon 的多語言介面提供對 Java 以外語言的支援。此常駐程式是以 Java 為基礎，並在您使用 Java 以外的 KCL 語言時在背景執行。例如，若您安裝了適用於 Python 的 KCL 並完全以 Python 撰寫取用者應用程式，則由於 MultiLangDaemon 的緣故，您的系統仍需要安裝 Java。此外，MultiLangDaemon 有一些預設設定，您可能需要針對您的使用案例進行自訂，例如其連線 AWS 的區域。如需 MultiLangDaemon 的詳細資訊，請參閱 GitHub 上的 [KCL MultiLangDaemon 專案](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)。

KCL 在您的記錄處理邏輯與 Kinesis Data Streams 之間擔任媒介。

## KCL 舊版
<a name="shared-throughput-kcl-consumers-versions"></a>

目前，您可以使用下列受支援的 KCL 版本之一，來建置自訂的取用者應用程式：
+ **KCL 1.x**

  如需詳細資訊，請參閱[開發 KCL 1.x 消費者](developing-consumers-with-kcl.md)
+ **KCL 2.x**

  如需詳細資訊，請參閱[開發 KCL 2.x 消費者](developing-consumers-with-kcl-v2.md)

您可以使用 KCL 1.x 或 KCL 2.x 來建置使用共用輸送量的取用者應用程式。如需詳細資訊，請參閱[使用 KCL 開發具有共用輸送量的自訂消費者](custom-kcl-consumers.md)。

若要建置使用專用輸送量的取用者應用程式 (增強型散發取用者)，您只能使用 KCL 2.x。如需詳細資訊，請參閱[開發具有專用輸送量的增強型廣發消費者](enhanced-consumers.md)。

如需有關 KCL 1.x 和 KCL 2.x 之間差異的詳細資訊，以及如何從 KCL 1.x 遷移至 KCL 2.x 的指示，請參閱 [將消費者從 KCL 1.x 遷移至 KCL 2.x](kcl-migration.md)。

## KCL 概念 （先前版本）
<a name="shared-throughput-kcl-consumers-concepts"></a>
+ **KCL 取用者應用程式** – 使用 KCL 自訂建置的應用程式，專為讀取和處理資料串流中的記錄而設計。
+ **取用者應用程式執行個體** - KCL 取用者應用程式通常是分散式，可同時執行一個或多個應用程式執行個體，以在發生故障時進行協調並對資料記錄處理進行動態負載平衡。
+ **工作者** – KCL 取用者應用程式執行個體用來開始處理資料的高階類別。
**重要**  
每個 KCL 取用者應用程式執行個體都有一個工作者。

  工作者會初始化並監督各種任務，包括同步處理碎片和租用資訊、追蹤碎片指派，以及處理來自碎片的資料。工作者提供 KCL 取用者應用程式的組態資訊，例如資料串流的名稱，其資料記錄此 KCL 取用者應用程式將要處理的資料，以及存取此資料串流所需的 AWS 登入資料。工作者也會啟動該特定 KCL 取用者應用程式執行個體，將資料記錄從資料串流傳送至記錄處理器。
**重要**  
在 KCL 1.x 中，此類別被稱為**工作者**。如需詳細資訊 (這些是 Java KCL 儲存庫)，請參閱 [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java)。在 KCL 2.x.x 中，此類別被稱為**排程器**。排程器在 KCL 2.x 中的用途與 KCL 1.x 中的工作者的目的相同。[如需有關 KCL 2.x 中排程器類別的詳細資訊，請參閱 https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java。](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java)
+ **租用** – 定義工作者與碎片之間繫結的資料。分散式 KCL 取用者應用程式使用租用來分割跨工作節點機群的資料記錄處理。在任何給定時間，每個資料記錄碎片都會透過 **leaseKey** 變數所識別的租用繫結至特定工作者。

  根據預設，工作者可以同時持有一或多個租用 (取決於 **maxleAsForWorker** 變數的值)。
**重要**  
每個工作者都將爭奪保留資料串流中，所有可用碎片的所有可用租用。但是，只有一名工作者可以在任何時間成功持有每個租用。

  例如，如果您有一個含有工作者 A 的取用者應用程式執行個體 A 正在處理具有 4 個碎片的資料串流，則工作者 A 可以同時持有對碎片 1、2、3 和 4 的租用。但是，如果您有兩個取用者應用程式執行個體：A 和 B 具有工作者 A 和工作者 B，而且這些執行個體正在處理具有 4 個碎片的資料串流，則工作者 A 和工作者 B 無法同時持有對碎片 1 的租用。一個工作者會持有特定碎片的租用，直到準備好停止處理此碎片的資料記錄，或直到失敗為止。當一名工作者停止持有租用時，另一名工作者佔用並持有租用。

  [如需更多資訊，(這些是 Java KCL 存儲庫)，請參閱 [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java) 以獲取 KCL 1.x 的資訊和參閱 https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java) 取得 KCL 2.x 的資訊。
+ **租用資料表** - 唯一的 Amazon DynamoDB 資料表，用於追蹤 KDS 資料串流中，由 KCL 取用者應用程式的工作者租用和處理的碎片。在 KCL 取用者應用程式執行時，租用資料表必須與資料串流中的最新碎片資訊保持同步 (在工作者內部和所有工作者之間)。如需詳細資訊，請參閱[使用租用資料表來追蹤 KCL 取用者應用程式處理的碎片](#shared-throughput-kcl-consumers-leasetable)。
+ **記錄處理器** – 定義 KCL 取用者應用程式如何處理從資料串流取得的資料的邏輯。在執行期，KCL 取用者應用程式執行個體會實體化工作者，而此工作者會針對其持有租用的每個碎片執行個體化一個記錄處理器。

## 使用租用資料表來追蹤 KCL 取用者應用程式處理的碎片
<a name="shared-throughput-kcl-consumers-leasetable"></a>

**Topics**
+ [什麼是租用資料表](#shared-throughput-kcl-consumers-what-is-leasetable)
+ [輸送量](#shared-throughput-kcl-leasetable-throughput)
+ [租用資料表如何與 Kinesis 資料串流中的碎片同步](#shared-throughput-kcl-consumers-leasetable-sync)

### 什麼是租用資料表
<a name="shared-throughput-kcl-consumers-what-is-leasetable"></a>

這對每個 Amazon Kinesis Data Streams 應用程式，KCL 會使用唯一的租用資料表 (存儲在 Amazon DynamoDB 資料表中)，來追蹤 KDS 資料串流中由 KCL 取用者應用程式的工作者租用和處理的碎片。

**重要**  
KCL 會使用取用者應用程式的名稱來建立此取用者應用程式所使用的租用資料表名稱，因此，每個取用者應用程式名稱都必須是唯一的。

您可以在取用者應用程式執行時使用 [Amazon DynamoDB 主控台](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html)檢視其租用資料表。

如果應用程式啟動時，KCL 取用者應用程式的租用資料表不存在，其中一個工作者會建立此應用程式的租用資料表。

**重要**  
 您的帳戶除須支付 Kinesis Data Streams 本身的相關費用外，另將收取與 DynamoDB 資料表關聯的費用。

租用資料表內的每一列代表您的取用者應用程式的工作者所處理的某個碎片。如果您的 KCL 取用者應用程式僅處理一個資料串流，則租用資料表的雜湊索引鍵 `leaseKey` 就是碎片 ID。如果您是 [使用相同的 KCL 2.x for Java 取用者應用程式處理多個資料串流](#shared-throughput-kcl-multistream)，則 leaseKey 的結構如下所示：`account-id:StreamName:streamCreationTimestamp:ShardId`。例如 `111111111:multiStreamTest-1:12345:shardId-000000000336`。

除了碎片 ID 外，每一列還包含以下資料：
+ **checkpoint：**碎片的最新檢查點序號。資料串流中所有碎片的此值皆為獨一無二。
+ **checkpointSubSequenceNumber：**使用 Kinesis Producer Library 的彙整功能時，此為 **checkpoint** 的延伸，將追蹤 Kinesis 記錄內的個別使用者記錄。
+ **leaseCounter：**用於租用版本控制，使工作者可偵測出其租用已由另一工作者接管。
+ **leaseKey：**租用的唯一識別符。每項租用特屬於資料串流中的某個碎片，一次由一個工作者所持有。
+ **leaseOwner：**持有此租用的工作者。
+ **ownerSwitchesSinceCheckpoint：**自上次寫入檢查點至今，此租用更改了工作者的次數。
+ **parentShardId：**用於確保已完全處理過父碎片後才開始對子碎片進行處理。這可確保按照記錄放入串流中的相同順序處理記錄。
+ **hashrange：**`PeriodicShardSyncManager` 用於執行週期性同步以尋找租用資料表中遺失的碎片，並在需要時為其建立租用。
**注意**  
從 KCL 1.14 和 KCL 2.3 開始，每個碎片的租用資料表中都會顯示此資料。如需有關 `PeriodicShardSyncManager` 和租用與碎片之間的定期同步的詳細資訊，請參閱 [租用資料表如何與 Kinesis 資料串流中的碎片同步](#shared-throughput-kcl-consumers-leasetable-sync)。
+ **childshards：**`LeaseCleanupManager` 用於檢閱子碎片的處理狀態，並決定是否可以從租用資料表中刪除父碎片。
**注意**  
從 KCL 1.14 和 KCL 2.3 開始，每個碎片的租用資料表中都會顯示此資料。
+ **shardID：**碎片的 ID。
**注意**  
如果您是 [使用相同的 KCL 2.x for Java 取用者應用程式處理多個資料串流](#shared-throughput-kcl-multistream)，則此資料僅存在於租用資料表中。這僅在適用於 Java 的 KCL 2.x 中受支援，從適用於 Java 的 KCL 2.3 和更新版本開始。
+ **串流名稱**資料串流的識別碼，格式如下：`account-id:StreamName:streamCreationTimestamp`。
**注意**  
如果您是 [使用相同的 KCL 2.x for Java 取用者應用程式處理多個資料串流](#shared-throughput-kcl-multistream)，則此資料僅存在於租用資料表中。這僅在適用於 Java 的 KCL 2.x 中受支援，從適用於 Java 的 KCL 2.3 和更新版本開始。

### 輸送量
<a name="shared-throughput-kcl-leasetable-throughput"></a>

如果您的 Amazon Kinesis Data Streams 應用程式收到佈建輸送量例外狀況，則您即應提升 DynamoDB 資料表的佈建輸送量。KCL 建立的資料表其佈建輸送量為每秒 10 次讀取和每秒 10 次寫入，但這對您的應用程式而言可能不夠。例如，若您的 Amazon Kinesis Data Streams 經常執行檢查點作業或對由多個碎片構成的串流進行操作，您可能就需要更多的輸送量。

如需 DynamoDB 中佈建輸送量的相關資訊，請參閱《Amazon DynamoDB 開發人員指南》**中的[讀取/寫入容量模式](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html)和[使用資料表和資料](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithDDTables.html)。

### 租用資料表如何與 Kinesis 資料串流中的碎片同步
<a name="shared-throughput-kcl-consumers-leasetable-sync"></a>

KCL 取用者應用程式中的工作者會使用租用來處理來自指定資料串流的碎片。在任何給定時間，哪個工作者正在租用哪個碎片的資訊存儲在租用資料表中。在 KCL 取用者應用程式執行時，租用資料表必須與資料串流中的最新碎片資訊保持同步 。KCL 會在取用者應用程式啟動載入期間 (在取用者應用程式初始化或重新啟動時)，以及每當正在處理的碎片到達結束 (重新分割) 時，將租用資料表與從 Kinesis Data Streams 服務取得的碎片資訊同步化。換句話說，工作者或 KCL 取用者應用程式會與它們在初始使用者應用程式啟動程序期間處理的資料串流，以及每當取用者應用程式遇到資料串流重新分片事件時，都會與其所處理的資料串流同步處理。

**Topics**
+ [KCL 1.0-1.13 和 KCL 2.0-2.2 中的同步](#shared-throughput-kcl-consumers-leasetable-sync-old)
+ [KCL 2.x 中的同步，從 KCL 2.3 及更新版本開始](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl2)
+ [KCL 1.x 中的同步，從 KCL 1.14 及更新版本開始](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl1)

#### KCL 1.0-1.13 和 KCL 2.0-2.2 中的同步
<a name="shared-throughput-kcl-consumers-leasetable-sync-old"></a>

在 KCL 1.0 - 1.13 和 KCL 2.0 - 2.2 中，在取用者應用程式的啟動載入期間以及每個資料串流重新分片事件期間，KCL 會透過調用 `ListShards` 或 `DescribeStream` 探索 API，將租用資料表與從 Kinesis 資料串流服務取得的碎片資訊同步。在上面列出的所有 KCL 版本中，KCL 取用者應用程式的每個工作者都會完成下列步驟，以便在取用者應用程式的啟動載入期間以及每個串流重新分片事件中執行租用/碎片同步處理程序：
+ 擷取正在處理的資料串流的所有碎片
+ 從租用資料表中擷取所有碎片租用
+ 篩選出租用資料表中沒有租用的每個開放碎片
+ 逐一查看所有找到的開放碎片以及每個沒有開放父級的開放碎片：
  + 透過其祖先路徑遍歷樹狀結構，以確定碎片是否為子代。如果正在處理祖系碎片 (租用資料表中存在祖系碎片的租用項目)，或者應處理祖系碎片 (例如，如果初始位置為 `TRIM_HORIZON` 或 `AT_TIMESTAMP`)，則碎片即視為子代
  + 如果內容中的開放碎片是子代，KCL 會根據初始位置檢查碎片，並在必要時為其父項建立租用

#### KCL 2.x 中的同步，從 KCL 2.3 及更新版本開始
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl2"></a>

從 KCL 2.x (KCL 2.3) 及更新版本的最新支援版本開始，程式庫現在支援同步程序的下列變更。這些租用/碎片同步變更可大幅減少 KCL 取用者應用程式對 Kinesis Data Streams 服務進行的 API 呼叫次數，並最佳化 KCL 取用者應用程式中的租用管理。
+ 在應用程式的啟動載入期間，如果租用資料表是空的，則 KCL 會利用 `ListShard` API 的篩選選項 (`ShardFilter` 選用的請求參數) 來擷取和建立租用，僅用於在 `ShardFilter` 參數指定的時間開放的碎片快照。此 `ShardFilter` 參數可讓您篩選出 `ListShards` API 的回應。`ShardFilter` 參數的唯一必要屬性是 `Type`。KCL 會使用 `Type` 篩選屬性及其下列有效值來識別並傳回可能需要新租用之開啟碎片的快照：
  + `AT_TRIM_HORIZON` - 回應包括所有在 `TRIM_HORIZON` 打開的碎片。
  + `AT_LATEST` - 回應僅包含目前開放的資料串流碎片。
  + `AT_TIMESTAMP` - 回應包含開始時間戳記小於或等於指定時間戳記，且結束時間戳記大於或等於指定時間戳記或仍處於開放狀態的所有碎片。

  `ShardFilter` 用於為空租用資料表建立租用，以針對在 `RetrievalConfig#initialPositionInStreamExtended` 指定之碎片的快照初始化租用。

  如需 `ShardFilter` 的相關資訊，請參閱 [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html)。
+ 單一當選的工作者領導者執行租用/碎片同步處理，而不是執行租用/碎片同步處理以使租用資料表與資料串流中的最新碎片保持最新狀態的所有工作者。
+ KCL 2.3 使用 `ChildShards` 傳回 `GetRecords` 和 `SubscribeToShard` API 的參數來執行在關閉碎片 `SHARD_END` 上發生的租用/碎片同步處理，允許 KCL 工作者只為其完成處理之碎片的子碎片建立租用。對於共用輸送量取用者應用程式，此租用/碎片同步處理的最佳化會使用 `GetRecords` API 的 `ChildShards` 參數。對於專用輸送量 (增強型散發) 取用者應用程式，此租用/碎片同步處理的最佳化會使用 `SubscribeToShard` API 的 `ChildShards` 參數。如需詳細資訊，請參閱 [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)、[SubscribeToShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) 和 [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html)。
+ 透過上述變更，KCL 的行為將從學習所有現有碎片的所有工作者的模型，轉移到僅學習每個工作者所擁有碎片的子碎片的工作者模型。因此，除了取用者應用程式啟動載入和重新分片事件期間發生的同步處理之外，KCL 現在還會執行額外的定期碎片/租用掃描，以識別租用資料表中的任何潛在漏洞 (換句話說，了解所有新碎片)，以確保資料串流的完整雜湊範圍正在處理，並在需要時為其建立租用。`PeriodicShardSyncManager` 是負責執行定期租用/碎片掃描的元件。

  如需有關 KCL 2.3 中 `PeriodicShardSyncManager` 的詳細資訊，請參閱 [https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java\$1L201-L213](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213)。

  在 KCL 2.3 中，新組態選項可用於在 `LeaseManagementConfig` 中設定 `PeriodicShardSyncManager`：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/streams/latest/dev/shared-throughput-kcl-consumers.html)

  現在也會發出新的 CloudWatch 指標，以監控 `PeriodicShardSyncManager` 的運作狀態。如需詳細資訊，請參閱[PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task)。
+ 包括 `HierarchicalShardSyncer` 的最佳化，以僅為一層碎片建立租用。

#### KCL 1.x 中的同步，從 KCL 1.14 及更新版本開始
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl1"></a>

從 KCL 1.x (KCL 1.14) 及更新版本的最新支援版本開始，程式庫現在支援同步程序的下列變更。這些租用/碎片同步變更可大幅減少 KCL 取用者應用程式對 Kinesis Data Streams 服務進行的 API 呼叫次數，並最佳化 KCL 取用者應用程式中的租用管理。
+ 在應用程式的啟動載入期間，如果租用資料表是空的，則 KCL 會利用 `ListShard` API 的篩選選項 (`ShardFilter` 選用的請求參數) 來擷取和建立租用，僅用於在 `ShardFilter` 參數指定的時間開放的碎片快照。此 `ShardFilter` 參數可讓您篩選出 `ListShards` API 的回應。`ShardFilter` 參數的唯一必要屬性是 `Type`。KCL 會使用 `Type` 篩選屬性及其下列有效值來識別並傳回可能需要新租用之開啟碎片的快照：
  + `AT_TRIM_HORIZON` - 回應包括所有在 `TRIM_HORIZON` 打開的碎片。
  + `AT_LATEST` - 回應僅包含目前開放的資料串流碎片。
  + `AT_TIMESTAMP` - 回應包含開始時間戳記小於或等於指定時間戳記，且結束時間戳記大於或等於指定時間戳記或仍處於開放狀態的所有碎片。

  `ShardFilter` 用於為空租用資料表建立租用，以針對在 `KinesisClientLibConfiguration#initialPositionInStreamExtended` 指定之碎片的快照初始化租用。

  如需 `ShardFilter` 的相關資訊，請參閱 [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html)。
+ 單一當選的工作者領導者執行租用/碎片同步處理，而不是執行租用/碎片同步處理以使租用資料表與資料串流中的最新碎片保持最新狀態的所有工作者。
+ KCL 1.14 使用 `ChildShards` 傳回 `GetRecords` 和 `SubscribeToShard` API 的參數來執行在關閉碎片 `SHARD_END` 上發生的租用/碎片同步處理，允許 KCL 工作者只為其完成處理之碎片的子碎片建立租用。如需詳細資訊，請參閱 [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) 和 [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html)。
+ 透過上述變更，KCL 的行為將從學習所有現有碎片的所有工作者的模型，轉移到僅學習每個工作者所擁有碎片的子碎片的工作者模型。因此，除了取用者應用程式啟動載入和重新分片事件期間發生的同步處理之外，KCL 現在還會執行額外的定期碎片/租用掃描，以識別租用資料表中的任何潛在漏洞 (換句話說，了解所有新碎片)，以確保資料串流的完整雜湊範圍正在處理，並在需要時為其建立租用。`PeriodicShardSyncManager` 是負責執行定期租用/碎片掃描的元件。

  當 `KinesisClientLibConfiguration#shardSyncStrategyType` 設定為 `ShardSyncStrategyType.SHARD_END` 時，`PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold` 用於確定包含租用資料表中漏洞的連續掃描數目臨界值，之後強制執行碎片同步化。當 `KinesisClientLibConfiguration#shardSyncStrategyType` 設定為 `ShardSyncStrategyType.PERIODIC` 時，會忽略 `leasesRecoveryAuditorInconsistencyConfidenceThreshold`。

  如需有關 KCL 1.14 中 `PeriodicShardSyncManager` 的詳細資訊，請參閱[ https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java\$1L987-L999](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999)。

  在 KCL 1.14 中，新組態選項可用於在 `LeaseManagementConfig` 中設定 `PeriodicShardSyncManager`：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/streams/latest/dev/shared-throughput-kcl-consumers.html)

  現在也會發出新的 CloudWatch 指標，以監控 `PeriodicShardSyncManager` 的運作狀態。如需詳細資訊，請參閱[PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task)。
+ KCL 1.14 現在也支援延遲租用清除。當碎片超過資料串流的保留期限或因重新分片操作而關閉時，達到 `SHARD_END` 時，`LeaseCleanupManager` 會以非同步方式刪除租用。

  新組態選項可用於設定 `LeaseCleanupManager`：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/streams/latest/dev/shared-throughput-kcl-consumers.html)
+ 包括 `KinesisShardSyncer` 的最佳化，以僅為一層碎片建立租用。

## 使用相同的 KCL 2.x for Java 取用者應用程式處理多個資料串流
<a name="shared-throughput-kcl-multistream"></a>

本節說明適用於 Java 的 KCL 2.x 中的下列變更，可讓您建立可同時處理多個資料串流的 KCL 取用者應用程式。

**重要**  
僅在適用於 Java 的 KCL 2.x 中支援多串流處理，從適用於 Java 的 KCL 2.3 和更新版本開始。  
對於可以實現 KCL 2.x 的任何其他語言，「不」支援多串流處理。  
任何 KCL 1.x 版本均不支援多串流處理。
+ **MultistreamTracker interface**

  若要建置可以同時處理多個串流的取用者應用程式，您必須實作名為 [MultiStreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java) 的新介面。此介面包含傳回資料串流清單及其組態的 `streamConfigList` 方法，以供 KCL 取用者應用程式處理。請注意，正在處理的資料串流可以在取用者應用程式執行期變更。KCL 會定期呼叫 `streamConfigList`，以瞭解要處理的資料串流變更。

  該 `streamConfigList` 方法會填充 [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23) 清單。

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```

  請注意，`StreamIdentifier` 和 `InitialPositionInStreamExtended` 是必填欄位，而 `consumerArn` 是選填欄位。只有在您使用 KCL 2.x 來實作增強型散發取用者應用程式時，才必須提供 `consumerArn`。

  如需 的詳細資訊`StreamIdentifier`，請參閱 https：//[https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java\$1L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129)。若要建立 `StreamIdentifier`，建議您從 `streamArn`和 `streamCreationEpoch` v2.5.0 及更新版本中提供的 建立多串流執行個體。在不支援 的 KCL v2.3 和 v2.4 中`streamArm`，使用 格式建立多串流執行個體`account-id:StreamName:streamCreationTimestamp`。從下一個主要版本開始，此格式將被取代，不再受支援。

  `MultistreamTracker` 還包括刪除租用資料表 (`formerStreamsLeasesDeletionStrategy`) 中舊串流的租用的策略。請注意，在取用者應用程式執行期，無法變更策略。如需詳細資訊，請參閱 [https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java)
+ [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java) 是一個應用程式範圍的類別，可用來指定建置 KCL 取用者應用程式時要使用的所有 KCL 2.x 組態設定值。`ConfigsBuilder` 類別現在支援 `MultistreamTracker` 介面。您可以使用一個資料串流的名稱初始化 ConfigsBuilder，以取用來自以下內容的記錄：

  ```
   /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```

  或者，如果您想實作一個同時處理多個串流的 KCL 取用者應用程式，則可以使用 `MultiStreamTracker` 初始化 ConfigsBuilder。

  ```
  * Constructor to initialize ConfigsBuilder with MultiStreamTracker
       * @param multiStreamTracker
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.left(multiStreamTracker);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```
+ 針對 KCL 取用者應用程式實作多串流支援後，應用程式租用資料表的每一列現在都包含碎片 ID 和此應用程式所處理之多個資料串流的串流名稱。
+ 實作 KCL 取用者應用程式的多串流支援時，leaseKey 會採用下列結構：`account-id:StreamName:streamCreationTimestamp:ShardId`。例如 `111111111:multiStreamTest-1:12345:shardId-000000000336`。
**重要**  
如果您的現有 KCL 取用者應用程式設定僅處理一個資料串流，則 leaseKey (租用資料表的雜湊索引鍵) 就是碎片 ID。如果您重新設定這個現有的 KCL 取用者應用程式來處理多個資料串流，則它會中斷租用資料表，因為有了多重串流支援，leaseKey 結構必須如下所示：`account-id:StreamName:StreamCreationTimestamp:ShardId`。

## 將 KCL 與 AWS Glue 結構描述登錄檔搭配使用
<a name="shared-throughput-kcl-consumers-glue-schema-registry"></a>

您可以將 Kinesis 資料串流與 AWS Glue 結構描述登錄檔整合。 AWS Glue 結構描述登錄檔可讓您集中探索、控制和發展結構描述，同時確保已註冊結構描述持續驗證產生的資料。結構描述定義資料記錄的結構和格式。結構描述是可靠的資料發佈、耗用或儲存的版本化規格。 AWS Glue結構描述登錄檔可讓您改善串流應用程式中end-to-end資料品質和資料控管。如需詳細資訊，請參閱 [AWS Glue 結構描述登錄檔](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)。設定此整合的方法之一是透過 Java 中的 KCL。

**重要**  
目前，Kinesis Data Streams 和 AWS Glue 結構描述登錄整合僅支援使用在 Java 中實作的 KCL 2.3 消費者的 Kinesis 資料串流。不提供多語言支援。不支援 KCL 1.0 取用者。不支援 KCL 2.3 之前的 KCL 2.x 取用者。

如需如何使用 KCL 設定 Kinesis Data Streams 與結構描述登錄檔整合的詳細說明，請參閱「使用 KPL/KCL 程式庫與資料互動」一節[：將 Amazon Kinesis Data Streams 與 AWS Glue 結構描述登錄檔整合](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)。

# 開發具有共用輸送量的自訂消費者
<a name="shared-throughput-consumers"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

如果您不需要專用輸送量從 Kinesis Data Streams 接收資料，而且不需要低於 200 毫秒的讀取傳播延遲，則可依照以下主題所述建置取用者應用程式。您可以使用 Kinesis Client Library (KCL) 或 適用於 Java 的 AWS SDK。

**Topics**
+ [使用 KCL 開發具有共用輸送量的自訂消費者](custom-kcl-consumers.md)

如需如何建置可經由專用輸送量從 Kinesis 資料串流接收記錄的取用者相關資訊，請參閱 [開發具有專用輸送量的增強型廣發消費者](enhanced-consumers.md)。

# 使用 KCL 開發具有共用輸送量的自訂消費者
<a name="custom-kcl-consumers"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

開發具有共用輸送量的自訂取用者應用程式的方法之一是使用 Kinesis Client Library (KCL)。

針對您正在使用的 KCL 版本選擇下列主題。

**Topics**
+ [開發 KCL 1.x 消費者](developing-consumers-with-kcl.md)
+ [開發 KCL 2.x 消費者](developing-consumers-with-kcl-v2.md)

# 開發 KCL 1.x 消費者
<a name="developing-consumers-with-kcl"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis Client Library 頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 Kinesis Client Library (KCL) 為 Amazon Kinesis Data Streams 開發取用者應用程式。

如需詳細資訊，請參閱[關於 KCL （先前版本）](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview)。

根據您要使用的選項，從下列主題中進行選擇。

**Topics**
+ [在 Java 中開發 Kinesis Client Library 取用者](kinesis-record-processor-implementation-app-java.md)
+ [在 Node.js 中開發 Kinesis Client Library 取用者](kinesis-record-processor-implementation-app-nodejs.md)
+ [在 .NET 中開發 Kinesis Client Library 取用者](kinesis-record-processor-implementation-app-dotnet.md)
+ [在 Python 中開發 Kinesis Client Library 取用者](kinesis-record-processor-implementation-app-py.md)
+ [在 Ruby 中開發 Kinesis Client Library 消費者](kinesis-record-processor-implementation-app-ruby.md)

# 在 Java 中開發 Kinesis Client Library 取用者
<a name="kinesis-record-processor-implementation-app-java"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 Kinesis Client Library (KCL) 建置應用程式，處理來自 Kinesis 資料串流的資料。Kinesis Client Library 支援多種語言。本主題將討論 Java。如要檢視 Javadoc 參考，請參閱 [AmazonKinesisClient 類別的AWS Javadoc 主題](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html)。

若要從 GitHub 下載 Java KCL，請前往 [Kinesis Client Library (Python)](https://github.com/awslabs/amazon-kinesis-client)。若要尋找 Apache Maven 上的 Java KCL，請前往 [KCL 搜尋結果](https://search.maven.org/#search|ga|1|amazon-kinesis-client)頁面。如需從 GitHub 下載 Java KCL 取用者應用程式的範本程式碼，請至 GitHub 前往[適用於 Java 的 KCL 範例專案](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis)頁面。

範例應用程式使用 [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html)。您可以從 `configure` 檔案中定義的靜態 `AmazonKinesisApplicationSample.java` 方法更改日誌記錄組態。如需如何使用 Apache Commons Logging 搭配 Log4j 和 AWS Java 應用程式的詳細資訊，請參閱《 *適用於 Java 的 AWS SDK 開發人員指南*》中的 [ Log4j 記錄](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html)。

以 Java 實作 KCL 取用者應用程式時，您必須完成以下任務：

**Topics**
+ [實作 IRecordProcessor 方法](#kinesis-record-processor-implementation-interface-java)
+ [實作 IRecordProcessor 介面的類別工廠](#kinesis-record-processor-implementation-factory-java)
+ [建立工作者](#kcl-java-worker)
+ [修改組態屬性](#kinesis-record-processor-initialization-java)
+ [遷移至記錄處理器界面的第 2 版](#kcl-java-v2-migration)

## 實作 IRecordProcessor 方法
<a name="kinesis-record-processor-implementation-interface-java"></a>

KCL 目前支援兩種版本的 `IRecordProcessor` 界面：原始界面適用於第一版的 KCL，而第 2 版自 KCL 1.5.0 版起均可使用。兩種界面皆完全受支援。兩種界面皆可完整支援。您的選擇取決於具體的情境要求。如需查看兩者間的所有差異，請參閱您在本機建置的 Javadoc 或原始碼。以下各節概要說明最低限度的入門實作。

**Topics**
+ [原始界面 (第 1 版)](#kcl-java-interface-original)
+ [更新界面 （第 2 版）](#kcl-java-interface-v2)

### 原始界面 (第 1 版)
<a name="kcl-java-interface-original"></a>

原始 `IRecordProcessor` 界面 (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) 公開了您的消費者必須實作的下列記錄處理器方法。範例提供的實作可讓您用於做為起點 (請參閱 `AmazonKinesisApplicationSampleRecordProcessor.java`)。

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**initialize**  
KCL 將於記錄處理器執行個體化時呼叫 `initialize` 方法，傳遞特定碎片 ID 作為參數。此記錄處理器只會處理該碎片，且通常反過來說同樣成立 (該碎片僅由此記錄處理器處理)。然而，您的消費者應該考慮到資料記錄可能經過多次處理的情況。Kinesis Data Streams 具有*至少一次*的語意，即碎片中的每一筆資料記錄至少會由取用者內的工作者處理一次。如需特定碎片可能由多個工作者處理之各種情況的詳細資訊，請參閱[使用重新分片、擴展和平行處理來變更碎片數量](kinesis-record-processor-scaling.md)。

```
public void initialize(String shardId)
```

**processRecords**  
KCL 會呼叫 `processRecords` 方法，傳遞由 `initialize(shardId)` 方法所指定碎片中之資料記錄的清單。記錄處理器根據消費者的語意處理這些記錄中的資料。例如，工作者可能會執行資料轉換，然後將結果存放至 Amazon Simple Storage Service (Amazon S3) 儲存貯體。

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

除了資料本身外，記錄還包含序號和分割區索引鍵。工作者在處理資料時可使用這些值。例如，工作者可根據分割區索引鍵的值，選擇要存放資料的 S3 儲存貯體。`Record` 類別公開了下列方法，可供存取記錄的資料、序號和分割區索引鍵。

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

範例中，私有方法 `processRecordsWithRetries` 的程式碼示範了工作者如何能夠存取記錄的資料、序號和分割區索引鍵。

Kinesis Data Streams 需要由記錄處理器追蹤碎片中已經處理過的記錄。KCL 透過將檢查點指標 (`IRecordProcessorCheckpointer`) 傳遞給 `processRecords` 為您進行這項追蹤。記錄處理器將對此界面呼叫 `checkpoint` 方法，以通知 KCL 目前處理碎片中的記錄之進度。如果工作者發生失敗，KCL 將使用此資訊於上一筆已知處理過的記錄處重新啟動碎片處理。

對於分割或合併操作，在原始碎片的處理器呼叫 `checkpoint` 以表示對原始碎片進行所有處理都已完成之前，KCL 將不會開始處理新碎片。

如果您未傳遞參數，KCL 將假定對 `checkpoint` 的呼叫表示所有記錄皆已處理，一直處理到傳遞至記錄處理器的最後一筆記錄。因此，記錄處理器應僅在已處理過向其傳遞的清單中之所有記錄後才呼叫 `checkpoint`。記錄處理器不需要在每次呼叫 `checkpoint` 時呼叫 `processRecords`。例如，處理器可以每呼叫三次 `checkpoint` 才呼叫一次 `processRecords`。您可以選擇性指定某筆記錄的確切序號做為 `checkpoint` 的參數。在此情況下，KCL 將假定所有記錄皆已處理，僅止於處理到該記錄。

範例中，私有方法 `checkpoint` 示範了如何利用適當的例外狀況處理和重試邏輯來呼叫 `IRecordProcessorCheckpointer.checkpoint`。

KCL 倚賴 `processRecords` 以處理任何因處理資料記錄而引發的例外狀況。如果 `processRecords` 擲回例外狀況，KCL 將略過例外狀況發生前已傳遞的資料記錄。也就是說，這些記錄不會重新傳送到擲回例外狀況的記錄處理器或消費者內的任何其他記錄處理器。

**shutdown**  
KCL 會在處理結束 (關閉原因是 `TERMINATE`) 或工作者不再回應 (關閉原因為 `ZOMBIE`) 時呼叫 `shutdown` 方法。

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

當記錄處理器未能再從碎片接收任何記錄 (因為碎片已進行分割或合併或者串流已刪除) 時，處理即告結束。

KCL 還會將 `IRecordProcessorCheckpointer` 界面傳遞給 `shutdown`。如果關閉原因是 `TERMINATE`，表示記錄處理器應已完成處理任何資料記錄，然後對此界面呼叫 `checkpoint` 方法。

### 更新界面 （第 2 版）
<a name="kcl-java-interface-v2"></a>

更新後的 `IRecordProcessor` 界面 (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) 公開了您的消費者必須實作的下列記錄處理器方法：

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

原始版本界面的所有引數皆可透過容器物件的 get 方法進行存取。例如，若要擷取 `processRecords()` 中的記錄清單，可使用 `processRecordsInput.getRecords()`。

自此界面的第 2 版 (KCL 1.5.0 及更新版本) 起，除了原始界面提供的輸入外，還可使用以下各項新的輸入：

起始序號  
在傳遞給 `InitializationInput` 操作的 `initialize()` 物件中，將向記錄處理器執行個體提供的各筆記錄其起始序號。這是由先前處理同一碎片的記錄處理器執行個體執行上一次檢查點作業的序號。當您的應用程式需要此序號時，請提供這項資訊。

待定檢查點序號  
在傳遞給 `initialize()` 操作的 `InitializationInput` 物件中，上一個記錄處理器執行個體於停止前未能遞交的待定檢查點序號 (若有)。

## 實作 IRecordProcessor 介面的類別工廠
<a name="kinesis-record-processor-implementation-factory-java"></a>

實作記錄處理器方法的類別還需要實作處理站。您的消費者在執行個體化工作者時將傳遞此處理站的參考。

範例是在 `AmazonKinesisApplicationSampleRecordProcessorFactory.java` 檔案中使用原始記錄處理器界面實作處理站類別。若您希望類別處理站建立第 2 版的記錄處理器，請使用套件名稱 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`。

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## 建立工作者
<a name="kcl-java-worker"></a>

如 [實作 IRecordProcessor 方法](#kinesis-record-processor-implementation-interface-java) 所述，KCL 記錄處理器界面有兩種版本可供選擇，而這將影響您建立工作者的方式。原始記錄處理器界面使用以下程式碼結構建立工作者：

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

若為第 2 版的記錄處理器界面，您則可使用 `Worker.Builder` 建立工作者，而不必擔心應該使用哪個建構函數以及引數的順序。更新後的記錄處理器界面使用以下程式碼結構建立工作者：

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## 修改組態屬性
<a name="kinesis-record-processor-initialization-java"></a>

範例提供了組態屬性的預設值。工作者的這份組態資料隨後整併到 `KinesisClientLibConfiguration` 物件。此物件以及 `IRecordProcessor` 的類別處理站參考將傳遞至用於執行個體化工作者的呼叫。您可使用 Java 屬性檔案 (請參閱 `AmazonKinesisApplicationSample.java`) 以自訂值覆寫任何這些屬性。

### Application name (應用程式名稱)
<a name="configuration-property-application-name"></a>

KCL 要求所有應用程式和同一區域內的 Amazon DynamoDB 資料表必須具有獨一無二的應用程式名稱。其使用應用程式名稱組態值的方式如下：
+ 假定所有與此應用程式名稱相關聯的工作者合作處理同一串流。這些工作者可能分佈於多個執行個體。如果您以相同應用程式的程式碼執行另一執行個體但使用不同的應用程式名稱，KCL 便會將第二個執行個體視為亦對同一串流進行操作的完全獨立應用程式。
+ KCl 將使用應用程式名稱建立 DynamoDB 資料表並由該資料表維護應用程式的狀態資訊 (例如檢查點及工作者與碎片間對應)。每個應用程式都有其自身的 DynamoDB 資料表。如需詳細資訊，請參閱[使用租用資料表來追蹤 KCL 取用者應用程式處理的碎片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 設定登入資料
<a name="kinesis-record-processor-cred-java"></a>

您必須將 AWS 登入資料提供給預設登入資料提供者鏈結中的其中一個登入資料提供者。例如，如果您在 EC2 執行個體上執行取用者，建議您使用 IAM 角色來啟動執行個體。反映與此 IAM 角色相關聯許可的 AWS 憑證，可透過執行個體中繼資料提供給執行個體上的應用程式。以這種方式管理 EC2 執行個體上執行的消費者登入資料最為安全。

範例應用程式首先嘗試從執行個體中繼資料擷取 IAM 憑證：

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

如果範例應用程式無法從執行個體中繼資料取得登入資料，其將嘗試從屬性檔案擷取登入資料：

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

如需執行個體中繼資料的詳細資訊，請參閱《*Amazon EC2 使用者指南*》中的[執行個體中繼資料](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html)。

### 將工作者 ID 用於多個執行個體
<a name="kinesis-record-processor-workerid-java"></a>

範例初始化程式碼透過使用本機電腦的名稱並附加全域唯一識別符的方式建立工作者 ID (`workerId`)，如以下程式碼片段所示。如此可支援消費者應用程式的多個執行個體在單一電腦上執行的情況。

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## 遷移至記錄處理器界面的第 2 版
<a name="kcl-java-v2-migration"></a>

若您想要遷移使用原始界面的程式碼，則除了遵照前述步驟外，您還需執行以下步驟：

1. 將您的記錄處理器類別更改為匯入第 2 版的記錄處理器界面：

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. 將各項輸入的參考更改為使用容器物件的 `get` 方法。例如，在 `shutdown()` 操作中，將 "`checkpointer`" 更改為 "`shutdownInput.getCheckpointer()`"。

1. 將您的記錄處理器處理站類別更改為匯入第 2 版的記錄處理器處理站界面：

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. 將工作者的建構更改為使用 `Worker.Builder`。例如：

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# 在 Node.js 中開發 Kinesis Client Library 取用者
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis Client Library 頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 Kinesis Client Library (KCL) 建置應用程式，處理來自 Kinesis 資料串流的資料。Kinesis Client Library 支援多種語言。本主題將討論 Node.js。

KCL 是一種 Java 程式庫，使用稱為 *MultiLangDaemon* 的多語言介面提供對 Java 以外語言的支援。此常駐程式是以 Java 為基礎，並在您使用 Java 以外的 KCL 語言時在背景執行。因此，若您安裝了適用於 Node.js 的 KCL 並完全以 Node.js 撰寫取用者應用程式，則由於 MultiLangDaemon 的緣故，您的系統仍需要安裝 Java。此外，MultiLangDaemon 有一些預設設定，您可能需要針對您的使用案例進行自訂，例如其連線 AWS 的區域。如需 MultiLangDaemon 的詳細資訊，請前往 GitHub 上的 [KCL MultiLangDaemon 專案](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)頁面。

若要從 GitHub 下載 Node.js KCL，請移至 [Kinesis Client Library (Node.js)](https://github.com/awslabs/amazon-kinesis-client-nodejs)。

**範本程式碼下載**

Node.js 提供了兩份適用於 KCL 的程式碼範例：
+ [basic-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  以下各節將利用此範例說明以 Node.js 建置 KCL 取用者應用程式的原理。
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   程度更為進階的範例，使用真實情境，適合您在熟悉基本範本程式碼之後研究。本文不會就此範例進行討論，但其本身附有 README 檔案提供更多詳細資訊。

以 Node.js 實作 KCL 取用者應用程式時，您必須完成以下任務：

**Topics**
+ [實作記錄處理器](#kinesis-record-processor-implementation-interface-nodejs)
+ [修改組態屬性](#kinesis-record-processor-initialization-nodejs)

## 實作記錄處理器
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

使用適用於 Node.js 的 KCL 所開發最簡單形式的取用者必須實作 `recordProcessor` 函數，後者則又包含 `initialize`、`processRecords` 和 `shutdown` 函數。範例提供的實作可讓您用於做為起點 (請參閱 `sample_kcl_app.js`)。

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**initialize**  
KCL 將於記錄處理器啟動時呼叫 `initialize` 函數。此記錄處理器只會處理以 `initializeInput.shardId` 傳遞的碎片 ID，且通常反過來說同樣成立 (該碎片僅由此記錄處理器處理)。然而，您的消費者應該考慮到資料記錄可能經過多次處理的情況。這是因為 Kinesis Data Streams 具有*至少一次*的語意，即碎片中的每一筆資料記錄至少會由取用者內的工作者處理一次。如需特定碎片可能由多個工作者處理之各種情況的詳細資訊，請參閱[使用重新分片、擴展和平行處理來變更碎片數量](kinesis-record-processor-scaling.md)。

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 KCL 將依照 `initialize` 函數內指定的碎片，使用該碎片中各資料記錄的清單做為輸入以呼叫此函數。您所實作的記錄處理器根據消費者的語意處理這些記錄中的資料。例如，工作者可能會執行資料轉換，然後將結果存放至 Amazon Simple Storage Service (Amazon S3) 儲存貯體。

```
processRecords: function(processRecordsInput, completeCallback)
```

除了資料本身外，記錄還包含工作者在處理資料時可使用的序號和分割區索引鍵。例如，工作者可根據分割區索引鍵的值，選擇要存放資料的 S3 儲存貯體。`record` 字典公開了以下的索引鍵值組，可供存取記錄的資料、序號和分割區索引鍵：

```
record.data
record.sequenceNumber
record.partitionKey
```

請注意，資料為 Base64 編碼。

基本範例中，`processRecords` 函數的程式碼示範了工作者如何能夠存取記錄的資料、序號和分割區索引鍵。

Kinesis Data Streams 需要由記錄處理器追蹤碎片中已經處理過的記錄。KCL 透過以 `processRecordsInput.checkpointer` 傳遞的 `checkpointer` 物件進行這項追蹤。記錄處理器將呼叫 `checkpointer.checkpoint` 函數，以通知 KCL 目前處理碎片中的記錄之進度。如果工作者發生失敗，KCL 將在您重新啟動碎片處理時使用此資訊，以便從上一筆已知處理過的記錄處繼續處理。

對於分割或合併操作，在原始碎片的處理器呼叫 `checkpoint` 以表示對原始碎片進行所有處理都已完成之前，KCL 不會開始處理新碎片。

如果您未傳遞序號給 `checkpoint` 函數，KCL 將假定對 `checkpoint` 的呼叫表示所有記錄皆已處理，一直處理到傳遞至記錄處理器的最後一筆記錄。因此，記錄處理器應**僅**在已處理過向其傳遞的清單中之所有記錄後才呼叫 `checkpoint`。記錄處理器不需要在每次呼叫 `checkpoint` 時呼叫 `processRecords`。例如，處理器可以每呼叫三次該函數才呼叫一次 `checkpoint`，或於記錄處理器外部發生事件時呼叫 (比方您已實作的自訂確認/驗證服務)。

您可以選擇性指定某筆記錄的確切序號做為 `checkpoint` 的參數。在此情況下，KCL 將假定所有記錄皆已處理，僅止於處理到該記錄。

基本範例應用程式示範了最簡單可行的方式呼叫 `checkpointer.checkpoint` 函數。此時您可以在該函數中為您的消費者加入其他所需的檢查點邏輯。

**shutdown**  
KCL 會在處理結束 (`shutdownInput.reason` 為 `TERMINATE`) 或工作者不再回應 (`shutdownInput.reason` 為 `ZOMBIE`) 時呼叫 `shutdown` 函數。

```
shutdown: function(shutdownInput, completeCallback)
```

當記錄處理器未能再從碎片接收任何記錄 (因為碎片已進行分割或合併或者串流已刪除) 時，處理即告結束。

KCL 還會將 `shutdownInput.checkpointer` 物件傳遞給 `shutdown`。如果關閉原因是 `TERMINATE`，您即應確保記錄處理器已完成處理任何資料記錄，然後對此界面呼叫 `checkpoint` 函數。

## 修改組態屬性
<a name="kinesis-record-processor-initialization-nodejs"></a>

範例提供了組態屬性的預設值。您可使用自訂值覆寫任何這些屬性 (請參閱基本範例中的 `sample.properties`)。

### Application name (應用程式名稱)
<a name="kinesis-record-processor-application-name-nodejs"></a>

KCL 要求所有應用程式和同一區域內的 Amazon DynamoDB 資料表必須具有獨一無二的應用程式。其使用應用程式名稱組態值的方式如下：
+ 假定所有與此應用程式名稱相關聯的工作者合作處理同一串流。這些工作者可能分佈於多個執行個體。如果您以相同應用程式的程式碼執行另一執行個體但使用不同的應用程式名稱，KCL 便會將第二個執行個體視為亦對同一串流進行操作的完全獨立應用程式。
+ KCl 將使用應用程式名稱建立 DynamoDB 資料表並由該資料表維護應用程式的狀態資訊 (例如檢查點及工作者與碎片間對應)。每個應用程式都有其自身的 DynamoDB 資料表。如需詳細資訊，請參閱[使用租用資料表來追蹤 KCL 取用者應用程式處理的碎片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 設定登入資料
<a name="kinesis-record-processor-credentials-nodejs"></a>

您必須將 AWS 登入資料提供給預設登入資料提供者鏈結中的其中一個登入資料提供者。您可以使用 `AWSCredentialsProvider` 屬性，設定登入資料供應者。`sample.properties` 檔案必須向[預設登入資料供應者鏈結](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)中的某一登入資料供應者提供您的登入資料。如果您是在 Amazon EC2 執行個體上執行取用者，建議您使用 IAM 角色來設定執行個體。反映與此 IAM 角色相關聯許可 AWS 的憑證可透過執行個體中繼資料提供給執行個體上的應用程式。以這種方式管理 EC2 執行個體上執行的消費者應用程式的登入資料最為安全。

以下範例設定 KCL​ 使用 `sample_kcl_app.js` 中提供的記錄處理器來處理名為 `kclnodejssample` 的 Kinesis 資料串流。

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# 在 .NET 中開發 Kinesis Client Library 取用者
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 Kinesis Client Library (KCL) 建置應用程式，處理來自 Kinesis 資料串流的資料。Kinesis Client Library 支援多種語言。本主題將討論 .NET。

KCL 是一種 Java 程式庫，使用稱為 *MultiLangDaemon* 的多語言介面提供對 Java 以外語言的支援。此常駐程式是以 Java 為基礎，並在您使用 Java 以外的 KCL 語言時在背景執行。因此，若您安裝了適用於 .NET 的 KCL 並完全以 .NET 撰寫取用者應用程式，則由於 MultiLangDaemon 的緣故，您的系統仍需要安裝 Java。此外，MultiLangDaemon 有一些預設設定，您可能需要針對您的使用案例進行自訂，例如其連線 AWS 的區域。如需 MultiLangDaemon 的詳細資訊，請前往 GitHub 上的 [KCL MultiLangDaemon 專案](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)頁面。

若要從 GitHub 下載 .NET KCL，請前往 [Kinesis Client Library (.NET)](https://github.com/awslabs/amazon-kinesis-client-net)。如需下載 .NET KCL 取用者應用程式的範本程式碼，請至 GitHub 前往[適用於 .NET 的 KCL 範例取用者專案](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer)頁面。

以 .NET 實作 KCL 取用者應用程式時，您必須完成以下任務：

**Topics**
+ [實作 IRecordProcessor 類別方法](#kinesis-record-processor-implementation-interface-dotnet)
+ [修改組態屬性](#kinesis-record-processor-initialization-dotnet)

## 實作 IRecordProcessor 類別方法
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

消費者必須實作 `IRecordProcessor` 的下列方法。範例消費者提供的實作可讓您用於做為起點 (請參閱 `SampleRecordProcessor` 中的 `SampleConsumer/AmazonKinesisSampleConsumer.cs` 類別)。

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**初始化**  
KCL 將於記錄處理器執行個體化時呼叫此方法，透過 `input` 參數 (`input.ShardId`) 傳遞特定碎片 ID。此記錄處理器只會處理該碎片，且通常反過來說同樣成立 (該碎片僅由此記錄處理器處理)。然而，您的消費者應該考慮到資料記錄可能經過多次處理的情況。這是因為 Kinesis Data Streams 具有*至少一次*的語意，即碎片中的每一筆資料記錄至少會由取用者內的工作者處理一次。如需特定碎片可能由多個工作者處理之各種情況的詳細資訊，請參閱[使用重新分片、擴展和平行處理來變更碎片數量](kinesis-record-processor-scaling.md)。

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
`input` 會呼叫此方法，透過 `input.Records` 參數 () 傳遞由 `Initialize` 方法所指定碎片中之資料記錄的清單。您所實作的記錄處理器根據消費者的語意處理這些記錄中的資料。例如，工作者可能會執行資料轉換，然後將結果存放至 Amazon Simple Storage Service (Amazon S3) 儲存貯體。

```
public void ProcessRecords(ProcessRecordsInput input)
```

除了資料本身外，記錄還包含序號和分割區索引鍵。工作者在處理資料時可使用這些值。例如，工作者可根據分割區索引鍵的值，選擇要存放資料的 S3 儲存貯體。`Record` 類別公開了以下項目，可供存取記錄的資料、序號和分割區索引鍵：

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

範例中，`ProcessRecordsWithRetries` 方法的程式碼示範了工作者如何能夠存取記錄的資料、序號和分割區索引鍵。

Kinesis Data Streams 需要由記錄處理器追蹤碎片中已經處理過的記錄。KCL 透過將 `Checkpointer` 物件傳遞給 `ProcessRecords` (`input.Checkpointer`) 為您進行這項追蹤。記錄處理器將呼叫 `Checkpointer.Checkpoint` 方法，以通知 KCL 目前處理碎片中的記錄之進度。如果工作者發生失敗，KCL 將使用此資訊於上一筆已知處理過的記錄處重新啟動碎片處理。

對於分割或合併操作，在原始碎片的處理器呼叫 `Checkpointer.Checkpoint` 以表示對原始碎片進行所有處理都已完成之前，KCL 不會開始處理新碎片。

如果您未傳遞參數，KCL 將假定對 `Checkpointer.Checkpoint` 的呼叫代表所有記錄皆已處理，一直處理到傳遞至記錄處理器的最後一筆記錄。因此，記錄處理器應僅在已處理過向其傳遞的清單中之所有記錄後才呼叫 `Checkpointer.Checkpoint`。記錄處理器不需要在每次呼叫 `Checkpointer.Checkpoint` 時呼叫 `ProcessRecords`。例如，處理器可以每呼叫三次或四次該方法才呼叫一次 `Checkpointer.Checkpoint`。您可以選擇性指定某筆記錄的確切序號做為 `Checkpointer.Checkpoint` 的參數。在此情況下，KCL 將假定各記錄皆已處理，僅止於處理到該記錄。

範例中，私有方法 `Checkpoint(Checkpointer checkpointer)` 示範了如何利用適當的例外狀況處理和重試邏輯來呼叫 `Checkpointer.Checkpoint` 方法。

適用於 .NET 的 KCL 處理例外狀況的方式有別於其他 KCL 語言程式庫，其並不會處理任何因處理資料記錄而引發的例外狀況。使用者程式碼未捕捉的任何例外狀況都將導致程式當機。

**Shutdown**  
KCL 會在處理結束 (關閉原因是 `TERMINATE`) 或工作者不再回應 (關閉 `input.Reason` 值為 `ZOMBIE`) 時呼叫 `Shutdown` 方法。

```
public void Shutdown(ShutdownInput input)
```

當記錄處理器未能再從碎片接收任何記錄 (因為碎片已進行分割或合併或者串流已刪除) 時，處理即告結束。

KCL 還會將 `Checkpointer` 物件傳遞給 `shutdown`。如果關閉原因是 `TERMINATE`，表示記錄處理器應已完成處理任何資料記錄，然後對此界面呼叫 `checkpoint` 方法。

## 修改組態屬性
<a name="kinesis-record-processor-initialization-dotnet"></a>

範例消費者提供了組態屬性的預設值。您可使用自訂值覆寫任何這些屬性 (請參閱 `SampleConsumer/kcl.properties`)。

### Application name (應用程式名稱)
<a name="modify-kinesis-record-processor-application-name"></a>

KCL 要求所有應用程式和同一區域內的 Amazon DynamoDB 資料表必須具有獨一無二的應用程式。其使用應用程式名稱組態值的方式如下：
+ 假定所有與此應用程式名稱相關聯的工作者合作處理同一串流。這些工作者可能分佈於多個執行個體。如果您以相同應用程式的程式碼執行另一執行個體但使用不同的應用程式名稱，KCL 便會將第二個執行個體視為亦對同一串流進行操作的完全獨立應用程式。
+ KCl 將使用應用程式名稱建立 DynamoDB 資料表並由該資料表維護應用程式的狀態資訊 (例如檢查點及工作者與碎片間對應)。每個應用程式都有其自身的 DynamoDB 資料表。如需詳細資訊，請參閱[使用租用資料表來追蹤 KCL 取用者應用程式處理的碎片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 設定登入資料
<a name="kinesis-record-processor-creds-dotnet"></a>

您必須將 AWS 登入資料提供給預設登入資料提供者鏈結中的其中一個登入資料提供者。您可以使用 `AWSCredentialsProvider` 屬性，設定登入資料供應者。[sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) 必須向[預設登入資料供應者鏈結](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)中的某一登入資料供應者提供您的登入資料。如果您在 EC2 執行個體上執行取用者應用程式，建議您使用 IAM 角色設定執行個體。反映與此 IAM 角色相關聯許可的 AWS 憑證，可透過執行個體中繼資料提供給執行個體上的應用程式。以這種方式管理 EC2 執行個體上執行的消費者登入資料最為安全。

範例的屬性檔案設定由 KCL 使用 `AmazonKinesisSampleConsumer.cs` 所提供的記錄處理器，處理名為 "words" 的 Kinesis 資料串流。

# 在 Python 中開發 Kinesis Client Library 取用者
<a name="kinesis-record-processor-implementation-app-py"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis Client Library 頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 Kinesis Client Library (KCL) 建置應用程式，處理來自 Kinesis 資料串流的資料。Kinesis Client Library 支援多種語言。本主題將討論 Python。

KCL 是一種 Java 程式庫，使用稱為 *MultiLangDaemon* 的多語言介面提供對 Java 以外語言的支援。此常駐程式是以 Java 為基礎，並在您使用 Java 以外的 KCL 語言時在背景執行。因此，若您安裝了適用於 Python 的 KCL 並完全以 Python 撰寫取用者應用程式，則由於 MultiLangDaemon 的緣故，您的系統仍需要安裝 Java。此外，MultiLangDaemon 有一些預設設定，您可能需要針對您的使用案例進行自訂，例如其連線 AWS 的區域。如需 MultiLangDaemon 的詳細資訊，請前往 GitHub 上的 [KCL MultiLangDaemon 專案](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)頁面。

若要從 GitHub 下載 Python KCL，請前往 [Kinesis Client Library (Python)](https://github.com/awslabs/amazon-kinesis-client-python)。如需下載 Python KCL 取用者應用程式的範本程式碼，請至 GitHub 前[往適用於 Python 的 KCL 範例專案](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples)頁面。

以 Python 實作 KCL 取用者應用程式時，您必須完成以下任務：

**Topics**
+ [實作 RecordProcessor 類別方法](#kinesis-record-processor-implementation-interface-py)
+ [修改組態屬性](#kinesis-record-processor-initialization-py)

## 實作 RecordProcessor 類別方法
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess` 類別必須擴充 `RecordProcessorBase` 以實作下列方法。範例提供的實作可讓您用於做為起點 (請參閱 `sample_kclpy_app.py`)。

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**initialize**  
KCL 將於記錄處理器執行個體化時呼叫 `initialize` 方法，傳遞特定碎片 ID 作為參數。此記錄處理器只會處理該碎片，且通常反過來說同樣成立 (該碎片僅由此記錄處理器處理)。然而，您的消費者應該考慮到資料記錄可能經過多次處理的情況。這是因為 Kinesis Data Streams 具有*至少一次*的語意，即碎片中的每一筆資料記錄至少會由取用者內的工作者處理一次。如需特定碎片可能由多個工作者處理之各種情況的詳細資訊，請參閱[使用重新分片、擴展和平行處理來變更碎片數量](kinesis-record-processor-scaling.md)。

```
def initialize(self, shard_id)
```

**process\$1records**  
 KCL 會呼叫此方法，傳遞由 `initialize` 方法所指定碎片中之資料記錄的清單。您所實作的記錄處理器根據消費者的語意處理這些記錄中的資料。例如，工作者可能會執行資料轉換，然後將結果存放至 Amazon Simple Storage Service (Amazon S3) 儲存貯體。

```
def process_records(self, records, checkpointer) 
```

除了資料本身外，記錄還包含序號和分割區索引鍵。工作者在處理資料時可使用這些值。例如，工作者可根據分割區索引鍵的值，選擇要存放資料的 S3 儲存貯體。`record` 字典公開了以下的索引鍵值組，可供存取記錄的資料、序號和分割區索引鍵：

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

請注意，資料為 Base64 編碼。

範例中，`process_records` 方法的程式碼示範了工作者如何能夠存取記錄的資料、序號和分割區索引鍵。

Kinesis Data Streams 需要由記錄處理器追蹤碎片中已經處理過的記錄。KCL 透過將 `Checkpointer` 物件傳遞給 `process_records` 為您進行這項追蹤。記錄處理器將對此物件呼叫 `checkpoint` 方法，以通知 KCL 目前處理碎片中的記錄之進度。如果工作者發生失敗，KCL 將使用此資訊於上一筆已知處理過的記錄處重新啟動碎片處理。

對於分割或合併操作，在原始碎片的處理器呼叫 `checkpoint` 以表示對原始碎片進行所有處理都已完成之前，KCL 不會開始處理新碎片。

如果您未傳遞參數，KCL 將假定對 `checkpoint` 的呼叫表示所有記錄皆已處理，一直處理到傳遞至記錄處理器的最後一筆記錄。因此，記錄處理器應僅在已處理過向其傳遞的清單中之所有記錄後才呼叫 `checkpoint`。記錄處理器不需要在每次呼叫 `checkpoint` 時呼叫 `process_records`。例如，處理器可以每呼叫三次該方法才呼叫一次 `checkpoint`。您可以選擇性指定某筆記錄的確切序號做為 `checkpoint` 的參數。在此情況下，KCL 將假定所有記錄皆已處理，僅止於處理到該記錄。

範例中，私有方法 `checkpoint` 示範了如何利用適當的例外狀況處理和重試邏輯來呼叫 `Checkpointer.checkpoint` 方法。

KCL 倚賴 `process_records` 以處理任何因處理資料記錄而引發的例外狀況。如果 `process_records` 擲回例外狀況，KCL 將略過例外狀況發生前已傳遞至 `process_records` 的資料記錄。也就是說，這些記錄不會重新傳送到擲回例外狀況的記錄處理器或消費者內的任何其他記錄處理器。

**shutdown**  
 KCL 會在處理結束 (關閉原因是 `TERMINATE`) 或工作者不再回應 (關閉 `reason` 為 `ZOMBIE`) 時呼叫 `shutdown` 方法。

```
def shutdown(self, checkpointer, reason)
```

當記錄處理器未能再從碎片接收任何記錄 (因為碎片已進行分割或合併或者串流已刪除) 時，處理即告結束。

 KCL 還會將 `Checkpointer` 物件傳遞給 `shutdown`。如果關閉 `reason` 是 `TERMINATE`，表示記錄處理器應已完成處理任何資料記錄，然後對此界面呼叫 `checkpoint` 方法。

## 修改組態屬性
<a name="kinesis-record-processor-initialization-py"></a>

範例提供了組態屬性的預設值。您可使用自訂值覆寫任何這些屬性 (請參閱 `sample.properties`)。

### Application name (應用程式名稱)
<a name="kinesis-record-processor-application-name-py"></a>

KCL 要求所有應用程式和同一區域內的 Amazon DynamoDB 資料表必須具有獨一無二的應用程式名稱。其使用應用程式名稱組態值的方式如下：
+ 假定所有與此應用程式名稱相關聯的工作者合作處理同一串流。這些工作者可分佈於多個執行個體。如果您以相同應用程式的程式碼執行另一執行個體但使用不同的應用程式名稱，KCL 便會將第二個執行個體視為亦對同一串流進行操作的完全獨立應用程式。
+ KCl 將使用應用程式名稱建立 DynamoDB 資料表並由該資料表維護應用程式的狀態資訊 (例如檢查點及工作者與碎片間對應)。每個應用程式都有其自身的 DynamoDB 資料表。如需詳細資訊，請參閱[使用租用資料表來追蹤 KCL 取用者應用程式處理的碎片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 設定登入資料
<a name="kinesis-record-processor-creds-py"></a>

您必須將 AWS 登入資料提供給預設登入資料提供者鏈結中的其中一個登入資料提供者。您可以使用 `AWSCredentialsProvider` 屬性，設定登入資料供應者。[sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) 必須向[預設登入資料供應者鏈結](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)中的某一登入資料供應者提供您的登入資料。如果您在 Amazon EC2 執行個體上執行取用者應用程式，建議您使用 IAM 角色設定執行個體。反映與此 IAM 角色相關聯許可的 AWS 憑證，可透過執行個體中繼資料提供給執行個體上的應用程式。以這種方式管理 EC2 執行個體上執行的消費者應用程式的登入資料最為安全。

範例的屬性檔案設定由 KCL 使用 `sample_kclpy_app.py` 所提供的記錄處理器，處理名為 "words" 的 Kinesis 資料串流。

# 在 Ruby 中開發 Kinesis Client Library 消費者
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 Kinesis Client Library (KCL) 建置應用程式，處理來自 Kinesis 資料串流的資料。Kinesis Client Library 支援多種語言。本主題將討論 Ruby。

KCL 是一種 Java 程式庫，使用稱為 *MultiLangDaemon* 的多語言介面提供對 Java 以外語言的支援。此常駐程式是以 Java 為基礎，並在您使用 Java 以外的 KCL 語言時在背景執行。因此，若您安裝了適用於 Ruby 的 KCL 並完全以 Ruby 撰寫取用者應用程式，則由於 MultiLangDaemon 的緣故，您的系統仍需要安裝 Java。此外，MultiLangDaemon 有一些預設設定，您可能需要針對您的使用案例進行自訂，例如其連線 AWS 的區域。如需 MultiLangDaemon 的詳細資訊，請前往 GitHub 上的 [KCL MultiLangDaemon 專案](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)頁面。

若要從 GitHub 下載 Ruby KCL，請前往 [Kinesis Client Library (Ruby)](https://github.com/awslabs/amazon-kinesis-client-ruby)。如需下載 Ruby KCL 取用者應用程式的範本程式碼，請至 GitHub 前往[適用於 Ruby 的 KCL 範例專案](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples)頁面。

如需 KCL Ruby 支援程式庫的詳細資訊，請參閱 [KCL Ruby Gems 文件](http://www.rubydoc.info/gems/aws-kclrb)。

# 開發 KCL 2.x 消費者
<a name="developing-consumers-with-kcl-v2"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

此主題說明如何使用 2.0 版本的 Kinesis Client Library (KCL)。

如需關於 KCL 的更多資訊，請參閱[使用 Kinesis Client Library 1.x 開發取用者](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html)中的概觀。

根據您要使用的選項，從下列主題中進行選擇。

**Topics**
+ [在 Java 中開發 Kinesis Client Library 取用者](kcl2-standard-consumer-java-example.md)
+ [在 Python 中開發 Kinesis Client Library 取用者](kcl2-standard-consumer-python-example.md)
+ [使用 KCL 2.x 開發增強型廣發消費者](building-enhanced-consumers-kcl-retired.md)

# 在 Java 中開發 Kinesis Client Library 取用者
<a name="kcl2-standard-consumer-java-example"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis Client Library 頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

以下程式碼顯示 `ProcessorFactory` 和 `RecordProcessor` 的 Java 範例實作。如果您想要利用增強型散發功能，請參閱[使用具有增強型散發功能的取用者](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html)​。

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# 在 Python 中開發 Kinesis Client Library 取用者
<a name="kcl2-standard-consumer-python-example"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 Kinesis Client Library (KCL) 建置應用程式，處理來自 Kinesis 資料串流的資料。Kinesis Client Library 支援多種語言。本主題將討論 Python。

KCL 是一種 Java 程式庫，使用稱為 *MultiLangDaemon* 的多語言介面提供對 Java 以外語言的支援。此常駐程式是以 Java 為基礎，並在您使用 Java 以外的 KCL 語言時在背景執行。因此，若您安裝了適用於 Python 的 KCL 並完全以 Python 撰寫取用者應用程式，則由於 MultiLangDaemon 的緣故，您的系統仍需要安裝 Java。此外，MultiLangDaemon 有一些預設設定，您可能需要針對您的使用案例進行自訂，例如其連線 AWS 的區域。如需 MultiLangDaemon 的詳細資訊，請前往 GitHub 上的 [KCL MultiLangDaemon 專案](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)頁面。

若要從 GitHub 下載 Python KCL，請前往 [Kinesis Client Library (Python)](https://github.com/awslabs/amazon-kinesis-client-python)。如需下載 Python KCL 取用者應用程式的範本程式碼，請至 GitHub 前[往適用於 Python 的 KCL 範例專案](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples)頁面。

以 Python 實作 KCL 取用者應用程式時，您必須完成以下任務：

**Topics**
+ [實作 RecordProcessor 類別方法](#kinesis-record-processor-implementation-interface-py)
+ [修改組態屬性](#kinesis-record-processor-initialization-py)

## 實作 RecordProcessor 類別方法
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess` 類別必須擴充 `RecordProcessorBase` 類別以實作下列方法。

```
initialize
process_records
shutdown_requested
```

範例提供的實作可讓您用於做為起點。

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this recod.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```

## 修改組態屬性
<a name="kinesis-record-processor-initialization-py"></a>

範例提供了組態屬性的預設值，如下列指令碼所示。您可使用自訂值覆寫任何這些屬性。

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```

### Application name (應用程式名稱)
<a name="kinesis-record-processor-application-name-py"></a>

KCL 要求所有應用程式和同一區域內的 Amazon DynamoDB 資料表必須具有獨一無二的應用程式名稱。其使用應用程式名稱組態值的方式如下：
+ 假定所有與此應用程式名稱相關聯的工作者合作處理同一串流。這些工作者可分佈於多個執行個體。如果您以相同應用程式的程式碼執行另一執行個體但使用不同的應用程式名稱，KCL 便會將第二個執行個體視為亦對同一串流進行操作的完全獨立應用程式。
+ KCl 將使用應用程式名稱建立 DynamoDB 資料表並由該資料表維護應用程式的狀態資訊 (例如檢查點及工作者與碎片間對應)。每個應用程式都有其自身的 DynamoDB 資料表。如需詳細資訊，請參閱[使用租用資料表來追蹤 KCL 取用者應用程式處理的碎片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 憑證
<a name="kinesis-record-processor-creds-py"></a>

您必須將 AWS 登入資料提供給[預設登入資料提供者鏈結中的其中一個登入資料提供者](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)。您可以使用 `AWSCredentialsProvider` 屬性，設定登入資料供應者。如果您在 Amazon EC2 執行個體上執行取用者應用程式，建議您使用 IAM role. AWS credentials 設定執行個體，以反映與此 IAM 角色相關聯的許可，可透過執行個體中繼資料提供給執行個體上的應用程式。以這種方式管理 EC2 執行個體上執行的消費者應用程式的登入資料最為安全。

# 使用 KCL 2.x 開發增強型廣發消費者
<a name="building-enhanced-consumers-kcl-retired"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

在 Amazon Kinesis Data Streams 中使用*增強型散發功*能的取用者從資料串流接收記錄時，專用輸送量可高達每個碎片每秒 2 MB 的資料。這類消費者不必與其他從串流接收資料的消費者競爭。如需詳細資訊，請參閱[開發具有專用輸送量的增強型廣發消費者](enhanced-consumers.md)。

您可以使用 2.0 版或更新版本的 Kinesis Client Library (KCL) 開發應用程式，利用增強型散發功能從串流接收資料。KCL 將自動為您的應用程式訂閱串流中的所有碎片，並確保您的取用者應用程式讀取的輸送量值達到每個碎片每秒 2 MB。如果您想要使用 KCL 但不想開啟增強型散發功能，請參閱[使用 Kinesis Client Library 2.0 開發取用者](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html)​。

**Topics**
+ [在 Java 中使用 KCL 2.x 開發增強型廣發消費者](building-enhanced-consumers-kcl-java.md)

# 在 Java 中使用 KCL 2.x 開發增強型廣發消費者
<a name="building-enhanced-consumers-kcl-java"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 2.0 版或更新版本的 Kinesis Client Library (KCL) 在 Amazon Kinesis Data Streams 中開發應用程式，利用增強型散發功能從串流接收資料。以下程式碼顯示 `ProcessorFactory` 和 `RecordProcessor` 的 Java 範例實作。

建議您在 `KinesisAsyncClient` 中，使用 `KinesisClientUtil` 來建立 `KinesisAsyncClient` 及設定 `maxConcurrency`。

**重要**  
Amazon Kinesis Client 可能會明顯發生延遲，除非您設定 `KinesisAsyncClient` 的 `maxConcurrency` 夠高，足以運作所有的租賃服務，並可額外使用 `KinesisAsyncClient`。

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# 將消費者從 KCL 1.x 遷移至 KCL 2.x
<a name="kcl-migration"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis 用戶端程式庫頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

此主題說明 Kinesis Client Library (KCL) 版本 1.x 和 2.x 之間的差異。她還說明如何將取用者從 KCL 的版本 1.x 遷移至版本 2.x。遷移用戶端後，該用戶端會從前一個檢查點的位置開始處理記錄。

2.0 版 KCL 引進了以下的介面變更：


**KCL 介面變更**  

| KCL 1.x 介面 | KCL 2.0 介面 | 
| --- | --- | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor | software.amazon.kinesis.processor.ShardRecordProcessor | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory | software.amazon.kinesis.processor.ShardRecordProcessorFactory | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware | 整併至 software.amazon.kinesis.processor.ShardRecordProcessor | 

**Topics**
+ [遷移記錄處理器](#recrod-processor-migration)
+ [遷移記錄處理器工廠](#recrod-processor-factory-migration)
+ [遷移工作者](#worker-migration)
+ [設定 Amazon Kinesis 用戶端](#client-configuration)
+ [閒置時間移除](#idle-time-removal)
+ [用戶端組態移除](#client-configuration-removals)

## 遷移記錄處理器
<a name="recrod-processor-migration"></a>

以下範例顯示基於 KCL 1.x 所實作的記錄處理器：

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        //
        // Process records, and possibly checkpoint
        //
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**遷移記錄處理器類別**

1. 將介面從 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` 和 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` 更改為 `software.amazon.kinesis.processor.ShardRecordProcessor`，如下所示：

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
   public class TestRecordProcessor implements ShardRecordProcessor {
   ```

1. 更新 `import` 和 `initialize` 方法的 `processRecords` 陳述式。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
   ```

1. 將 `shutdown` 方法取代為以下的新方法：`leaseLost`、`shardEnded` 和 `shutdownRequested`。

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

記錄處理器類別經更新後的版本如下。

```
package com.amazonaws.kcl;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class TestRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

## 遷移記錄處理器工廠
<a name="recrod-processor-factory-migration"></a>

記錄處理器處理站負責在取得租用時建立記錄處理器。以下是 KCL 1.x 處理站的範例。

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor();
    }
}
```

**遷移記錄處理器處理站**

1. 將實作的介面從 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` 更改為 `software.amazon.kinesis.processor.ShardRecordProcessorFactory`，如下所示。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
   public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
   ```

1. 更改 `createProcessor` 的傳回簽章。

   ```
   // public IRecordProcessor createProcessor() {
   public ShardRecordProcessor shardRecordProcessor() {
   ```

以下是 2.0 版記錄處理器處理站的範例：

```
package com.amazonaws.kcl;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor();
    }
}
```

## 遷移工作者
<a name="worker-migration"></a>

在 KCL 的版本 2.0，名為 `Scheduler`​ 的新類別會取代 ​`Worker` 類別。以下是 KCL 1.x 工作者的範例。

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

**移轉至工作者**

1. 將 `Worker` 類別的 `import` 陳述式變更為 `Scheduler` 和 `ConfigsBuilder` 類別的匯入陳述式。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. 建立 `ConfigsBuilder` 和 `Scheduler`，如下列範例所示：

   建議您在 `KinesisAsyncClient` 中，使用 `KinesisClientUtil` 來建立 `KinesisAsyncClient` 及設定 `maxConcurrency`。
**重要**  
Amazon Kinesis Client 可能會明顯發生延遲，除非您設定 `KinesisAsyncClient` 的 `maxConcurrency` 夠高，足以運作所有的租賃服務，並可額外使用 `KinesisAsyncClient`。

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
   Region region = Region.AP_NORTHEAST_2;
   KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
   ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
   
   Scheduler scheduler = new Scheduler(
       configsBuilder.checkpointConfig(),
       configsBuilder.coordinatorConfig(),
       configsBuilder.leaseManagementConfig(),
       configsBuilder.lifecycleConfig(),
       configsBuilder.metricsConfig(),
       configsBuilder.processorConfig(),
       configsBuilder.retrievalConfig()
       );
   ```

## 設定 Amazon Kinesis 用戶端
<a name="client-configuration"></a>

隨著 2.0 版 Kinesis Client Library 的推出，用戶端組態已從單一組態類別 (`KinesisClientLibConfiguration`) 進展為六個組態類別。下表說明遷移情形。


**組態欄位及其新類別**  

| 原始欄位 | 新的組態類別 | Description | 
| --- | --- | --- | 
| applicationName | ConfigsBuilder | 此 KCL 應用程式的名稱。用做為 tableName 和 consumerName 的預設值。 | 
| tableName | ConfigsBuilder | 允許覆寫用於 Amazon DynamoDB 租用資料表的資料表名稱。 | 
| streamName | ConfigsBuilder | 此應用程式從中處理其記錄的串流名稱。 | 
| kinesisEndpoint | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| dynamoDBEndpoint | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| initialPositionInStreamExtended | RetrievalConfig | KCL 開始擷記錄所在碎片中的位置，透過應用程式初次執行開始。 | 
| kinesisCredentialsProvider | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| dynamoDBCredentialsProvider | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| cloudWatchCredentialsProvider | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| failoverTimeMillis | LeaseManagementConfig | 將租用擁有者視為失敗前必須經過的毫秒數。 | 
| workerIdentifier | ConfigsBuilder | 代表應用程式處理器本項實例的唯一識別符。其必須獨一無二。 | 
| shardSyncIntervalMillis | LeaseManagementConfig | 碎片同步呼叫的時間。 | 
| maxRecords | PollingConfig | 允許設定 Kinesis 傳回的記錄數上限。 | 
| idleTimeBetweenReadsInMillis | CoordinatorConfig | 此選項已經移除。請參閱「閒置時間移除項目」一節。 | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig | 設定時，即使 Kinesis 無提供任何記錄也會呼叫記錄處理器。 | 
| parentShardPollIntervalMillis | CoordinatorConfig | 記錄處理器應該輪詢以檢查父碎片是否已完成的頻率。 | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig | 設定時，只要已開始處理子租用就會隨即移除租用。 | 
| ignoreUnexpectedChildShards | LeaseManagementConfig | 設定時，會忽略具有開放碎片的子碎片。此項主要用於 DynamoDB Streams。 | 
| kinesisClientConfig | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| dynamoDBClientConfig | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| cloudWatchClientConfig | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| taskBackoffTimeMillis | LifecycleConfig | 重試失敗任務的等待時間。 | 
| metricsBufferTimeMillis | MetricsConfig | 控制 CloudWatch 指標發佈。 | 
| metricsMaxQueueSize | MetricsConfig | 控制 CloudWatch 指標發佈。 | 
| metricsLevel | MetricsConfig | 控制 CloudWatch 指標發佈。 | 
| metricsEnabledDimensions | MetricsConfig | 控制 CloudWatch 指標發佈。 | 
| validateSequenceNumberBeforeCheckpointing | CheckpointConfig | 此選項已經移除。請參閱「檢查點序號驗證」一節。 | 
| regionName | ConfigsBuilder | 此選項已經移除。請參閱「用戶端組態移除項目」一節。 | 
| maxLeasesForWorker | LeaseManagementConfig | 應用程式的單一執行個體應該接受的租用數上限。 | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig | 應用程式一次應該嘗試挪用的租用數上限。 | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | Kinesis Client Library 需要建立新的 DynamoDB 租用資料表時所使用的 DynamoDB 讀取 IOP。 | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | Kinesis Client Library 需要建立新的 DynamoDB 租用資料表時所使用的 DynamoDB 讀取 IOP。 | 
| initialPositionInStreamExtended | LeaseManagementConfig | 應用程式應該在串流中開始的初始位置。這僅在初次建立租用時使用。 | 
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | 如果租用資料表包含現有的租用，即停用同步處理碎片資料。TODO：KinesisEco-438 | 
| shardPrioritization | CoordinatorConfig | 要使用哪些碎片優先順序 | 
| shutdownGraceMillis | N/A | 此選項已經移除。請參閱「MultiLang 移除項目」一節。 | 
| timeoutInSeconds | N/A | 此選項已經移除。請參閱「MultiLang 移除項目」一節。 | 
| retryGetRecordsInSeconds | PollingConfig | 為故障設定 GetRecords 嘗試間的延遲。 | 
| maxGetRecordsThreadPool | PollingConfig | 用於 GetRecords 的執行緒集區大小。 | 
| maxLeaseRenewalThreads | LeaseManagementConfig | 控制租用續約執行緒集區的大小。應用程式可容納的租用數愈多，此集區就應該愈大。 | 
| recordsFetcherFactory | PollingConfig | 允許將工廠進行替換，該工廠會用於建立擷取程式，而該擷取程式會從串流進行擷取。 | 
| logWarningForTaskAfterMillis | LifecycleConfig | 任務未完成的情況下要等待多久的時間才記錄警告。 | 
| listShardsBackoffTimeInMillis | RetrievalConfig | 呼叫 ListShards 發生錯誤時將等待的間隔毫秒數。 | 
| maxListShardsRetryAttempts | RetrievalConfig | ListShards 在放棄之前重試的次數上限。 | 

## 閒置時間移除
<a name="idle-time-removal"></a>

1.x 版 KCL 的 `idleTimeBetweenReadsInMillis` 對應於兩種計量：
+ 任務分派檢查的間隔時間量。您現在可以透過設定 `CoordinatorConfig#shardConsumerDispatchPollIntervalMillis`，設定各任務的此一間隔時間。
+ 當 Kinesis Data Streams 未傳回任何記錄時將休眠的時間量。在 2.0 版中，具強化廣發功能的記錄是自其各自的擷取器推送。僅當推送的請求送達時，碎片消費者才會發生活動。

## 用戶端組態移除
<a name="client-configuration-removals"></a>

在 2.0 版中，KCL 不再建立用戶端。其端賴使用者提供有效的用戶端。基於此項變更，所有控制用戶端建立的組態參數皆已移除。若您需要這類參數，可以先就用戶端進行所需設定再將用戶端提供予 `ConfigsBuilder`。


****  

| 已移除的欄位 | 等效組態 | 
| --- | --- | 
| kinesisEndpoint | 使用慣用的端點設定開發套件 KinesisAsyncClient：KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build()。 | 
| dynamoDBEndpoint | 使用慣用的端點設定開發套件 DynamoDbAsyncClient：DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build()。 | 
| kinesisClientConfig | 使用所需的組態設定開發套件 KinesisAsyncClient：KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build()。 | 
| dynamoDBClientConfig | 使用所需的組態設定開發套件 DynamoDbAsyncClient：DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build()。 | 
| cloudWatchClientConfig | 使用所需的組態設定開發套件 CloudWatchAsyncClient：CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build()。 | 
| regionName | 使用慣用的區域設定開發套件。所有開發套件用戶端的做法皆相同。例如 KinesisAsyncClient.builder().region(Region.US\$1WEST\$12).build()。 | 