翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
コンシューマーを 1.x KCL から 2.x KCL に移行する
このトピックでは、Kinesis Client Library () のバージョン 1.x と 2.x の違いについて説明しますKCL。また、コンシューマーを のバージョン 1.x からバージョン 2.x に移行する方法も示しますKCL。クライアントを移行すると、最後にチェックポイントが作成された場所からレコードの処理が開始されます。
のバージョン 2.0 では、次のインターフェイスの変更KCLが導入されています。
KCL インターフェイスの変更 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
KCL 1.x インターフェイス | KCL 2.0 インターフェイス | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor |
software.amazon.kinesis.processor.ShardRecordProcessor |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory |
software.amazon.kinesis.processor.ShardRecordProcessorFactory |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware |
software.amazon.kinesis.processor.ShardRecordProcessor 内に折りたたみ |
レコードプロセッサを移行する
次の例は、1.x KCL に実装されたレコードプロセッサを示しています。
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
レコードプロセッサのクラスを移行するには
-
インターフェイスを
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
およびcom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
からsoftware.amazon.kinesis.processor.ShardRecordProcessor
に変更します。以下に例を示します。// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
-
import
メソッドinitialize
とメソッドのprocessRecords
ステートメントを更新します。// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
-
shutdown
メソッドを以下の新しいメソッドに置き換えます。leaseLost
、shardEnded
、およびshutdownRequested
。// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
以下に示しているのは、レコードプロセッサのクラスの更新されたバージョンです。
package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
レコードプロセッサファクトリーを移行する
レコードプロセッサファクトリーは、リースが取得された際にレコードプロセッサの作成を担当します。1.x KCL ファクトリの例を次に示します。
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
レコードプロセッサファクトリーを移行するには
-
実装されているインターフェイスを
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
からsoftware.amazon.kinesis.processor.ShardRecordProcessorFactory
に変更します。以下に例を示します。// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
-
createProcessor
の戻り署名を変更します。// public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
以下は、2.0 のレコードプロセッサファクトリーの例です。
package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }
ワーカーを移行する
のバージョン 2.0 ではKCL、 という新しいクラスが Worker
クラスをScheduler
置き換えます。1.x KCL ワーカーの例を次に示します。
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
ワーカーを移行するには
-
Worker
クラスのimport
ステートメントをScheduler
クラスとConfigsBuilder
クラスのインポートステートメントに変更します。// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
-
次の例に示すように、
ConfigsBuilder
とScheduler
を作成します。KinesisClientUtil
を使用してKinesisAsyncClient
を作成し、KinesisAsyncClient
でmaxConcurrency
を設定することをお勧めします。重要
すべてのリースと
KinesisAsyncClient
の追加使用のための十分な高いmaxConcurrency
を持つようKinesisAsyncClient
を設定しないと、Amazon Kinesis Client で非常に大きなレイテンシーが発生する可能性があります。import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );
Amazon Kinesis クライアントを設定する
Kinesis Client Library のリリース 2.0 では、クライアントの設定が単一の設定クラス (KinesisClientLibConfiguration
) から 6 つの設定クラスに移行されました。次の表で移行を説明します。
設定フィールドとその新しいクラス | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
元のフィールド | 新しい設定クラス | 説明 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
applicationName |
ConfigsBuilder |
KCL アプリケーションの名前。tableName および consumerName のデフォルトとして使用されます。 |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tableName |
ConfigsBuilder |
Amazon DynamoDB リーステーブルで使用されるテーブル名の上書きを許可します。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
streamName |
ConfigsBuilder |
このアプリケーションがレコードを処理するストリームの名前。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kinesisEndpoint |
ConfigsBuilder |
このオプションは削除されました。クライアント設定の削除を参照してください。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
dynamoDBEndpoint |
ConfigsBuilder |
このオプションは削除されました。クライアント設定の削除を参照してください。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
initialPositionInStreamExtended |
RetrievalConfig |
がアプリケーションの最初の実行からレコードの取得KCLを開始するシャード内の場所。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kinesisCredentialsProvider |
ConfigsBuilder |
このオプションは削除されました。クライアント設定の削除を参照してください。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
dynamoDBCredentialsProvider |
ConfigsBuilder |
このオプションは削除されました。クライアント設定の削除を参照してください。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cloudWatchCredentialsProvider |
ConfigsBuilder |
このオプションは削除されました。クライアント設定の削除を参照してください。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
failoverTimeMillis |
LeaseManagementConfig | リース所有者が失敗したとみなすまでの経過時間 (ミリ秒)。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
workerIdentifier |
ConfigsBuilder |
このアプリケーションプロセッサのインスタンス化を表す一意の識別子。一意である必要があります。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shardSyncIntervalMillis |
LeaseManagementConfig | シャード同期コールの間隔。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxRecords |
PollingConfig |
Kinesis が返すレコードの最大数の設定を許可します。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
idleTimeBetweenReadsInMillis |
CoordinatorConfig |
このオプションは削除されました。アイドル時間の削除を参照してください。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
callProcessRecordsEvenForEmptyRecordList |
ProcessorConfig |
設定すると、Kinesis から提供されたレコードがない場合でもレコードプロセッサが呼び出されます。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
parentShardPollIntervalMillis |
CoordinatorConfig |
親シャードが完了したかどうかを確認するためにレコードプロセッサがポーリングを行う頻度。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cleanupLeasesUponShardCompletion |
LeaseManagementConfig |
設定すると、子リースの処理が開始されると即時にリースが削除されます。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ignoreUnexpectedChildShards |
LeaseManagementConfig |
設定すると、開いているシャードがある子シャードは無視されます。これは、主に DynamoDB Streams 用です。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kinesisClientConfig |
ConfigsBuilder |
このオプションは削除されました。クライアント設定の削除を参照してください。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
dynamoDBClientConfig |
ConfigsBuilder |
このオプションは削除されました。クライアント設定の削除を参照してください。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cloudWatchClientConfig |
ConfigsBuilder |
このオプションは削除されました。クライアント設定の削除を参照してください。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
taskBackoffTimeMillis |
LifecycleConfig |
失敗したタスクを再試行するまでの待機時間。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metricsBufferTimeMillis |
MetricsConfig |
CloudWatch メトリクスの発行を制御します。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metricsMaxQueueSize |
MetricsConfig |
CloudWatch メトリクスの発行を制御します。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metricsLevel |
MetricsConfig |
CloudWatch メトリクスの発行を制御します。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metricsEnabledDimensions |
MetricsConfig |
CloudWatch メトリクスの発行を制御します。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
validateSequenceNumberBeforeCheckpointing |
CheckpointConfig |
このオプションは削除されました。チェックポイントシーケンス番号の検証を参照してください。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
regionName |
ConfigsBuilder |
このオプションは削除されました。クライアント設定の削除を参照してください。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxLeasesForWorker |
LeaseManagementConfig |
アプリケーションの単一のインスタンスが受け入れるリースの最大数。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxLeasesToStealAtOneTime |
LeaseManagementConfig |
アプリケーションが同時にスティールを試みるリースの最大数。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
initialLeaseTableReadCapacity |
LeaseManagementConfig |
Kinesis Client Library IOPsが新しい DynamoDB リーステーブルを作成する必要がある場合に使用される DynamoDB 読み取り。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
initialLeaseTableWriteCapacity |
LeaseManagementConfig |
Kinesis Client Library IOPsが新しい DynamoDB リーステーブルを作成する必要がある場合に使用される DynamoDB 読み取り。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
initialPositionInStreamExtended |
LeaseManagementConfig | アプリケーションが読み取りを開始するストリーム内の初期位置。これは最初のリースの作成時にのみ使用されます。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
skipShardSyncAtWorkerInitializationIfLeasesExist |
CoordinatorConfig |
リーステーブルに既存のリースがある場合、シャードデータの同期を無効にします。TODO: KinesisEco-438 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shardPrioritization |
CoordinatorConfig |
どのシャードの優先順位付けを使用するか。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shutdownGraceMillis |
該当なし | このオプションは削除されました。 MultiLang 「削除」を参照してください。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
timeoutInSeconds |
該当なし | このオプションは削除されました。 MultiLang 「削除」を参照してください。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
retryGetRecordsInSeconds |
PollingConfig |
失敗の GetRecords 試行間の遅延を設定します。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxGetRecordsThreadPool |
PollingConfig |
に使用されるスレッドプールのサイズ GetRecords。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxLeaseRenewalThreads |
LeaseManagementConfig |
リース更新スレッドプールのサイズを制御します。アプリケーションが処理するリースの数が多いほど、このプールも大きくする必要があります。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
recordsFetcherFactory |
PollingConfig |
ストリームから取得するフェッチャーを作成するために使用されるファクトリーの置換を許可します。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logWarningForTaskAfterMillis |
LifecycleConfig |
タスクが完了していない場合に警告がログに記録されるまでの待機期間。 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
listShardsBackoffTimeInMillis |
RetrievalConfig |
障害が発生した場合に ListShards を呼び出す間隔 (ミリ秒)。 |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxListShardsRetryAttempts |
RetrievalConfig |
失敗とみなすまでの ListShards の再試行の最大回数。 |
アイドル時間の削除
の 1.x バージョンではKCL、 は 2 つの数量idleTimeBetweenReadsInMillis
に対応していました。
-
タスクの送信チェックの間隔。
CoordinatorConfig#shardConsumerDispatchPollIntervalMillis
を設定することで、タスク間の間隔を設定できるようになりました。 -
Kinesis Data Streams から返されるレコードがない場合に休止状態になるまでの時間。バージョン 2.0 では、拡張ファンアウトのレコードはそれぞれのレトリバーからプッシュされます。シャードコンシューマーのアクティビティは、プッシュされたリクエストが到着した場合にのみ発生します。
クライアント設定の削除
バージョン 2.0 では、 はクライアントを作成しKCLなくなりました。有効なクライアントの提供はユーザーに任されます。この変更により、クライアントの作成を制御するすべての設定パラメータが削除されました。これらのパラメータが必要な場合は、クライアントを ConfigsBuilder
に提供する前にクライアントで設定できます。
削除されたフィールド | 同等の設定 |
---|---|
kinesisEndpoint |
優先エンドポイント SDKKinesisAsyncClient を使用して を設定しますKinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build() 。 |
dynamoDBEndpoint |
優先エンドポイント SDKDynamoDbAsyncClient を使用して を設定しますDynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build() 。 |
kinesisClientConfig |
必要な設定SDKKinesisAsyncClient で を設定しますKinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build() 。 |
dynamoDBClientConfig |
必要な設定SDKDynamoDbAsyncClient で を設定しますDynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build() 。 |
cloudWatchClientConfig |
必要な設定SDKCloudWatchAsyncClient で を設定しますCloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build() 。 |
regionName |
優先リージョンSDKで を設定します。これはすべてのSDKクライアントで同じです。例えば、KinesisAsyncClient.builder().region(Region.US_WEST_2).build() です。 |