

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Java で KCL を使用してコンシューマーを開発する
<a name="develop-kcl-consumers-java"></a>

## 前提条件
<a name="develop-kcl-consumers-java-prerequisites"></a>

KCL 3.x の使用を始める前に、以下のものが揃っていることを確認してください。
+ Java Development Kit (JDK) 8 以降
+ AWS SDK for Java 2.x
+ 依存関係管理用の Maven または Gradle

KCL は、ワーカーが実行されているコンピューティングホストから CPU 使用率などの CPU 使用率メトリクスを収集し、ワーカー間でリソース使用量が均等になるように負荷を分散します。KCL がワーカーから CPU 使用率メトリクスを収集できるようにするには、次の前提条件を満たす必要があります。

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ オペレーティングシステムは Linux OS である必要があります。
+ EC2 インスタンスで [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html) を有効にする必要があります。

 **Amazon Elastic Container Service (Amazon ECS) on Amazon EC2**
+ オペレーティングシステムは Linux OS である必要があります。
+ [ECS タスクメタデータエンドポイントのバージョン 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html) を有効にする必要があります。
+ Amazon ECS コンテナエージェントのバージョンは 1.39.0 以降である必要があります。

 **での Amazon ECS AWS Fargate**
+ [Fargate タスクメタデータエンドポイントのバージョン 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html) を有効にする必要があります。Fargate プラットフォームバージョン 1.4.0 以降を使用している場合、これはデフォルトで有効になっています。
+ Fargate プラットフォームバージョン 1.4.0 以降

 **Amazon Elastic Kubernetes Service (Amazon EKS) on Amazon EC2** 
+ オペレーティングシステムは Linux OS である必要があります。

 **での Amazon EKS AWS Fargate**
+ Fargate プラットフォーム 1.3.0 以降

