

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Python で Kinesis クライアントライブラリコンシューマーを開発する
<a name="kinesis-record-processor-implementation-app-py"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

Kinesis Data Streams のデータを処理するアプリケーションを構築するには Kinesis Client Library (KCL) を使用できます。Kinesis Client Library は、複数の言語で使用できます。このトピックでは、Python について説明します。

KCL は Java ライブラリであり、Java 以外の言語のサポートは、*MultiLangDaemon* と呼ばれる多言語インターフェースを使用して提供されます。このデーモンは Java ベースで、Java 以外の KCL 言語を使用しているときにバックグラウンドで実行されます。そのため、KCL for Python をインストールして、コンシューマーアプリケーションをすべて Python で書く場合でも、MultiLangDaemon を使用するために、Java をシステムにインストールする必要があります。さらに、MultiLangDaemon には、接続先の AWS リージョンなど、ユースケースに合わせてカスタマイズする必要があるデフォルト設定があります。GitHub の MultiLangDaemon の詳細については、[KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)のページを参照してください。

GitHub から Python KCL をダウンロードするには、[Kinesis Client Library (Python)](https://github.com/awslabs/amazon-kinesis-client-python) にアクセスしてください。Python KCL コンシューマーアプリケーションのサンプルコードをダウンロードするには、GitHub で[KCL for Python sample project](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples)ページにアクセスしてください。

Python で KCL コンシューマーアプリケーションを実装する場合は、次のタスクを完了する必要があります。

**Topics**
+ [RecordProcessor クラスのメソッドを実装する](#kinesis-record-processor-implementation-interface-py)
+ [設定プロパティを変更する](#kinesis-record-processor-initialization-py)

## RecordProcessor クラスのメソッドを実装する
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess` クラスでは、`RecordProcessorBase` を拡張して次のメソッドを実装する必要があります。このサンプルでは、開始点として使用できる実装を提供しています (`sample_kclpy_app.py` を参照してください)。

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**初期化**  
KCL は、レコードプロセッサがインスタンス化されると、`initialize` メソッドを呼び出し、特定のシャード ID をパラメータとして渡します。このレコードプロセッサはこのシャードのみを処理し、通常、その逆も真です (このシャードはこのレコード プロセッサによってのみ処理されます)。ただし、コンシューマーでは、データレコードが複数回処理される可能性に対応する必要があります。これは、Kinesis Data Streams は*少なくとも 1 回*のセマンティクスを使用しているからです。つまり、シャードから取得されたすべてのデータレコードが、コンシューマーのワーカーによって少なくとも 1 回処理されることを意味します。特定のシャードが複数のワーカーによって処理される可能性がある場合の詳細については、[シャードの数を変更するには、再シャーディング、スケーリング、並列処理を使用します。](kinesis-record-processor-scaling.md)を参照してください。

```
def initialize(self, shard_id)
```

**process\$1records**  
 KCL は、このメソッドを呼び出し、`initialize` メソッドで指定されたシャードのデータレコードのリストを渡します。実装するレコードプロセッサは、コンシューマーのセマンティクスに従って、これらのレコードのデータを処理します。例えば、ワーカーはデータの変換を実行し、その結果を Amazon Simple Storage Service (Amazon S3) バケットに保存する場合があります。

```
def process_records(self, records, checkpointer) 
```

データ自体に加えて、レコードにもシーケンス番号とパーティションキーが含まれます。ワーカーはデータを処理するときに、これらの値を使用できます。たとえば、ワーカーは、パーティションのキーの値に基づいて、データを格納する S3 バケットを選択できます。`record` ディクショナリは、レコードのデータ、シーケンス番号、およびパーティションキーにアクセスする次のキーと値のペアを公開します。

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

データは Base64 でエンコードされていることに注意してください。

サンプルでは、メソッド `process_records` に、ワーカーでレコードのデータ、シーケンス番号、およびパーティションキーにアクセスする方法を示すコードが含まれています。

Kinesis Data Streams では、シャードで既に処理されたレコードを追跡するためにレコードプロセッサが必要です。KCL は、`Checkpointer` オブジェクトを `process_records` に渡すことで、この追跡をユーザーに代わって処理します。レコードプロセッサは、このオブジェクトの `checkpoint` メソッドを呼び出して、シャード内のレコードの処理の進行状況を KCL に通知します。ワーカーでエラーが発生すると、KCL はこの情報を使用して、処理されたことが分かっている最後のレコードからシャードの処理を再開します。

分割または結合オペレーションの場合、KCL は、元のシャードのプロセッサが `checkpoint` を呼び出して元のシャードの処理がすべて完了したことを通知するまで、新しいシャードの処理を開始しません。

パラメータを渡さないと、`checkpoint` への呼び出しは、レコードプロセッサに最後のレコードを渡した時点までのすべてのレコードが処理済みであることを意味すると KCL で見なされます。したがって、レコードプロセッサは、渡されたリストにあるすべてのレコードの処理が完了した場合にのみ、`checkpoint` を呼び出す必要があります。レコードプロセッサは、`checkpoint` の各呼び出しで `process_records` を呼び出す必要はありません。たとえば、プロセッサは、3 回呼び出すたびに、`checkpoint` を呼び出すことができます。オプションでレコードの正確なシーケンス番号をパラメータとして `checkpoint` に指定できます。この場合、KCL は、そのレコードまでのすべてのレコードだけが処理されたと見なします。

サンプルでは、プライベートメソッド `checkpoint` で、適切な例外処理と再試行のロジックを使用する `Checkpointer.checkpoint` メソッドを呼び出す方法を示しています。

KCL は、`process_records` を使用して、データレコードの処理から発生するすべての例外を処理します。例外が `process_records` からスローされた場合、KCL は、例外発生前に `process_records` に渡されたデータレコードをスキップします。つまり、これらのレコードは、例外をスローしたレコードプロセッサ、またはコンシューマーの他のレコードプロセッサに再送信されません。

**シャットダウン**  
 KCL は、処理が終了した場合 (シャットダウンの理由は `TERMINATE`) またはワーカーが応答していない場合 (シャットダウンの `reason` は `ZOMBIE`)、`shutdown` メソッドを呼び出します。

```
def shutdown(self, checkpointer, reason)
```

シャードが分割または結合されたか、ストリームが削除されたため、レコードプロセッサがシャードからこれ以上レコードを受信しない場合は、処理が終了します。

 また、KCL は、`Checkpointer` オブジェクトも `shutdown` に渡します。シャットダウンの `reason` が `TERMINATE` である場合、レコードプロセッサはすべてのデータレコードの処理を終了し、このインターフェイスの `checkpoint` メソッドを呼び出します。

## 設定プロパティを変更する
<a name="kinesis-record-processor-initialization-py"></a>

このサンプルでは、設定プロパティのデフォルト値を提供します。これらのプロパティを独自の値にオーバーライドできます (`sample.properties` を参照してください)。

### アプリケーション名
<a name="kinesis-record-processor-application-name-py"></a>

KCL には、複数のアプリケーション間、および同じリージョン内の Amazon DynamoDB テーブル間で一意のアプリケーションが必要です。次のようにアプリケーション名の設定値を使用します。
+ このアプリケーション名と関連付けられたワーカーはすべて、同じストリーム上で連携して処理しているとみなされます。これらのワーカーは複数のインスタンスに分散している場合があります。同じアプリケーションコードの追加のインスタンスを実行するときに、アプリケーション名が異なる場合、KCL は 2 番目のインスタンスを、同じストリームで動作するまったく別のアプリケーションと見なします。
+ KCL はアプリケーション名を使用して DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報 (チェックポイントやワーカーとシャードのマッピングなど) を保存します。各アプリケーションには、それぞれ DynamoDB テーブルがあります。詳細については、[リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡する](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)を参照してください。

### 認証情報の設定
<a name="kinesis-record-processor-creds-py"></a>

デフォルトの AWS 認証情報プロバイダーチェーンの認証情報プロバイダーのいずれかが認証情報を利用できるようにする必要があります。`AWSCredentialsProvider` プロパティを使用して認証情報プロバイダーを設定できます。[sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) では、[デフォルトの認証情報プロバイダーチェーン](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)のいずれかの認証情報プロバイダーに対して、ユーザーの認証情報を使用可能にする必要があります。Amazon EC2 インスタンスでコンシューマーアプリケーションを実行している場合は、IAM ロールでインスタンスを設定することをお勧めします。この IAM ロールに関連付けられた許可を反映する AWS 認証情報は、インスタンスメタデータを通じて、インスタンス上のアプリケーションで使用できるようになります。これは、EC2 インスタンスで実行されるコンシューマーアプリケーションの認証情報を管理するための最も安全な方法です。

サンプルのプロパティファイルでは、 で指定されているレコードプロセッサを使用してwordsという Kinesis data stream を処理するように KCL を設定します。