範例:轉換 DateTime 值 - Amazon Kinesis Data Analytics for SQL Applications 開發人員指南

在仔細考慮之後,我們決定在兩個步驟中停止 Amazon Kinesis Data Analytics for SQL 應用程式:

1. 從 2025 年 10 月 15 日起,您將無法為SQL應用程式建立新的 Kinesis Data Analytics。

2. 我們將從 2026 年 1 月 27 日起刪除您的應用程式。您將無法啟動或操作SQL應用程式的 Amazon Kinesis Data Analytics。從那時SQL起,Amazon Kinesis Data Analytics 將不再提供 的支援。如需詳細資訊,請參閱Amazon Kinesis Data Analytics for SQL 應用程式終止

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

範例:轉換 DateTime 值

Amazon Kinesis Data Analytics 支援將資料欄轉換為時間戳記。舉例來說,除了 ROWTIME 欄之外,您可能想要使用自己的時間戳記做為 GROUP BY 子句的一部分,將其當作另一個以時間為基礎的視窗。Kinesis Data Analytics 提供操作和 SQL 函數,用於處理日期和時間欄位。

  • 日期和時間運算子:您可以對日期、時間和間隔資料類型執行算術運算。如需詳細資訊,請參閱 Amazon Managed Service for Apache Flink SQL 參考資料中的日期、時間和間隔運算子

     

  • SQL 函數:包括以下項目。如需詳細資訊,請參閱 Amazon Managed Service for Apache Flink SQL 參考資料中的日期和時間函數

    • EXTRACT() - 從日期、時間、時間戳記或間隔運算式中擷取一個欄位。

    • CURRENT_TIME - 傳回查詢執行的時間 (UTC)。

    • CURRENT_DATE - 傳回查詢執行的日期 (UTC)。

    • CURRENT_TIMESTAMP - 傳回查詢執行的時間戳記 (UTC)。

    • LOCALTIME - 傳回由 Kinesis Data Analytics 運行環境定義,查詢執行時的目前時間 (UTC)。

    • LOCALTIMESTAMP - 傳回由 Kinesis Data Analytics 運行環境定義的目前時間戳記 (UTC)。

       

  • SQL 擴充功能:包括下列項目。如需詳細資訊,請參閱 Amazon Managed Service for Apache Flink SQL 參考資料中的日期和時間函數,以及日期時間轉換參考資料

    • CURRENT_ROW_TIMESTAMP - 傳回串流中每一列的新時間戳記。

    • TSDIFF - 傳回兩個時間戳記的差值 (以毫秒為單位)。

    • CHAR_TO_DATE - 將字串轉換為日期。

    • CHAR_TO_TIME - 將字串轉換為時間。

    • CHAR_TO_TIMESTAMP - 將字串轉換為時間戳記。

    • DATE_TO_CHAR - 將日期轉換為字串。

    • TIME_TO_CHAR - 將時間轉換為字串。

    • TIMESTAMP_TO_CHAR - 將時間戳記轉換為字串。

先前大多數 SQL 函數使用格式來轉換欄。此格式具靈活度。舉例來說,您可以指定格式 yyyy-MM-dd hh:mm:ss 將輸入字串 2009-09-16 03:15:24 轉換為時間戳記。如需詳細資訊,請參閱 Amazon Managed Service for Apache Flink SQL 參考資料中的文字至時間戳記 (Sys)

範例:轉換日期

在此範例中,將下列記錄寫入 Amazon Kinesis 資料串流。

{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"} {"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"} {"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"} ...

接著,您可以在主控台上建立 Kinesis Data Analytics 應用程式,並將 Kinesis 串流做為串流來源。探索程序會讀取串流來源上的範例記錄,並推斷含有兩個資料欄 (EVENT_TIMETICKER) 的應用程式內結構描述,如下所示。

顯示應用程式內結構描述的主控台螢幕擷取畫面,其中包含事件時間和股票代碼欄。

然後,將應用程式碼與 SQL 函數搭配使用,以各種方式轉換 EVENT_TIME 時間戳記欄位。接著將產生的資料插入另一個應用程式內串流,如下列螢幕擷取畫面所示:

顯示應用程式內串流生成資料的主控台螢幕擷取畫面。

步驟 1:建立 Kinesis 資料串流

建立 Amazon Kinesis 資料串流,並將事件時間和股票記錄填入其中,如下所示:

  1. 登入 AWS Management Console 並開啟運動主控台,網址為 https://console.aws.amazon.com/kinesis

  2. 在導覽窗格中選擇資料串流

  3. 選擇建立 Kinesis 串流,然後建立內含一個碎片之串流。

  4. 執行下列 Python 程式碼,以使用範例資料填入串流。這個簡單的程式碼,會持續將帶有隨機股票代號和當前時間戳記的記錄寫入串流。

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

步驟 2:建立 Amazon Kinesis Data Analytics 應用程式

建立應用程式,如下所示:

  1. 前往 https://console.aws.amazon.com/kinesisanalytics 開啟 Managed Service for Apache Flink 主控台。

  2. 選擇建立應用程式,輸入應用程式名稱,然後選擇建立應用程式

  3. 在應用程式詳細資料頁面上,選擇連接串流資料來連接至來源。

  4. 連接至來源頁面,執行下列動作:

    1. 選擇您在上一節建立的串流。

    2. 選擇建立 IAM 角色。

    3. 選擇探索結構描述。等待主控台顯示推斷的結構描述,以及用來推斷建立的應用程式內串流結構描述之範例記錄。推斷的結構描述有兩個資料欄。

    4. 選擇編輯結構描述。將 EVENT_TIME 欄的欄類型變更為 TIMESTAMP

    5. 選擇 儲存結構描述並更新串流範例。主控台儲存結構描述後,選擇結束

    6. 選擇儲存並繼續

  5. 在應用程式詳細資訊頁面上,選擇至 SQL 編輯器。若要啟動應用程式,請在出現的對話方塊中選擇是,啟動應用程式

  6. 在 SQL 編輯器中,編寫應用程式碼並驗證結果,如下所示:

    1. 請複製以下應用程式碼,然後貼到編輯器中。

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
    2. 選擇儲存並執行 SQL。在即時分析標籤上,您可以查看應用程式建立的所有應用程式內串流,並驗證資料。