本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
实现产生器
此教程使用股票市场交易监控的实际场景。以下准则简要说明了此场景如何映射到创建器及其支持的代码结构。
请参阅源代码
- StockTrade 类
-
单次股票交易由 StockTrade 类的一个实例表示。此实例包含一些属性,如股票代号、价格、股份数、交易类型(买入或卖出)以及唯一标识交易的 ID。将为您实现此类。
- 流记录
-
流是一个记录序列。记录是 JSON 格式的
StockTrade
实例序列化。例如:{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
- StockTradeGenerator 类
-
StockTradeGenerator 包含一个名为
getRandomTrade()
的方法,当调用此方法时,它将返回一个随机生成的新股票交易。将为您实现此类。 - StockTradesWriter 类
-
创建器的
main
方法 StockTradesWriter 将持续检索随机交易,然后通过执行以下任务将该交易发送到 Kinesis Data Streams:-
将数据流名称和区域名称作为输入读取。
-
使用
KinesisAsyncClientBuilder
来设置区域、凭证和客户端配置。 -
检查流是否存在且处于活动状态 (如果不是这样,它将退出并显示错误)。
-
在连续循环中,会依次调用
StockTradeGenerator.getRandomTrade()
方法和sendStockTrade
方法以便每 100 毫秒将交易发送到流一次。
sendStockTrade
类的StockTradesWriter
方法具有以下代码:private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }
请参阅以下代码细分:
-
PutRecord
API 需要一个字节数组,并且您必须将交易转换为 JSON 格式。此行代码将执行该操作:byte[] bytes = trade.toJsonAsBytes();
-
您需要先创建新的
PutRecordRequest
实例(此示例中称为请求),然后才能发送交易。每个request
均需要流名称、分区键和数据 Blob。PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();
该示例使用股票代码作为将记录映射到特定分片的分区键。实际上,每个分片应具有数百或数千个分区键,以便记录均匀地分布在流中。有关如何将数据添加到流的更多信息,请参阅 将数据写入 Amazon Kinesis Data Streams。
现在
request
已准备好发送到客户端(put 操作):kinesisClient.putRecord(request).get();
-
错误检查和日志记录始终是有用的附加功能。此代码将记录错误条件:
if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }
围绕
put
操作添加 try/catch 块:try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }
这是因为 Kinesis 数据流 put 操作可能因网络错误或数据流达到其吞吐量限额并受到限制而导致失败。建议您仔细考虑针对
put
操作的重试策略以避免数据丢失,例如使用重试。 -
状态日志记录很有用,但它是可选的:
LOG.info("Putting trade: " + trade.toString());
此处显示的创建器使用 Kinesis Data Streams API 单记录功能
PutRecord
。实际上,如果单个创建者生成许多记录,则使用PutRecords
的多记录功能并一次性发送批量记录通常会更有效。有关更多信息,请参阅 将数据写入 Amazon Kinesis Data Streams。 -
运行创建器
-
验证在创建 IAM 策略和用户中检索到的访问密钥和私有密钥对是否保存到文件
~/.aws/credentials
中。 -
使用以下参数运行
StockTradeWriter
类:StockTradeStream us-west-2
如果您在
us-west-2
之外的区域中创建流,则必须改为在此处指定该区域。
您应该可以看到类似于如下所示的输出内容:
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
您的股票交易现在正由 Kinesis Data Streams 摄取。