示例:从查询中聚合部分结果 - SQL适用于应用程序的 Amazon Kinesis Data Analytics 开发者指南

经过仔细考虑,我们决定分两个步骤停止使用亚马逊 Kinesis Data Analytics SQL 的应用程序:

1. 从 2025 年 10 月 15 日起,您将无法为应用程序创建新的 Kinesis Data Analytic SQL s。

2. 我们将从 2026 年 1 月 27 日起删除您的申请。您将无法启动或操作适用于应用程序的 Amazon Kinesis Data Analytic SQL s。从那时起,亚马逊 Kinesis Data Analytics SQL 将不再提供支持。有关更多信息,请参阅 适用于应用程序的 Amazon Kinesis Data Analytic SQL s 停产

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

示例:从查询中聚合部分结果

如果 Amazon Kinesis 数据流包含的记录具有与提取时间不完全匹配的事件时间,在滚动窗口中选择的结果将包含在窗口中到达但未必发生的记录。在这种情况下,滚动窗口只包含您需要的部分结果集。您可以通过多种方法来纠正这一问题:

  • 仅使用滚动窗口,并使用 upsert 通过数据库或数据仓库在后处理中聚合部分结果。这种方法在处理应用程序时很有效。它为聚合运算符(summinmax 等)无限期地处理后期数据。这种方法的缺点是,您必须在数据库层开发和维护额外的应用程序逻辑。

  • 使用滚动和滑动窗口,这会在早期生成部分结果,还会继续在滑动窗口期间生成完整的结果。此方法使用 overwrite 操作而非 upsert 操作来处理新近数据,这样就不需要在数据库层添加任何其他应用程序逻辑。这种方法的缺点是,它使用更多 Kinesis 处理单元 (KPU),并且仍生成两个结果,这可能不适用于某些使用案例。

有关滚动和滑动窗口的更多信息,请参阅窗口式查询

在以下过程中,滚动窗口聚合会生成两个部分结果(发送到 CALC_COUNT_SQL_STREAM 应用程序内部流),它们必须合并以生成最终结果。然后,应用程序生成第二个聚合(发送到 DESTINATION_SQL_STREAM 应用程序内部流),它合并这两个部分结果。

创建使用事件时间聚合部分结果的应用程序
  1. 登录到 AWS Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home

  2. 在导航窗格中,选择 Data Analytics (数据分析)。按照 适用于应用程序的 Amazon Kinesis Data Analytics SQL 入门 教程中所述,创建一个 Kinesis Data Analytics 应用程序。

  3. 在 SQL 编辑器中,将应用程序代码替换为以下内容:

    CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM" (TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime", COUNT(*) AS "TickerCount" FROM "SOURCE_SQL_STREAM_001" GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE), STEP("SOURCE_SQL_STREAM_001"."APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE), TICKER_SYMBOL; CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER", "TRADETIME", SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" FROM "CALC_COUNT_SQL_STREAM" WINDOW W1 AS (PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING);

    应用程序代码中的 SELECT 语句将在 SOURCE_SQL_STREAM_001 中筛选出显示股票价格更改大于 1% 的行,并使用数据泵将这些行插入另一个应用程序内部流 CHANGE_STREAM

  4. 选择 保存并运行 SQL

第一个数据泵将流输出到与以下内容类似的 CALC_COUNT_SQL_STREAM。请注意,结果集不完整:

显示部分结果的控制台屏幕截图。

然后,第二个泵将流输出到 DESTINATION_SQL_STREAM,其中包含完整的结果集:

显示完整结果的控制台屏幕截图。