

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# コンシューマーを KCL 1.x から KCL 2.x に移行する
<a name="kcl-migration"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

このトピックでは、Kinesis Client Library (KCL) のバージョン 1.x と 2.x の違いについて説明します。また、コンシューマーを KCL のバージョン 1.x からバージョン 2.x に移行する方法も示します。クライアントを移行すると、最後にチェックポイントが作成された場所からレコードの処理が開始されます。

KCL のバージョン 2.0 では、以下のインターフェイスの変更が導入されています。


**KCL インターフェイスの変更**  

| KCL 1.x インターフェイス | KCL 2.0 インターフェイス | 
| --- | --- | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor | software.amazon.kinesis.processor.ShardRecordProcessor | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory | software.amazon.kinesis.processor.ShardRecordProcessorFactory | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware | software.amazon.kinesis.processor.ShardRecordProcessor 内に折りたたみ | 

**Topics**
+ [レコードプロセッサを移行する](#recrod-processor-migration)
+ [レコードプロセッサファクトリーを移行する](#recrod-processor-factory-migration)
+ [ワーカーを移行する](#worker-migration)
+ [Amazon Kinesis Client を設定する](#client-configuration)
+ [アイドル時間の削除](#idle-time-removal)
+ [クライアント設定の削除](#client-configuration-removals)

## レコードプロセッサを移行する
<a name="recrod-processor-migration"></a>

以下の例は、KCL1.x に実装されたレコードプロセッサを示しています。

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        //
        // Process records, and possibly checkpoint
        //
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**レコードプロセッサのクラスを移行するには**

1. インターフェイスを `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` および `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` から `software.amazon.kinesis.processor.ShardRecordProcessor` に変更します。以下に例を示します。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
   public class TestRecordProcessor implements ShardRecordProcessor {
   ```

1. `import` メソッド `initialize` とメソッドの `processRecords` ステートメントを更新します。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
   ```

1. `shutdown` メソッドを以下の新しいメソッドに置き換えます。`leaseLost`、`shardEnded`、および `shutdownRequested`。

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

以下に示しているのは、レコードプロセッサのクラスの更新されたバージョンです。

```
package com.amazonaws.kcl;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class TestRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

## レコードプロセッサファクトリーを移行する
<a name="recrod-processor-factory-migration"></a>

レコードプロセッサファクトリーは、リースが取得された際にレコードプロセッサの作成を担当します。以下に示しているのは、KCL 1.x ファクトリーの例です。

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor();
    }
}
```

**レコードプロセッサファクトリーを移行するには**

1. 実装されているインターフェイスを `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` から `software.amazon.kinesis.processor.ShardRecordProcessorFactory` に変更します。以下に例を示します。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
   public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
   ```

1. `createProcessor` の戻り署名を変更します。

   ```
   // public IRecordProcessor createProcessor() {
   public ShardRecordProcessor shardRecordProcessor() {
   ```

以下は、2.0 のレコードプロセッサファクトリーの例です。

```
package com.amazonaws.kcl;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor();
    }
}
```

## ワーカーを移行する
<a name="worker-migration"></a>

バージョン 2.0 の KCL では、新しいクラス `Scheduler` によって `Worker` クラスが置き換えられます。KCL 1.x のワーカーの例を次に示します。

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

**ワーカーを移行するには**

1. `Worker` クラスの `import` ステートメントを `Scheduler` クラスと `ConfigsBuilder` クラスのインポートステートメントに変更します。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. 次の例に示すように、`ConfigsBuilder` と `Scheduler` を作成します。

   `KinesisClientUtil` を使用して `KinesisAsyncClient` を作成し、`KinesisAsyncClient` で `maxConcurrency` を設定することをお勧めします。
**重要**  
すべてのリースと `KinesisAsyncClient` の追加使用のための十分な高い `maxConcurrency` を持つよう `KinesisAsyncClient` を設定しないと、Amazon Kinesis Client で非常に大きなレイテンシーが発生する可能性があります。

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
   Region region = Region.AP_NORTHEAST_2;
   KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
   ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
   
   Scheduler scheduler = new Scheduler(
       configsBuilder.checkpointConfig(),
       configsBuilder.coordinatorConfig(),
       configsBuilder.leaseManagementConfig(),
       configsBuilder.lifecycleConfig(),
       configsBuilder.metricsConfig(),
       configsBuilder.processorConfig(),
       configsBuilder.retrievalConfig()
       );
   ```

## Amazon Kinesis Client を設定する
<a name="client-configuration"></a>

Kinesis Client Library のリリース 2.0 では、クライアントの設定が単一の設定クラス (`KinesisClientLibConfiguration`) から 6 つの設定クラスに移行されました。次の表で移行を説明します。


**設定フィールドとその新しいクラス**  

| 元のフィールド | 新しい設定クラス | 説明 | 
| --- | --- | --- | 
| applicationName | ConfigsBuilder | この KCL アプリケーションの名前。tableName および consumerName のデフォルトとして使用されます。 | 
| tableName | ConfigsBuilder | Amazon DynamoDB リーステーブルで使用されるテーブル名の上書きを許可します。 | 
| streamName | ConfigsBuilder | このアプリケーションがレコードを処理するストリームの名前。 | 
| kinesisEndpoint | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| dynamoDBEndpoint | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| initialPositionInStreamExtended | RetrievalConfig | アプリケーションの初期実行から開始し、KCL がレコードの取得を開始するシャード内の場所。 | 
| kinesisCredentialsProvider | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| dynamoDBCredentialsProvider | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| cloudWatchCredentialsProvider | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| failoverTimeMillis | LeaseManagementConfig | リース所有者が失敗したとみなすまでの経過時間 (ミリ秒)。 | 
| workerIdentifier | ConfigsBuilder | このアプリケーションプロセッサのインスタンス化を表す一意の識別子。一意である必要があります。 | 
| shardSyncIntervalMillis | LeaseManagementConfig | シャード同期コールの間隔。 | 
| maxRecords | PollingConfig | Kinesis が返すレコードの最大数の設定を許可します。 | 
| idleTimeBetweenReadsInMillis | CoordinatorConfig | このオプションは削除されました。アイドル時間の削除を参照してください。 | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig | 設定すると、Kinesis から提供されたレコードがない場合でもレコードプロセッサが呼び出されます。 | 
| parentShardPollIntervalMillis | CoordinatorConfig | 親シャードが完了したかどうかを確認するためにレコードプロセッサがポーリングを行う頻度。 | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig | 設定すると、子リースの処理が開始されると即時にリースが削除されます。 | 
| ignoreUnexpectedChildShards | LeaseManagementConfig | 設定すると、開いているシャードがある子シャードは無視されます。これは、主に DynamoDB Streams 用です。 | 
| kinesisClientConfig | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| dynamoDBClientConfig | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| cloudWatchClientConfig | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| taskBackoffTimeMillis | LifecycleConfig | 失敗したタスクを再試行するまでの待機時間。 | 
| metricsBufferTimeMillis | MetricsConfig | CloudWatch メトリックスの発行を制御します。 | 
| metricsMaxQueueSize | MetricsConfig | CloudWatch メトリックスの発行を制御します。 | 
| metricsLevel | MetricsConfig | CloudWatch メトリックスの発行を制御します。 | 
| metricsEnabledDimensions | MetricsConfig | CloudWatch メトリックスの発行を制御します。 | 
| validateSequenceNumberBeforeCheckpointing | CheckpointConfig | このオプションは削除されました。チェックポイントシーケンス番号の検証を参照してください。 | 
| regionName | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| maxLeasesForWorker | LeaseManagementConfig | アプリケーションの単一のインスタンスが受け入れるリースの最大数。 | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig | アプリケーションが同時にスティールを試みるリースの最大数。 | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | Kinesis Client Library で新しい DynamoDB リーステーブルを作成する場合に使用する DynamoDB 読み取り IOPS。 | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | Kinesis Client Library で新しい DynamoDB リーステーブルを作成する場合に使用する DynamoDB 読み取り IOPS。 | 
| initialPositionInStreamExtended | LeaseManagementConfig | アプリケーションが読み取りを開始するストリーム内の初期位置。これは最初のリースの作成時にのみ使用されます。 | 
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | リーステーブルに既存のリースがある場合、シャードデータの同期を無効にします。TODO: KinesisEco-438 | 
| shardPrioritization | CoordinatorConfig | どのシャードの優先順位付けを使用するか。 | 
| shutdownGraceMillis | 該当なし | このオプションは削除されました。MultiLang の削除を参照してください。 | 
| timeoutInSeconds | 該当なし | このオプションは削除されました。MultiLang の削除を参照してください。 | 
| retryGetRecordsInSeconds | PollingConfig | GetRecords が失敗した場合の試行間隔の遅延時間を設定します。 | 
| maxGetRecordsThreadPool | PollingConfig | GetRecords に使用されるスレッドプールのサイズ。 | 
| maxLeaseRenewalThreads | LeaseManagementConfig | リース更新スレッドプールのサイズを制御します。アプリケーションが処理するリースの数が多いほど、このプールも大きくする必要があります。 | 
| recordsFetcherFactory | PollingConfig | ストリームから取得するフェッチャーを作成するために使用されるファクトリーの置換を許可します。 | 
| logWarningForTaskAfterMillis | LifecycleConfig | タスクが完了していない場合に警告がログに記録されるまでの待機期間。 | 
| listShardsBackoffTimeInMillis | RetrievalConfig | 障害が発生した場合に ListShards を呼び出す間隔 (ミリ秒)。 | 
| maxListShardsRetryAttempts | RetrievalConfig | 失敗とみなすまでの ListShards の再試行の最大回数。 | 

## アイドル時間の削除
<a name="idle-time-removal"></a>

KCL の 1.x バージョンでは、`idleTimeBetweenReadsInMillis` は 2 つの数量に相当します。
+ タスクの送信チェックの間隔。`CoordinatorConfig#shardConsumerDispatchPollIntervalMillis` を設定することで、タスク間の間隔を設定できるようになりました。
+ Kinesis Data Streams から返されるレコードがない場合に休止状態になるまでの時間。バージョン 2.0 では、拡張ファンアウトのレコードはそれぞれのレトリバーからプッシュされます。シャードコンシューマーのアクティビティは、プッシュされたリクエストが到着した場合にのみ発生します。

## クライアント設定の削除
<a name="client-configuration-removals"></a>

バージョン 2.0 では、KCL はクライアントを作成しなくなりました。有効なクライアントの提供はユーザーに任されます。この変更により、クライアントの作成を制御するすべての設定パラメータが削除されました。これらのパラメータが必要な場合は、クライアントを `ConfigsBuilder` に提供する前にクライアントで設定できます。


****  

| 削除されたフィールド | 同等の設定 | 
| --- | --- | 
| kinesisEndpoint | 優先エンドポイントを指定した SDK KinesisAsyncClient の設定: KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build()。 | 
| dynamoDBEndpoint | 優先エンドポイントを指定した SDK DynamoDbAsyncClient の設定: DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build()。 | 
| kinesisClientConfig | 必要な設定を指定した SDK KinesisAsyncClient の設定: KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build() | 
| dynamoDBClientConfig | 必要な設定を指定した SDK DynamoDbAsyncClient の設定: DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build() | 
| cloudWatchClientConfig | 必要な設定を指定した SDK CloudWatchAsyncClient の設定: CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build() | 
| regionName | 優先リージョンを指定して SDK を設定します。これは、すべての SDK クライアントで同じです。例えば、KinesisAsyncClient.builder().region(Region.US\$1WEST\$12).build()。 | 