範例:交錯視窗 - Amazon Kinesis Data Analytics for SQL Applications 開發人員指南

在仔細考慮之後,我們決定在兩個步驟中停止 Amazon Kinesis Data Analytics for SQL 應用程式:

1. 從 2025 年 10 月 15 日起,您將無法為SQL應用程式建立新的 Kinesis Data Analytics。

2. 我們將從 2026 年 1 月 27 日起刪除您的應用程式。您將無法啟動或操作SQL應用程式的 Amazon Kinesis Data Analytics。從那時SQL起,Amazon Kinesis Data Analytics 將不再提供 的支援。如需詳細資訊,請參閱Amazon Kinesis Data Analytics for SQL 應用程式終止

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

範例:交錯視窗

當視窗化查詢為每個不同的分割區索引鍵處理個別視窗時,從具有相符索引鍵的資料到達時開始,即為交錯視窗。如需詳細資訊,請參閱 交錯窗口。這個 Amazon Kinesis Data Analytics 範例使用 EVENT_TIME 和 TICKER 欄來建立交錯視窗。來源串流包含六個記錄的群組,其中有相同的 EVENT_TIME 和 TICKER 值,這些資料會在一分鐘內到達,但不一定具有相同的分鐘值 (例如 18:41:xx)。

在此範例中,將下列記錄於指定時間寫入 Amazon Kinesis 資料串流。指令碼不會將時間寫入串流,但應用程式擷取記錄的時間會寫入 ROWTIME 欄位:

{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:17:30 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:17:40 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:17:50 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:18:00 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:18:10 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:18:21 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:18:31 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:18:41 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:18:51 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:19:01 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:19:11 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:19:21 ...

接著,在 AWS Management Console 建立 Kinesis Data Analytics 應用程式,並將 Kinesis 資料串流做為串流來源。探索程序會讀取串流來源上的範例記錄,並推斷含有兩個資料欄 (EVENT_TIMETICKER) 的應用程式內結構描述,如下所示。

顯示應用程式內結構描述的主控台螢幕擷取畫面,其中包含價格和股票代碼欄。

您可以將應用程式碼與 COUNT 函數搭配使用,以建立資料的視窗化彙總。接著將產生的資料插入另一個應用程式內串流,如下列螢幕擷取畫面所示:

顯示應用程式內串流生成資料的主控台螢幕擷取畫面。

在下列程序中,您會建立 Kinesis Data Analytics 應用程式,根據 EVENT_TIME 和 TICKER,在交錯視窗彙總輸入串流中的值。

步驟 1:建立 Kinesis Data Stream

建立 Amazon Kinesis 資料串流,並填入紀錄,如下所示:

  1. 前往 https://console.aws.amazon.com/kinesis 登入 AWS Management Console 並開啟 Kinesis 主控台。

  2. 在導覽窗格中選擇資料串流

  3. 選擇建立 Kinesis 串流,然後建立內含一個碎片之串流。如需詳細資訊,請參閱 Amazon Kinesis Data Streams 開發人員指南中的建立串流

  4. 若要在生產環境中將記錄寫入 Kinesis 資料串流,建議您使用 Kinesis Producer LibraryKinesis Data Streams API。為了簡單起見,這個例子使用下面的 Python 指令碼來生成記錄。執行程式碼以填入範例股票代號記錄。這個簡單的程式碼會在一分鐘的時間內,連續將具有相同隨機 EVENT_TIME 和股票代號的六筆記錄寫入串流。讓指令碼持續執行,以便在稍後的步驟產生應用程式結構描述。

    import datetime import json import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10) return { "EVENT_TIME": event_time.isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), } def generate(stream_name, kinesis_client): while True: data = get_data() # Send six records, ten seconds apart, with the same event time and ticker for _ in range(6): print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey", ) time.sleep(10) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

步驟 2:建立 Kinesis Data Analytics 應用程式

建立 Kinesis Data Analytics 應用程式,如下所示。

  1. 前往 https://console.aws.amazon.com/kinesisanalytics 開啟 Managed Service for Apache Flink 主控台。

  2. 選擇建立應用程式,輸入應用程式名稱,然後選擇建立應用程式

  3. 在應用程式詳細資料頁面上,選擇連接串流資料來連接至來源。

  4. 連接至來源頁面,執行下列動作:

    1. 選擇您在上一節建立的串流。

    2. 選擇探索結構描述。等待主控台顯示推斷的結構描述和範例記錄,這些記錄可用來推斷應用程式內串流所建立的結構描述。推斷的結構描述有兩個資料欄。

    3. 選擇編輯結構描述。將 EVENT_TIME 欄的欄類型變更為 TIMESTAMP

    4. 選擇 儲存結構描述並更新串流範例。主控台儲存結構描述後,選擇結束

    5. 選擇儲存並繼續

  5. 在應用程式詳細資訊頁面上,選擇至 SQL 編輯器。若要啟動應用程式,請在出現的對話方塊中選擇是,啟動應用程式

  6. 在 SQL 編輯器中,編寫應用程式碼並驗證結果,如下所示:

    1. 請複製以下應用程式碼,然後貼到編輯器中。

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( event_time TIMESTAMP, ticker_symbol VARCHAR(4), ticker_count INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM EVENT_TIME, TICKER, COUNT(TICKER) AS ticker_count FROM "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY TICKER, EVENT_TIME RANGE INTERVAL '1' MINUTE);
    2. 選擇儲存並執行 SQL

      即時分析標籤上,您可以查看應用程式建立的所有應用程式內串流,並驗證資料。