**重要**  
KCL がワーカーから CPU 使用率メトリクスを収集できない場合、ワーカーごとのスループットに基づいてリースを割り当て、フリート内のワーカー間で負荷を分散する方式にフォールバックします。詳細については、「[KCL がワーカーにリースを割り当て、負荷を分散する方法](kcl-dynamoDB.md#kcl-assign-leases)」を参照してください。

## 依存関係をインストールして追加する
<a name="develop-kcl-consumers-java-installation"></a>

Maven を使用している場合は、以下の依存関係を `pom.xml` ファイルに追加します。3.x.x を最新の KCL バージョンに置き換えていることを確認してください。

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

Gradle を使用している場合は、以下を `build.gradle` ファイルに追加します。3.x.x を最新の KCL バージョンに置き換えていることを確認してください。

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

最新の KCL バージョンは、[Maven Central Repository](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client) で確認できます。

## コンシューマーを実装する
<a name="develop-kcl-consumers-java-implemetation"></a>

KCL コンシューマーアプリケーションは、次の主要コンポーネントで構成されます。

**Topics**
+ [RecordProcessor](#implementation-recordprocessor)
+ [RecordProcessorFactory](#implementation-recordprocessorfactory)
+ [スケジューラー](#implementation-scheduler)
+ [メインコンシューマーアプリケーション](#implementation-main)

### RecordProcessor
<a name="implementation-recordprocessor"></a>

RecordProcessor は、Kinesis データストリームのレコードを処理するビジネスロジックが実装される中核コンポーネントです。アプリケーションが Kinesis ストリームから受け取ったデータをどのように処理するかを定義します。

主な役割:
+ シャードの処理を初期化する
+ Kinesis ストリームからのレコードのバッチを処理する
+ シャードの処理をシャットダウンする (シャードが分割または結合された場合、またはリースが別のホストに引き継がれる場合など)
+ チェックポイントを処理して進行状況を追跡する

以下に実装例を示します。

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.*;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class SampleRecordProcessor implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());
            processRecordsInput.records().forEach(r -> 
                log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())
            );
            
            // Checkpoint periodically
            processRecordsInput.checkpointer().checkpoint();
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}
```

以下に、この例で使用されている各メソッドの詳細な説明を示します。

**initialize(InitializationInput initializationInput)**
+ 目的: レコードを処理するために必要なリソースや状態を設定します。
+ 呼び出されるタイミング: KCL がこのレコードプロセッサにシャードを割り当てたときに 1 回だけ呼び出されます。
+ キーポイント:
  + `initializationInput.shardId()`: このプロセッサが処理するシャードの ID。
  + `initializationInput.extendedSequenceNumber()`: 処理を開始するシーケンス番号。

**processRecords(ProcessRecordsInput processRecordsInput)**
+ 目的: 受信レコードを処理し、必要に応じてチェックポイントの進捗を処理します。
+ 呼び出されるタイミング: レコードプロセッサがそのシャードのリースを保持している間、繰り返し呼び出されます。
+ キーポイント:
  + `processRecordsInput.records()`: 処理するレコードのリスト。
  + `processRecordsInput.checkpointer()`: チェックポイントの進捗の処理に使用されます。
  + KCL の処理が失敗しないよう、レコード処理中に発生する例外を必ず処理してください。
  + このメソッドはべき等である必要があります。予期しないワーカーのクラッシュや再起動により、チェックポイントされていないレコードが再度処理されるなど、同じレコードが複数回処理される可能性があるためです。
  + データ整合性を確保するため、チェックポイントを実行する前に、バッファされたデータを必ずすべてフラッシュしてください。

**leaseLost(LeaseLostInput leaseLostInput)**
+ 目的: このシャードの処理に固有のリソースをクリーンアップします。
+ 呼び出されるタイミング: 別のスケジューラがこのシャードのリースを引き継いだときに呼び出されます。
+ キーポイント:
  + このメソッドではチェックポイントを実行できません。

**shardEnded(ShardEndedInput shardEndedInput)**
+ 目的: このシャードおよびチェックポイントの処理を完了します。
+ 呼び出されるタイミング: シャードが分割または結合され、このシャードのすべてのデータが処理されたことを示すタイミングで呼び出されます。
+ キーポイント:
  + `shardEndedInput.checkpointer()`: 最終的なチェックポイント処理を実行するために使用します。
  + このメソッドでは、処理を完了するためにチェックポイントの実行が必須です。
  + ここでフラッシュ処理とチェックポイントを行わないと、シャードが再度オープンされた際に、データ損失や重複処理が発生する可能性があります。

**shutdownRequested(ShutdownRequestedInput shutdownRequestedInput)**
+ 目的: KCL のシャットダウン時にチェックポイントを実行し、リソースをクリーンアップします。
+ 呼び出されるタイミング: KCL がシャットダウンする際 (例: アプリケーションが終了するとき) に呼び出されます。
+ キーポイント:
  + `shutdownRequestedInput.checkpointer()`: シャットダウン前にチェックポイントを実行するために使用します。
  + アプリケーションが停止する前に進捗が保存されるよう、このメソッド内でチェックポイント処理を実装していることを確認してください。
  + ここでデータのフラッシュとチェックポイントを行わないと、アプリケーションの再起動時にデータ損失やレコードの再処理が発生する可能性があります。

**重要**  
KCL 3.x では、前のワーカーがシャットダウンされる前にチェックポイントを実行することで、リースが別のワーカーに引き継がれてもデータの再処理が最小限で済むようになっています。`shutdownRequested()` メソッドにチェックポイントロジックを実装していない場合、このメリットは得られません。`shutdownRequested()` メソッド内にチェックポイントロジックが実装されていることを確認してください。

### RecordProcessorFactory
<a name="implementation-recordprocessorfactory"></a>

RecordProcessorFactory は、新しい RecordProcessor インスタンスを作成する役割を担います。KCL は、このファクトリを使用して、アプリケーションが処理する必要のある各シャードに対して新しい RecordProcessor を作成します。

主な役割:
+ 必要に応じて新しい RecordProcessor インスタンスを作成する
+ 各 RecordProcessor が正しく初期化されていることを保証する

以下に実装例を示します。

```
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new SampleRecordProcessor();
    }
}
```

この例では、ファクトリは shardRecordProcessor() が呼び出されるたびに新しい SampleRecordProcessor を作成します。必要に応じて、初期化ロジックを追加する形で拡張できます。

### スケジューラー
<a name="implementation-scheduler"></a>

スケジューラは、KCL アプリケーション内のすべての動作を調整する高レベルのコンポーネントです。データ処理全体のオーケストレーションを担います。

主な役割:
+ RecordProcessor のライフサイクルを管理する
+ シャードのリース管理を処理する
+ チェックポイント処理を調整する
+ アプリケーション内の複数ワーカー間でシャード処理の負荷を分散する
+ 正常なシャットダウンおよびアプリケーション終了シグナルを処理する

スケジューラは通常、メインアプリケーション内で作成および開始されます。スケジューラの実装例は、この後の「メインコンシューマーアプリケーション」セクションで確認できます。

### メインコンシューマーアプリケーション
<a name="implementation-main"></a>

メインコンシューマーアプリケーションは、すべてのコンポーネントを結び付ける役割を果たします。KCL コンシューマーのセットアップ、必要なクライアントの作成、スケジューラの構成、およびアプリケーションのライフサイクル管理を担当します。

主な役割:
+  AWS サービスクライアントの設定 (Kinesis、DynamoDB、CloudWatch)
+ KCL アプリケーションを構成する
+ スケジューラを作成して起動する
+ アプリケーションのシャットダウン処理を行う

以下に実装例を示します。

```
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import java.util.UUID;

public class SampleConsumer {
    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    public SampleConsumer(String streamName, Region region) {
        this.streamName = streamName;
        this.region = region;
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    public void run() {
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamName, 
            streamName, 
            kinesisClient, 
            dynamoDbAsyncClient,
            cloudWatchClient, 
            UUID.randomUUID().toString(), 
            new SampleRecordProcessorFactory()
        );

        Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

    public static void main(String[] args) {
        String streamName = "your-stream-name"; // replace with your stream name
        Region region = Region.US_EAST_1; // replace with your region
        new SampleConsumer(streamName, region).run();
    }
}
```

 KCL は、デフォルトで専用スループットを持つ拡張ファンアウト (EFO) コンシューマーを作成します。拡張ファンアウトの詳細については、[専用スループットを備えた拡張ファンアウトを開発する](enhanced-consumers.md) を参照してください。コンシューマーが 2 未満の場合、または 200 ミリ秒未満の読み取り伝搬遅延を必要としない場合は、共有スループットコンシューマーを使用するために、スケジューラオブジェクトで次の設定を行う必要があります。

```
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
```

次のコードは、共有スループットコンシューマーを使用するスケジューラオブジェクトを作成する例です。

**インポート**:

```
import software.amazon.kinesis.retrieval.polling.PollingConfig;
```

**コード**:

```
Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );/
```