在仔細考慮之後,我們決定在兩個步驟中停止 Amazon Kinesis Data Analytics for SQL 應用程式:
1. 從 2025 年 10 月 15 日起,您將無法為SQL應用程式建立新的 Kinesis Data Analytics。
2. 我們將從 2026 年 1 月 27 日起刪除您的應用程式。您將無法啟動或操作SQL應用程式的 Amazon Kinesis Data Analytics。從那時SQL起,Amazon Kinesis Data Analytics 將不再提供 的支援。如需詳細資訊,請參閱Amazon Kinesis Data Analytics for SQL 應用程式終止。
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
範例:交錯視窗
當視窗化查詢為每個不同的分割區索引鍵處理個別視窗時,從具有相符索引鍵的資料到達時開始,即為交錯視窗。如需詳細資訊,請參閱 交錯窗口。這個 Amazon Kinesis Data Analytics 範例使用 EVENT_TIME 和 TICKER 欄來建立交錯視窗。來源串流包含六個記錄的群組,其中有相同的 EVENT_TIME 和 TICKER 值,這些資料會在一分鐘內到達,但不一定具有相同的分鐘值 (例如 18:41:xx
)。
在此範例中,將下列記錄於指定時間寫入 Amazon Kinesis 資料串流。指令碼不會將時間寫入串流,但應用程式擷取記錄的時間會寫入 ROWTIME
欄位:
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:17:30 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:17:40 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:17:50 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:18:00 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:18:10 {"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"} 20:18:21 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:18:31 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:18:41 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:18:51 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:19:01 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:19:11 {"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"} 20:19:21 ...
接著,在 AWS Management Console 建立 Kinesis Data Analytics 應用程式,並將 Kinesis 資料串流做為串流來源。探索程序會讀取串流來源上的範例記錄,並推斷含有兩個資料欄 (EVENT_TIME
和 TICKER
) 的應用程式內結構描述,如下所示。
您可以將應用程式碼與 COUNT
函數搭配使用,以建立資料的視窗化彙總。接著將產生的資料插入另一個應用程式內串流,如下列螢幕擷取畫面所示:
在下列程序中,您會建立 Kinesis Data Analytics 應用程式,根據 EVENT_TIME 和 TICKER,在交錯視窗彙總輸入串流中的值。
步驟 1:建立 Kinesis Data Stream
建立 Amazon Kinesis 資料串流,並填入紀錄,如下所示:
前往 https://console.aws.amazon.com/kinesis
登入 AWS Management Console 並開啟 Kinesis 主控台。 -
在導覽窗格中選擇資料串流。
-
選擇建立 Kinesis 串流,然後建立內含一個碎片之串流。如需詳細資訊,請參閱 Amazon Kinesis Data Streams 開發人員指南中的建立串流。
-
若要在生產環境中將記錄寫入 Kinesis 資料串流,建議您使用 Kinesis Producer Library 或 Kinesis Data Streams API。為了簡單起見,這個例子使用下面的 Python 指令碼來生成記錄。執行程式碼以填入範例股票代號記錄。這個簡單的程式碼會在一分鐘的時間內,連續將具有相同隨機
EVENT_TIME
和股票代號的六筆記錄寫入串流。讓指令碼持續執行,以便在稍後的步驟產生應用程式結構描述。import datetime import json import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10) return { "EVENT_TIME": event_time.isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), } def generate(stream_name, kinesis_client): while True: data = get_data() # Send six records, ten seconds apart, with the same event time and ticker for _ in range(6): print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey", ) time.sleep(10) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
步驟 2:建立 Kinesis Data Analytics 應用程式
建立 Kinesis Data Analytics 應用程式,如下所示。
前往 https://console.aws.amazon.com/kinesisanalytics
開啟 Managed Service for Apache Flink 主控台。 -
選擇建立應用程式,輸入應用程式名稱,然後選擇建立應用程式。
-
在應用程式詳細資料頁面上,選擇連接串流資料來連接至來源。
-
在連接至來源頁面,執行下列動作:
-
選擇您在上一節建立的串流。
-
選擇探索結構描述。等待主控台顯示推斷的結構描述和範例記錄,這些記錄可用來推斷應用程式內串流所建立的結構描述。推斷的結構描述有兩個資料欄。
-
選擇編輯結構描述。將 EVENT_TIME 欄的欄類型變更為
TIMESTAMP
。 -
選擇 儲存結構描述並更新串流範例。主控台儲存結構描述後,選擇結束。
-
選擇儲存並繼續。
-
-
在應用程式詳細資訊頁面上,選擇至 SQL 編輯器。若要啟動應用程式,請在出現的對話方塊中選擇是,啟動應用程式。
-
在 SQL 編輯器中,編寫應用程式碼並驗證結果,如下所示:
-
請複製以下應用程式碼,然後貼到編輯器中。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( event_time TIMESTAMP, ticker_symbol VARCHAR(4), ticker_count INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM EVENT_TIME, TICKER, COUNT(TICKER) AS ticker_count FROM "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY TICKER, EVENT_TIME RANGE INTERVAL '1' MINUTE);
-
選擇儲存並執行 SQL。
在即時分析標籤上,您可以查看應用程式建立的所有應用程式內串流,並驗證資料。
-