範例:轉換多個資料類型 - 亞馬遜 Kinesis SQL 應用程式資料分析開發人員指南

對於新專案,我們建議您使用適用於 Apache Flink Studio 的全新受管理服務,取代適用於應用程式的 Kinesis Data Analytics。SQLManaged Service for Apache Flink Studio 易於使用且具備進階分析功能,讓您在幾分鐘內建置複雜的串流處理應用程式。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

範例:轉換多個資料類型

擷取、轉換和載入 (ETL) 應用程式的常見需求,是在串流來源上處理多個記錄類型。您可以建立 Kinesis Data Analytics 應用程式來處理這些類型的串流來源。程序如下:

  1. 首先,將串流來源對應到應用程式內輸入串流,類似於所有其他 Kinesis Data Analytics 應用程式。

  2. 然後,在應用程式碼中撰寫 SQL 陳述式,從應用程式內輸入串流擷取特定類型的資料列。接著將它們插入個別的應用程式內串流。(您可以在應用程式碼中建立其他應用程式內串流。)

在本練習中,您有一個接收兩種類型 (OrderTrade) 記錄的串流來源。這些是股票訂單和相應的交易。針對每個訂單,可能有零個或多個交易。每種類型的範例記錄如下所示:

訂單記錄

{"RecordType": "Order", "Oprice": 9047, "Otype": "Sell", "Oid": 3811, "Oticker": "AAAA"}

交易記錄

{"RecordType": "Trade", "Tid": 1, "Toid": 3812, "Tprice": 2089, "Tticker": "BBBB"}

當您使用建立應用程式時 AWS Management Console,主控台會針對建立的應用程式內輸入串流顯示下列推斷結構描述。根據預設,主控台會命名此應用程式內串流 SOURCE_SQL_STREAM_001

顯示格式化的應用程式內串流範例之主控台螢幕擷取畫面

儲存組態時,Amazon Kinesis Data Analytics 會持續從串流來源讀取資料,並在應用程式內串流中插入資料列。您現在可以對應用程式內串流中的資料執行分析。

在此範例的應用程式碼中,先建立兩個額外的應用程式內串流,Order_StreamTrade_Stream。然後,您可以根據記錄類型從 SOURCE_SQL_STREAM_001 串流中篩選列,並使用幫浦將它們插入新建立的串流中。如需此編碼模式的相關資訊,請參閱 應用程式碼

  1. 將訂單和交易列過濾到個別的應用程式內串流:

    1. 篩選 SOURCE_SQL_STREAM_001 中的訂單記錄,並將訂單儲存在 Order_Stream 中。

      --Create Order_Stream. CREATE OR REPLACE STREAM "Order_Stream" ( order_id integer, order_type varchar(10), ticker varchar(4), order_price DOUBLE, record_type varchar(10) ); CREATE OR REPLACE PUMP "Order_Pump" AS INSERT INTO "Order_Stream" SELECT STREAM oid, otype,oticker, oprice, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Order';
    2. 篩選 SOURCE_SQL_STREAM_001 中的交易記錄,並將訂單儲存在 Trade_Stream 中。

      --Create Trade_Stream. CREATE OR REPLACE STREAM "Trade_Stream" (trade_id integer, order_id integer, trade_price DOUBLE, ticker varchar(4), record_type varchar(10) ); CREATE OR REPLACE PUMP "Trade_Pump" AS INSERT INTO "Trade_Stream" SELECT STREAM tid, toid, tprice, tticker, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Trade';
  2. 現在,您可以對這些串流執行其他分析。在此範例,於一分鐘的翻轉視窗中計算股票代碼的交易數量,並將結果保存到另一個 DESTINATION_SQL_STREAM 串流中。

    --do some analytics on the Trade_Stream and Order_Stream. -- To see results in console you must write to OPUT_SQL_STREAM. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker varchar(4), trade_count integer ); CREATE OR REPLACE PUMP "Output_Pump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker, count(*) as trade_count FROM "Trade_Stream" GROUP BY ticker, FLOOR("Trade_Stream".ROWTIME TO MINUTE);

    您會看到下列結果。

    顯示 SQL 結果標籤上結果的主控台螢幕擷取畫面。
後續步驟

步驟 1:準備資料