實施消費者 - Amazon Kinesis Data Streams

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

實施消費者

教程:使用KPL和 KCL 1.x 處理實時庫存數據所述的消費者應用程式會持續處理您在實施生產者時建立的股票交易串流。隨後,其將輸出每分鐘買進和賣出最多的熱門股票。該應用程式建置在 Kinesis 用戶端程式庫 (KCL) 之上,該程式庫可執行消費者應用程式常見的許多繁重工作。如需詳細資訊,請參閱開發 KCL 1.x 消費者

請查看原始碼並對照檢閱以下資訊。

StockTradesProcessor 類別

供您使用的消費者主要類別,將執行以下任務:

  • 讀取以引數形式傳入的應用程式名稱、串流名稱和區域名稱。

  • ~/.aws/credentials 讀取登入資料。

  • 建立 RecordProcessorFactory 執行個體以提供由 RecordProcessor 執行個體實作的 StockTradeRecordProcessor 執行個體。

  • 使用RecordProcessorFactory執行個體和標準組態 (包括串流名稱、認證和應用程式名稱) 建立 KCL Worker。

  • 工作者會為每個碎片 (已指派給此取用者執行個體) 建立新的執行緒,以持續循環從 Kinesis Data Streams 讀取記錄。接著,其將叫用 RecordProcessor 執行個體以處理收到的各個批次記錄。

StockTradeRecordProcessor 類別

RecordProcessor 執行個體的實作,而此執行個體將實作三個必要的方法:initializeprocessRecordsshutdown

顧名思義,initializeshutdown 分別供 Kinesis Client Library 用於使記錄處理器得知何時應準備好開始接收記錄以及何時應停止接收記錄,好讓程式庫能夠執行任何應用程式特定的設定和終止任務。這些方法的程式碼已為您提供。主要處理任務在 processRecords 方法中進行,而此方法將使用 processRecord 處理每筆記錄。後一種方法以幾乎全空的架構程式碼提供,讓您於下一個步驟進行實作,屆時將會有進一步說明。

另請注意 processRecord 的支援方法 reportStatsresetStats 的實作,其最初的原始碼為全空。

程式碼已為您實作 processRecords 方法,並將執行以下步驟:

  • 對每一筆傳入的記錄呼叫 processRecord

  • 若自從上次報告後已歷時至少 1 分鐘,請先呼叫 reportStats() 列印出最新統計資料,接著呼叫 resetStats() 清除統計資料以使下一個間隔僅包含新記錄。

  • 設定下一次報告時間。

  • 若自從最後一個檢查點過後已歷時至少 1 分鐘,請呼叫 checkpoint()

  • 設定下一次檢查點作業時間。

此方法使用 60 秒的間隔做為報告及檢查點作業率。如需檢查點作業的詳細資訊,請參閱有關消費者的其他信息

StockStats 類別

此類別針對一段時間內最熱門的股票提供資料保留與統計資料追蹤。其程式碼已為您提供且包含下列方法:

  • addStockTrade(StockTrade):將給定的 StockTrade 注入目前統計資料。

  • toString():以格式化字串的形式傳回統計資料。

該類別通過保持每個股票的交易總數和最大數量的運行計數來跟踪最受歡迎的股票。每當股票交易達成時,其將更新這些計數。

StockTradeRecordProcessor 類別的各個方法加入程式碼,如以下步驟所示。

實作消費者
  1. 實作 processRecord 方法,藉此執行個體化正確大小的 StockTrade 物件並將記錄資料加入該物件,且於發生問題時記錄警告。

    StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array()); if (trade == null) { LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey()); return; } stockStats.addStockTrade(trade);
  2. 實作簡易的 reportStats 方法。輸出格式可依照您的偏好逕自修改。

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. 最後,實作 resetStats 方法以便建立新的 stockStats 執行個體。

    stockStats = new StockStats();
執行消費者
  1. 執行您在實施生產者時撰寫的生產者,將模擬的股票交易記錄注入您的串流。

  2. 確認先前擷取的存取金鑰和秘密 key pair (建立IAM使用者時) 已儲存在檔案中~/.aws/credentials

  3. 使用以下引數執行 StockTradesProcessor 類別:

    StockTradesProcessor StockTradeStream us-west-2

    請注意,如果您是在 us-west-2 以外的區域建立串流,則此處必須改為指定該區域。

一分鐘後,您應會看到類似以下內容的輸出,而且此後每分鐘將重新整理一次輸出:

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

有關消費者的其他信息

如果您已熟悉 Kinesis Client Library 的優點 (誠如 開發 KCL 1.x 消費者 及其他各處的介紹),可能會質疑為何應該在此使用該程式庫。雖然您只使用單一碎片串流和單一取用戶執行個體來處理它,但使用KCL. 將生產者一節的程式碼實作步驟對比消費者,您會發現實作消費者相較之下容易些。這在很大程度上是由於KCL提供的服務。

在此應用程式中,您專注於實作可處理個別記錄的記錄處理器類別。您不必擔心如何從 Kinesis Data Streams 擷取記錄;只要有新記錄可用,就會KCL擷取記錄並叫用記錄處理器。此外,您也不需要為碎片和消費者執行個體的數目傷腦筋。如果串流已擴展,您無須重新撰寫應用程式就能處理多個碎片或多個消費者執行個體。

術語檢查點意味著將流中的點記錄到目前為止已使用和處理的數據記錄。如果應用程式當機,則會從該點讀取串流,而不是從串流的開頭讀取。檢查點作業的主題及各種設計模式與最佳實務已超出本章討論範圍。不過,生產環境可能要面臨這方面的問題。

如您在中所學到的實施生產者,Kinesis Data Streams 中的put作業會API採用分區索引鍵做為輸入。Kinesis Data Streams 使用分割區索引鍵做為跨多個碎片分割記錄的機制 (若串流中有多個碎片)。相同的分割區索引鍵一律會路由至同一碎片。這使您能夠憑藉以下假定狀況,設計用於處理特定碎片的消費者:具有相同分割區索引鍵的記錄只會傳送至該消費者,凡是具有相同分割區索引鍵的記錄終究不會抵達任何其他消費者。因此,消費者的工作者可彙整具有相同分割區索引鍵的所有記錄,而不必擔心會遺失所需的資料。

在此應用程序中,消費者對記錄的處理並不密集,因此您可以使用一個分片並在與線程相同的線程中進行處理。KCL然而若是實際應用,請首先考慮擴展碎片數目。在某些情況下,您可能要切換由另一執行緒處理,或者預料將需密集處理記錄時使用執行緒集區。通過這種方式,KCL可以更快地獲取新記錄,而其他線程可以 parallel 處理記錄。多線程設計並不是微不足道的,應該使用先進的技術來處理,因此增加碎片計數通常是擴展的最有效方法。

後續步驟

(可選)擴展消費者