

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Kinesis Client Library
<a name="kcl"></a>

## 什么是 Kinesis Client Library？
<a name="kcl-library-what-is"></a>

Kinesis Client Library（KCL）是独立的 Java 软件库，其旨在简化来自 Amazon Kinesis Data Streams 的数据的使用和处理过程。KCL 负责处理与分布式计算相关的多种复杂任务，这样开发人员就可以专注于实现其数据处理业务逻辑。KCL 可管理的活动包括在多个工作程序之间进行负载均衡、响应工作程序故障、已处理记录的检查点操作，以及响应流中分片数量的变化。

KCL 经常更新，以纳入新版底层库、安全改进和错误修复。建议使用最新版本的 KCL，以避免出现已知问题，并从所有最新的改进中受益。要查找最新的 KCL 版本，请访问[ KCL Github](https://github.com/awslabs/amazon-kinesis-client)。

**重要**  
建议使用最新的 KCL 版本，以避免出现已知错误和问题。如果您使用的是 KCL 2.6.0 或更早版本，请升级到 KCL 2.6.1 或更高版本，以避免在流容量变化时出现分片处理受阻，但这种情况比较罕见。
KCL 属于 Java 库。使用名为的基于 Java 的守护程序提供对 Java 以外语言的支持。 MultiLangDaemon MultiLangDaemon通过 STDIN 和 STDOUT 与 KCL 应用程序交互。有关 MultiLangDaemon on 的更多信息 GitHub，请参阅[使用非 Java 语言通过 KCL 开发消费端](develop-kcl-consumers-non-java.md)。
不要在 KCL 3.x 中使用 2.27.19 到 2.27.23 适用于 Java 的 AWS SDK 版本。这些版本出现的问题会导致使用 KCL 的 DynamoDB 时出现相关异常错误。我们建议您使用 2.28.0 或更高 适用于 Java 的 AWS SDK 版本来避免此问题。

## KCL 主要功能和优势
<a name="kcl-benefits"></a>

以下是 KCL 的主要功能和相关优势：
+ **可扩展性**：KCL 支持应用程序在多个工作程序之间分配处理负载，以实现动态扩展。您可以手动横向缩减或扩展应用程序，也可以自动扩缩，而不必担心负载的重新分配。
+ **负载均衡**：KCL 在可用工作程序之间自动平衡处理负载，从而实现跨工作程序的均匀工作分配。
+ **检查点**：KCL 对已处理记录的检查点操作进行管理，使应用程序能够从上次成功处理的位置恢复处理。
+ **容错能力**：KCL 有内置容错能力，即使个别工作程序出现故障，也能确保数据处理继续进行。KCL 还提供 at-least-once送货服务。
+ **处理流级别的变化**：KCL 可以适应由于数据量变化而可能发生的分片拆分与合并。KCL 通过确保只有在父分片完成处理并进行检查点操作后才处理子分片，从而保持排序状态。
+ **监控**：KCL 与 Amazon 集成， CloudWatch 用于消费者级监控。
+ **多语言支持**：KCL 原生支持 Java，并支持多种非 Java 编程语言。 MultiLangDaemon

# KCL 概念
<a name="kcl-concepts"></a>

本节说明了 Kinesis Client Library（KCL）的核心概念和交互功能。这些概念是开发和管理 KCL 消费端应用程序的基础。
+ **KCL 消费端应用程序**：使用 Kinesis Client Library 自定义构建的应用程序，旨在读取和处理 Kinesis 数据流中的记录。
+ **工作程序**：KCL 消费端应用程序通常是分布式的，同时运行一个或多个工作程序。KCL 协调工作程序以分布式方式使用来自流的数据，并在多个工作程序之间平衡负载。
+ **调度器**：KCL 工作程序用于开始处理数据的高级类。每个 KCL 工作程序都有一个调度器。调度器负责初始化和监督各种任务，包括同步 Kinesis 数据流的分片信息、跟踪工作程序中的分片分配以及根据分配给工作程序的分片处理来自流的数据。调度器可以采用多种影响调度器行为的配置，例如待处理流的名称以及 AWS 凭证。调度器启动数据记录传输，使其从流传输至记录处理器。
+ **记录处理器**：定义 KCL 消费端应用程序如何处理从数据流中接收的数据的逻辑。必须在记录处理器中实现您自己的自定义数据处理逻辑。KCL 工作程序可实例化调度器。然后，调度器为持有租约的每个分片实例化一个记录处理器。一个工作程序可以运行多个记录处理器。
+ **租约**：定义工作程序和分片之间的分配。KCL 消费端应用程序使用租约将数据记录处理分配到多个工作程序中。每个分片在任何给定时间仅通过租约与一个工作程序绑定，而每个工作程序可以同时持有一份或多份租约。当工作程序因停止或故障而不再持有租约时，KCL 会指派其他工作程序承接租约。要了解有关租约的更多信息，请参阅 [Github documentation: Lease Lifecycle](https://github.com/awslabs/amazon-kinesis-client/blob/master/docs/lease-lifecycle.md#lease-lifecycle)。
+ **租约表**：唯一的 Amazon DynamoDB 表，用于跟踪 KCL 消费端应用程序的所有租约。每个 KCL 消费端应用程序都会创建自己的租约表。租约表用于跨工作程序维持状态，以协调数据的处理。有关更多信息，请参阅 [KCL 中的 DynamoDB 元数据表和负载均衡](kcl-dynamoDB.md)。
+ **检查点**：将最后一次成功处理的记录的位置永久存储于分片中的过程。KCL 管理检查点操作，以确保在工作程序故障或应用程序重新启动时，可以从最后一个检查点操作位置恢复处理。检查点作为租约元数据的一部分存储于 DynamoDB 租约表中。这样工作程序就可以从前一个工作程序停止处继续处理。

# KCL 中的 DynamoDB 元数据表和负载均衡
<a name="kcl-dynamoDB"></a>

KCL 管理来自工作程序的租约和 CPU 利用率指标等元数据。KCL 使用 DynamoDB 表跟踪这些元数据。对于每个 Amazon Kinesis Data Streams 应用程序，KCL 会创建三个 DynamoDB 表来管理元数据：租约表、工作程序指标表和协调器状态表。

**注意**  
KCL 3.x 引入了两个新的元数据表：*工作程序指标*和*协调器状态*表。

**重要**  
 必须为 KCL 应用程序添加适当的权限，才能在 DynamoDB 中创建和管理元数据表。有关更多信息，请参阅 [KCL 消费端应用程序所必需的 IAM 权限](kcl-iam-permissions.md)。  
KCL 消费端应用程序不会自动移除这三个 DynamoDB 元数据表。在停用消费端应用程序时，务必移除这些由 KCL 消费端应用程序创建的 DynamoDB 元数据表，以避免不必要的成本。

## 租约表
<a name="kcl-leasetable"></a>

租约表是唯一的 Amazon DynamoDB 表，用于跟踪由 KCL 消费端应用程序的调度器租赁和处理的分片。每个 KCL 消费端应用程序都会创建自己的租约表。KCL 默认将消费端应用程序的名称用作租约表的名称。可使用配置来设置自定义表名称。KCL 还使用 leaseOwner 的分区键在租约表上创建[全局二级索引](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html)，以高效发现租约。全局二级索引镜像了基础租约表中的 leaseKey 属性。如果应用程序启动时 KCL 消费端应用程序的租约表不存在，则其中一个工作程序会为您的应用程序创建租约表。

您可在消费端应用程序运行的同时使用 [Amazon DynamoDB 控制台](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html)查看租约表。

**重要**  
每个 KCL 消费端应用程序的名称必须是唯一的，以防止租约表名称出现重复。
除开与 Kinesis Data Streams 本身关联的费用，您的账户将被收取与 DynamoDB 表关联的费用。

租约表中的每行表示您消费端应用程序的调度器正在处理的分片。重要字段包括：
+ **leaseKey**：对于单流处理，此为分片 ID。使用 KCL 进行多流处理时，其结构为 `account-id:StreamName:streamCreationTimestamp:ShardId`。leaseKey 是租约表的分区键。有关多流处理的更多信息，请参阅[使用 KCL 进行多流处理](kcl-multi-stream.md)。
+ **checkpoint：**分片的最新检查点序号。
+ **checkpointSubSequence数字：**使用 Kinesis Producer 库的聚合功能时，这是对**检查点**的扩展，用于跟踪 Kinesis 记录中的单个用户记录。
+ **leaseCounter**：用于检查工作程序当前是否正在积极处理租约。如果租约所有权转移给其他工作程序，leaseCounter 就会增加。
+ **leaseOwner：**持有此租约的当前工作程序。
+ **ownerSwitchesSince检查点：**自上次检查点以来，这份租约更换了多少次员工。
+ **parentShardId:** 此分片的父级 ID。在子分片上开始处理之前，务必确保父分片已完全处理，从而保持正确的记录处理顺序。
+ **childShardId:** 此分片的拆分或合并 IDs产生的子分片列表。用于在重新分片操作期间跟踪分片世系并管理处理顺序。
+ **startingHashKey:** 此分片的哈希键范围的下限。
+ **endingHashKey:** 此分片的哈希键范围的上限。

如果使用 KCL 进行多流处理，您会在租约表中看到以下两个额外字段。有关更多信息，请参阅 [使用 KCL 进行多流处理](kcl-multi-stream.md)。
+ **shardID：**分片的 ID。
+ **streamName**：数据流的标识符采用以下格式：`account-id:StreamName:streamCreationTimestamp`。

## 工作程序指标表
<a name="kcl-worker-metrics-table"></a>

工作程序指标表是各个 KCL 应用程序唯一的 Amazon DynamoDB 表，用于记录各工作程序的 CPU 利用率指标。KCL 使用这些指标进行高效的租约分配，从而在工作程序之间实现资源利用的平衡。默认情况下，KCL 使用 `KCLApplicationName-WorkerMetricStats` 作为工作程序指标表的名称。

## 协调器状态表
<a name="kcl-coordinator-state-table"></a>

协调器状态表是各个 KCL 应用程序唯一的 Amazon DynamoDB 表，用于存储工作程序的内部状态信息。例如，协调器状态表存储有关领导选择的数据，或从 KCL 2.x 就地迁移至 KCL 3.x 的相关元数据。默认情况下，KCL 使用 `KCLApplicationName-CoordinatorState` 作为协调器状态表的名称。

## KCL 创建的元数据表的 DynamoDB 容量模式
<a name="kcl-capacity-mode"></a>

默认情况下，Kinesis Client Library（KCL）使用[按需容量模式](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/on-demand-capacity-mode.html)创建 DynamoDB 元数据表，例如租约表、工作程序指标表和协调器状态表。此模式可自动扩缩读取和写入容量以适应流量，而无需进行容量规划。我们强烈建议将容量模式保留为按需模式，以便更有效地操作这些元数据表。

如果决定将租约表切换到[预置容量模式](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html)，请遵循以下最佳实践：
+ 分析使用模式：
  + 使用 Amazon 指标监控应用程序的读写模式和使用情况（RCU、WCU）。 CloudWatch 
  + 了解峰值及平均吞吐量需求。
+ 计算所需的容量：
  + 根据您的分析估算读取容量单位 (RCUsWCUs) 和写入容量单位 ()。
  + 考虑诸如分片数量、检查点频率和工作程序计数之类的因素。
+ 实现自动扩缩：
  + 使用 [DynamoDB 自动扩缩](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html#ddb-autoscaling)自动调整预置容量，并设置适当的最小和最大容量限制。
  + DynamoDB 自动扩缩有助于避免 KCL 元数据表达到容量限制和受到限制。
+ 定期监控和优化：
  + 持续监控的 CloudWatch 指标`ThrottledRequests`。
  + 随着时间推移，按照工作负载的变化调整容量。

如果在 KCL 消费端应用程序的元数据 DynamoDB 表中遇到 `ProvisionedThroughputExceededException`，必须增加 DynamoDB 表的预置吞吐能力。如在首次创建消费端应用程序时设置了某种级别的读取容量单位（RCU）和写入容量单位（WCU），随着使用量的增长，这种容量可能无法满足需求。例如，如果 KCL 消费端应用程序频繁执行检查点操作或在包含多个分片的流上运行，则可能需要更多的容量单位。有关 DynamoDB 中预置吞吐量的信息，请参阅《Amazon DynamoDB 开发人员指南》中的 [DynamoDB 吞吐能力](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/capacity-mode.html)和[更新表](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html#WorkingWithTables.Basics.UpdateTable)。

## KCL 如何向工作程序分配租约并平衡负载
<a name="kcl-assign-leases"></a>

KCL 不断收集和监控运行工作程序的计算主机的 CPU 利用率指标，以确保工作负载均匀分配。这些 CPU 利用率指标存储于 DynamoDB 的工作程序指标表中。如果 KCL 检测到某些工作程序的 CPU 利用率高于其他工作程序，它会在工作程序之间重新分配租约，以降低高使用率工作程序的负载。目标是在消费端应用程序队列中更均匀地平衡工作负载，防止任何单一工作程序过载。由于 KCL 在消费端应用程序队列中分配 CPU 利用率，所以可以通过选择适当数量的工作程序来适当调整消费端应用程序队列容量的大小，或者使用自动扩缩功能来高效管理计算容量以降低成本。

**重要**  
只有在满足某些先决条件的情况下，KCL 才能从工作程序收集 CPU 利用率指标。有关更多信息，请参阅 [先决条件](develop-kcl-consumers-java.md#develop-kcl-consumers-java-prerequisites)。如果 KCL 无法从工作程序收集 CPU 利用率指标，KCL 将回退到使用每个工作程序的吞吐量来分配租约，并在队列中的工作程序之间平衡负载。KCL 将监控每个工作程序在给定时间收到的吞吐量并重新分配租约，以确保每个工作程序从其被分配的租约中获得类似的总吞吐量水平。

# 使用 KCL 开发消费端
<a name="develop-kcl-consumers"></a>

可以使用 Kinesis Client Library（KCL）构建处理 Kinesis 数据流中数据的消费端应用程序。

KCL 有多种语言版本。本主题介绍如何使用 Java 和非 Java 语言开发 KCL 消费端。
+ 要查看 Kinesis Client Library Javadoc 参考，请参阅 [Amazon Kinesis Client Library Javadoc](https://javadoc.io/doc/software.amazon.kinesis/amazon-kinesis-client/latest/index.html)。
+ 要从中下载适用于 Java 的 KCL GitHub，请参阅适用于 Java 的 [Amazon Kinesis 客户端库](https://github.com/awslabs/amazon-kinesis-client)。
+ 要在 Apache Maven 上找到 KCL for Java，请参阅 [KCL Maven Central 存储库](https://central.sonatype.com/artifact/software.amazon.kinesis/amazon-kinesis-client)。

**Topics**
+ [使用 Java 通过 KCL 开发消费端](develop-kcl-consumers-java.md)
+ [使用非 Java 语言通过 KCL 开发消费端](develop-kcl-consumers-non-java.md)

# 使用 Java 通过 KCL 开发消费端
<a name="develop-kcl-consumers-java"></a>

## 先决条件
<a name="develop-kcl-consumers-java-prerequisites"></a>

开始使用 KCL 3.x 之前，请确保已具备以下条件：
+ Java Development Kit（JDK）8 或更高版本
+ 适用于 Java 的 AWS SDK 2.x
+ 用于依赖项管理的 Maven 或 Gradle

KCL 从运行工作程序的计算主机上收集 CPU 利用率指标（例如 CPU 利用率）来平衡负载，从而在各工作程序之间实现均衡的资源利用率水平。要让 KCL 能够从工作程序收集 CPU 使用率指标，必须满足以下先决条件：

 **Amazon Elastic Compute Cloud（Amazon EC2）**
+ 操作系统必须是 Linux 操作系统。
+ 您必须[IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)在 EC2 实例中启用。

 **Amazon EC2 上的 Amazon Elastic Container Service（Amazon ECS）**
+ 操作系统必须是 Linux 操作系统。
+ 必须启用 [ECS 任务元数据端点版本 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html)。
+ Amazon ECS 容器代理版本必须为 1.39.0 或更高版本。

 **Amazon ECS 已开启 AWS Fargate**
+ 必须启用 [Fargate 任务元数据端点版本 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html)。如果使用的是 Fargate 平台版本 1.4.0 或更高版本，则默认启用此功能。
+ Fargate（平台版本 1.4.0 或更高版本）。

 **Amazon EC2 上的 Amazon Elastic Kubernetes Service（Amazon EKS）** 
+ 操作系统必须是 Linux 操作系统。

 **亚马逊 EKS 开启 AWS Fargate**
+ Fargate（平台版本 1.3.0 或更高版本）。

**重要**  
如果 KCL 无法从工作程序收集 CPU 利用率指标，KCL 将回退到使用每个工作程序的吞吐量来分配租约，并在队列中的工作程序之间平衡负载。有关更多信息，请参阅 [KCL 如何向工作程序分配租约并平衡负载](kcl-dynamoDB.md#kcl-assign-leases)。

## 安装并添加依赖项
<a name="develop-kcl-consumers-java-installation"></a>

如果您使用的是 Maven，请将以下依赖项添加到您的 `pom.xml` 文件中。确保将 3.x.x 替换为最新的 KCL 版本。

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

如果使用 Gradle，请在 `build.gradle` 文件中添加以下信息。确保将 3.x.x 替换为最新的 KCL 版本。

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

可以在 [Maven Central 存储库](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client)中查看最新版本的 KCL。

## 实现消费端
<a name="develop-kcl-consumers-java-implemetation"></a>

KCL 消费端应用程序包含以下关键组件：

**Topics**
+ [RecordProcessor](#implementation-recordprocessor)
+ [RecordProcessorFactory](#implementation-recordprocessorfactory)
+ [调度器](#implementation-scheduler)
+ [主消费端应用程序](#implementation-main)

### RecordProcessor
<a name="implementation-recordprocessor"></a>

RecordProcessor 是处理 Kinesis 数据流记录的业务逻辑所在的核心组件。它定义了应用程序如何处理从 Kinesis 流接收的数据。

主要职责：
+ 初始化分片的处理
+ 处理来自 Kinesis 流的批量记录
+ 关闭分片的处理（例如，在分片拆分或合并，或者将租约移交给另一台主机时）
+ 处理检查点操作以跟踪进度

以下示例演示如何实施。

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.*;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class SampleRecordProcessor implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());
            processRecordsInput.records().forEach(r -> 
                log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())
            );
            
            // Checkpoint periodically
            processRecordsInput.checkpointer().checkpoint();
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}
```

下面详细说明了本例中使用的各种方法：

**初始化（初InitializationInput始化输入）**
+ 目的：为处理记录设置任何必要的资源或状态。
+ 调用时间：当 KCL 为该记录处理器分配分片时，调用一次。
+ 关键点：
  + `initializationInput.shardId()`：此处理器将要处理的分片的 ID。
  + `initializationInput.extendedSequenceNumber()`：开始处理的序列号。

**流程记录 () ProcessRecordsInput processRecordsInput**
+ 目的：处理传入的记录，可选择处理检查点进度。
+ 调用时间：只要记录处理器持有分片的租约时，就反复调用。
+ 关键点：
  + `processRecordsInput.records()`：要处理的记录列表。
  + `processRecordsInput.checkpointer()`：用于进度的检查点操作。
  + 确保在处理过程中处理了所有异常，以防止 KCL 出现故障。
  + 该方法应该具有幂等性，因为在某些情况下，同一条记录可能会处理多次，例如在工作程序意外崩溃或重启之前尚未进行检查点操作的数据。
  + 在进行检查点操作之前，务必刷新任何缓存数据，以确保数据一致性。

**LeaseLost () LeaseLostInput leaseLostInput**
+ 目的：清理用于处理此分片的所有特定资源。
+ 调用时间：当其他调度器接管此分片的租约时。
+ 关键点：
  + 该方法不允许进行检查点操作。

**ShardEnded () ShardEndedInput shardEndedInput**
+ 目的：完成此分片的处理并进行检查点操作。
+ 调用时间：当分片拆分或合并时，表示该分片的所有数据都已处理完毕。
+ 关键点：
  + `shardEndedInput.checkpointer()`：用于执行最终的检查点操作。
  + 该方法必须进行检查点操作才能完成处理。
  + 若此处未刷新数据和进行检查点操作，可能会导致分片重新打开时出现数据丢失或重复处理。

**已请求关机 () ShutdownRequestedInput shutdownRequestedInput**
+ 目的：在 KCL 关闭时进行检查点操作并清理资源。
+ 调用时间：当 KCL 关闭时，例如，在应用程序终止时。
+ 关键点：
  + `shutdownRequestedInput.checkpointer()`：用于在关闭前执行检查点操作。
  + 确保在该方法中进行了检查点操作，以便在应用程序停止之前保存进度。
  + 若此处未刷新数据和进行检查点操作，可能会导致在应用程序重新启动时出现数据丢失或重新处理记录。

**重要**  
KCL 3.x 通过在前一个工作程序关闭前进行检查点操作，确保在租约从一个工作程序移交给另一个工作程序时减少数据重复处理。如果未在 `shutdownRequested()` 方法中实现检查点操作逻辑，就无法体验到这一好处。请确保已在 `shutdownRequested()` 方法中实现了检查点操作逻辑。

### RecordProcessorFactory
<a name="implementation-recordprocessorfactory"></a>

RecordProcessorFactory 负责创建新 RecordProcessor实例。KCL 使用此工厂 RecordProcessor 为应用程序需要处理的每个分片创建一个新分片。

主要职责：
+ 按需创建新 RecordProcessor 实例
+ 确保每个都已 RecordProcessor 正确初始化

以下是一个实施示例：

```
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new SampleRecordProcessor();
    }
}
```

在此示例中， SampleRecordProcessor 每次调用 shardRecordProcessor () 时，工厂都会创建一个新的。您可以进行扩展以添加任何必要的初始化逻辑。

### 调度器
<a name="implementation-scheduler"></a>

调度器是一个协调 KCL 应用程序所有活动的高级组件。调度器负责数据处理的总体编排。

主要职责：
+ 管理生命周期 RecordProcessors
+ 处理分片的租约管理
+ 协调检查点操作
+ 在应用程序的多个工作程序之间平衡分片处理负载
+ 处理正常关闭和应用程序终止信号

调度器通常在主应用程序中创建和启动。您可以在下一节“主消费端应用程序”中查看调度器的实现示例。

### 主消费端应用程序
<a name="implementation-main"></a>

主消费端应用程序将所有组件联系在一起。它负责设置 KCL 消费端、创建必要的客户端、配置调度器和管理应用程序的生命周期。

主要职责：
+ 设置 AWS 服务客户端（Kinesis、DynamoDB 等） CloudWatch
+ 配置 KCL 应用程序
+ 创建并启动调度器
+ 处理应用程序关闭

以下是一个实施示例：

```
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import java.util.UUID;

public class SampleConsumer {
    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    public SampleConsumer(String streamName, Region region) {
        this.streamName = streamName;
        this.region = region;
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    public void run() {
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamName, 
            streamName, 
            kinesisClient, 
            dynamoDbAsyncClient,
            cloudWatchClient, 
            UUID.randomUUID().toString(), 
            new SampleRecordProcessorFactory()
        );

        Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

    public static void main(String[] args) {
        String streamName = "your-stream-name"; // replace with your stream name
        Region region = Region.US_EAST_1; // replace with your region
        new SampleConsumer(streamName, region).run();
    }
}
```

 默认情况下，KCL 会创建一个具有专用吞吐量的增强型扇出型（EFO）消费端。有关增强扇出功能的更多信息，请参阅[开发具有专用吞吐量的增强扇出型消费端](enhanced-consumers.md)。如果消费端少于 2 个，或者不需要低于 200 毫秒的读取传播延迟，则必须在调度器对象中设置以下配置来使用共享吞吐量消费端：

```
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
```

以下代码是一个创建使用共享吞吐量消费端的调度器对象的示例：

**进口**：

```
import software.amazon.kinesis.retrieval.polling.PollingConfig;
```

**代码**：

```
Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );/
```

# 使用非 Java 语言通过 KCL 开发消费端
<a name="develop-kcl-consumers-non-java"></a>

本节介绍如何在 Python、Node.js、.NET 和 Ruby 中实现使用 Kinesis Client Library（KCL）的消费端。

KCL 属于 Java 库。使用名为 `MultiLangDaemon` 的多语言接口提供对 Java 以外语言的支持。此进程守护程序基于 Java，当您使用 Java 以外语言的 KCL 时，该程序会在后台运行。因此，如果您安装了适用于非 Java 语言的 KCL 并完全在非 Java 语言中编写消费端应用程序，则由于 `MultiLangDaemon`，您仍需要在您的系统中安装 Java。此外，`MultiLangDaemon` 存在部分默认设置，您可能需要根据自己的使用案例自定义此类设置（例如所连接到的 AWS 区域）。有关 `MultiLangDaemon` on 的更多信息 GitHub，请参阅 [KCL MultiLangDaemon 项目](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)。

虽然各语言的核心概念一致，但也有一些针对具体语言的注意事项和实现。有关 KCL 消费端开发的核心概念，请参阅[使用 Java 通过 KCL 开发消费端](develop-kcl-consumers-java.md)。有关如何在 Python、Node.js、.NET 和 Ruby 中开发 KCL 使用者的更多详细信息以及最新更新，请参阅以下 GitHub 存储库：
+ Python：[amazon-kinesis-client-python](https://github.com/awslabs/amazon-kinesis-client-python)
+ Node.js：[amazon-kinesis-client-nodejs](https://github.com/awslabs/amazon-kinesis-client-nodejs)
+ .NET：[amazon-kinesis-client-net](https://github.com/awslabs/amazon-kinesis-client-net)
+ Ruby：[amazon-kinesis-client-ruby](https://github.com/awslabs/amazon-kinesis-client-ruby)

**重要**  
如果使用的是 JDK 8，请勿使用以下非 Java KCL 库版本。这些版本包含与 JDK 8 不兼容的依赖项（logback）。  
KCL Python 3.0.2 和 2.2.0
KCL Node.js 2.3.0
KCL .NET 3.1.0
KCL Ruby 2.2.0
使用 JDK 8 时，我们建议使用在这些受影响版本之前或之后发布的版本。

# 使用 KCL 进行多流处理
<a name="kcl-multi-stream"></a>

本节介绍了 KCL 中所需的更改，这些更改让您能够创建可同时处理多个数据流的 KCL 消费端应用程序。
**重要**  
只有 KCL 2.3 或更高版本才支持多流处理功能。
使用非 Java 语言编写的通过 `multilangdaemon` 运行的 KCL 消费端*不*支持多流处理功能。
任何版本的 KCL 1.x 都*不*支持多流处理功能。
+ **MultistreamTracker 接口**
  + 要构建可以同时处理多个流的使用者应用程序，必须实现一个名为的新接口[MultistreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java)。此接口包括返回要由 KCL 消费端应用程序处理的数据流及其配置列表的 `streamConfigList` 方法。请注意，正在处理的数据流可以在消费端应用程序运行时进行更改。KCL 会定期调用 `streamConfigList` 来了解要处理的数据流的变化。
  + `streamConfigList`填充[StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23)列表。

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```
  + `StreamIdentifier` 和 `InitialPositionInStreamExtended` 是必填字段，`consumerArn` 是选填字段。只有在使用 KCL 实现增强型扇出消费端应用程序时，才必须提供 `consumerArn`。
  + 有关的更多信息`StreamIdentifier`，请参阅 [https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java \$1L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129). 要创建 `StreamIdentifier`，我们建议您从 KCL 2.5.0 或更高版本提供的 `streamArn` 和 `streamCreationEpoch` 创建一个多流实例。在不支持 `streamArm` 的 KCL v2.3 和 v2.4 中，请使用格式 `account-id:StreamName:streamCreationTimestamp` 创建一个多流实例。从下一个主要版本开始，此格式将弃用且不再受支持。
  +  MultistreamTracker 还包括删除租赁表中旧直播租约的策略 (formerStreamsLeasesDeletionStrategy)。请注意，在消费端应用程序运行时无法更改策略。欲了解更多信息，请参阅 [https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0 .java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java)。
+   [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java)是一个应用程序范围的类，可用于指定在构建 KCL 2.x 或更高版本的 KCL 使用者应用程序时要使用的所有 KCL 配置设置。 `ConfigsBuilder`类现在支持该`MultistreamTracker`接口。你可以用一个数据流的名称进行初始化 ConfigsBuilder 以使用来自以下内容的记录： 

  ```
  /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```  

或者，`MultiStreamTracker`如果要实现同时处理多个流的 KCL 使用者应用程序，也可以使用进行初始化 ConfigsBuilder 。

```
* Constructor to initialize ConfigsBuilder with MultiStreamTracker
     * @param multiStreamTracker
     * @param applicationName
     * @param kinesisClient
     * @param dynamoDBClient
     * @param cloudWatchClient
     * @param workerIdentifier
     * @param shardRecordProcessorFactory
     */
    public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
            @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
            @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
            @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
        this.appStreamTracker = Either.left(multiStreamTracker);
        this.applicationName = applicationName;
        this.kinesisClient = kinesisClient;
        this.dynamoDBClient = dynamoDBClient;
        this.cloudWatchClient = cloudWatchClient;
        this.workerIdentifier = workerIdentifier;
        this.shardRecordProcessorFactory = shardRecordProcessorFactory;
    }
```
+ 通过为您的 KCL 消费端应用程序实现多流支持功能，应用程序租约表的每一行现在都包含该应用程序处理的多个数据流的分片 ID 和流名称。
+ 在为 KCL 消费端应用程序实现多流支持功能后，leaseKey 采用以下结构：`account-id:StreamName:streamCreationTimestamp:ShardId`。例如 `111111111:multiStreamTest-1:12345:shardId-000000000336`。

**重要**  
当现有 KCL 消费端应用程序配置为仅处理一个数据流时，`leaseKey`（租约表的分区键）就是分片 ID。如果您将此现有 KCL 消费端应用程序重新配置为处理多个数据流，则会破坏租约表，因为 `leaseKey` 必须采用如下结构才能支持多流功能：`account-id:StreamName:StreamCreationTimestamp:ShardId`。

# 在 KC AWS Glue L 中使用架构注册表
<a name="kcl-glue-schema"></a>

你可以将 Kinesis Data Streams 与架构注册表集成 AWS Glue 。 AWS Glue 架构注册表允许您集中发现、控制和演变架构，同时确保生成的数据由注册架构持续验证。架构定义了数据记录的结构和格式。架构是用于可靠数据发布、使用或存储的版本化规范。借助 AWS Glue Schema 注册表，您可以改善流媒体应用程序中的 end-to-end数据质量和数据治理。有关更多信息，请参阅 [AWS Glue 架构注册表](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)。设置此集成的方法之一是使用适用于 Java 的 KCL。

**重要**  
AWS Glue 只有 KCL 2.3 或更高版本支持 Kinesis Data Streams 的架构注册表集成。
AWS Glue 使用运行的非 Java 语言编写的 KCL 使用者*不*支持 Kinesis Data Streams 的架构注册表集成。`multilangdaemon`
AWS Glue *任何版本的 KCL 1.x 都不支持 Kinesis Data Streams 的架构注册表集成。*

有关如何使用 KCL 设置 Kinesis 数据流与 AWS Glue 架构注册表集成的详细说明，请参阅 “用[例：将 Amazon Kinesis 数据流与架构注册表集成” 中的 “使用 KPL/KCL 库与数据](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)交互” 部分。 AWS Glue 

# KCL 消费端应用程序所必需的 IAM 权限
<a name="kcl-iam-permissions"></a>

 必须向与 KCL 消费端应用程序有关的 IAM 角色或用户添加以下权限。

 的安全最佳实践 AWS 要求使用细粒度的权限来控制对不同资源的访问权限。 AWS Identity and Access Management (IAM) 允许您在中管理用户和用户权限 AWS。IAM policy 明确列出了允许的操作以及这些操作适用于的资源。

下表显示了 KCL 消费端应用程序通常需要的最低 IAM 权限：


**KCL 消费端应用程序的最低 IAM 权限**  

| 服务 | 操作 | 资源 (ARNs) | 用途 | 
| --- | --- | --- | --- | 
| Amazon Kinesis Data Streams |  `DescribeStream` `DescribeStreamSummary` `RegisterStreamConsumer`  |  KCL 应用程序从中处理数据的 Kinesis 数据流。`arn:aws:kinesis:region:account:stream/StreamName`  |  在尝试读取记录前，消费端会检查数据流是否存在，数据流是否处于活动状态，以及分片是否包含在数据流中。 将消费端注册到分片。  | 
| Amazon Kinesis Data Streams |  `GetRecords` `GetShardIterator` `ListShards`  | KCL 应用程序从中处理数据的 Kinesis 数据流。`arn:aws:kinesis:region:account:stream/StreamName` |  从分片读取记录。  | 
| Amazon Kinesis Data Streams |  `SubscribeToShard` `DescribeStreamConsumer` |  KCL 应用程序从中处理数据的 Kinesis 数据流。只有在使用增强扇出型（EFO）消费端时才添加此操作。 `arn:aws:kinesis:region:account:stream/StreamName/consumer/*`  |  为增强型扇出（EFO）消费端订阅分片。  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `UpdateTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  租约表（KCL 创建的 DynamoDB 中的元数据表）。 `arn:aws:dynamodb:region:account:table/KCLApplicationName`  |  KCL 需要执行这些操作才能管理 DynamoDB 中创建的租约表。  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  KCL 创建的工作程序指标和协调器状态表（DynamoDB 中的元数据表）。 `arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats` `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`  |  KCL 需要这些操作才能管理 DynamoDB 中的工作程序指标和协调器状态元数据表。  | 
| Amazon DynamoDB | `Query` |  租约表上的全局二级索引。 `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`  |  KCL 需要执行此操作才能读取 DynamoDB 中创建的租约表的全局二级索引。  | 
| Amazon CloudWatch | `PutMetricData` |  \$1  |  上传指标对于监控应用程序非常有用。 CloudWatch 之所以使用星号 (\$1)，是因为没有用于调用`PutMetricData`操作 CloudWatch 的特定资源。  | 

**注意**  
将中的 “区域”、“账户”、“” 和 “KCLApplication名称” 分别替换为您自己的 AWS 账户 号码 AWS 区域、Kinesis 数据流名称和 KCL 应用程序名称。StreamName ARNsKCL 3.x 在 DynamoDB 中又创建了两个元数据表。有关 KCL 创建的 DynamoDB 元数据表的详细信息，请参阅[KCL 中的 DynamoDB 元数据表和负载均衡](kcl-dynamoDB.md)。如果使用配置来自定义 KCL 创建的元数据表的名称，请使用这些指定的表名称而不是 KCL 应用程序名称。

下面是 KCL 消费端应用程序的策略文档示例。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME/consumer/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:UpdateTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-WorkerMetricStats",
    "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-CoordinatorState"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:Query"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME/index/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        }
    ]
}
```

------

在使用该策略示例之前，检查以下项：
+ 将区域替换为你的 AWS 区域 （例如 us-east-1）。
+ 将账号\$1ID替换为你的 AWS 账户 账号。
+ 将 STREAM\$1NAME 替换为 Kinesis 数据流的名称。
+ 使用 KCL 时，请将 CONSUMER\$1NAME 替换为消费端的名称，这通常是应用程序的名称。
+ 将 KCL\$1APPLICATION\$1NAME 替换为 KCL 应用程序的名称。

# KCL 配置
<a name="kcl-configuration"></a>

您可以设置配置属性来自定义 Kinesis Client Library 的功能，以满足具体要求。下表列明了配置属性和类。

**重要**  
在 KCL 3.x 中，负载均衡算法的目标是在各工作程序之间实现均匀的 CPU 使用率，而不是使各个工作程序的租约数量相等。如果 `maxLeasesForWorker` 设置得太低，可能会限制 KCL 有效平衡工作负载的能力。如果使用 `maxLeasesForWorker` 配置，可考虑增加其值以实现最佳的负载分布。


**本表显示了 KCL 的配置属性**  

| 配置属性 | 配置类 | 说明 | 默认 值 | 
| --- | --- | --- | --- | 
| applicationName | ConfigsBuilder | 此 KCL 应用程序的名称。用作 tableName 和 consumerName 的默认名称。 | 不适用 | 
| tableName | ConfigsBuilder |  允许覆盖用于 Amazon DynamoDB 租赁表的表名称。  | 不适用 | 
| streamName | ConfigsBuilder |  此应用程序从其中处理记录的流的名称。  | 不适用 | 
| workerIdentifier | ConfigsBuilder |  表示应用程序处理器的这种实例化的唯一标识符。此值必须唯一。  | 不适用 | 
| failoverTimeMillis | LeaseManagementConfig |  在您可以将租赁所有者视为已失败之前必须经过的毫秒数。对于拥有大量分片的应用程序，可以将分片数设置为更高的数字，以减少跟踪租约所需的 DynamoDB IOPS 数。  | 10000（10 秒） | 
| shardSyncIntervalMillis | LeaseManagementConfig |  分片同步调用之间的时间。  | 60000（60 秒） | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig |  如果设置，只要子租赁已开始处理，即可删除租赁。  | TRUE | 
| ignoreUnexpectedChildShards | LeaseManagementConfig |  如果设置，将忽略具有打开的分片的子分片。这主要适用于 DynamoDB Streams。  | FALSE | 
| maxLeasesForWorker | LeaseManagementConfig |  单个工作程序应接受的最大租约数。如果设置太低，则在工作程序无法处理所有分片时可能会导致数据丢失，并造成工作程序之间的租约分配不够理想。在进行配置时，应考虑总分片数、工作程序数和工作程序的处理能力。  | 无限制 | 
| maxLeaseRenewalThreads | LeaseManagementConfig |  控制租赁续订线程池的大小。您的应用程序可以容纳的租赁越多，此池应该就越大。  | 20 | 
| billingMode | LeaseManagementConfig |  确定 DynamoDB 中创建的租约表的容量模式。容量模式有两个选项：按需模式 (PAY\$1PER\$1REQUEST) 和预置模式。我们建议使用默认的按需模式设置，因为这种模式可以自动扩缩以适应工作负载，而无需进行容量规划。  | PAY\$1PER\$1REQUEST（按需模式） | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | 如果 Kinesis Client Library 需要使用预置的容量模式创建新的 DynamoDB 租约表，DynamoDB 将读取使用的容量。如果在 billingMode 配置中使用默认的按需容量模式，可以忽略上述配置。 | 10 | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | 如果 Kinesis Client Library 需要创建新的 DynamoDB 租约表，DynamoDB 将读取使用的容量。如果在 billingMode 配置中使用默认的按需容量模式，可以忽略上述配置。 | 10 | 
| initialPositionInStreamExtended | LeaseManagementConfig |  应用程序应在流中开始的初始位置。此值仅在创建初始租赁时使用。  |  InitialPositionInStream.trim\$1HORIZ  | 
| reBalanceThresholdPercentage | LeaseManagementConfig |  用于确定负载均衡算法何时应考虑在工作程序之间重新分配分片的一个百分比值。 这是 KCL 3.x 中引入的新配置。  | 10 | 
| dampeningPercentage | LeaseManagementConfig |  用于抑制单次再平衡操作中将从超载工作程序转移的负载量的一个百分比值。 这是 KCL 3.x 中引入的新配置。  | 60 | 
| allowThroughputOvershoot | LeaseManagementConfig |  确定是否仍需要从超载工作程序获得额外的租约，即使这会导致占用的总租约吞吐量超过所需的吞吐量。 这是 KCL 3.x 中引入的新配置。  | TRUE | 
| disableWorkerMetrics | LeaseManagementConfig |  确定在重新分配租约和负载均衡时，KCL 是否应忽略工作程序的资源指标（例如 CPU 利用率）。如果要阻止 KCL 根据 CPU 利用率进行负载均衡，则将其设置为 TRUE。 这是 KCL 3.x 中引入的新配置。  | FALSE | 
| maxThroughputPerHostKBps | LeaseManagementConfig |  在租约分配期间分配给工作程序的最大吞吐量。 这是 KCL 3.x 中引入的新配置。  | 无限制 | 
| isGracefulLeaseHandoffEnabled | LeaseManagementConfig |  控制工作程序之间的租约移交行为。设置为 true 时，KCL 将尝试通过在将租约移交给其他工作人员之前让分片 RecordProcessor 有足够的时间完成处理来优雅地转移租约。这有助于确保数据完整性和平稳过渡，但可能会增加移交时间。 如果设置为 false，则租约将立即移交，无需等待优雅 RecordProcessor 地关闭。这可以加快移交速度，但可能存在处理不完全的风险。 注意：检查点必须在的 shutdownRequested () 方法中实现， RecordProcessor 才能从优雅的租赁移交功能中受益。 这是 KCL 3.x 中引入的新配置。  | TRUE | 
| gracefulLeaseHandoffTimeoutMillis | LeaseManagementConfig |  指定在强制将租约转让给下一个所有者之前，等待当前分片正常关闭的最短时间（ RecordProcessor 以毫秒为单位）。 如果 processRecords 方法的典型运行时间比默认值长，可考虑增加此设置。这样可以确保在租赁转让发生之前 RecordProcessor 有足够的时间完成其处理。 这是 KCL 3.x 中引入的新配置。  | 30000（30 秒） | 
| maxRecords | PollingConfig |  允许设置 Kinesis 返回的最大记录数。  | 10000 | 
| retryGetRecordsInSeconds | PollingConfig |  配置 GetRecords 尝试失败之间的延迟。  | 无 | 
| maxGetRecordsThreadPool | PollingConfig |  使用的线程池大小 GetRecords。  | 无 | 
| idleTimeBetweenReadsInMillis | PollingConfig |  确定 KCL 在两次 GetRecords 调用轮询数据流数据之间等待多长时间。单位为毫秒。  | 1500 | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig |  如果设置，即使 Kinesis 中未提供任何记录，也会调用记录处理器。  | FALSE | 
| parentShardPollIntervalMillis | CoordinatorConfig |  记录处理器应轮询多少时间才能查看是否已完成父分片。单位为毫秒。  | 10000（10 秒） | 
| skipShardSyncAtWorkerInitializationIfLeaseExist | CoordinatorConfig |  如果租赁表包含现有租赁，请禁用同步的分片数据。  |  FALSE  | 
| shardPrioritization | CoordinatorConfig |  要使用的分片优先级。  |  NoOpShardPrioritization  | 
| ClientVersionConfig | CoordinatorConfig |  确定应用程序将运行在哪个 KCL 版本兼容模式下。此配置仅适用于从之前的 KCL 版本进行迁移的情况。迁移至 3.x 时，需要将该配置设置为 `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X`。完成迁移后，即可删除此配置。  | CLIENT\$1VERSION\$1CONFIG\$13X | 
| taskBackoffTimeMillis | LifecycleConfig |  等待重试 KCL 失败任务的时间。单位为毫秒。  | 500（0.5 秒） | 
| logWarningForTaskAfterMillis | LifecycleConfig |  任务尚未完成的情况下在记录警告之前要等待的时长。  | 无 | 
| listShardsBackoffTimeInMillis | RetrievalConfig | 发生故障时在调用 ListShards 之间要等待的时间（以毫秒为单位）。单位为毫秒。 | 1,500（1.5 秒） | 
| maxListShardsRetryAttempts | RetrievalConfig | ListShards 在放弃之前重试的最长时间。 | 50 | 
| metricsBufferTimeMillis | MetricsConfig |  指定在发布指标之前缓冲指标的最大持续时间（以毫秒为单位）。 CloudWatch  | 10000（10 秒） | 
| metricsMaxQueueSize | MetricsConfig |  指定发布到之前要缓冲的最大指标数 CloudWatch。  | 10000 | 
| metricsLevel | MetricsConfig |  指定要启用和发布的 CloudWatch 指标的粒度级别。 可能的值：NONE、SUMMARY、DETAILED。  |  MetricsLevel。详细  | 
| metricsEnabledDimensions | MetricsConfig |  控制 CloudWatch 指标允许的维度。  | 所有维度 | 

**KCL 3.x 中停用的配置**

以下配置属性在 KCL 3.x 中已停用：


**下表显示了 KCL 3.x 已停用的配置属性**  

| 配置属性 | 配置类 | 说明 | 
| --- | --- | --- | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig |  应用程序一次应尝试窃取的最大租赁数量。KCL 3.x 会忽略此配置，并根据工作程序的资源利用率重新分配租约。  | 
| enablePriorityLeaseAssignment | LeaseManagementConfig |  控制工作程序是否应优先考虑已过期的租约（租约未续订的时间达故障转移时间的 3 倍）和新的分片租约，而不管目标租约数如何，但仍需遵守最大租约限制。KCL 3.x 会忽略此配置，并始终将过期租约分布到各个工作程序中。  | 

**重要**  
在从之前的 KCL 版本迁移至 KCL 3.x 时，仍必须拥有停用的配置属性。在迁移过程中，KCL 工作程序首先从 KCL 2.x 兼容模式启动，然后在检测到应用程序的所有 KCL 工作程序做好运行 KCL 3.x 的准备时切换到 KCL 3.x 功能模式。当 KCL 工作程序运行 KCL 2.x 兼容模式时，需要这些停用的配置。

# KCL 版本生命周期策略
<a name="kcl-version-lifecycle-policy"></a>

本主题概述了 Amazon Kinesis 客户端库 (KCL) 的版本生命周期策略。 AWS 定期为 KCL 版本提供新版本，以支持新功能和增强功能、错误修复、安全补丁和依赖项更新。我们建议您继续 up-to-date使用 KCL 版本，以了解最新功能、安全更新和底层依赖关系。我们**不**建议继续使用不受支持的 KCL 版本。

主要 KCL 版本的生命周期包括以下三个阶段：
+ **正式发布 (GA)**-在此阶段，将完全支持主要版本。 AWS 提供常规的次要版本和补丁版本，其中包括对 Kinesis Data Streams 的新功能或 API 更新的支持，以及错误和安全修复。
+ **维护模式**-将补丁版本的发布 AWS 限制为仅解决关键错误修复和安全问题。主要版本不会收到有关新功能或 Kinesis Data APIs Streams 的更新。
+ **E nd-of-support** — 主版本将不再接收更新或发布。之前发布的版本将继续通过公共包管理器提供，并且代码将保持不变 GitHub。用户可以自行决定 end-of-support是否使用已达到的版本。建议您升级到最新的主要版本。


| 主要版本 | 当前阶段 | 发行日期 | 维护模式日期 | End-of-support 日期 | 
| --- | --- | --- | --- | --- | 
| KCL 1.x | 维护模式 | 2013-12-19 | 2025-04-17 | 2026-01-30 | 
| KCL 2.x | 正式发布 | 2018-08-02 | -- | -- | 
| KCL 3.x | 正式发布 | 2024-11-06 | -- | -- | 

# 从之前的 KCL 版本迁移
<a name="kcl-migration-previous-versions"></a>

本主题介绍如何从之前 Kinesis Client Library（KCL）版本进行迁移。

## KCL 3.0 中有何新功能？
<a name="kcl-migration-new-3-0"></a>

与之前的版本相比，Kinesis Client Library（KCL）3.0 推出了多项主要改进：
+  通过自动将工作从消费端应用程序队列中过度利用的工作程序重新分配给利用不足的工作程序，从而降低消费端应用程序的计算成本。这种新的负载均衡算法确保在各工作程序之间实现均匀的 CPU 利用率分布，并且无需过度配置工作程序。
+  通过优化租约表中的读取操作，降低了与 KCL 相关的 DynamoDB 成本。
+ 支持当前工作程序完成对已处理记录的检查点操作，从而最大限度地减少租约重新分配给其他工作程序时对数据的再处理。
+  它 AWS SDK for Java 2.x 用于改进性能和安全功能，完全消除了对 适用于 Java 的 AWS SDK 1.x 的依赖。

有关更多信息，请参阅 [KCL 3.0 发行说明](https://github.com/awslabs/amazon-kinesis-client/blob/master/CHANGELOG.md)。

**Topics**
+ [KCL 3.0 中有何新功能？](#kcl-migration-new-3-0)
+ [从 KCL 2.x 迁移至 KCL 3.x](kcl-migration-from-2-3.md)
+ [回滚至先前 KCL 版本](kcl-migration-rollback.md)
+ [回滚后前滚到 KCL 3.x](kcl-migration-rollforward.md)
+ [使用预置容量模式的租约表的最佳实践](kcl-migration-lease-table.md)
+ [从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)

# 从 KCL 2.x 迁移至 KCL 3.x
<a name="kcl-migration-from-2-3"></a>

本主题提供了将您的消费者从 KCL 2.x 迁移到 KCL 3.x 的 step-by-step说明。KCL 3.x 支持 KCL 2.x 消费端进行就地迁移。在滚动迁移工作程序时，可继续使用 Kinesis 数据流中的数据。

**重要**  
KCL 3.x 的接口和方法与 KCL 2.x 保持一致。因此，在迁移期间无需更新记录处理代码。但必须设置正确的配置，并检查迁移所需的步骤。我们强烈建议遵循以下迁移步骤，以获得顺畅的迁移体验。

## 步骤 1：先决条件
<a name="kcl-migration-from-2-3-prerequisites"></a>

开始使用 KCL 3.x 之前，请确保已具备以下条件：
+ Java Development Kit（JDK）8 或更高版本
+ 适用于 Java 的 AWS SDK 2.x
+ 用于依赖项管理的 Maven 或 Gradle

**重要**  
不要在 KCL 3.x 中使用 2.27.19 到 2.27.23 适用于 Java 的 AWS SDK 版本。这些版本出现的问题会导致使用 KCL 的 DynamoDB 时出现相关异常错误。我们建议您使用 2.28.0 或更高 适用于 Java 的 AWS SDK 版本来避免此问题。

## 步骤 2：添加依赖项
<a name="kcl-migration-from-2-3-dependencies"></a>

如果您使用的是 Maven，请将以下依赖项添加到您的 `pom.xml` 文件中。确保将 3.x.x 替换为最新的 KCL 版本。

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

如果使用 Gradle，请在 `build.gradle` 文件中添加以下信息。确保将 3.x.x 替换为最新的 KCL 版本。

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

可以在 [Maven Central 存储库](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client)中查看最新版本的 KCL。

## 步骤 3：设置迁移相关配置
<a name="kcl-migration-from-2-3-configuration"></a>

要从 KCL 2.x 迁移至 KCL 3.x，就必须设置以下配置参数：
+ CoordinatorConfig。 clientVersionConfig：此配置决定了应用程序将在哪种 KCL 版本兼容模式下运行。从 KCL 2.x 迁移至 3.x 时，需要将该配置设置为 `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X`。要设置此配置，请在创建调度器对象时添加以下行：

```
configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X)
```

下面是如何设置 `CoordinatorConfig.clientVersionConfig` 从 KCL 2.x 迁移至 3.x 的示例。您可以按照自己的具体需求，根据需要调整其他配置：

```
Scheduler scheduler = new Scheduler(
    configsBuilder.checkpointConfig(),
    configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X),
    configsBuilder.leaseManagementConfig(),
    configsBuilder.lifecycleConfig(),
    configsBuilder.metricsConfig(),
    configsBuilder.processorConfig(),
    configsBuilder.retrievalConfig()
);
```

重要的是，由于 KCL 2.x 和 3.x 使用的负载均衡算法不同，消费端应用程序中的所有工作程序在给定时间均使用相同的负载均衡算法。如果使用不同的负载均衡算法运行工作程序，可能会导致负载分配不够理想，因为这两种算法是独立运行的。

借助于这种 KCL 2.x 兼容性设置，KCL 3.x 应用程序可以在兼容 KCL 2.x 的模式下运行，并使用 KCL 2.x 的负载均衡算法，直至消费端应用程序中的所有工作程序都升级到 KCL 3.x。迁移完成后，KCL 会自动切换到完整版 KCL 3.x 功能模式，并开始为所有正在运行的工作程序使用新的 KCL 3.x 负载均衡算法。

**重要**  
如果未使用 `ConfigsBuilder`，而是创建 `LeaseManagementConfig` 对象来设置配置，则必须在 KCL 3.x 或更高版本中再添加一个称为 `applicationName` 的参数。有关详细信息，请参阅[ LeaseManagementConfig 构造函数的编译错误](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compiliation-error-leasemanagementconfig)。我们建议使用 `ConfigsBuilder` 来设置 KCL 配置。`ConfigsBuilder` 提供了一种更加灵活、更易于维护的方式来配置 KCL 应用程序。

## 第 4 步：遵循 shutdownRequested() 方法实现的最佳实践
<a name="kcl-migration-from-2-3-best-practice"></a>

KCL 3.x 中推出了一项名为“*优雅移交租约*”的功能，在租约重新分配过程中将租约移交给其他工作程序时，该功能可最大程度减少对数据的再处理。其实现方法是在租约移交之前，对租约表中最后处理的序列号进行检查点操作。为确保优雅移交租约功能的正常运行，必须保证在 `RecordProcessor` 类的 `shutdownRequested` 方法中调用 `checkpointer` 对象。如果在 `shutdownRequested` 方法中未调用 `checkpointer` 对象，可以按照以下示例所示进行实现。

**重要**  
以下实现示例是优雅移交租约的最低要求。需要时可以进行扩展，以添加与检查点相关的其他逻辑。如果当前正在执行任何异步处理，请确保在调用检查点操作之前已对所有传送到下游的记录进行了处理。
虽然优雅移交租约可以显著减小租约转移期间进行数据再处理的可能性，但不能完全消除这种可能性。为保持数据的完整性和一致性，请将下游消费端应用程序设计为具有幂等性。这意味着消费端应用程序应该能够处理潜在的记录重复处理，而不会对整个系统带来不利影响。

```
/**
 * Invoked when either Scheduler has been requested to gracefully shutdown
 * or lease ownership is being transferred gracefully so the current owner
 * gets one last chance to checkpoint.
 *
 * Checkpoints and logs the data a final time.
 *
 * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
 *                               before the shutdown is completed.
 */
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
    try {
       // Ensure that all delivered records are processed 
       // and has been successfully flushed to the downstream before calling 
       // checkpoint
       // If you are performing any asynchronous processing or flushing to
       // downstream, you must wait for its completion before invoking
       // the below checkpoint method.
        log.info("Scheduler is shutting down, checkpointing.");
        shutdownRequestedInput.checkpointer().checkpoint();
    } catch (ShutdownException | InvalidStateException e) {
        log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
    } 
}
```

## 步骤 5：检查 KCL 3.x 收集工作程序指标的先决条件
<a name="kcl-migration-from-2-3-worker-metrics"></a>

KCL 3.x 收集 CPU 利用率指标（例如工作程序的 CPU 利用率），以期均匀地平衡各工作程序之间的负载。消费端应用程序的工作程序可运行在 Amazon EC2、Amazon EKS、Amazon EKS 或 AWS Fargate上。只有在满足以下先决条件时，KCL 3.x 才能从工作程序收集 CPU 利用率指标：

 **Amazon Elastic Compute Cloud（Amazon EC2）**
+ 操作系统必须是 Linux 操作系统。
+ 您必须[IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)在 EC2 实例中启用。

 **Amazon EC2 上的 Amazon Elastic Container Service（Amazon ECS）**
+ 操作系统必须是 Linux 操作系统。
+ 必须启用 [ECS 任务元数据端点版本 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html)。
+ Amazon ECS 容器代理版本必须为 1.39.0 或更高版本。

 **Amazon ECS 已开启 AWS Fargate**
+ 必须启用 [Fargate 任务元数据端点版本 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html)。如果使用的是 Fargate 平台版本 1.4.0 或更高版本，则默认启用此功能。
+ Fargate（平台版本 1.4.0 或更高版本）。

 **Amazon EC2 上的 Amazon Elastic Kubernetes Service（Amazon EKS）** 
+ 操作系统必须是 Linux 操作系统。

 **亚马逊 EKS 开启 AWS Fargate**
+ Fargate（平台版本 1.3.0 或更高版本）。

**重要**  
如果 KCL 3.x 因为未满足先决条件而无法从工作程序收集 CPU 利用率指标，它将根据每个租约的吞吐量级别重新平衡负载。这种后备再平衡机制可确保所有工作程序都能从分配给各工作程序的租约中获得类似级别的总吞吐量。有关更多信息，请参阅 [KCL 如何向工作程序分配租约并平衡负载](kcl-dynamoDB.md#kcl-assign-leases)。

## 第 6 步：更新 KCL 3.x 的 IAM 权限
<a name="kcl-migration-from-2-3-IAM-permissions"></a>

必须向与 KCL 3.x 消费端应用程序有关的 IAM 角色或策略添加以下权限。这涉及对 KCL 应用程序使用的现有 IAM 策略进行更新。有关更多信息，请参阅 [KCL 消费端应用程序所必需的 IAM 权限](kcl-iam-permissions.md)。

**重要**  
您的现有 KCL 应用程序可能没有在 IAM 策略中添加以下 IAM 操作与资源，因为 KCL 2.x 不需要这些操作与资源。在运行 KCL 3.x 应用程序之前，请确保已添加这些操作与资源：  
动作：`UpdateTable`  
资源 (ARNs): `arn:aws:dynamodb:region:account:table/KCLApplicationName`
动作：`Query`  
资源 (ARNs): `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`
操作：`CreateTable`、`DescribeTable`、`Scan`、`GetItem`、`PutItem`、`UpdateItem`、`DeleteItem`  
资源 (ARNs):`arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats`, `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState` 
将中的 “区域”、“账户” 和 “KCLApplication名称” 分别替换为您自己的名称 AWS 区域、 AWS 账户 号码和 KCL 应用程序名称。 ARNs 如果使用配置来自定义 KCL 创建的元数据表的名称，请使用这些指定的表名称而不是 KCL 应用程序名称。

## 第 7 步：将 KCL 3.x 代码部署到工作程序
<a name="kcl-migration-from-2-3-IAM-deploy"></a>

在设置了迁移所需的配置并完成之前所有的迁移清单之后，就可以构建代码并将其部署到工作程序中。

**注意**  
如果您看到构造函数出现编译错误，请参阅`LeaseManagementConfig`构造函数的[编译错误以](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compilation-error-leasemanagementconfig)获取疑难解答信息。 LeaseManagementConfig 

## 步骤 8：完成迁移
<a name="kcl-migration-from-2-3-finish"></a>

在部署 KCL 3.x 代码期间，KCL 将继续使用来自 KCL 2.x 的租约分配算法。成功将 KCL 3.x 代码部署到所有工作程序后，KCL 会自动检测部署情况，并根据工作程序的资源利用率切换到新的租约分配算法。有关新租约分配算法的更多详细信息，请参阅[KCL 如何向工作程序分配租约并平衡负载](kcl-dynamoDB.md#kcl-assign-leases)。

在部署期间，您可以使用向其发送以下指标来 CloudWatch监控迁移过程。您可以监控 `Migration` 操作下的指标。所有指标均为 per-KCL-application指标，并设置为`SUMMARY`指标级别。如果 `CurrentState:3xWorker` 指标的 `Sum` 统计数据与 KCL 应用程序的工作程序总数一致，则表示已成功完成向 KCL 3.x 的迁移。

**重要**  
 在所有工作程序做好运行新租约分配算法的准备之后，KCL 至少需要 10 分钟才能切换到新算法。


**CloudWatch KCL 迁移过程的指标**  

| 指标 | 说明 | 
| --- | --- | 
| CurrentState:3xWorker |  成功迁移至 KCL 3.x 并运行新租约分配算法的 KCL 工作程序数量。如果此指标的 `Sum` 计数与工作程序总数一致，则表示已成功完成向 KCL 3.x 的迁移。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| CurrentState:2xCompatibleWorker |  迁移过程中在 KCL 2.x 兼容模式下运行的 KCL 工作程序数量。该指标若为非零值，表示迁移仍在进行中。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| Fault |  迁移过程中遇到的异常数量。这些异常大多数是瞬时错误，KCL 3.x 会自动重试以完成迁移。如果发现永久性的 `Fault` 指标值，请查看迁移期间的日志，以进一步排除故障。如果问题仍然存在，请联系 支持。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| GsiStatusReady |  租约表上创建全局二级索引（GSI）的状态。该指标表示租约表上的 GSI 是否已创建，这是运行 KCL 3.x 的一个先决条件。其值为 0 或 1，其中 1 表示创建成功。在回滚状态下，不会发出该指标。再次向前滚动后，可以继续监控该指标。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| workerMetricsReady |  所有工作程序指标的发送状态。该指标表示是否所有工作程序都发出诸如 CPU 利用率之类的指标。其值为 0 或 1，其中 1 表示所有工作程序均已成功发出指标，并准备好使用新的租约分配算法。在回滚状态下，不会发出该指标。再次向前滚动后，可以继续监控该指标。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/streams/latest/dev/kcl-migration-from-2-3.html)  | 

KCL 在迁移期间提供回滚至 2.x 兼容模式的能力。成功迁移至 KCL 3.x 后，如果不再需要回滚，我们建议移除 `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` 的 `CoordinatorConfig.clientVersionConfig` 设置。移除该配置之后，就不会从 KCL 应用程序发出与迁移相关的指标。

**注意**  
我们建议在迁移期间和完成迁移后，监控应用程序的性能和稳定性一段时间。如果发现任何问题，可使用 [KCL 迁移工具](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)回滚工作程序以使用 KCL 2.x 兼容功能。

# 回滚至先前 KCL 版本
<a name="kcl-migration-rollback"></a>

本主题介绍将消费端回滚到先前版本的步骤。需要回滚时，可执行一个两步流程：

1. 运行 [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。

1. 重新部署以前的 KCL 版本代码（可选）。

## 步骤 1：运行 KCL 迁移工具
<a name="kcl-migration-rollback-tool"></a>

当需要回滚到先前 KCL 版本时，必须运行 KCL 迁移工具。KCL 迁移工具可完成两项重要任务：
+ 它在 DynamoDB 中的租约表上移除一个名为工作线程指标表的元数据表和全局二级索引。这两个构件由 KCL 3.x 创建，但在回滚到先前版本时并不需要。
+ 它使所有工作程序在与 KCL 2.x 兼容的模式下运行，并开始使用先前 KCL 版本中使用的负载平衡算法。如果 KCL 3.x 中的新负载均衡算法存在问题，这将立即缓解问题。

**重要**  
DynamoDB 中的协调器状态表必须存在，并且在迁移、回滚和前滚过程中不得删除。

**注意**  
重要的是，使用者应用程序中的所有工作线程在给定时间均使用相同的负载均衡算法。KCL 迁移工具可确保 KCL 3.x 使用者应用程序中的所有工作线程都切换到 KCL 2.x 兼容模式，以便在部署回滚到先前 KCL 版本期间，所有工作线程都运行相同的负载均衡算法。

您可以在 [KCL 存储库的脚本目录中下载 [KCL GitHub](https://github.com/awslabs/amazon-kinesis-client/tree/master) 迁移工具](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。可以从任何工作程序或具备以下必需权限的主机上运该脚本：写入协调器状态表、删除工作程序指标表以及更新租约表的权限。可参考 [KCL 消费端应用程序所必需的 IAM 权限](kcl-iam-permissions.md) 获取运行脚本所需的 IAM 权限。每个 KCL 应用程序只能运行该脚本一次。您可使用以下命令运行 KCL 迁移工具：

```
python3 ./KclMigrationTool.py --region <region> --mode rollback [--application_name <applicationName>] [--lease_table_name <leaseTableName>] [--coordinator_state_table_name <coordinatorStateTableName>] [--worker_metrics_table_name <workerMetricsTableName>]
```

**参数**
+ --region：`<region>`替换为你的。 AWS 区域
+ --application\$1name：如果您为 DynamoDB 元数据表（租约表、协调器状态表和工作线程指标表）使用默认名称，则需要此参数。如果您为这些表指定了自定义名称，则可以忽略此参数。将 `<applicationName>` 替换为实际的 KCL 应用程序名称。如果未提供自定义名称，该工具将使用此名称来派生默认表名称。
+ --lease\$1table\$1name（可选）：如果您在 KCL 配置中为租约表设置了自定义名称，则需要此参数。如果您使用的是默认表名称，则可以忽略此参数。将 `leaseTableName` 替换为您为租约表指定的自定义表名称。
+ --coordinator\$1state\$1table\$1name（可选）：如果您在 KCL 配置中为协调器状态表设置了自定义名称，则需要此参数。如果您使用的是默认表名称，则可以忽略此参数。将 `<coordinatorStateTableName>` 替换为您为协调器状态表指定的自定义表名称。
+ --worker\$1metrics\$1table\$1name（可选）：如果您在 KCL 配置中为工作线程指标表设置了自定义名称，则需要此参数。如果您使用的是默认表名称，则可以忽略此参数。将 `<workerMetricsTableName>` 替换为您为工作线程指标表指定的自定义表名称。

## 步骤 2：使用先前 KCL 版本重新部署代码（可选）
<a name="kcl-migration-rollback-redeploy"></a>

 运行 KCL 迁移工具来进行回滚后，您将看到以下消息之一：
+ **消息 1：**“回滚已完成。您的 KCL 应用程序正在运行 KCL 2.x 兼容模式。如果您看不到回归缓解，请使用先前 KCL 版本部署代码，回滚到先前的应用程序二进制文件。”
  + **必需的操作：**这意味着您的工作人员正在 KCL 2.x 兼容模式下运行。如果仍有问题，请使用先前 KCL 版本将代码重新部署到工作程序。
+ **消息 2：**“回滚已完成。您的 KCL 应用程序正在运行 KCL 3.x 功能模式。您无需回滚到以前的应用程序二进制文件，除非在 5 分钟内看不到任何缓解该问题的措施。如果仍有问题，请使用先前 KCL 版本部署代码，以回滚到先前的应用程序二进制文件。”
  + **必需的操作：**这意味着您的工作人员在 KCL 3.x 模式下运行，KCL 迁移工具已将所有工作人员切换到兼容 KCL 2.x 的模式。如果问题得到解决，则无需使用之前的 KCL 版本重新部署代码。如果仍有问题，请使用先前 KCL 版本将代码重新部署到工作程序。

 

# 回滚后前滚到 KCL 3.x
<a name="kcl-migration-rollforward"></a>

本主题介绍在回滚后将消费端前滚到 KCL 3.x 的步骤。当您需要前滚时，必须完成一个由两步组成的过程：

1. 运行 [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。

1. 使用 KCL 3.x 部署代码。

## 步骤 1：运行 KCL 迁移工具
<a name="kcl-migration-rollback-tool"></a>

运行 KCL Migration Tool。具有以下命令的 KCL 迁移工具，用于前滚到 KCL 3.x：

```
python3 ./KclMigrationTool.py --region <region> --mode rollforward [--application_name <applicationName>] [--coordinator_state_table_name <coordinatorStateTableName>]
```

**参数**
+ --region：`<region>`替换为你的。 AWS 区域
+ --application\$1name：如果您为协调器状态表使用默认名称，则需要此参数。如果您已为协调器状态表指定了自定义名称，则可以忽略此参数。将 `<applicationName>` 替换为实际的 KCL 应用程序名称。如果未提供自定义名称，该工具将使用此名称来派生默认表名称。
+ --coordinator\$1state\$1table\$1name（可选）：如果您在 KCL 配置中为协调器状态表设置了自定义名称，则需要此参数。如果您使用的是默认表名称，则可以忽略此参数。将 `<coordinatorStateTableName>` 替换为您为协调器状态表指定的自定义表名称。

在前滚模式下运行迁移工具后，KCL 会创建 KCL 3.x 所需的以下 DynamoDB 资源：
+ 租约表上的全局二级索引
+ 工作线程指标表

## 步骤 2：使用 KCL 3.x 部署代码
<a name="kcl-migration-rollback-redeploy"></a>

运行 KCL 迁移工具以进行前滚后，使用 KCL 3.x 将代码部署到工作线程。按照 [步骤 8：完成迁移](kcl-migration-from-2-3.md#kcl-migration-from-2-3-finish) 完成迁移。

# 使用预置容量模式的租约表的最佳实践
<a name="kcl-migration-lease-table"></a>

如果 KCL 应用程序的租约表已切换至预置容量模式，KCL 3.x 会在采用预置计费模式的租约表上创建全局二级索引，租约表的读取容量单位（RCU）和写入容量单位（WCU）与基础租约表一致。创建全局二级索引时，我们建议在 DynamoDB 控制台中监控全局二级索引的实际使用情况，并根据需要调整容量单位。有关切换由 KCL 创建的 DynamoDB 元数据表容量模式的更详细指南，请参阅[KCL 创建的元数据表的 DynamoDB 容量模式](kcl-dynamoDB.md#kcl-capacity-mode)。

**注意**  
默认情况下，KCL 使用按需容量模式在租约表上创建元数据表，例如租约表、工作程序指标表和协调器状态表以及全局二级索引。我们建议您使用按需容量模式，以便根据您的使用量变化自动调整容量。

# 从 KCL 1.x 迁移到 KCL 3.x
<a name="kcl-migration-1-3"></a>

本主题说明如何将消费端从 KCL 1.x 迁移至 KCL 3.x。与 KCL 2.x 和 KCL 3.x 相比，KCL 1.x 使用不同的类和接口。必须先将记录处理器、记录处理器工厂和工作线程类迁移到 KCL 2.x/3.x 兼容格式，然后按照将 KCL 2.x 迁移到 KCL 3.x 的迁移步骤进行操作。可直接从 KCL 1.x 升级至 KCL 3.x。
+ **步骤 1：迁移记录处理器**

  按照[将消费端从 KCL 1.x 迁移至 KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) 页面中的[迁移记录处理器](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration)部分进行操作。
+ **步骤 2：迁移记录处理器工厂**

  按照[将消费端从 KCL 1.x 迁移至 KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) 页面中的[迁移记录处理器工厂](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-factory-migration)部分进行操作。
+ **步骤 3：迁移工作人员**

  按照[将消费端从 KCL 1.x 迁移至 KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) 页面中的[迁移工作程序](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#worker-migration)部分进行操作。
+ **第 4 步：迁移 KCL 1.x 配置**

  按照[将消费端从 KCL 1.x 迁移至KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) 页面中的[配置 Amazon Kinesis 客户端](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration)部分进行操作。
+ **第 5 步：检查闲置时间删除和客户端配置移除情况**

  按照[将消费端从 KCL 1.x 迁移至 KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) 页面中的[闲置时间删除](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#idle-time-removal)和[客户端配置移除](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration-removals)部分进行操作。
+ **第 6 步：按照 KCL 2.x 到 KCL 3.x 迁移 step-by-step指南中的说明进行操作**

  遵循 [从 KCL 2.x 迁移至 KCL 3.x](kcl-migration-from-2-3.md) 页面上的说明完成迁移。如需回滚到之前 KCL 版本，或在回滚后向前滚到 KCL 3.x，请参阅[回滚至先前 KCL 版本](kcl-migration-rollback.md)和[回滚后前滚到 KCL 3.x](kcl-migration-rollforward.md)。

**重要**  
不要在 KCL 3.x 中使用 2.27.19 到 2.27.23 适用于 Java 的 AWS SDK 版本。这些版本出现的问题会导致使用 KCL 的 DynamoDB 时出现相关异常错误。我们建议您使用 2.28.0 或更高 适用于 Java 的 AWS SDK 版本来避免此问题。

# 先前的 KCL 版本文档
<a name="kcl-archive"></a>

以下主题已归档。要查看当前的 Kinesis Client Library 文档，请参阅[使用 Kinesis Client Library](kcl.md)。

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

**Topics**
+ [KCL 1.x 和 2.x 信息](shared-throughput-kcl-consumers.md)
+ [开发具有共享吞吐量的自定义消费端](shared-throughput-consumers.md)
+ [将消费端从 KCL 1.x 迁移到 KCL 2.x](kcl-migration.md)

# KCL 1.x 和 2.x 信息
<a name="shared-throughput-kcl-consumers"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

开发可以处理来自 KDS 数据流的数据的自定义消费端应用程序的一种方法，是使用 Kinesis Client Library（KCL）。

**Topics**
+ [关于 KCL（先前版本）](#shared-throughput-kcl-consumers-overview)
+ [KCL 先前版本](#shared-throughput-kcl-consumers-versions)
+ [KCL 概念（先前版本）](#shared-throughput-kcl-consumers-concepts)
+ [使用租约表跟踪 KCL 消费端应用程序处理的分片](#shared-throughput-kcl-consumers-leasetable)
+ [使用相同的适用于 Java 的 KCL 2.x 消费端应用程序处理多个数据流](#shared-throughput-kcl-multistream)
+ [将 KCL 与 AWS Glue 架构注册表一起使用](#shared-throughput-kcl-consumers-glue-schema-registry)

**注意**  
建议您根据使用场景将 KCL 1.x 和 KCL 2.x 升级到最新的 KCL 1.x 版本或 KCL 2.x 版本。KCL 1.x 和 KCL 2.x 会定期更新至最新版本，包括最新依赖项和安全补丁、错误修复以及向后兼容的新功能。有关更多信息，请参阅 [https://github.com/awslabs/amazon-kinesis-client/releases。](https://github.com/awslabs/amazon-kinesis-client/releases)

## 关于 KCL（先前版本）
<a name="shared-throughput-kcl-consumers-overview"></a>

KCL 通过处理许多与分布式计算相关的复杂任务，帮助您使用和处理 Kinesis 数据流中的数据。这些任务包括跨多个消费端应用程序实例的负载平衡、对消费端应用程序实例故障的响应、已处理记录的检查点操作以及对重新分片的反应。KCL 负责所有这些子任务，让您可以将精力集中在编写自定义记录处理逻辑上。

KCL 不同于中提供的 Kinesis Data APIs Streams。 AWS SDKsKinesis Data APIs Streams 可帮助您管理 Kinesis Data Streams 的许多方面，包括创建流、重新分片以及放置和获取记录。KCL 围绕这些子任务提供了一个抽象层，让您可以专注于消费端应用程序的自定义数据处理逻辑工作。有关 Kinesis Data Streams API 的信息，请参阅 [Amazon Kinesis API Reference](https://docs.aws.amazon.com/kinesis/latest/APIReference/Welcome.html)。

**重要**  
KCL 属于 Java 库，使用名为 MultiLangDaemon 的多语言接口提供对 Java 以外语言的支持。此进程守护程序基于 Java，当您使用 Java 以外的 KCL 语言时，该程序会在后台运行。例如，如果您安装适用于 Python 的 KCL 并完全使用 Python 编写使用者应用程序，则仍然需要在系统上安装 Java，这是因为。 MultiLangDaemon此外， MultiLangDaemon 还有一些您可能需要根据自己的用例自定义的默认设置，例如它所连接的 AWS 区域。有关 MultiLangDaemon on 的更多信息 GitHub，请参阅 [KCL MultiLangDaemon 项目](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)。

KCL 充当记录处理逻辑和 Kinesis Data Streams 之间的中介。

## KCL 先前版本
<a name="shared-throughput-kcl-consumers-versions"></a>

目前，您可以使用以下任一支持的 KCL 版本来构建自定义消费端应用程序：
+ **KCL 1.x**

  有关更多信息，请参阅 [开发 KCL 1.x 消费端](developing-consumers-with-kcl.md)。
+ **KCL 2.x**

  有关更多信息，请参阅 [开发 KCL 2.x 消费端](developing-consumers-with-kcl-v2.md)。

您可以使用 KCL 1.x 或 KCL 2.x 来构建使用共享吞吐量的消费端应用程序。有关更多信息，请参阅 [使用 KCL 开发具有共享吞吐量的自定义消费端](custom-kcl-consumers.md)。

要构建使用专用吞吐量的消费端应用程序（增强型扇出消费端应用程序），只能使用 KCL 2.x。有关更多信息，请参阅 [开发具有专用吞吐量的增强扇出型消费端](enhanced-consumers.md)。

有关 KCL 1.x 和 KCL 2.x 之间差异的信息，以及如何从 KCL 1.x 迁移到 KCL 2.x 的说明，请参阅 [将消费端从 KCL 1.x 迁移到 KCL 2.x](kcl-migration.md)。

## KCL 概念（先前版本）
<a name="shared-throughput-kcl-consumers-concepts"></a>
+ **KCL 消费端应用程序** – 使用 KCL 自定义构建的应用程序，旨在读取和处理数据流中的记录。
+ **消费端应用程序实例** – KCL 消费端应用程序通常是分布式应用程序，即一个或多个应用程序实例同时运行，以便协调故障和以动态方式实现数据记录处理负载平衡。
+ **工作程序** – KCL 消费端应用程序实例用来开始处理数据的高级类。
**重要**  
每个 KCL 消费端应用程序实例都有一个工作程序。

  工作程序负责初始化和监督各种任务，包括同步分片和租约信息、跟踪分片分配以及处理来自分片的数据。工作程序向 KCL 提供使用者应用程序的配置信息，例如此 KCL 使用者应用程序要处理的数据记录的数据流的名称以及访问此数据流所需的 AWS 凭据。工作程序还会启动特定的 KCL 消费端应用程序实例，将数据记录从数据流传输到记录处理器。
**重要**  
在 KCL 1.x 中，这个类被称为**工作程序**。有关更多信息（这些是 Java KCL 存储库），请参阅 [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java) java。在 KCL 2.x 中，这个类被称为**计划程序**。KCL 2.x 中计划程序的用途与 KCL 1.x 中工作程序的用途相同。[有关 KCL 2.x 中调度器类的更多信息，请参见 https://github.com/awslabs/ amazon-kinesis-client /.java。blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java)
+ **租约** – 定义工作程序和分片之间绑定的数据。分布式 KCL 消费端应用程序使用租约在一组工作程序中对数据记录进行分区。在任何指定时间，每个数据记录分片都通过由 **leaseKey** 变量标识的租约绑定到特定的工作程序。

  默认情况下，工作人员可以同时持有一份或多份租约（以 **maxLeasesForWorker** 变量的值为准）。
**重要**  
每个工作程序都将争相持有数据流中所有可用分片的所有可用租约。但是，无论何时，只有一个工作程序可以成功持有每份租约。

  例如，如果您的消费端应用程序实例 A 和工作程序 A 正在处理包含 4 个分片的数据流，则工作程序 A 可以同时持有分片 1、2、3 和 4 的租约。但是，如果您有 A 和 B 两个消费端应用程序实例，以及工作程序 A 和工作程序 B，并且这些实例正在处理包含 4 个分片的数据流，则工作程序 A 和工作程序 B 不能同时持有分片 1 的租约。一个工作程序持有特定分片的租约，直到该工作程序准备好停止处理该分片的数据记录或该工作程序失败为止。当一个工作程序停止持有租约时，另一个工作程序会接管并持有租约。

  有关更多信息（这些是 Java KCL 存储库），请参阅 KCL 1.x 的 [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java) 和 KCL 2.x 的 [https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java) java。
+ **租约表** – 唯一的 Amazon DynamoDB 表，用于跟踪 KDS 数据流中由 KCL 消费端应用程序的工作程序租赁和处理的分片。在 KCL 消费端应用程序运行时，租约表必须与数据流中的最新分片信息保持同步（在工作程序内部和所有工作程序之间）。有关更多信息，请参阅 [使用租约表跟踪 KCL 消费端应用程序处理的分片](#shared-throughput-kcl-consumers-leasetable)。
+ **记录处理器** – 定义 KCL 消费端应用程序如何处理从数据流中获取的数据的逻辑。在运行时，KCL 消费端应用程序实例会实例化一个工作程序，该工作程序会为持有租约的每个分片实例化一个记录处理器。

## 使用租约表跟踪 KCL 消费端应用程序处理的分片
<a name="shared-throughput-kcl-consumers-leasetable"></a>

**Topics**
+ [什么是租约表](#shared-throughput-kcl-consumers-what-is-leasetable)
+ [吞吐量](#shared-throughput-kcl-leasetable-throughput)
+ [租约表如何与 Kinesis 数据流中的分片同步](#shared-throughput-kcl-consumers-leasetable-sync)

### 什么是租约表
<a name="shared-throughput-kcl-consumers-what-is-leasetable"></a>

对于每个 Amazon Kinesis Data Streams 应用程序，KCL 都使用一份唯一的租约表（存储在 Amazon DynamoDB 表中）来跟踪 KDS 数据流中由 KCL 消费端应用程序的工作程序租赁和处理的分片。

**重要**  
KCL 使用消费端应用程序的名称来创建该消费端应用程序使用的租约表的名称，因此每个消费端应用程序的名称必须是唯一的。

您可在消费端应用程序运行的同时使用 [Amazon DynamoDB 控制台](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html)查看租约表。

如果应用程序启动时 KCL 消费端应用程序的租约表不存在，则其中一个工作程序会为此应用程序创建租约表。

**重要**  
 除开与 Kinesis Data Streams 本身关联的费用，您的账户将被收取与 DynamoDB 表关联的费用。

租约表中的每行表示您消费端应用程序正在处理的分片。如果 KCL 消费端应用程序仅处理一个数据流，则租约表的哈希键 `leaseKey` 就是分片 ID。如果您[使用相同的适用于 Java 的 KCL 2.x 消费端应用程序处理多个数据流](#shared-throughput-kcl-multistream)，则 leaseKey 的结构如下：`account-id:StreamName:streamCreationTimestamp:ShardId`。例如 `111111111:multiStreamTest-1:12345:shardId-000000000336`。

除了分片 ID 以外，每行还包含以下数据：
+ **checkpoint：**分片的最新检查点序号。此值在数据流的所有分片中都是唯一的。
+ **checkpointSubSequence数字：**使用 Kinesis Producer 库的聚合功能时，这是对**检查点**的扩展，用于跟踪 Kinesis 记录中的单个用户记录。
+ **leaseCounter：**用于租赁版本控制，这样工作程序可以检测其租赁已由其他工作程序获取。
+ **leaseKey：**租赁的唯一标识符。每份租约都是数据流中一个分片所特有的，一份由一个工作程序持有。
+ **leaseOwner：**持有此租赁的工作程序。
+ **ownerSwitchesSince检查点：**自上次写检查点以来，这份租约更换了多少次员工。
+ **parentShardId：**用于确保在子分片上开始处理之前，父分片已得到完全处理。这可以确保记录按照放入流中的相同顺序处理。
+ **hashrange：**`PeriodicShardSyncManager` 用来运行定期同步，以查找租约表中缺少的分片，并在需要时为分片创建租约。
**注意**  
从 KCL 1.14 和 KCL 2.3 开始的每个分片的租约表中都有此数据。有关租约和分片之间的 `PeriodicShardSyncManager` 和定期同步的更多信息，请参阅 [租约表如何与 Kinesis 数据流中的分片同步](#shared-throughput-kcl-consumers-leasetable-sync)。
+ **childshards：**`LeaseCleanupManager` 用来查看子分片的处理状态并决定是否可以从租约表中删除父分片。
**注意**  
从 KCL 1.14 和 KCL 2.3 开始的每个分片的租约表中都有此数据。
+ **shardID：**分片的 ID。
**注意**  
仅当您[使用相同的适用于 Java 的 KCL 2.x 消费端应用程序处理多个数据流](#shared-throughput-kcl-multistream)时，此数据才会出现在租约表中。只有适用于 Java 的 KCL 2.x 才支持此功能，且从适用于 Java 的 KCL 2.3 及更高版本开始才支持此功能。
+ **stream name** 数据流的标识符采用以下格式：`account-id:StreamName:streamCreationTimestamp`。
**注意**  
仅当您[使用相同的适用于 Java 的 KCL 2.x 消费端应用程序处理多个数据流](#shared-throughput-kcl-multistream)时，此数据才会出现在租约表中。只有适用于 Java 的 KCL 2.x 才支持此功能，且从适用于 Java 的 KCL 2.3 及更高版本开始才支持此功能。

### 吞吐量
<a name="shared-throughput-kcl-leasetable-throughput"></a>

如果您的 Amazon Kinesis Data Streams 收到了预置的吞吐量异常，您应为 DynamoDB 表增加预置的吞吐量。KCL 将创建预置吞吐量为 10 次读取/秒和 10 次写入/秒的表，但这可能无法满足您应用程序的需求。例如，如果您的 Amazon Kinesis Data Streams 执行频繁的检查点操作或对由很多分片组成的流执行操作，您可能需要更多吞吐量。

有关 DynamoDB 表中预置吞吐量的信息，请参阅《Amazon DynamoDB 开发人员指南》**中的[读/写容量模式](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html)和[使用表和数据](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithDDTables.html)。

### 租约表如何与 Kinesis 数据流中的分片同步
<a name="shared-throughput-kcl-consumers-leasetable-sync"></a>

KCL 消费端应用程序中的工作程序使用租约来处理指定数据流中的分片。哪个工作程序在指定时间租赁哪个分片的相关信息都存储在租约表中。在 KCL 消费端应用程序运行期间，租约表必须与数据流中的最新分片信息保持同步。在消费端应用程序引导启动期间（初始化或重新启动消费端应用程序时），以及每当正在处理的分片结束时（重新分片），KCL 都会将租约表与从 Kinesis Data Streams 服务获取的分片信息同步。换言之，在消费端应用程序初始引导启动期间，以及每当消费端应用程序遇到数据流重新分片事件时，工作程序或 KCL 消费端应用程序都会与其正在处理的数据流同步。

**Topics**
+ [KCL 1.0 - 1.13 和 KCL 2.0 - 2.2 中的同步](#shared-throughput-kcl-consumers-leasetable-sync-old)
+ [KCL 2.x 中的同步从 KCL 2.3 及更高版本开始](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl2)
+ [KCL 1.x 中的同步从 KCL 1.14 及更高版本开始](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl1)

#### KCL 1.0 - 1.13 和 KCL 2.0 - 2.2 中的同步
<a name="shared-throughput-kcl-consumers-leasetable-sync-old"></a>

在 KCL 1.0-1.13 和 KCL 2.0-2.2 中，在使用者应用程序的引导期间以及每个数据流重新分片事件期间，KCL 通过调用或发现将租用表与从 Kinesis Data Streams 服务获取的分片信息同步。`ListShards` `DescribeStream` APIs在上面列出的所有 KCL 版本中，KCL 使用者应用程序的每个 worker 都要完成以下步骤，以便在使用者应用程序的引导期间和每个流 reshard 事件中执行 lease/shard 同步过程：
+ 获取正在处理的流中数据的所有分片
+ 从租约表中获取所有分片租约
+ 筛选出租约表中没有租约的所有开放分片
+ 迭代所有找到的开放分片以及没有开放父分片的所有开放分片：
  + 遍历层次结构树的原级路径，确定该分片是否为后代分片。如果正在处理原级分片（租约表中存在原级分片的租约条目）或者应该处理原级分片（例如初始位置为 `TRIM_HORIZON` 或 `AT_TIMESTAMP`），则该分片被视为后代分片
  + 如果上下文中的开放分片是后代分片，KCL 会根据初始位置对分片执行检查点操作，并在需要时为其父分片创建租约

#### KCL 2.x 中的同步从 KCL 2.3 及更高版本开始
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl2"></a>

从支持的最新版本 KCL 2.x（KCL 2.3）及更高版本开始，该库现在支持对同步过程进行以下更改。这些 lease/shard 同步更改显著减少了 KCL 使用者应用程序对 Kinesis Data Streams 服务进行的 API 调用次数，并优化了 KCL 使用者应用程序中的租赁管理。
+ 在应用程序的引导启动过程中，如果租约表为空，KCL 将利用 `ListShard` API 的筛选选项（`ShardFilter` 可选请求参数），仅针对在 `ShardFilter` 参数指定时间开放的分片的快照检索和创建租约。`ShardFilter` 参数可以让您筛选出 `ListShards` API 的响应。`ShardFilter` 参数唯一需要的属性是 `Type`。KCL 使用 `Type` 筛选属性及其以下有效值来识别并返回可能需要新租约的开放分片的快照：
  + `AT_TRIM_HORIZON` – 响应包含在 `TRIM_HORIZON` 时开放的所有分片。
  + `AT_LATEST` – 响应仅包含数据流中当前开放的分片。
  + `AT_TIMESTAMP` – 响应包含起始时间戳小于或等于指定时间戳且结束时间戳大于或等于指定时间戳的所有分片，或仍处于开放状态的所有分片。

  `ShardFilter` 用于为空租约表创建租约，以初始化在 `RetrievalConfig#initialPositionInStreamExtended` 中指定的分片快照的租约。

  有关 `ShardFilter`的更多信息，请参阅[https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html)。
+ 不是所有工作人员都执行 lease/shard 同步以使租赁表与数据流中的最新分片保持同步，而是由一个当选的工作领导者执行租赁/分片同步。
+ KCL 2.3 使用`GetRecords`和的`ChildShards`返回参数对已关闭的分片执行 lease/shard 同步，从而允许 KCL 工作程序仅为其完成处理的分片的子分片创建租约。`SubscribeToShard` APIs `SHARD_END`为了在整个消费者应用程序中共享， lease/shard 同步的这种优化使用了 `GetRecords` API 的`ChildShards`参数。对于专用吞吐量（增强扇出）消费者应用程序， lease/shard 同步的这种优化使用 API 的`ChildShards``SubscribeToShard`参数。有关更多信息，请参阅 [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)、[SubscribeToShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) 和 [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html)。
+ 经过上述更改，KCL 的行为正在从所有工作程序学习所有现有分片的模式，转变为工作程序只学习每个工作程序拥有的分片的子分片的模式。因此，除了在使用者应用程序引导和重新分片事件期间发生的同步外，KCL 现在还会执行额外的定期 shard/lease 扫描，以识别租赁表中的任何潜在漏洞（换句话说，了解所有新分片），以确保处理数据流的完整哈希范围，并在需要时为它们创建租约。 `PeriodicShardSyncManager`是负责运行定期 lease/shard 扫描的组件。

  有关 KCL 2.3 `PeriodicShardSyncManager` 中的更多信息，请参阅 [https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java \$1L201](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213)-L213。

  在 KCL 2.3 中，可以使用新的配置选项来配置 `LeaseManagementConfig` 中的 `PeriodicShardSyncManager`：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/streams/latest/dev/shared-throughput-kcl-consumers.html)

  现在还会发布新的 CloudWatch 指标来监控的`PeriodicShardSyncManager`运行状况。有关更多信息，请参阅 [PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task)。
+ 包括对 `HierarchicalShardSyncer` 的优化，可仅为一层分片创建租约。

#### KCL 1.x 中的同步从 KCL 1.14 及更高版本开始
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl1"></a>

从支持的最新版本 KCL 1.x（KCL 1.14）及更高版本开始，该库现在支持对同步过程进行以下更改。这些 lease/shard 同步更改显著减少了 KCL 使用者应用程序对 Kinesis Data Streams 服务进行的 API 调用次数，并优化了 KCL 使用者应用程序中的租赁管理。
+ 在应用程序的引导启动过程中，如果租约表为空，KCL 将利用 `ListShard` API 的筛选选项（`ShardFilter` 可选请求参数），仅针对在 `ShardFilter` 参数指定时间开放的分片的快照检索和创建租约。`ShardFilter` 参数可以让您筛选出 `ListShards` API 的响应。`ShardFilter` 参数唯一需要的属性是 `Type`。KCL 使用 `Type` 筛选属性及其以下有效值来识别并返回可能需要新租约的开放分片的快照：
  + `AT_TRIM_HORIZON` – 响应包含在 `TRIM_HORIZON` 时开放的所有分片。
  + `AT_LATEST` – 响应仅包含数据流中当前开放的分片。
  + `AT_TIMESTAMP` – 响应包含起始时间戳小于或等于指定时间戳且结束时间戳大于或等于指定时间戳的所有分片，或仍处于开放状态的所有分片。

  `ShardFilter` 用于为空租约表创建租约，以初始化在 `KinesisClientLibConfiguration#initialPositionInStreamExtended` 中指定的分片快照的租约。

  有关 `ShardFilter`的更多信息，请参阅[https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html)。
+ 不是所有工作人员都执行 lease/shard 同步以使租赁表与数据流中的最新分片保持同步，而是由一个当选的工作领导者执行租赁/分片同步。
+ KCL 1.14 使用`GetRecords`和的`ChildShards`返回参数对已关闭的`SubscribeToShard` APIs分片执行 lease/shard 同步，从而允许 KCL 工作程序仅为其完成处理的分片的子分片创建租约。`SHARD_END`有关更多信息，请参阅[GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)和[ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html)。
+ 经过上述更改，KCL 的行为正在从所有工作程序学习所有现有分片的模式，转变为工作程序只学习每个工作程序拥有的分片的子分片的模式。因此，除了在使用者应用程序引导和重新分片事件期间发生的同步外，KCL 现在还会执行额外的定期 shard/lease 扫描，以识别租赁表中的任何潜在漏洞（换句话说，了解所有新分片），以确保处理数据流的完整哈希范围，并在需要时为它们创建租约。 `PeriodicShardSyncManager`是负责运行定期 lease/shard 扫描的组件。

  如果 `KinesisClientLibConfiguration#shardSyncStrategyType` 设置为 `ShardSyncStrategyType.SHARD_END`，则 `PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold` 用于确定租约表中包含漏洞的连续扫描次数的阈值，之后将强制执行分片同步。当 `KinesisClientLibConfiguration#shardSyncStrategyType` 设置为时 `ShardSyncStrategyType.PERIODIC`，`leasesRecoveryAuditorInconsistencyConfidenceThreshold` 将被忽略。

  有关 KCL 1.14 `PeriodicShardSyncManager` 中的更多信息，请参阅 [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999) \$1L987-L999。

  在 KCL 1.14 中，可以使用新的配置选项来配置 `LeaseManagementConfig` 中的 `PeriodicShardSyncManager`：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/streams/latest/dev/shared-throughput-kcl-consumers.html)

  现在还会发布新的 CloudWatch 指标来监控的`PeriodicShardSyncManager`运行状况。有关更多信息，请参阅 [PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task)。
+ KCL 1.14 现在还支持延期租约清理。当分片超过数据流的保留期或因重新分片操作而关闭时，`LeaseCleanupManager` 会在到达 `SHARD_END` 时异步删除租约。

  可以使用新的配置选项来配置 `LeaseCleanupManager`。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/streams/latest/dev/shared-throughput-kcl-consumers.html)
+ 包括对 `KinesisShardSyncer` 的优化，可仅为一层分片创建租约。

## 使用相同的适用于 Java 的 KCL 2.x 消费端应用程序处理多个数据流
<a name="shared-throughput-kcl-multistream"></a>

本节介绍了适用于 Java 的 KCL 2.x 中的以下更改，这些更改让您能够创建可同时处理多个数据流的 KCL 消费端应用程序。

**重要**  
只有适用于 Java 的 KCL 2.x 才支持多流处理功能，且从适用于 Java 的 KCL 2.3 及更高版本开始才支持此功能。  
其他可以实现 KCL 2.x 的语言都不支持多流处理功能。  
任何版本的 KCL 1.x 都不支持多流处理功能。
+ **MultistreamTracker 接口**

  要构建可以同时处理多个流的使用者应用程序，必须实现一个名为的新接口[MultistreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java)。此接口包括返回要由 KCL 消费端应用程序处理的数据流及其配置列表的 `streamConfigList` 方法。请注意，正在处理的数据流可以在消费端应用程序运行时进行更改。KCL 会定期调用 `streamConfigList` 来了解要处理的数据流的变化。

  该`streamConfigList`方法填充[StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23)列表。

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```

  请注意，`StreamIdentifier` 和 `InitialPositionInStreamExtended` 是必填字段，`consumerArn` 是选填字段。只有在使用 KCL 2.x 实现增强型扇出消费端应用程序时，才必须提供 `consumerArn`。

  有关的更多信息`StreamIdentifier`，请参阅 [https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java \$1L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129). 要创建 `StreamIdentifier`，我们建议您从 v2.5.0 及更高版本提供的 `streamArn` 和 `streamCreationEpoch` 创建一个多流实例。在不支持 `streamArm` 的 KCL v2.3 和 v2.4 中，请使用格式 `account-id:StreamName:streamCreationTimestamp` 创建一个多流实例。从下一个主要版本开始，此格式将弃用且不再受支持。

  `MultistreamTracker` 还包括可删除租约表中旧流租约的策略（`formerStreamsLeasesDeletionStrategy`）。请注意，在消费端应用程序运行时无法更改策略。欲了解更多信息，请参阅 [https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0 .java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java)
+ [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java)是一个应用程序范围的类，可用于指定构建 KCL 使用者应用程序时要使用的所有 KCL 2.x 配置设置。 `ConfigsBuilder`类现在支持该`MultistreamTracker`接口。你可以用一个数据流的名称进行初始化 ConfigsBuilder以使用来自以下内容的记录：

  ```
   /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```

  或者，`MultiStreamTracker`如果要实现同时处理多个流的 KCL 使用者应用程序，也可以使用进行初始化 ConfigsBuilder 。

  ```
  * Constructor to initialize ConfigsBuilder with MultiStreamTracker
       * @param multiStreamTracker
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.left(multiStreamTracker);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```
+ 通过为您的 KCL 消费端应用程序实现多流支持功能，应用程序租约表的每一行现在都包含该应用程序处理的多个数据流的分片 ID 和流名称。
+ 在为 KCL 消费端应用程序实现多流支持功能后，leaseKey 采用以下结构：`account-id:StreamName:streamCreationTimestamp:ShardId`。例如 `111111111:multiStreamTest-1:12345:shardId-000000000336`。
**重要**  
当现有 KCL 消费端应用程序配置为仅处理一个数据流时，leaseKey（租约表的哈希键）就是分片 ID。如果您将此现有 KCL 消费端应用程序重新配置为处理多个数据流，则会破坏租约表，因为支持多流处理功能后，leaseKey 必须采用如下结构：`account-id:StreamName:StreamCreationTimestamp:ShardId`。

## 将 KCL 与 AWS Glue 架构注册表一起使用
<a name="shared-throughput-kcl-consumers-glue-schema-registry"></a>

您可以将 Kinesis 数据流与 AWS Glue 架构注册表集成。 AWS Glue 架构注册表能帮助您集中发现、控制和演变架构，同时确保注册架构持续验证生成的数据。架构定义了数据记录的结构和格式。架构是用于可靠数据发布、使用或存储的版本化规范。 AWS Glue Schema Registr end-to-end y 允许您改善流媒体应用程序中的数据质量和数据治理。有关更多信息，请参阅 [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)。设置此集成的方法之一是使用适用于 Java 的 KCL。

**重要**  
目前，只有使用用 Java 实现的 KCL 2.3 使用者的 Kinesis 数据流支持 Kinesis Streams AWS Glue 和 Schema Registry 集成。不提供多语言支持。不支持 KCL 1.0 消费端。不支持 KCL 2.3 之前的 KCL 2.x 消费端。

有关如何使用 KCL 设置 Kinesis Data Streams 与架构注册表集成的详细说明，请参阅 “用[例：将 Amazon Kinesis Data Streams 与 Glu KPL/KCL e 架构注册表集成” 中的 “使用库与 AWS](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)数据交互” 部分。

# 开发具有共享吞吐量的自定义消费端
<a name="shared-throughput-consumers"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

如果您在从 Kinesis Data Streams 接收数据时不需要专用吞吐量，并且在 200 毫秒下不需要读取传播延迟，则可以构建消费端应用程序，如以下主题所述。您可以使用 Kinesis Client Library（KCL）或 适用于 Java 的 AWS SDK。

**Topics**
+ [使用 KCL 开发具有共享吞吐量的自定义消费端](custom-kcl-consumers.md)

有关使用专用吞吐量构建可从 Kinesis Data Streams 接收记录的消费端的信息，请参阅 [开发具有专用吞吐量的增强扇出型消费端](enhanced-consumers.md)。

# 使用 KCL 开发具有共享吞吐量的自定义消费端
<a name="custom-kcl-consumers"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

开发具有共享吞吐量的定制消费端应用程序的方法之一是使用 Kinesis Client Library（KCL）。

根据您正在使用的 KCL 版本，从以下主题进行选择。

**Topics**
+ [开发 KCL 1.x 消费端](developing-consumers-with-kcl.md)
+ [开发 KCL 2.x 消费端](developing-consumers-with-kcl-v2.md)

# 开发 KCL 1.x 消费端
<a name="developing-consumers-with-kcl"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 Kinesis Client Library（KCL）为 Amazon Kinesis Data Streams 开发消费端应用程序。

有关 KCL 的更多信息，请参阅[关于 KCL（先前版本）](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview)。

根据要使用的选项，从以下主题进行选择。

**Topics**
+ [在 Java 中开发 Kinesis Client Library 消费端](kinesis-record-processor-implementation-app-java.md)
+ [在 Node.js 中开发 Kinesis Client Library 消费端](kinesis-record-processor-implementation-app-nodejs.md)
+ [在 .NET 中开发 Kinesis Client Library 消费端](kinesis-record-processor-implementation-app-dotnet.md)
+ [在 Python 中开发 Kinesis Client Library 消费端](kinesis-record-processor-implementation-app-py.md)
+ [在 Ruby 中开发 Kinesis Client Library 消费端](kinesis-record-processor-implementation-app-ruby.md)

# 在 Java 中开发 Kinesis Client Library 消费端
<a name="kinesis-record-processor-implementation-app-java"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

可以使用 Kinesis Client Library（KCL）构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题将讨论 Java。要查看 Javadoc 参考资料，请参阅类的 [AWS Javadoc 主题](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html)。 AmazonKinesisClient

要从中下载 Java KCL GitHub，请前往 K [inesis 客户端库 (](https://github.com/awslabs/amazon-kinesis-client)Java)。要查找 Apache Maven 上的 Java KCL，请转至 [KCL 搜索结果](https://search.maven.org/#search|ga|1|amazon-kinesis-client)页。要从中下载 Java KCL 使用者应用程序的示例代码 GitHub，请转到上的 [KCL for Java 示例项目](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis)页面。 GitHub

该示例应用程序使用 [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html)。可以使用 `configure` 文件中定义的静态 `AmazonKinesisApplicationSample.java` 方法更改日志记录配置。*有关如何在 Log4j 和 AWS Java 应用程序中使用 Apache 共享日志记录的更多信息，请参阅《开发人员指南》中的使用 [Log4j 进行日志记录](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html)。适用于 Java 的 AWS SDK *

在 Java 中实现 KCL 消费端应用程序时，您必须完成下列任务：

**Topics**
+ [实现 IRecord处理器方法](#kinesis-record-processor-implementation-interface-java)
+ [为 IRecord处理器接口实现类工厂](#kinesis-record-processor-implementation-factory-java)
+ [创建工作线程](#kcl-java-worker)
+ [修改配置属性](#kinesis-record-processor-initialization-java)
+ [迁移到版本 2 的记录处理器接口](#kcl-java-v2-migration)

## 实现 IRecord处理器方法
<a name="kinesis-record-processor-implementation-interface-java"></a>

KCL 当前支持 `IRecordProcessor` 接口的两个版本：原始接口可与第一个版本的 KCL 一起可用；而版本 2 从 KCL 1.5.0 版才开始可用。这两个接口都完全受支持。您的选择取决于您的特定方案要求。要查看所有区别，请参阅您在本地构建的 Javadocs 或源代码。以下各节概述了开始使用的最低实施要求。

**Topics**
+ [原始接口（版本 1）](#kcl-java-interface-original)
+ [更新后的接口（版本 2）](#kcl-java-interface-v2)

### 原始接口（版本 1）
<a name="kcl-java-interface-original"></a>

原始 `IRecordProcessor` 接口 (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) 公开了下列记录处理器方法，您的消费端必须实施这些方法。该示例提供了可用作起点的实现（请参阅 `AmazonKinesisApplicationSampleRecordProcessor.java`）。

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**初始化**  
KCL 在实例化记录处理器时调用 `initialize` 方法，并将特定分片 ID 作为参数传递。此记录处理器仅处理此分片，并且通常情况下反过来说也成立（此分片仅由此记录处理器处理）。但是，您的消费端应该考虑数据记录可能会经过多次处理的情况。Kinesis Data Streams 具有*至少一次*语义，即分片中的每个数据记录至少会由消费端中的工作程序处理一次。有关特定分片可能由多个工作程序进行处理的情况的更多信息，请参阅[使用重新分片、扩展和并行处理更改分片数量](kinesis-record-processor-scaling.md)。

```
public void initialize(String shardId)
```

**processRecords**  
KCL 调用 `processRecords` 方法，并传递来自由 `initialize(shardId)` 方法指定的分片的数据记录的列表。记录处理器根据消费端的语义处理这些记录中的数据。例如，工作程序可能对数据执行转换，然后将结果存储在 Amazon Simple Storage Service（Amazon S3）存储桶中。

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

除了数据本身之外，记录还包含一个序号和一个分区键。工作程序可在处理数据时使用这些值。例如，工作线程可选择 S3 存储桶，并在其中根据分区键的值存储数据。`Record` 类公开了以下方法，这些方法提供对记录的数据、序列号和分区键的访问。

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

在该示例中，私有方法 `processRecordsWithRetries` 具有显示工作程序如何访问记录的数据、序号和分区键的代码。

Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。KCL 通过将检查指针（`IRecordProcessorCheckpointer`）传递到 `processRecords` 来为您执行此跟踪。记录处理器将对此接口调用 `checkpoint` 方法，以向 KCL 告知记录处理器处理分片中的记录的进度。如果工作程序失败，KCL 将使用此信息在已知的上一个已处理记录处重新启动对分片的处理。

对于拆分或合并操作，在原始分片的处理器调用 `checkpoint` 以指示原始分片上的所有处理操作都已完成之前，KCL 不会开始处理新分片。

如果您未传递参数，KCL 将假定对 `checkpoint` 的调用表示所有记录都已处理，一直处理到传递到记录处理器的最后一个记录。因此，记录处理器只应在已处理传递到它的列表中的所有记录后才调用 `checkpoint`。记录处理器不需要在每次调用 `checkpoint` 时调用 `processRecords`。例如，处理器可以在每第三次调用 `checkpoint` 时调用 `processRecords`。您可以选择性地将某个记录的确切序号指定为 `checkpoint` 的参数。在本例中，KCL 将假定所有记录都已处理，直至处理到该记录。

在该示例中，私有方法 `checkpoint` 展示了如何使用适当的异常处理和重试逻辑调用 `IRecordProcessorCheckpointer.checkpoint`。

KCL 依靠 `processRecords` 来处理由处理数据记录引起的任何异常。如果 `processRecords` 引发了异常，则 KCL 将跳过在异常发生前已传递的数据记录。也就是说，这些记录不会重新发送到引发异常的记录处理器或消费端中的任何其他记录处理器。

**shutdown**  
KCL 在处理结束（关闭原因为 `TERMINATE`）或工作程序不再响应（关闭原因为 `ZOMBIE`）时调用 `shutdown` 方法。

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

处理操作在记录处理器不再从分片中接收任何记录时结束，因为分片已被拆分或合并，或者流已删除。

KCL 还会将 `IRecordProcessorCheckpointer` 接口传递到 `shutdown`。如果关闭原因为 `TERMINATE`，则记录处理器应完成处理任何数据记录，然后对此接口调用 `checkpoint` 方法。

### 更新后的接口（版本 2）
<a name="kcl-java-interface-v2"></a>

更新后的 `IRecordProcessor` 接口 (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) 公开了下列记录处理器方法，您的消费端必须实施这些方法：

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

原始版本的接口中的所有参数可通过容器对象上的 get 方法进行访问。例如，要检索 `processRecords()` 中的记录的列表，可使用 `processRecordsInput.getRecords()`。

自此接口的版本 2（KCL 1.5.0 及更高版本）起，除了原始接口提供的输入之外，以下新输入也可用：

起始序列号  
在传递给 `InitializationInput` 运算的 `initialize()` 对象中，将提供给记录处理器实例的记录的起始序列号。这是由之前处理同一分片的记录处理器实例进行最近一次检查点操作的序列号。此序列号在您的应用程序需要此信息时提供。

待进行检查点操作的序列号  
在传递给 `initialize()` 运算的 `InitializationInput` 对象中，在上一个记录处理器实例停止前可能无法提交的待进行检查点操作的序列号（如果有）。

## 为 IRecord处理器接口实现类工厂
<a name="kinesis-record-processor-implementation-factory-java"></a>

您还需要为实现记录处理器方法的类实现一个工厂。当消费端实例化工作程序时，它将传递对此工厂的引用。

以下示例使用原始记录处理器接口在文件 `AmazonKinesisApplicationSampleRecordProcessorFactory.java` 中实现工厂类。如果您希望此工厂类创建版本 2 记录处理器，请使用程序包名称 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`。

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## 创建工作线程
<a name="kcl-java-worker"></a>

如 [实现 IRecord处理器方法](#kinesis-record-processor-implementation-interface-java) 中所述，有两个版本的 KCL 记录处理器接口可供选择，这将影响您创建工作程序的方式。原始记录处理器接口使用以下代码结构创建工作线程：

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

当使用版本 2 的记录处理器接口时，您可使用 `Worker.Builder` 创建工作线程而无需担心要使用的构造函数以及参数的顺序。更新后的记录处理器接口使用以下代码结构创建工作线程：

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## 修改配置属性
<a name="kinesis-record-processor-initialization-java"></a>

该示例提供了配置属性的默认值。工作程序的此配置数据随后将整合到 `KinesisClientLibConfiguration` 对象中。此对象和对 `IRecordProcessor` 的类工厂的引用将传入用于实例化工作程序的调用。您可借助 Java 属性文件（请参阅 `AmazonKinesisApplicationSample.java`）用您自己的值覆盖任何这些属性。

### 应用程序名称
<a name="configuration-property-application-name"></a>

KCL 需要一个应用程序名称，该名称在您的应用程序中以及同一区域的 Amazon DynamoDB 表中处于唯一状态。KCL 通过以下方法使用应用程序名称配置值：
+ 假定所有与此应用程序名称关联的工作程序在同一数据流上合作。这些工作程序可被分配到多个实例上。如果运行同一应用程序代码的其他实例，但使用不同的应用程序名称，则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。
+ KCL 利用应用程序名称创建 DynamoDB 表并使用该表保留应用程序的状态信息（如检查点和工作程序-分片映射）。每个应用程序都有自己的 DynamoDB 表。有关更多信息，请参阅 [使用租约表跟踪 KCL 消费端应用程序处理的分片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 设置凭证
<a name="kinesis-record-processor-cred-java"></a>

您必须将您的 AWS 证书提供给默认凭证提供者链中的一个凭证提供商。例如，如果您在 EC2 实例上运行消费端，则我们建议您使用 IAM 角色启动实例。反映与此 IAM 角色关联的权限的 AWS 凭证可通过实例元数据提供给实例上的应用程序。使用这种方式管理在 EC2 实例上运行的消费端的凭证最安全。

示例应用程序首先尝试从实例元数据中检索 IAM 凭证：

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

如果示例应用程序无法从实例元数据中获取凭证，它会尝试从属性文件中检索凭证：

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

有关实例元数据的更多信息，请参阅《Amazon EC2 用户指南》**中的[实例元数据](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html)。

### 将工作线程 ID 用于多个实例
<a name="kinesis-record-processor-workerid-java"></a>

示例初始化代码通过使用本地计算机的名称并附加一个全局唯一的标识符为工作程序创建 ID (`workerId`)，如以下代码段所示。此方法支持消费端应用程序的多个实例在单台计算机上运行的方案。

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## 迁移到版本 2 的记录处理器接口
<a name="kcl-java-v2-migration"></a>

如果要迁移使用原始接口的代码，则除了上述步骤之外，还需要执行以下步骤：

1. 更改您的记录处理器类以导入版本 2 记录处理器接口：

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. 更改对输入的引用以在容器对象上使用 `get` 方法。例如，在 `shutdown()` 运算中，将“`checkpointer`”更改为“`shutdownInput.getCheckpointer()`”。

1. 更改您的记录处理器工厂类以导入版本 2 记录处理器工厂接口：

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. 更改工作线程的结构以使用 `Worker.Builder`。例如：

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# 在 Node.js 中开发 Kinesis Client Library 消费端
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

可以使用 Kinesis Client Library（KCL）构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题将讨论 Node.js。

KCL 是一个 Java 库；对 Java 以外其他语言的支持是使用名为的多语言接口提供的。*MultiLangDaemon*此进程守护程序基于 Java，当您使用 Java 以外的 KCL 语言时，该程序会在后台运行。因此，如果你安装适用于 Node.js 的 KCL 并完全用 Node.js 编写消费者应用程序，那么你仍然需要在系统上安装 Java，因为. MultiLangDaemon 此外 MultiLangDaemon ，您可能需要根据自己的用例自定义一些默认设置，例如它所连接的 AWS 区域。有关 MultiLangDaemon on 的更多信息 GitHub，请访问 [KCL MultiLangDaemon 项目](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)页面。

要从中下载 Node.js KCL GitHub，请前往 K [inesis 客户端库 (Node.js)](https://github.com/awslabs/amazon-kinesis-client-nodejs)。

**示例代码下载**

Node.js 中有两个代码示例可用于 KCL：
+ [basic-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  在下列节中用于阐释在 Node.js 中构建 KCL 消费端应用程序的基础知识。
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   稍微复杂一些，使用了现实世界的情景，适合在您熟悉基本示例代码之后采用。此示例在这里不做讨论，但它有一个包含更多信息的自述文件。

在 Node.js 中实现 KCL 消费端应用程序时，您必须完成下列任务：

**Topics**
+ [实现记录处理器](#kinesis-record-processor-implementation-interface-nodejs)
+ [修改配置属性](#kinesis-record-processor-initialization-nodejs)

## 实现记录处理器
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

使用适用于 Node.js 的 KCL 的最简易的潜在消费端必须实现 `recordProcessor` 函数，该函数反之包含函数 `initialize`、`processRecords` 和 `shutdown`。该示例提供了可用作起点的实现（请参阅 `sample_kcl_app.js`）。

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**初始化**  
KCL 在记录处理器启动时调用 `initialize` 函数。此记录处理器只处理作为 `initializeInput.shardId` 传递的分片 ID，并且通常情况下反过来说也成立（此分片只能由此记录处理器处理）。但是，您的消费端应该考虑数据记录可能会经过多次处理的情况。这是因为 Kinesis Data Streams 具有*至少一次*语义，即分片中的每个数据记录在您的消费端中由工作程序至少处理一次。有关特定分片可能由多个工作线程处理的情况的更多信息，请参阅[使用重新分片、扩展和并行处理更改分片数量](kinesis-record-processor-scaling.md)。

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 KCL 使用包含一个数据记录的列表（这些记录来自在 `initialize` 函数中指定的分片）的输入来调用此函数。您实现的记录处理器根据您的消费端的语义处理这些记录中的数据。例如，工作程序可能对数据执行转换，然后将结果存储在 Amazon Simple Storage Service（Amazon S3）存储桶中。

```
processRecords: function(processRecordsInput, completeCallback)
```

除了数据本身之外，记录还包含工作程序在处理数据时可使用的序号和分区键。例如，工作线程可选择 S3 存储桶，并在其中根据分区键的值存储数据。`record` 词典公开了以下键-值对来访问记录的数据、序号和分区键：

```
record.data
record.sequenceNumber
record.partitionKey
```

请注意，数据是 Base64 编码的。

在该基本示例中，函数 `processRecords` 具有显示工作程序如何访问记录的数据、序号和分区键的代码。

Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。KCL 利用作为 `processRecordsInput.checkpointer` 传递的 `checkpointer` 对象执行此跟踪。您的记录处理器将调用 `checkpointer.checkpoint` 函数以向 KCL 告知记录处理器处理分片中的记录的进度。如果工作程序失败，KCL 将在您重新启动分片的处理时使用此信息，以便在上一个已知的已处理记录处继续处理。

对于拆分或合并操作，在原始分片的处理器调用 `checkpoint` 以指示原始分片上的所有处理操作都已完成之前，KCL 不会开始处理新分片。

如果您未将序列号传递到 `checkpoint` 函数，KCL 将假定对 `checkpoint` 的调用表示所有记录都已处理，一直处理到传递到记录处理器的最后一个记录。因此，记录处理器**只**应在已处理传递到它的列表中的所有记录后才调用 `checkpoint`。记录处理器不需要在每次调用 `checkpoint` 时调用 `processRecords`。例如，处理器可以调`checkpoint`用每三次呼叫或记录处理器外部的某个事件（例如您实现的自定义 verification/validation 服务）。

您可以选择性地将某个记录的确切序号指定为 `checkpoint` 的参数。在本例中，KCL 将假定所有记录都已处理，直至处理到该记录。

基本示例应用程序显示了对 `checkpointer.checkpoint` 函数最简单的调用。您此时可以在该函数中为您的消费端添加您需要的其他检查点逻辑。

**shutdown**  
KCL 在处理结束（`shutdownInput.reason` 为 `TERMINATE`）或工作程序不再响应（`shutdownInput.reason` 为 `ZOMBIE`）时调用 `shutdown` 函数。

```
shutdown: function(shutdownInput, completeCallback)
```

处理操作在记录处理器不再从分片中接收任何记录时结束，因为分片已被拆分或合并，或者流已删除。

KCL 还会将 `shutdownInput.checkpointer` 对象传递到 `shutdown`。如果关闭原因为 `TERMINATE`，则应确保记录处理器已完成处理任何数据记录，然后对此接口调用 `checkpoint` 函数。

## 修改配置属性
<a name="kinesis-record-processor-initialization-nodejs"></a>

该示例提供了配置属性的默认值。您可使用自己的值覆盖任何这些属性（请参阅基本示例中的 `sample.properties`）。

### 应用程序名称
<a name="kinesis-record-processor-application-name-nodejs"></a>

KCL 需要一个应用程序，该应用程序在您的各个应用程序中以及同一区域的各个 Amazon DynamoDB 表中处于唯一状态。KCL 通过以下方法使用应用程序名称配置值：
+ 假定所有与此应用程序名称关联的工作程序在同一数据流上合作。这些工作程序可被分配到多个实例上。如果运行同一应用程序代码的其他实例，但使用不同的应用程序名称，则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。
+ KCL 利用应用程序名称创建 DynamoDB 表并使用该表保留应用程序的状态信息（如检查点和工作程序-分片映射）。每个应用程序都有自己的 DynamoDB 表。有关更多信息，请参阅 [使用租约表跟踪 KCL 消费端应用程序处理的分片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 设置凭证
<a name="kinesis-record-processor-credentials-nodejs"></a>

您必须将您的 AWS 证书提供给默认凭证提供者链中的一个凭证提供商。可以使用 `AWSCredentialsProvider` 属性设置凭证提供程序。`sample.properties` 文件必须向[默认凭证提供程序链](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)中的凭证提供程序之一提供您的凭证。如果您在 Amazon EC2 实例上运行使用器，我们建议您使用 IAM 角色配置该实例。 AWS 反映与此 IAM 角色关联的权限的证书可通过实例元数据提供给实例上的应用程序。使用这种方式管理在 EC2 实例上运行的消费端应用程序的凭证最安全。

以下示例配置 KCL 以使用 `sample_kcl_app.js` 中提供的记录处理器，处理名为 `kclnodejssample` 的 Kinesis 数据流。

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# 在 .NET 中开发 Kinesis Client Library 消费端
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

可以使用 Kinesis Client Library（KCL）构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题将讨论 .NET。

KCL 是一个 Java 库；对 Java 以外其他语言的支持是使用名为的多语言接口提供的。*MultiLangDaemon*此进程守护程序基于 Java，当您使用 Java 以外的 KCL 语言时，该程序会在后台运行。因此，如果您安装适用于.NET 的 KCL 并完全使用.NET 编写使用者应用程序，则仍然需要在系统上安装 Java，因为。 MultiLangDaemon此外 MultiLangDaemon ，您可能需要根据自己的用例自定义一些默认设置，例如它所连接的 AWS 区域。有关 MultiLangDaemon on 的更多信息 GitHub，请访问 [KCL MultiLangDaemon 项目](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)页面。

要从中下载.NET KCL GitHub，请访问 K [inesis 客户端库 (](https://github.com/awslabs/amazon-kinesis-client-net).NET)。要下载.NET KCL 使用者应用程序的示例代码，请转到上的 [KCL for .NET 使用者项目示例](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer)页面。 GitHub

在 .NET 中实现 KCL 消费端应用程序时，您必须完成下列任务：

**Topics**
+ [实现 IRecord处理器类方法](#kinesis-record-processor-implementation-interface-dotnet)
+ [修改配置属性](#kinesis-record-processor-initialization-dotnet)

## 实现 IRecord处理器类方法
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

消费端必须实现适用于 `IRecordProcessor` 的以下方法。示例消费端提供了可用作起点的实现（请参阅 `SampleRecordProcessor` 中的 `SampleConsumer/AmazonKinesisSampleConsumer.cs` 类）。

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**初始化**  
KCL 在实例化记录处理程序时调用此方法，并将特定分片 ID 传入 `input` 参数（`input.ShardId`）。此记录处理器只处理此分片，并且通常情况下反过来说也成立（此分片只能由此记录处理器处理）。但是，您的消费端应该考虑数据记录可能会经过多次处理的情况。这是因为 Kinesis Data Streams 具有*至少一次*语义，即分片中的每个数据记录在您的消费端中由工作程序至少处理一次。有关特定分片可能由多个工作线程处理的情况的更多信息，请参阅[使用重新分片、扩展和并行处理更改分片数量](kinesis-record-processor-scaling.md)。

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
KCL 调用此方法，并将由 `Initialize` 方法指定的分片中的数据记录的列表传入 `input` 参数（`input.Records`）。您实现的记录处理器根据您的消费端的语义处理这些记录中的数据。例如，工作程序可能对数据执行转换，然后将结果存储在 Amazon Simple Storage Service（Amazon S3）存储桶中。

```
public void ProcessRecords(ProcessRecordsInput input)
```

除了数据本身之外，记录还包含一个序号和一个分区键。工作程序可在处理数据时使用这些值。例如，工作线程可选择 S3 存储桶，并在其中根据分区键的值存储数据。`Record` 类公开了以下代理来访问记录的数据、序号和分区键：

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

在该示例中，方法 `ProcessRecordsWithRetries` 具有显示工作程序如何访问记录的数据、序号和分区键的代码。

Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。KCL 通过将 `Checkpointer` 对象传递到 `ProcessRecords`（`input.Checkpointer`）来为您执行此跟踪。记录处理器将调用 `Checkpointer.Checkpoint` 方法以向 KCL 告知记录处理器处理分片中的记录的进度。如果工作程序失败，KCL 将使用此信息在已知的上一个已处理记录处重新启动对分片的处理。

对于拆分或合并操作，在原始分片的处理器调用 `Checkpointer.Checkpoint` 以指示原始分片上的所有处理操作都已完成之前，KCL 不会开始处理新分片。

如果您未传递参数，KCL 将假定对 `Checkpointer.Checkpoint` 的调用表示所有记录都已处理，一直处理到传递到记录处理器的最后一个记录。因此，记录处理器只应在已处理传递到它的列表中的所有记录后才调用 `Checkpointer.Checkpoint`。记录处理器不需要在每次调用 `Checkpointer.Checkpoint` 时调用 `ProcessRecords`。例如，处理器在每第三次或第四次调用时调用 `Checkpointer.Checkpoint`。您可以选择性地将某个记录的确切序号指定为 `Checkpointer.Checkpoint` 的参数。在本例中，KCL 将假定记录都已处理，直至处理到该记录。

在该示例中，私有方法 `Checkpoint(Checkpointer checkpointer)` 展示了如何使用适当的异常处理和重试逻辑调用 `Checkpointer.Checkpoint` 方法。

适用于 .NET 的 KCL 处理异常的方式不同于其他 KCL 语言库，前者不处理因处理数据记录而引起的任何异常。用户代码中未捕获的任何异常都将使程序崩溃。

**关闭**  
KCL 在处理结束（关闭原因为 `TERMINATE`）或工作程序不再响应（关闭 `input.Reason` 值为 `ZOMBIE`）时调用 `Shutdown` 方法。

```
public void Shutdown(ShutdownInput input)
```

处理操作在记录处理器不再从分片中接收任何记录时结束，因为分片已被拆分或合并，或者流已删除。

KCL 还会将 `Checkpointer` 对象传递到 `shutdown`。如果关闭原因为 `TERMINATE`，则记录处理器应完成处理任何数据记录，然后对此接口调用 `checkpoint` 方法。

## 修改配置属性
<a name="kinesis-record-processor-initialization-dotnet"></a>

示例消费端提供了配置属性的默认值。您可使用自己的值覆盖任何这些属性（请参阅 `SampleConsumer/kcl.properties`）。

### 应用程序名称
<a name="modify-kinesis-record-processor-application-name"></a>

KCL 需要一个应用程序，该应用程序在您的各个应用程序中以及同一区域的各个 Amazon DynamoDB 表中处于唯一状态。KCL 通过以下方法使用应用程序名称配置值：
+ 假定所有与此应用程序名称关联的工作程序在同一数据流上合作。这些工作程序可被分配到多个实例上。如果运行同一应用程序代码的其他实例，但使用不同的应用程序名称，则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。
+ KCL 利用应用程序名称创建 DynamoDB 表并使用该表保留应用程序的状态信息（如检查点和工作程序-分片映射）。每个应用程序都有自己的 DynamoDB 表。有关更多信息，请参阅 [使用租约表跟踪 KCL 消费端应用程序处理的分片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 设置凭证
<a name="kinesis-record-processor-creds-dotnet"></a>

您必须将您的 AWS 证书提供给默认凭证提供者链中的一个凭证提供商。可以使用 `AWSCredentialsProvider` 属性设置凭证提供程序。[sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) 必须向[默认凭证提供程序链](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)中的凭证提供程序之一提供您的凭证。如果您在 EC2 实例上运行消费端应用程序，则建议您使用 IAM 角色进行配置。反映与此 IAM 角色关联的权限 AWS 凭证可通过实例元数据提供给实例上的应用程序。使用这种方式管理在 EC2 实例上运行的消费端的凭证最安全。

该示例的属性文件将配置 KCL 以使用 `AmazonKinesisSampleConsumer.cs` 中提供的记录处理器处理名为“words”的 Kinesis 数据流。

# 在 Python 中开发 Kinesis Client Library 消费端
<a name="kinesis-record-processor-implementation-app-py"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

可以使用 Kinesis Client Library（KCL）构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题将讨论 Python。

KCL 是一个 Java 库；对 Java 以外其他语言的支持是使用名为的多语言接口提供的。*MultiLangDaemon*此进程守护程序基于 Java，当您使用 Java 以外的 KCL 语言时，该程序会在后台运行。因此，如果您安装适用于 Python 的 KCL 并完全使用 Python 编写使用者应用程序，则仍然需要在系统上安装 Java，因为。 MultiLangDaemon此外 MultiLangDaemon ，您可能需要根据自己的用例自定义一些默认设置，例如它所连接的 AWS 区域。有关 MultiLangDaemon on 的更多信息 GitHub，请访问 [KCL MultiLangDaemon 项目](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)页面。

要从中下载 Python KCL GitHub，请前往 K [inesis 客户端库 (Python)](https://github.com/awslabs/amazon-kinesis-client-python)。要下载 Python KCL 使用者应用程序的示例代码，请转到上的 [KCL for Python 示例项目](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples)页面。 GitHub

在 Python 中实现 KCL 消费端应用程序时，您必须完成下列任务：

**Topics**
+ [实现 RecordProcessor 类方法](#kinesis-record-processor-implementation-interface-py)
+ [修改配置属性](#kinesis-record-processor-initialization-py)

## 实现 RecordProcessor 类方法
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess` 类必须扩展 `RecordProcessorBase` 以实现以下方法。该示例提供了可用作起点的实现（请参阅 `sample_kclpy_app.py`）。

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**初始化**  
KCL 在实例化记录处理器时调用 `initialize` 方法，并将特定分片 ID 作为参数传递。此记录处理器只处理此分片，并且通常情况下反过来说也成立（此分片只能由此记录处理器处理）。但是，您的消费端应该考虑数据记录可能会经过多次处理的情况。这是因为 Kinesis Data Streams 具有*至少一次*语义，即分片中的每个数据记录在您的消费端中由工作程序至少处理一次。有关特定分片可能由多个工作程序进行处理的情况的更多信息，请参阅[使用重新分片、扩展和并行处理更改分片数量](kinesis-record-processor-scaling.md)。

```
def initialize(self, shard_id)
```

**process\$1records**  
 KCL 调用此方法，并传递由 `initialize` 方法指定的分片中的数据记录的列表。您实现的记录处理器根据您的消费端的语义处理这些记录中的数据。例如，工作程序可能对数据执行转换，然后将结果存储在 Amazon Simple Storage Service（Amazon S3）存储桶中。

```
def process_records(self, records, checkpointer) 
```

除了数据本身之外，记录还包含一个序号和一个分区键。工作程序可在处理数据时使用这些值。例如，工作线程可选择 S3 存储桶，并在其中根据分区键的值存储数据。`record` 词典公开了以下键-值对来访问记录的数据、序号和分区键：

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

请注意，数据是 Base64 编码的。

在该示例中，方法 `process_records` 具有显示工作程序如何访问记录的数据、序号和分区键的代码。

Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。KCL 通过将 `Checkpointer` 对象传递到 `process_records` 来为您执行此跟踪。记录处理器将对此对象调用 `checkpoint` 方法，以向 KCL 告知记录处理器处理分片中的记录的进度。如果工作程序失败，KCL 将使用此信息在已知的上一个已处理记录处重新启动对分片的处理。

对于拆分或合并操作，在原始分片的处理器调用 `checkpoint` 以指示原始分片上的所有处理操作都已完成之前，KCL 不会开始处理新分片。

如果您未传递参数，KCL 将假定对 `checkpoint` 的调用表示所有记录都已处理，一直处理到传递到记录处理器的最后一个记录。因此，记录处理器只应在已处理传递到它的列表中的所有记录后才调用 `checkpoint`。记录处理器不需要在每次调用 `checkpoint` 时调用 `process_records`。例如，处理器可在每第三次调用时调用 `checkpoint`。您可以选择性地将某个记录的确切序号指定为 `checkpoint` 的参数。在本例中，KCL 将假定所有记录都已处理，直至处理到该记录。

在该示例中，私有方法 `checkpoint` 展示了如何使用适当的异常处理和重试逻辑调用 `Checkpointer.checkpoint` 方法。

KCL 依靠 `process_records` 来处理由处理数据记录引起的任何异常。如果 `process_records` 引发了异常，则 KCL 将跳过在异常发生前已传递到 `process_records` 的数据记录。也就是说，这些记录不会重新发送到引发异常的记录处理器或消费端中的任何其他记录处理器。

**shutdown**  
 KCL 在处理结束（关闭原因为 `TERMINATE`）或工作程序不再响应（关闭 `reason` 为 `ZOMBIE`）时调用 `shutdown` 方法。

```
def shutdown(self, checkpointer, reason)
```

处理操作在记录处理器不再从分片中接收任何记录时结束，因为分片已被拆分或合并，或者流已删除。

 KCL 还会将 `Checkpointer` 对象传递到 `shutdown`。如果关闭 `reason` 是 `TERMINATE`，则记录处理器应完成处理任何数据记录，然后对此接口调用 `checkpoint` 方法。

## 修改配置属性
<a name="kinesis-record-processor-initialization-py"></a>

该示例提供了配置属性的默认值。您可使用自己的值覆盖任何这些属性（请参阅 `sample.properties`）。

### 应用程序名称
<a name="kinesis-record-processor-application-name-py"></a>

KCL 需要一个应用程序名称，该名称在您的各个应用程序中以及同一区域的各个 Amazon DynamoDB 表中处于唯一状态。KCL 通过以下方法使用应用程序名称配置值：
+ 假定与此应用程序名称关联的所有工作线程在同一个流上一起运行。这些工作线程可分布在多个实例上。如果运行同一应用程序代码的其他实例，但使用不同的应用程序名称，则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。
+ KCL 利用应用程序名称创建 DynamoDB 表并使用该表保留应用程序的状态信息（如检查点和工作程序-分片映射）。每个应用程序都有自己的 DynamoDB 表。有关更多信息，请参阅 [使用租约表跟踪 KCL 消费端应用程序处理的分片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 设置凭证
<a name="kinesis-record-processor-creds-py"></a>

您必须将您的 AWS 证书提供给默认凭证提供者链中的一个凭证提供商。可以使用 `AWSCredentialsProvider` 属性设置凭证提供程序。[sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) 必须向[默认凭证提供程序链](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)中的凭证提供程序之一提供您的凭证。如果您在 Amazon EC2 实例上运行消费端应用程序，则建议您使用 IAM 角色进行配置。反映与此 IAM 角色关联的权限 AWS 凭证可通过实例元数据提供给实例上的应用程序。使用这种方式管理在 EC2 实例上运行的消费端应用程序的凭证最安全。

该示例的属性文件将配置 KCL 以使用 `sample_kclpy_app.py` 中提供的记录处理器处理名为“words”的 Kinesis 数据流。

# 在 Ruby 中开发 Kinesis Client Library 消费端
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

可以使用 Kinesis Client Library（KCL）构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题将讨论 Ruby。

KCL 是一个 Java 库；对 Java 以外其他语言的支持是使用名为的多语言接口提供的。*MultiLangDaemon*此进程守护程序基于 Java，当您使用 Java 以外的 KCL 语言时，该程序会在后台运行。因此，如果您安装适用于 Ruby 的 KCL 并完全使用 Ruby 编写消费者应用程序，则仍然需要在系统上安装 Java，因为. MultiLangDaemon 此外 MultiLangDaemon ，您可能需要根据自己的用例自定义一些默认设置，例如它所连接的 AWS 区域。有关 MultiLangDaemon on 的更多信息 GitHub，请访问 [KCL MultiLangDaemon 项目](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)页面。

要从中下载 Ruby KCL GitHub，请前往 K [inesis 客户端库 (](https://github.com/awslabs/amazon-kinesis-client-ruby)Ruby)。要下载 Ruby KCL 使用者应用程序的示例代码，请转到上的 [KCL for Ruby 示例项目](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples)页面。 GitHub

有关 KCL Ruby 支持库的更多信息，请参阅 [KCL Ruby Gems Documentation](http://www.rubydoc.info/gems/aws-kclrb)。

# 开发 KCL 2.x 消费端
<a name="developing-consumers-with-kcl-v2"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

本主题将说明如何使用 2.0 版本的 Kinesis Client Library（KCL）。

有关 KCL 的更多信息，请参阅 [Developing Consumers Using the Kinesis Client Library 1.x](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html) 中提供的概述。

根据要使用的选项，从以下主题进行选择。

**Topics**
+ [在 Java 中开发 Kinesis Client Library 消费端](kcl2-standard-consumer-java-example.md)
+ [在 Python 中开发 Kinesis Client Library 消费端](kcl2-standard-consumer-python-example.md)
+ [使用 KCL 2.x 开发具有增强扇出功能的消费端](building-enhanced-consumers-kcl-retired.md)

# 在 Java 中开发 Kinesis Client Library 消费端
<a name="kcl2-standard-consumer-java-example"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

以下代码显示 `ProcessorFactory` 和 `RecordProcessor` 在 Java 中的实施示例。如果要利用增强型扇出功能，请参阅[利用使用增强型扇出功能的用户](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html)。

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# 在 Python 中开发 Kinesis Client Library 消费端
<a name="kcl2-standard-consumer-python-example"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

可以使用 Kinesis Client Library（KCL）构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题将讨论 Python。

KCL 是一个 Java 库；对 Java 以外其他语言的支持是使用名为的多语言接口提供的。*MultiLangDaemon*此进程守护程序基于 Java，当您使用 Java 以外的 KCL 语言时，该程序会在后台运行。因此，如果您安装适用于 Python 的 KCL 并完全使用 Python 编写使用者应用程序，则仍然需要在系统上安装 Java，因为。 MultiLangDaemon此外 MultiLangDaemon ，您可能需要根据自己的用例自定义一些默认设置，例如它所连接的 AWS 区域。有关 MultiLangDaemon on 的更多信息 GitHub，请访问 [KCL MultiLangDaemon 项目](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)页面。

要从中下载 Python KCL GitHub，请前往 K [inesis 客户端库 (Python)](https://github.com/awslabs/amazon-kinesis-client-python)。要下载 Python KCL 使用者应用程序的示例代码，请转到上的 [KCL for Python 示例项目](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples)页面。 GitHub

在 Python 中实现 KCL 消费端应用程序时，您必须完成下列任务：

**Topics**
+ [实现 RecordProcessor 类方法](#kinesis-record-processor-implementation-interface-py)
+ [修改配置属性](#kinesis-record-processor-initialization-py)

## 实现 RecordProcessor 类方法
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess` 类必须扩展 `RecordProcessorBase` 类以实现以下方法：

```
initialize
process_records
shutdown_requested
```

此示例提供了可用作起点的实现。

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this recod.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```

## 修改配置属性
<a name="kinesis-record-processor-initialization-py"></a>

该示例提供了配置属性的默认值，如以下脚本所示。您可使用自己的值覆盖任何这些属性。

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```

### 应用程序名称
<a name="kinesis-record-processor-application-name-py"></a>

KCL 需要一个应用程序名称，该名称在您的各个应用程序中以及同一区域的各个 Amazon DynamoDB 表中处于唯一状态。KCL 通过以下方法使用应用程序名称配置值：
+ 假定与此应用程序名称关联的所有工作线程在同一个流上一起运行。这些工作线程可分布在多个实例中。如果运行同一应用程序代码的其他实例，但使用不同的应用程序名称，则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。
+ KCL 利用应用程序名称创建 DynamoDB 表并使用该表保留应用程序的状态信息（如检查点和工作程序-分片映射）。每个应用程序都有自己的 DynamoDB 表。有关更多信息，请参阅 [使用租约表跟踪 KCL 消费端应用程序处理的分片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 凭据
<a name="kinesis-record-processor-creds-py"></a>

您必须将您的 AWS 证书提供给[默认凭证提供者链中的一个凭证提供商](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)。可以使用 `AWSCredentialsProvider` 属性设置凭证提供程序。如果您在 Amazon EC2 实例上运行使用者应用程序，我们建议您使用 IAM 角色配置该实例。 AWS 反映与此 IAM 角色关联的权限的证书可通过实例元数据提供给实例上的应用程序。使用这种方式管理在 EC2 实例上运行的消费端应用程序的凭证最安全。

# 使用 KCL 2.x 开发具有增强扇出功能的消费端
<a name="building-enhanced-consumers-kcl-retired"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

在 Amazon Kinesis Data Streams 中使用*增强型扇出*功能的消费端，可以接收数据流中的记录，其中每分片每秒专用吞吐量高达 2MB 数据。此类消费端不必与接收流中数据的其他消费端争夺。有关更多信息，请参阅 [开发具有专用吞吐量的增强扇出型消费端](enhanced-consumers.md)。

可以使用 2.0 版或更高版本的 Kinesis Client Library（KCL）开发使用增强型扇出功能接收流中数据的应用程序。KCL 会自动为您的应用程序订阅流的所有分片，并确保您的使用者应用程序可以读取每个分片的吞吐量值 2 MB/sec 。如果要在未开启增强型扇出功能的情况下使用 KCL，请参阅 [Developing Consumers Using the Kinesis Client Library 2.0](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html)。

**Topics**
+ [使用 KCL 2.x 在 Java 中开发具有增强扇出功能的消费端](building-enhanced-consumers-kcl-java.md)

# 使用 KCL 2.x 在 Java 中开发具有增强扇出功能的消费端
<a name="building-enhanced-consumers-kcl-java"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

可以使用 2.0 版或更高版本的 Kinesis Client Library（KCL），在 Amazon Kinesis Data Streams 中开发使用增强型扇出功能接收流中数据的应用程序。以下代码显示 `ProcessorFactory` 和 `RecordProcessor` 在 Java 中的实施示例。

建议您使用 `KinesisClientUtil` 创建 `KinesisAsyncClient`，并在 `KinesisAsyncClient` 中配置 `maxConcurrency`。

**重要**  
Amazon Kinesis 户端可能会看到延迟大幅增加，除非您将 `KinesisAsyncClient` 配置为具有足够高的 `maxConcurrency`，以允许所有租期以及额外使用 `KinesisAsyncClient`。

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# 将消费端从 KCL 1.x 迁移到 KCL 2.x
<a name="kcl-migration"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

本主题介绍 Kinesis Client Library（KCL）版本 1.x 和 2.x 之间的区别。其中还向您展示如何将消费端从 KCL 版本 1.x 迁移到版本 2.x。在迁移您的客户端后，它将从最后一个检查点位置开始处理记录。

KCL 版本 2.0 引入了以下接口更改：


**KCL 接口更改**  

| KCL 1.x 接口 | KCL 2.0 接口 | 
| --- | --- | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor | software.amazon.kinesis.processor.ShardRecordProcessor | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory | software.amazon.kinesis.processor.ShardRecordProcessorFactory | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware | 折叠为 software.amazon.kinesis.processor.ShardRecordProcessor | 

**Topics**
+ [迁移记录处理器类](#recrod-processor-migration)
+ [迁移记录处理器工厂](#recrod-processor-factory-migration)
+ [迁移工作线程](#worker-migration)
+ [配置 Amazon Kinesis 客户端](#client-configuration)
+ [闲置时间删除](#idle-time-removal)
+ [客户端配置删除](#client-configuration-removals)

## 迁移记录处理器类
<a name="recrod-processor-migration"></a>

以下示例显示了为 KCL 1.x 实现的记录处理器：

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        //
        // Process records, and possibly checkpoint
        //
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**迁移记录处理器类**

1. 将接口从 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` 和 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` 更改为 `software.amazon.kinesis.processor.ShardRecordProcessor`，如下所示：

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
   public class TestRecordProcessor implements ShardRecordProcessor {
   ```

1. 更新 `import` 和 `initialize` 方法的 `processRecords` 语句。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
   ```

1. 使用以下新方法替换 `shutdown` 方法：`leaseLost`、`shardEnded` 和 `shutdownRequested`。

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

下面是记录处理器类的更新版本。

```
package com.amazonaws.kcl;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class TestRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

## 迁移记录处理器工厂
<a name="recrod-processor-factory-migration"></a>

记录处理器工厂负责在获得租约时创建记录处理器。下面是 KCL 1.x 工厂的示例。

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor();
    }
}
```

**迁移记录处理器工厂**

1. 将已实施的接口从 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` 更改为 `software.amazon.kinesis.processor.ShardRecordProcessorFactory`，如下所示。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
   public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
   ```

1. 更改 `createProcessor` 的返回签名。

   ```
   // public IRecordProcessor createProcessor() {
   public ShardRecordProcessor shardRecordProcessor() {
   ```

下面是 2.0 中的记录处理器工厂的示例：

```
package com.amazonaws.kcl;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor();
    }
}
```

## 迁移工作线程
<a name="worker-migration"></a>

在 KCL 版本 2.0 中，名为 `Scheduler` 的新类取代了 `Worker` 类。下面是 KCL 1.x 工作程序的示例。

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

**迁移工作程序**

1. 将 `Worker` 类的 `import` 语句更改为 `Scheduler` 和 `ConfigsBuilder` 类的导入语句。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. 创建 `ConfigsBuilder` 和 `Scheduler`，如以下示例所示。

   建议您使用 `KinesisClientUtil` 创建 `KinesisAsyncClient`，并在 `KinesisAsyncClient` 中配置 `maxConcurrency`。
**重要**  
Amazon Kinesis 户端可能会看到延迟大幅增加，除非您将 `KinesisAsyncClient` 配置为具有足够高的 `maxConcurrency`，以允许所有租期以及额外使用 `KinesisAsyncClient`。

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
   Region region = Region.AP_NORTHEAST_2;
   KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
   ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
   
   Scheduler scheduler = new Scheduler(
       configsBuilder.checkpointConfig(),
       configsBuilder.coordinatorConfig(),
       configsBuilder.leaseManagementConfig(),
       configsBuilder.lifecycleConfig(),
       configsBuilder.metricsConfig(),
       configsBuilder.processorConfig(),
       configsBuilder.retrievalConfig()
       );
   ```

## 配置 Amazon Kinesis 客户端
<a name="client-configuration"></a>

随着 Kinesis Client Library 版本 2.0 的发布，客户端的配置已从单个配置类（`KinesisClientLibConfiguration`）变为 6 个配置类。下表描述了迁移。


**配置字段及其新类**  

| 原始字段 | 新配置类 | 说明 | 
| --- | --- | --- | 
| applicationName | ConfigsBuilder | 此 KCL 应用程序的名称。用作 tableName 和 consumerName 的默认名称。 | 
| tableName | ConfigsBuilder | 允许覆盖用于 Amazon DynamoDB 租赁表的表名称。 | 
| streamName | ConfigsBuilder | 此应用程序从其中处理记录的流的名称。 | 
| kinesisEndpoint | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| dynamoDBEndpoint | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| initialPositionInStreamExtended | RetrievalConfig | 分片中 KCL 开始获取记录的位置，从应用程序的初始运行开始。 | 
| kinesisCredentialsProvider | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| dynamoDBCredentialsProvider | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| cloudWatchCredentialsProvider | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| failoverTimeMillis | LeaseManagementConfig | 在您可以将租赁所有者视为已失败之前必须经过的毫秒数。 | 
| workerIdentifier | ConfigsBuilder | 表示应用程序处理器的这种实例化的唯一标识符。此值必须唯一。 | 
| shardSyncIntervalMillis | LeaseManagementConfig | 分片同步调用之间的时间。 | 
| maxRecords | PollingConfig | 允许设置 Kinesis 返回的最大记录数。 | 
| idleTimeBetweenReadsInMillis | CoordinatorConfig | 此选项已删除。请参阅“闲置时间删除”。 | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig | 如果设置，即使 Kinesis 中未提供任何记录，也会调用记录处理器。 | 
| parentShardPollIntervalMillis | CoordinatorConfig | 记录处理器应轮询多少时间才能查看是否已完成父分片。 | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig | 如果设置，只要子租赁已开始处理，即可删除租赁。 | 
| ignoreUnexpectedChildShards | LeaseManagementConfig | 如果设置，将忽略具有打开的分片的子分片。这主要适用于 DynamoDB Streams。 | 
| kinesisClientConfig | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| dynamoDBClientConfig | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| cloudWatchClientConfig | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| taskBackoffTimeMillis | LifecycleConfig | 等待重试失败任务的时间。 | 
| metricsBufferTimeMillis | MetricsConfig | 控制 CloudWatch 指标发布。 | 
| metricsMaxQueueSize | MetricsConfig | 控制 CloudWatch 指标发布。 | 
| metricsLevel | MetricsConfig | 控制 CloudWatch 指标发布。 | 
| metricsEnabledDimensions | MetricsConfig | 控制 CloudWatch 指标发布。 | 
| validateSequenceNumberBeforeCheckpointing | CheckpointConfig | 此选项已删除。请参阅“检查点序列号验证”。 | 
| regionName | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| maxLeasesForWorker | LeaseManagementConfig | 应用程序的单个实例应接受的最大租赁数量。 | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig | 应用程序一次应尝试窃取的最大租赁数量。 | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | 在 Kinesis 客户端库需要创建新的 DynamoDB 租用表时使用的 DynamoDB IOPs 读取。 | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | 在 Kinesis 客户端库需要创建新的 DynamoDB 租用表时使用的 DynamoDB IOPs 读取。 | 
| initialPositionInStreamExtended | LeaseManagementConfig | 应用程序应在流中开始的初始位置。此值仅在创建初始租赁时使用。 | 
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | 如果租赁表包含现有租赁，请禁用同步的分片数据。待办事项： KinesisEco-438 | 
| shardPrioritization | CoordinatorConfig | 要使用的分片优先级。 | 
| shutdownGraceMillis | 不适用 | 此选项已删除。参见 “ MultiLang 移除”。 | 
| timeoutInSeconds | 不适用 | 此选项已删除。参见 “ MultiLang 移除”。 | 
| retryGetRecordsInSeconds | PollingConfig | 配置 GetRecords 尝试失败之间的延迟。 | 
| maxGetRecordsThreadPool | PollingConfig | 使用的线程池大小 GetRecords。 | 
| maxLeaseRenewalThreads | LeaseManagementConfig | 控制租赁续订线程池的大小。您的应用程序可以容纳的租赁越多，此池应该就越大。 | 
| recordsFetcherFactory | PollingConfig | 允许替换用于创建从流中检索的提取程序的工厂。 | 
| logWarningForTaskAfterMillis | LifecycleConfig | 任务尚未完成的情况下在记录警告之前要等待的时长。 | 
| listShardsBackoffTimeInMillis | RetrievalConfig | 发生故障时在调用 ListShards 之间要等待的时间（以毫秒为单位）。 | 
| maxListShardsRetryAttempts | RetrievalConfig | ListShards 在放弃之前重试的最长时间。 | 

## 闲置时间删除
<a name="idle-time-removal"></a>

在 KCL 版本 1.x 中，`idleTimeBetweenReadsInMillis` 对应了两个数量：
+ 任务分派检查之间的时间量。您现在可以通过设置 `CoordinatorConfig#shardConsumerDispatchPollIntervalMillis` 来在任务之间配置此时间。
+ 未从 Kinesis Data Streams 中返回任何记录时的睡眠时间量。在版本 2.0 中，带增强型扇出功能的记录是从其各自的检索器中推送的。分片消费端上的活动仅发生在推送的请求到达时。

## 客户端配置删除
<a name="client-configuration-removals"></a>

在版本 2.0 中，KCL 不再创建客户端。这取决于用户提供有效的客户端。进行此更改后，已删除控制客户端创建的所有配置参数。如果需要这些参数，您可以在向 `ConfigsBuilder` 提供客户端之前在客户端上设置它们。


****  

| 已删除字段 | 等效配置 | 
| --- | --- | 
| kinesisEndpoint | 使用以下首选端点配置开发工具包 KinesisAsyncClient：KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build()。 | 
| dynamoDBEndpoint | 使用以下首选端点配置开发工具包 DynamoDbAsyncClient：DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build()。 | 
| kinesisClientConfig | 使用以下所需配置来配置开发工具包 KinesisAsyncClient：KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build()。 | 
| dynamoDBClientConfig | 使用以下所需配置来配置开发工具包 DynamoDbAsyncClient：DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build()。 | 
| cloudWatchClientConfig | 使用以下所需配置来配置开发工具包 CloudWatchAsyncClient：CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build()。 | 
| regionName | 使用首选区域配置开发工具包。这对所有开发工具包客户端均相同。例如 KinesisAsyncClient.builder().region(Region.US\$1WEST\$12).build()。 | 