本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
多流处理 KCL
本节介绍所需的更改KCL,这些更改允许您创建可以同时处理多个数据流的使用KCL者应用程序。
重要
-
仅在 KCL 2.3 或更高版本中支持多流处理。
-
使用非 Java 语言编写的KCL使用者不支持多流处理。
multilangdaemon
-
任何版本的 KCL 1.x 都不支持多流处理。
-
MultistreamTracker 接口
-
要构建可以同时处理多个流的使用者应用程序,必须实现一个名为的新接口MultistreamTracker
。该接口包括返回要由使用KCL者应用程序处理的数据流及其配置列表 streamConfigList
的方法。请注意,正在处理的数据流可以在使用者应用程序运行时进行更改。streamConfigList
定期调用KCL,以了解要处理的数据流的变化。 -
streamConfigList
填充StreamConfig列表。
package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }
-
StreamIdentifier
和为InitialPositionInStreamExtended
必填字段,而consumerArn
为可选字段。consumerArn
只有在使用KCL实现增强型扇出消费者应用程序时,才必须提供。 -
有关的更多信息
StreamIdentifier
,请参阅 https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java #L129. 要创建 StreamIdentifier
,我们建议您从streamArn
和中创建一个多流实例,该实例在 KCL 2.5.0 或更高版本中可用。streamCreationEpoch
在不支持的 v2. KCL 3 和 v2.4 中streamArm
,使用格式创建多流实例。account-id:StreamName:streamCreationTimestamp
从下一个主要版本开始,此格式将弃用且不再受支持。 -
MultistreamTracker 还包括删除租赁表中旧直播租约的策略 (formerStreamsLeasesDeletionStrategy)。请注意,策略CANNOT将在使用者应用程序运行时进行更改。欲了解更多信息,请参阅 https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/ /.java amazon-kinesis-client
。src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy
-
或者,MultiStreamTracker
如果你想实现一个同时处理多个流的 ConfigsBuilder 使用KCL者应用程序,你可以使用进行初始化。
* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
通过为使用KCL者应用程序实现多流支持,应用程序租赁表的每一行现在都包含该应用程序处理的多个数据流的分片 ID 和流名称。
-
在实现对KCL消费者应用程序的多流支持时, leaseKey 采用以下结构:
account-id:StreamName:streamCreationTimestamp:ShardId
。例如,111111111:multiStreamTest-1:12345:shardId-000000000336
。
重要
当您的现有使用KCL者应用程序配置为仅处理一个数据流时,leaseKey
(这是租赁表的分区键)就是分片 ID。如果您将现有的使用KCL者应用程序重新配置为处理多个数据流,则会破坏您的租赁表,因为leaseKey
结构必须如下所示:account-id:StreamName:StreamCreationTimestamp:ShardId
以支持多流。