對於新專案,我們建議您使用適用於 Apache Flink Studio 的全新受管理服務,取代適用於應用程式的 Kinesis Data Analytics。SQLManaged Service for Apache Flink Studio 易於使用且具備進階分析功能,讓您在幾分鐘內建置複雜的串流處理應用程式。
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
範例:轉換多個資料類型
擷取、轉換和載入 (ETL) 應用程式的常見需求,是在串流來源上處理多個記錄類型。您可以建立 Kinesis Data Analytics 應用程式來處理這些類型的串流來源。程序如下:
-
首先,將串流來源對應到應用程式內輸入串流,類似於所有其他 Kinesis Data Analytics 應用程式。
-
然後,在應用程式碼中撰寫 SQL 陳述式,從應用程式內輸入串流擷取特定類型的資料列。接著將它們插入個別的應用程式內串流。(您可以在應用程式碼中建立其他應用程式內串流。)
在本練習中,您有一個接收兩種類型 (Order
和 Trade
) 記錄的串流來源。這些是股票訂單和相應的交易。針對每個訂單,可能有零個或多個交易。每種類型的範例記錄如下所示:
訂單記錄
{"RecordType": "Order", "Oprice": 9047, "Otype": "Sell", "Oid": 3811, "Oticker": "AAAA"}
交易記錄
{"RecordType": "Trade", "Tid": 1, "Toid": 3812, "Tprice": 2089, "Tticker": "BBBB"}
當您使用建立應用程式時 AWS Management Console,主控台會針對建立的應用程式內輸入串流顯示下列推斷結構描述。根據預設,主控台會命名此應用程式內串流 SOURCE_SQL_STREAM_001
。
儲存組態時,Amazon Kinesis Data Analytics 會持續從串流來源讀取資料,並在應用程式內串流中插入資料列。您現在可以對應用程式內串流中的資料執行分析。
在此範例的應用程式碼中,先建立兩個額外的應用程式內串流,Order_Stream
和 Trade_Stream
。然後,您可以根據記錄類型從 SOURCE_SQL_STREAM_001
串流中篩選列,並使用幫浦將它們插入新建立的串流中。如需此編碼模式的相關資訊,請參閱 應用程式碼。
-
將訂單和交易列過濾到個別的應用程式內串流:
-
篩選
SOURCE_SQL_STREAM_001
中的訂單記錄,並將訂單儲存在Order_Stream
中。--Create Order_Stream. CREATE OR REPLACE STREAM "Order_Stream" ( order_id integer, order_type varchar(10), ticker varchar(4), order_price DOUBLE, record_type varchar(10) ); CREATE OR REPLACE PUMP "Order_Pump" AS INSERT INTO "Order_Stream" SELECT STREAM oid, otype,oticker, oprice, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Order';
-
篩選
SOURCE_SQL_STREAM_001
中的交易記錄,並將訂單儲存在Trade_Stream
中。--Create Trade_Stream. CREATE OR REPLACE STREAM "Trade_Stream" (trade_id integer, order_id integer, trade_price DOUBLE, ticker varchar(4), record_type varchar(10) ); CREATE OR REPLACE PUMP "Trade_Pump" AS INSERT INTO "Trade_Stream" SELECT STREAM tid, toid, tprice, tticker, recordtype FROM "SOURCE_SQL_STREAM_001" WHERE recordtype = 'Trade';
-
-
現在,您可以對這些串流執行其他分析。在此範例,於一分鐘的翻轉視窗中計算股票代碼的交易數量,並將結果保存到另一個
DESTINATION_SQL_STREAM
串流中。--do some analytics on the Trade_Stream and Order_Stream. -- To see results in console you must write to OPUT_SQL_STREAM. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker varchar(4), trade_count integer ); CREATE OR REPLACE PUMP "Output_Pump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker, count(*) as trade_count FROM "Trade_Stream" GROUP BY ticker, FLOOR("Trade_Stream".ROWTIME TO MINUTE);
您會看到下列結果。