多流处理 KCL - Amazon Kinesis Data Streams

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

多流处理 KCL

本节介绍所需的更改KCL,这些更改允许您创建可以同时处理多个数据流的使用KCL者应用程序。

重要
  • 仅在 KCL 2.3 或更高版本中支持多流处理。

  • 使用非 Java 语言编写的KCL使用者支持多流处理。multilangdaemon

  • 任何版本的 KCL 1.x 都支持多流处理。

  • MultistreamTracker 接口

    • 要构建可以同时处理多个流的使用者应用程序,必须实现一个名为的新接口MultistreamTracker。该接口包括返回要由使用KCL者应用程序处理的数据流及其配置列表streamConfigList的方法。请注意,正在处理的数据流可以在使用者应用程序运行时进行更改。 streamConfigList定期调用KCL,以了解要处理的数据流的变化。

    • streamConfigList填充StreamConfig列表。

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

或者,MultiStreamTracker如果你想实现一个同时处理多个流的 ConfigsBuilder 使用KCL者应用程序,你可以使用进行初始化。

* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • 通过为使用KCL者应用程序实现多流支持,应用程序租赁表的每一行现在都包含该应用程序处理的多个数据流的分片 ID 和流名称。

  • 在实现对KCL消费者应用程序的多流支持时, leaseKey 采用以下结构:account-id:StreamName:streamCreationTimestamp:ShardId。例如,111111111:multiStreamTest-1:12345:shardId-000000000336

重要

当您的现有使用KCL者应用程序配置为仅处理一个数据流时,leaseKey(这是租赁表的分区键)就是分片 ID。如果您将现有的使用KCL者应用程序重新配置为处理多个数据流,则会破坏您的租赁表,因为leaseKey结构必须如下所示:account-id:StreamName:StreamCreationTimestamp:ShardId以支持多流。