本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
實施生產者
教程:使用KPL和 KCL 1.x 處理實時庫存數據所述的應用程式使用真實情境的股票市場交易監控。以下原則簡要說明此情境如何對應到生產者和支援的程式碼結構。
請查看原始碼並對照檢閱以下資訊。
- StockTrade 類別
-
單次股票交易是由
StockTrade
類別的執行個體表示。此執行個體包含若干屬性,如股票代號、價格、股份數、交易類型 (買進或賣出) 以及唯一識別該交易的 ID。程式碼已為您實作此類別。 - 串流記錄
-
串流是一連串的記錄。記錄是JSON格式
StockTrade
執行個體的序列化。例如:{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
- StockTradeGenerator 類別
-
StockTradeGenerator
具有getRandomTrade()
方法,每次叫用後將傳回隨機產生的新股票交易。程式碼已為您實作此類別。 - StockTradesWriter 類別
-
生產者的
main
方法StockTradesWriter
會持續擷取隨機交易,然後透過執行以下任務將該交易傳送至 Kinesis Data Streams:-
讀取串流名稱和區域名稱做為輸入。
-
建立
AmazonKinesisClientBuilder
。 -
使用用戶端建置器設定區域、登入資料和用戶端組態。
-
使用用戶端建置器建置
AmazonKinesis
用戶端。 -
檢查串流是否存在且處於作用中狀態 (否則將結束此方法並顯示錯誤)。
-
在連續迴圈中依序呼叫
StockTradeGenerator.getRandomTrade()
方法和sendStockTrade
方法,每隔 100 毫秒將交易傳送至串流。
sendStockTrade
類別的StockTradesWriter
方法含有以下程式碼:private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }
請參閱以下的程式碼詳解:
-
PutRecord
API需要一個字節數組,並且您必須轉換trade
為JSON格式。這一行程式碼將執行該項操作:byte[] bytes = trade.toJsonAsBytes();
-
傳送交易之前,您必須先建立新的
PutRecordRequest
執行個體 (本例中其名稱為putRecord
):PutRecordRequest putRecord = new PutRecordRequest();
每次呼叫
PutRecord
都需要串流名稱、分割區索引鍵和資料 Blob。以下程式碼使用putRecord
物件的setXxxx()
方法填入這些欄位:putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));
本範例使用股票代號做為分割區索引鍵,將記錄對應到特定碎片。實際上,每個碎片應該會有成千上百的分割區索引鍵,使記錄均勻地分佈於串流中。如需如何加入資料至串流的詳細資訊,請參閱將資料新增至串流。
現在
putRecord
已準備好傳送用戶端 (put
操作):kinesisClient.putRecord(putRecord);
-
錯誤檢查和日誌記錄肯定是頗為實用的附加功能。此程式碼將記錄錯誤情況:
if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }
在
put
操作的周圍加入 try/catch 區塊:try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }
這麼做是因為 Kinesis Data Streams
put
操作可能由於網路錯誤或是串流達到其輸送量限制並受到調節而導致失敗。我們建議您仔細考慮put
操作的重試政策,以避免數據丟失,例如使用重試。 -
狀態記錄也很實用,但可有可無:
LOG.info("Putting trade: " + trade.toString());
此處顯示的生產者使用 Kinesis Data Streams API 單一記錄功能。
PutRecord
實際上,如果個別的生產者將產生許多記錄,則使用PutRecords
的多筆記錄功能並一次性傳送各個批次的記錄通常會更有效率。如需詳細資訊,請參閱將資料新增至串流。 -
執行生產者
-
確認先前擷取的存取金鑰和秘密 key pair (建立IAM使用者時) 已儲存在檔案中
~/.aws/credentials
。 -
使用以下引數執行
StockTradeWriter
類別:StockTradeStream us-west-2
如果您是在
us-west-2
以外的區域建立串流,則此處必須改為指定該區域。
您應該會看到類似下列的輸出:
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
您的股票交易串流現在正由 Kinesis Data Streams 擷取中。