本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
实现制作器
教程:使用KPL和 KCL 1.x 处理实时股票数据 中的应用程序使用实际股票市场交易监控场景。以下准则简要说明了此场景如何映射到创建器和支持的代码结构。
请参阅源代码并查看以下信息。
- StockTrade 班级
-
单次股票交易由一个
StockTrade
类实例表示。此实例包含一些属性,如股票代号、价格、股份数、交易类型(买入或卖出)以及唯一标识交易的 ID。将为您实现此类。 - 流记录
-
流是一个记录序列。记录是JSON格式化
StockTrade
实例的序列化。例如:{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
- StockTradeGenerator 班级
-
StockTradeGenerator
包含一个名为getRandomTrade()
的方法,当调用此方法时,它将返回一个随机生成的新股票交易。将为您实现此类。 - StockTradesWriter 班级
-
创建器的
main
方法StockTradesWriter
将持续检索随机交易,然后通过执行以下任务将该交易发送到 Kinesis Data Streams:-
将流名称和区域名称作为输入读取。
-
创建一个
AmazonKinesisClientBuilder
。 -
使用客户端生成器来设置区域、凭证和客户端配置。
-
使用客户端生成器构建一个
AmazonKinesis
客户端。 -
检查流是否存在且处于活动状态 (如果不是这样,它将退出并显示错误)。
-
在连续循环中,会依次调用
StockTradeGenerator.getRandomTrade()
方法和sendStockTrade
方法以便每 100 毫秒将交易发送到流一次。
sendStockTrade
类的StockTradesWriter
方法具有以下代码:private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }
请参阅以下代码细分:
-
PutRecord
API需要一个字节数组,您必须将其trade
转换为JSON格式。此行代码将执行该操作:byte[] bytes = trade.toJsonAsBytes();
-
您需要先创建新的
PutRecordRequest
实例(此示例中称为putRecord
),然后才能发送交易:PutRecordRequest putRecord = new PutRecordRequest();
每个
PutRecord
调用均需要流名称、分区键和数据 Blob。以下代码使用putRecord
对象的setXxxx()
方法来填充该对象中的这些字段:putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));
该示例使用股票行情自动收录器作为将记录映射到特定分片的分区键。实际上,每个分片应具有数百或数千个分区键,以便记录均匀地分布在流中。有关如何将数据添加到流的更多信息,请参阅 向直播中添加数据。
现在
putRecord
已准备好发送到客户端(put
操作):kinesisClient.putRecord(putRecord);
-
错误检查和日志记录始终是有用的附加功能。此代码将记录错误条件:
if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }
围绕
put
操作添加 try/catch 块:try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }
这是因为 Kinesis Data Streams
put
操作可能因网络错误或数据流达到其吞吐量限额并受到限制而导致失败。我们建议您仔细考虑您的put
操作重试策略,以避免数据丢失,例如使用重试。 -
状态日志记录很有用,但它是可选的:
LOG.info("Putting trade: " + trade.toString());
此处显示的制作人使用 Kinesis Data API Streams 的单条记录功能
PutRecord
。实际上,如果单个创建者生成许多记录,则使用PutRecords
的多记录功能并一次性发送批量记录通常会更有效。有关更多信息,请参阅 向直播中添加数据。 -
运行创建器
-
确认之前检索到的访问密钥和私有密钥对(创建IAM用户时)已保存在文件中
~/.aws/credentials
。 -
使用以下参数运行
StockTradeWriter
类:StockTradeStream us-west-2
如果您在
us-west-2
之外的区域中创建流,则必须改为在此处指定该区域。
您应该可以看到类似于如下所示的输出内容:
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
您的股票交易流现在正由 Kinesis Data Streams 摄取。