

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Java での Kinesis クライアントライブラリコンシューマーを開発する
<a name="kinesis-record-processor-implementation-app-java"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

Kinesis Data Streams のデータを処理するアプリケーションを構築するには Kinesis Client Library (KCL) を使用できます。Kinesis Client Library は、複数の言語で使用できます。このトピックでは、Java について説明します。Javadoc リファレンスを表示するには、[AWS Javadoc topic for Class AmazonKinesisClient](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html)を参照してください。

GitHub から Java KCL をダウンロードするには、[Kinesis Client Library (Java)](https://github.com/awslabs/amazon-kinesis-client) にアクセスしてください。Apache Maven で Java KCL を検索するには、[KCL 検索結果](https://search.maven.org/#search|ga|1|amazon-kinesis-client)のページを参照してください。Java KCL コンシューマーアプリケーションのサンプルコードをダウンロードするには、GitHub の[KCL for Java sample project](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis)ページを参照してください。

このサンプルアプリケーションは [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html) を使用します。ログ設定は、`configure` ファイルで定義されている静的な `AmazonKinesisApplicationSample.java` メソッドを使用して変更できます。Log4j および Java アプリケーションで Apache Commons ログ記録を使用する方法の詳細については、「 *AWS SDK for Java デベロッパーガイド*」の「Log4j AWS を使用したログ記録」を参照してください。 [ Log4j](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html) 

Java で KCL コンシューマーアプリケーションを実装する場合は、次のタスクを完了する必要があります。

**Topics**
+ [IRecordProcessor メソッドを実装する](#kinesis-record-processor-implementation-interface-java)
+ [IRecordProcessor インターフェイスのクラスファクトリを実装する](#kinesis-record-processor-implementation-factory-java)
+ [ワーカーを作成する](#kcl-java-worker)
+ [設定プロパティを変更する](#kinesis-record-processor-initialization-java)
+ [レコードプロセッサインターフェイスのバージョン 2 に移行する](#kcl-java-v2-migration)

## IRecordProcessor メソッドを実装する
<a name="kinesis-record-processor-implementation-interface-java"></a>

KCL は現在、`IRecordProcessor` インターフェイスの 2 つのバージョンをサポートしています。元のインターフェイスは最初のバージョンの KCL で利用可能です。バージョン 2 は KCL バージョン 1.5.0 から利用可能です。両方のインターフェイスが完全にサポートされています。選択するインターフェイスは、お使いのシナリオの要件によって異なります。相違点をすべて確認するには、ローカルに作成した Javadocs、またはソースコードを参照してください。以下のセクションでは、使い始めの最小限の実装を概説します。

**Topics**
+ [オリジナルインターフェイス (バージョン 1)](#kcl-java-interface-original)
+ [更新されたインターフェイス (バージョン 2)](#kcl-java-interface-v2)

### オリジナルインターフェイス (バージョン 1)
<a name="kcl-java-interface-original"></a>

オリジナルな `IRecordProcessor` interface (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) は、コンシューマーが実装しているべき次のレコードプロセッサメソッドを公開します。このサンプルでは、開始点として使用できる実装を提供しています (`AmazonKinesisApplicationSampleRecordProcessor.java` を参照してください)。

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**初期化**  
KCL は、レコードプロセッサがインスタンス化されると、`initialize` メソッドを呼び出し、特定のシャード ID をパラメータとして渡します。このレコードプロセッサはこのシャードのみを処理し、通常、その逆も真です (このシャードはこのレコード プロセッサによってのみ処理されます)。ただし、コンシューマーでは、データレコードが複数回処理される可能性に対応する必要があります。Kinesis Data Streams は*少なくとも 1 回*のセマンティクスを使用しています。これは、シャードから取得されたすべてのデータレコードが、コンシューマーのワーカーによって少なくとも 1 回処理されることを意味します。特定のシャードが複数のワーカーによって処理される可能性がある場合の詳細については、[シャードの数を変更するには、再シャーディング、スケーリング、並列処理を使用します。](kinesis-record-processor-scaling.md)を参照してください。

```
public void initialize(String shardId)
```

**processRecords**  
KCL は、`processRecords` メソッドを呼び出し、`initialize(shardId)` メソッドで指定されたシャードのデータレコードのリストを渡します。レコードプロセッサは、コンシューマーのセマンティクスに従って、これらのレコードのデータを処理します。例えば、ワーカーはデータの変換を実行し、その結果を Amazon Simple Storage Service (Amazon S3) バケットに保存する場合があります。

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

データ自体に加えて、レコードにもシーケンス番号とパーティションキーが含まれます。ワーカーはデータを処理するときに、これらの値を使用できます。たとえば、ワーカーは、パーティションのキーの値に基づいて、データを格納する S3 バケットを選択できます。`Record` クラスは、レコードのデータ、シーケンス番号、およびパーティションキーへのアクセスを提供する次のメソッドを公開します。

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

サンプルでは、プライベートメソッド `processRecordsWithRetries` に、ワーカーでレコードのデータ、シーケンス番号、およびパーティションキーにアクセスする方法を示すコードが含まれています。

Kinesis Data Streams では、シャードで既に処理されたレコードを追跡するためにレコードプロセッサが必要です。KCL は、チェックポインタ (`IRecordProcessorCheckpointer`) を `processRecords` に渡すことで、この追跡をユーザーに代わって処理します。レコードプロセッサは、このインターフェイスで `checkpoint` メソッドを呼び出し、シャード内のレコードの処理の進行状況を KCL に知らせます。ワーカーでエラーが発生すると、KCL はこの情報を使用して、処理されたことが分かっている最後のレコードからシャードの処理を再開します。

分割または結合オペレーションの場合、KCL は、元のシャードのプロセッサが `checkpoint` を呼び出して元のシャードの処理がすべて完了したことを通知するまで、新しいシャードの処理を開始しません。

パラメータを渡さないと、`checkpoint` への呼び出しは、レコードプロセッサに最後のレコードを渡した時点までのすべてのレコードが処理済みであることを意味すると KCL で見なされます。したがって、レコードプロセッサは、渡されたリストにあるすべてのレコードの処理が完了した場合にのみ、`checkpoint` を呼び出す必要があります。レコードプロセッサは、`checkpoint` の各呼び出しで `processRecords` を呼び出す必要はありません。たとえば、プロセッサは、`checkpoint` を 3 回呼び出すたびに、`processRecords` を呼び出すことができます。オプションでレコードの正確なシーケンス番号をパラメータとして `checkpoint` に指定できます。この場合、KCL は、すべてのレコードがそのレコードまで処理されたと見なします。

このサンプルでは、プライベートメソッド `checkpoint` で、適切な例外処理と再試行のロジックを使用する `IRecordProcessorCheckpointer.checkpoint` を呼び出す方法を示しています。

KCL は、`processRecords` を使用して、データレコードの処理から発生するすべての例外を処理します。例外が `processRecords` からスローされた場合、KCL は、例外発生前に渡されたデータレコードをスキップします。つまり、これらのレコードは、例外をスローしたレコードプロセッサ、またはコンシューマーの他のレコードプロセッサに再送信されません。

**シャットダウン**  
KCL は、処理が終了した場合 (シャットダウンの理由は `TERMINATE`) またはワーカーが応答していない場合 (シャットダウンの理由は `ZOMBIE`)、`shutdown` メソッドを呼び出します。

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

シャードが分割または結合されたか、ストリームが削除されたため、レコードプロセッサがシャードからこれ以上レコードを受信しない場合は、処理が終了します。

KCL はまた、`IRecordProcessorCheckpointer` インターフェイスを `shutdown` に渡します。シャットダウンの理由が `TERMINATE` である場合、レコードプロセッサはすべてのデータレコードの処理を終了し、このインターフェイスの `checkpoint` メソッドを呼び出します。

### 更新されたインターフェイス (バージョン 2)
<a name="kcl-java-interface-v2"></a>

更新された `IRecordProcessor` interface (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) は、コンシューマーが実装しているべき次のレコードプロセッサメソッドを公開します。

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

コンテナオブジェクトのメソッドの呼び出しで、インターフェイスのオリジナルバージョンのすべての引数にアクセスできます。たとえば、`processRecords()` でレコードのリストを取得には、`processRecordsInput.getRecords()` が使用できます。

このインターフェイスのバージョン 2 (KCL 1.5.0 以降) では、オリジナルインターフェースで提供される入力に加えて次の新しい入力が使用できます。

シーケンス番号の開始  
`InitializationInput` オペレーションへ渡される `initialize()` オブジェクトでは、開始シーケンス番号はレコードプロセッサのインスタンスに配信されるレコードです。このシーケンス番号は、同じシャードで処理されたレコードプロセッサインスタンスの最後のチェックポイントです。これは、アプリケーションでこの情報が必要になる場合のために提供されます。

保留チェックポイントシーケンス番号  
`initialize()` オペレーションへ渡される `InitializationInput`オブジェクトの保留チェックポイントシーケンス番号 (ある場合) とは、前のレコードプロセッサインスタンスが停止する前にコミットできなかったものを示します。

## IRecordProcessor インターフェイスのクラスファクトリを実装する
<a name="kinesis-record-processor-implementation-factory-java"></a>

レコードプロセッサのメソッドを実装するクラスのファクトリも実装する必要があります。コンシューマーは、ワーカーをインスタンス化するときに、このファクトリへの参照を渡します。

サンプルでは、オリジナルのレコードプロセッサインターフェースを使用した、`AmazonKinesisApplicationSampleRecordProcessorFactory.java` ファイルのファクトリクラスを実装します。クラスファクトリでバージョン 2 レコードプロセッサを作成する場合には、`com.amazonaws.services.kinesis.clientlibrary.interfaces.v2` とい名のパッケージを使用してください。

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## ワーカーを作成する
<a name="kcl-java-worker"></a>

[IRecordProcessor メソッドを実装する](#kinesis-record-processor-implementation-interface-java)で説明しているように、KCL レコードプロセッサには選択できる 2 バージョンがあり、どちらを選ぶかでワーカーの作成方法に影響します。オリジナルレコードプロセッサインターフェイスは、次のコードストラクチャを使用してワーカーを作成します。

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

レコード プロセッサインターフェイスのバージョン 2 では、`Worker.Builder` を使用してワーカを作成でき、どのコンストラクタを使うかや引数の順序を考慮する必要はありません。更新されたレコードプロセッサインターフェイスは、次のコードストラクチャを使用してワーカーを作成します。

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## 設定プロパティを変更する
<a name="kinesis-record-processor-initialization-java"></a>

このサンプルでは、設定プロパティのデフォルト値を提供します。ワーカーのこの設定データは `KinesisClientLibConfiguration` オブジェクトにまとめられています。ワーカーをインスタンス化する呼び出しで、このオブジェクトと `IRecordProcessor` のクラスファクトリへの参照が渡されます。Java の properties ファイルを使用してこれらのプロパティを独自の値にオーバーライドできます (`AmazonKinesisApplicationSample.java` を参照してください)。

### アプリケーション名
<a name="configuration-property-application-name"></a>

KCL には、複数のアプリケーション間、および同じリージョン内の Amazon DynamoDB テーブル間で一意のアプリケーション名が必要です。次のようにアプリケーション名の設定値を使用します。
+ このアプリケーション名と関連付けられたすべてのワーカーは、連係して同じストリームを処理していると見なされます。これらのワーカーは複数のインスタンスに分散している場合もあります。同じアプリケーションコードの追加のインスタンスを実行するときに、アプリケーション名が異なる場合、KCL は 2 番目のインスタンスを、同じストリームで動作するまったく別のアプリケーションと見なします。
+ KCL はアプリケーション名を使用して DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報 (チェックポイントやワーカーとシャードのマッピングなど) を保存します。各アプリケーションには、それぞれ DynamoDB テーブルがあります。詳細については、[リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡する](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)を参照してください。

### 認証情報の設定
<a name="kinesis-record-processor-cred-java"></a>

デフォルトの AWS 認証情報プロバイダーチェーンの認証情報プロバイダーのいずれかが認証情報を利用できるようにする必要があります。例えば、EC2 インスタンスでコンシューマーを実行している場合は、IAM ロールでインスタンスを起動することをお勧めします。この IAM ロールに関連付けられた許可を反映する AWS 認証情報は、インスタンスメタデータを通じて、インスタンス上のアプリケーションで使用できるようになります。これは、EC2 インスタンスで実行されるコンシューマーの認証情報を管理するための最も安全な方法です。

サンプルアプリケーションは、最初にインスタンスメタデータから IAM 認証情報を取得しようとします。

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

サンプルアプリケーションは、インスタンスメタデータから認証情報を取得できない場合、properties ファイルから認証情報を取得しようとします。

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

インスタンスメタデータの詳細については、「Amazon EC2 ユーザーガイド」の「[インスタンスメタデータ](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html)」を参照してください。**

### 複数のインスタンスへのワーカー ID を使用する
<a name="kinesis-record-processor-workerid-java"></a>

サンプルの初期化コードは、次のコードスニペットに示すように、ローカルコンピュータ名にグローバル一意識別子を追加して、ワーカーの ID (`workerId`) を作成します。このアプローチによって、1 台のコンピュータでコンシューマーアプリケーションの複数のインスタンスを実行するシナリオに対応できます。

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## レコードプロセッサインターフェイスのバージョン 2 に移行する
<a name="kcl-java-v2-migration"></a>

オリジナルインターフェースで使われるコードを移行するためには、上記のステップに加えて、次の手順が必要となります。

1. レコードプロセッサのクラスを変更して、バージョン 2 レコードプロセッサインターフェイスにインポートします。

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. コンテナオブジェクトで `get` メソッドを使用するには、入力するリファレンスを変更します。たとえば、`shutdown()` オペレーションで、`checkpointer` を `shutdownInput.getCheckpointer()` に変更します。

1. レコードプロセッサのファクトリークラスを変更して、バージョン 2 レコードプロセッサファクトリーインターフェイスにインポートします。

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. ワーカーのコンストラクチャを変更して、`Worker.Builder` を使います。例えば、次のようになります。

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```