KCL 1.x および 2.x の情報 - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

KCL 1.x および 2.x の情報

注記

Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。パフォーマンスと新機能が向上したKCLバージョン 3.x に移行することをお勧めします。最新のKCLドキュメントと移行ガイドについては、「」を参照してくださいKinesis Client Library を使用する

KDS データストリームからのデータを処理できるカスタムコンシューマーアプリケーションを開発する方法の 1 つは、Kinesis Client Library () を使用することですKCL。

注記

KCL 1.x と 2.x KCL の両方で、使用シナリオに応じて、最新の KCL 1.x バージョンまたは KCL 2.x バージョンにアップグレードすることをお勧めします。1.x KCL と KCL 2.x はどちらも、最新の依存関係とセキュリティのパッチ、バグ修正、および下位互換性のある新機能を含む新しいリリースで定期的に更新されます。詳細については、https://github.com/awslabs/amazon-kinesis-client「/releases」を参照してください。

について KCL (以前のバージョン)

KCL は、分散コンピューティングに関連する複雑なタスクの多くを処理することで、Kinesis データストリームからのデータの消費と処理に役立ちます。これには、複数のコンシューマーアプリケーションインスタンス間での負荷分散、コンシューマーアプリケーションインスタンスの障害に対する応答、処理済みのレコードのチェックポイント作成、リシャーディングへの対応が挙げられます。KCL はこれらのすべてのサブタスクを処理するため、カスタムレコード処理ロジックの作成に集中できます。

KCL は、 でAPIs利用可能な Kinesis Data Streams とは異なります AWS SDKs。Kinesis Data Streams APIsは、ストリームの作成、リシャーディング、レコードの配置と取得など、Kinesis Data Streams のさまざまな側面を管理するのに役立ちます。KCL は、特にコンシューマーアプリケーションのカスタムデータ処理ロジックに集中できるように、これらのすべてのサブタスクを抽象化します。Kinesis Data Streams の詳細についてはAPI、Amazon KinesisAPIリファレンス」を参照してください。

重要

KCL は Java ライブラリです。Java 以外の言語のサポートは、 と呼ばれる多言語インターフェイスを使用して提供されます MultiLangDaemon。このデーモンは Java ベースで、Java 以外のKCL言語を使用している場合にバックグラウンドで実行されます。例えば、 KCL for Python をインストールし、コンシューマーアプリケーションを Python で完全に記述する場合、 のためにシステムに Java をインストールする必要があります MultiLangDaemon。さらに、 MultiLangDaemon には、接続先の AWS リージョンなど、ユースケースに合わせてカスタマイズする必要があるデフォルト設定があります。 MultiLangDaemon の の詳細については GitHub、「 KCL MultiLangDaemon プロジェクト」を参照してください。

は、レコード処理ロジックと Kinesis Data Streams の間の仲介KCLとして機能します。

KCL 以前のバージョン

現在、以下のサポートされているバージョンの のいずれかを使用してKCL、カスタムコンシューマーアプリケーションを構築できます。

1KCL.x または 2.x KCL のいずれかを使用して、共有スループットを使用するコンシューマーアプリケーションを構築できます。詳細については、「を使用して共有スループットでカスタムコンシューマーを開発する KCL」を参照してください。

専用スループット (拡張ファンアウトコンシューマー) を使用するコンシューマーアプリケーションを構築するには、 2.x KCL のみを使用できます。詳細については、「専用スループットによる拡張ファンアウトコンシューマーの開発」を参照してください。

1.x と 2.x KCL の違い、および KCL 1.x から KCL 2.x KCL に移行する方法については、「」を参照してくださいコンシューマーを KCL 1.x から 2.x KCL に移行する

