翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
注記
Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL バージョン 3.x に移行することをお勧めします。これにより、パフォーマンスと新機能が向上します。最新の KCL ドキュメントと移行ガイドについては、「」を参照してくださいKinesis Client Library を使用する。
KDS データストリームからデータを処理できるカスタムコンシューマーアプリケーションを開発する方法の 1 つは、Kinesis Client Library (KCL) を使用することです。
トピック
注記
KCL 1.x と KCL 2.x については、どちらも使用シナリオに応じて最新の KCL 1.x バージョンまたは KCL 2.x バージョンにアップグレードすることをお勧めします。KCL 1.x と KCL 2.x は、どちらも新しいリリースに伴って定期的に更新されています。これには、最新の依存関係パッチ、セキュリティパッチ、バグ修正、および下位互換性のある新機能が含まれます。詳細については、https://github.com/awslabs/amazon-kinesis-client/releases
KCL について (以前のバージョン)
KCL は、分散コンピューティングに関連する複雑なタスクの多くを処理することで、Kinesis Data Streams からデータを消費および処理するのに役立ちます。これには、複数のコンシューマーアプリケーションインスタンス間での負荷分散、コンシューマーアプリケーションインスタンスの障害に対する応答、処理済みのレコードのチェックポイント作成、リシャーディングへの対応が挙げられます。KCL はこれらのサブタスクをすべて処理するため、カスタムレコード処理ロジックの記述に集中できます。
KCL は AWS SDK で使用できる Kinesis Data Streams API とは異なることに注意してください。Kinesis Data Streams API では、Kinesis Data Streams の多くの機能 (ストリームの作成、リシャーディング、レコードの入力と取得など) を管理できます。KCL は、これらすべてのサブタスクの抽象化レイヤーを提供します。具体的には、コンシューマーアプリケーションのカスタムデータ処理ロジックに集中できます。Kinesis Data Streams API の詳細については、Amazon Kinesis API リファレンスを参照してください。
重要
KCL は Java ライブラリです。Java 以外の言語のサポートは、MultiLangDaemon と呼ばれる多言語インターフェースを使用して提供されます。このデーモンは Java ベースで、Java 以外の KCL 言語を使用しているときにバックグラウンドで実行されます。例えば、KCL for Python をインストールして、コンシューマーアプリケーションをすべて Python で書く場合でも、MultiLangDaemon を使用するために、Java をシステムにインストールする必要があります。さらに、MultiLangDaemon には、接続先の AWS リージョンなど、ユースケースに合わせてカスタマイズする必要があるデフォルト設定がいくつかあります。GitHub の MultiLangDaemon の詳細については、KCL MultiLangDaemon project
KCL は、レコード処理ロジックと Kinesis Data Streams の仲介として機能します。
KCL の以前のバージョン
現在、次のいずれかのサポートされているバージョンの KCL を使用して、カスタムコンシューマーアプリケーションを構築できます。
-
KCL 1.x
詳細については、KCL 1.x コンシューマーを開発するを参照してください。
-
KCL 2.x
詳細については、KCL 2.x コンシューマーを開発するを参照してください。
KCL 1.x または KCL 2.x のいずれかを使用して、共有スループットを使用するコンシューマーアプリケーションを構築できます。詳細については、KCL を使用した共有スループットでカスタムコンシューマーを開発するを参照してください。
専用スループット (拡張ファンアウトコンシューマー) を使用するコンシューマーアプリケーションを構築するには、KCL 2.x のみを使用できます。詳細については、専用スループットで拡張ファンアウトコンシューマーを開発するを参照してください。
KCL 1.x と KCL 2.x の違い、および KCL 1.x から KCL 2.x に移行する方法については、コンシューマーを KCL 1.x から KCL 2.x に移行するを参照してください。
KCL の概念 (以前のバージョン)
-
KCL コンシューマーアプリケーション - KCL を使用してカスタムビルドされ、データストリームからレコードを取得して処理するように設計されたアプリケーション。
-
コンシューマーアプリケーションインスタンス - KCL コンシューマーアプリケーションは、通常、障害時の調整とデータレコード処理の動的な負荷分散のために、1 つ以上のアプリケーションインスタンスが同時に実行され、分散されます。
-
ワーカー - KCL コンシューマーアプリケーションインスタンスがデータの処理を開始するために使用する高レベルクラス。
重要
各 KCL コンシューマーアプリケーションインスタンスには 1 つのワーカーがあります。
ワーカーは、シャードとリース情報の同期、シャード割り当ての追跡、シャードからのデータの処理など、さまざまなタスクを初期化し、監督します。ワーカーは、この KCL コンシューマーアプリケーションが処理するデータレコードのデータストリームの名前や、このデータストリームへのアクセスに必要な AWS 認証情報など、コンシューマーアプリケーションの設定情報を KCL に提供します。ワーカーは、その特定の KCL コンシューマーアプリケーションインスタンスを開始して、データストリームからレコードプロセッサにデータレコードを配信します。
重要
KCL 1.x では、このクラスはワーカーと呼ばれます。詳細については、(Java KCL リポジトリです)、https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
を参照してください。KCL 2.x では、このクラスはスケジューラと呼ばれます。KCL 2.x のスケジューラの目的は、KCL 1.x のワーカーの目的と同じです。KCL 2.x のスケジューラクラスの詳細については、https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java を参照してください。 -
リース - ワーカーとシャード間のバインディングを定義するデータ。分散型 KCL コンシューマーアプリケーションは、リースを使用して、複数のワーカー間でデータレコード処理を分割します。いつでも、データレコードの各シャードは、leaseKey によって識別されるリースによって特定のワーカーにバインドされます。
デフォルトでは、ワーカーは 1 つ以上のリースを同時に保持できます (maxLeasesForWorker 変数)。
重要
すべてのワーカーは、データストリーム内の利用可能なすべてのシャードについて、利用可能なすべてのリースを保持すると競合します。しかし、一度に各リースを正常に保持できるのは1人のワーカーだけです。
例えば、4 つのシャードを持つデータストリームを処理しているワーカー A を持つコンシューマーアプリケーションインスタンス A がある場合、ワーカー A はシャード 1、2、3、および 4 へのリースを同時に保持できます。ただし、2 つのコンシューマーアプリケーションインスタンス (ワーカー A とワーカー B を含む A と B) があり、これらのインスタンスが 4 つのシャードを持つデータストリームを処理している場合、ワーカー A とワーカー B はシャード 1 へのリースを同時に保持できません。あるワーカーは、このシャードのデータレコードの処理を停止する準備ができるまで、または失敗するまで、特定のシャードへのリースを保持します。あるワーカーがリースの保留を停止すると、別のワーカーがリースを引き取り、保留します。
詳細については、(Java KCL リポジトリにあります)、を参照してください。https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java
の場合 KCL 1.x およびhttps://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java KCL 2.x -
リーステーブル - KCL コンシューマーアプリケーションのワーカーによってリースおよび処理されている KDS データストリーム内のシャードを追跡するために使用される一意の Amazon DynamoDB テーブル。リーステーブルは、KCL コンシューマーアプリケーションの実行中に、データストリームからの最新のシャード情報と (ワーカー内およびすべてのワーカー間で) 同期を維持する必要があります。詳細については、リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡するを参照してください。
-
レコードプロセッサ - KCL コンシューマーアプリケーションがデータストリームから取得したデータを処理する方法を定義するロジック。実行時に、KCL コンシューマーアプリケーションインスタンスがワーカーをインスタンス化し、このワーカーは、リースを保持するシャードごとに 1 つのレコードプロセッサをインスタンス化します。
リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡する
リーステーブルとは何ですか?
それぞれの Amazon Kinesis Data Streams アプリケーションに、KCLは、一意のリーステーブル (Amazon DynamoDB テーブルに保存されている) を使用して、KCL コンシューマーアプリケーションのワーカーによってリースおよび処理されている KDS データストリーム内のシャードを追跡します。
重要
KCL は、コンシューマーアプリケーションの名前を使用して、このコンシューマーアプリケーションが使用するリーステーブル名を作成します。したがって、各コンシューマーアプリケーション名は一意である必要があります。
コンシューマーアプリケーションの実行中に、Amazon DynamoDB コンソールを使用してリーステーブルを表示できます。
アプリケーションの起動時にKCL コンシューマーアプリケーションのリーステーブルが存在しない場合は、いずれかのワーカーがこのアプリケーションのリーステーブルを作成します。
重要
アカウントには、Kinesis Data Streams 自体に関連するコストに加えて、DynamoDB テーブルに関連するコストが発生します。
テーブルの各行は、コンシューマーアプリケーションのワーカーによって処理中のシャードを表します。KCL コンシューマーアプリケーションが 1 つのデータストリームのみを処理する場合、リーステーブルのハッシュキー leaseKey
はシャード ID です。Java コンシューマーアプリケーションの同じ KCL 2.x で複数のデータストリームを処理する であれば、leaseKey の構造は次のようになります: account-id:StreamName:streamCreationTimestamp:ShardId
。例えば、111111111:multiStreamTest-1:12345:shardId-000000000336
と指定します。
シャード ID に加えて、各行には次のデータが含まれます。
-
checkpoint: シャードの最新チェックポイントのシーケンス番号。この値はストリームのすべてのシャードで一意です。
-
checkpointSubSequenceNumber: Kinesis Producer Library の集約機能を使用する場合、これは Kinesis レコード内の個々のユーザレコードを追跡するチェックポイントの拡張です。
-
leaseCounter: ワーカーのリースが他のワーカーに保持されていることをワーカーが検出できるように、リースのバージョニングに使用されます。
-
leaseKey: リースの固有識別子。各リースはデータストリームのシャードに固有であり、一度に 1 つのワーカーで保持されます。
-
leaseOwner: このリースを保持しているワーカー。
-
ownerSwitchesSinceCheckpoint: 最後にチェックポイントが書き込まれてから、このリースのワーカーが何回変更されたかを示します。
-
parentShardId: 子シャードの処理を開始する前に、親シャードが完全に処理済みであることを確認するために使用します。これにより、レコードがストリームに入力されたのと同じ順序で処理されるようになります。
-
hashrange:
PeriodicShardSyncManager
で使われて、定期的な同期を実行し、リーステーブルで欠落しているシャードを見つけ、必要に応じてリースを作成します。注記
このデータは、KCL 1.14 および KCL 2.3 で始まるすべてのシャードのリーステーブルに存在します。
PeriodicShardSyncManager
の詳細およびリースとシャード間の定期的な同期については、Kinesis データストリームのシャードとリーステーブルの同期方法 を参照してください。 -
childshards:
LeaseCleanupManager
で使われて、子シャードの処理ステータスを確認し、親シャードをリーステーブルから削除できるかどうかを決定します。注記
このデータは、KCL 1.14 および KCL 2.3 で始まるすべてのシャードのリーステーブルに存在します。
-
shardID: シャードの ID。
注記
このデータは、Java コンシューマーアプリケーションの同じ KCL 2.x で複数のデータストリームを処理する である場合にのみリーステーブルに存在します。これは Java 用 KCL 2.x でのみサポートされており、Java の場合は KCL 2.3 以降で始まります。
-
stream name 以下の形式のデータストリームの識別子:
account-id:StreamName:streamCreationTimestamp
。注記
このデータは、Java コンシューマーアプリケーションの同じ KCL 2.x で複数のデータストリームを処理する である場合にのみリーステーブルに存在します。これは Java 用 KCL 2.x でのみサポートされており、Java の場合は KCL 2.3 以降で始まります。
スループット
Amazon Kinesis Data Streams アプリケーションでプロビジョニングされたスループットの例外が発生した場合は、DynamoDB テーブルのプロビジョニングされたスループットを増やす必要があります。KCL がテーブルを作成するときにプロビジョニングされるスループットは、1 秒あたりの読み込み 10 回、1 秒あたりの書き込み 10 回ですが、これがユーザーのアプリケーションで十分でない場合があります。例えば、Amazon Kinesis Data Streams アプリケーションが頻繁にチェックポイントを作成する場合や、多くのシャードで構成されるストリームを処理する場合は、より多くのスループットが必要になる可能性があります。
DynamoDB でプロビジョニングされたスループットについては、Amazon DynamoDB デベロッパーガイドの読み取り/書き込み容量モードおよびテーブルとデータの操作を参照してください。
Kinesis データストリームのシャードとリーステーブルの同期方法
KCL コンシューマーアプリケーションのワーカーは、リースを使用して特定のデータストリームからシャードを処理します。特定の時点でどのワーカーがどのシャードをリースしているかに関する情報は、リーステーブルに保存されます。リーステーブルは、KCL コンシューマーアプリケーションの実行中に、データストリームからの最新のシャード情報と同期を維持する必要があります。KCL は、コンシューマーアプリケーションのブートストラップ (コンシューマーアプリケーションの初期化時または再起動時)、および処理中のシャードが終了 (リシャーディング) に達するたびに、Kinesis Data Streams サービスから取得したシャード情報とリーステーブルを同期します。つまり、ワーカーまたは KCL コンシューマーアプリケーションは、最初のコンシューマーアプリケーションのブートストラップ中、およびコンシューマーアプリケーションでデータストリームリシャードイベントが発生するたびに、処理中のデータストリームと同期されます。
KCL 1.0 - 1.13 と KCL 2.0 - 2.2 での同期
KCL 1.0 - 1.13 および KCL 2.0 - 2.2 では、コンシューマーアプリケーションのブートストラップ、および各データストリームのリシャードイベント中に、KCL は、ListShards
または DescribeStream
検出 API を呼び出して、Kinesis Data Streams サービスから取得したシャード情報とリーステーブルを同期します。上記のすべての KCL バージョンで、KCL コンシューマーアプリケーションの各ワーカーは、コンシューマーアプリケーションのブートストラップ中および各ストリームリシャードイベントでリース/シャード同期プロセスを実行するために、次の手順を完了します。
-
処理中のストリームのデータのすべてのシャードをフェッチします。
-
リーステーブルからすべてのシャードリースをフェッチします。
-
リーステーブルにリースのないオープンシャードをフィルターで除外します。
-
見つかったすべてのオープンシャードと、開いている親を持たない各オープンシャードについて反復処理します。
-
階層ツリーをその祖先パスを通過して、シャードが子孫であるかどうかを判断します。祖先シャードが処理されている場合 (リーステーブルに祖先シャードのリースエントリが存在する場合)、または祖先シャードを処理する必要がある場合 (例えば、初期位置が
TRIM_HORIZON
またはAT_TIMESTAMP
)、シャードは子孫と見なされます。 -
コンテキスト内のオープンシャードが子孫である場合、KCL は初期位置に基づいてシャードをチェックポイントし、必要に応じて親のリースを作成します。
-
KCL 2.x での同期、KCL 2.3 以降で始まる
サポートされている最新バージョンの KCL 2.x (KCL 2.3) 以降では、ライブラリで同期プロセスに対する次の変更がサポートされるようになりました。これらのリース/シャード同期の変更により、KCL コンシューマーアプリケーションから Kinesis Data Streams サービスに対して実行される API コールの数が大幅に削減され、KCL コンシューマーアプリケーションのリース管理が最適化されます。
-
アプリケーションのブートストラップ中に、リーステーブルが空の場合、KCL は
ListShard
API のフィルタリングオプション (ShardFilter
オプションのリクエストパラメータ) を使用して、ShardFilter
パラメータで指定された時間に開いているシャードのスナップショットに対してのみリースを取得および作成します。ShardFilter
パラメーターを使用すると、ListShards
API の応答をフィルターで除外できます。ShardFilter
パラメータの唯一の必須プロパティはType
です。KCL はType
フィルタープロパティとその次の有効な値を使用して、新しいリースを必要とする可能性のあるオープンシャードのスナップショットを識別して返します。-
AT_TRIM_HORIZON
- 応答には、TRIM_HORIZON
で開いていたすべてのシャードが含まれます。 -
AT_LATEST
- 応答には、データストリームの現在開いているシャードのみが含まれます。 -
AT_TIMESTAMP
- 応答には、開始タイムスタンプが指定されたタイムスタンプ以下で、終了タイムスタンプが指定されたタイムスタンプ以上であるか、またはまだ開いているすべてのシャードが含まれます。
ShardFilter
は空のリーステーブルのリースを作成して、RetrievalConfig#initialPositionInStreamExtended
で指定したシャードのスナップショットのリースを初期化するときに使用されます。ShardFilter
の詳細については、「https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html」を参照してください。 -
-
すべてのワーカーがリース/シャード同期を実行して、データストリーム内の最新のシャードでリーステーブルを最新の状態に保つ代わりに、選択された単一のワーカーリーダーがリース/シャードの同期を実行します。
-
KCL 2.3 は、
GetRecords
およびSubscribeToShard
APIのリターンパラメータChildShards
を使用して、閉じたシャードに対してSHARD_END
で発生するリース/シャード同期を実行します。これにより、KCL ワーカーは、処理が終了したシャードの子シャードに対してのみリースを作成できます。コンシューマーアプリケーション全体で共有する場合、リース/シャード同期のこの最適化ではGetRecords
API のChildShards
パラメータを使用します。専用スループット (拡張ファンアウト) コンシューマーアプリケーションの場合、リース/シャード同期のこの最適化ではSubscribeToShard
API のChildShards
パラメータを使用します。詳細については、GetRecords、SubscribeToShards、および ChildShard を参照してください。 -
上記の変更により、KCL の動作は、既存のすべてのシャードについて学習するすべてのワーカーのモデルから、各ワーカーが所有するシャードの子シャードについてのみ学習するワーカーのモデルに移行します。したがって、コンシューマーアプリケーションのブートストラップおよびリシャードイベント中に発生する同期に加えて、KCL は、リーステーブル内の潜在的なホールを特定するために、追加の定期的なシャード/リーススキャンを実行して (つまり、すべての新しいシャードについて学習する)、データストリームの完全なハッシュ範囲が処理されていることを確認し、必要に応じてそれらのリースを作成します。
PeriodicShardSyncManager
は定期的なリース/シャードスキャンの実行を担当するコンポーネントです。KCL 2.3 の
PeriodicShardSyncManager
の詳細については、https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213を参照してください。 KCL 2.3 では、新しい設定オプションを使用して、
LeaseManagementConfig
でPeriodicShardSyncManager
を設定できるようになりました。名前 デフォルト値 説明 leasesRecoveryAuditorExecutionFrequencyMillis 120000 (2 分)
リーステーブルで部分的なリースをスキャンする監査ジョブの頻度 (ミリ単位)。監査者がストリームのリースの穴を検出すると、
leasesRecoveryAuditorInconsistencyConfidenceThreshold
に基づいてシャード同期がトリガーされます。leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
リーステーブル内のデータストリームのリースが矛盾しているかどうかを判断するための、定期的な監査ジョブの信頼しきい値。監査者がデータストリームに対して同じ不整合セットを何度も繰り返し検出すると、シャード同期がトリガーされます。
新しい CloudWatch メトリクスも発行され、
PeriodicShardSyncManager
のヘルスをモニタリングします。詳細については、PeriodicShardSyncManagerを参照してください。 -
HierarchicalShardSyncer
への最適化を含めて、シャードの 1 つのレイヤーに対してのみリースを作成します。
KCL 1.x での同期、KCL 1.14 以降で始まる
サポートされている最新バージョンの KCL 1.x (KCL 1.14) 以降では、ライブラリで同期プロセスに対する次の変更がサポートされるようになりました。これらのリース/シャード同期の変更により、KCL コンシューマーアプリケーションから Kinesis Data Streams サービスに対して実行される API コールの数が大幅に削減され、KCL コンシューマーアプリケーションのリース管理が最適化されます。
-
アプリケーションのブートストラップ中に、リーステーブルが空の場合、KCL は
ListShard
API のフィルタリングオプション (ShardFilter
オプションのリクエストパラメータ) を使用して、ShardFilter
パラメータで指定された時間に開いているシャードのスナップショットに対してのみリースを取得および作成します。ShardFilter
パラメーターを使用すると、ListShards
API の応答をフィルターで除外できます。ShardFilter
パラメータの唯一の必須プロパティはType
です。KCL はType
フィルタープロパティとその次の有効な値を使用して、新しいリースを必要とする可能性のあるオープンシャードのスナップショットを識別して返します。-
AT_TRIM_HORIZON
- 応答には、TRIM_HORIZON
で開いていたすべてのシャードが含まれます。 -
AT_LATEST
- 応答には、データストリームの現在開いているシャードのみが含まれます。 -
AT_TIMESTAMP
- 応答には、開始タイムスタンプが指定されたタイムスタンプ以下で、終了タイムスタンプが指定されたタイムスタンプ以上であるか、またはまだ開いているすべてのシャードが含まれます。
ShardFilter
は空のリーステーブルのリースを作成して、KinesisClientLibConfiguration#initialPositionInStreamExtended
で指定したシャードのスナップショットのリースを初期化するときに使用されます。ShardFilter
の詳細については、「https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html」を参照してください。 -
-
すべてのワーカーがリース/シャード同期を実行して、データストリーム内の最新のシャードでリーステーブルを最新の状態に保つ代わりに、選択された単一のワーカーリーダーがリース/シャードの同期を実行します。
-
KCL 1.14 は、
GetRecords
およびSubscribeToShard
APIのリターンパラメータChildShards
を使用して、閉じたシャードに対してSHARD_END
で発生するリース/シャード同期を実行します。これにより、KCL ワーカーは、処理が終了したシャードの子シャードに対してのみリースを作成できます。詳細については、GetRecords および ChildShard を参照してください。 -
上記の変更により、KCL の動作は、既存のすべてのシャードについて学習するすべてのワーカーのモデルから、各ワーカーが所有するシャードの子シャードについてのみ学習するワーカーのモデルに移行します。したがって、コンシューマーアプリケーションのブートストラップおよびリシャードイベント中に発生する同期に加えて、KCL は、リーステーブル内の潜在的なホールを特定するために、追加の定期的なシャード/リーススキャンを実行して (つまり、すべての新しいシャードについて学習する)、データストリームの完全なハッシュ範囲が処理されていることを確認し、必要に応じてそれらのリースを作成します。
PeriodicShardSyncManager
は定期的なリース/シャードスキャンの実行を担当するコンポーネントです。KinesisClientLibConfiguration#shardSyncStrategyType
がShardSyncStrategyType.SHARD_END
に設定されると、PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold
は、シャード同期を強制するために、リーステーブル内のホールを含む連続スキャンの数のしきい値を決定するために使用されます。KinesisClientLibConfiguration#shardSyncStrategyType
がShardSyncStrategyType.PERIODIC
に設定されると、leasesRecoveryAuditorInconsistencyConfidenceThreshold
は無視されます。KCL 1.14での
PeriodicShardSyncManager
の詳細については、https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999を参照してください。 KCL 1.14 では、新しい設定オプションを使用して、
LeaseManagementConfig
でPeriodicShardSyncManager
を設定できるようになりました。名前 デフォルト値 説明 leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
リーステーブル内のデータストリームのリースが矛盾しているかどうかを判断するための、定期的な監査ジョブの信頼しきい値。監査者がデータストリームに対して同じ不整合セットを何度も繰り返し検出すると、シャード同期がトリガーされます。
新しい CloudWatch メトリクスも発行され、
PeriodicShardSyncManager
のヘルスをモニタリングします。詳細については、PeriodicShardSyncManagerを参照してください。 -
KCL 1.14 は、遅延リースのクリーンアップもサポートするようになりました。リースは、
SHARD_END
に到達したとき、シャードがデータストリームの保持期間を過ぎて期限切れになったとき、またはリシャーディングオペレーションの結果として閉じられたとき、LeaseCleanupManager
により非同期的に削除されます。新しい設定オプションを使用して、
LeaseCleanupManager
を設定できるようになりました。名前 デフォルト値 説明 leaseCleanupIntervalMillis 1 分
リースクリーンアップスレッドを実行する間隔。
completedLeaseCleanupIntervalMillis 5 分 リースが完了したかどうかをチェックする間隔。
garbageLeaseCleanupIntervalMillis 30 分 リースがガベージであるかどうかをチェックする間隔 (つまり、データストリームの保持期間を過ぎてトリミング)。
-
KinesisShardSyncer
への最適化を含めて、シャードの 1 つのレイヤーに対してのみリースを作成します。
Java コンシューマーアプリケーションの同じ KCL 2.x で複数のデータストリームを処理する
このセクションでは、複数のデータストリームを同時に処理できる KCL コンシューマーアプリケーションを作成できる KCL 2.x for Java での次の変更について説明します。
重要
マルチストリーム処理は、Java 用 KCL 2.x でのみサポートされており、Java の場合は KCL 2.3 以降で始まります。
KCL 2.x を実装できる他の言語では、マルチストリーム処理はサポートされていません。
マルチストリーム処理は KCL 1.x のどのバージョンでもサポートされていません。
-
MultistreamTracker インターフェイス
複数のストリームを同時に処理できるコンシューマーアプリケーションを構築するには、MultistreamTracker
という新しいインターフェイスを実装する必要があります。このインターフェースには、KCL コンシューマーアプリケーションによって処理されるデータストリームとその設定のリストを返す streamConfigList
メソッドが含まれています。処理中のデータストリームは、コンシューマーアプリケーションのランタイム中に変更できることに注意してください。streamConfigList
は、処理するデータストリームの変更について学習するために KCL によって定期的に呼び出されます。streamConfigList
メソッドが StreamConfigリストに入力します。 package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }
StreamIdentifier
およびInitialPositionInStreamExtended
は必須フィールドですが、consumerArn
は省略可能である点に注意してください。KCL 2.x を使用して拡張ファンアウトコンシューマーアプリケーションを実装する場合にのみ、consumerArn
を提供する必要があります。の詳細については
StreamIdentifier
、https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129を参照してください。 StreamIdentifier
を作成するには、streamArn
および v2.5.0 以降で利用可能なstreamCreationEpoch
からマルチストリームインスタンスを作成することをお勧めします。streamArm
をサポートしていない KCL v2.3 および v2.4 では、account-id:StreamName:streamCreationTimestamp
形式を使用してマルチストリームインスタンスを作成します。この形式は廃止され、次のメジャーリリース以降はサポートされなくなります。MultistreamTracker
には、リーステーブル内の古いストリームのリースを削除するための戦略も含まれます(formerStreamsLeasesDeletionStrategy
)。コンシューマーアプリケーションのランタイム中は、ストラテジーを変更できないことに注意してください。 詳細については、https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.javaを参照してください。 -
ConfigsBuilder
は、アプリケーション全体のクラスで、KCL コンシューマーアプリケーションの構築時に使用する KCL 2.x の構成設定をすべて指定するために使用できます。 ConfigsBuilder
クラスはMultistreamTracker
インターフェイスをサポートするようになりました。ConfigsBuilder は、レコードを消費する 1 つのデータストリームの名前を使用して初期化できます。/** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
または、複数のストリームを同時に処理する KCL コンシューマーアプリケーションを実装する場合、
MultiStreamTracker
で ConfigsBuilder を初期化することもできます。* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
KCL コンシューマーアプリケーションにマルチストリームサポートが実装されているため、アプリケーションのリーステーブルの各行に、このアプリケーションが処理する複数のデータストリームのシャード ID とストリーム名が含まれます。
-
KCL コンシューマーアプリケーションのマルチストリームサポートが実装されている場合、leaseKey は次の構造を取ります:
account-id:StreamName:streamCreationTimestamp:ShardId
。例えば、111111111:multiStreamTest-1:12345:shardId-000000000336
と指定します。重要
KCL コンシューマーアプリケーションが 1 つのデータストリームのみを処理する場合、リーステーブルのハッシュキー leaseKey はシャード ID です。この既存の KCL コンシューマーアプリケーションを複数のデータストリームを処理するように再構成すると、リーステーブルが壊れます。マルチストリームサポートでは LeaseKey 構造体は次のようになっている必要があるためです:
account-id:StreamName:StreamCreationTimestamp:ShardId
。
スキーマレジストリで KCL AWS Glue を使用する
Kinesis データストリームを AWS Glue Schema Registry と統合できます。 AWS Glue スキーマレジストリを使用すると、スキーマを一元的に検出、制御、および進化させながら、生成されたデータが登録されたスキーマによって継続的に検証されるようにできます。スキーマは、データレコードの構造と形式を定義します。スキーマは、信頼性の高いデータの公開、利用、または保存のための仕様をバージョニングしたものです。 AWS Glueスキーマレジストリを使用すると、ストリーミングアプリケーション内のend-to-endのデータ品質とデータガバナンスを向上させることができます。詳細については、AWS Glue スキーマレジストリを参照してください。この統合を設定する方法の 1 つは、Java で KCL を使用することです。
重要
現在、Kinesis Data Streams と AWS Glue Schema Registry の統合は、Java に実装された KCL 2.3 コンシューマーを使用する Kinesis データストリームでのみサポートされています。多言語サポートは提供されていません。KCL 1.0 コンシューマーはサポートされていません。KCL 2.3 より前の KCL 2.x コンシューマーはサポートされていません。
KCL を使用して Kinesis Data Streams とスキーマレジストリの統合を設定する方法の詳細については、「ユースケース: Amazon Kinesis Data Streams と AWS Glue スキーマレジストリの統合」の「KPL/KCL ライブラリを使用したデータの操作」セクションを参照してください。