步骤 2:创建分析应用程序 - SQL适用于应用程序的 Amazon Kinesis Data Analytics 开发者指南

经过仔细考虑,我们决定分两个步骤停止使用亚马逊 Kinesis Data Analytics SQL 的应用程序:

1. 从 2025 年 10 月 15 日起,您将无法为应用程序创建新的 Kinesis Data Analytic SQL s。

2. 我们将从 2026 年 1 月 27 日起删除您的申请。您将无法启动或操作适用于应用程序的 Amazon Kinesis Data Analytic SQL s。从那时起,亚马逊 Kinesis Data Analytics SQL 将不再提供支持。有关更多信息,请参阅 适用于应用程序的 Amazon Kinesis Data Analytic SQL s 停产

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

步骤 2:创建分析应用程序

在本节中,您将创建一个 Amazon Kinesis Data Analytics 应用程序,然后将其配置为使用在 步骤 1:准备数据 中作为流式传输源创建的 Kinesis 数据流。然后,运行使用 RANDOM_CUT_FOREST_WITH_EXPLANATION 函数的应用程序代码。

创建应用程序
  1. 打开 Kinesis 控制台,网址为:https://console.aws.amazon.com/kinesis

  2. 在导航窗格中选择 Data Analytics (数据分析),然后选择创建应用程序

  3. 提供应用程序名称和描述 (可选),并选择 Create application

  4. 选择 Connect streaming data (连接流数据),然后从列表中选择 ExampleInputStream

  5. 选择 Discover schema,并确保 SystolicDiastolic 显示为 INTEGER 列。如果二者为另一种类型,则选择 Edit schema,并将 INTEGER 类型分配给二者。

  6. Real time analytics 下,选择 Go to SQL editor。出现提示时,选择运行您的应用程序。

  7. 将以下代码粘贴到 SQL 编辑器中,然后选择 Save and run SQL

    --Creates a temporary stream. CREATE OR REPLACE STREAM "TEMP_STREAM" ( "Systolic" INTEGER, "Diastolic" INTEGER, "BloodPressureLevel" varchar(20), "ANOMALY_SCORE" DOUBLE, "ANOMALY_EXPLANATION" varchar(512)); --Creates another stream for application output. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "Systolic" INTEGER, "Diastolic" INTEGER, "BloodPressureLevel" varchar(20), "ANOMALY_SCORE" DOUBLE, "ANOMALY_EXPLANATION" varchar(512)); -- Compute an anomaly score with explanation for each record in the input stream -- using RANDOM_CUT_FOREST_WITH_EXPLANATION CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM" SELECT STREAM "Systolic", "Diastolic", "BloodPressureLevel", ANOMALY_SCORE, ANOMALY_EXPLANATION FROM TABLE(RANDOM_CUT_FOREST_WITH_EXPLANATION( CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"), 100, 256, 100000, 1, true)); -- Sort records by descending anomaly score, insert into output stream CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM * FROM "TEMP_STREAM" ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
下一个步骤

步骤 3:检查结果