KCL の概念 (以前のバージョン)

  • KCL コンシューマーアプリケーション – を使用してカスタムビルドKCLされ、データストリームからレコードを読み取って処理するように設計されたアプリケーション。

  • コンシューマーアプリケーションインスタンス - KCLコンシューマーアプリケーションは通常、障害時に調整し、データレコード処理を動的に負荷分散するために、1 つ以上のアプリケーションインスタンスを同時に実行して分散されます。

  • ワーカー – KCLコンシューマーアプリケーションインスタンスがデータの処理を開始するために使用する高レベルクラス。

    重要

    各KCLコンシューマーアプリケーションインスタンスには 1 つのワーカーがあります。

    ワーカーは、シャードとリース情報の同期、シャード割り当ての追跡、シャードからのデータの処理など、さまざまなタスクを初期化し、監督します。ワーカーは、このコンシューマーアプリケーションKCLが処理するデータレコードのデータストリームの名前や、このデータストリームへのアクセスに必要な AWS 認証情報など、KCLコンシューマーアプリケーションの設定情報を提供します。また、ワーカーは、その特定のKCLコンシューマーアプリケーションインスタンスを起動して、データストリームからレコードプロセッサにデータレコードを配信します。

    重要

    1.x KCL では、このクラスはワーカーと呼ばれます。詳細については、/https://github.com/awslabs/amazon-kinesis-clientblob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java を参照してください (これらは Java KCLリポジトリです)。2.x KCL では、このクラスはスケジューラと呼ばれます。2.x KCL でのスケジューラの目的と 1.x KCL でのワーカーの目的は同じです。2KCL.x のスケジューラクラスの詳細については、https://github.com/awslabs/amazon-kinesis-client「/.blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Schedulerjava」を参照してください。

  • リース - ワーカーとシャード間のバインディングを定義するデータ。分散KCLコンシューマーアプリケーションは、リースを使用して、ワーカーのフリート間でデータレコード処理を分割します。いつでも、データレコードの各シャードは、 leaseKey変数によって識別されるリースによって特定のワーカーにバインドされます。

    デフォルトでは、ワーカーは 1 つ以上のリースを同時に保持できます (maxLeasesForWorker 変数の値の影響を受けます)。

    重要

    すべてのワーカーは、データストリーム内の利用可能なすべてのシャードについて、利用可能なすべてのリースを保持すると競合します。しかし、一度に各リースを正常に保持できるのは1人のワーカーだけです。

    例えば、4 つのシャードを持つデータストリームを処理しているワーカー A を持つコンシューマーアプリケーションインスタンス A がある場合、ワーカー A はシャード 1、2、3、および 4 へのリースを同時に保持できます。ただし、2 つのコンシューマーアプリケーションインスタンス (ワーカー A とワーカー B を含む A と B) があり、これらのインスタンスが 4 つのシャードを持つデータストリームを処理している場合、ワーカー A とワーカー B はシャード 1 へのリースを同時に保持できません。あるワーカーは、このシャードのデータレコードの処理を停止する準備ができるまで、または失敗するまで、特定のシャードへのリースを保持します。あるワーカーがリースの保留を停止すると、別のワーカーがリースを引き取り、保留します。

    詳細については、(Java KCLリポジトリ) 「1.x の場合は https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java」、2.x KCL の場合は https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.javaKCL」を参照してください。

  • リーステーブル - KCLコンシューマーアプリケーションのワーカーによってリースおよび処理されるKDSデータストリーム内のシャードを追跡するために使用される一意の Amazon DynamoDB テーブル。リーステーブルは、KCLコンシューマーアプリケーションの実行中、データストリームからの最新のシャード情報と (ワーカー内およびすべてのワーカー間で) 同期している必要があります。詳細については、「リーステーブルを使用して、KCLコンシューマーアプリケーションによって処理されたシャードを追跡する」を参照してください。

  • レコードプロセッサ – KCLコンシューマーアプリケーションがデータストリームから取得したデータを処理する方法を定義するロジック。実行時に、KCLコンシューマーアプリケーションインスタンスはワーカーをインスタンス化し、このワーカーはリースを保持するシャードごとに 1 つのレコードプロセッサをインスタンス化します。

リーステーブルを使用して、KCLコンシューマーアプリケーションによって処理されたシャードを追跡する

リーステーブルとは何ですか?

Amazon Kinesis Data Streams アプリケーションごとに、 は一意のリーステーブル (Amazon DynamoDB テーブルに格納) KCLを使用して、KCLコンシューマーアプリケーションのワーカーによってリースおよび処理されるKDSデータストリーム内のシャードを追跡します。

重要

KCL はコンシューマーアプリケーションの名前を使用して、このコンシューマーアプリケーションが使用するリーステーブルの名前を作成します。したがって、各コンシューマーアプリケーション名は一意である必要があります。

コンシューマーアプリケーションの実行中に、Amazon DynamoDB コンソールを使用してリーステーブルを表示できます。

