對於新專案,我們建議您使用適用於 Apache Flink Studio 的全新受管理服務,取代適用於應用程式的 Kinesis Data Analytics。SQLManaged Service for Apache Flink Studio 易於使用且具備進階分析功能,讓您在幾分鐘內建置複雜的串流處理應用程式。
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
滑動視窗
您可以定義以時間或資料列為基礎的視窗,而不需使用 GROUP BY
將記錄分組。您可以透過添加明確的 WINDOW
子句來完成此操作。
在這種情況下,當窗口隨著時間的推移而滑動時,Amazon Kinesis Data Analytics 會在串流上出現新記錄時發出輸出。Kinesis Data Analytics 會透過處理窗口中的資料列來發出此輸出。窗口可以在這種類型的處理中重疊,且記錄可以是多個窗口的一部分,並用每個窗口進行處理。以下範例解釋了滑動窗口。
考慮用一個簡單的查詢來計算串流上的記錄。此範例假設了 5 秒的窗口。在下面的示範串流中,新的記錄到達時間為 t1、t2、t6 和 t7,三個記錄到達時間為 t8 秒。
請謹記以下幾點:
-
此範例假設了 5 秒的窗口。5 秒的窗口會隨著時間連續滑動。
-
針對進入窗口的每一列,滑動窗口會發出一個輸出列。應用程式啟動後不久,即使 5 秒的窗口尚未通過,您也會看到查詢發出串流上每筆新記錄的輸出。例如,當記錄出現在第一秒和第二秒的查詢時,串流會發出輸出。稍後,查詢會在 5 秒的窗口中處理記錄。
-
窗口隨著時間滑動。如果串流上的舊記錄落在窗口外,則查詢不會發出輸出,除非串流上也有新記錄落在該 5 秒視窗內。
假設查詢在 t0 開始執行。然後會發生以下情況:
-
t0 時,查詢開始。查詢不會發出輸出(計數值),因為目前沒有記錄。
-
在 t1,一個新的記錄出現在串流上,查詢發出計數值 1。
-
在 t2,另一個記錄出現,查詢發出計數值 2。
-
5 秒的窗口會隨著時間滑動:
-
在 t3,滑動窗口 t3 到 t0
-
在 t4 (滑動窗口 t4 到 t0)
-
在 t5 滑動窗口 t5-t0
在所有這些時刻,5 秒的窗口具有相同的記錄-沒有新記錄。因此,查詢沒有發出任何輸出。
-
-
在時間 t6 時,5 秒的窗口為 (t6 到 t1)。查詢在 t6 偵測到一個新的記錄,因此它發出輸出 2。t1 處的記錄不再位於窗口中,並且不會計算在內。
-
在時間 t7 時,5 秒的窗口為 t7 到 t2。查詢在 t7 偵測到一個新的記錄,因此它發出輸出 2。t2 處的記錄不再位於 5 秒窗口中,因此不會計算在內。
-
在時間 t8 時,5 秒的窗口為 t8 到 t3。查詢偵測到三個新記錄,因此會發出記錄計數 5。
總之,窗口是一個固定的大小,並隨著時間的推移滑動。新的記錄出現時,查詢會發出輸出。
注意
建議您使用不超過一個小時的滑動窗口。如果您使用較長的窗口,在定期系統維護後,應用程式需要更長的時間重新啟動。這是因為必須再次從串流中讀取來源資料。
下列範例查詢使用 WINDOW
子句來定義窗口和執行彙總。因為查詢未指定 GROUP BY
,所以查詢會使用滑動窗口方法來處理串流上的記錄。
範例 1:使用 1 分鐘滑動窗口處理串流
請考慮在入門練習中填入應用程式內串流的示範串流 SOURCE_SQL_STREAM_001
。以下為其結構描述。
(TICKER_SYMBOL VARCHAR(4), SECTOR varchar(16), CHANGE REAL, PRICE REAL)
假設您希望應用程式使用滑動 1 分鐘窗口來計算彙總。也就是說,對於出現在串流上的每個新記錄,您希望應用程式套用彙總到前 1 分鐘窗口的記錄,以發出輸出。
您可以使用下列時間窗口查詢。查詢會使用 WINDOW
子句來定義 1 分鐘的範圍間隔。在 WINDOW
子句中,PARTITION BY
會按滑動窗口內的股票代號值對記錄進行分組。
SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);
若要測試查詢
-
按照入門練習設置應用程式。
-
用上述
SELECT
查詢取代應用程式碼中的SELECT
陳述式。產生的應用程式碼如下所示。CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Min_Price double, Max_Price double, Avg_Price double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);
範例 2:查詢在滑動視窗上套用彙總
示範串流上的下列查詢,會傳回 10 秒窗口中每個股票代號價格變動百分比的平均值。
SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);
若要測試查詢
-
按照入門練習設置應用程式。
-
用上述
SELECT
查詢取代應用程式碼中的SELECT
陳述式。產生的應用程式碼如下所示。CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Avg_Percent_Change double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);
範例 3:查詢相同串流上多個滑動視窗的資料
您可以撰寫查詢以發出輸出,其中每個資料欄值,是使用同一個資料流上定義的不同滑動窗口來計算。
在下面的例子中,查詢發出輸出股票代號,價格,a2 和 a10。它發出輸出給為兩列移動平均線穿過十列移動平均線的股票代碼。a2
和 a10
欄值衍生自兩列與十列滑動窗口。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(12), price double, average_last2rows double, average_last10rows double); CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, price, avg(price) over last2rows, avg(price) over last10rows FROM SOURCE_SQL_STREAM_001 WINDOW last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING), last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);
若要針對示範串流測試此查詢,請遵循 範例 1 中所述的測試程序。