例: ストリームでデータの異常を検出する (RANDOM_CUT_FOREST 関数) - Amazon Kinesis Data Analytics for SQL Applications デベロッパーガイド

慎重に検討した結果、アプリケーションの Amazon Kinesis Data Analytics は 2 つのステップSQLで中止することにしました。

1. 2025 年 10 月 15 日以降、SQLアプリケーションの新しい Kinesis Data Analytics を作成することはできません。

2. 2026 年 1 月 27 日以降、アプリケーションを削除します。SQL アプリケーションの Amazon Kinesis Data Analytics を開始または操作することはできません。SQL それ以降、Amazon Kinesis Data Analytics のサポートは終了します。詳細については、「Amazon Kinesis Data Analytics for SQL Applications の中止」を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

例: ストリームでデータの異常を検出する (RANDOM_CUT_FOREST 関数)

Amazon Kinesis Data Analytics では、数値列の値に基づいて異常スコアを各レコードに割り当てる関数 (RANDOM_CUT_FOREST) を提供しています。詳細については、「Amazon Managed Service for Apache Flink SQL リファレンス」の「RANDOM_CUT_FOREST関数」を参照してください。

この実習では、アプリケーションのストリーミングソースのレコードに異常スコアを割り当てるアプリケーションコードを作成します。アプリケーションをセットアップするには、以下を実行します。

  1. ストリーミングソースのセットアップ – Kinesis データストリームをセットアップして、次のようにサンプル heartRate データを書き込みます。

    {"heartRate": 60, "rateType":"NORMAL"} ... {"heartRate": 180, "rateType":"HIGH"}

    この手順では、ストリームに入力するための Python スクリプトを提供しています。heartRate 値はランダムに生成されます。レコードの 99 パーセントは heartRate 値が 60 から 100 の間で、heartRate 値の 1 パーセントのみが 150 から 200 の間です。したがって、heartRate 値が 150 から 200 の間のレコードは異常です。

  2. 入力の設定 – コンソールを使用して、 Kinesis Data Analyticsアプリケーションを作成し、ストリーミングソースをアプリケーション内ストリーム (SOURCE_SQL_STREAM_001) にマッピングすることでアプリケーション入力を設定します。アプリケーションが起動すると、Kinesis Data Analytics は継続的にストリーミングソースを読み取り、アプリケーション内ストリームにレコードを挿入します。

  3. アプリケーションコードの指定 – この例では、次のアプリケーションコードを使用します。

    --Creates a temporary stream. CREATE OR REPLACE STREAM "TEMP_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); --Creates another stream for application output. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "heartRate" INTEGER, "rateType" varchar(20), "ANOMALY_SCORE" DOUBLE); -- Compute an anomaly score for each record in the input stream -- using Random Cut Forest CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM" SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE FROM TABLE(RANDOM_CUT_FOREST( CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"))); -- Sort records by descending anomaly score, insert into output stream CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM * FROM "TEMP_STREAM" ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;

    コードは SOURCE_SQL_STREAM_001 の行を読み取り、異常スコアを割り当て、結果の行を別のアプリケーション内ストリーム (TEMP_STREAM) に書き込みます。次に、アプリケーションコードはTEMP_STREAM のレコードをソートして、結果を別のアプリケーション内ストリーム (DESTINATION_SQL_STREAM) に保存します。ポンプを使用して、アプリケーション内ストリームに行を挿入します。詳細については、「アプリケーション内ストリームとポンプ」を参照してください。

  4. 出力の設定DESTINATION_SQL_STREAM のデータを永続化して、別の Kinesis データストリームの外部宛先に書き込むようにアプリケーション出力を設定できます。各レコードに割り当てられた異常スコアを確認して、どのスコアが異常を示しているか (また、アラートが必要か) を調べるのは、アプリケーションの範囲外です。このような異常スコアを処理してアラートを設定するには、AWS Lambda 関数を使用できます。

この実習では、米国東部 (バージニア北部) (us-east-1) を使用して、これらのストリームとアプリケーションを作成します。他のリージョンも使用する場合は、それに応じてコードを更新する必要があります。

次のステップ

ステップ 1: 準備