アプリケーションの起動時にKCLコンシューマーアプリケーションのリーステーブルが存在しない場合、ワーカーの 1 人がこのアプリケーションのリーステーブルを作成します。

重要

アカウントには、Kinesis Data Streams 自体に関連するコストに加えて、DynamoDB テーブルに関連するコストが発生します。

テーブルの各行は、コンシューマーアプリケーションのワーカーによって処理中のシャードを表します。KCL コンシューマーアプリケーションleaseKeyが 1 つのデータストリームのみを処理する場合、リーステーブルのハッシュキーはシャード ID です。の場合同じ 2.x for Java KCL コンシューマーアプリケーションで複数のデータストリームを処理する、 の構造 leaseKey は次のようになります: account-id:StreamName:streamCreationTimestamp:ShardId。例えば、111111111:multiStreamTest-1:12345:shardId-000000000336 と指定します。

シャード ID に加えて、各行には次のデータが含まれます。

  • checkpoint: シャードの最新チェックポイントのシーケンス番号。この値はストリームのすべてのシャードで一意です。

  • checkpointSubSequence数値: Kinesis プロデューサーライブラリの集約機能を使用する場合、これは Kinesis レコード内の個々のユーザーレコードを追跡するチェックポイントの拡張機能です。

  • leaseCounter: リースのバージョニングに使用して、ワーカーが自分のリースが別のワーカーによって取得されたことを検出できるようにします。

  • leaseKey: リースの一意の識別子。各リースはデータストリームのシャードに固有であり、一度に 1 つのワーカーで保持されます。

  • leaseOwner: このリースを保持しているワーカー。

  • ownerSwitchesSinceチェックポイント: このリースが、チェックポイントが最後に書き込まれた時点からワーカーを変更した回数。

  • parentShardId: 子シャードで処理を開始する前に、親シャードが完全に処理されるようにするために使用します。これにより、レコードがストリームに入力されたのと同じ順序で処理されるようになります。

  • hashrange: PeriodicShardSyncManager で使われて、定期的な同期を実行し、リーステーブルで欠落しているシャードを見つけ、必要に応じてリースを作成します。

    注記

    このデータは、1.14 および 2.3 KCL 以降のすべてのシャードKCLのリーステーブルに存在します。PeriodicShardSyncManager の詳細およびリースとシャード間の定期的な同期については、Kinesis データストリームのシャードとリーステーブルの同期方法 を参照してください。

  • childshards: LeaseCleanupManager で使われて、子シャードの処理ステータスを確認し、親シャードをリーステーブルから削除できるかどうかを決定します。

    注記

    このデータは、1.14 および 2.3 KCL 以降のすべてのシャードKCLのリーステーブルに存在します。

  • shardID: シャードの ID。

    注記

    このデータは、同じ 2.x for Java KCL コンシューマーアプリケーションで複数のデータストリームを処理する である場合にのみリーステーブルに存在します。これは Java KCL 用 2.x でのみサポートされており、Java 用 KCL 2.3 以降でサポートされています。

  • stream name 以下の形式のデータストリームの識別子: account-id:StreamName:streamCreationTimestamp

    注記

    このデータは、同じ 2.x for Java KCL コンシューマーアプリケーションで複数のデータストリームを処理する である場合にのみリーステーブルに存在します。これは Java KCL 用 2.x でのみサポートされており、Java 用 KCL 2.3 以降でサポートされています。

スループット

Amazon Kinesis Data Streams アプリケーションでプロビジョニングされたスループットの例外が発生した場合は、DynamoDB テーブルのプロビジョニングされたスループットを増やす必要があります。KCL がテーブルを作成するときにプロビジョニングされるスループットは、1 秒あたりの読み込み 10 回、1 秒あたりの書き込み 10 回ですが、これがユーザーのアプリケーションで十分でない場合があります。例えば、Amazon Kinesis Data Streams アプリケーションが頻繁にチェックポイントを作成する場合や、多くのシャードで構成されるストリームを処理する場合は、より多くのスループットが必要になる可能性があります。

DynamoDB でプロビジョニングされたスループットについては、Amazon DynamoDB デベロッパーガイド読み取り/書き込み容量モードおよびテーブルとデータの操作を参照してください。

Kinesis データストリームのシャードとリーステーブルの同期方法

