

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Kinesis Client Library を使用する
<a name="kcl"></a>

## Kinesis Client Library (KCL) とは何ですか?
<a name="kcl-library-what-is"></a>

Kinesis Client Library (KCL) は、Amazon Kinesis Data Streams からデータを取得して処理する作業を簡素化するために設計された、スタンドアロンの Java ソフトウェアライブラリです。KCL は、分散コンピューティングに伴う複雑な処理を代行するため、開発者はデータ処理のビジネスロジックの実装に集中できます。KCL は、複数ワーカー間での負荷分散、ワーカー障害への対応、処理済みレコードのチェックポイント管理、ストリーム内のシャード数の変化への対応などの処理を管理します。

KCL は、基盤となるライブラリの新しいバージョンの取り込みや、セキュリティ強化、バグ修正を行うために頻繁に更新されています。既知の問題を回避し、最新の改善点を活用するためにも、常に最新バージョンの KCL を使用することをお勧めします。最新の KCL バージョンについては、[KCL の Github](https://github.com/awslabs/amazon-kinesis-client) を参照してください。

**重要**  
既知のバグや問題を回避するために、最新バージョンの KCL を使用することをお勧めします。KCL 2.6.0 以前を使用している場合、ストリーム容量の変化時にシャード処理が停止する可能性があるまれな事象を回避するため、KCL 2.6.1 以降へアップグレードしてください。
KCL は Java ライブラリです。Java 以外の言語をサポートするために、MultiLangDaemon と呼ばれる Java ベースのデーモンが提供されています。MultiLangDaemon は、STDIN および STDOUT を介して KCL アプリケーションと連携します。GitHub の MultiLangDaemon の詳細については、[Java 以外の言語で KCL を使用してコンシューマーを開発する](develop-kcl-consumers-non-java.md) を参照してください。
KCL 3.x で AWS SDK for Java バージョン 2.27.19 ～ 2.27.23 を使用しないでください。これらのバージョンには、KCL が使用する DynamoDB に関連した例外エラーを引き起こす問題が含まれています。この問題を回避するには、 AWS SDK for Java バージョン 2.28.0 以降を使用することをお勧めします。

## KCL の主な機能と利点
<a name="kcl-benefits"></a>

KCL の主な機能と関連する利点は次のとおりです。
+ **スケーラビリティ**: KCL は、複数のワーカー間で処理負荷を分散することで、アプリケーションを動的にスケールさせることを可能にします。手動または自動スケーリングを使用してアプリケーションをスケールインまたはスケールアウトしても、負荷の再分散について心配する必要はありません。
+ **負荷分散**: KCL は利用可能なワーカー間で処理負荷を自動的に均等化し、ワーカー間の作業量をバランスよく分散します。
+ **チェックポイント**: KCL は処理済みレコードのチェックポイント管理を行い、アプリケーションが最後に正常に処理した位置から処理を再開できるようにします。
+ **耐障害性**: KCL には耐障害性メカニズムが組み込まれているため、個々のワーカーが障害を起こした場合でもデータ処理が継続されます。また、KCL は 少なくとも 1 回の配信を保証します。
+ **ストリームレベルの変更の処理**: KCL は、データ量の変化によって発生するシャードの分割や結合に適応します。また、親シャードの処理とチェックポイントが完了するまで子シャードを処理しないようにすることで、レコードの順序が維持されます。
+ **モニタリング**: KCL は Amazon CloudWatch と連携し、コンシューマーレベルのモニタリングを行います。
+ **多言語サポート**: KCL は Java をネイティブでサポートしており、MultiLangDaemon を通じて複数の Java 以外のプログラミング言語にも対応できます。

# KCL の概念
<a name="kcl-concepts"></a>

このセクションでは、Kinesis Client Library (KCL) の主要な概念と、その相互関係について説明します。これらの概念は、KCL コンシューマーアプリケーションを開発および管理するうえで基本となるものです。
+ **KCL コンシューマーアプリケーション** - Kinesis Client Library を使用して、Kinesis データストリームからレコードを読み取って処理するように設計されたカスタムビルドアプリケーション。
+ **ワーカー** – KCL コンシューマーアプリケーションは、通常 1 つ以上のワーカーが同時に実行される分散構成で動作します。KCL はワーカー同士を調整し、ストリームからのデータを分散方式で取得できるようにするとともに、複数のワーカー間で処理負荷が均等になるように管理します。
+ **スケジューラ** – KCL ワーカーがデータの処理を開始するために使用する高レベルのクラス。各 KCL ワーカーには 1 つのスケジューラがあります。スケジューラは、Kinesis データストリームからのシャード情報の同期、ワーカー間におけるシャード割り当ての追跡、そしてワーカーに割り当てられたシャードに基づくストリームからのデータ処理といった、さまざまなタスクを初期化し、監督します。スケジューラは、処理対象のストリーム名や AWS 認証情報など、スケジューラの動作に影響するさまざまな設定を行うことができます。また、スケジューラはストリームからレコードプロセッサへのデータレコードの受け渡しを開始する役割も担います。
+ **レコードプロセッサ** - レコードプロセッサは、KCL コンシューマーアプリケーションがデータストリームから受信したデータをどのように処理するかというロジックを定義します。レコードプロセッサ内には、独自のカスタムデータ処理ロジックを実装する必要があります。KCL ワーカーはスケジューラをインスタンス化します。次に、スケジューラは、ワーカーがリースを保持している各シャードに対して 1 つのレコードプロセッサをインスタンス化します。1 つのワーカーは複数のレコードプロセッサを実行できます。
+ **リース** – ワーカーとシャード間の割り当てを定義します。KCL コンシューマーアプリケーションは、複数のワーカーにデータレコード処理を分散するためにリースを使用します。各シャードは、任意の時点でリースによって 1 つのワーカーにのみ結び付けられ、各ワーカーは同時に 1 つ以上のリースを保持できます。ワーカーが停止または障害によってリースを保持できなくなると、KCL は別のワーカーにそのリースを引き継がせます。リースの詳細については、[Github documentation: Lease Lifecycle](https://github.com/awslabs/amazon-kinesis-client/blob/master/docs/lease-lifecycle.md#lease-lifecycle) を参照してください。
+ **リーステーブル** – KCL コンシューマーアプリケーションにおけるすべてのリースを追跡するために使用される、専用の Amazon DynamoDB テーブルです。各 KCL コンシューマーアプリケーションは、独自のリーステーブルを作成します。このリーステーブルは、すべてのワーカー間で状態を保持し、データ処理を調整するために使用されます。詳細については、「[KCL における DynamoDB メタデータテーブルと負荷分散](kcl-dynamoDB.md)」を参照してください。
+ **チェックポイント** — シャード内で最後に正常に処理されたレコードの位置を永続的に保存するプロセス。KCL はチェックポイントを管理し、ワーカーが障害を起こした場合やアプリケーションが再起動した場合でも、最後にチェックポイントされた位置から処理を再開できるようにします。チェックポイントは、リースのメタデータの一部として DynamoDB のリーステーブルに保存されます。これにより、ワーカーは前のワーカーが処理を停止した位置から処理を続行できます。

# KCL における DynamoDB メタデータテーブルと負荷分散
<a name="kcl-dynamoDB"></a>

KCL は、リースやワーカーの CPU 使用率メトリクスといったメタデータを管理します。KCL はこれらのメタデータを DynamoDB テーブルを使用して追跡します。Amazon Kinesis Data Streams アプリケーションごとに、KCL はメタデータを管理するために、リーステーブル、ワーカーメトリクステーブル、コーディネーター状態テーブルという 3 つの DynamoDB テーブルを作成します。

**注記**  
KCL 3.x では、新たにワーカーメトリクステーブルとコーディネーター状態テーブルの 2 つのメタデータテーブルが追加されました。****

**重要**  
 KCL アプリケーションが DynamoDB 内でメタデータテーブルを作成し管理できるようにするためには、適切な権限を付与する必要があります。詳細については、「[KCL コンシューマーアプリケーションに必要な IAM アクセス許可](kcl-iam-permissions.md)」を参照してください。  
KCL コンシューマーアプリケーションは、これら 3 つの DynamoDB メタデータテーブルを自動的には削除しません。不要なコストを回避するため、コンシューマーアプリケーションを廃止するときには、KCL コンシューマーアプリケーションによって作成されたこれらの DynamoDB メタデータテーブルを必ず削除してください。

## リーステーブル
<a name="kcl-leasetable"></a>

リーステーブルは、KCL コンシューマーアプリケーションのスケジューラによってリースされ、処理されているシャードを追跡するために使用される、専用の Amazon DynamoDB テーブルです。各 KCL コンシューマーアプリケーションは、独自のリーステーブルを作成します。KCL はデフォルトで、コンシューマーアプリケーションの名前をリーステーブル名として使用します。設定を使用してカスタムテーブル名を設定できます。また、KCL は効率的なリース検出のため、リーステーブルに leaseOwner をパーティションキーとする[グローバルセカンダリインデックス](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html)を作成します。グローバルセカンダリインデックスは、ベースとなるリーステーブルの leaseKey 属性をミラーします。アプリケーションの起動時にKCL コンシューマーアプリケーションのリーステーブルが存在しない場合は、いずれかのワーカーがご使用のアプリケーションのリーステーブルを作成します。

コンシューマーアプリケーションの実行中に、[Amazon DynamoDB コンソール](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html)を使用してリーステーブルを表示できます。

**重要**  
各 KCL コンシューマーアプリケーション名は、リーステーブル名の重複を防ぐために一意でなければなりません。
アカウントには、Kinesis Data Streams 自体に関連するコストに加えて、DynamoDB テーブルに関連するコストが発生します。

リーステーブルの各行は、コンシューマーアプリケーションのスケジューラによって処理中のシャードを表します。主要なフィールドは以下のとおりです。
+ **leaseKey:** 単一ストリーム処理の場合、これはシャード ID です。KCL を使用したマルチストリーム処理の場合は、`account-id:StreamName:streamCreationTimestamp:ShardId` の形式で構成されます。leaseKey はリーステーブルのパーティションキーです。マルチストリーム処理の詳細については、[KCL を使用したマルチストリーム処理](kcl-multi-stream.md) を参照してください。
+ **checkpoint:** シャードの最新チェックポイントのシーケンス番号。
+ **checkpointSubSequenceNumber:** Kinesis Producer Library の集約機能を使用する場合、これは Kinesis レコード内の個々のユーザレコードを追跡する**チェックポイント**の拡張です。
+ **leaseCounter:** ワーカーが現在そのリースを積極的に処理しているかどうかを確認するために使用されます。リースの所有権が別のワーカーに移った場合、leaseCounter は増加します。
+ **leaseOwner:** このリースを現在保持しているワーカー。
+ **ownerSwitchesSinceCheckpoint:** 最後のチェックポイント以降、このリースの担当ワーカーが何回変更されたかを示します。
+ **parentShardId:** このシャードの親シャードの ID。親シャードが完全に処理されるまで子シャードの処理が開始されないようにすることで、レコード処理の正しい順序を維持します。
+ **childShardId:** このシャードの分割または結合によって生成された子シャード ID の一覧。リシャーディング処理におけるシャードの系統を追跡し、処理順序を管理するために使用されます。
+ **startingHashKey:** このシャードのハッシュキー範囲の下限。
+ **endingHashKey:** このシャードのハッシュキー範囲の上限。

マルチストリーム処理を KCL で使用している場合、リーステーブルには次の 2 つの追加フィールドが表示されます。詳細については、「[KCL を使用したマルチストリーム処理](kcl-multi-stream.md)」を参照してください。
+ **shardID:** シャードの ID。
+ **streamName:** 以下の形式のデータストリームの識別子: `account-id:StreamName:streamCreationTimestamp`。

## ワーカーメトリクステーブル
<a name="kcl-worker-metrics-table"></a>

ワーカーメトリクステーブルは、各 KCL アプリケーションに対して固有の Amazon DynamoDB テーブルであり、各ワーカーからの CPU 使用率メトリクスを記録するために使用されます。これらのメトリクスは、KCL が効率的なリース割り当てを行い、ワーカー間でリソース利用を均等化するために利用されます。KCL は、このワーカーメトリクステーブルの名前としてデフォルトで、 `KCLApplicationName-WorkerMetricStats` を使用します。

## コーディネーター状態テーブル
<a name="kcl-coordinator-state-table"></a>

コーディネーター状態テーブルは、各 KCL アプリケーションに対して固有の Amazon DynamoDB テーブルであり、ワーカーの内部状態情報を保存するために使用されます。たとえば、コーディネーター状態テーブルには、リーダー選出に関するデータや、KCL 2.x から KCL 3.x へのインプレース移行に関連するメタデータが保存されます。KCL は、このコーディネーター状態テーブルの名前としてデフォルトで `KCLApplicationName-CoordinatorState` を使用します。

## KCL が作成するメタデータテーブルの DynamoDB キャパシティモード
<a name="kcl-capacity-mode"></a>

Kinesis Client Library (KCL) は、リーステーブル、ワーカーメトリクステーブル、コーディネーター状態テーブルといった DynamoDB のメタデータテーブルを、デフォルトで[オンデマンドキャパシティモード](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/on-demand-capacity-mode.html)で作成します。このモードでは、容量計画を行う必要がなく、トラフィックに応じて読み込み容量および書き込み容量が自動的にスケーリングします。これらのメタデータテーブルをより効率的に運用するため、キャパシティモードはオンデマンドのまま使用することを強くお勧めします。

リーステーブルを[プロビジョンドキャパシティモード](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html)に切り替える場合は、次のベストプラクティスに従ってください。
+ 使用パターンを分析する:
  + Amazon CloudWatch メトリクスを使用して、アプリケーションの読み取りおよび書き込みパターン (RCU、WCU) と使用状況をモニタリングします。
  + ピーク時および平均的なスループット要件を把握します。
+ 必要なキャパシティを算出する:
  + 分析結果に基づいて、必要な読み込み容量ユニット (RCU) と書き込み容量ユニット (WCU) を見積もります。
  + シャード数、チェックポイント頻度、ワーカー数といった要素も考慮します。
+ 自動スケーリングを実装する:
  + [DynamoDB 自動スケーリング](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html#ddb-autoscaling)を使用して、プロビジョンドキャパシティを自動的に調整し、適切な最小キャパシティの下限および最大キャパシティの上限を設定します。
  + DynamoDB 自動スケーリングを使用することで、KCL のメタデータテーブルがキャパシティ制限に達してスロットリングされる事態を防ぐことができます。
+ 定期的なモニタリングと最適化:
  + `ThrottledRequests` に関する CloudWatch メトリクスを継続的にモニタリングします。
  + ワークロードの経時変化に応じて、キャパシティを調整します。

KCL コンシューマーアプリケーションのメタデータ用 DynamoDB テーブルで `ProvisionedThroughputExceededException` が発生した場合は、DynamoDB テーブルのプロビジョンドスループットキャパシティを増やす必要があります。コンシューマーアプリケーションを最初に作成した際に特定の読み込み容量ユニット (RCU) および書き込み容量ユニット (WCU) を設定していても、使用量が増えるにつれてそれだけでは不十分になる可能性があります。たとえば、KCL コンシューマーアプリケーションが頻繁にチェックポイントを実行する場合や、多数のシャードを持つストリームを処理する場合は、より多くの容量ユニットが必要になることがあります。DynamoDB のプロビジョンドスループットについての詳細は、Amazon DynamoDB デベロッパーガイドの [DynamoDB のスループットキャパシティ](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/capacity-mode.html)と[テーブルの更新](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html#WorkingWithTables.Basics.UpdateTable)を参照してください。

## KCL がワーカーにリースを割り当て、負荷を分散する方法
<a name="kcl-assign-leases"></a>

KCL は、ワーカーを実行しているコンピューティングホストの CPU 使用率メトリクスを継続的に収集してモニタリングし、ワークロードが均等に分散されるようにします。これらの CPU 使用率メトリクスは、DynamoDB のワーカーメトリクステーブルに保存されます。KCL は、一部のワーカーの CPU 使用率が他と比べて高くなっていることを検出すると、負荷の高いワーカーの負荷を下げるために、ワーカー間でリースを再割り当てします。目的は、コンシューマーアプリケーションフリート全体でワークロードをより均等に分散し、特定のワーカーに過度な負荷が集中するのを防ぐことです。KCL がコンシューマーアプリケーションフリート全体で CPU 使用率を分散するため、適切なワーカー数を選択することでコンシューマーアプリケーションフリートのキャパシティを適正化できます。また、自動スケーリングを使用してコンピューティングキャパシティを効率的に管理し、コストを削減することも可能です。

**重要**  
KCL がワーカーから CPU 使用率メトリクスを収集できるのは、特定の前提条件が満たされている場合のみです。詳細については、「[前提条件](develop-kcl-consumers-java.md#develop-kcl-consumers-java-prerequisites)」を参照してください。KCL がワーカーから CPU 使用率メトリクスを収集できない場合、ワーカーごとのスループットに基づいてリースを割り当て、フリート内のワーカー間で負荷を分散する方式にフォールバックします。KCL は、各ワーカーがある時点で受け取っているスループットをモニタリングし、割り当てられたリースから得られる総スループット量が均等になるようにリースを再割り当てします。

# KCL を使用してコンシューマーを開発する
<a name="develop-kcl-consumers"></a>

Kinesis データストリームのデータを処理するコンシューマーアプリケーションを構築するには Kinesis Client Library (KCL) を使用できます。

KCL は、複数の言語で使用できます。このトピックでは、Java および Java 以外の言語で KCL コンシューマーを開発する方法について説明します。
+ Kinesis Client Library Javadoc リファレンスを表示するには、[Amazon Kinesis Client Library Javadoc](https://javadoc.io/doc/software.amazon.kinesis/amazon-kinesis-client/latest/index.html) を参照してください。
+ GitHub から Java 用 KCL をダウンロードするには、[Amazon Kinesis Client Library for Java](https://github.com/awslabs/amazon-kinesis-client) を参照してください。
+ Apache Maven で Java 用 KCL を見つけるには、[KCL Maven Central Repository](https://central.sonatype.com/artifact/software.amazon.kinesis/amazon-kinesis-client) を参照してください。

**Topics**
+ [Java で KCL を使用してコンシューマーを開発する](develop-kcl-consumers-java.md)
+ [Java 以外の言語で KCL を使用してコンシューマーを開発する](develop-kcl-consumers-non-java.md)

# Java で KCL を使用してコンシューマーを開発する
<a name="develop-kcl-consumers-java"></a>

## 前提条件
<a name="develop-kcl-consumers-java-prerequisites"></a>

KCL 3.x の使用を始める前に、以下のものが揃っていることを確認してください。
+ Java Development Kit (JDK) 8 以降
+ AWS SDK for Java 2.x
+ 依存関係管理用の Maven または Gradle

KCL は、ワーカーが実行されているコンピューティングホストから CPU 使用率などの CPU 使用率メトリクスを収集し、ワーカー間でリソース使用量が均等になるように負荷を分散します。KCL がワーカーから CPU 使用率メトリクスを収集できるようにするには、次の前提条件を満たす必要があります。

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ オペレーティングシステムは Linux OS である必要があります。
+ EC2 インスタンスで [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html) を有効にする必要があります。

 **Amazon Elastic Container Service (Amazon ECS) on Amazon EC2**
+ オペレーティングシステムは Linux OS である必要があります。
+ [ECS タスクメタデータエンドポイントのバージョン 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html) を有効にする必要があります。
+ Amazon ECS コンテナエージェントのバージョンは 1.39.0 以降である必要があります。

 **での Amazon ECS AWS Fargate**
+ [Fargate タスクメタデータエンドポイントのバージョン 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html) を有効にする必要があります。Fargate プラットフォームバージョン 1.4.0 以降を使用している場合、これはデフォルトで有効になっています。
+ Fargate プラットフォームバージョン 1.4.0 以降

 **Amazon Elastic Kubernetes Service (Amazon EKS) on Amazon EC2** 
+ オペレーティングシステムは Linux OS である必要があります。

 **での Amazon EKS AWS Fargate**
+ Fargate プラットフォーム 1.3.0 以降

**重要**  
KCL がワーカーから CPU 使用率メトリクスを収集できない場合、ワーカーごとのスループットに基づいてリースを割り当て、フリート内のワーカー間で負荷を分散する方式にフォールバックします。詳細については、「[KCL がワーカーにリースを割り当て、負荷を分散する方法](kcl-dynamoDB.md#kcl-assign-leases)」を参照してください。

## 依存関係をインストールして追加する
<a name="develop-kcl-consumers-java-installation"></a>

Maven を使用している場合は、以下の依存関係を `pom.xml` ファイルに追加します。3.x.x を最新の KCL バージョンに置き換えていることを確認してください。

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

Gradle を使用している場合は、以下を `build.gradle` ファイルに追加します。3.x.x を最新の KCL バージョンに置き換えていることを確認してください。

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

最新の KCL バージョンは、[Maven Central Repository](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client) で確認できます。

## コンシューマーを実装する
<a name="develop-kcl-consumers-java-implemetation"></a>

KCL コンシューマーアプリケーションは、次の主要コンポーネントで構成されます。

**Topics**
+ [RecordProcessor](#implementation-recordprocessor)
+ [RecordProcessorFactory](#implementation-recordprocessorfactory)
+ [スケジューラー](#implementation-scheduler)
+ [メインコンシューマーアプリケーション](#implementation-main)

### RecordProcessor
<a name="implementation-recordprocessor"></a>

RecordProcessor は、Kinesis データストリームのレコードを処理するビジネスロジックが実装される中核コンポーネントです。アプリケーションが Kinesis ストリームから受け取ったデータをどのように処理するかを定義します。

主な役割:
+ シャードの処理を初期化する
+ Kinesis ストリームからのレコードのバッチを処理する
+ シャードの処理をシャットダウンする (シャードが分割または結合された場合、またはリースが別のホストに引き継がれる場合など)
+ チェックポイントを処理して進行状況を追跡する

以下に実装例を示します。

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.*;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class SampleRecordProcessor implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());
            processRecordsInput.records().forEach(r -> 
                log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())
            );
            
            // Checkpoint periodically
            processRecordsInput.checkpointer().checkpoint();
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}
```

以下に、この例で使用されている各メソッドの詳細な説明を示します。

**initialize(InitializationInput initializationInput)**
+ 目的: レコードを処理するために必要なリソースや状態を設定します。
+ 呼び出されるタイミング: KCL がこのレコードプロセッサにシャードを割り当てたときに 1 回だけ呼び出されます。
+ キーポイント:
  + `initializationInput.shardId()`: このプロセッサが処理するシャードの ID。
  + `initializationInput.extendedSequenceNumber()`: 処理を開始するシーケンス番号。

**processRecords(ProcessRecordsInput processRecordsInput)**
+ 目的: 受信レコードを処理し、必要に応じてチェックポイントの進捗を処理します。
+ 呼び出されるタイミング: レコードプロセッサがそのシャードのリースを保持している間、繰り返し呼び出されます。
+ キーポイント:
  + `processRecordsInput.records()`: 処理するレコードのリスト。
  + `processRecordsInput.checkpointer()`: チェックポイントの進捗の処理に使用されます。
  + KCL の処理が失敗しないよう、レコード処理中に発生する例外を必ず処理してください。
  + このメソッドはべき等である必要があります。予期しないワーカーのクラッシュや再起動により、チェックポイントされていないレコードが再度処理されるなど、同じレコードが複数回処理される可能性があるためです。
  + データ整合性を確保するため、チェックポイントを実行する前に、バッファされたデータを必ずすべてフラッシュしてください。

**leaseLost(LeaseLostInput leaseLostInput)**
+ 目的: このシャードの処理に固有のリソースをクリーンアップします。
+ 呼び出されるタイミング: 別のスケジューラがこのシャードのリースを引き継いだときに呼び出されます。
+ キーポイント:
  + このメソッドではチェックポイントを実行できません。

**shardEnded(ShardEndedInput shardEndedInput)**
+ 目的: このシャードおよびチェックポイントの処理を完了します。
+ 呼び出されるタイミング: シャードが分割または結合され、このシャードのすべてのデータが処理されたことを示すタイミングで呼び出されます。
+ キーポイント:
  + `shardEndedInput.checkpointer()`: 最終的なチェックポイント処理を実行するために使用します。
  + このメソッドでは、処理を完了するためにチェックポイントの実行が必須です。
  + ここでフラッシュ処理とチェックポイントを行わないと、シャードが再度オープンされた際に、データ損失や重複処理が発生する可能性があります。

**shutdownRequested(ShutdownRequestedInput shutdownRequestedInput)**
+ 目的: KCL のシャットダウン時にチェックポイントを実行し、リソースをクリーンアップします。
+ 呼び出されるタイミング: KCL がシャットダウンする際 (例: アプリケーションが終了するとき) に呼び出されます。
+ キーポイント:
  + `shutdownRequestedInput.checkpointer()`: シャットダウン前にチェックポイントを実行するために使用します。
  + アプリケーションが停止する前に進捗が保存されるよう、このメソッド内でチェックポイント処理を実装していることを確認してください。
  + ここでデータのフラッシュとチェックポイントを行わないと、アプリケーションの再起動時にデータ損失やレコードの再処理が発生する可能性があります。

**重要**  
KCL 3.x では、前のワーカーがシャットダウンされる前にチェックポイントを実行することで、リースが別のワーカーに引き継がれてもデータの再処理が最小限で済むようになっています。`shutdownRequested()` メソッドにチェックポイントロジックを実装していない場合、このメリットは得られません。`shutdownRequested()` メソッド内にチェックポイントロジックが実装されていることを確認してください。

### RecordProcessorFactory
<a name="implementation-recordprocessorfactory"></a>

RecordProcessorFactory は、新しい RecordProcessor インスタンスを作成する役割を担います。KCL は、このファクトリを使用して、アプリケーションが処理する必要のある各シャードに対して新しい RecordProcessor を作成します。

主な役割:
+ 必要に応じて新しい RecordProcessor インスタンスを作成する
+ 各 RecordProcessor が正しく初期化されていることを保証する

以下に実装例を示します。

```
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new SampleRecordProcessor();
    }
}
```

この例では、ファクトリは shardRecordProcessor() が呼び出されるたびに新しい SampleRecordProcessor を作成します。必要に応じて、初期化ロジックを追加する形で拡張できます。

### スケジューラー
<a name="implementation-scheduler"></a>

スケジューラは、KCL アプリケーション内のすべての動作を調整する高レベルのコンポーネントです。データ処理全体のオーケストレーションを担います。

主な役割:
+ RecordProcessor のライフサイクルを管理する
+ シャードのリース管理を処理する
+ チェックポイント処理を調整する
+ アプリケーション内の複数ワーカー間でシャード処理の負荷を分散する
+ 正常なシャットダウンおよびアプリケーション終了シグナルを処理する

スケジューラは通常、メインアプリケーション内で作成および開始されます。スケジューラの実装例は、この後の「メインコンシューマーアプリケーション」セクションで確認できます。

### メインコンシューマーアプリケーション
<a name="implementation-main"></a>

メインコンシューマーアプリケーションは、すべてのコンポーネントを結び付ける役割を果たします。KCL コンシューマーのセットアップ、必要なクライアントの作成、スケジューラの構成、およびアプリケーションのライフサイクル管理を担当します。

主な役割:
+  AWS サービスクライアントの設定 (Kinesis、DynamoDB、CloudWatch)
+ KCL アプリケーションを構成する
+ スケジューラを作成して起動する
+ アプリケーションのシャットダウン処理を行う

以下に実装例を示します。

```
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import java.util.UUID;

public class SampleConsumer {
    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    public SampleConsumer(String streamName, Region region) {
        this.streamName = streamName;
        this.region = region;
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    public void run() {
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamName, 
            streamName, 
            kinesisClient, 
            dynamoDbAsyncClient,
            cloudWatchClient, 
            UUID.randomUUID().toString(), 
            new SampleRecordProcessorFactory()
        );

        Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

    public static void main(String[] args) {
        String streamName = "your-stream-name"; // replace with your stream name
        Region region = Region.US_EAST_1; // replace with your region
        new SampleConsumer(streamName, region).run();
    }
}
```

 KCL は、デフォルトで専用スループットを持つ拡張ファンアウト (EFO) コンシューマーを作成します。拡張ファンアウトの詳細については、[専用スループットを備えた拡張ファンアウトを開発する](enhanced-consumers.md) を参照してください。コンシューマーが 2 未満の場合、または 200 ミリ秒未満の読み取り伝搬遅延を必要としない場合は、共有スループットコンシューマーを使用するために、スケジューラオブジェクトで次の設定を行う必要があります。

```
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
```

次のコードは、共有スループットコンシューマーを使用するスケジューラオブジェクトを作成する例です。

**インポート**:

```
import software.amazon.kinesis.retrieval.polling.PollingConfig;
```

**コード**:

```
Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );/
```

# Java 以外の言語で KCL を使用してコンシューマーを開発する
<a name="develop-kcl-consumers-non-java"></a>

このセクションでは、Python、Node.js、.NET、Ruby などの言語で Kinesis Client Library (KCL) を使用してコンシューマーを実装する方法について説明します。

KCL は Java ライブラリです。Java 以外の言語のサポートは、`MultiLangDaemon` と呼ばれる多言語インターフェースを使用して提供されます。このデーモンは Java ベースであり、Java 以外の言語で KCL を使用する場合、バックグラウンドで実行されます。そのため、KCL を Java以外の言語向けにインストールし、コンシューマーアプリケーションを完全に Java 以外の言語で記述している場合でも、`MultiLangDaemon` を使用するために、システムに Java をインストールしておく必要があります。さらに、`MultiLangDaemon` にはデフォルト設定があり、ユースケースによってはカスタマイズが必要になる場合があります (接続先の AWS リージョンなど)。GitHub の `MultiLangDaemon` の詳細については、[KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang) を参照してください。

主要な概念はどの言語でも共通ですが、言語ごとに特有の考慮点や実装があります。KCL コンシューマー開発の基本概念については、[Java で KCL を使用してコンシューマーを開発する](develop-kcl-consumers-java.md)を参照してください。Python、Node.js、.NET、Ruby で KCL コンシューマーを開発する方法の詳細および最新情報については、次の GitHub リポジトリを参照してください。
+ Python: [amazon-kinesis-client-python](https://github.com/awslabs/amazon-kinesis-client-python)
+ Node.js: [amazon-kinesis-client-nodejs](https://github.com/awslabs/amazon-kinesis-client-nodejs)
+ .NET: [amazon-kinesis-client-net](https://github.com/awslabs/amazon-kinesis-client-net)
+ Ruby: [amazon-kinesis-client-ruby](https://github.com/awslabs/amazon-kinesis-client-ruby)

**重要**  
JDK 8 を使用している場合、以下の Java 以外の言語向け KCL ライブラリのバージョンは使用しないでください。これらのバージョンには、JDK 8 と互換性のない依存関係 (logback) が含まれています。  
KCL Python 3.0.2 および 2.2.0
KCL Node.js 2.3.0
KCL .NET 3.1.0
KCL Ruby 2.2.0
JDK 8 を使用する場合は、問題が発生するこれらのバージョンより前または後にリリースされたバージョンを使用することをお勧めします。

# KCL を使用したマルチストリーム処理
<a name="kcl-multi-stream"></a>

このセクションでは、複数のデータストリームを同時に処理できる KCL コンシューマーアプリケーションを作成するために必要な KCL の変更点について説明します。
**重要**  
マルチストリーム処理は KCL 2.3 以降でのみサポートされています。
`multilangdaemon` を使用して実行される、Java 以外の言語で記述された KCL コンシューマーでは、マルチストリーム処理はサポートされていません。**
マルチストリーム処理は KCL 1.x のどのバージョンでもサポートされていません。**
+ **MultistreamTracker インターフェイス**
  + 複数のストリームを同時に処理できるコンシューマーアプリケーションを構築するには、[MultistreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java) という新しいインターフェイスを実装する必要があります。このインターフェースには、KCL コンシューマーアプリケーションによって処理されるデータストリームとその設定のリストを返す `streamConfigList` メソッドが含まれています。処理中のデータストリームは、コンシューマーアプリケーションのランタイム中に変更できることに注意してください。`streamConfigList` は、処理するデータストリームの変更について学習するために KCL によって定期的に呼び出されます。
  + `streamConfigList` が [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23) リストに入力します。

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```
  + `StreamIdentifier` および `InitialPositionInStreamExtended` は必須フィールドですが、`consumerArn` は省略可能です。KCL を使用して拡張ファンアウトコンシューマーアプリケーションを実装する場合にのみ、`consumerArn` を提供する必要があります。
  + の詳細については`StreamIdentifier`、[https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java\$1L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129) を参照してください。`StreamIdentifier` を作成するには、`streamArn` および KCL 2.5.0 以降で利用可能な `streamCreationEpoch` からマルチストリームインスタンスを作成することをお勧めします。`streamArm` をサポートしていない KCL v2.3 および v2.4 では、`account-id:StreamName:streamCreationTimestamp` 形式を使用してマルチストリームインスタンスを作成します。この形式は廃止され、次のメジャーリリース以降はサポートされなくなります。
  +  MultistreamTracker には、リーステーブル内の古いストリームのリースを削除するための戦略 (formerStreamsLeasesDeletionStrategy) も含まれています。コンシューマーアプリケーションのランタイム中は、ストラテジーを変更できないことに注意してください。詳細については、[https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java) を参照してください。
+   [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java) は、KCL 2.x 以降向けの KCL コンシューマーアプリケーションを構築する際に使用する KCL のすべての設定項目をアプリケーション全体で指定できるクラスです。`ConfigsBuilder` クラスは `MultistreamTracker` インターフェイスをサポートするようになりました。ConfigsBuilder は、レコードを消費する 1 つのデータストリームの名前を使用して初期化できます。 

  ```
  /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```  

または、複数のストリームを同時に処理する KCL コンシューマーアプリケーションを実装する場合、`MultiStreamTracker` で ConfigsBuilder を初期化することもできます。

```
* Constructor to initialize ConfigsBuilder with MultiStreamTracker
     * @param multiStreamTracker
     * @param applicationName
     * @param kinesisClient
     * @param dynamoDBClient
     * @param cloudWatchClient
     * @param workerIdentifier
     * @param shardRecordProcessorFactory
     */
    public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
            @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
            @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
            @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
        this.appStreamTracker = Either.left(multiStreamTracker);
        this.applicationName = applicationName;
        this.kinesisClient = kinesisClient;
        this.dynamoDBClient = dynamoDBClient;
        this.cloudWatchClient = cloudWatchClient;
        this.workerIdentifier = workerIdentifier;
        this.shardRecordProcessorFactory = shardRecordProcessorFactory;
    }
```
+ KCL コンシューマーアプリケーションにマルチストリームサポートが実装されているため、アプリケーションのリーステーブルの各行に、このアプリケーションが処理する複数のデータストリームのシャード ID とストリーム名が含まれるようになりました。
+ KCL コンシューマーアプリケーションのマルチストリームサポートが実装されている場合、leaseKey は次の構造を取ります: `account-id:StreamName:streamCreationTimestamp:ShardId`。例えば、`111111111:multiStreamTest-1:12345:shardId-000000000336`。

**重要**  
既存の KCL コンシューマーアプリケーションが 1 つのデータストリームのみを処理するように構成されている場合、`leaseKey` (リーステーブルのパーティションキー) はシャード ID です。この既存の KCL コンシューマーアプリケーションを複数のデータストリームを処理するように再構成すると、リーステーブルが壊れます。これは、マルチストリームをサポートするためには、`leaseKey` の構造が次の形式である必要があるためです: `account-id:StreamName:StreamCreationTimestamp:ShardId`。

# KCL で AWS Glue スキーマレジストリを使用する
<a name="kcl-glue-schema"></a>

Kinesis Data Streams を AWS Glue スキーマレジストリと統合できます。 AWS Glue スキーマレジストリを使用すると、生成されたデータが登録されたスキーマによって継続的に検証されるようにしながら、スキーマを一元的に検出、制御、および進化させることができます。スキーマ は、データレコードの構造と形式を定義します。スキーマは、信頼性の高いデータの公開、利用、または保存のための仕様をバージョニングしたものです。 AWS Glue スキーマレジストリを使用すると、ストリーミングアプリケーション内のend-to-endのデータ品質とデータガバナンスを向上させることができます。詳細については、「[AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)」を参照してください。この統合を設定する方法の 1 つは、Java で KCL を使用することです。

**重要**  
AWS Glue Kinesis Data Streams のスキーマレジストリ統合は、KCL 2.3 以降でのみサポートされています。
AWS Glue Kinesis Data Streams のスキーマレジストリ統合は、 で実行される Java 以外の言語で記述された KCL コンシューマーでは**サポートされていません`multilangdaemon`。
AWS Glue Kinesis Data Streams のスキーマレジストリ統合は、KCL 1.x のどのバージョンでも**サポートされていません。

KCL を使用して Kinesis Data Streams と AWS Glue スキーマレジストリの統合を設定する方法の詳細については、[「ユースケース: Amazon Kinesis Data Streams と AWS Glue スキーマレジストリの統合」の「KPL/KCL ライブラリを使用したデータの操作」セクションを参照してください。](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)

# KCL コンシューマーアプリケーションに必要な IAM アクセス許可
<a name="kcl-iam-permissions"></a>

 KCL コンシューマーアプリケーションに関連付けられている IAM ロールまたはユーザーに、次の許可を追加する必要があります。

 のセキュリティのベストプラクティス AWS により、きめ細かなアクセス許可を使用してさまざまなリソースへのアクセスを制御できます。 AWS Identity and Access Management (IAM) では、 でユーザーとユーザーのアクセス許可を管理できます AWS。IAM ポリシーは、許可されるアクションとそのアクションが適用されるリソースを明示的にリストアップします。

次の表は、KCL コンシューマーアプリケーションに一般的に必要となる最小限の IAM アクセス許可を示しています。


**KCL コンシューマーアプリケーションの最小限の IAM アクセス許可**  

| サービス | アクション | リソース (ARN) | 目的 | 
| --- | --- | --- | --- | 
| Amazon Kinesis Data Streams |  `DescribeStream` `DescribeStreamSummary` `RegisterStreamConsumer`  |  KCL アプリケーションがデータを処理する Kinesis データストリーム。`arn:aws:kinesis:region:account:stream/StreamName`  |  レコードを読み取る前に、コンシューマーは、データストリームが存在すること、アクティブであること、シャードを含んでいることを確認します。 コンシューマーをシャードに登録します。  | 
| Amazon Kinesis Data Streams |  `GetRecords` `GetShardIterator` `ListShards`  | KCL アプリケーションがデータを処理する Kinesis データストリーム。`arn:aws:kinesis:region:account:stream/StreamName` |  シャードからレコードを読み取ります。  | 
| Amazon Kinesis Data Streams |  `SubscribeToShard` `DescribeStreamConsumer` |  KCL アプリケーションがデータを処理する Kinesis データストリーム。このアクションは、拡張ファンアウト (EFO) コンシューマーを使用する場合にのみ追加します。 `arn:aws:kinesis:region:account:stream/StreamName/consumer/*`  |  拡張ファンアウト (EFO) コンシューマー用にシャードをサブスクライブします。  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `UpdateTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  リーステーブル (KCL によって作成された DynamoDB のメタデータテーブル。 `arn:aws:dynamodb:region:account:table/KCLApplicationName`  |  これらのアクションは、KCL が DynamoDB で作成されたリーステーブルを管理するために必要です。  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  KCL によって作成されたワーカーメトリクスとコーディネーター状態テーブル (DynamoDB のメタデータテーブル)。 `arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats` `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`  |  KCL が DynamoDB でワーカーメトリクスとコーディネーター状態メタデータテーブルを管理するには、これらのアクションが必要です。  | 
| Amazon DynamoDB | `Query` |  リーステーブルのグローバルセカンダリインデックス `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`  |  KCL が DynamoDB で作成されたリーステーブルのグローバルセカンダリインデックスを読み取るには、このアクションが必要です。  | 
| Amazon CloudWatch | `PutMetricData` |  \$1  |  アプリケーションをモニタリングするのに便利なメトリクスを CloudWatch にアップロードします。CloudWatch では `PutMetricData` アクションが実行される特定のリソースが存在しないため、アスタリスク (\$1) が使用されています。  | 

**注記**  
ARNsStreamName」、KCLApplicationName」をそれぞれ独自の AWS リージョン名前、 AWS アカウント 数値、Kinesis データストリーム名、および KCL アプリケーション名に置き換えます。KCL 3.x は、DynamoDB にさらに 2 つのメタデータテーブルを作成します。KCL によって作成された DynamoDB メタデータテーブルの詳細については、[KCL における DynamoDB メタデータテーブルと負荷分散](kcl-dynamoDB.md) を参照してください。KCL が作成するメタデータテーブルの名前を設定でカスタマイズしている場合は、KCL アプリケーション名ではなく、その指定したテーブル名を使用してください。

以下は、KCL コンシューマーアプリケーションのサンプルポリシードキュメントです。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME/consumer/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:UpdateTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-WorkerMetricStats",
    "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-CoordinatorState"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:Query"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME/index/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        }
    ]
}
```

------

このサンプルポリシーを使用する前に、次の項目を確認してください。
+ REGION を your AWS リージョン (us-east-1 など) に置き換えます。
+ ACCOUNT\$1ID を AWS アカウント ID に置き換えます。
+ STREAM\$1NAME を自分の Kinesis データストリーム名に置き換えます。
+ CONSUMER\$1NAME を自分のコンシューマー名に置き換えます (KCL を使用する場合、通常はアプリケーション名)。
+ KCL\$1APPLICATION\$1NAME を自分の KCL アプリケーション名に置き換えます。

# KCL の設定
<a name="kcl-configuration"></a>

特定の要件に合わせて Kinesis Client Library の機能をカスタマイズできるよう、設定プロパティを指定できます。次の表に、設定プロパティとクラスを示します。

**重要**  
KCL 3.x の負荷分散アルゴリズムは、ワーカーごとのリース数を均等にすることではなく、ワーカー間の CPU 使用率を均等化することを目的としています。`maxLeasesForWorker` を低く設定しすぎると、KCL のワークロードを効果的に分散する能力が制限される可能性があります。`maxLeasesForWorker` 設定を使用する場合は、可能な限り最適な負荷分散が行えるよう、その値を高めに設定することを検討してください。


**この表は、KCL の設定プロパティを示しています。**  

| 設定プロパティ | 設定クラス | 説明 | デフォルトの値 | 
| --- | --- | --- | --- | 
| applicationName | ConfigsBuilder | この KCL アプリケーションの名前。tableName および consumerName のデフォルトとして使用されます。 | 該当しない | 
| tableName | ConfigsBuilder |  Amazon DynamoDB リーステーブルで使用されるテーブル名の上書きを許可します。  | 該当しない | 
| streamName | ConfigsBuilder |  このアプリケーションがレコードを処理するストリームの名前。  | 該当しない | 
| workerIdentifier | ConfigsBuilder |  このアプリケーションプロセッサのインスタンス化を表す一意の識別子。一意である必要があります。  | 該当しない | 
| failoverTimeMillis | LeaseManagementConfig |  リース所有者が失敗したとみなすまでの経過時間 (ミリ秒)。シャードの数が多いアプリケーションでは、リースの追跡に必要な DynamoDB の IOPS を減らすために、この値をより大きく設定する場合があります。  | 10,000 (10 秒) | 
| shardSyncIntervalMillis | LeaseManagementConfig |  シャード同期コールの間隔。  | 60,000 (60 秒) | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig |  設定すると、子リースの処理が開始されると即時にリースが削除されます。  | TRUE | 
| ignoreUnexpectedChildShards | LeaseManagementConfig |  設定すると、開いているシャードがある子シャードは無視されます。これは、主に DynamoDB Streams 用です。  | FALSE | 
| maxLeasesForWorker | LeaseManagementConfig |  1 つのワーカーが受け持つことができるリースの最大数です。この値を低く設定しすぎると、ワーカーがすべてのシャードを処理できず、データ損失を招く可能性があります。また、ワーカー間のリース割り当てが最適にならない原因にもなります。設定する際は、シャード合計数、ワーカー数、ワーカーの処理能力を考慮してください。  | 無制限 | 
| maxLeaseRenewalThreads | LeaseManagementConfig |  リース更新スレッドプールのサイズを制御します。アプリケーションが処理するリースの数が多いほど、このプールも大きくする必要があります。  | 20 | 
| billingMode | LeaseManagementConfig |  DynamoDB に作成されるリーステーブルのキャパシティモードを決定します。選択肢はオンデマンドモード (PAY\$1PER\$1REQUEST) とプロビジョンドモードの 2 種類です。容量計画を行う必要がなく、ワークロードに応じて自動的にスケールするため、デフォルト設定であるオンデマンドモードの使用をお勧めします。  | PAY\$1PER\$1REQUEST (オンデマンドモード) | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | Kinesis Client Library がプロビジョンドキャパシティモードで新しい DynamoDB リーステーブルを作成する必要がある場合に使用される DynamoDB の読み込み容量です。billingMode 設定でデフォルトのオンデマンドキャパシティモードを使用している場合、この設定は無視して構いません。 | 10 | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | Kinesis Client Library で新しい DynamoDB リーステーブルを作成する必要がある場合に使用される DynamoDB 読み込み容量です。billingMode 設定でデフォルトのオンデマンドキャパシティモードを使用している場合、この設定は無視して構いません。 | 10 | 
| initialPositionInStreamExtended | LeaseManagementConfig |  アプリケーションが読み取りを開始するストリーム内の初期位置。これは最初のリースの作成時にのみ使用されます。  |  InitialPositionInStream.TRIM\$1HORIZON  | 
| reBalanceThresholdPercentage | LeaseManagementConfig |  負荷分散アルゴリズムがワーカー間でシャードの再割り当てを検討するタイミングを決定する、パーセンテージ値です。 これは、KCL 3.x で導入された新しい設定です。  | 10 | 
| dampeningPercentage | LeaseManagementConfig |  1 回のリバランス処理で、過負荷状態のワーカーから移動させる負荷量を抑制するために使用されるパーセンテージ値です。 これは、KCL 3.x で導入された新しい設定です。  | 60 | 
| allowThroughputOvershoot | LeaseManagementConfig |  希望するスループット量を超えることになったとしても、過負荷状態のワーカーから追加のリースを取得する必要があるかどうかを決定します。 これは、KCL 3.x で導入された新しい設定です。  | TRUE | 
| disableWorkerMetrics | LeaseManagementConfig |  リースの再割り当てや負荷分散を行う際に、KCL がワーカーのリソースメトリクス (CPU 使用率など) を無視するかどうかを指定します。KCL に CPU 使用率に基づく負荷分散を行わせたくない場合は、この値を TRUE に設定してください。 これは、KCL 3.x で導入された新しい設定です。  | FALSE | 
| maxThroughputPerHostKBps | LeaseManagementConfig |  リース割り当て中にワーカーに割り当てる最大スループットの量。 これは、KCL 3.x で導入された新しい設定です。  | 無制限 | 
| isGracefulLeaseHandoffEnabled | LeaseManagementConfig |  ワーカー間でリースを引き継ぐ際の動作を制御します。この値を true に設定すると、KCL はリースを別のワーカーに引き渡す前に、そのシャードの RecordProcessor が処理を完了できる十分な時間を確保し、リースをスムーズに引き継ぐよう試みます。これによりデータの整合性やスムーズな引き継ぎが可能になりますが、リースの引き継ぎに要する時間が長くなる場合があります。 この値を false に設定した場合、RecordProcessor が正常にシャットダウンするのを待たずに、リースは即座に引き渡されます。これによりリースの引き継ぎは高速になりますが、処理が完了しないまま中断されるリスクがあります。 注: スムーズなリース引き継ぎ機能の恩恵を受けるためには、RecordProcessor の shutdownRequested() メソッド内にチェックポイント処理を実装しておく必要があります。 これは、KCL 3.x で導入された新しい設定です。  | TRUE | 
| gracefulLeaseHandoffTimeoutMillis | LeaseManagementConfig |  現在のシャードの RecordProcessor が適切にシャットダウンするのを待つための最小待機時間 (ミリ秒) を指定します。この時間が経過すると、次の所有者へのリースが強制的に引き継がれます。 processRecords メソッドの処理時間が通常のデフォルト値より長い場合は、この設定値を大きくすることを検討してください。これにより、リースが引き継がれる前に RecordProcessor が処理を完了できる十分な時間を確保できます。 これは、KCL 3.x で導入された新しい設定です。  | 30,000 (30 秒) | 
| maxRecords | PollingConfig |  Kinesis が返すレコードの最大数の設定を許可します。  | 10,000 | 
| retryGetRecordsInSeconds | PollingConfig |  GetRecords が失敗した場合の試行間隔の遅延時間を設定します。  | なし | 
| maxGetRecordsThreadPool | PollingConfig |  GetRecords に使用されるスレッドプールのサイズ。  | なし | 
| idleTimeBetweenReadsInMillis | PollingConfig |  KCL がデータストリームからデータを取得するために GetRecords を呼び出す際、その呼び出し間隔として待機する時間を指定します。単位はミリ秒です。  | 1,500 | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig |  設定すると、Kinesis から提供されたレコードがない場合でもレコードプロセッサが呼び出されます。  | FALSE | 
| parentShardPollIntervalMillis | CoordinatorConfig |  親シャードが完了したかどうかを確認するためにレコードプロセッサがポーリングを行う頻度。単位はミリ秒です。  | 10,000 (10 秒) | 
| skipShardSyncAtWorkerInitializationIfLeaseExist | CoordinatorConfig |  リーステーブルに既存のリースがある場合、シャードデータの同期を無効にします。  |  FALSE  | 
| shardPrioritization | CoordinatorConfig |  どのシャードの優先順位付けを使用するか。  |  NoOpShardPrioritization  | 
| ClientVersionConfig | CoordinatorConfig |  アプリケーションをどの KCL バージョン互換モードで実行するかを決定します。この設定は、旧バージョンの KCL から移行する場合にのみ使用されます。3.x へ移行する際は、この設定を `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` に設定する必要があります。移行が完了したら、この設定は削除できます。  | CLIENT\$1VERSION\$1CONFIG\$13X | 
| taskBackoffTimeMillis | LifecycleConfig |  失敗した KCL タスクを再試行するまでの待機時間。単位はミリ秒です。  | 500 (0.5 秒) | 
| logWarningForTaskAfterMillis | LifecycleConfig |  タスクが完了していない場合に警告がログに記録されるまでの待機期間。  | なし | 
| listShardsBackoffTimeInMillis | RetrievalConfig | 障害が発生した場合に ListShards を呼び出す間隔 (ミリ秒)。単位はミリ秒です。 | 1,500 (1.5 秒) | 
| maxListShardsRetryAttempts | RetrievalConfig | 失敗とみなすまでの ListShards の再試行の最大回数。 | 50 | 
| metricsBufferTimeMillis | MetricsConfig |  メトリクスを CloudWatch に送信する前に、バッファリングして保持する最大時間 (ミリ秒) を指定します。  | 10,000 (10 秒) | 
| metricsMaxQueueSize | MetricsConfig |  メトリクスを CloudWatch に送信する前に、バッファリングして保持できるメトリクスの最大数を指定します。  | 10,000 | 
| metricsLevel | MetricsConfig |  有効化して CloudWatch に送信するメトリクスの詳細度を指定します。 指定できる値: NONE、SUMMARY、DETAILED。  |  MetricsLevel.DETAILED  | 
| metricsEnabledDimensions | MetricsConfig |  CloudWatch メトリクスで許可されるディメンションを制御します。  | すべてのディメンション | 

**KCL 3.x で廃止された設定**

KCL 3.x では、次の設定プロパティは廃止されています。


**この表は、KCL 3.x の廃止された設定プロパティを示しています。**  

| 設定プロパティ | 設定クラス | 説明 | 
| --- | --- | --- | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig |  アプリケーションが同時にスティールを試みるリースの最大数。KCL 3.x ではこの設定は無視され、ワーカーのリソース使用状況に基づいてリースが再割り当てされます。  | 
| enablePriorityLeaseAssignment | LeaseManagementConfig |  ワーカーが、ターゲットとなるリース数に関係なく、フェイルオーバー時間の 3 倍の期間更新されていない大幅に期限切れのリースや新しいシャードのリースを優先して取得するかどうかを制御します。ただし、最大リース数の制限は引き続き適用されます。KCL 3.x ではこの設定は無視され、期限切れのリースは常に複数のワーカーへ分散されます。  | 

**重要**  
旧バージョンの KCL から KCL 3.x へ移行する間は、廃止された設定プロパティも引き続き保持しておく必要があります。移行期間中、KCL ワーカーはまず KCL 2.x 互換モードで起動し、アプリケーション内のすべての KCL ワーカーが KCL 3.x を実行できる状態であることを検出すると、KCL 3.x の機能モードへ切り替わります。KCL ワーカーが KCL 2.x 互換モードで動作している間は、これらの廃止された設定が必要になります。

# KCL バージョンライフサイクルポリシー
<a name="kcl-version-lifecycle-policy"></a>

このトピックでは、Amazon Kinesis Client Library (KCL) のバージョンライフサイクルポリシーの概要を説明します。 では、新機能と機能強化、バグ修正、セキュリティパッチ、依存関係の更新をサポートするために、KCL バージョンの新しいリリース AWS を定期的に提供しています。最新の機能、セキュリティ更新、基本的な依存関係に対応するため、KCL を常に最新バージョンへ更新しておくことをお勧めします。サポートが終了した KCL バージョンを継続使用することは**お勧めしません**。

主要な KCL バージョンのライフサイクルは、次の 3 つのフェーズで構成されます。
+ **一般提供 (GA)** – このフェーズでは、メジャーバージョンが完全にサポートされています。 は、Kinesis Data Streams の新機能や API アップデートのサポート、バグやセキュリティの修正を含む、定期的なマイナーバージョンとパッチバージョンのリリース AWS を提供します。
+ **メンテナンスモード** – パッチバージョンのリリース AWS を制限して、重大なバグ修正とセキュリティの問題にのみ対処します。Kinesis Data Streams の新機能や API に対する更新は、このメジャーバージョンには行われません。
+ **サポート終了** – そのメジャーバージョンに対して更新やリリースは一切提供されません。以前に公開されたリリースは引き続き公開パッケージマネージャーから入手でき、コードは GitHub に残ります。サポートが終了したバージョンの使用は、ユーザーの裁量で行われます。最新のメジャーバージョンにアップグレードすることをお勧めします。


| メジャーバージョン | 現在のフェーズ | リリース日 | メンテナンスモード日 | サポート終了日 | 
| --- | --- | --- | --- | --- | 
| KCL 1.x | メンテナンスモード | 2013-12-19 | 2025-04-17 | 2026-01-30 | 
| KCL 2.x | 一般提供 | 2018-08-02 | -- | -- | 
| KCL 3.x | 一般提供 | 2024-11-06 | -- | -- | 

# 旧バージョンの KCL からの移行
<a name="kcl-migration-previous-versions"></a>

このトピックでは、Kinesis Client Library (KCL) の旧バージョンから移行する方法について説明します。

## KCL 3.0 の新機能
<a name="kcl-migration-new-3-0"></a>

Kinesis Client Library (KCL) 3.0 では、これまでのバージョンと比べて複数の大幅な強化が行われています。
+  コンシューマーアプリケーションのフリート内で、過負荷のワーカーから負荷の低いワーカーへ処理を自動的に再分配することで、アプリケーションのコンピューティングコストを削減します。この新しい負荷分散アルゴリズムにより、ワーカー間の CPU 使用率が均等に分散されるため、ワーカーの過剰プロビジョニングが不要になります。
+  リーステーブルに対する読み取りオペレーションを最適化することで、KCL に関連する DynamoDB のコストを削減します。
+ 現在のワーカーが処理済みレコードのチェックポイントを完了できるようにすることで、リースが別のワーカーに引き継がれる際のデータ再処理を最小限に抑えます。
+  を使用してパフォーマンスとセキュリティ機能 AWS SDK for Java 2.x を向上させ、 AWS SDK for Java 1.x への依存を完全に排除します。

詳細については、[KCL 3.0 リリースノート](https://github.com/awslabs/amazon-kinesis-client/blob/master/CHANGELOG.md)を参照してください。

**Topics**
+ [KCL 3.0 の新機能](#kcl-migration-new-3-0)
+ [KCL 2.x から KCL 3.x に移行する](kcl-migration-from-2-3.md)
+ [以前のバージョンにロールバックする](kcl-migration-rollback.md)
+ [ロールバック後に KCL 3.x にロールフォワードする](kcl-migration-rollforward.md)
+ [プロビジョンドキャパシティモードを使用するリーステーブルのベストプラクティス](kcl-migration-lease-table.md)
+ [KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)

# KCL 2.x から KCL 3.x に移行する
<a name="kcl-migration-from-2-3"></a>

このトピックでは、KCL 2.x のコンシューマーを KCL 3.x へ移行するための手順を段階的に説明します。KCL 3.x は、KCL 2.x コンシューマーのインプレース移行をサポートしています。ワーカーをローリング方式で順次移行している間も、Kinesis データストリームからのデータ取得を継続できます。

**重要**  
KCL 3.x は KCL 2.x と同じインターフェイスおよびメソッドを維持しています。したがって、移行中にレコード処理コードを更新する必要はありません。ただし、適切な設定を行い、移行に必要な手順を確認する必要があります。スムーズに移行するために、以下の移行手順に従うことを強くお勧めします。

## ステップ 1: 前提条件
<a name="kcl-migration-from-2-3-prerequisites"></a>

KCL 3.x の使用を始める前に、以下のものが揃っていることを確認してください。
+ Java Development Kit (JDK) 8 以降
+ AWS SDK for Java 2.x
+ 依存関係管理用の Maven または Gradle

**重要**  
KCL 3.x で AWS SDK for Java バージョン 2.27.19 ～ 2.27.23 を使用しないでください。これらのバージョンには、KCL が使用する DynamoDB に関連した例外エラーを引き起こす問題が含まれています。この問題を回避するには、 AWS SDK for Java バージョン 2.28.0 以降を使用することをお勧めします。

## ステップ 2: 依存関係を追加する
<a name="kcl-migration-from-2-3-dependencies"></a>

Maven を使用している場合は、以下の依存関係を `pom.xml` ファイルに追加します。3.x.x を最新の KCL バージョンに置き換えていることを確認してください。

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

Gradle を使用している場合は、以下を `build.gradle` ファイルに追加します。3.x.x を最新の KCL バージョンに置き換えていることを確認してください。

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

最新の KCL バージョンは、[Maven Central Repository](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client) で確認できます。

## ステップ 3: 移行関連の設定をセットアップする
<a name="kcl-migration-from-2-3-configuration"></a>

KCL 2.x から KCL 3.x に移行するには、次の設定パラメータを設定する必要があります。
+ CoordinatorConfig.clientVersionConfig: この設定は、アプリケーションをどの KCL バージョン互換モードで実行するかを決定します。KCL 2.x から 3.x へ移行する際は、この設定を `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` に設定する必要があります。この設定を行うには、スケジューラオブジェクトを作成する際に次の行を追加してください。

```
configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X)
```

以下は、KCL 2.x から 3.x へ移行する際に、`CoordinatorConfig.clientVersionConfig` を設定する方法の例です。必要に応じて、その他の設定も要件に合わせて調整できます。

```
Scheduler scheduler = new Scheduler(
    configsBuilder.checkpointConfig(),
    configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X),
    configsBuilder.leaseManagementConfig(),
    configsBuilder.lifecycleConfig(),
    configsBuilder.metricsConfig(),
    configsBuilder.processorConfig(),
    configsBuilder.retrievalConfig()
);
```

KCL 2.x と KCL 3.x では負荷分散アルゴリズムが異なるため、コンシューマーアプリケーション内のすべてのワーカーが、同じ時点では同一の負荷分散アルゴリズムを使用していることが重要です。異なる負荷分散アルゴリズムを使用するワーカーが同時に動作していると、両アルゴリズムが独立して動作するため、負荷分散が最適化されない可能性があります。

この KCL 2.x 互換設定により、KCL 3.x アプリケーションは KCL 2.x と互換性のあるモードで動作し、コンシューマーアプリケーション内のすべてのワーカーが KCL 3.x にアップグレードされるまで、KCL 2.x の負荷分散アルゴリズムを利用できます。移行が完了すると、KCL は自動的に完全な KCL 3.x の機能モードへ切り替わり、稼働中のすべてのワーカーで新しい KCL 3.x の負荷分散アルゴリズムを使用し始めます。

**重要**  
`ConfigsBuilder` を使用せず、設定を行うために `LeaseManagementConfig` オブジェクトを作成している場合は、KCL 3.x 以降では `applicationName` という追加のパラメータを指定する必要があります。詳細については、[LeaseManagementConfig コンストラクタでのコンパイルエラー](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compiliation-error-leasemanagementconfig)を参照してください。KCL の設定には、`ConfigsBuilder` を使用することをお勧めします。`ConfigsBuilder` は、KCL アプリケーションをより柔軟かつ保守しやすい方法で設定できる手段を提供します。

## ステップ 4: shutdownRequested() メソッド実装のベストプラクティスに従う
<a name="kcl-migration-from-2-3-best-practice"></a>

KCL 3.x では、リース再割り当て時にリースが別のワーカーへ引き継がれる際のデータ再処理を最小限に抑えるため、スムーズなリース引き継ぎと呼ばれる機能が導入されています。**これは、リースが引き継がれる前に、処理済みの最新シーケンス番号をリーステーブルにチェックポイントすることで実現されます。スムーズなリース引き継ぎを正しく機能させるためには、`RecordProcessor` クラスの `shutdownRequested` メソッド内で、`checkpointer` オブジェクトを必ず呼び出すようにする必要があります。`shutdownRequested` メソッド内で `checkpointer` オブジェクトを呼び出さない場合は、次の例に示すように実装できます。

**重要**  
次の実装例は、スムーズなリース引き継ぎの最小要件です。必要に応じて、チェックポイントに関連する追加のロジックを含めるように拡張できます。非同期処理を実行している場合は、チェックポイントを呼び出す前に、ダウンストリームに配信されたすべてのレコードが処理されていることを確認してください。
スムーズなリース引き継ぎによって、リース移行時のデータ再処理が発生する可能性は大幅に低減されますが、その可能性が完全になくなるわけではありません。データの整合性と一貫性を維持するには、ダウンストリームのコンシューマーアプリケーションをべき等になるように設計します。つまり、重複してレコードを処理する可能性があっても、システム全体に悪影響を及ぼさずに処理できるようにしておく必要があります。

```
/**
 * Invoked when either Scheduler has been requested to gracefully shutdown
 * or lease ownership is being transferred gracefully so the current owner
 * gets one last chance to checkpoint.
 *
 * Checkpoints and logs the data a final time.
 *
 * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
 *                               before the shutdown is completed.
 */
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
    try {
       // Ensure that all delivered records are processed 
       // and has been successfully flushed to the downstream before calling 
       // checkpoint
       // If you are performing any asynchronous processing or flushing to
       // downstream, you must wait for its completion before invoking
       // the below checkpoint method.
        log.info("Scheduler is shutting down, checkpointing.");
        shutdownRequestedInput.checkpointer().checkpoint();
    } catch (ShutdownException | InvalidStateException e) {
        log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
    } 
}
```

## ステップ 5: KCL 3.x でワーカーメトリクスを収集するための前提条件を確認する
<a name="kcl-migration-from-2-3-worker-metrics"></a>

KCL 3.x は、ワーカー間で負荷を均等に分散するため、CPU 使用率などの CPU 利用メトリクスを収集します。コンシューマーアプリケーションのワーカーは、Amazon EC2、Amazon ECS、Amazon EKS、または AWS Fargate上で実行できます。KCL 3.x がワーカーから CPU 使用率メトリクスを収集できるのは、以下の前提条件が満たされている場合のみです。

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ オペレーティングシステムは Linux OS である必要があります。
+ EC2 インスタンスで [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html) を有効にする必要があります。

 **Amazon Elastic Container Service (Amazon ECS) on Amazon EC2**
+ オペレーティングシステムは Linux OS である必要があります。
+ [ECS タスクメタデータエンドポイントのバージョン 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html) を有効にする必要があります。
+ Amazon ECS コンテナエージェントのバージョンは 1.39.0 以降である必要があります。

 **での Amazon ECS AWS Fargate**
+ [Fargate タスクメタデータエンドポイントのバージョン 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html) を有効にする必要があります。Fargate プラットフォームバージョン 1.4.0 以降を使用している場合、これはデフォルトで有効になっています。
+ Fargate プラットフォームバージョン 1.4.0 以降

 **Amazon Elastic Kubernetes Service (Amazon EKS) on Amazon EC2** 
+ オペレーティングシステムは Linux OS である必要があります。

 **での Amazon EKS AWS Fargate**
+ Fargate プラットフォーム 1.3.0 以降

**重要**  
前提条件が満たされず、KCL 3.x がワーカーから CPU 使用率メトリクスを収集できない場合、KCL はリースごとのスループット量に基づいて負荷を再分散します。このフォールバックの負荷再分散メカニズムにより、各ワーカーに割り当てられたリースから得られる合計スループット量が、ワーカー間で均等になるように調整されます。詳細については、「[KCL がワーカーにリースを割り当て、負荷を分散する方法](kcl-dynamoDB.md#kcl-assign-leases)」を参照してください。

## ステップ 6: KCL 3.x 用に IAM アクセス許可を更新する
<a name="kcl-migration-from-2-3-IAM-permissions"></a>

KCL 3.x コンシューマーアプリケーションに関連付けられている IAM ロールまたは IAM ポリシーに、次の許可を追加する必要があります。これは、KCL アプリケーションで使用中の既存の IAM ポリシーを更新する作業を伴います。詳細については、「[KCL コンシューマーアプリケーションに必要な IAM アクセス許可](kcl-iam-permissions.md)」を参照してください。

**重要**  
既存の KCL アプリケーションでは、KCL 2.x では不要であったため、IAM ポリシーに次の IAM アクションおよびリソースが追加されていない可能性があります。KCL 3.x アプリケーションを実行する前に、これらが追加されていることを必ず確認してください。  
アクション: `UpdateTable`  
リソース (ARN): `arn:aws:dynamodb:region:account:table/KCLApplicationName`
アクション: `Query`  
リソース (ARN): `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`
アクション: `CreateTable`、`DescribeTable`、`Scan`、`GetItem`、`PutItem`、`UpdateItem`、`DeleteItem`  
リソース (ARN): `arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats`、`arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`
ARNsKCLApplicationName」をそれぞれ独自のアプリケーション名 AWS リージョン、 AWS アカウント 数値、および KCL アプリケーション名に置き換えます。KCL が作成するメタデータテーブルの名前を設定でカスタマイズしている場合は、KCL アプリケーション名ではなく、その指定したテーブル名を使用してください。

## ステップ 7: KCL 3.x のコードをワーカーにデプロイする
<a name="kcl-migration-from-2-3-IAM-deploy"></a>

移行に必要な設定を行い、これまでの移行チェックリストをすべて完了したら、コードをビルドしてワーカーへデプロイできます。

**注記**  
`LeaseManagementConfig` コンストラクタでコンパイルエラーが発生した場合は、トラブルシューティング情報について [LeaseManagementConfig コンストラクタでのコンパイルエラー](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compilation-error-leasemanagementconfig)を参照してください。

## ステップ 8: 移行を完了する
<a name="kcl-migration-from-2-3-finish"></a>

KCL 3.x のコードをデプロイしている間、KCL は引き続き KCL 2.x のリース割り当てアルゴリズムを使用します。すべてのワーカーに KCL 3.x のコードを正常にデプロイすると、KCL がそれを自動的に検知し、ワーカーのリソース使用状況に基づく新しいリース割り当てアルゴリズムに切り替わります。新しいリース割り当てアルゴリズムの詳細については、[KCL がワーカーにリースを割り当て、負荷を分散する方法](kcl-dynamoDB.md#kcl-assign-leases) を参照してください。

デプロイ中は、CloudWatch に出力される次のメトリクスを使用して移行状況をモニタリングできます。これらのメトリクスは `Migration` オペレーションの下でモニタリングできます。すべてのメトリクスは KCL アプリケーション単位のメトリクスであり、メトリクスレベルは `SUMMARY` に設定されています。`CurrentState:3xWorker` メトリクスの `Sum` 統計値が、KCL アプリケーション内のワーカー合計数と一致した場合、KCL 3.x への移行が正常に完了したことを示します。

**重要**  
 すべてのワーカーが新しいリース割り当てアルゴリズムを実行できる状態になってから、KCL がその新しいアルゴリズムに切り替えるまでには、最低でも 10 分かかります。


**KCL 移行プロセスの CloudWatch メトリクス**  

| メトリクス | 説明 | 
| --- | --- | 
| CurrentState:3xWorker |  KCL 3.x へ正常に移行し、新しいリース割り当てアルゴリズムを実行している KCL ワーカーの数です。このメトリクスの `Sum` 値がワーカーの総数と一致した場合、KCL 3.x への移行が正常に完了したことを示します。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| CurrentState:2xCompatibleWorker |  移行プロセス中に KCL 2.x 互換モードで実行されている KCL ワーカーの数です。このメトリクスの値がゼロ以外である場合、移行がまだ進行中であることを示します。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| Fault |  移行プロセス中に発生した例外の数です。これらの例外の多くは一時的なエラーであり、KCL 3.x は自動的にリトライして移行を完了しようとします。`Fault` メトリクスの値が継続的に記録されている場合は、移行期間のログを確認して追加のトラブルシューティングを行ってください。問題が解決しない場合は、 にお問い合わせください サポート。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| GsiStatusReady |  リーステーブル上のグローバルセカンダリインデックス (GSI) の作成状況です。このメトリクスは、KCL 3.x を実行するための前提条件である、リーステーブル上の GSI が作成されているかどうかを示します。値は 0 または 1 で、1 の場合は GSI が正常に作成されたことを意味します。ロールバック状態の間、このメトリクスは出力されません。ロールフォワード後は、再びこのメトリクスをモニタリングできるようになります。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| workerMetricsReady |  すべてのワーカーからのワーカーメトリクス送信状況です。このメトリクスは、すべてのワーカーが CPU 使用率などのメトリクスを送信しているかどうかを示します。値は 0 または 1 で、1 の場合はすべてのワーカーが正常にメトリクスを送信しており、新しいリース割り当てアルゴリズムの利用準備が整っていることを示します。ロールバック状態の間、このメトリクスは出力されません。ロールフォワード後は、再びこのメトリクスをモニタリングできるようになります。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/streams/latest/dev/kcl-migration-from-2-3.html)  | 

KCL は、移行中に 2.x 互換モードへロールバックできる機能を提供しています。KCL 3.x への移行が完了し、ロールバックが不要になった場合は、`CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` の `CoordinatorConfig.clientVersionConfig` 設定を削除することをお勧めします。この設定を削除すると、KCL アプリケーションから移行関連のメトリクスが出力されなくなります。

**注記**  
移行期間中および移行完了後の一定期間は、アプリケーションのパフォーマンスと安定性をモニタリングすることをお勧めします。問題が発生した場合は、[KCL 移行ツール](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)を使用して、ワーカーを KCL 2.x 互換の機能にロールバックすることができます。

# 以前のバージョンにロールバックする
<a name="kcl-migration-rollback"></a>

このトピックでは、コンシューマーを前のバージョンにロールバックする手順について説明します。ロールバックが必要な場合は、次の 2 ステップのプロセスを実行します。

1. [KCL 移行ツール](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)を実行します。

1. 以前の KCL バージョンコードを再デプロイします (オプション)。

## ステップ 1: KCL 移行ツールを実行する
<a name="kcl-migration-rollback-tool"></a>

以前の KCL バージョンにロールバックする必要がある場合は、KCL 移行ツールを実行する必要があります。KCL 移行ツールは 2 つの重要なタスクを実行します。
+ これにより、DynamoDB のワーカーメトリクステーブルと呼ばれるメタデータテーブルとリーステーブルのグローバルセカンダリインデックスが削除されます。これら 2 つのアーティファクトは KCL 3.x によって作成されますが、以前のバージョンにロールバックするときには必要ありません。
+ これにより、すべてのワーカーが KCL 2.x と互換性のあるモードで実行され、以前の KCL バージョンで使用される負荷分散アルゴリズムの使用が開始されます。KCL 3.x の新しい負荷分散アルゴリズムに問題がある場合、この問題はすぐに軽減されます。

**重要**  
DynamoDB のコーディネーター状態テーブルが存在し、移行、ロールバック、ロールフォワードプロセス中に削除されていない必要があります。

**注記**  
コンシューマーアプリケーションのすべてのワーカーが、一度に同じ負荷分散アルゴリズムを使用することが重要です。KCL 移行ツールを使用すると、KCL 3.x コンシューマーアプリケーション内のすべてのワーカーが KCL 2.x 互換モードに切り替えられ、前の KCL バージョンにロールバックしている間、すべてのワーカーが同じ負荷分散アルゴリズムで動作するようにします。

[KCL 移行ツール](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)は、[KCL GitHub リポジトリ](https://github.com/awslabs/amazon-kinesis-client/tree/master)のスクリプトディレクトリでダウンロードできます。このスクリプトは、コーディネーター状態テーブルへの書き込み、ワーカーメトリクステーブルの削除、およびリーステーブルの更新を行うための必要な権限を持つ任意のワーカー、または任意のホストから実行できます。スクリプトの実行に必要な IAM アクセス許可については、[KCL コンシューマーアプリケーションに必要な IAM アクセス許可](kcl-iam-permissions.md) を参照してください。このスクリプトは、KCL アプリケーションごとに 1 回だけ実行する必要があります。KCL 移行ツールは、以下のコマンドで実行できます。

```
python3 ./KclMigrationTool.py --region <region> --mode rollback [--application_name <applicationName>] [--lease_table_name <leaseTableName>] [--coordinator_state_table_name <coordinatorStateTableName>] [--worker_metrics_table_name <workerMetricsTableName>]
```

**パラメータ**
+ --region: を `<region>` に置き換えます AWS リージョン。
+ --application\$1name: このパラメータは、DynamoDB メタデータテーブル (リーステーブル、コーディネーター状態テーブル、ワーカーメトリクステーブル) にデフォルト名を使用している場合に必要です。これらのテーブルにカスタム名を指定している場合は、このパラメータを省略できます。`<applicationName>` を実際の KCL アプリケーションの名前に置き換えます。カスタム名が指定されていない場合、ツールはこの名前を使用してデフォルトのテーブル名を取得します。
+ --lease\$1table\$1name (オプション): このパラメータは、KCL 設定でリーステーブルのカスタム名を設定している場合に必要です。デフォルトのテーブル名を使用している場合は、このパラメータを省略できます。`leaseTableName` をリーステーブルに指定したカスタムテーブル名に置き換えます。
+ --coordinator\$1state\$1table\$1name (オプション): このパラメータは、KCL 設定でコーディネーター状態テーブルのカスタム名を設定している場合に必要です。デフォルトのテーブル名を使用している場合は、このパラメータを省略できます。`<coordinatorStateTableName>` を、コーディネーター状態テーブルに指定したカスタムテーブル名に置き換えます。
+ --worker\$1metrics\$1table\$1name (オプション): このパラメータは、KCL 設定でワーカーメトリクステーブルのカスタム名を設定している場合に必要です。デフォルトのテーブル名を使用している場合は、このパラメータを省略できます。`<workerMetricsTableName>` を、ワーカーメトリクステーブルに指定したカスタムテーブル名に置き換えます。

## ステップ 2: 以前の KCL バージョンでコードを再デプロイする (オプション)
<a name="kcl-migration-rollback-redeploy"></a>

 ロールバック用に KCL 移行ツールを実行すると、次のいずれかのメッセージが表示されます。
+ **メッセージ 1:** 「ロールバックが完了しました。お使いの KCL アプリケーションは KCL 2.x 互換モードで動作していました。問題が改善されない場合は、以前の KCL バージョンでコードをデプロイして、元のアプリケーションバイナリにロールバックしてください」。
  + **必要なアクション: **これは、ワーカーが KCL 2.x 互換モードで実行されていたことを意味します。問題が解決しない場合は、以前の KCL バージョンのコードをワーカーに再デプロイしてください。
+ **メッセージ 2:** 「ロールバックが完了しました。お使いの KCL アプリケーションは KCL 3.x の機能モードで動作していました。問題について 5 分以内に問題が緩和されない場合を除き、以前のアプリケーションバイナリへのロールバックは不要です。問題が解決しない場合は、以前の KCL バージョンでコードをデプロイして、以前のアプリケーションバイナリにロールバックしてください」。
  + **必要なアクション: **ワーカーが KCL 3.x モードで実行され、KCL 移行ツールがすべてのワーカーを KCL 2.x 互換モードに切り替えたことを意味します。問題が解決した場合は、以前の KCL バージョンのコードをワーカーに再デプロイする必要はありません。問題が解決しない場合は、以前の KCL バージョンのコードをワーカーに再デプロイしてください。

 

# ロールバック後に KCL 3.x にロールフォワードする
<a name="kcl-migration-rollforward"></a>

このトピックでは、ロールバック後にコンシューマーを KCL 3.x へロールフォワードする手順について説明します。ロールフォワードが必要な場合は、2 ステップのプロセスを完了する必要があります。

1. [KCL 移行ツール](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)を実行します。

1. KCL 3.x を使用してコードをデプロイします。

## ステップ 1: KCL 移行ツールを実行する
<a name="kcl-migration-rollback-tool"></a>

KCL 移行ツールを実行します。次のコマンドで KCL 移行ツールを実行し、KCL 3.x にロールフォワードします。

```
python3 ./KclMigrationTool.py --region <region> --mode rollforward [--application_name <applicationName>] [--coordinator_state_table_name <coordinatorStateTableName>]
```

**パラメータ**
+ --region: を `<region>` に置き換えます AWS リージョン。
+ --application\$1name: このパラメータは、コーディネーター状態テーブルにデフォルト名を使用している場合に必要です。コーディネーター状態テーブルにカスタム名を指定している場合は、このパラメータを省略できます。`<applicationName>` を実際の KCL アプリケーションの名前に置き換えます。カスタム名が指定されていない場合、ツールはこの名前を使用してデフォルトのテーブル名を取得します。
+ --coordinator\$1state\$1table\$1name (オプション): このパラメータは、KCL 設定でコーディネーター状態テーブルのカスタム名を設定している場合に必要です。デフォルトのテーブル名を使用している場合は、このパラメータを省略できます。`<coordinatorStateTableName>` を、コーディネーター状態テーブルに指定したカスタムテーブル名に置き換えます。

移行ツールをロールフォワードモードで実行すると、KCL は KCL 3.x に必要な次の DynamoDB リソースを作成します。
+ リーステーブルのグローバルセカンダリインデックス
+ ワーカーメトリクステーブル

## ステップ 2: KCL 3.x を使用してコードをデプロイする
<a name="kcl-migration-rollback-redeploy"></a>

ロールフォワードの KCL 移行ツールを実行したら、KCL 3.x を使用してコードをワーカーにデプロイします。[ステップ 8: 移行を完了する](kcl-migration-from-2-3.md#kcl-migration-from-2-3-finish) に従って移行を完了します。

# プロビジョンドキャパシティモードを使用するリーステーブルのベストプラクティス
<a name="kcl-migration-lease-table"></a>

KCL アプリケーションのリーステーブルがプロビジョンドキャパシティモードに切り替えられている場合、KCL 3.x はリーステーブル上にグローバルセカンダリインデックスを作成します。これは、プロビジョンド課金モードで作成され、ベースとなるリーステーブルと同じ読み込み容量ユニット (RCU) および書き込み容量ユニット (WCU) が設定されます。グローバルセカンダリインデックスが作成された後は、DynamoDB コンソールでグローバルセカンダリインデックスの実際の使用状況をモニタリングし、必要に応じて容量ユニットを調整することをお勧めします。KCL によって作成される DynamoDB メタデータテーブルのキャパシティモードを切り替える方法について詳しくは、[KCL が作成するメタデータテーブルの DynamoDB キャパシティモード](kcl-dynamoDB.md#kcl-capacity-mode) を参照してください。

**注記**  
デフォルトでは、KCL はリーステーブル、ワーカーメトリクステーブル、コーディネーター状態テーブルといったメタデータテーブル、さらにリーステーブル上のグローバルセカンダリインデックスをオンデマンドキャパシティモードで作成します。使用量の変化に基づいて容量を自動的に調整するには、オンデマンドキャパシティモードを使用することをお勧めします。

# KCL 1.x から KCL 3.x への移行
<a name="kcl-migration-1-3"></a>

このトピックでは、KCL 1.x のコンシューマーを KCL 3.x へ移行するための手順を説明します。KCL 1.x は、KCL 2.x や KCL 3.x とは異なるクラスとインターフェイスを使用します。まずレコードプロセッサ、レコードプロセッサファクトリ、ワーカークラスを KCL 2.x/3.x 互換形式に移行し、KCL 2.x から KCL 3.x への移行手順に従う必要があります。KCL 1.x から KCL 3.x に直接アップグレードできます。
+ **ステップ 1: レコードプロセッサを移行する**

  [KCL 1.x から KCL 2.x へのコンシューマーの移行](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration)ページの[レコードプロセッサを移行する](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration)セクションに従います。
+ **ステップ 2: レコードプロセッサファクトリを移行する**

  [KCL 1.x から KCL 2.x へのコンシューマーの移行](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-factory-migration)ページの[レコードプロセッサファクトリーを移行する](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration)セクションに従います。
+ **ステップ 3: ワーカーを移行する**

  [KCL 1.x から KCL 2.x へのコンシューマーの移行](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#worker-migration)ページの[ワーカーを移行する](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration)セクションに従います。
+ **ステップ 4: KCL 1.x 設定を移行する**

  [KCL 1.x から KCL 2.x へのコンシューマーの移行](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration)ページの[Amazon Kinesis クライアントを設定する](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration)セクションに従います。
+ **ステップ 5: アイドル時間の削除とクライアント設定の削除を確認する**

  [KCL 1.x から KCL 2.x へのコンシューマーの移行](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration)ページの[アイドル時間の削除](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#idle-time-removal)および[クライアント設定の削除](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration-removals)セクションに従います。
+ **ステップ 6: KCL 2.x から KCL 3.x への移行ガイドの手順に従う**

  [KCL 2.x から KCL 3.x に移行する](kcl-migration-from-2-3.md) ページの手順に従って移行を完了します。以前の KCL バージョンにロールバックするか、ロールバック後に KCL 3.x にロールフォワードする必要がある場合は、[以前のバージョンにロールバックする](kcl-migration-rollback.md) および [ロールバック後に KCL 3.x にロールフォワードする](kcl-migration-rollforward.md) を参照してください。

**重要**  
KCL 3.x で AWS SDK for Java バージョン 2.27.19 ～ 2.27.23 を使用しないでください。これらのバージョンには、KCL が使用する DynamoDB に関連した例外エラーを引き起こす問題が含まれています。この問題を回避するには、 AWS SDK for Java バージョン 2.28.0 以降を使用することをお勧めします。

# 以前の KCL バージョンのドキュメント
<a name="kcl-archive"></a>

次のトピックはアーカイブされました。Kinesis Client Library の現行のドキュメントについては、[Kinesis Client Library を使用する](kcl.md) を参照してください。

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

**Topics**
+ [KCL 1.x および 2.x の情報](shared-throughput-kcl-consumers.md)
+ [共有スループットでカスタムコンシューマーを開発する](shared-throughput-consumers.md)
+ [コンシューマーを KCL 1.x から KCL 2.x に移行する](kcl-migration.md)

# KCL 1.x および 2.x の情報
<a name="shared-throughput-kcl-consumers"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

KDS データストリームからデータを処理できるカスタムコンシューマーアプリケーションを開発する方法の 1 つは、Kinesis Client Library (KCL) を使用することです。

**Topics**
+ [KCL について (以前のバージョン)](#shared-throughput-kcl-consumers-overview)
+ [KCL の以前のバージョン](#shared-throughput-kcl-consumers-versions)
+ [KCL の概念 (以前のバージョン)](#shared-throughput-kcl-consumers-concepts)
+ [リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡する](#shared-throughput-kcl-consumers-leasetable)
+ [Java コンシューマーアプリケーションの同じ KCL 2.x で複数のデータストリームを処理する](#shared-throughput-kcl-multistream)
+ [Schema Registry で KCL AWS Glue を使用する](#shared-throughput-kcl-consumers-glue-schema-registry)

**注記**  
KCL 1.x と KCL 2.x については、どちらも使用シナリオに応じて最新の KCL 1.x バージョンまたは KCL 2.x バージョンにアップグレードすることをお勧めします。KCL 1.x と KCL 2.x は、どちらも新しいリリースに伴って定期的に更新されています。これには、最新の依存関係パッチ、セキュリティパッチ、バグ修正、および下位互換性のある新機能が含まれます。詳細については、[https://github.com/awslabs/amazon-kinesis-client/releases](https://github.com/awslabs/amazon-kinesis-client/releases) を参照してください。

## KCL について (以前のバージョン)
<a name="shared-throughput-kcl-consumers-overview"></a>

KCL は、分散コンピューティングに関連する複雑なタスクの多くを処理することで、Kinesis Data Streams からデータを消費および処理するのに役立ちます。これには、複数のコンシューマーアプリケーションインスタンス間での負荷分散、コンシューマーアプリケーションインスタンスの障害に対する応答、処理済みのレコードのチェックポイント作成、リシャーディングへの対応が挙げられます。KCL はこれらのサブタスクをすべて処理するため、カスタムレコード処理ロジックの記述に集中できます。

KCL は AWS SDK で使用できる Kinesis Data Streams API とは異なることに注意してください。Kinesis Data Streams API では、Kinesis Data Streams の多くの機能 (ストリームの作成、リシャーディング、レコードの入力と取得など) を管理できます。KCL は、これらすべてのサブタスクの抽象化レイヤーを提供します。具体的には、コンシューマーアプリケーションのカスタムデータ処理ロジックに集中できます。Kinesis Data Streams API の詳細については、[Amazon Kinesis API リファレンス](https://docs.aws.amazon.com/kinesis/latest/APIReference/Welcome.html)を参照してください。

**重要**  
KCL は Java ライブラリです。Java 以外の言語のサポートは、MultiLangDaemon と呼ばれる多言語インターフェースを使用して提供されます。このデーモンは Java ベースで、Java 以外の KCL 言語を使用しているときにバックグラウンドで実行されます。例えば、KCL for Python をインストールして、コンシューマーアプリケーションをすべて Python で書く場合でも、MultiLangDaemon を使用するために、Java をシステムにインストールする必要があります。さらに、MultiLangDaemon には、接続先の AWS リージョンなど、ユースケースに合わせてカスタマイズする必要があるデフォルト設定があります。GitHub の MultiLangDaemon の詳細については、[KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)を参照してください。

KCL は、レコード処理ロジックと Kinesis Data Streams の仲介として機能します。

## KCL の以前のバージョン
<a name="shared-throughput-kcl-consumers-versions"></a>

現在、次のいずれかのサポートされているバージョンの KCL を使用して、カスタムコンシューマーアプリケーションを構築できます。
+ **KCL 1.x**

  詳細については、[KCL 1.x コンシューマーを開発する](developing-consumers-with-kcl.md)を参照してください。
+ **KCL 2.x**

  詳細については、[KCL 2.x コンシューマーを開発する](developing-consumers-with-kcl-v2.md)を参照してください。

KCL 1.x または KCL 2.x のいずれかを使用して、共有スループットを使用するコンシューマーアプリケーションを構築できます。詳細については、[KCL を使用した共有スループットでカスタムコンシューマーを開発する](custom-kcl-consumers.md)を参照してください。

専用スループット (拡張ファンアウトコンシューマー) を使用するコンシューマーアプリケーションを構築するには、KCL 2.x のみを使用できます。詳細については、[専用スループットを備えた拡張ファンアウトを開発する](enhanced-consumers.md)を参照してください。

KCL 1.x と KCL 2.x の違い、および KCL 1.x から KCL 2.x に移行する方法については、[コンシューマーを KCL 1.x から KCL 2.x に移行する](kcl-migration.md)を参照してください。

## KCL の概念 (以前のバージョン)
<a name="shared-throughput-kcl-consumers-concepts"></a>
+ **KCL コンシューマーアプリケーション** - KCL を使用してカスタムビルドされ、データストリームからレコードを取得して処理するように設計されたアプリケーション。
+ **コンシューマーアプリケーションインスタンス** - KCL コンシューマーアプリケーションは、通常、障害時の調整とデータレコード処理の動的な負荷分散のために、1 つ以上のアプリケーションインスタンスが同時に実行され、分散されます。
+ **ワーカー** - KCL コンシューマーアプリケーションインスタンスがデータの処理を開始するために使用する高レベルクラス。
**重要**  
各 KCL コンシューマーアプリケーションインスタンスには 1 つのワーカーがあります。

  ワーカーは、シャードとリース情報の同期、シャード割り当ての追跡、シャードからのデータの処理など、さまざまなタスクを初期化し、監督します。ワーカーは、この KCL コンシューマーアプリケーションが処理するデータレコードのデータストリームの名前や、このデータストリームへのアクセスに必要な AWS 認証情報など、コンシューマーアプリケーションの設定情報を KCL に提供します。ワーカーは、その特定の KCL コンシューマーアプリケーションインスタンスを開始して、データストリームからレコードプロセッサにデータレコードを配信します。
**重要**  
KCL 1.x では、このクラスは**ワーカー**と呼ばれます。詳細については、(Java KCL リポジトリです)、[https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java) を参照してください。KCL 2.x では、このクラスは**スケジューラ**と呼ばれます。KCL 2.x のスケジューラの目的は、KCL 1.x のワーカーの目的と同じです。KCL 2.x のスケジューラクラスの詳細については、[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java)を参照してください。
+ **リース** - ワーカーとシャード間のバインディングを定義するデータ。分散型 KCL コンシューマーアプリケーションは、リースを使用して、複数のワーカー間でデータレコード処理を分割します。いつでも、データレコードの各シャードは、**leaseKey** によって識別されるリースによって特定のワーカーにバインドされます。

  デフォルトでは、ワーカーは 1 つ以上のリースを同時に保持できます (**maxLeasesForWorker** 変数)。
**重要**  
すべてのワーカーは、データストリーム内の利用可能なすべてのシャードについて、利用可能なすべてのリースを保持すると競合します。しかし、一度に各リースを正常に保持できるのは1人のワーカーだけです。

  例えば、4 つのシャードを持つデータストリームを処理しているワーカー A を持つコンシューマーアプリケーションインスタンス A がある場合、ワーカー A はシャード 1、2、3、および 4 へのリースを同時に保持できます。ただし、2 つのコンシューマーアプリケーションインスタンス (ワーカー A とワーカー B を含む A と B) があり、これらのインスタンスが 4 つのシャードを持つデータストリームを処理している場合、ワーカー A とワーカー B はシャード 1 へのリースを同時に保持できません。あるワーカーは、このシャードのデータレコードの処理を停止する準備ができるまで、または失敗するまで、特定のシャードへのリースを保持します。あるワーカーがリースの保留を停止すると、別のワーカーがリースを引き取り、保留します。

  詳細については、(Java KCL リポジトリにあります)、を参照してください。[https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java)の場合 KCL 1.x および[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java) KCL 2.x
+ **リーステーブル** - KCL コンシューマーアプリケーションのワーカーによってリースおよび処理されている KDS データストリーム内のシャードを追跡するために使用される一意の Amazon DynamoDB テーブル。リーステーブルは、KCL コンシューマーアプリケーションの実行中に、データストリームからの最新のシャード情報と (ワーカー内およびすべてのワーカー間で) 同期を維持する必要があります。詳細については、[リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡する](#shared-throughput-kcl-consumers-leasetable)を参照してください。
+ **レコードプロセッサ** - KCL コンシューマーアプリケーションがデータストリームから取得したデータを処理する方法を定義するロジック。実行時に、KCL コンシューマーアプリケーションインスタンスがワーカーをインスタンス化し、このワーカーは、リースを保持するシャードごとに 1 つのレコードプロセッサをインスタンス化します。

## リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡する
<a name="shared-throughput-kcl-consumers-leasetable"></a>

**Topics**
+ [リーステーブルとは何ですか?](#shared-throughput-kcl-consumers-what-is-leasetable)
+ [スループット](#shared-throughput-kcl-leasetable-throughput)
+ [Kinesis データストリームのシャードとリーステーブルの同期方法](#shared-throughput-kcl-consumers-leasetable-sync)

### リーステーブルとは何ですか?
<a name="shared-throughput-kcl-consumers-what-is-leasetable"></a>

それぞれの Amazon Kinesis Data Streams アプリケーションに、KCLは、一意のリーステーブル (Amazon DynamoDB テーブルに保存されている) を使用して、KCL コンシューマーアプリケーションのワーカーによってリースおよび処理されている KDS データストリーム内のシャードを追跡します。

**重要**  
KCL は、コンシューマーアプリケーションの名前を使用して、このコンシューマーアプリケーションが使用するリーステーブル名を作成します。したがって、各コンシューマーアプリケーション名は一意である必要があります。

コンシューマーアプリケーションの実行中に、[Amazon DynamoDB コンソール](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html)を使用してリーステーブルを表示できます。

アプリケーションの起動時にKCL コンシューマーアプリケーションのリーステーブルが存在しない場合は、いずれかのワーカーがこのアプリケーションのリーステーブルを作成します。

**重要**  
 アカウントには、Kinesis Data Streams 自体に関連するコストに加えて、DynamoDB テーブルに関連するコストが発生します。

テーブルの各行は、コンシューマーアプリケーションのワーカーによって処理中のシャードを表します。KCL コンシューマーアプリケーションが 1 つのデータストリームのみを処理する場合、リーステーブルのハッシュキー `leaseKey` はシャード ID です。[Java コンシューマーアプリケーションの同じ KCL 2.x で複数のデータストリームを処理する](#shared-throughput-kcl-multistream) であれば、leaseKey の構造は次のようになります: `account-id:StreamName:streamCreationTimestamp:ShardId`。例えば、`111111111:multiStreamTest-1:12345:shardId-000000000336`。

シャード ID に加えて、各行には次のデータが含まれます。
+ **checkpoint:** シャードの最新チェックポイントのシーケンス番号。この値はストリームのすべてのシャードで一意です。
+ **checkpointSubSequenceNumber:** Kinesis Producer Library の集約機能を使用する場合、これは Kinesis レコード内の個々のユーザレコードを追跡する**チェックポイント**の拡張です。
+ **leaseCounter:** ワーカーのリースが他のワーカーに保持されていることをワーカーが検出できるように、リースのバージョニングに使用されます。
+ **leaseKey:** リースの固有識別子。各リースはデータストリームのシャードに固有であり、一度に 1 つのワーカーで保持されます。
+ **leaseOwner:** このリースを保持しているワーカー。
+ **ownerSwitchesSinceCheckpoint:** 最後にチェックポイントが書き込まれてから、このリースのワーカーが何回変更されたかを示します。
+ **parentShardId:** 子シャードの処理を開始する前に、親シャードが完全に処理済みであることを確認するために使用します。これにより、レコードがストリームに入力されたのと同じ順序で処理されるようになります。
+ **hashrange:** `PeriodicShardSyncManager` で使われて、定期的な同期を実行し、リーステーブルで欠落しているシャードを見つけ、必要に応じてリースを作成します。
**注記**  
このデータは、KCL 1.14 および KCL 2.3 で始まるすべてのシャードのリーステーブルに存在します。`PeriodicShardSyncManager` の詳細およびリースとシャード間の定期的な同期については、[Kinesis データストリームのシャードとリーステーブルの同期方法](#shared-throughput-kcl-consumers-leasetable-sync) を参照してください。
+ **childshards:** `LeaseCleanupManager` で使われて、子シャードの処理ステータスを確認し、親シャードをリーステーブルから削除できるかどうかを決定します。
**注記**  
このデータは、KCL 1.14 および KCL 2.3 で始まるすべてのシャードのリーステーブルに存在します。
+ **shardID:** シャードの ID。
**注記**  
このデータは、[Java コンシューマーアプリケーションの同じ KCL 2.x で複数のデータストリームを処理する](#shared-throughput-kcl-multistream) である場合にのみリーステーブルに存在します。これは Java 用 KCL 2.x でのみサポートされており、Java の場合は KCL 2.3 以降で始まります。
+ **stream name** 以下の形式のデータストリームの識別子: `account-id:StreamName:streamCreationTimestamp`。
**注記**  
このデータは、[Java コンシューマーアプリケーションの同じ KCL 2.x で複数のデータストリームを処理する](#shared-throughput-kcl-multistream) である場合にのみリーステーブルに存在します。これは Java 用 KCL 2.x でのみサポートされており、Java の場合は KCL 2.3 以降で始まります。

### スループット
<a name="shared-throughput-kcl-leasetable-throughput"></a>

Amazon Kinesis Data Streams アプリケーションでプロビジョニングされたスループットの例外が発生した場合は、DynamoDB テーブルのプロビジョニングされたスループットを増やす必要があります。KCL がテーブルを作成するときにプロビジョニングされるスループットは、1 秒あたりの読み込み 10 回、1 秒あたりの書き込み 10 回ですが、これがユーザーのアプリケーションで十分でない場合があります。例えば、Amazon Kinesis Data Streams アプリケーションが頻繁にチェックポイントを作成する場合や、多くのシャードで構成されるストリームを処理する場合は、より多くのスループットが必要になる可能性があります。

DynamoDB でプロビジョニングされたスループットについては、*Amazon DynamoDB デベロッパーガイド*の[読み取り/書き込み容量モード](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html)および[テーブルとデータの操作](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithDDTables.html)を参照してください。

### Kinesis データストリームのシャードとリーステーブルの同期方法
<a name="shared-throughput-kcl-consumers-leasetable-sync"></a>

KCL コンシューマーアプリケーションのワーカーは、リースを使用して特定のデータストリームからシャードを処理します。特定の時点でどのワーカーがどのシャードをリースしているかに関する情報は、リーステーブルに保存されます。リーステーブルは、KCL コンシューマーアプリケーションの実行中に、データストリームからの最新のシャード情報と同期を維持する必要があります。KCL は、コンシューマーアプリケーションのブートストラップ (コンシューマーアプリケーションの初期化時または再起動時)、および処理中のシャードが終了 (リシャーディング) に達するたびに、Kinesis Data Streams サービスから取得したシャード情報とリーステーブルを同期します。つまり、ワーカーまたは KCL コンシューマーアプリケーションは、最初のコンシューマーアプリケーションのブートストラップ中、およびコンシューマーアプリケーションでデータストリームリシャードイベントが発生するたびに、処理中のデータストリームと同期されます。

**Topics**
+ [KCL 1.0 - 1.13 と KCL 2.0 - 2.2 での同期](#shared-throughput-kcl-consumers-leasetable-sync-old)
+ [KCL 2.x での同期、KCL 2.3 以降で始まる](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl2)
+ [KCL 1.x での同期、KCL 1.14 以降で始まる](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl1)

#### KCL 1.0 - 1.13 と KCL 2.0 - 2.2 での同期
<a name="shared-throughput-kcl-consumers-leasetable-sync-old"></a>

KCL 1.0 - 1.13 および KCL 2.0 - 2.2 では、コンシューマーアプリケーションのブートストラップ、および各データストリームのリシャードイベント中に、KCL は、`ListShards` または `DescribeStream` 検出 API を呼び出して、Kinesis Data Streams サービスから取得したシャード情報とリーステーブルを同期します。上記のすべての KCL バージョンで、KCL コンシューマーアプリケーションの各ワーカーは、コンシューマーアプリケーションのブートストラップ中および各ストリームリシャードイベントでリース/シャード同期プロセスを実行するために、次の手順を完了します。
+ 処理中のストリームのデータのすべてのシャードをフェッチします。
+ リーステーブルからすべてのシャードリースをフェッチします。
+ リーステーブルにリースのないオープンシャードをフィルターで除外します。
+ 見つかったすべてのオープンシャードと、開いている親を持たない各オープンシャードについて反復処理します。
  + 階層ツリーをその祖先パスを通過して、シャードが子孫であるかどうかを判断します。祖先シャードが処理されている場合 (リーステーブルに祖先シャードのリースエントリが存在する場合)、または祖先シャードを処理する必要がある場合 (例えば、初期位置が`TRIM_HORIZON`または`AT_TIMESTAMP`)、シャードは子孫と見なされます。
  + コンテキスト内のオープンシャードが子孫である場合、KCL は初期位置に基づいてシャードをチェックポイントし、必要に応じて親のリースを作成します。

#### KCL 2.x での同期、KCL 2.3 以降で始まる
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl2"></a>

サポートされている最新バージョンの KCL 2.x (KCL 2.3) 以降では、ライブラリで同期プロセスに対する次の変更がサポートされるようになりました。これらのリース/シャード同期の変更により、KCL コンシューマーアプリケーションから Kinesis Data Streams サービスに対して実行される API コールの数が大幅に削減され、KCL コンシューマーアプリケーションのリース管理が最適化されます。
+ アプリケーションのブートストラップ中に、リーステーブルが空の場合、KCL は`ListShard` API のフィルタリングオプション (`ShardFilter` オプションのリクエストパラメータ) を使用して、`ShardFilter` パラメータで指定された時間に開いているシャードのスナップショットに対してのみリースを取得および作成します。`ShardFilter` パラメーターを使用すると、`ListShards` API の応答をフィルターで除外できます。`ShardFilter` パラメータの唯一の必須プロパティは `Type` です。KCL は `Type` フィルタープロパティとその次の有効な値を使用して、新しいリースを必要とする可能性のあるオープンシャードのスナップショットを識別して返します。
  + `AT_TRIM_HORIZON` - 応答には、`TRIM_HORIZON` で開いていたすべてのシャードが含まれます。
  + `AT_LATEST` - 応答には、データストリームの現在開いているシャードのみが含まれます。
  + `AT_TIMESTAMP` - 応答には、開始タイムスタンプが指定されたタイムスタンプ以下で、終了タイムスタンプが指定されたタイムスタンプ以上であるか、またはまだ開いているすべてのシャードが含まれます。

  `ShardFilter` は空のリーステーブルのリースを作成して、`RetrievalConfig#initialPositionInStreamExtended` で指定したシャードのスナップショットのリースを初期化するときに使用されます。

  `ShardFilter` の詳細については、「[https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html)」を参照してください。
+ すべてのワーカーがリース/シャード同期を実行して、データストリーム内の最新のシャードでリーステーブルを最新の状態に保つ代わりに、選択された単一のワーカーリーダーがリース/シャードの同期を実行します。
+ KCL 2.3 は、`GetRecords` および `SubscribeToShard` APIのリターンパラメータ `ChildShards` を使用して、閉じたシャードに対して `SHARD_END` で発生するリース/シャード同期を実行します。これにより、KCL ワーカーは、処理が終了したシャードの子シャードに対してのみリースを作成できます。コンシューマーアプリケーション全体で共有する場合、リース/シャード同期のこの最適化では `GetRecords` API の `ChildShards` パラメータを使用します。専用スループット (拡張ファンアウト) コンシューマーアプリケーションの場合、リース/シャード同期のこの最適化では `SubscribeToShard` API の `ChildShards` パラメータを使用します。詳細については、[GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)、[SubscribeToShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)、および [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html) を参照してください。
+ 上記の変更により、KCL の動作は、既存のすべてのシャードについて学習するすべてのワーカーのモデルから、各ワーカーが所有するシャードの子シャードについてのみ学習するワーカーのモデルに移行します。したがって、コンシューマーアプリケーションのブートストラップおよびリシャードイベント中に発生する同期に加えて、KCL は、リーステーブル内の潜在的なホールを特定するために、追加の定期的なシャード/リーススキャンを実行して (つまり、すべての新しいシャードについて学習する)、データストリームの完全なハッシュ範囲が処理されていることを確認し、必要に応じてそれらのリースを作成します。`PeriodicShardSyncManager`は定期的なリース/シャードスキャンの実行を担当するコンポーネントです。

  KCL 2.3 の `PeriodicShardSyncManager` の詳細については、[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java\$1L201-L213](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213)を参照してください。

  KCL 2.3 では、新しい設定オプションを使用して、`LeaseManagementConfig` で `PeriodicShardSyncManager` を設定できるようになりました。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/streams/latest/dev/shared-throughput-kcl-consumers.html)

  新しい CloudWatch メトリクスも発行され、`PeriodicShardSyncManager` のヘルスをモニタリングします。詳細については、[PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task)を参照してください。
+ `HierarchicalShardSyncer` への最適化を含めて、シャードの 1 つのレイヤーに対してのみリースを作成します。

#### KCL 1.x での同期、KCL 1.14 以降で始まる
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl1"></a>

サポートされている最新バージョンの KCL 1.x (KCL 1.14) 以降では、ライブラリで同期プロセスに対する次の変更がサポートされるようになりました。これらのリース/シャード同期の変更により、KCL コンシューマーアプリケーションから Kinesis Data Streams サービスに対して実行される API コールの数が大幅に削減され、KCL コンシューマーアプリケーションのリース管理が最適化されます。
+ アプリケーションのブートストラップ中に、リーステーブルが空の場合、KCL は`ListShard` API のフィルタリングオプション (`ShardFilter` オプションのリクエストパラメータ) を使用して、`ShardFilter` パラメータで指定された時間に開いているシャードのスナップショットに対してのみリースを取得および作成します。`ShardFilter` パラメーターを使用すると、`ListShards` API の応答をフィルターで除外できます。`ShardFilter` パラメータの唯一の必須プロパティは `Type` です。KCL は `Type` フィルタープロパティとその次の有効な値を使用して、新しいリースを必要とする可能性のあるオープンシャードのスナップショットを識別して返します。
  + `AT_TRIM_HORIZON` - 応答には、`TRIM_HORIZON` で開いていたすべてのシャードが含まれます。
  + `AT_LATEST` - 応答には、データストリームの現在開いているシャードのみが含まれます。
  + `AT_TIMESTAMP` - 応答には、開始タイムスタンプが指定されたタイムスタンプ以下で、終了タイムスタンプが指定されたタイムスタンプ以上であるか、またはまだ開いているすべてのシャードが含まれます。

  `ShardFilter` は空のリーステーブルのリースを作成して、`KinesisClientLibConfiguration#initialPositionInStreamExtended` で指定したシャードのスナップショットのリースを初期化するときに使用されます。

  `ShardFilter` の詳細については、「[https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html)」を参照してください。
+ すべてのワーカーがリース/シャード同期を実行して、データストリーム内の最新のシャードでリーステーブルを最新の状態に保つ代わりに、選択された単一のワーカーリーダーがリース/シャードの同期を実行します。
+ KCL 1.14 は、`GetRecords` および `SubscribeToShard` APIのリターンパラメータ `ChildShards` を使用して、閉じたシャードに対して `SHARD_END` で発生するリース/シャード同期を実行します。これにより、KCL ワーカーは、処理が終了したシャードの子シャードに対してのみリースを作成できます。詳細については、[GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) および [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html) を参照してください。
+ 上記の変更により、KCL の動作は、既存のすべてのシャードについて学習するすべてのワーカーのモデルから、各ワーカーが所有するシャードの子シャードについてのみ学習するワーカーのモデルに移行します。したがって、コンシューマーアプリケーションのブートストラップおよびリシャードイベント中に発生する同期に加えて、KCL は、リーステーブル内の潜在的なホールを特定するために、追加の定期的なシャード/リーススキャンを実行して (つまり、すべての新しいシャードについて学習する)、データストリームの完全なハッシュ範囲が処理されていることを確認し、必要に応じてそれらのリースを作成します。`PeriodicShardSyncManager`は定期的なリース/シャードスキャンの実行を担当するコンポーネントです。

  `KinesisClientLibConfiguration#shardSyncStrategyType` が `ShardSyncStrategyType.SHARD_END` に設定されると、`PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold` は、シャード同期を強制するために、リーステーブル内のホールを含む連続スキャンの数のしきい値を決定するために使用されます。`KinesisClientLibConfiguration#shardSyncStrategyType` が `ShardSyncStrategyType.PERIODIC` に設定されると、`leasesRecoveryAuditorInconsistencyConfidenceThreshold` は無視されます。

  KCL 1.14での `PeriodicShardSyncManager` の詳細については、[https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java\$1L987-L999](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999)を参照してください。

  KCL 1.14 では、新しい設定オプションを使用して、`LeaseManagementConfig` で `PeriodicShardSyncManager` を設定できるようになりました。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/streams/latest/dev/shared-throughput-kcl-consumers.html)

  新しい CloudWatch メトリクスも発行され、`PeriodicShardSyncManager` のヘルスをモニタリングします。詳細については、[PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task)を参照してください。
+ KCL 1.14 は、遅延リースのクリーンアップもサポートするようになりました。リースは、`SHARD_END` に到達したとき、シャードがデータストリームの保持期間を過ぎて期限切れになったとき、またはリシャーディングオペレーションの結果として閉じられたとき、`LeaseCleanupManager` により非同期的に削除されます。

  新しい設定オプションを使用して、`LeaseCleanupManager` を設定できるようになりました。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/streams/latest/dev/shared-throughput-kcl-consumers.html)
+ `KinesisShardSyncer` への最適化を含めて、シャードの 1 つのレイヤーに対してのみリースを作成します。

## Java コンシューマーアプリケーションの同じ KCL 2.x で複数のデータストリームを処理する
<a name="shared-throughput-kcl-multistream"></a>

このセクションでは、複数のデータストリームを同時に処理できる KCL コンシューマーアプリケーションを作成できる KCL 2.x for Java での次の変更について説明します。

**重要**  
マルチストリーム処理は、Java 用 KCL 2.x でのみサポートされており、Java の場合は KCL 2.3 以降で始まります。  
KCL 2.x を実装できる他の言語では、マルチストリーム処理はサポートされていません。  
マルチストリーム処理は KCL 1.x のどのバージョンでもサポートされていません。
+ **MultistreamTracker インターフェイス**

  複数のストリームを同時に処理できるコンシューマーアプリケーションを構築するには、[MultistreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java) という新しいインターフェイスを実装する必要があります。このインターフェースには、KCL コンシューマーアプリケーションによって処理されるデータストリームとその設定のリストを返す `streamConfigList` メソッドが含まれています。処理中のデータストリームは、コンシューマーアプリケーションのランタイム中に変更できることに注意してください。`streamConfigList`は、処理するデータストリームの変更について学習するために KCL によって定期的に呼び出されます。

  `streamConfigList` メソッドが [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23) リストに入力します。

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```

  `StreamIdentifier` および `InitialPositionInStreamExtended` は必須フィールドですが、`consumerArn` は省略可能である点に注意してください。KCL 2.x を使用して拡張ファンアウトコンシューマーアプリケーションを実装する場合にのみ、`consumerArn` を提供する必要があります。

  の詳細については`StreamIdentifier`、[https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java\$1L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129) を参照してください。`StreamIdentifier` を作成するには、`streamArn` および v2.5.0 以降で利用可能な `streamCreationEpoch` からマルチストリームインスタンスを作成することをお勧めします。`streamArm` をサポートしていない KCL v2.3 および v2.4 では、`account-id:StreamName:streamCreationTimestamp` 形式を使用してマルチストリームインスタンスを作成します。この形式は廃止され、次のメジャーリリース以降はサポートされなくなります。

  `MultistreamTracker` には、リーステーブル内の古いストリームのリースを削除するための戦略も含まれます(`formerStreamsLeasesDeletionStrategy`)。コンシューマーアプリケーションのランタイム中は、ストラテジーを変更できないことに注意してください。 詳細については、[https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java)を参照してください。
+ [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java) は、アプリケーション全体のクラスで、KCL コンシューマーアプリケーションの構築時に使用する KCL 2.x の構成設定をすべて指定するために使用できます。`ConfigsBuilder`クラスは `MultistreamTracker` インターフェイスをサポートするようになりました。ConfigsBuilder は、レコードを消費する 1 つのデータストリームの名前を使用して初期化できます。

  ```
   /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```

  または、複数のストリームを同時に処理する KCL コンシューマーアプリケーションを実装する場合、`MultiStreamTracker` で ConfigsBuilder を初期化することもできます。

  ```
  * Constructor to initialize ConfigsBuilder with MultiStreamTracker
       * @param multiStreamTracker
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.left(multiStreamTracker);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```
+ KCL コンシューマーアプリケーションにマルチストリームサポートが実装されているため、アプリケーションのリーステーブルの各行に、このアプリケーションが処理する複数のデータストリームのシャード ID とストリーム名が含まれます。
+ KCL コンシューマーアプリケーションのマルチストリームサポートが実装されている場合、leaseKey は次の構造を取ります: `account-id:StreamName:streamCreationTimestamp:ShardId`。例えば、`111111111:multiStreamTest-1:12345:shardId-000000000336`。
**重要**  
KCL コンシューマーアプリケーションが 1 つのデータストリームのみを処理する場合、リーステーブルのハッシュキー leaseKey はシャード ID です。この既存の KCL コンシューマーアプリケーションを複数のデータストリームを処理するように再構成すると、リーステーブルが壊れます。マルチストリームサポートでは LeaseKey 構造体は次のようになっている必要があるためです: `account-id:StreamName:StreamCreationTimestamp:ShardId`。

## Schema Registry で KCL AWS Glue を使用する
<a name="shared-throughput-kcl-consumers-glue-schema-registry"></a>

Kinesis データストリームを AWS Glue Schema Registry と統合できます。 AWS Glue スキーマレジストリを使用すると、スキーマを一元的に検出、制御、および進化させながら、生成されたデータが登録されたスキーマによって継続的に検証されるようにできます。スキーマ は、データレコードの構造と形式を定義します。スキーマは、信頼性の高いデータの公開、利用、または保存のための仕様をバージョニングしたものです。 AWS Glueスキーマレジストリを使用すると、ストリーミングアプリケーション内のend-to-endのデータ品質とデータガバナンスを向上させることができます。詳細については、[AWS Glue スキーマレジストリ](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)を参照してください。この統合を設定する方法の 1 つは、Java で KCL を使用することです。

**重要**  
現在、Kinesis Data Streams と AWS Glue Schema Registry の統合は、Java に実装された KCL 2.3 コンシューマーを使用する Kinesis データストリームでのみサポートされています。多言語サポートは提供されていません。KCL 1.0 コンシューマーはサポートされていません。KCL 2.3 より前の KCL 2.x コンシューマーはサポートされていません。

KCL を使用して Kinesis Data Streams とスキーマレジストリの統合を設定する方法の詳細については、[「ユースケース: Amazon Kinesis Data Streams と AWS Glue スキーマレジストリの統合](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)」の「KPL/KCL ライブラリを使用したデータの操作」セクションを参照してください。

# 共有スループットでカスタムコンシューマーを開発する
<a name="shared-throughput-consumers"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

Kinesis Data Streams からデータを受け取る際に専用スループットを必要としない場合で、200 ms 以下の読み取り伝達遅延を必要としない場合は、以下のトピックで説明しているようにコンシューマーアプリケーションを構築できます。Kinesis Client Library (KCL) または AWS SDK for Javaを使用できます。

**Topics**
+ [KCL を使用した共有スループットでカスタムコンシューマーを開発する](custom-kcl-consumers.md)

専有スループットで Kinesis data streams からレコードを受信できるコンシューマーの構築の詳細については、[専用スループットを備えた拡張ファンアウトを開発する](enhanced-consumers.md)を参照してください。

# KCL を使用した共有スループットでカスタムコンシューマーを開発する
<a name="custom-kcl-consumers"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

共有スループットでカスタムコンシューマーアプリケーションを開発する方法の 1 つは、Kinesis Client Library (KCL) を使用することです。

使用している KCL バージョンの次のトピックから選択します。

**Topics**
+ [KCL 1.x コンシューマーを開発する](developing-consumers-with-kcl.md)
+ [KCL 2.x コンシューマーを開発する](developing-consumers-with-kcl-v2.md)

# KCL 1.x コンシューマーを開発する
<a name="developing-consumers-with-kcl"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

Kinesis Client Library (KCL) を使用して、Amazon Kinesis Data Streams のコンシューマーアプリケーションを開発することができます。

KCL の詳細については、[KCL について (以前のバージョン)](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview)を参照してください。

使用するオプションに応じて、次のトピックから選択します。

**Topics**
+ [Java での Kinesis クライアントライブラリコンシューマーを開発する](kinesis-record-processor-implementation-app-java.md)
+ [ode.js で Kinesis Client Library コンシューマーを開発する](kinesis-record-processor-implementation-app-nodejs.md)
+ [.NET で Kinesis Client Library コンシューマーを開発する](kinesis-record-processor-implementation-app-dotnet.md)
+ [Python で Kinesis クライアントライブラリコンシューマーを開発する](kinesis-record-processor-implementation-app-py.md)
+ [Ruby で Kinesis Client Library コンシューマーを開発する](kinesis-record-processor-implementation-app-ruby.md)

# Java での Kinesis クライアントライブラリコンシューマーを開発する
<a name="kinesis-record-processor-implementation-app-java"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

Kinesis Data Streams のデータを処理するアプリケーションを構築するには Kinesis Client Library (KCL) を使用できます。Kinesis Client Library は、複数の言語で使用できます。このトピックでは、Java について説明します。Javadoc リファレンスを表示するには、[AWS Javadoc topic for Class AmazonKinesisClient](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html)を参照してください。

GitHub から Java KCL をダウンロードするには、[Kinesis Client Library (Java)](https://github.com/awslabs/amazon-kinesis-client) にアクセスしてください。Apache Maven で Java KCL を検索するには、[KCL 検索結果](https://search.maven.org/#search|ga|1|amazon-kinesis-client)のページを参照してください。Java KCL コンシューマーアプリケーションのサンプルコードをダウンロードするには、GitHub の[KCL for Java sample project](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis)ページを参照してください。

このサンプルアプリケーションは [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html) を使用します。ログ設定は、`configure` ファイルで定義されている静的な `AmazonKinesisApplicationSample.java` メソッドを使用して変更できます。Log4j および Java アプリケーションで Apache Commons ログ記録を使用する方法の詳細については、「 *AWS SDK for Java デベロッパーガイド*」の「Log4j AWS を使用したログ記録」を参照してください。 [ Log4j](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html) 

Java で KCL コンシューマーアプリケーションを実装する場合は、次のタスクを完了する必要があります。

**Topics**
+ [IRecordProcessor メソッドを実装する](#kinesis-record-processor-implementation-interface-java)
+ [IRecordProcessor インターフェイスのクラスファクトリを実装する](#kinesis-record-processor-implementation-factory-java)
+ [ワーカーを作成する](#kcl-java-worker)
+ [設定プロパティを変更する](#kinesis-record-processor-initialization-java)
+ [レコードプロセッサインターフェイスのバージョン 2 に移行する](#kcl-java-v2-migration)

## IRecordProcessor メソッドを実装する
<a name="kinesis-record-processor-implementation-interface-java"></a>

KCL は現在、`IRecordProcessor` インターフェイスの 2 つのバージョンをサポートしています。元のインターフェイスは最初のバージョンの KCL で利用可能です。バージョン 2 は KCL バージョン 1.5.0 から利用可能です。両方のインターフェイスが完全にサポートされています。選択するインターフェイスは、お使いのシナリオの要件によって異なります。相違点をすべて確認するには、ローカルに作成した Javadocs、またはソースコードを参照してください。以下のセクションでは、使い始めの最小限の実装を概説します。

**Topics**
+ [オリジナルインターフェイス (バージョン 1)](#kcl-java-interface-original)
+ [更新されたインターフェイス (バージョン 2)](#kcl-java-interface-v2)

### オリジナルインターフェイス (バージョン 1)
<a name="kcl-java-interface-original"></a>

オリジナルな `IRecordProcessor` interface (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) は、コンシューマーが実装しているべき次のレコードプロセッサメソッドを公開します。このサンプルでは、開始点として使用できる実装を提供しています (`AmazonKinesisApplicationSampleRecordProcessor.java` を参照してください)。

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**初期化**  
KCL は、レコードプロセッサがインスタンス化されると、`initialize` メソッドを呼び出し、特定のシャード ID をパラメータとして渡します。このレコードプロセッサはこのシャードのみを処理し、通常、その逆も真です (このシャードはこのレコード プロセッサによってのみ処理されます)。ただし、コンシューマーでは、データレコードが複数回処理される可能性に対応する必要があります。Kinesis Data Streams は*少なくとも 1 回*のセマンティクスを使用しています。これは、シャードから取得されたすべてのデータレコードが、コンシューマーのワーカーによって少なくとも 1 回処理されることを意味します。特定のシャードが複数のワーカーによって処理される可能性がある場合の詳細については、[シャードの数を変更するには、再シャーディング、スケーリング、並列処理を使用します。](kinesis-record-processor-scaling.md)を参照してください。

```
public void initialize(String shardId)
```

**processRecords**  
KCL は、`processRecords` メソッドを呼び出し、`initialize(shardId)` メソッドで指定されたシャードのデータレコードのリストを渡します。レコードプロセッサは、コンシューマーのセマンティクスに従って、これらのレコードのデータを処理します。例えば、ワーカーはデータの変換を実行し、その結果を Amazon Simple Storage Service (Amazon S3) バケットに保存する場合があります。

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

データ自体に加えて、レコードにもシーケンス番号とパーティションキーが含まれます。ワーカーはデータを処理するときに、これらの値を使用できます。たとえば、ワーカーは、パーティションのキーの値に基づいて、データを格納する S3 バケットを選択できます。`Record` クラスは、レコードのデータ、シーケンス番号、およびパーティションキーへのアクセスを提供する次のメソッドを公開します。

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

サンプルでは、プライベートメソッド `processRecordsWithRetries` に、ワーカーでレコードのデータ、シーケンス番号、およびパーティションキーにアクセスする方法を示すコードが含まれています。

Kinesis Data Streams では、シャードで既に処理されたレコードを追跡するためにレコードプロセッサが必要です。KCL は、チェックポインタ (`IRecordProcessorCheckpointer`) を `processRecords` に渡すことで、この追跡をユーザーに代わって処理します。レコードプロセッサは、このインターフェイスで `checkpoint` メソッドを呼び出し、シャード内のレコードの処理の進行状況を KCL に知らせます。ワーカーでエラーが発生すると、KCL はこの情報を使用して、処理されたことが分かっている最後のレコードからシャードの処理を再開します。

分割または結合オペレーションの場合、KCL は、元のシャードのプロセッサが `checkpoint` を呼び出して元のシャードの処理がすべて完了したことを通知するまで、新しいシャードの処理を開始しません。

パラメータを渡さないと、`checkpoint` への呼び出しは、レコードプロセッサに最後のレコードを渡した時点までのすべてのレコードが処理済みであることを意味すると KCL で見なされます。したがって、レコードプロセッサは、渡されたリストにあるすべてのレコードの処理が完了した場合にのみ、`checkpoint` を呼び出す必要があります。レコードプロセッサは、`checkpoint` の各呼び出しで `processRecords` を呼び出す必要はありません。たとえば、プロセッサは、`checkpoint` を 3 回呼び出すたびに、`processRecords` を呼び出すことができます。オプションでレコードの正確なシーケンス番号をパラメータとして `checkpoint` に指定できます。この場合、KCL は、すべてのレコードがそのレコードまで処理されたと見なします。

このサンプルでは、プライベートメソッド `checkpoint` で、適切な例外処理と再試行のロジックを使用する `IRecordProcessorCheckpointer.checkpoint` を呼び出す方法を示しています。

KCL は、`processRecords` を使用して、データレコードの処理から発生するすべての例外を処理します。例外が `processRecords` からスローされた場合、KCL は、例外発生前に渡されたデータレコードをスキップします。つまり、これらのレコードは、例外をスローしたレコードプロセッサ、またはコンシューマーの他のレコードプロセッサに再送信されません。

**シャットダウン**  
KCL は、処理が終了した場合 (シャットダウンの理由は `TERMINATE`) またはワーカーが応答していない場合 (シャットダウンの理由は `ZOMBIE`)、`shutdown` メソッドを呼び出します。

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

シャードが分割または結合されたか、ストリームが削除されたため、レコードプロセッサがシャードからこれ以上レコードを受信しない場合は、処理が終了します。

KCL はまた、`IRecordProcessorCheckpointer` インターフェイスを `shutdown` に渡します。シャットダウンの理由が `TERMINATE` である場合、レコードプロセッサはすべてのデータレコードの処理を終了し、このインターフェイスの `checkpoint` メソッドを呼び出します。

### 更新されたインターフェイス (バージョン 2)
<a name="kcl-java-interface-v2"></a>

更新された `IRecordProcessor` interface (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) は、コンシューマーが実装しているべき次のレコードプロセッサメソッドを公開します。

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

コンテナオブジェクトのメソッドの呼び出しで、インターフェイスのオリジナルバージョンのすべての引数にアクセスできます。たとえば、`processRecords()` でレコードのリストを取得には、`processRecordsInput.getRecords()` が使用できます。

このインターフェイスのバージョン 2 (KCL 1.5.0 以降) では、オリジナルインターフェースで提供される入力に加えて次の新しい入力が使用できます。

シーケンス番号の開始  
`InitializationInput` オペレーションへ渡される `initialize()` オブジェクトでは、開始シーケンス番号はレコードプロセッサのインスタンスに配信されるレコードです。このシーケンス番号は、同じシャードで処理されたレコードプロセッサインスタンスの最後のチェックポイントです。これは、アプリケーションでこの情報が必要になる場合のために提供されます。

保留チェックポイントシーケンス番号  
`initialize()` オペレーションへ渡される `InitializationInput`オブジェクトの保留チェックポイントシーケンス番号 (ある場合) とは、前のレコードプロセッサインスタンスが停止する前にコミットできなかったものを示します。

## IRecordProcessor インターフェイスのクラスファクトリを実装する
<a name="kinesis-record-processor-implementation-factory-java"></a>

レコードプロセッサのメソッドを実装するクラスのファクトリも実装する必要があります。コンシューマーは、ワーカーをインスタンス化するときに、このファクトリへの参照を渡します。

サンプルでは、オリジナルのレコードプロセッサインターフェースを使用した、`AmazonKinesisApplicationSampleRecordProcessorFactory.java` ファイルのファクトリクラスを実装します。クラスファクトリでバージョン 2 レコードプロセッサを作成する場合には、`com.amazonaws.services.kinesis.clientlibrary.interfaces.v2` とい名のパッケージを使用してください。

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## ワーカーを作成する
<a name="kcl-java-worker"></a>

[IRecordProcessor メソッドを実装する](#kinesis-record-processor-implementation-interface-java)で説明しているように、KCL レコードプロセッサには選択できる 2 バージョンがあり、どちらを選ぶかでワーカーの作成方法に影響します。オリジナルレコードプロセッサインターフェイスは、次のコードストラクチャを使用してワーカーを作成します。

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

レコード プロセッサインターフェイスのバージョン 2 では、`Worker.Builder` を使用してワーカを作成でき、どのコンストラクタを使うかや引数の順序を考慮する必要はありません。更新されたレコードプロセッサインターフェイスは、次のコードストラクチャを使用してワーカーを作成します。

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## 設定プロパティを変更する
<a name="kinesis-record-processor-initialization-java"></a>

このサンプルでは、設定プロパティのデフォルト値を提供します。ワーカーのこの設定データは `KinesisClientLibConfiguration` オブジェクトにまとめられています。ワーカーをインスタンス化する呼び出しで、このオブジェクトと `IRecordProcessor` のクラスファクトリへの参照が渡されます。Java の properties ファイルを使用してこれらのプロパティを独自の値にオーバーライドできます (`AmazonKinesisApplicationSample.java` を参照してください)。

### アプリケーション名
<a name="configuration-property-application-name"></a>

KCL には、複数のアプリケーション間、および同じリージョン内の Amazon DynamoDB テーブル間で一意のアプリケーション名が必要です。次のようにアプリケーション名の設定値を使用します。
+ このアプリケーション名と関連付けられたすべてのワーカーは、連係して同じストリームを処理していると見なされます。これらのワーカーは複数のインスタンスに分散している場合もあります。同じアプリケーションコードの追加のインスタンスを実行するときに、アプリケーション名が異なる場合、KCL は 2 番目のインスタンスを、同じストリームで動作するまったく別のアプリケーションと見なします。
+ KCL はアプリケーション名を使用して DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報 (チェックポイントやワーカーとシャードのマッピングなど) を保存します。各アプリケーションには、それぞれ DynamoDB テーブルがあります。詳細については、[リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡する](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)を参照してください。

### 認証情報の設定
<a name="kinesis-record-processor-cred-java"></a>

デフォルトの AWS 認証情報プロバイダーチェーンの認証情報プロバイダーのいずれかが認証情報を利用できるようにする必要があります。例えば、EC2 インスタンスでコンシューマーを実行している場合は、IAM ロールでインスタンスを起動することをお勧めします。この IAM ロールに関連付けられた許可を反映する AWS 認証情報は、インスタンスメタデータを通じて、インスタンス上のアプリケーションで使用できるようになります。これは、EC2 インスタンスで実行されるコンシューマーの認証情報を管理するための最も安全な方法です。

サンプルアプリケーションは、最初にインスタンスメタデータから IAM 認証情報を取得しようとします。

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

サンプルアプリケーションは、インスタンスメタデータから認証情報を取得できない場合、properties ファイルから認証情報を取得しようとします。

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

インスタンスメタデータの詳細については、「Amazon EC2 ユーザーガイド」の「[インスタンスメタデータ](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html)」を参照してください。**

### 複数のインスタンスへのワーカー ID を使用する
<a name="kinesis-record-processor-workerid-java"></a>

サンプルの初期化コードは、次のコードスニペットに示すように、ローカルコンピュータ名にグローバル一意識別子を追加して、ワーカーの ID (`workerId`) を作成します。このアプローチによって、1 台のコンピュータでコンシューマーアプリケーションの複数のインスタンスを実行するシナリオに対応できます。

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## レコードプロセッサインターフェイスのバージョン 2 に移行する
<a name="kcl-java-v2-migration"></a>

オリジナルインターフェースで使われるコードを移行するためには、上記のステップに加えて、次の手順が必要となります。

1. レコードプロセッサのクラスを変更して、バージョン 2 レコードプロセッサインターフェイスにインポートします。

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. コンテナオブジェクトで `get` メソッドを使用するには、入力するリファレンスを変更します。たとえば、`shutdown()` オペレーションで、`checkpointer` を `shutdownInput.getCheckpointer()` に変更します。

1. レコードプロセッサのファクトリークラスを変更して、バージョン 2 レコードプロセッサファクトリーインターフェイスにインポートします。

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. ワーカーのコンストラクチャを変更して、`Worker.Builder` を使います。例えば、次のようになります。

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# ode.js で Kinesis Client Library コンシューマーを開発する
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

Kinesis Data Streams のデータを処理するアプリケーションを構築するには Kinesis Client Library (KCL) を使用できます。Kinesis Client Library は、複数の言語で使用できます。このトピックでは、Node.js について説明します。

KCL は Java ライブラリであり、Java 以外の言語のサポートは、*MultiLangDaemon* と呼ばれる多言語インターフェースを使用して提供されます。このデーモンは Java ベースで、Java 以外の KCL 言語を使用しているときにバックグラウンドで実行されます。そのため、KCL for Node.js をインストールして、コンシューマーアプリケーションをすべて Node.js で書く場合でも、MultiLangDaemon を使用するために、Java をシステムにインストールする必要があります。さらに、MultiLangDaemon には、接続先の AWS リージョンなど、ユースケースに合わせてカスタマイズする必要があるデフォルト設定があります。GitHub の MultiLangDaemon の詳細については、[KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)のページを参照してください。

GitHub から Java KCL をダウンロードするには、[Kinesis Client Library (Node.js)](https://github.com/awslabs/amazon-kinesis-client-nodejs) にアクセスしてください。

**サンプルコードのダウンロード**

Node.js の KCL で使用可能な 2 つのサンプルコードがあります。
+ [基本サンプル](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  Node.js で KCL コンシューマーアプリケーションを構築する方法の基本を説明する次のセクションで使用されます。
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   基本サンプルコードを理解したあとの、やや上級で実際のシナリオを使用したサンプル。このサンプルはここでは説明しませんが、詳細を説明した README ファイルがあります。

Node.js で KCL コンシューマーアプリケーションを実装する場合は、次のタスクを完了する必要があります。

**Topics**
+ [レコードプロセッサを実装する](#kinesis-record-processor-implementation-interface-nodejs)
+ [設定プロパティを変更する](#kinesis-record-processor-initialization-nodejs)

## レコードプロセッサを実装する
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

KCL for Node.js を使用した最もシンプルなコンシューマーは、`recordProcessor` 関数を実装する必要があります。この関数には、`initialize`、`processRecords`、および `shutdown` の各関数が含まれます。このサンプルでは、開始点として使用できる実装を提供しています (`sample_kcl_app.js` を参照してください)。

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**初期化**  
レコードプロセッサが起動すると、KCL は `initialize` 関数を呼び出します。このレコードプロセッサは `initializeInput.shardId` として渡されるシャード ID のみを処理し、通常、その逆も真です (このシャードはこのレコードプロセッサによってのみ処理されます)。ただし、コンシューマーでは、データレコードが複数回処理される可能性に対応する必要があります。これは、Kinesis Data Streams は*少なくとも 1 回*のセマンティクスを使用しているからです。つまり、シャードから取得されたすべてのデータレコードが、コンシューマーのワーカーによって少なくとも 1 回処理されることを意味します。特定のシャードが複数のワーカーによって処理される可能性がある場合の詳細については、[シャードの数を変更するには、再シャーディング、スケーリング、並列処理を使用します。](kinesis-record-processor-scaling.md)を参照してください。

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 KCL は、この関数を呼び出すために `initialize` 関数に指定したシャードのデータレコードのリストが含まれている入力を使用します。実装するレコードプロセッサは、コンシューマーのセマンティクスに従って、これらのレコードのデータを処理します。例えば、ワーカーはデータの変換を実行し、その結果を Amazon Simple Storage Service (Amazon S3) バケットに保存する場合があります。

```
processRecords: function(processRecordsInput, completeCallback)
```

データ自体に加えて、レコードにもシーケンス番号とパーティションキーが含まれ、ワーカーはデータを処理するときに、これらを使用できます。たとえば、ワーカーは、パーティションのキーの値に基づいて、データを格納する S3 バケットを選択できます。`record` ディクショナリは、レコードのデータ、シーケンス番号、およびパーティションキーにアクセスする次のキーと値のペアを公開します。

```
record.data
record.sequenceNumber
record.partitionKey
```

データは Base64 でエンコードされていることに注意してください。

基本サンプルでは、関数 `processRecords` に、ワーカーでレコードのデータ、シーケンス番号、およびパーティションキーにアクセスする方法を示すコードが含まれています。

Kinesis Data Streams では、シャードで既に処理されたレコードを追跡するためにレコードプロセッサが必要です。KCL は、`processRecordsInput.checkpointer` として渡した `checkpointer` オブジェクトを使用して、この追跡を処理します。レコードプロセッサは、`checkpointer.checkpoint` 関数を呼び出して、シャード内のレコードの処理の進行状況を KCL に知らせます。ワーカーでエラーが発生した場合、シャードの処理を再開するときに、処理されたことが分かっている最後のレコードから再開するように、KCL はこの情報を使用します。

分割または結合オペレーションの場合、KCL は、元のシャードのプロセッサが `checkpoint` を呼び出して元のシャードの処理がすべて完了したことを通知するまで、新しいシャードの処理を開始しません。

`checkpoint` 関数にシーケンス番号を渡さないと、`checkpoint` への呼び出しは、レコードプロセッサに最後のレコードを渡した時点までのすべてのレコードが処理済みであることを意味すると KCL で見なされます。したがって、レコードプロセッサは、渡されたリストにあるすべてのレコードの処理が完了した場合に**のみ**、`checkpoint` を呼び出す必要があります。レコードプロセッサは、`checkpoint` の各呼び出しで `processRecords` を呼び出す必要はありません。たとえば、プロセッサは `checkpoint` を 3 回の呼び出しごとに呼び出したり、レコードプロセッサの外部イベント (実装したカスタムの認証または検証サービスなど) で呼び出したりできます。

オプションでレコードの正確なシーケンス番号をパラメータとして `checkpoint` に指定できます。この場合、KCL は、そのレコードまでのすべてのレコードだけが処理されたと見なします。

基本サンプルアプリケーションでは、`checkpointer.checkpoint` 関数の最もシンプルな呼び出しを示します。関数のこの時点でコンシューマーに必要な他のチェックポイントロジックを追加できます。

**シャットダウン**  
KCL は、処理が終了した場合 (`shutdownInput.reason` は `TERMINATE`) またはワーカーが応答していない場合 (`shutdownInput.reason` は `ZOMBIE`)、`shutdown` 関数を呼び出します。

```
shutdown: function(shutdownInput, completeCallback)
```

シャードが分割または結合されたか、ストリームが削除されたため、レコードプロセッサがシャードからこれ以上レコードを受信しない場合は、処理が終了します。

また、KCL は、`shutdownInput.checkpointer` オブジェクトも `shutdown` に渡します。シャットダウンの理由が `TERMINATE` である場合、レコードプロセッサがすべてのデータレコードの処理を終了したことを確認し、このインターフェイスの `checkpoint` 関数を呼び出します。

## 設定プロパティを変更する
<a name="kinesis-record-processor-initialization-nodejs"></a>

このサンプルでは、設定プロパティのデフォルト値を提供します。これらのプロパティを独自の値にオーバーライドできます (基本サンプルの `sample.properties` を参照してください)。

### アプリケーション名
<a name="kinesis-record-processor-application-name-nodejs"></a>

KCL には、複数のアプリケーション間、および同じリージョン内の Amazon DynamoDB テーブル間で一意のアプリケーションが必要です。次のようにアプリケーション名の設定値を使用します。
+ このアプリケーション名と関連付けられたすべてのワーカーは、連係して同じストリームを処理していると見なされます。これらのワーカーは複数のインスタンスに分散している場合もあります。同じアプリケーションコードの追加のインスタンスを実行するときに、アプリケーション名が異なる場合、KCL は 2 番目のインスタンスを、同じストリームで動作するまったく別のアプリケーションと見なします。
+ KCL はアプリケーション名を使用して DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報 (チェックポイントやワーカーとシャードのマッピングなど) を保存します。各アプリケーションには、それぞれ DynamoDB テーブルがあります。詳細については、[リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡する](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)を参照してください。

### 認証情報の設定
<a name="kinesis-record-processor-credentials-nodejs"></a>

デフォルトの AWS 認証情報プロバイダーチェーンの認証情報プロバイダーのいずれかが認証情報を利用できるようにする必要があります。`AWSCredentialsProvider` プロパティを使用して認証情報プロバイダーを設定できます。`sample.properties` ファイルでは、[デフォルトの認証情報プロバイダーチェーン](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)のいずれかの認証情報プロバイダーに対して、ユーザーの認証情報を使用可能にする必要があります。Amazon EC2 インスタンスでコンシューマーを実行している場合は、この IAM ロールに関連付けられたアクセス許可を反映する IAM role. AWS credentials を使用してインスタンスを設定することをお勧めします。この IAM ロールは、インスタンスメタデータを介してインスタンス上のアプリケーションで使用できます。これは、EC2 インスタンスで実行されるコンシューマーアプリケーションの認証情報を管理するための最も安全な方法です。

次の例では、KCL を設定し、`sample_kcl_app.js` で指定されているレコードプロセッサを使用して`kclnodejssample`という Kinesis Data Streams を処理します。

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# .NET で Kinesis Client Library コンシューマーを開発する
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

Kinesis Data Streams のデータを処理するアプリケーションを構築するには Kinesis Client Library (KCL) を使用できます。Kinesis Client Library は、複数の言語で使用できます。このトピックでは、.NET について説明します。

KCL は Java ライブラリであり、Java 以外の言語のサポートは、*MultiLangDaemon* と呼ばれる多言語インターフェースを使用して提供されます。このデーモンは Java ベースで、Java 以外の KCL 言語を使用しているときにバックグラウンドで実行されます。そのため、KCL for .NET をインストールして、コンシューマーアプリケーションをすべて .NET で書く場合でも、MultiLangDaemon を使用するために、Java をシステムにインストールする必要があります。さらに、MultiLangDaemon には、接続先の AWS リージョンなど、ユースケースに合わせてカスタマイズする必要があるデフォルト設定があります。GitHub の MultiLangDaemon の詳細については、[KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)のページを参照してください。

GitHub から .NET KCL をダウンロードするには、[Kinesis Client Library (.NET)](https://github.com/awslabs/amazon-kinesis-client-net) にアクセスしてください。.NET KCL コンシューマーアプリケーションのサンプルコードをダウンロードするには、GitHub で[KCL for .NET sample consumer project](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer) のページにアクセスしてください。

.NET で KCL コンシューマーアプリケーションを実装する場合は、次のタスクを完了する必要があります。

**Topics**
+ [IRecordProcessor クラスのメソッドを実装する](#kinesis-record-processor-implementation-interface-dotnet)
+ [設定プロパティを変更する](#kinesis-record-processor-initialization-dotnet)

## IRecordProcessor クラスのメソッドを実装する
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

コンシューマーでは、`IRecordProcessor` の次のメソッドを実装する必要があります。出発点として使用できる実装がサンプルコンシューマーに提供されています (`SampleRecordProcessor` の `SampleConsumer/AmazonKinesisSampleConsumer.cs` クラスを参照してください)。

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**Initialize**  
KCL は、レコードプロセッサがインスタンス化されると、このメソッドを呼び出して `input` パラメータの特定のシャード ID (`input.ShardId`) を渡します。このレコードプロセッサはこのシャードのみを処理し、通常、その逆も真です (このシャードはこのレコード プロセッサによってのみ処理されます)。ただし、コンシューマーでは、データレコードが複数回処理される可能性に対応する必要があります。これは、Kinesis Data Streams は*少なくとも 1 回*のセマンティクスを使用しているからです。つまり、シャードから取得されたすべてのデータレコードが、コンシューマーのワーカーによって少なくとも 1 回処理されることを意味します。特定のシャードが複数のワーカーによって処理される可能性がある場合の詳細については、[シャードの数を変更するには、再シャーディング、スケーリング、並列処理を使用します。](kinesis-record-processor-scaling.md)を参照してください。

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
KCL は、このメソッドを呼び出し、`Initialize` メソッドで指定されたシャードの `input` パラメータ (`input.Records`) にあるデータレコードのリストを渡します。実装するレコードプロセッサは、コンシューマーのセマンティクスに従って、これらのレコードのデータを処理します。例えば、ワーカーはデータの変換を実行し、その結果を Amazon Simple Storage Service (Amazon S3) バケットに保存する場合があります。

```
public void ProcessRecords(ProcessRecordsInput input)
```

データ自体に加えて、レコードにもシーケンス番号とパーティションキーが含まれます。ワーカーはデータを処理するときに、これらの値を使用できます。たとえば、ワーカーは、パーティションのキーの値に基づいて、データを格納する S3 バケットを選択できます。`Record` クラスは以下を公開し、レコードのデータ、シーケンス番号、およびパーティションキーのアクセスを可能にします。

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

サンプルでは、メソッド `ProcessRecordsWithRetries` に、ワーカーでレコードのデータ、シーケンス番号、およびパーティションキーにアクセスする方法を示すコードが含まれています。

Kinesis Data Streams では、シャードで既に処理されたレコードを追跡するためにレコードプロセッサが必要です。KCL は、`Checkpointer` オブジェクトを `ProcessRecords` に渡すことで、この追跡をユーザーに代わって処理します (`input.Checkpointer`)。レコードプロセッサは、`Checkpointer.Checkpoint` メソッドを呼び出して、シャード内のレコード処理の進行状況を KCL に知らせます。ワーカーでエラーが発生すると、KCL はこの情報を使用して、処理されたことが分かっている最後のレコードからシャードの処理を再開します。

分割または結合オペレーションの場合、KCL は、元のシャードのプロセッサが `Checkpointer.Checkpoint` を呼び出して元のシャードの処理がすべて完了したことを通知するまで、新しいシャードの処理を開始しません。

パラメータを渡さないと、`Checkpointer.Checkpoint` への呼び出しは、レコードプロセッサに最後のレコードを渡した時点までのすべてのレコードが処理済みであることを意味すると KCL で見なされます。したがって、レコードプロセッサは、渡されたリストにあるすべてのレコードの処理が完了した場合にのみ、`Checkpointer.Checkpoint` を呼び出す必要があります。レコードプロセッサは、`Checkpointer.Checkpoint` の各呼び出しで `ProcessRecords` を呼び出す必要はありません。たとえば、プロセッサは、3 回または 4 回呼び出すたびに、`Checkpointer.Checkpoint` を呼び出すことができます。オプションでレコードの正確なシーケンス番号をパラメータとして `Checkpointer.Checkpoint` に指定できます。この場合、KCL は、レコード処理がそのレコードまで完了したと見なします。

サンプルでは、プライベートメソッド `Checkpoint(Checkpointer checkpointer)` で、適切な例外処理と再試行のロジックを使用する `Checkpointer.Checkpoint` メソッドを呼び出す方法を示しています。

KCL for .NET では、例外を処理する方法が他の KCL 言語ライブラリとは異なり、データレコードの処理から発生した例外を扱いません。ユーザーコードからの例外がキャッチされないと、プログラムがクラッシュします。

**シャットダウン**  
KCL は、処理が終了した場合 (シャットダウンの理由は `TERMINATE`) またはワーカーが応答していない場合 (シャットダウンの `input.Reason` の値は `ZOMBIE`)、`Shutdown` メソッドを呼び出します。

```
public void Shutdown(ShutdownInput input)
```

シャードが分割または結合されたか、ストリームが削除されたため、レコードプロセッサがシャードからこれ以上レコードを受信しない場合は、処理が終了します。

また、KCL は、`Checkpointer` オブジェクトも `shutdown` に渡します。シャットダウンの理由が `TERMINATE` である場合、レコードプロセッサはすべてのデータレコードの処理を終了し、このインターフェイスの `checkpoint` メソッドを呼び出します。

## 設定プロパティを変更する
<a name="kinesis-record-processor-initialization-dotnet"></a>

このサンプルコンシューマーでは、設定プロパティのデフォルト値を提供します。これらのプロパティを独自の値にオーバーライドできます (`SampleConsumer/kcl.properties` を参照してください)。

### アプリケーション名
<a name="modify-kinesis-record-processor-application-name"></a>

KCL には、複数のアプリケーション間、および同じリージョン内の Amazon DynamoDB テーブル間で一意のアプリケーションが必要です。次のようにアプリケーション名の設定値を使用します。
+ このアプリケーション名と関連付けられたすべてのワーカーは、連係して同じストリームを処理していると見なされます。これらのワーカーは複数のインスタンスに分散している場合もあります。同じアプリケーションコードの追加のインスタンスを実行するときに、アプリケーション名が異なる場合、KCL は 2 番目のインスタンスを、同じストリームで動作するまったく別のアプリケーションと見なします。
+ KCL はアプリケーション名を使用して DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報 (チェックポイントやワーカーとシャードのマッピングなど) を保存します。各アプリケーションには、それぞれ DynamoDB テーブルがあります。詳細については、[リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡する](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)を参照してください。

### 認証情報の設定
<a name="kinesis-record-processor-creds-dotnet"></a>

デフォルトの AWS 認証情報プロバイダーチェーンの認証情報プロバイダーのいずれかが認証情報を利用できるようにする必要があります。`AWSCredentialsProvider` プロパティを使用して認証情報プロバイダーを設定できます。[sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) では、[デフォルトの認証情報プロバイダーチェーン](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)のいずれかの認証情報プロバイダーに対して、ユーザーの認証情報を使用可能にする必要があります。EC2 インスタンスでコンシューマーアプリケーションを実行している場合は、IAM ロールでインスタンスを設定することをお勧めします。この IAM ロールに関連付けられた許可を反映する AWS 認証情報は、インスタンスメタデータを通じて、インスタンス上のアプリケーションで使用できるようになります。これは、EC2 インスタンスで実行されるコンシューマーの認証情報を管理するための最も安全な方法です。

サンプルのプロパティファイルでは、 で指定されているレコードプロセッサを使用してwordsという Kinesis data stream を処理するように KCL を設定します。

# Python で Kinesis クライアントライブラリコンシューマーを開発する
<a name="kinesis-record-processor-implementation-app-py"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

Kinesis Data Streams のデータを処理するアプリケーションを構築するには Kinesis Client Library (KCL) を使用できます。Kinesis Client Library は、複数の言語で使用できます。このトピックでは、Python について説明します。

KCL は Java ライブラリであり、Java 以外の言語のサポートは、*MultiLangDaemon* と呼ばれる多言語インターフェースを使用して提供されます。このデーモンは Java ベースで、Java 以外の KCL 言語を使用しているときにバックグラウンドで実行されます。そのため、KCL for Python をインストールして、コンシューマーアプリケーションをすべて Python で書く場合でも、MultiLangDaemon を使用するために、Java をシステムにインストールする必要があります。さらに、MultiLangDaemon には、接続先の AWS リージョンなど、ユースケースに合わせてカスタマイズする必要があるデフォルト設定があります。GitHub の MultiLangDaemon の詳細については、[KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)のページを参照してください。

GitHub から Python KCL をダウンロードするには、[Kinesis Client Library (Python)](https://github.com/awslabs/amazon-kinesis-client-python) にアクセスしてください。Python KCL コンシューマーアプリケーションのサンプルコードをダウンロードするには、GitHub で[KCL for Python sample project](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples)ページにアクセスしてください。

Python で KCL コンシューマーアプリケーションを実装する場合は、次のタスクを完了する必要があります。

**Topics**
+ [RecordProcessor クラスのメソッドを実装する](#kinesis-record-processor-implementation-interface-py)
+ [設定プロパティを変更する](#kinesis-record-processor-initialization-py)

## RecordProcessor クラスのメソッドを実装する
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess` クラスでは、`RecordProcessorBase` を拡張して次のメソッドを実装する必要があります。このサンプルでは、開始点として使用できる実装を提供しています (`sample_kclpy_app.py` を参照してください)。

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**初期化**  
KCL は、レコードプロセッサがインスタンス化されると、`initialize` メソッドを呼び出し、特定のシャード ID をパラメータとして渡します。このレコードプロセッサはこのシャードのみを処理し、通常、その逆も真です (このシャードはこのレコード プロセッサによってのみ処理されます)。ただし、コンシューマーでは、データレコードが複数回処理される可能性に対応する必要があります。これは、Kinesis Data Streams は*少なくとも 1 回*のセマンティクスを使用しているからです。つまり、シャードから取得されたすべてのデータレコードが、コンシューマーのワーカーによって少なくとも 1 回処理されることを意味します。特定のシャードが複数のワーカーによって処理される可能性がある場合の詳細については、[シャードの数を変更するには、再シャーディング、スケーリング、並列処理を使用します。](kinesis-record-processor-scaling.md)を参照してください。

```
def initialize(self, shard_id)
```

**process\$1records**  
 KCL は、このメソッドを呼び出し、`initialize` メソッドで指定されたシャードのデータレコードのリストを渡します。実装するレコードプロセッサは、コンシューマーのセマンティクスに従って、これらのレコードのデータを処理します。例えば、ワーカーはデータの変換を実行し、その結果を Amazon Simple Storage Service (Amazon S3) バケットに保存する場合があります。

```
def process_records(self, records, checkpointer) 
```

データ自体に加えて、レコードにもシーケンス番号とパーティションキーが含まれます。ワーカーはデータを処理するときに、これらの値を使用できます。たとえば、ワーカーは、パーティションのキーの値に基づいて、データを格納する S3 バケットを選択できます。`record` ディクショナリは、レコードのデータ、シーケンス番号、およびパーティションキーにアクセスする次のキーと値のペアを公開します。

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

データは Base64 でエンコードされていることに注意してください。

サンプルでは、メソッド `process_records` に、ワーカーでレコードのデータ、シーケンス番号、およびパーティションキーにアクセスする方法を示すコードが含まれています。

Kinesis Data Streams では、シャードで既に処理されたレコードを追跡するためにレコードプロセッサが必要です。KCL は、`Checkpointer` オブジェクトを `process_records` に渡すことで、この追跡をユーザーに代わって処理します。レコードプロセッサは、このオブジェクトの `checkpoint` メソッドを呼び出して、シャード内のレコードの処理の進行状況を KCL に通知します。ワーカーでエラーが発生すると、KCL はこの情報を使用して、処理されたことが分かっている最後のレコードからシャードの処理を再開します。

分割または結合オペレーションの場合、KCL は、元のシャードのプロセッサが `checkpoint` を呼び出して元のシャードの処理がすべて完了したことを通知するまで、新しいシャードの処理を開始しません。

パラメータを渡さないと、`checkpoint` への呼び出しは、レコードプロセッサに最後のレコードを渡した時点までのすべてのレコードが処理済みであることを意味すると KCL で見なされます。したがって、レコードプロセッサは、渡されたリストにあるすべてのレコードの処理が完了した場合にのみ、`checkpoint` を呼び出す必要があります。レコードプロセッサは、`checkpoint` の各呼び出しで `process_records` を呼び出す必要はありません。たとえば、プロセッサは、3 回呼び出すたびに、`checkpoint` を呼び出すことができます。オプションでレコードの正確なシーケンス番号をパラメータとして `checkpoint` に指定できます。この場合、KCL は、そのレコードまでのすべてのレコードだけが処理されたと見なします。

サンプルでは、プライベートメソッド `checkpoint` で、適切な例外処理と再試行のロジックを使用する `Checkpointer.checkpoint` メソッドを呼び出す方法を示しています。

KCL は、`process_records` を使用して、データレコードの処理から発生するすべての例外を処理します。例外が `process_records` からスローされた場合、KCL は、例外発生前に `process_records` に渡されたデータレコードをスキップします。つまり、これらのレコードは、例外をスローしたレコードプロセッサ、またはコンシューマーの他のレコードプロセッサに再送信されません。

**シャットダウン**  
 KCL は、処理が終了した場合 (シャットダウンの理由は `TERMINATE`) またはワーカーが応答していない場合 (シャットダウンの `reason` は `ZOMBIE`)、`shutdown` メソッドを呼び出します。

```
def shutdown(self, checkpointer, reason)
```

シャードが分割または結合されたか、ストリームが削除されたため、レコードプロセッサがシャードからこれ以上レコードを受信しない場合は、処理が終了します。

 また、KCL は、`Checkpointer` オブジェクトも `shutdown` に渡します。シャットダウンの `reason` が `TERMINATE` である場合、レコードプロセッサはすべてのデータレコードの処理を終了し、このインターフェイスの `checkpoint` メソッドを呼び出します。

## 設定プロパティを変更する
<a name="kinesis-record-processor-initialization-py"></a>

このサンプルでは、設定プロパティのデフォルト値を提供します。これらのプロパティを独自の値にオーバーライドできます (`sample.properties` を参照してください)。

### アプリケーション名
<a name="kinesis-record-processor-application-name-py"></a>

KCL には、複数のアプリケーション間、および同じリージョン内の Amazon DynamoDB テーブル間で一意のアプリケーションが必要です。次のようにアプリケーション名の設定値を使用します。
+ このアプリケーション名と関連付けられたワーカーはすべて、同じストリーム上で連携して処理しているとみなされます。これらのワーカーは複数のインスタンスに分散している場合があります。同じアプリケーションコードの追加のインスタンスを実行するときに、アプリケーション名が異なる場合、KCL は 2 番目のインスタンスを、同じストリームで動作するまったく別のアプリケーションと見なします。
+ KCL はアプリケーション名を使用して DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報 (チェックポイントやワーカーとシャードのマッピングなど) を保存します。各アプリケーションには、それぞれ DynamoDB テーブルがあります。詳細については、[リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡する](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)を参照してください。

### 認証情報の設定
<a name="kinesis-record-processor-creds-py"></a>

デフォルトの AWS 認証情報プロバイダーチェーンの認証情報プロバイダーのいずれかが認証情報を利用できるようにする必要があります。`AWSCredentialsProvider` プロパティを使用して認証情報プロバイダーを設定できます。[sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) では、[デフォルトの認証情報プロバイダーチェーン](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)のいずれかの認証情報プロバイダーに対して、ユーザーの認証情報を使用可能にする必要があります。Amazon EC2 インスタンスでコンシューマーアプリケーションを実行している場合は、IAM ロールでインスタンスを設定することをお勧めします。この IAM ロールに関連付けられた許可を反映する AWS 認証情報は、インスタンスメタデータを通じて、インスタンス上のアプリケーションで使用できるようになります。これは、EC2 インスタンスで実行されるコンシューマーアプリケーションの認証情報を管理するための最も安全な方法です。

サンプルのプロパティファイルでは、 で指定されているレコードプロセッサを使用してwordsという Kinesis data stream を処理するように KCL を設定します。

# Ruby で Kinesis Client Library コンシューマーを開発する
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

Kinesis Data Streams のデータを処理するアプリケーションを構築するには Kinesis Client Library (KCL) を使用できます。Kinesis Client Library は、複数の言語で使用できます。このトピックでは、Ruby について説明します。

KCL は Java ライブラリであり、Java 以外の言語のサポートは、*MultiLangDaemon* と呼ばれる多言語インターフェースを使用して提供されます。このデーモンは Java ベースで、Java 以外の KCL 言語を使用しているときにバックグラウンドで実行されます。そのため、KCL for .Ruby をインストールして、コンシューマーアプリケーションをすべて Ruby で書く場合でも、MultiLangDaemon を使用するために、Java をシステムにインストールする必要があります。さらに、MultiLangDaemon には、接続先の AWS リージョンなど、ユースケースに合わせてカスタマイズする必要があるデフォルト設定があります。GitHub の MultiLangDaemon の詳細については、[KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)のページを参照してください。

GitHub から Ruby KCL をダウンロードするには、[Kinesis Client Library (Ruby)](https://github.com/awslabs/amazon-kinesis-client-ruby) にアクセスしてください。Ruby KCL コンシューマーアプリケーションのサンプルコードをダウンロードするには、GitHub で[KCL for Ruby sample project](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples)ページにアクセスしてください。

KCL Ruby サポートライブラリの詳細については、[KCL Ruby Gems ドキュメント](http://www.rubydoc.info/gems/aws-kclrb)を参照してください。

# KCL 2.x コンシューマーを開発する
<a name="developing-consumers-with-kcl-v2"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

このトピックでは、バージョン 2.0 の Kinesis Client Library (KCL) を使用する方法について説明します。

KCL の詳細については、[Kinesis Client Library 1.x を使用したコンシューマーの開発](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html)に示されている概要を参照してください。

使用するオプションに応じて、次のトピックから選択します。

**Topics**
+ [Java での Kinesis クライアントライブラリコンシューマーを開発する](kcl2-standard-consumer-java-example.md)
+ [Python で Kinesis クライアントライブラリコンシューマーを開発する](kcl2-standard-consumer-python-example.md)
+ [KCL 2.x を使用して拡張ファンアウトコンシューマーを開発する](building-enhanced-consumers-kcl-retired.md)

# Java での Kinesis クライアントライブラリコンシューマーを開発する
<a name="kcl2-standard-consumer-java-example"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

次のコードは、`ProcessorFactory` および `RecordProcessor` の Java のサンプル実装を示しています。拡張ファンアウト機能を活用する方法については、[拡張ファンアウトでコンシューマーを使用する](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html)を参照してください。

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Python で Kinesis クライアントライブラリコンシューマーを開発する
<a name="kcl2-standard-consumer-python-example"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

Kinesis Data Streams のデータを処理するアプリケーションを構築するには Kinesis Client Library (KCL) を使用できます。Kinesis Client Library は、複数の言語で使用できます。このトピックでは、Python について説明します。

KCL は Java ライブラリであり、Java 以外の言語のサポートは、*MultiLangDaemon* と呼ばれる多言語インターフェースを使用して提供されます。このデーモンは Java ベースで、Java 以外の KCL 言語を使用しているときにバックグラウンドで実行されます。そのため、KCL for Python をインストールして、コンシューマーアプリケーションをすべて Python で書く場合でも、MultiLangDaemon を使用するために、Java をシステムにインストールする必要があります。さらに、MultiLangDaemon には、接続先の AWS リージョンなど、ユースケースに合わせてカスタマイズする必要があるデフォルト設定があります。GitHub の MultiLangDaemon の詳細については、[KCL MultiLangDaemon project](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)のページを参照してください。

GitHub から Python KCL をダウンロードするには、[Kinesis Client Library (Python)](https://github.com/awslabs/amazon-kinesis-client-python) にアクセスしてください。Python KCL コンシューマーアプリケーションのサンプルコードをダウンロードするには、GitHub で[KCL for Python sample project](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples)ページにアクセスしてください。

Python で KCL コンシューマーアプリケーションを実装する場合は、次のタスクを完了する必要があります。

**Topics**
+ [RecordProcessor クラスのメソッドを実装する](#kinesis-record-processor-implementation-interface-py)
+ [設定プロパティを変更する](#kinesis-record-processor-initialization-py)

## RecordProcessor クラスのメソッドを実装する
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess` クラスでは、`RecordProcessorBase` クラスを拡張して次のメソッドを実装する必要があります。

```
initialize
process_records
shutdown_requested
```

このサンプルでは、開始点として使用できる実装を提供しています。

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this recod.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```

## 設定プロパティを変更する
<a name="kinesis-record-processor-initialization-py"></a>

このサンプルでは、次のスクリプトに示すように、設定プロパティのデフォルト値を提供します。これらのプロパティを独自の値にオーバーライドできます。

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```

### アプリケーション名
<a name="kinesis-record-processor-application-name-py"></a>

KCL には、複数のアプリケーション間、および同じリージョン内の Amazon DynamoDB テーブル間で一意のアプリケーションが必要です。次のようにアプリケーション名の設定値を使用します。
+ このアプリケーション名と関連付けられたワーカーはすべて、同じストリーム上で連携して処理しているとみなされます。これらのワーカーは複数のインスタンス間に分散している場合があります。同じアプリケーションコードの追加のインスタンスを実行するときに、アプリケーション名が異なる場合、KCL は 2 番目のインスタンスを、同じストリームで動作するまったく別のアプリケーションと見なします。
+ KCL はアプリケーション名を使用して DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報 (チェックポイントやワーカーとシャードのマッピングなど) を保存します。各アプリケーションには、それぞれ DynamoDB テーブルがあります。詳細については、「[リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡する](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)」を参照してください。

### 認証情報
<a name="kinesis-record-processor-creds-py"></a>

デフォルトの AWS 認証情報プロバイダー[チェーンの認証情報プロバイダー](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)の 1 つが認証情報を利用できるようにする必要があります。`AWSCredentialsProvider` プロパティを使用して認証情報プロバイダーを設定できます。Amazon EC2 インスタンスでコンシューマーアプリケーションを実行する場合は、この IAM ロールに関連付けられたアクセス許可を反映する IAM role. AWS credentials を使用してインスタンスを設定することをお勧めします。この IAM ロールは、インスタンスメタデータを介してインスタンス上のアプリケーションで使用できます。これは、EC2 インスタンスで実行されるコンシューマーアプリケーションの認証情報を管理するための最も安全な方法です。

# KCL 2.x を使用して拡張ファンアウトコンシューマーを開発する
<a name="building-enhanced-consumers-kcl-retired"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

Amazon Kinesis Data Streams で*拡張ファンアウト*を使用するコンシューマーは、シャードあたり 1 秒間に最大 2 MB のデータの専用スループットで、データストリームからレコードを受け取ることができます。このタイプのコンシューマーは、ストリームからデータを受け取っている他のコンシューマーと競合する必要はありません。詳細については、[専用スループットを備えた拡張ファンアウトを開発する](enhanced-consumers.md)を参照してください。

拡張ファンアウトを使用してストリームからデータを受け取るアプリケーションを開発するには、バージョン 2.0 以降の Kinesis Client Library (KCL) を使用できます。KCL は、アプリケーションをストリームのすべてのシャードに自動的にサブスクライブし、コンシューマーアプリケーションがシャードあたり 2 MB/秒のスループット値で読み取ることができるようにします。拡張ファンアウトをオンにせずに KCL を使用する場合は、[Kinesis Client Library 2.0 を使用したコンシューマーの開発](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html)を参照してください。

**Topics**
+ [Java で KCL 2.x を使用して拡張ファンアウトコンシューマーを開発する](building-enhanced-consumers-kcl-java.md)

# Java で KCL 2.x を使用して拡張ファンアウトコンシューマーを開発する
<a name="building-enhanced-consumers-kcl-java"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

拡張ファンアウトを使用してストリームからデータを受け取るアプリケーションを Amazon Kinesis Data Streams で開発するには、バージョン 2.0 以降の Kinesis Client Library (KCL) を使用できます。次のコードは、`ProcessorFactory` および `RecordProcessor` の Java のサンプル実装を示しています。

`KinesisClientUtil` を使用して `KinesisAsyncClient` を作成し、`KinesisAsyncClient` で `maxConcurrency` を設定することをお勧めします。

**重要**  
すべてのリースと `KinesisAsyncClient` の追加使用のための十分な高い `maxConcurrency` を持つよう `KinesisAsyncClient` を設定しないと、Amazon Kinesis Client で非常に大きなレイテンシーが発生する可能性があります。

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# コンシューマーを KCL 1.x から KCL 2.x に移行する
<a name="kcl-migration"></a>

**重要**  
Amazon Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL 1.x は 2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 1.x を使用して KCL アプリケーションを最新の KCL バージョンに移行することを**強くお勧めします**。最新の KCL バージョンを確認するには、[GitHub のAmazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client)ページを参照してください。最新の KCL バージョンの詳細については、[Kinesis Client Library を使用する](kcl.md) を参照してください。KCL 1.x から KCL 3.x への移行については、「[KCL 1.x から KCL 3.x への移行](kcl-migration-1-3.md)」を参照してください。

このトピックでは、Kinesis Client Library (KCL) のバージョン 1.x と 2.x の違いについて説明します。また、コンシューマーを KCL のバージョン 1.x からバージョン 2.x に移行する方法も示します。クライアントを移行すると、最後にチェックポイントが作成された場所からレコードの処理が開始されます。

KCL のバージョン 2.0 では、以下のインターフェイスの変更が導入されています。


**KCL インターフェイスの変更**  

| KCL 1.x インターフェイス | KCL 2.0 インターフェイス | 
| --- | --- | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor | software.amazon.kinesis.processor.ShardRecordProcessor | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory | software.amazon.kinesis.processor.ShardRecordProcessorFactory | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware | software.amazon.kinesis.processor.ShardRecordProcessor 内に折りたたみ | 

**Topics**
+ [レコードプロセッサを移行する](#recrod-processor-migration)
+ [レコードプロセッサファクトリーを移行する](#recrod-processor-factory-migration)
+ [ワーカーを移行する](#worker-migration)
+ [Amazon Kinesis Client を設定する](#client-configuration)
+ [アイドル時間の削除](#idle-time-removal)
+ [クライアント設定の削除](#client-configuration-removals)

## レコードプロセッサを移行する
<a name="recrod-processor-migration"></a>

以下の例は、KCL1.x に実装されたレコードプロセッサを示しています。

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        //
        // Process records, and possibly checkpoint
        //
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**レコードプロセッサのクラスを移行するには**

1. インターフェイスを `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` および `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` から `software.amazon.kinesis.processor.ShardRecordProcessor` に変更します。以下に例を示します。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
   public class TestRecordProcessor implements ShardRecordProcessor {
   ```

1. `import` メソッド `initialize` とメソッドの `processRecords` ステートメントを更新します。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
   ```

1. `shutdown` メソッドを以下の新しいメソッドに置き換えます。`leaseLost`、`shardEnded`、および `shutdownRequested`。

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

以下に示しているのは、レコードプロセッサのクラスの更新されたバージョンです。

```
package com.amazonaws.kcl;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class TestRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

## レコードプロセッサファクトリーを移行する
<a name="recrod-processor-factory-migration"></a>

レコードプロセッサファクトリーは、リースが取得された際にレコードプロセッサの作成を担当します。以下に示しているのは、KCL 1.x ファクトリーの例です。

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor();
    }
}
```

**レコードプロセッサファクトリーを移行するには**

1. 実装されているインターフェイスを `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` から `software.amazon.kinesis.processor.ShardRecordProcessorFactory` に変更します。以下に例を示します。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
   public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
   ```

1. `createProcessor` の戻り署名を変更します。

   ```
   // public IRecordProcessor createProcessor() {
   public ShardRecordProcessor shardRecordProcessor() {
   ```

以下は、2.0 のレコードプロセッサファクトリーの例です。

```
package com.amazonaws.kcl;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor();
    }
}
```

## ワーカーを移行する
<a name="worker-migration"></a>

バージョン 2.0 の KCL では、新しいクラス `Scheduler` によって `Worker` クラスが置き換えられます。KCL 1.x のワーカーの例を次に示します。

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

**ワーカーを移行するには**

1. `Worker` クラスの `import` ステートメントを `Scheduler` クラスと `ConfigsBuilder` クラスのインポートステートメントに変更します。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. 次の例に示すように、`ConfigsBuilder` と `Scheduler` を作成します。

   `KinesisClientUtil` を使用して `KinesisAsyncClient` を作成し、`KinesisAsyncClient` で `maxConcurrency` を設定することをお勧めします。
**重要**  
すべてのリースと `KinesisAsyncClient` の追加使用のための十分な高い `maxConcurrency` を持つよう `KinesisAsyncClient` を設定しないと、Amazon Kinesis Client で非常に大きなレイテンシーが発生する可能性があります。

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
   Region region = Region.AP_NORTHEAST_2;
   KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
   ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
   
   Scheduler scheduler = new Scheduler(
       configsBuilder.checkpointConfig(),
       configsBuilder.coordinatorConfig(),
       configsBuilder.leaseManagementConfig(),
       configsBuilder.lifecycleConfig(),
       configsBuilder.metricsConfig(),
       configsBuilder.processorConfig(),
       configsBuilder.retrievalConfig()
       );
   ```

## Amazon Kinesis Client を設定する
<a name="client-configuration"></a>

Kinesis Client Library のリリース 2.0 では、クライアントの設定が単一の設定クラス (`KinesisClientLibConfiguration`) から 6 つの設定クラスに移行されました。次の表で移行を説明します。


**設定フィールドとその新しいクラス**  

| 元のフィールド | 新しい設定クラス | 説明 | 
| --- | --- | --- | 
| applicationName | ConfigsBuilder | この KCL アプリケーションの名前。tableName および consumerName のデフォルトとして使用されます。 | 
| tableName | ConfigsBuilder | Amazon DynamoDB リーステーブルで使用されるテーブル名の上書きを許可します。 | 
| streamName | ConfigsBuilder | このアプリケーションがレコードを処理するストリームの名前。 | 
| kinesisEndpoint | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| dynamoDBEndpoint | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| initialPositionInStreamExtended | RetrievalConfig | アプリケーションの初期実行から開始し、KCL がレコードの取得を開始するシャード内の場所。 | 
| kinesisCredentialsProvider | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| dynamoDBCredentialsProvider | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| cloudWatchCredentialsProvider | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| failoverTimeMillis | LeaseManagementConfig | リース所有者が失敗したとみなすまでの経過時間 (ミリ秒)。 | 
| workerIdentifier | ConfigsBuilder | このアプリケーションプロセッサのインスタンス化を表す一意の識別子。一意である必要があります。 | 
| shardSyncIntervalMillis | LeaseManagementConfig | シャード同期コールの間隔。 | 
| maxRecords | PollingConfig | Kinesis が返すレコードの最大数の設定を許可します。 | 
| idleTimeBetweenReadsInMillis | CoordinatorConfig | このオプションは削除されました。アイドル時間の削除を参照してください。 | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig | 設定すると、Kinesis から提供されたレコードがない場合でもレコードプロセッサが呼び出されます。 | 
| parentShardPollIntervalMillis | CoordinatorConfig | 親シャードが完了したかどうかを確認するためにレコードプロセッサがポーリングを行う頻度。 | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig | 設定すると、子リースの処理が開始されると即時にリースが削除されます。 | 
| ignoreUnexpectedChildShards | LeaseManagementConfig | 設定すると、開いているシャードがある子シャードは無視されます。これは、主に DynamoDB Streams 用です。 | 
| kinesisClientConfig | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| dynamoDBClientConfig | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| cloudWatchClientConfig | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| taskBackoffTimeMillis | LifecycleConfig | 失敗したタスクを再試行するまでの待機時間。 | 
| metricsBufferTimeMillis | MetricsConfig | CloudWatch メトリックスの発行を制御します。 | 
| metricsMaxQueueSize | MetricsConfig | CloudWatch メトリックスの発行を制御します。 | 
| metricsLevel | MetricsConfig | CloudWatch メトリックスの発行を制御します。 | 
| metricsEnabledDimensions | MetricsConfig | CloudWatch メトリックスの発行を制御します。 | 
| validateSequenceNumberBeforeCheckpointing | CheckpointConfig | このオプションは削除されました。チェックポイントシーケンス番号の検証を参照してください。 | 
| regionName | ConfigsBuilder | このオプションは削除されました。クライアント設定の削除を参照してください。 | 
| maxLeasesForWorker | LeaseManagementConfig | アプリケーションの単一のインスタンスが受け入れるリースの最大数。 | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig | アプリケーションが同時にスティールを試みるリースの最大数。 | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | Kinesis Client Library で新しい DynamoDB リーステーブルを作成する場合に使用する DynamoDB 読み取り IOPS。 | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | Kinesis Client Library で新しい DynamoDB リーステーブルを作成する場合に使用する DynamoDB 読み取り IOPS。 | 
| initialPositionInStreamExtended | LeaseManagementConfig | アプリケーションが読み取りを開始するストリーム内の初期位置。これは最初のリースの作成時にのみ使用されます。 | 
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | リーステーブルに既存のリースがある場合、シャードデータの同期を無効にします。TODO: KinesisEco-438 | 
| shardPrioritization | CoordinatorConfig | どのシャードの優先順位付けを使用するか。 | 
| shutdownGraceMillis | 該当なし | このオプションは削除されました。MultiLang の削除を参照してください。 | 
| timeoutInSeconds | 該当なし | このオプションは削除されました。MultiLang の削除を参照してください。 | 
| retryGetRecordsInSeconds | PollingConfig | GetRecords が失敗した場合の試行間隔の遅延時間を設定します。 | 
| maxGetRecordsThreadPool | PollingConfig | GetRecords に使用されるスレッドプールのサイズ。 | 
| maxLeaseRenewalThreads | LeaseManagementConfig | リース更新スレッドプールのサイズを制御します。アプリケーションが処理するリースの数が多いほど、このプールも大きくする必要があります。 | 
| recordsFetcherFactory | PollingConfig | ストリームから取得するフェッチャーを作成するために使用されるファクトリーの置換を許可します。 | 
| logWarningForTaskAfterMillis | LifecycleConfig | タスクが完了していない場合に警告がログに記録されるまでの待機期間。 | 
| listShardsBackoffTimeInMillis | RetrievalConfig | 障害が発生した場合に ListShards を呼び出す間隔 (ミリ秒)。 | 
| maxListShardsRetryAttempts | RetrievalConfig | 失敗とみなすまでの ListShards の再試行の最大回数。 | 

## アイドル時間の削除
<a name="idle-time-removal"></a>

KCL の 1.x バージョンでは、`idleTimeBetweenReadsInMillis` は 2 つの数量に相当します。
+ タスクの送信チェックの間隔。`CoordinatorConfig#shardConsumerDispatchPollIntervalMillis` を設定することで、タスク間の間隔を設定できるようになりました。
+ Kinesis Data Streams から返されるレコードがない場合に休止状態になるまでの時間。バージョン 2.0 では、拡張ファンアウトのレコードはそれぞれのレトリバーからプッシュされます。シャードコンシューマーのアクティビティは、プッシュされたリクエストが到着した場合にのみ発生します。

## クライアント設定の削除
<a name="client-configuration-removals"></a>

バージョン 2.0 では、KCL はクライアントを作成しなくなりました。有効なクライアントの提供はユーザーに任されます。この変更により、クライアントの作成を制御するすべての設定パラメータが削除されました。これらのパラメータが必要な場合は、クライアントを `ConfigsBuilder` に提供する前にクライアントで設定できます。


****  

| 削除されたフィールド | 同等の設定 | 
| --- | --- | 
| kinesisEndpoint | 優先エンドポイントを指定した SDK KinesisAsyncClient の設定: KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build()。 | 
| dynamoDBEndpoint | 優先エンドポイントを指定した SDK DynamoDbAsyncClient の設定: DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build()。 | 
| kinesisClientConfig | 必要な設定を指定した SDK KinesisAsyncClient の設定: KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build() | 
| dynamoDBClientConfig | 必要な設定を指定した SDK DynamoDbAsyncClient の設定: DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build() | 
| cloudWatchClientConfig | 必要な設定を指定した SDK CloudWatchAsyncClient の設定: CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build() | 
| regionName | 優先リージョンを指定して SDK を設定します。これは、すべての SDK クライアントで同じです。例えば、KinesisAsyncClient.builder().region(Region.US\$1WEST\$12).build()。 | 