翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
コンシューマーを実装する
チュートリアル: KPLおよび 1.x KCL を使用してリアルタイムの株式データを処理する のコンシューマーアプリケーションでは、プロデューサーの実装で作成した株式取引ストリームを継続的に処理します。その後、1 分ごとに売買されている最も人気のある株式を出力します。アプリケーションは Kinesis Client Library (KCL) 上に構築されており、コンシューマーアプリケーションに共通する面倒な作業の多くを行います。詳細については、「1.x KCL コンシューマーの開発」を参照してください。
ソースコードを参照し、次の情報を確認してください。
- StockTradesProcessor クラス
-
事前に用意されているコンシューマーのメインクラスで、次のタスクを実行します。
-
引数として渡されたアプリケーション、ストリーム、およびリージョン名を読み取ります。
-
~/.aws/credentials
から認証情報を読み取ります。 -
RecordProcessor
のインスタンスとして機能し、StockTradeRecordProcessor
インスタンスによって実装される、RecordProcessorFactory
インスタンスを作成します。 -
RecordProcessorFactory
インスタンスと、ストリーム名、認証情報、アプリケーション名などの標準設定を使用してKCLワーカーを作成します。 -
このワーカーは、(このコンシューマーインスタンスに割り当てられた) 各シャードに新しいスレッドを作成します。これにより、継続的に Kinesis Data Streams からレコードが読み取られます。次に、
RecordProcessor
インスタンスを呼び出して、受信したレコードのバッチを処理します。
-
- StockTradeRecordProcessor クラス
-
RecordProcessor
インスタンスを実装したら、次にinitialize
、processRecords
、shutdown
の 3 つの必須メソッドを実装します。Kinesis Client Library によって使用される
initialize
およびshutdown
は、名前が示すとおり、レコードの受信がいつ開始し、いつ終了するかをレコードプロセッサに知らせます。これにより、レコードプロセッサは、アプリケーションに固有の設定および終了タスクを行うことができます。これらのコードは事前に用意されています。主な処理はprocessRecords
メソッドで行われ、そこでは各レコードのprocessRecord
が使用されます。後者のメソッドは、ほとんどの場合、空のスケルトンコードとして提供されます。次のステップでは、これを実装する方法について説明します。詳細は、次のステップを参照してください。また、
processRecord
のサポートメソッドであるreportStats
およびresetStats
の実装にも注目してください。これらのメソッドは、元のソースコードでは空になっています。processRecords
メソッドは既に実装されており、次のステップを実行します。-
渡された各レコードについて、レコード上で
processRecord
を呼び出します。 -
最後のレポートから 1 分間以上経過した場合は、
reportStats()
を呼び出して最新の統計を出力し、次の間隔に新しいレコードのみ含まれるようにresetStats()
を呼び出して統計を消去します。 -
次のレポート時間を設定します。
-
最後のチェックポイントから 1 分間以上経過した場合は、
checkpoint()
を呼び出します。 -
次のチェックポイント時間を設定します。
このメソッドでは、60 秒間間隔でレポートおよびチェックポイント時間が設定されています。チェックポイントの詳細については、コンシューマーに関する追加情報を参照してください。
-
- StockStats クラス
-
このクラスでは、データを保持し、最も人気のある株式の経時的な統計を示すことができます。このコードは、事前に用意されており、次のメソッドが含まれています。
-
addStockTrade(StockTrade)
: 指定されたStockTrade
を実行中の統計に取り込みます。 -
toString()
: 特定の形式の文字列として統計を返します。
このクラスは、各株式の取引の合計数と最大数を累計して、最も人気のある株式を追跡します。これらの数は、株式取引を受け取る度に更新されます。
-
次のステップに示されているコードを StockTradeRecordProcessor
クラスのメソッドに追加します。
コンシューマーを実装するには
-
processRecord
メソッドを実装するには、サイズの正しいStockTrade
オブジェクトを開始し、それにレコードデータを追加します。また、問題が発生した場合に警告がログに記録されるようにします。StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array()); if (trade == null) { LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey()); return; } stockStats.addStockTrade(trade);
-
簡単な
reportStats
メソッドを実装します。出力形式は好みに応じて自由に変更することができます。System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
-
最後に、新しい
stockStats
インスタンスを作成するresetStats
メソッドを実装します。stockStats = new StockStats();
コンシューマーを実行するには
-
プロデューサーの実装 で記述したプロデューサーを実行し、シミュレートした株式取引レコードをストリームに取り込みます。
-
以前に取得したアクセスキーとシークレットキーペア (IAMユーザーの作成時) がファイル
~/.aws/credentials
に保存されていることを確認します。 -
次の引数を指定して
StockTradesProcessor
クラスを実行します。StockTradesProcessor StockTradeStream us-west-2
us-west-2
以外のリージョンにストリームを作成した場合は、代わりにそのリージョンをここで指定する必要があります。
1 分後、次のような出力が表示されます。その後、1 分間ごとに出力が更新されます。
****** Shard shardId-000000000001 stats for last 1 minute ******
Most popular stock being bought: WMT, 27 buys.
Most popular stock being sold: PTR, 14 sells.
****************************************************************
コンシューマーに関する追加情報
1.x KCL コンシューマーの開発などで説明されている Kinesis Client Library のメリットに詳しい方であれば、ここで使用することに疑問を感じるかもしれません。処理には 1 つのシャードストリームと 1 つのコンシューマーインスタンスのみを使用しますが、 を使用してコンシューマーを実装する方が簡単ですKCL。プロデューサーセクションとコンシューマーセクションのコードの実装手順を比較すると、コンシューマーの実装の方が比較的に簡単であることがわかります。これは主に、 KCLが提供するサービスによるものです。
このアプリケーションでは、個別のレコードを処理できるレコードプロセッサクラスの実装に焦点を合わせてきました。レコードが Kinesis Data Streams からどのように取得されるかについて心配する必要はありません。 KCL は、新しいレコードが使用可能になるたびにレコードを取得し、レコードプロセッサを呼び出します。また、シャードカウントやコンシューマーインスタンス数についても心配しなくて済みます。ストリームがスケールアップされても、複数のシャードやコンシューマーインスタンスを処理するためにアプリケーションを書き直す必要はありません。
チェックポイントという用語は、ストリーム内のポイントを、これまでに消費および処理されたデータレコードまで記録することを意味します。アプリケーションがクラッシュすると、ストリームはストリームの先頭からではなく、その時点から読み込まれます。チェックポイントやそのさまざまな設計パターン、およびベストプラクティスは、この章の範囲外です。ただし、本番環境ではこのような問題に直面することがあります。
で学習したようにプロデューサーの実装、Kinesis Data Streams のput
オペレーションはパーティションキーを入力としてAPI受け取ります。Kinesis Data Streams は、レコードを複数のシャードに分割するメカニズムとしてパーティションキーを使用します (複数のシャードがストリームに含まれる場合)。同じパーティションキーは、常に同じシャードにルーティングされます。このため、同じパーティションキーを持つレコードはそのコンシューマーにのみ送信され、他のコンシューマーに送信されることはないと仮定して、特定のシャードを処理するコンシューマーを設計できます。したがって、コンシューマーのワーカーは、必要なデータが欠落しているかもしれないと心配することなく、同じパーティションキーを持つすべてのレコードを集計できます。
このアプリケーションでは、コンシューマーによるレコードの処理は集中的ではないため、1 つのシャードを使用して、KCLスレッドと同じスレッドで処理を実行できます。ただし、実際には、まずシャードの数のスケールアップを検討します。レコードの処理が大変になることが予想される場合は、異なるスレッドに処理を切り替えたり、スレッドプールを使用したりする必要があるかもしれません。このようにして、 KCLは新しいレコードをより迅速に取得でき、他のスレッドはレコードを並行して処理できます。マルチスレッド設計は簡単ではなく、高度な手法でアプローチする必要があるため、シャード数を増やすことが通常、スケールアップの最も効果的な方法です。