KCL コンシューマーアプリケーションのワーカーは、リースを使用して特定のデータストリームからのシャードを処理します。特定の時点でどのワーカーがどのシャードをリースしているかに関する情報は、リーステーブルに保存されます。リーステーブルは、KCLコンシューマーアプリケーションの実行中、データストリームからの最新のシャード情報と同期している必要があります。 は、コンシューマーアプリケーションのブートストラップ中 (コンシューマーアプリケーションの初期化時または再起動時) および処理中のシャードが終了 (リシャーディング) するたびに、リーステーブルを Kinesis Data Streams サービスから取得したシャード情報とKCL同期します。つまり、ワーカーまたはKCLコンシューマーアプリケーションは、最初のコンシューマーアプリケーションのブートストラップ中、およびコンシューマーアプリケーションがデータストリームリシャードイベントに遭遇するたびに、処理中のデータストリームと同期されます。

1.0KCL~1.13 および KCL2.0~2.2 での同期

KCL 1.0 - 1.13 および KCL 2.0 - 2.2 では、コンシューマーアプリケーションのブートストラップ中、および各データストリームリシャードイベント中に、 は ListShardsまたは DescribeStream検出 を呼び出して、リーステーブルを Kinesis Data Streams サービスから取得したシャード情報とKCL同期しますAPIs。上記のすべてのKCLバージョンで、KCLコンシューマーアプリケーションの各ワーカーは、コンシューマーアプリケーションのブートストラップ中および各ストリームリシャードイベントでリース/シャード同期プロセスを実行するために、次のステップを実行します。

  • 処理中のストリームのデータのすべてのシャードをフェッチします。

  • リーステーブルからすべてのシャードリースをフェッチします。

  • リーステーブルにリースのないオープンシャードをフィルターで除外します。

  • 見つかったすべてのオープンシャードと、開いている親を持たない各オープンシャードについて反復処理します。

    • 階層ツリーをその祖先パスを通過して、シャードが子孫であるかどうかを判断します。祖先シャードが処理されている場合 (リーステーブルに祖先シャードのリースエントリが存在する場合)、または祖先シャードを処理する必要がある場合 (例えば、初期位置がTRIM_HORIZONまたはAT_TIMESTAMP)、シャードは子孫と見なされます。

    • コンテキスト内のオープンシャードが子孫の場合、 は初期位置に基づいてシャードをKCLチェックポイントし、必要に応じて親のリースを作成します。

2.x KCL での同期、2.3 KCL 以降

