實施生產者 - Amazon Kinesis Data Streams

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

實施生產者

本教學課程使用真實情境的股票市場交易監控。以下原則簡要說明此情境如何對應到生產者及其支援的程式碼結構。

請查看原始碼並對照檢閱以下資訊。

StockTrade 類

個別股票交易由 StockTrade類別的一個實例表示。此執行個體包含若干屬性,如股票代號、價格、股份數、交易類型 (買進或賣出) 以及唯一識別該交易的 ID。程式碼已為您實作此類別。

串流記錄

串流是一連串的記錄。記錄是JSON格式StockTrade執行個體的序列化。例如:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeGenerator 類

StockTradeGenerator 有一個叫做的方法getRandomTrade(),每次調用時都會返回一個新的隨機生成的股票交易。程式碼已為您實作此類別。

StockTradesWriter 類

生產者的main方法會 StockTradesWriter持續擷取隨機交易,然後透過執行下列工作將其傳送至 Kinesis Data Streams:

  1. 讀取數據流名稱和區域名稱作為輸入。

  2. 使用KinesisAsyncClientBuilder來設定區域、認證和用戶端組態。

  3. 檢查串流是否存在且處於作用中狀態 (否則將結束此方法並顯示錯誤)。

  4. 在連續迴圈中依序呼叫 StockTradeGenerator.getRandomTrade() 方法和 sendStockTrade 方法,每隔 100 毫秒將交易傳送至串流。

sendStockTrade 類別的 StockTradesWriter 方法含有以下程式碼:

private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }

請參閱以下的程式碼詳解:

  • PutRecordAPI期望一個字節數組,並且您必須將交易轉換為JSON格式。這一行程式碼將執行該項操作:

    byte[] bytes = trade.toJsonAsBytes();
  • 傳送交易之前,您必須先建立新的 PutRecordRequest 執行個體 (本例中稱為要求)。每個 request 都需要串流名稱、分割區索引鍵和資料 Blob。

    PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();

    此範例使用股票股票代碼做為分割索引鍵,該索引鍵會將記錄對應至特定的碎片。實際上,每個碎片應該會有成千上百的分割區索引鍵,使記錄均勻地分佈於串流中。如需如何加入資料至串流的詳細資訊,請參閱將資料寫入 Amazon Kinesis Data Streams

    現在 request 已準備好傳送至用戶端 (put 操作):

    kinesisClient.putRecord(request).get();
  • 錯誤檢查和日誌記錄肯定是頗為實用的附加功能。此程式碼將記錄錯誤情況:

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    put 操作的周圍加入 try/catch 區塊:

    try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }

    這麼做是因為 Kinesis Data Streams put 操作可能由於網路錯誤或是資料串流達到其傳輸量限制並受到調節而導致失敗。建議您仔細考慮put作業的重試原則,以避免資料遺失,例如使用重試。

  • 狀態記錄也很實用,但可有可無:

    LOG.info("Putting trade: " + trade.toString());

此處顯示的生產者使用 Kinesis Data Streams API 單一記錄功能。PutRecord實際上,如果個別的生產者將產生許多記錄,則使用 PutRecords 的多筆記錄功能並一次性傳送各個批次的記錄通常會更有效率。如需詳細資訊,請參閱將資料寫入 Amazon Kinesis Data Streams

執行生產者
  1. 確認在建立IAM策略和使用者中擷取的存取金鑰和秘密金鑰對已儲存在檔案 ~/.aws/credentials 中。

  2. 使用以下引數執行 StockTradeWriter 類別:

    StockTradeStream us-west-2

    如果您是在 us-west-2 以外的區域建立串流,則此處必須改為指定該區域。

您應該會看到類似下列的輸出:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

您的股票交易現在正由 Kinesis Data Streams 擷取中。

後續步驟

實施消費者