滑動視窗 - 亞馬遜 Kinesis SQL 應用程式資料分析開發人員指南

對於新專案,我們建議您使用適用於 Apache Flink Studio 的全新受管理服務,取代適用於應用程式的 Kinesis Data Analytics。SQLManaged Service for Apache Flink Studio 易於使用且具備進階分析功能,讓您在幾分鐘內建置複雜的串流處理應用程式。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

滑動視窗

您可以定義以時間或資料列為基礎的視窗,而不需使用 GROUP BY 將記錄分組。您可以透過添加明確的 WINDOW 子句來完成此操作。

在這種情況下,當窗口隨著時間的推移而滑動時,Amazon Kinesis Data Analytics 會在串流上出現新記錄時發出輸出。Kinesis Data Analytics 會透過處理窗口中的資料列來發出此輸出。窗口可以在這種類型的處理中重疊,且記錄可以是多個窗口的一部分,並用每個窗口進行處理。以下範例解釋了滑動窗口。

考慮用一個簡單的查詢來計算串流上的記錄。此範例假設了 5 秒的窗口。在下面的示範串流中,新的記錄到達時間為 t1、t2、t6 和 t7,三個記錄到達時間為 t8 秒。

Timeline showing record arrivals at t1, t2, t6, t7, and multiple at t8 within a 5-second window.

請謹記以下幾點:

  • 此範例假設了 5 秒的窗口。5 秒的窗口會隨著時間連續滑動。

  • 針對進入窗口的每一列,滑動窗口會發出一個輸出列。應用程式啟動後不久,即使 5 秒的窗口尚未通過,您也會看到查詢發出串流上每筆新記錄的輸出。例如,當記錄出現在第一秒和第二秒的查詢時,串流會發出輸出。稍後,查詢會在 5 秒的窗口中處理記錄。

  • 窗口隨著時間滑動。如果串流上的舊記錄落在窗口外,則查詢不會發出輸出,除非串流上也有新記錄落在該 5 秒視窗內。

假設查詢在 t0 開始執行。然後會發生以下情況:

  1. t0 時,查詢開始。查詢不會發出輸出(計數值),因為目前沒有記錄。

    Timeline showing a stream starting at t0 with no output initially indicated.
  2. 在 t1,一個新的記錄出現在串流上,查詢發出計數值 1。

    Timeline showing a stream with a record appearing at time t1, and an arrow pointing to t0.
  3. 在 t2,另一個記錄出現,查詢發出計數值 2。

    Timeline showing stream events at different time points, with two vertical bars at the end.
  4. 5 秒的窗口會隨著時間滑動:

    • 在 t3,滑動窗口 t3 到 t0

    • 在 t4 (滑動窗口 t4 到 t0)

    • 在 t5 滑動窗口 t5-t0

    在所有這些時刻,5 秒的窗口具有相同的記錄-沒有新記錄。因此,查詢沒有發出任何輸出。

    Timeline showing stream with multiple time points and colored rectangles representing data windows.
  5. 在時間 t6 時,5 秒的窗口為 (t6 到 t1)。查詢在 t6 偵測到一個新的記錄,因此它發出輸出 2。t1 處的記錄不再位於窗口中,並且不會計算在內。

    Timeline showing stream events at different time points with a sliding 5-second window.
  6. 在時間 t7 時,5 秒的窗口為 t7 到 t2。查詢在 t7 偵測到一個新的記錄,因此它發出輸出 2。t2 處的記錄不再位於 5 秒窗口中,因此不會計算在內。

    Timeline showing stream events and time points from t0 to t7, with a 5-second window highlighted.
  7. 在時間 t8 時,5 秒的窗口為 t8 到 t3。查詢偵測到三個新記錄,因此會發出記錄計數 5。

    Timeline showing stream events with orange bars representing record counts at different time intervals.

總之,窗口是一個固定的大小,並隨著時間的推移滑動。新的記錄出現時,查詢會發出輸出。

注意

建議您使用不超過一個小時的滑動窗口。如果您使用較長的窗口,在定期系統維護後,應用程式需要更長的時間重新啟動。這是因為必須再次從串流中讀取來源資料。

下列範例查詢使用 WINDOW 子句來定義窗口和執行彙總。因為查詢未指定 GROUP BY,所以查詢會使用滑動窗口方法來處理串流上的記錄。

範例 1:使用 1 分鐘滑動窗口處理串流

請考慮在入門練習中填入應用程式內串流的示範串流 SOURCE_SQL_STREAM_001。以下為其結構描述。

(TICKER_SYMBOL VARCHAR(4), SECTOR varchar(16), CHANGE REAL, PRICE REAL)

假設您希望應用程式使用滑動 1 分鐘窗口來計算彙總。也就是說,對於出現在串流上的每個新記錄,您希望應用程式套用彙總到前 1 分鐘窗口的記錄,以發出輸出。

您可以使用下列時間窗口查詢。查詢會使用 WINDOW 子句來定義 1 分鐘的範圍間隔。在 WINDOW 子句中,PARTITION BY會按滑動窗口內的股票代號值對記錄進行分組。

SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);
若要測試查詢
  1. 按照入門練習設置應用程式。

  2. 用上述 SELECT 查詢取代應用程式碼中的 SELECT 陳述式。產生的應用程式碼如下所示。

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Min_Price double, Max_Price double, Avg_Price double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, MIN(Price) OVER W1 AS Min_Price, MAX(Price) OVER W1 AS Max_Price, AVG(Price) OVER W1 AS Avg_Price FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING);

範例 2:查詢在滑動視窗上套用彙總

示範串流上的下列查詢,會傳回 10 秒窗口中每個股票代號價格變動百分比的平均值。

SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

若要測試查詢
  1. 按照入門練習設置應用程式。

  2. 用上述 SELECT 查詢取代應用程式碼中的 SELECT 陳述式。產生的應用程式碼如下所示。

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(10), Avg_Percent_Change double); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM Ticker_Symbol, AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change FROM "SOURCE_SQL_STREAM_001" WINDOW W1 AS ( PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING);

範例 3:查詢相同串流上多個滑動視窗的資料

您可以撰寫查詢以發出輸出,其中每個資料欄值,是使用同一個資料流上定義的不同滑動窗口來計算。

在下面的例子中,查詢發出輸出股票代號,價格,a2 和 a10。它發出輸出給為兩列移動平均線穿過十列移動平均線的股票代碼。a2a10 欄值衍生自兩列與十列滑動窗口。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( ticker_symbol VARCHAR(12), price double, average_last2rows double, average_last10rows double); CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, price, avg(price) over last2rows, avg(price) over last10rows FROM SOURCE_SQL_STREAM_001 WINDOW last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING), last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);

若要針對示範串流測試此查詢,請遵循 範例 1 中所述的測試程序。