コンシューマーを 1.x KCL から 2.x KCL に移行する - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

コンシューマーを 1.x KCL から 2.x KCL に移行する

このトピックでは、Kinesis Client Library () のバージョン 1.x と 2.x の違いについて説明しますKCL。また、コンシューマーを のバージョン 1.x からバージョン 2.x に移行する方法も示しますKCL。クライアントを移行すると、最後にチェックポイントが作成された場所からレコードの処理が開始されます。

のバージョン 2.0 では、次のインターフェイスの変更KCLが導入されています。

KCL インターフェイスの変更
KCL 1.x インターフェイス KCL 2.0 インターフェイス
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor software.amazon.kinesis.processor.ShardRecordProcessor
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory software.amazon.kinesis.processor.ShardRecordProcessorFactory
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware software.amazon.kinesis.processor.ShardRecordProcessor 内に折りたたみ

レコードプロセッサを移行する

次の例は、1.x KCL に実装されたレコードプロセッサを示しています。

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
レコードプロセッサのクラスを移行するには
  1. インターフェイスを com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor および com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware から software.amazon.kinesis.processor.ShardRecordProcessor に変更します。以下に例を示します。

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
  2. import メソッド initialize とメソッドの processRecords ステートメントを更新します。

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
  3. shutdown メソッドを以下の新しいメソッドに置き換えます。leaseLostshardEnded、および shutdownRequested

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

以下に示しているのは、レコードプロセッサのクラスの更新されたバージョンです。

package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }

レコードプロセッサファクトリーを移行する

レコードプロセッサファクトリーは、リースが取得された際にレコードプロセッサの作成を担当します。1.x KCL ファクトリの例を次に示します。

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
レコードプロセッサファクトリーを移行するには
  1. 実装されているインターフェイスを com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory から software.amazon.kinesis.processor.ShardRecordProcessorFactory に変更します。以下に例を示します。

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
  2. createProcessor の戻り署名を変更します。

    // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

以下は、2.0 のレコードプロセッサファクトリーの例です。

package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }

ワーカーを移行する

のバージョン 2.0 ではKCL、 という新しいクラスが Worker クラスをScheduler置き換えます。1.x KCL ワーカーの例を次に示します。

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
ワーカーを移行するには
  1. Worker クラスの import ステートメントを Scheduler クラスと ConfigsBuilder クラスのインポートステートメントに変更します。

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. 次の例に示すように、ConfigsBuilderScheduler を作成します。

    KinesisClientUtil を使用して KinesisAsyncClient を作成し、KinesisAsyncClientmaxConcurrency を設定することをお勧めします。

    重要

    すべてのリースと KinesisAsyncClient の追加使用のための十分な高い maxConcurrency を持つよう KinesisAsyncClient を設定しないと、Amazon Kinesis Client で非常に大きなレイテンシーが発生する可能性があります。

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );

Amazon Kinesis クライアントを設定する

Kinesis Client Library のリリース 2.0 では、クライアントの設定が単一の設定クラス (KinesisClientLibConfiguration) から 6 つの設定クラスに移行されました。次の表で移行を説明します。