サポートされている最新バージョンの KCL 2.x (KCL 2.3) 以降では、ライブラリは同期プロセスに対する以下の変更をサポートするようになりました。これらのリース/シャード同期の変更により、KCLコンシューマーアプリケーションから Kinesis Data Streams サービスへのAPI呼び出し回数が大幅に減少し、KCLコンシューマーアプリケーションのリース管理が最適化されます。

  • アプリケーションのブートストラップ中に、リーステーブルが空の場合、 は ListShard APIのフィルタリングオプション (ShardFilterオプションのリクエストパラメータ) KCLを使用して、 ShardFilterパラメータで指定された時間に開かれたシャードのスナップショットのリースのみを取得して作成します。ShardFilter パラメータを使用すると、 ListShards のレスポンスを除外できますAPI。ShardFilter パラメータの唯一の必須プロパティは ですType。 はType、フィルタープロパティとその有効な値の次の KCLを使用して、新しいリースを必要とする可能性のあるオープンシャードのスナップショットを識別して返します。

    • AT_TRIM_HORIZON - 応答には、TRIM_HORIZON で開いていたすべてのシャードが含まれます。

    • AT_LATEST - 応答には、データストリームの現在開いているシャードのみが含まれます。

    • AT_TIMESTAMP - 応答には、開始タイムスタンプが指定されたタイムスタンプ以下で、終了タイムスタンプが指定されたタイムスタンプ以上であるか、またはまだ開いているすべてのシャードが含まれます。

    ShardFilter は空のリーステーブルのリースを作成して、RetrievalConfig#initialPositionInStreamExtended で指定したシャードのスナップショットのリースを初期化するときに使用されます。

    ShardFilter の詳細については、「https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html」を参照してください。

  • lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard 同期を実行するすべてのワーカーの代わりに。

  • KCL 2.3 は、 GetRecordsと の ChildShards return パラメータSubscribeToShardAPIsを使用して、閉じたシャードSHARD_ENDに対して で発生するリース/シャード同期を実行し、KCLワーカーが処理が終了したシャードの子シャードのリースのみを作成できるようにします。コンシューマーアプリケーション全体で共有する場合、リース/シャード同期のこの最適化ではGetRecords、 の ChildShardsパラメータを使用しますAPI。専用スループット (拡張ファンアウト) コンシューマーアプリケーションの場合、リース/シャード同期のこの最適化ではSubscribeToShard、 の ChildShardsパラメータを使用しますAPI。詳細については、GetRecordsSubscribeToShardsChildShard を参照してください。

  • 上記の変更により、 の動作KCLは、既存のすべてのシャードについて学習するすべてのワーカーのモデルから、各ワーカーが所有するシャードの子シャードについてのみ学習するワーカーのモデルに移行します。したがって、コンシューマーアプリケーションのブートストラップおよびリシャードイベント中に発生する同期に加えて、 KCL は追加の定期的なシャード/リーススキャンも実行し、リーステーブルの潜在的な穴 (つまり、すべての新しいシャードについて学習する) を特定して、データストリームの完全なハッシュ範囲が処理されていることを確認し、必要に応じてリースを作成します。 PeriodicShardSyncManagerは、定期的なリース/シャードスキャンの実行を担当するコンポーネントです。

    2.3 KCL PeriodicShardSyncManagerの の詳細については、https://github.com/awslabs/amazon-kinesis-client「/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213」を参照してください。

    KCL 2.3 では、新しい設定オプションが PeriodicShardSyncManagerで設定できるようになりましたLeaseManagementConfig

    名前 デフォルト値 説明
    leasesRecoveryAuditorExecutionFrequencyMillis

    120000 (2 分)

    リーステーブルで部分的なリースをスキャンする監査ジョブの頻度 (ミリ単位)。監査者がストリームのリースの穴を検出すると、leasesRecoveryAuditorInconsistencyConfidenceThreshold に基づいてシャード同期がトリガーされます。

    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    リーステーブル内のデータストリームのリースが矛盾しているかどうかを判断するための、定期的な監査ジョブの信頼しきい値。監査者がデータストリームに対して同じ不整合セットを何度も繰り返し検出すると、シャード同期がトリガーされます。

    の正常性をモニタリングするために、新しい CloudWatch メトリクスも出力されるようになりましたPeriodicShardSyncManager。詳細については、「PeriodicShardSyncManager」を参照してください。

  • HierarchicalShardSyncer への最適化を含めて、シャードの 1 つのレイヤーに対してのみリースを作成します。

1.x KCL での同期、1.14 KCL 以降

