本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
KCL1.x 和 2.x 信息
注意
Kinesis 客户端库 (KCL) 版本 1.x 和 2.x 已过时。我们建议迁移到 3.x KCL 版,该版本提供了改进的性能和新功能。有关最新的KCL文档和迁移指南,请参阅使用 Kinesis 客户端库。
开发可以处理来自数据流KDS的数据的定制使用者应用程序的方法之一是使用 Kinesis 客户端库 () KCL。
主题
注意
对于 KCL 1.x 和 KCL 2.x,建议您根据使用场景升级到最新的 KCL 1.x 版本或 KCL 2.x 版本。KCL1.x 和 KCL 2.x 都会定期使用新版本进行更新,其中包括最新的依赖项和安全补丁、错误修复以及向后兼容的新功能。有关更多信息,请参阅 https://github.com/awslabs/amazon-kinesis-client/releases。
关于KCL(之前的版本)
KCL通过处理与分布式计算相关的许多复杂任务,帮助您使用和处理来自 Kinesis 数据流的数据。这些任务包括跨多个消费端应用程序实例的负载平衡、对消费端应用程序实例故障的响应、已处理记录的检查点操作以及对重新分片的反应。KCL会处理所有这些子任务,因此您可以将精力集中在编写自定义记录处理逻辑上。
与中提供KCL的 Kinesis Data APIs Streams 不同。 AWS SDKsKinesis Data APIs Streams 可帮助您管理 Kinesis Data Streams 的许多方面,包括创建流、重新分片以及放置和获取记录。围绕所有这些子任务KCL提供了一个抽象层,以便您可以专注于使用者应用程序的自定义数据处理逻辑。有关 Kinesis Data API Streams 的信息,请参阅《API亚马逊 Kinesis 参考》。
重要
KCL是一个 Java 库。使用名为 “” 的多语言接口提供对 Java 以外其他语言的 MultiLangDaemon支持。此守护程序基于 Java,当您使用 Java 以外的KCL语言时,它会在后台运行。例如,如果您安装适用于 Python 的,并完全使用 Python 编写使用者应用程序,则仍然需要在系统上安装 Java,因为 MultiLangDaemon。KCL此外, MultiLangDaemon 还有一些您可能需要根据自己的用例自定义的默认设置,例如它所连接的 AWS 区域。有关 MultiLangDaemon on 的更多信息 GitHub,请参阅 p KCL MultiLangDaemon roject
KCL充当你的记录处理逻辑和 Kinesis Data Streams 之间的中介。
KCL 先前版本
目前,您可以使用以下支持的任一版本KCL来构建您的自定义使用者应用程序:
-
KCL1.x
有关更多信息,请参阅 培养 KCL 1.x 消费者
-
KCL2.x
有关更多信息,请参阅 培养 KCL 2.x 消费者
您可以使用 KCL 1.x 或 KCL 2.x 来构建使用共享吞吐量的消费者应用程序。有关更多信息,请参阅 使用共享吞吐量开发自定义消费者 KCL。
要构建使用专用吞吐量(增强型扇出使用者)的使用者应用程序,只能使用 KCL 2.x。有关更多信息,请参阅 开发具有专用吞吐量的增强型扇出消费者。
有关 KCL 1.x 和 KCL 2.x 之间区别的信息,以及如何从 KCL 1.x 迁移到 KCL 2.x 的说明,请参阅。将消费者从 KCL 1.x 迁移到 KCL 2.x
KCL概念(之前的版本)
-
KCL消费者应用程序 — 一种使用KCL数据流定制的应用程序,旨在读取和处理数据流中的记录。
-
消费者应用程序实例-KCL 消费者应用程序通常是分布式的,一个或多个应用程序实例同时运行,以便协调故障和动态负载平衡数据记录处理。
-
Worker — 使用KCL者应用程序实例用来开始处理数据的高级类。
重要
每个使用KCL者应用程序实例都有一个工作程序。
工作程序负责初始化和监督各种任务,包括同步分片和租约信息、跟踪分片分配以及处理来自分片的数据。工作程序KCL提供使用者应用程序的配置信息,例如该使用KCL者应用程序将要处理其数据记录的数据流的名称以及访问该数据流所需的 AWS 凭证。工作程序还会启动特定的使用KCL者应用程序实例,将数据记录从数据流传送到记录处理器。
重要
在 KCL 1.x 中,这个类被称为 Worker。有关更多信息(这些是 Java KCL 存储库),请参见 https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java。
在 KCL 2.x 中,这个类被称为调度器。KCL2.x 中调度器的目的与 1.x 中工作程序的目的相同。KCL有关 KCL 2.x 中调度器类的更多信息,请参见 https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler .java。 -
租约 – 定义工作程序和分片之间绑定的数据。分布式KCL消费者应用程序使用租约将数据记录处理分成一群工作人员。在任何给定时间,每个数据记录分片都通过leaseKey变量标识的租约绑定到特定的工作人员。
默认情况下,工作人员可以同时持有一份或多份租约(以 maxLeasesForWorker 变量的值为准)。
重要
每个工作程序都将争相持有数据流中所有可用分片的所有可用租约。但是,无论何时,只有一个工作程序可以成功持有每份租约。
例如,如果您的消费端应用程序实例 A 和工作程序 A 正在处理包含 4 个分片的数据流,则工作程序 A 可以同时持有分片 1、2、3 和 4 的租约。但是,如果您有 A 和 B 两个消费端应用程序实例,以及工作程序 A 和工作程序 B,并且这些实例正在处理包含 4 个分片的数据流,则工作程序 A 和工作程序 B 不能同时持有分片 1 的租约。一个工作程序持有特定分片的租约,直到该工作程序准备好停止处理该分片的数据记录或该工作程序失败为止。当一个工作程序停止持有租约时,另一个工作程序会接管并持有租约。
有关更多信息,(这些是 Java KCL 存储库),请参阅 KCL 1.x 版的 https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java
和适用于 2.x 的 https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java。 KCL -
租赁表-一个唯一的 Amazon DynamoDB 表,用于跟踪数据流中KDS由使用者应用程序的工作程序租用和处理的分片。KCL使用者应用程序运行时,租赁表必须与数据流中的最新分片信息保持同步(在工作程序内部和所有工作程序之间)。KCL有关更多信息,请参阅 使用租赁表来跟踪使用者应用程序处理的KCL分片。
-
记录处理器 — 定义您的KCL消费者应用程序如何处理从数据流中获取的数据的逻辑。在运行时,使用KCL者应用程序实例化一个工作程序,该工作程序为其持有租约的每个分片实例化一个记录处理器。
使用租赁表来跟踪使用者应用程序处理的KCL分片
什么是租约表
对于每个 Amazon Kinesis Data Streams 应用程序KCL,都使用一个唯一的租赁表(存储在 Amazon DynamoDB 表中)来跟踪数据流KDS中由使用者应用程序的工作程序租用和处理的分片。KCL
重要
KCL使用使用者应用程序的名称来创建此使用者应用程序使用的租赁表的名称,因此每个使用者应用程序的名称必须是唯一的。
您可在消费端应用程序运行的同时使用 Amazon DynamoDB 控制台查看租约表。
如果您的使用KCL者应用程序的租赁表在应用程序启动时不存在,则其中一个工作人员会为此应用程序创建租赁表。
重要
除开与 Kinesis Data Streams 本身关联的费用,您的账户将被收取与 DynamoDB 表关联的费用。
租约表中的每行表示您消费端应用程序正在处理的分片。如果您的使用KCL者应用程序仅处理一个数据流leaseKey
,则分片 ID 是租用表的哈希键。如果你是使用相同的 KCL 2.x 处理多个数据流,适用于 Java 消费者应用程序, 那么的结构 leaseKey 看起来像这样:account-id:StreamName:streamCreationTimestamp:ShardId
. 例如,111111111:multiStreamTest-1:12345:shardId-000000000336
。
除了分片 ID 以外,每行还包含以下数据:
-
checkpoint:分片的最新检查点序号。此值在数据流的所有分片中都是唯一的。
-
checkpointSubSequence数字:使用 Kinesis Producer 库的聚合功能时,这是对检查点的扩展,用于跟踪 Kinesis 记录中的单个用户记录。
-
leaseCounter:用于租赁版本控制,以便工作人员可以检测其租约是否已被其他工作人员占用。
-
leaseKey:租赁的唯一标识符。每份租约都是数据流中一个分片所特有的,一份由一个工作程序持有。
-
leaseOwner: 持有此租约的员工。
-
ownerSwitchesSince检查点:自上次写检查点以来,这份租约更换了多少次员工。
-
parentShardId:用于确保在子分片上开始处理之前,父分片已得到完全处理。这可以确保记录按照放入流中的相同顺序处理。
-
hashrange:
PeriodicShardSyncManager
用来运行定期同步,以查找租约表中缺少的分片,并在需要时为分片创建租约。注意
从 KCL 1.14 和 KCL 2.3 开始的每个分片的租用表中都有此数据。有关租约和分片之间的
PeriodicShardSyncManager
和定期同步的更多信息,请参阅 租约表如何与 Kinesis 数据流中的分片同步。 -
childshards:
LeaseCleanupManager
用来查看子分片的处理状态并决定是否可以从租约表中删除父分片。注意
从 KCL 1.14 和 KCL 2.3 开始的每个分片的租用表中都有此数据。
-
shardID:分片的 ID。
注意
仅当您使用相同的 KCL 2.x 处理多个数据流,适用于 Java 消费者应用程序时,此数据才会出现在租约表中。只有适用于 Java 的 KCL 2.x 版本中才支持此功能,从 Java 的 KCL 2.3 及更高版本开始。
-
stream name 数据流的标识符采用以下格式:
account-id:StreamName:streamCreationTimestamp
。注意
仅当您使用相同的 KCL 2.x 处理多个数据流,适用于 Java 消费者应用程序时,此数据才会出现在租约表中。只有适用于 Java 的 KCL 2.x 版本中才支持此功能,从 Java 的 KCL 2.3 及更高版本开始。
吞吐量
如果您的 Amazon Kinesis Data Streams 收到了预置的吞吐量异常,您应为 DynamoDB 表增加预置的吞吐量。KCL 将创建预置吞吐量为 10 次读取/秒和 10 次写入/秒的表,但这对于您的应用程序可能不够。例如,如果您的 Amazon Kinesis Data Streams 执行频繁的检查点操作或对由很多分片组成的流执行操作,您可能需要更多吞吐量。
有关 DynamoDB 表中预置吞吐量的信息,请参阅《Amazon DynamoDB 开发人员指南》中的读/写容量模式和使用表和数据。
租约表如何与 Kinesis 数据流中的分片同步
KCL消费者应用程序中的工作人员使用租约来处理来自给定数据流的分片。哪个工作程序在指定时间租赁哪个分片的相关信息都存储在租约表中。使用KCL者应用程序运行时,租赁表必须与数据流中的最新分片信息保持同步。 KCL在使用者应用程序引导期间(初始化或重新启动使用者应用程序时)以及正在处理的分片结束时(重新分片),都会将租用表与从 Kinesis Data Streams 服务获取的分片信息同步。换句话说,在初始使用KCL者应用程序引导期间,以及每当使用者应用程序遇到数据流重新分片事件时,工作程序或使用者应用程序都会与他们正在处理的数据流同步。
KCL1.0-1.13 和 KCL 2.0-2.2 中的同步
在 KCL 1.0-1.13 和 KCL 2.0-2.2 中,在使用者应用程序的引导期间以及每个数据流重新分片事件期间,通过调用或发现将租用表与从 Kinesis Data Streams 服务获取的分片信息KCL同步。ListShards
DescribeStream
APIs在上面列出的所有KCL版本中,使用KCL者应用程序的每个 worker 都要完成以下步骤,以便在使用者应用程序的引导期间和每个流 reshard 事件中执行租用/分片同步过程:
-
获取正在处理的流中数据的所有分片
-
从租约表中获取所有分片租约
-
筛选出租约表中没有租约的所有开放分片
-
迭代所有找到的开放分片以及没有开放父分片的所有开放分片:
-
遍历层次结构树的原级路径,确定该分片是否为后代分片。如果正在处理原级分片(租约表中存在原级分片的租约条目)或者应该处理原级分片(例如初始位置为
TRIM_HORIZON
或AT_TIMESTAMP
),则该分片被视为后代分片 -
如果上下文中打开的分片是后代,则会根据初始位置KCL检查分片,并在需要时为其父分片创建租约
-
在 KCL 2.x 中同步,从 KCL 2.3 及更高版本开始
从支持的最新版本的 KCL 2.x (KCL2.3) 及更高版本开始,该库现在支持对同步过程进行以下更改。这些租赁/分片同步更改可显著减少使用KCL者应用程序对 Kinesis Data Streams 服务的API调用次数,并优化使用者应用程序中的租赁管理。KCL
-
在应用程序的引导过程中,如果租赁表为空,则KCL使用
ListShard
API的筛选选项(ShardFilter
可选的请求参数)来检索和创建租约,仅针对在参数指定时间打开的分片的快照进行检索和创建租约。ShardFilter
该ShardFilter
参数使您可以过滤掉的响应ListShards
API。该ShardFilter
参数的唯一必需属性是Type
。 KCL使用Type
filter 属性及其以下有效值来识别并返回可能需要新租约的已打开分片的快照:-
AT_TRIM_HORIZON
– 响应包含在TRIM_HORIZON
时开放的所有分片。 -
AT_LATEST
– 响应仅包含数据流中当前开放的分片。 -
AT_TIMESTAMP
– 响应包含起始时间戳小于或等于指定时间戳且结束时间戳大于或等于指定时间戳的所有分片,或仍处于开放状态的所有分片。
ShardFilter
用于为空租约表创建租约,以初始化在RetrievalConfig#initialPositionInStreamExtended
中指定的分片快照的租约。有关
ShardFilter
的更多信息,请参阅 https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html。 -
-
而不是所有工作人员都在执行同lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard步。
-
KCL2.3 使用
GetRecords
和的ChildShards
返回参数对已关闭的SubscribeToShard
APIs分片执行租用/分片同步,从而允许KCL工作程序仅SHARD_END
为其已处理完的分片的子分片创建租约。为了在整个使用者应用程序中共享,租赁/分片同步的这种优化使用了的参数。ChildShards
GetRecords
API对于专用吞吐量(增强扇出)消费者应用程序,租赁/分片同步的这种优化使用了的参数。ChildShards
SubscribeToShard
API有关更多信息,请参阅GetRecords、SubscribeToShards和ChildShard。 -
通过上述更改,的行为KCL正在从所有工作人员学习所有现有分片的模式转变为工作人员只学习每个工作人员拥有的分片的子分片的模式。因此,除了在消费者应用程序引导和重新分片事件期间发生的同步外,KCL现在还会执行额外的定期分片/租赁扫描,以识别租赁表中的任何潜在漏洞(换句话说,了解所有新分片),以确保处理数据流的完整哈希范围,并在需要时为它们创建租约。
PeriodicShardSyncManager
是负责定期运行租赁/分片扫描的组件。有关 KCL 2.3
PeriodicShardSyncManager
中的更多信息,请参阅 https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java #L201-L213。 在 KCL 2.3 中,新的配置选项可在以下版本
PeriodicShardSyncManager
中进行配置LeaseManagementConfig
:名称 默认值 描述 leasesRecoveryAuditorExecutionFrequencyMillis 120000(2 分钟)
审计程序作业扫描租约表中部分租约的频率(以毫秒为单位)。如果审计程序检测到某个流的租约中存在任何漏洞,则会根据
leasesRecoveryAuditorInconsistencyConfidenceThreshold
触发分片同步。leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
定期审计程序作业的置信度阈值,用于确定租约表中数据流的租约是否不一致。如果审计程序连续多次发现同一数据流的不一致之处,则会触发分片同步。
现在还会发布新的 CloudWatch 指标来监控的
PeriodicShardSyncManager
运行状况。有关更多信息,请参阅 PeriodicShardSyncManager。 -
包括对
HierarchicalShardSyncer
的优化,可仅为一层分片创建租约。
在 KCL 1.x 中同步,从 KCL 1.14 及更高版本开始
从支持的最新版本 KCL 1.x (KCL1.14) 及更高版本开始,该库现在支持对同步过程进行以下更改。这些租赁/分片同步更改可显著减少使用KCL者应用程序对 Kinesis Data Streams 服务的API调用次数,并优化使用者应用程序中的租赁管理。KCL
-
在应用程序的引导过程中,如果租赁表为空,则KCL使用
ListShard
API的筛选选项(ShardFilter
可选的请求参数)来检索和创建租约,仅针对在参数指定时间打开的分片的快照进行检索和创建租约。ShardFilter
该ShardFilter
参数使您可以过滤掉的响应ListShards
API。该ShardFilter
参数的唯一必需属性是Type
。 KCL使用Type
filter 属性及其以下有效值来识别并返回可能需要新租约的已打开分片的快照:-
AT_TRIM_HORIZON
– 响应包含在TRIM_HORIZON
时开放的所有分片。 -
AT_LATEST
– 响应仅包含数据流中当前开放的分片。 -
AT_TIMESTAMP
– 响应包含起始时间戳小于或等于指定时间戳且结束时间戳大于或等于指定时间戳的所有分片,或仍处于开放状态的所有分片。
ShardFilter
用于为空租约表创建租约,以初始化在KinesisClientLibConfiguration#initialPositionInStreamExtended
中指定的分片快照的租约。有关
ShardFilter
的更多信息,请参阅 https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html。 -
-
而不是所有工作人员都在执行同lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard步。
-
KCL1.14 使用
GetRecords
和的ChildShards
返回参数对已关闭的SubscribeToShard
APIs分片执行租用/分片同步,从而允许KCL工作程序仅SHARD_END
为其完成处理的分片的子分片创建租约。有关更多信息,请参阅GetRecords 和ChildShard。 -
通过上述更改,的行为KCL正在从所有工作人员学习所有现有分片的模式转变为工作人员只学习每个工作人员拥有的分片的子分片的模式。因此,除了在消费者应用程序引导和重新分片事件期间发生的同步外,KCL现在还会执行额外的定期分片/租赁扫描,以识别租赁表中的任何潜在漏洞(换句话说,了解所有新分片),以确保处理数据流的完整哈希范围,并在需要时为它们创建租约。
PeriodicShardSyncManager
是负责定期运行租赁/分片扫描的组件。如果
KinesisClientLibConfiguration#shardSyncStrategyType
设置为ShardSyncStrategyType.SHARD_END
,则PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold
用于确定租约表中包含漏洞的连续扫描次数的阈值,之后将强制执行分片同步。当KinesisClientLibConfiguration#shardSyncStrategyType
设置为时ShardSyncStrategyType.PERIODIC
,leasesRecoveryAuditorInconsistencyConfidenceThreshold
将被忽略。有关 KCL 1.14 版本的更多信息
PeriodicShardSyncManager
,请参阅 https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java #L987-L999。 在 KCL 1.14 中,新的配置选项可在以下版本
PeriodicShardSyncManager
中LeaseManagementConfig
进行配置:名称 默认值 描述 leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
定期审计程序作业的置信度阈值,用于确定租约表中数据流的租约是否不一致。如果审计程序连续多次发现同一数据流的不一致之处,则会触发分片同步。
现在还会发布新的 CloudWatch 指标来监控的
PeriodicShardSyncManager
运行状况。有关更多信息,请参阅 PeriodicShardSyncManager。 -
KCL1.14 现在还支持延期租赁清理。当分片超过数据流的保留期或因重新分片操作而关闭时,
LeaseCleanupManager
会在到达SHARD_END
时异步删除租约。可以使用新的配置选项来配置
LeaseCleanupManager
。名称 默认值 描述 leaseCleanupInterval米利斯 1 minute
运行租约清理线程的时间间隔。
completedLeaseCleanupIntervalMillis 5 分钟 检查租约是否完成的时间间隔。
garbageLeaseCleanupIntervalMillis 30 分钟 检查租约是否为垃圾租约(即超过数据流的保留期)的时间间隔。
-
包括对
KinesisShardSyncer
的优化,可仅为一层分片创建租约。
使用相同的 KCL 2.x 处理多个数据流,适用于 Java 消费者应用程序
本节介绍了 Java KCL 2.x 中的以下更改,这些更改使您能够创建可以同时处理多个数据流的使用KCL者应用程序。
重要
仅在 Java 的 KCL 2.x 版本中支持多流处理,从 Java 的 KCL 2.3 及更高版本开始。
任何其他可以实现 KCL 2.x 的语言都NOT支持多流处理。
任何版本的 KCL 1. NOT x 都支持多流处理。
-
MultistreamTracker 接口
要构建可以同时处理多个流的使用者应用程序,必须实现一个名为的新接口MultistreamTracker
。该接口包括返回要由使用KCL者应用程序处理的数据流及其配置列表 streamConfigList
的方法。请注意,正在处理的数据流可以在使用者应用程序运行时进行更改。streamConfigList
由定期调用,KCL以了解要处理的数据流的变化。该
streamConfigList
方法填充StreamConfig列表。 package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }
请注意,
StreamIdentifier
和InitialPositionInStreamExtended
是必填字段,consumerArn
是选填字段。consumerArn
只有在使用 KCL 2.x 实现增强型扇出消费者应用程序时,才必须提供。有关的更多信息
StreamIdentifier
,请参阅 https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java #L129. 要创建 StreamIdentifier
,我们建议您从 v2.5.0 及更高版本提供的streamArn
和streamCreationEpoch
创建一个多流实例。在不支持的 v2. KCL 3 和 v2.4 中streamArm
,使用格式创建多流实例。account-id:StreamName:streamCreationTimestamp
从下一个主要版本开始,此格式将弃用且不再受支持。MultistreamTracker
还包括可删除租约表中旧流租约的策略(formerStreamsLeasesDeletionStrategy
)。请注意,策略CANNOT将在使用者应用程序运行时进行更改。欲了解更多信息,请参阅 https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b//amazon-kinesis-client.java src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy -
ConfigsBuilder
是一个应用程序范围的类,可用于指定构建使用者应用程序时要使用的所有 KCL 2.x 配置设置。KCL ConfigsBuilder
类现在支持该MultistreamTracker
接口。你可以用一个数据流的名称进行初始化 ConfigsBuilder以使用来自以下内容的记录:/** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
或者,
MultiStreamTracker
如果你想实现一个同时处理多个流的 ConfigsBuilder 使用KCL者应用程序,你可以使用进行初始化。* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
通过为使用KCL者应用程序实现多流支持,应用程序租赁表的每一行现在都包含该应用程序处理的多个数据流的分片 ID 和流名称。
-
在实现对KCL消费者应用程序的多流支持时, leaseKey 采用以下结构:
account-id:StreamName:streamCreationTimestamp:ShardId
。例如,111111111:multiStreamTest-1:12345:shardId-000000000336
。重要
当您的现有使用KCL者应用程序配置为仅处理一个数据流时, leaseKey (这是租赁表的哈希键)就是分片 ID。如果您将这个现有的使用KCL者应用程序重新配置为处理多个数据流,则会破坏您的租赁表,因为在支持多流的情况下, leaseKey 结构必须如下所示:。
account-id:StreamName:StreamCreationTimestamp:ShardId
与 AWS Glue 架构注册表KCL一起使用
您可以将 Kinesis 数据流与 AWS Glue 架构注册表集成。 AWS Glue 架构注册表能帮助您集中发现、控制和演变架构,同时确保注册架构持续验证生成的数据。架构定义了数据记录的结构和格式。架构是用于可靠数据发布、使用或存储的版本化规范。 AWS Glue Schema Registr end-to-end y 允许您改善流媒体应用程序中的数据质量和数据治理。有关更多信息,请参阅 AWS Glue Schema Registry。设置此集成的方法之一是通过 Java KCL 中的。
重要
目前,只有使用 Java 实现的 2.3 KCL 使用者的 Kinesis 数据流支持 Kinesis Streams AWS Glue 和 Schema Registry 集成。不提供多语言支持。 KCL不支持 1.0 消费者。 KCL不支持 2.3 之前的 KCL 2.x 使用者。
有关如何使用设置 Kinesis Data Streams 与架构注册表集成的详细说明,请参阅 “用KCL例:将 Amazon Kinesis 数据流与 AWS Gl KPL ue 架构注册表集成” 中的 “使用 KCL /库与数据交互” 部分。