翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
プロデューサーを実装する
チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する のアプリケーションでは、株式市場取引をモニタリングする実際のシナリオが使用されます。次の原理によって、このシナリオをプロデューサーおよびサポートコード構造にマッピングすることができます。
ソースコードを参照し、次の情報を確認してください。
- StockTrade クラス
-
株式取引は、
StockTrade
クラスのインスタンスによって個別に表されます。このインスタンスには、ティッカーシンボル、株価、株数、取引のタイプ (買いまたは売り)、取引を一意に識別する ID などの属性が含まれます。このクラスは、既に実装されています。 - ストリームレコード
-
ストリームとは、一連のレコードのことです。レコードとは、JSON 形式による連続する
StockTrade
インスタンスの 1 つを表しています。例:{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
- StockTradeGenerator クラス
-
StockTradeGenerator
には、呼び出されるたびにランダムに生成された新しい株式取引を返す、getRandomTrade()
と呼ばれるメソッドが含まれています。このクラスは、既に実装されています。 - StockTradesWriter クラス
-
プロデューサーの
main
メソッドであるStockTradesWriter
は、継続的にランダム取引を取得し、以下のタスクを実行してそれらを Kinesis Data Streams に送信します。-
ストリーム名とリージョン名を入力として読み取ります。
-
AmazonKinesisClientBuilder
を作成します。 -
クライアントビルダーを使用してリージョン、認証情報、およびクライアント構成を設定します。
-
クライアントビルダーを使用して
AmazonKinesis
クライアントを構成します。 -
ストリームが存在し、アクティブであることを確認します (そうでない場合は、エラーで終了します)。
-
連続ループで、
StockTradeGenerator.getRandomTrade()
メソッドに続きsendStockTrade
メソッドを呼び出して、100 ミリ秒ごとに取引をストリームに送信します。
sendStockTrade
クラスのStockTradesWriter
メソッドには次のコードがあります。private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }
次のコードの詳細を参照してください。
-
PutRecord
API はバイト配列を想定するため、trade
を JSON 形式に変換する必要があります。この操作は、次の 1 行のコードによって行われます。byte[] bytes = trade.toJsonAsBytes();
-
取引を送信する前に、新しい
PutRecordRequest
インスタンス (この場合、putRecord
と呼ばれる) を作成する必要があります。PutRecordRequest putRecord = new PutRecordRequest();
各
PutRecord
の呼び出しには、ストリーム名、パーティションキー、およびデータ BLOB が必要です。次のコードによって、putRecord
メソッドを使用して、これらのフィールドをsetXxxx()
オブジェクトに追加します。putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));
この例では、株式チケットをパーティションキーとして使用することで、レコードを特定のシャードにマッピングしています。実際には、レコードがストリーム全体に均等に分散するように、シャード 1 つあたりに数百個または数千個のパーティションキーを用意する必要があります。ストリームにデータを追加する方法の詳細については、ストリームにデータを追加するを参照してください。
次に、
putRecord
をクライアントに送信 (put
オペレーション) することができます。kinesisClient.putRecord(putRecord);
-
エラーチェックとログ記録は、いつでも追加して損はありません。次のコードによって、エラー状態を記録します。
if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }
put
オペレーションの前後に try/catch ブロックを追加します。try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }
これは、ネットワークエラーや、ストリームがスループット制限を超えて抑制されたことが原因で、Kinesis Data Streams の
put
オペレーションが失敗することがあるためです。データが失われることがないように、単純な再試行を使用するなど、put
オペレーションの再試行ポリシーを慎重に検討することをお勧めします。 -
ステータスのログ記録は有益ですが、オプションです。
LOG.info("Putting trade: " + trade.toString());
ここに示されているプロデューサーでは、Kinesis Data Streams API のシングルレコード機能
PutRecord
が使用されています。実際には、個々のプロデューサーで大量のレコードが生成される場合があります。その場合、PutRecords
のマルチレコード機能を使用して、レコードのバッチを一度に送信する方が効率的です。詳細については、ストリームにデータを追加するを参照してください。 -
プロデューサーを実行するには
-
前のステップ (IAM ユーザーを作成したとき) で取得したアクセスキーとシークレットキーのペアがファイル
~/.aws/credentials
に保存されていることを確認します。 -
次の引数を指定して
StockTradeWriter
クラスを実行します。StockTradeStream us-west-2
us-west-2
以外のリージョンにストリームを作成した場合は、代わりにそのリージョンをここで指定する必要があります。
次のような出力が表示されます:
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
Kinesis Data Streams によって株式取引ストリームが取り込まれます。