本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将消费者从 KCL 1.x 迁移到 KCL 2.x
注意
Kinesis 客户端库 (KCL) 版本 1.x 和 2.x 已过时。我们建议迁移到 3.x KCL 版,该版本提供了改进的性能和新功能。有关最新的KCL文档和迁移指南,请参阅使用 Kinesis 客户端库。
本主题解释了 Kinesis 客户端库 1.x 和 2.x 版本之间的区别 ()。KCL它还向您展示如何将使用者从 KCL 的版本 1.x 迁移到版本 2.x。在迁移您的客户端后,它将从最后一个检查点位置开始处理记录。
KCL 的版本 2 引入了以下接口更改:
KCL1.x 接口 | KCL2.0 接口 |
---|---|
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor |
software.amazon.kinesis.processor.ShardRecordProcessor |
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory |
software.amazon.kinesis.processor.ShardRecordProcessorFactory |
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware |
折叠为 software.amazon.kinesis.processor.ShardRecordProcessor |
迁移记录处理器类
以下示例显示了为 KCL 1.x 实施的记录处理器:
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
迁移记录处理器类
-
将接口从
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
和com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
更改为software.amazon.kinesis.processor.ShardRecordProcessor
,如下所示:// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
-
更新
import
和initialize
方法的processRecords
语句。// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
-
使用以下新方法替换
shutdown
方法:leaseLost
、shardEnded
和shutdownRequested
。// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
下面是记录处理器类的更新版本。
package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
迁移记录处理器工厂
记录处理器工厂负责在获得租赁时创建记录处理器。下面是 KCL 1.x 工厂的示例。
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
迁移记录处理器工厂
-
将已实施的接口从
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
更改为software.amazon.kinesis.processor.ShardRecordProcessorFactory
,如下所示。// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
-
更改
createProcessor
的返回签名。// public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
下面是 2.0 中的记录处理器工厂的示例:
package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }
迁移工作线程
在 KCL 版本 2.0 中,名为 Scheduler
的新类取代了 Worker
类。下面是 KCL 1.x 工作程序的示例。
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
迁移工作程序
-
将
Worker
类的import
语句更改为Scheduler
和ConfigsBuilder
类的导入语句。// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
-
创建
ConfigsBuilder
和Scheduler
,如以下示例所示。建议您使用
KinesisClientUtil
创建KinesisAsyncClient
,并在KinesisAsyncClient
中配置maxConcurrency
。重要
Amazon Kinesis 户端可能会看到延迟大幅增加,除非您将
KinesisAsyncClient
配置为具有足够高的maxConcurrency
,以允许所有租期以及额外使用KinesisAsyncClient
。import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );
配置 Amazon Kinesis 客户端
随着 Kinesis Client Library 版本 2.0 的发布,客户端的配置已从单个配置类(KinesisClientLibConfiguration
)变为 6 个配置类。下表描述了迁移。
原始字段 | 新配置类 | 描述 |
---|---|---|
applicationName |
ConfigsBuilder |
此 KCL 应用程序的名称。用作 tableName 和 consumerName 的默认名称。 |
tableName |
ConfigsBuilder |
允许覆盖用于 Amazon DynamoDB 租赁表的表名称。 |
streamName |
ConfigsBuilder |
此应用程序从其中处理记录的流的名称。 |
kinesisEndpoint |
ConfigsBuilder |
此选项已删除。请参阅“客户端配置删除”。 |
dynamoDBEndpoint |
ConfigsBuilder |
此选项已删除。请参阅“客户端配置删除”。 |
initialPositionInStreamExtended |
RetrievalConfig |
从应用程序的初始运行KCL开始,从分片中开始获取记录的位置。 |
kinesisCredentialsProvider |
ConfigsBuilder |
此选项已删除。请参阅“客户端配置删除”。 |
dynamoDBCredentialsProvider |
ConfigsBuilder |
此选项已删除。请参阅“客户端配置删除”。 |
cloudWatchCredentialsProvider |
ConfigsBuilder |
此选项已删除。请参阅“客户端配置删除”。 |
failoverTimeMillis |
LeaseManagementConfig | 在您可以将租赁所有者视为已失败之前必须经过的毫秒数。 |
workerIdentifier |
ConfigsBuilder |
表示应用程序处理器的这种实例化的唯一标识符。此值必须唯一。 |
shardSyncIntervalMillis |
LeaseManagementConfig | 分片同步调用之间的时间。 |
maxRecords |
PollingConfig |
允许设置 Kinesis 返回的最大记录数。 |
idleTimeBetweenReadsInMillis |
CoordinatorConfig |
此选项已删除。请参阅“闲置时间删除”。 |
callProcessRecordsEvenForEmptyRecordList |
ProcessorConfig |
如果设置,即使 Kinesis 中未提供任何记录,也会调用记录处理器。 |
parentShardPollIntervalMillis |
CoordinatorConfig |
记录处理器应轮询多少时间才能查看是否已完成父分片。 |
cleanupLeasesUponShardCompletion |
LeaseManagementConfig |
如果设置,只要子租赁已开始处理,即可删除租赁。 |
ignoreUnexpectedChildShards |
LeaseManagementConfig |
如果设置,将忽略具有打开的分片的子分片。这主要适用于 DynamoDB Streams。 |
kinesisClientConfig |
ConfigsBuilder |
此选项已删除。请参阅“客户端配置删除”。 |
dynamoDBClientConfig |
ConfigsBuilder |
此选项已删除。请参阅“客户端配置删除”。 |
cloudWatchClientConfig |
ConfigsBuilder |
此选项已删除。请参阅“客户端配置删除”。 |
taskBackoffTimeMillis |
LifecycleConfig |
等待重试失败任务的时间。 |
metricsBufferTimeMillis |
MetricsConfig |
控制 CloudWatch 指标发布。 |
metricsMaxQueueSize |
MetricsConfig |
控制 CloudWatch 指标发布。 |
metricsLevel |
MetricsConfig |
控制 CloudWatch 指标发布。 |
metricsEnabledDimensions |
MetricsConfig |
控制 CloudWatch 指标发布。 |
validateSequenceNumberBeforeCheckpointing |
CheckpointConfig |
此选项已删除。请参阅“检查点序列号验证”。 |
regionName |
ConfigsBuilder |
此选项已删除。请参阅“客户端配置删除”。 |
maxLeasesForWorker |
LeaseManagementConfig |
应用程序的单个实例应接受的最大租赁数量。 |
maxLeasesToStealAtOneTime |
LeaseManagementConfig |
应用程序一次应尝试窃取的最大租赁数量。 |
initialLeaseTableReadCapacity |
LeaseManagementConfig |
在 Kinesis 客户端库需要创建新的 DynamoDB 租用表时使用的 DynamoDB IOPs 读取。 |
initialLeaseTableWriteCapacity |
LeaseManagementConfig |
在 Kinesis 客户端库需要创建新的 DynamoDB 租用表时使用的 DynamoDB IOPs 读取。 |
initialPositionInStreamExtended |
LeaseManagementConfig | 应用程序应在流中开始的初始位置。此值仅在创建初始租赁时使用。 |
skipShardSyncAtWorkerInitializationIfLeasesExist |
CoordinatorConfig |
如果租赁表包含现有租赁,请禁用同步的分片数据。TODO: KinesisEco -438 |
shardPrioritization |
CoordinatorConfig |
要使用的分片优先级。 |
shutdownGraceMillis |
不适用 | 此选项已删除。参见 “ MultiLang 移除”。 |
timeoutInSeconds |
不适用 | 此选项已删除。参见 “ MultiLang 移除”。 |
retryGetRecordsInSeconds |
PollingConfig |
配置 GetRecords 尝试失败之间的延迟。 |
maxGetRecordsThreadPool |
PollingConfig |
使用的线程池大小 GetRecords。 |
maxLeaseRenewalThreads |
LeaseManagementConfig |
控制租赁续订线程池的大小。您的应用程序可以容纳的租赁越多,此池应该就越大。 |
recordsFetcherFactory |
PollingConfig |
允许替换用于创建从流中检索的提取程序的工厂。 |
logWarningForTaskAfterMillis |
LifecycleConfig |
任务尚未完成的情况下在记录警告之前要等待的时长。 |
listShardsBackoffTimeInMillis |
RetrievalConfig |
发生故障时在调用 ListShards 之间要等待的时间(以毫秒为单位)。 |
maxListShardsRetryAttempts |
RetrievalConfig |
ListShards 在放弃之前重试的最长时间。 |
闲置时间删除
在 1.x 版本的 KCL 中,idleTimeBetweenReadsInMillis
对应于两个数量:
-
任务分派检查之间的时间量。您现在可以通过设置
CoordinatorConfig#shardConsumerDispatchPollIntervalMillis
来在任务之间配置此时间。 -
未从 Kinesis Data Streams 中返回任何记录时的睡眠时间量。在版本 2.0 中,带增强型扇出功能的记录是从其各自的检索器中推送的。分片消费端上的活动仅发生在推送的请求到达时。
客户端配置删除
在 2.0 版本中,KCL 不再创建客户端。这取决于用户提供有效的客户端。进行此更改后,已删除控制客户端创建的所有配置参数。如果需要这些参数,您可以在向 ConfigsBuilder
提供客户端之前在客户端上设置它们。
已删除字段 | 等效配置 |
---|---|
kinesisEndpoint |
SDKKinesisAsyncClient 使用首选端点配置:KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis
endpoint>")).build() 。 |
dynamoDBEndpoint |
SDKDynamoDbAsyncClient 使用首选端点配置:DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb
endpoint>")).build() 。 |
kinesisClientConfig |
SDKKinesisAsyncClient 使用所需的配置配置:KinesisAsyncClient.builder().overrideConfiguration(<your
configuration>).build() 。 |
dynamoDBClientConfig |
SDKDynamoDbAsyncClient 使用所需的配置配置:DynamoDbAsyncClient.builder().overrideConfiguration(<your
configuration>).build() 。 |
cloudWatchClientConfig |
SDKCloudWatchAsyncClient 使用所需的配置配置:CloudWatchAsyncClient.builder().overrideConfiguration(<your
configuration>).build() 。 |
regionName |
SDK使用首选区域配置。这对于所有SDK客户来说都是一样的。例如,KinesisAsyncClient.builder().region(Region.US_WEST_2).build() 。 |