サポートされている最新バージョンの 1.x (KCL 1.14) KCL 以降では、ライブラリは同期プロセスに対する以下の変更をサポートするようになりました。これらのリース/シャード同期の変更により、KCLコンシューマーアプリケーションから Kinesis Data Streams サービスへのAPI呼び出し回数が大幅に減少し、KCLコンシューマーアプリケーションのリース管理が最適化されます。

  • アプリケーションのブートストラップ中に、リーステーブルが空の場合、 は ListShard APIのフィルタリングオプション (ShardFilterオプションのリクエストパラメータ) KCLを使用して、 ShardFilterパラメータで指定された時間に開かれたシャードのスナップショットのみを取得してリースを作成します。ShardFilter パラメータを使用すると、 ListShards のレスポンスを除外できますAPI。ShardFilter パラメータの唯一の必須プロパティは ですType。 KCLはType、フィルタープロパティとその有効な値のうち、新しいリースを必要とする可能性のあるオープンシャードのスナップショットを識別して返します。

    • AT_TRIM_HORIZON - 応答には、TRIM_HORIZON で開いていたすべてのシャードが含まれます。

    • AT_LATEST - 応答には、データストリームの現在開いているシャードのみが含まれます。

    • AT_TIMESTAMP - 応答には、開始タイムスタンプが指定されたタイムスタンプ以下で、終了タイムスタンプが指定されたタイムスタンプ以上であるか、またはまだ開いているすべてのシャードが含まれます。

    ShardFilter は空のリーステーブルのリースを作成して、KinesisClientLibConfiguration#initialPositionInStreamExtended で指定したシャードのスナップショットのリースを初期化するときに使用されます。

    ShardFilter の詳細については、「https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html」を参照してください。

  • lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard 同期を実行するすべてのワーカーの代わりに。

  • KCL 1.14 は、 GetRecordsおよび の ChildShards return パラメータSubscribeToShardAPIsを使用して、閉じたシャードSHARD_ENDに対して で発生するリース/シャード同期を実行し、KCLワーカーが処理が終了したシャードの子シャードのリースのみを作成できるようにします。詳細については、GetRecordsおよびChildShardを参照してください。

  • 上記の変更により、 の動作KCLは、既存のすべてのシャードについて学習するすべてのワーカーのモデルから、各ワーカーが所有するシャードの子シャードについてのみ学習するワーカーのモデルに移行します。したがって、コンシューマーアプリケーションのブートストラップおよびリシャードイベント中に発生する同期に加えて、 KCL は追加の定期的なシャード/リーススキャンも実行し、リーステーブルの潜在的な穴 (つまり、すべての新しいシャードについて学習する) を特定して、データストリームの完全なハッシュ範囲が処理されていることを確認し、必要に応じてリースを作成します。 PeriodicShardSyncManagerは、定期的なリース/シャードスキャンの実行を担当するコンポーネントです。

    KinesisClientLibConfiguration#shardSyncStrategyTypeShardSyncStrategyType.SHARD_END に設定されると、PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold は、シャード同期を強制するために、リーステーブル内のホールを含む連続スキャンの数のしきい値を決定するために使用されます。KinesisClientLibConfiguration#shardSyncStrategyTypeShardSyncStrategyType.PERIODIC に設定されると、leasesRecoveryAuditorInconsistencyConfidenceThreshold は無視されます。

    1.14 PeriodicShardSyncManagerの KCL の詳細については、https://github.com/awslabs/amazon-kinesis-client「/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999」を参照してください。

    KCL 1.14 では、新しい設定オプションが PeriodicShardSyncManagerで設定できるようになりましたLeaseManagementConfig

    名前 デフォルト値 説明
    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    リーステーブル内のデータストリームのリースが矛盾しているかどうかを判断するための、定期的な監査ジョブの信頼しきい値。監査者がデータストリームに対して同じ不整合セットを何度も繰り返し検出すると、シャード同期がトリガーされます。

    の正常性をモニタリングするために、新しい CloudWatch メトリクスも出力されるようになりましたPeriodicShardSyncManager。詳細については、「PeriodicShardSyncManager」を参照してください。

  • KCL 1.14 では、遅延リースのクリーンアップもサポートされるようになりました。リースは、SHARD_END に到達したとき、シャードがデータストリームの保持期間を過ぎて期限切れになったとき、またはリシャーディングオペレーションの結果として閉じられたとき、LeaseCleanupManager により非同期的に削除されます。

    新しい設定オプションを使用して、LeaseCleanupManager を設定できるようになりました。

    名前 デフォルト値 説明
    leaseCleanupIntervalミリス

    1 分

    リースクリーンアップスレッドを実行する間隔。

    completedLeaseCleanupIntervalMillis 5 分

    リースが完了したかどうかをチェックする間隔。

    garbageLeaseCleanupIntervalMillis 30 分

    リースがガベージであるかどうかをチェックする間隔 (つまり、データストリームの保持期間を過ぎてトリミング)。

  • KinesisShardSyncer への最適化を含めて、シャードの 1 つのレイヤーに対してのみリースを作成します。

同じ 2.x for Java KCL コンシューマーアプリケーションで複数のデータストリームを処理する

このセクションでは、複数のデータストリームを同時に処理できるKCLコンシューマーアプリケーションを作成できる KCL 2.x for Java の以下の変更について説明します。

重要

マルチストリーム処理は Java KCL 用 2.x でのみサポートされており、Java 用 KCL 2.3 以降でサポートされています。

マルチストリーム処理は、 2.x KCL を実装できる他のすべての言語でNOTサポートされています。

