範例:偵測串流上的資料異常 (RANDOM_CUT_FOREST 函數) - Amazon Kinesis Data Analytics for SQL Applications 開發人員指南

在仔細考慮之後,我們決定在兩個步驟中停止 Amazon Kinesis Data Analytics for SQL 應用程式:

1. 從 2025 年 10 月 15 日起,您將無法為SQL應用程式建立新的 Kinesis Data Analytics。

2. 我們將從 2026 年 1 月 27 日起刪除您的應用程式。您將無法啟動或操作SQL應用程式的 Amazon Kinesis Data Analytics。從那時SQL起,Amazon Kinesis Data Analytics 將不再提供 的支援。如需詳細資訊,請參閱Amazon Kinesis Data Analytics for SQL 應用程式終止

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

範例:偵測串流上的資料異常 (RANDOM_CUT_FOREST 函數)

Amazon Kinesis Data Analytics 提供函數 (RANDOM_CUT_FOREST),可根據數值欄中的值為每筆記錄指派異常分數。如需詳細資訊,請參閱 Amazon Managed Service for Apache Flink SQL 參考資料中的 RANDOM_CUT_FOREST 函數

在本練習中,撰寫應用程式碼,將異常分數指派給應用程式串流來源上的記錄。若要設定應用程式,請執行下列動作:

  1. 設定串流來源:設定 Kinesis 資料串流並寫入範例 heartRate 資料,如下所示:

    {"heartRate": 60, "rateType":"NORMAL"} ... {"heartRate": 180, "rateType":"HIGH"}

    此程序會提供 Python 指令碼供您填入串流。這些 heartRate 值是隨機產生的,99% 記錄的 heartRate 值介於 60 到 100 之間,而只有 1% 的 heartRate 值介於 150 和 200 之間。因此,heartRate 值介於 150 和 200 之間的記錄為異常。

  2. 設定輸入:使用主控台可建立 Kinesis Data Analytics 應用程式,並透過將串流來源對應至應用程式內串流 (SOURCE_SQL_STREAM_001) ,以設定應用程式輸入。當應用程式啟動時,Kinesis Data Analytics 會持續讀取串流來源,並將記錄插入應用程式內串流。

  3. 指定應用程式碼:此範例使用下列應用程式碼:

    --Creates a temporary stream. CREATE OR REPLACE STREAM "TEMP_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); --Creates another stream for application output. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); -- Compute an anomaly score for each record in the input stream -- using Random Cut Forest CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM" SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE FROM TABLE(RANDOM_CUT_FOREST( CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"))); -- Sort records by descending anomaly score, insert into output stream CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM * FROM "TEMP_STREAM" ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;

    程式碼會讀取 SOURCE_SQL_STREAM_001 中的資料列、指派異常分數,然後將產生的資料欄寫入另一個應用程式內串流 (TEMP_STREAM)。然後,應用程式碼會排序 TEMP_STREAM 中的記錄,並將結果儲存到另一個應用程式內串流 (DESTINATION_SQL_STREAM)。您可以使用幫浦將資料列插入應用程式內串流。如需更多詳細資訊,請參閱 應用程式內串流與幫浦

  4. 配置輸出:設定將應用程式輸出,將 DESTINATION_SQL_STREAM 中的資料保存在外部目的地,即另一個 Kinesis 資料串流。檢閱指派給每筆記錄的異常分數,並判斷哪些分數表示應用程式外部的異常 (且需要收到提醒)。您可以使用 AWS Lambda 函數來處理這些異常分數並設定提醒。

本練習會使用美國東部 (維吉尼亞北部) (us-east-1) 來建立這些串流,以及您的應用程式。如果您使用任何其他區域,則須更新相應程式碼。

後續步驟

步驟 1:準備