設定フィールドとその新しいクラス
元のフィールド 新しい設定クラス 説明
applicationName ConfigsBuilder KCL アプリケーションの名前。tableName および consumerName のデフォルトとして使用されます。
tableName ConfigsBuilder Amazon DynamoDB リーステーブルで使用されるテーブル名の上書きを許可します。
streamName ConfigsBuilder このアプリケーションがレコードを処理するストリームの名前。
kinesisEndpoint ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
dynamoDBEndpoint ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
initialPositionInStreamExtended RetrievalConfig がアプリケーションの最初の実行からレコードの取得KCLを開始するシャード内の場所。
kinesisCredentialsProvider ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
dynamoDBCredentialsProvider ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
cloudWatchCredentialsProvider ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
failoverTimeMillis LeaseManagementConfig リース所有者が失敗したとみなすまでの経過時間 (ミリ秒)。
workerIdentifier ConfigsBuilder このアプリケーションプロセッサのインスタンス化を表す一意の識別子。一意である必要があります。
shardSyncIntervalMillis LeaseManagementConfig シャード同期コールの間隔。
maxRecords PollingConfig Kinesis が返すレコードの最大数の設定を許可します。
idleTimeBetweenReadsInMillis CoordinatorConfig このオプションは削除されました。アイドル時間の削除を参照してください。
callProcessRecordsEvenForEmptyRecordList ProcessorConfig 設定すると、Kinesis から提供されたレコードがない場合でもレコードプロセッサが呼び出されます。
parentShardPollIntervalMillis CoordinatorConfig 親シャードが完了したかどうかを確認するためにレコードプロセッサがポーリングを行う頻度。
cleanupLeasesUponShardCompletion LeaseManagementConfig 設定すると、子リースの処理が開始されると即時にリースが削除されます。
ignoreUnexpectedChildShards LeaseManagementConfig 設定すると、開いているシャードがある子シャードは無視されます。これは、主に DynamoDB Streams 用です。
kinesisClientConfig ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
dynamoDBClientConfig ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
cloudWatchClientConfig ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
taskBackoffTimeMillis LifecycleConfig 失敗したタスクを再試行するまでの待機時間。
metricsBufferTimeMillis MetricsConfig CloudWatch メトリクスの発行を制御します。
metricsMaxQueueSize MetricsConfig CloudWatch メトリクスの発行を制御します。
metricsLevel MetricsConfig CloudWatch メトリクスの発行を制御します。
metricsEnabledDimensions MetricsConfig CloudWatch メトリクスの発行を制御します。
validateSequenceNumberBeforeCheckpointing CheckpointConfig このオプションは削除されました。チェックポイントシーケンス番号の検証を参照してください。
regionName ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
maxLeasesForWorker LeaseManagementConfig アプリケーションの単一のインスタンスが受け入れるリースの最大数。
maxLeasesToStealAtOneTime LeaseManagementConfig アプリケーションが同時にスティールを試みるリースの最大数。
initialLeaseTableReadCapacity LeaseManagementConfig Kinesis Client Library IOPsが新しい DynamoDB リーステーブルを作成する必要がある場合に使用される DynamoDB 読み取り。
initialLeaseTableWriteCapacity LeaseManagementConfig Kinesis Client Library IOPsが新しい DynamoDB リーステーブルを作成する必要がある場合に使用される DynamoDB 読み取り。
initialPositionInStreamExtended LeaseManagementConfig アプリケーションが読み取りを開始するストリーム内の初期位置。これは最初のリースの作成時にのみ使用されます。
skipShardSyncAtWorkerInitializationIfLeasesExist CoordinatorConfig リーステーブルに既存のリースがある場合、シャードデータの同期を無効にします。TODO: KinesisEco-438
shardPrioritization CoordinatorConfig どのシャードの優先順位付けを使用するか。
shutdownGraceMillis 該当なし このオプションは削除されました。 MultiLang 「削除」を参照してください。
timeoutInSeconds 該当なし このオプションは削除されました。 MultiLang 「削除」を参照してください。
retryGetRecordsInSeconds PollingConfig 失敗の GetRecords 試行間の遅延を設定します。
maxGetRecordsThreadPool PollingConfig に使用されるスレッドプールのサイズ GetRecords。
maxLeaseRenewalThreads LeaseManagementConfig リース更新スレッドプールのサイズを制御します。アプリケーションが処理するリースの数が多いほど、このプールも大きくする必要があります。
recordsFetcherFactory PollingConfig ストリームから取得するフェッチャーを作成するために使用されるファクトリーの置換を許可します。
logWarningForTaskAfterMillis LifecycleConfig タスクが完了していない場合に警告がログに記録されるまでの待機期間。
listShardsBackoffTimeInMillis RetrievalConfig 障害が発生した場合に ListShards を呼び出す間隔 (ミリ秒)。
maxListShardsRetryAttempts RetrievalConfig 失敗とみなすまでの ListShards の再試行の最大回数。

アイドル時間の削除

の 1.x バージョンではKCL、 は 2 つの数量idleTimeBetweenReadsInMillisに対応していました。

  • タスクの送信チェックの間隔。CoordinatorConfig#shardConsumerDispatchPollIntervalMillis を設定することで、タスク間の間隔を設定できるようになりました。

  • Kinesis Data Streams から返されるレコードがない場合に休止状態になるまでの時間。バージョン 2.0 では、拡張ファンアウトのレコードはそれぞれのレトリバーからプッシュされます。シャードコンシューマーのアクティビティは、プッシュされたリクエストが到着した場合にのみ発生します。

クライアント設定の削除

バージョン 2.0 では、 はクライアントを作成しKCLなくなりました。有効なクライアントの提供はユーザーに任されます。この変更により、クライアントの作成を制御するすべての設定パラメータが削除されました。これらのパラメータが必要な場合は、クライアントを ConfigsBuilder に提供する前にクライアントで設定できます。

削除されたフィールド 同等の設定
kinesisEndpoint 優先エンドポイント SDKKinesisAsyncClientを使用して を設定しますKinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build()
dynamoDBEndpoint 優先エンドポイント SDKDynamoDbAsyncClientを使用して を設定しますDynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build()
kinesisClientConfig 必要な設定SDKKinesisAsyncClientで を設定しますKinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build()
dynamoDBClientConfig 必要な設定SDKDynamoDbAsyncClientで を設定しますDynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build()
cloudWatchClientConfig 必要な設定SDKCloudWatchAsyncClientで を設定しますCloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build()
regionName 優先リージョンSDKで を設定します。これはすべてのSDKクライアントで同じです。例えば、KinesisAsyncClient.builder().region(Region.US_WEST_2).build()です。