マルチストリーム処理は、 1.x KCL のすべてのバージョンでNOTサポートされています。

  • MultistreamTracker インターフェイス

    複数のストリームを同時に処理できるコンシューマーアプリケーションを構築するには、 という新しいインターフェイスを実装する必要がありますMultistreamTracker。このインターフェイスには、KCLコンシューマーアプリケーションによって処理されるデータストリームとその設定のリストを返す streamConfigListメソッドが含まれています。処理中のデータストリームは、コンシューマーアプリケーションのランタイム中に変更できることに注意してください。 streamConfigListは、処理するデータストリームの変更について学習KCLするために、 によって定期的に呼び出されます。

    streamConfigList メソッドによってStreamConfigリストが入力されます。

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

    StreamIdentifier および InitialPositionInStreamExtended は必須フィールドですが、consumerArn は省略可能である点に注意してください。2.x KCL を使用して拡張ファンアウトコンシューマーアプリケーションを実装しているconsumerArn場合にのみ、 を指定する必要があります。

    の詳細についてはStreamIdentifierhttps://github.com/awslabs/amazon-kinesis-client「/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129」を参照してください。StreamIdentifier を作成するには、streamArn および v2.5.0 以降で利用可能な streamCreationEpoch からマルチストリームインスタンスを作成することをお勧めします。をサポートしていない KCL v2.3 および v2.4 ではstreamArm、 形式を使用してマルチストリームインスタンスを作成しますaccount-id:StreamName:streamCreationTimestamp。この形式は廃止され、次のメジャーリリース以降はサポートされなくなります。

    MultistreamTracker には、リーステーブル内の古いストリームのリースを削除するための戦略も含まれます(formerStreamsLeasesDeletionStrategy)。コンシューマーアプリケーションのランタイム中に戦略CANNOTが変更されることに注意してください。詳細については、https://github.com/awslabs/「amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client.java」を参照してくださいsrc/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy。

  • ConfigsBuilder は、KCLコンシューマーアプリケーションの構築時に使用するすべての KCL 2.x 構成設定を指定するために使用できるアプリケーション全体のクラスです。 ConfigsBuilder クラスで MultistreamTrackerインターフェイスがサポートされるようになりました。レコードを消費する 1 つのデータストリームの名前を使用して、次のいずれかを初期化 ConfigsBuilderできます。

    /** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }

    または、複数のストリームを同時に処理するKCLコンシューマーアプリケーションを実装MultiStreamTrackerする場合は、 ConfigsBuilder で を初期化できます。

    * Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • KCL コンシューマーアプリケーションにマルチストリームサポートを実装すると、アプリケーションのリーステーブルの各行に、このアプリケーションが処理する複数のデータストリームのシャード ID とストリーム名が含まれるようになりました。

  • KCL コンシューマーアプリケーションのマルチストリームサポートが実装されると、 は次の構造 leaseKey になります: account-id:StreamName:streamCreationTimestamp:ShardId。例えば、111111111:multiStreamTest-1:12345:shardId-000000000336 と指定します。

    重要

    既存のKCLコンシューマーアプリケーションが 1 つのデータストリームのみを処理するように設定されている場合、 leaseKey (リーステーブルのハッシュキー) はシャード ID です。この既存のKCLコンシューマーアプリケーションを再設定して複数のデータストリームを処理すると、リーステーブルが壊れます。これは、マルチストリームをサポートする場合、 leaseKey 構造が になる必要があるためですaccount-id:StreamName:StreamCreationTimestamp:ShardId

スキーマレジストリKCLで AWS Glue を使用する

Kinesis データストリームを AWS Glue Schema Registry と統合できます。 AWS Glue スキーマレジストリを使用すると、スキーマを一元的に検出、制御、および進化させながら、生成されたデータが登録されたスキーマによって継続的に検証されるようにできます。スキーマは、データレコードの構造と形式を定義します。スキーマは、信頼性の高いデータの公開、利用、または保存のための仕様をバージョニングしたものです。 AWS Glueスキーマレジストリを使用すると、ストリーミングアプリケーション内のデータ品質とデータガバナンスを改善 end-to-endできます。詳細については、AWS Glue スキーマレジストリを参照してください。この統合を設定する方法の 1 つは、Java KCLの を使用することです。

重要

現在、Kinesis Data Streams と AWS Glue Schema Registry の統合は、Java に実装された 2.3 KCL コンシューマーを使用する Kinesis データストリームでのみサポートされています。多言語サポートは提供されていません。 KCL 1.0 コンシューマーはサポートされていません。 KCL 2.3 より前の KCL 2.x コンシューマーはサポートされていません。

を使用して Kinesis Data Streams とスキーマレジストリの統合を設定する方法の詳細についてはKCL、「ユースケース: Amazon Kinesis Data Streams と Glue スキーマレジストリの統合」の「 KPL/KCL ライブラリを使用したデータの操作」セクションを参照してください。 Amazon Kinesis AWS