经过仔细考虑,我们决定分两个步骤停用 Amazon Kinesis Data Analytics for SQL 应用程序:
1. 从 2025 年 10 月 15 日起,您将无法创建新的 Kinesis Data Analytics for SQL 应用程序。
2. 从 2026 年 1 月 27 日起,我们将删除您的应用程序。您将无法启动或操作 Amazon Kinesis Data Analytics for SQL 应用程序。从那时起,将不再提供对 Amazon Kinesis Data Analytics for SQL 的支持。有关更多信息,请参阅 Amazon Kinesis Data Analytics for SQL 应用程序停用。
示例:从查询中聚合部分结果
如果 Amazon Kinesis 数据流包含的记录具有与提取时间不完全匹配的事件时间,在滚动窗口中选择的结果将包含在窗口中到达但未必发生的记录。在这种情况下,滚动窗口只包含您需要的部分结果集。您可以通过多种方法来纠正这一问题:
-
仅使用滚动窗口,并使用 upsert 通过数据库或数据仓库在后处理中聚合部分结果。这种方法在处理应用程序时很有效。它为聚合运算符(
sum
、min
、max
等)无限期地处理后期数据。这种方法的缺点是,您必须在数据库层开发和维护额外的应用程序逻辑。 -
使用滚动和滑动窗口,这会在早期生成部分结果,还会继续在滑动窗口期间生成完整的结果。此方法使用 overwrite 操作而非 upsert 操作来处理新近数据,这样就不需要在数据库层添加任何其他应用程序逻辑。这种方法的缺点是,它使用更多 Kinesis 处理单元 (KPU),并且仍生成两个结果,这可能不适用于某些使用案例。
有关滚动和滑动窗口的更多信息,请参阅窗口式查询。
在以下过程中,滚动窗口聚合会生成两个部分结果(发送到 CALC_COUNT_SQL_STREAM
应用程序内部流),它们必须合并以生成最终结果。然后,应用程序生成第二个聚合(发送到 DESTINATION_SQL_STREAM
应用程序内部流),它合并这两个部分结果。
创建使用事件时间聚合部分结果的应用程序
登录到 AWS Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home
。 -
在导航窗格中,选择 Data Analytics (数据分析)。按照 Amazon Kinesis Data Analytics·for·SQL 应用程序入门 教程中所述,创建一个 Kinesis Data Analytics 应用程序。
-
在 SQL 编辑器中,将应用程序代码替换为以下内容:
CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM" (TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), TRADETIME TIMESTAMP, TICKERCOUNT DOUBLE); CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER_SYMBOL", STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime", COUNT(*) AS "TickerCount" FROM "SOURCE_SQL_STREAM_001" GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE), STEP("SOURCE_SQL_STREAM_001"."APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE), TICKER_SYMBOL; CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT") SELECT STREAM "TICKER", "TRADETIME", SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" FROM "CALC_COUNT_SQL_STREAM" WINDOW W1 AS (PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING);
应用程序代码中的
SELECT
语句将在SOURCE_SQL_STREAM_001
中筛选出显示股票价格更改大于 1% 的行,并使用数据泵将这些行插入另一个应用程序内部流CHANGE_STREAM
。 -
选择 保存并运行 SQL。
第一个数据泵将流输出到与以下内容类似的 CALC_COUNT_SQL_STREAM
。请注意,结果集不完整:
然后,第二个泵将流输出到 DESTINATION_SQL_STREAM
,其中包含完整的结果集: