实现制作器 - Amazon Kinesis Data Streams

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

实现制作器

教程:使用KPL和 KCL 1.x 处理实时股票数据 中的应用程序使用实际股票市场交易监控场景。以下准则简要说明了此场景如何映射到创建器和支持的代码结构。

请参阅源代码并查看以下信息。

StockTrade 班级

单次股票交易由一个 StockTrade 类实例表示。此实例包含一些属性,如股票代号、价格、股份数、交易类型(买入或卖出)以及唯一标识交易的 ID。将为您实现此类。

流记录

流是一个记录序列。记录是JSON格式化StockTrade实例的序列化。例如:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeGenerator 班级

StockTradeGenerator 包含一个名为 getRandomTrade() 的方法,当调用此方法时,它将返回一个随机生成的新股票交易。将为您实现此类。

StockTradesWriter 班级

创建器的 main 方法 StockTradesWriter 将持续检索随机交易,然后通过执行以下任务将该交易发送到 Kinesis Data Streams:

  1. 将流名称和区域名称作为输入读取。

  2. 创建一个 AmazonKinesisClientBuilder

  3. 使用客户端生成器来设置区域、凭证和客户端配置。

  4. 使用客户端生成器构建一个 AmazonKinesis 客户端。

  5. 检查流是否存在且处于活动状态 (如果不是这样,它将退出并显示错误)。

  6. 在连续循环中,会依次调用 StockTradeGenerator.getRandomTrade() 方法和 sendStockTrade 方法以便每 100 毫秒将交易发送到流一次。

sendStockTrade 类的 StockTradesWriter 方法具有以下代码:

private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }

请参阅以下代码细分:

  • PutRecordAPI需要一个字节数组,您必须将其trade转换为JSON格式。此行代码将执行该操作:

    byte[] bytes = trade.toJsonAsBytes();
  • 您需要先创建新的 PutRecordRequest 实例(此示例中称为 putRecord),然后才能发送交易:

    PutRecordRequest putRecord = new PutRecordRequest();

    每个 PutRecord 调用均需要流名称、分区键和数据 Blob。以下代码使用 putRecord 对象的 setXxxx() 方法来填充该对象中的这些字段:

    putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));

    该示例使用股票行情自动收录器作为将记录映射到特定分片的分区键。实际上,每个分片应具有数百或数千个分区键,以便记录均匀地分布在流中。有关如何将数据添加到流的更多信息,请参阅 向直播中添加数据

    现在 putRecord 已准备好发送到客户端(put 操作):

    kinesisClient.putRecord(putRecord);
  • 错误检查和日志记录始终是有用的附加功能。此代码将记录错误条件:

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    围绕 put 操作添加 try/catch 块:

    try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }

    这是因为 Kinesis Data Streams put 操作可能因网络错误或数据流达到其吞吐量限额并受到限制而导致失败。我们建议您仔细考虑您的put操作重试策略,以避免数据丢失,例如使用重试。

  • 状态日志记录很有用,但它是可选的:

    LOG.info("Putting trade: " + trade.toString());

此处显示的制作人使用 Kinesis Data API Streams 的单条记录功能PutRecord。实际上,如果单个创建者生成许多记录,则使用 PutRecords 的多记录功能并一次性发送批量记录通常会更有效。有关更多信息,请参阅 向直播中添加数据

运行创建器
  1. 确认之前检索到的访问密钥和私有密钥对(创建IAM用户时)已保存在文件中~/.aws/credentials

  2. 使用以下参数运行 StockTradeWriter 类:

    StockTradeStream us-west-2

    如果您在 us-west-2 之外的区域中创建流,则必须改为在此处指定该区域。

您应该可以看到类似于如下所示的输出内容:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

您的股票交易流现在正由 Kinesis Data Streams 摄取。

后续步骤

实现消费者