範例:將字串拆分為多個字段 (VARIABLE_COLUMN_LOG_PARSE 函數) - Amazon Kinesis Data Analytics for SQL Applications 開發人員指南

在仔細考慮之後,我們決定在兩個步驟中停止 Amazon Kinesis Data Analytics for SQL 應用程式:

1. 從 2025 年 10 月 15 日起,您將無法為SQL應用程式建立新的 Kinesis Data Analytics。

2. 我們將從 2026 年 1 月 27 日起刪除您的應用程式。您將無法啟動或操作SQL應用程式的 Amazon Kinesis Data Analytics。從那時SQL起,Amazon Kinesis Data Analytics 將不再提供 的支援。如需詳細資訊,請參閱Amazon Kinesis Data Analytics for SQL 應用程式終止

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

範例:將字串拆分為多個字段 (VARIABLE_COLUMN_LOG_PARSE 函數)

此範例使用 VARIABLE_COLUMN_LOG_PARSE 函數來操作 Kinesis Data Analytics 中的字串。VARIABLE_COLUMN_LOG_PARSE 將輸入字串分割,變成以分隔符號字元或字串分開的欄位。如需詳細資訊,請參閱 Amazon Managed Service for Apache Flink SQL 參考資料中的VARIABLE_COLUMN_LOG_PARSE

在此範例中,將半結構化記錄寫入 Amazon Kinesis 資料串流。範例記錄如下:

{ "Col_A" : "string", "Col_B" : "string", "Col_C" : "string", "Col_D_Unstructured" : "value,value,value,value"} { "Col_A" : "string", "Col_B" : "string", "Col_C" : "string", "Col_D_Unstructured" : "value,value,value,value"}

接著,您可以在主控台上建立 Kinesis Data Analytics 應用程式,並將 Kinesis 資料串流當作串流來源。探索程序會讀取串流來源上的範例記錄,並以四個資料欄推斷應用程式內結構描述,如下所示:

主控台螢幕擷取畫面,顯示帶 4 個資料欄的應用程式內結構描述

然後,您可以將應用程式碼與 VARIABLE_COLUMN_LOG_PARSE 函數搭配使用來剖析逗號分隔的值,並將正規化的列插入另一個應用程式內串流,如下所示:

主控台螢幕擷取畫面,顯示帶有應用程式內串流的即時分析標籤。

步驟 1:建立 Kinesis 資料串流

建立 Amazon Kinesis 資料串流,並填入日誌紀錄,如下所示:

  1. 前往 https://console.aws.amazon.com/kinesis 登入 AWS Management Console 並開啟 Kinesis 主控台。

  2. 在導覽窗格中選擇資料串流

  3. 選擇建立 Kinesis 串流,然後建立內含一個碎片之串流。如需詳細資訊,請參閱 Amazon Kinesis Data Streams 開發人員指南中的建立串流

  4. 執行下列 Python 程式碼,以填入範例日誌記錄。這個簡單的代碼會持續寫入相同的日誌記錄到串流。

    import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return {"Col_A": "a", "Col_B": "b", "Col_C": "c", "Col_E_Unstructured": "x,y,z"} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

步驟 2:建立 Kinesis Data Analytics 應用程式

建立 Kinesis Data Analytics 應用程式,如下所示。

  1. 前往 https://console.aws.amazon.com/kinesisanalytics 開啟 Managed Service for Apache Flink 主控台。

  2. 選擇建立應用程式,輸入應用程式名稱,然後選擇建立應用程式

  3. 在應用程式詳細資料頁面上,選擇連接串流資料

  4. 連接至來源頁面,執行下列動作:

    1. 選擇您在上一節建立的串流。

    2. 選擇建立 IAM 角色 選項。

    3. 選擇探索結構描述。等待主控台顯示推斷的結構描述和範例記錄,這些記錄可用來推斷應用程式內串流所建立的結構描述。請注意,推斷的結構描述只有一個資料欄。

    4. 選擇儲存並繼續

  5. 在應用程式詳細資訊頁面上,選擇至 SQL 編輯器。若要啟動應用程式,請在出現的對話方塊中選擇是,啟動應用程式

  6. 在 SQL 編輯器中,撰寫應用程式碼,然後驗證結果:

    1. 請複製以下應用程式碼,然後貼到編輯器中:

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( "column_A" VARCHAR(16), "column_B" VARCHAR(16), "column_C" VARCHAR(16), "COL_1" VARCHAR(16), "COL_2" VARCHAR(16), "COL_3" VARCHAR(16)); CREATE OR REPLACE PUMP "SECOND_STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM t."Col_A", t."Col_B", t."Col_C", t.r."COL_1", t.r."COL_2", t.r."COL_3" FROM (SELECT STREAM "Col_A", "Col_B", "Col_C", VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured", 'COL_1 TYPE VARCHAR(16), COL_2 TYPE VARCHAR(16), COL_3 TYPE VARCHAR(16)', ',') AS r FROM "SOURCE_SQL_STREAM_001") as t;
    2. 選擇儲存並執行 SQL。在即時分析標籤上,您可以查看應用程式建立的所有應用程式內串流,並驗證資料。