本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
直播上的窗口聚合
为了说明窗口化聚合在 Amazon Kinesis 数据流上的工作原理,假设下表中的数据流经名为 WEATHERSTREAM 的数据流。
ROWTIME | CITY | TEMP |
---|---|---|
2018-11-01 01 年 11 月 1 日 |
丹佛 |
29 |
2018-11-01 01 年 11 月 1 日 |
安克雷奇 |
2 |
2018-11-01 06:00:0 |
迈阿密 |
65 |
2018-11-01 07 年 11 月 1 日 |
丹佛 |
32 |
2018-11-01 09:00:0 |
安克雷奇 |
9 |
2018-11-01 13:00:0 |
丹佛 |
50 |
2018-11-01 17:00:0 |
安克雷奇 |
10 |
2018-11-01 18-08-00-0 |
迈阿密 |
71 |
2018-11-01 19:00:0 |
丹佛 |
43 |
2018-11-02 01:00:0 |
安克雷奇 |
4 |
2018-11-02 01:00:0 |
丹佛 |
39 |
2018-11-02 07 年 11 月 2 日 |
丹佛 |
46 |
2018-11-02 09:00:0 |
安克雷奇 |
3 |
2018-11-02 13:00:0 |
丹佛 |
56 |
2018-11-02 17:00:0 |
安克雷奇 |
2 |
2018-11-02 19:00:0 |
丹佛 |
50 |
2018-11-03 01:00:0 |
丹佛 |
36 |
2018-11-03 01:00:0 |
安克雷奇 |
1 |
假设您要查找 24 小时期间内记录的全球(无论是哪个城市)的最低和最高温度(在任意给定读数之前)。为此,您需要定义 RANGE INTERVAL '1' DAY PRECEDING
的一个窗口,并在 MIN
和 MAX
分析函数的 OVER
子句中使用它:
SELECT STREAM ROWTIME, MIN(TEMP) OVER W1 AS WMIN_TEMP, MAX(TEMP) OVER W1 AS WMAX_TEMP FROM WEATHERSTREAM WINDOW W1 AS ( RANGE INTERVAL '1' DAY PRECEDING );
结果
ROWTIME | WMIN_TEMP | WMAX_TEMP |
---|---|---|
2018-11-01 01 年 11 月 1 日 |
29 |
29 |
2018-11-01 01 年 11 月 1 日 |
2 |
29 |
2018-11-01 06:00:0 |
2 |
65 |
2018-11-01 07 年 11 月 1 日 |
2 |
65 |
2018-11-01 09:00:0 |
2 |
65 |
2018-11-01 13:00:0 |
2 |
65 |
2018-11-01 17:00:0 |
2 |
65 |
2018-11-01 18-08-00-0 |
2 |
71 |
2018-11-01 19:00:0 |
2 |
71 |
2018-11-02 01:00:0 |
2 |
71 |
2018-11-02 01:00:0 |
2 |
71 |
2018-11-02 07 年 11 月 2 日 |
4 |
71 |
2018-11-02 09:00:0 |
3 |
71 |
2018-11-02 13:00:0 |
3 |
71 |
2018-11-02 17:00:0 |
2 |
71 |
2018-11-02 19:00:0 |
2 |
56 |
2018-11-03 01:00:0 |
2 |
56 |
2018-11-03 01:00:0 |
1 |
56 |
现在,假定您要查找在 24 小时期间内记录的在任意给定读数之前的最低、最高和平均温度(按城市划分)。为此,您可以向窗口规范添加针对 CITY
的 PARTITION BY
子句,并向选择列表添加针对同一个窗口的 AVG
分析函数:
SELECT STREAM ROWTIME, CITY, MIN(TEMP) over W1 AS WMIN_TEMP, MAX(TEMP) over W1 AS WMAX_TEMP, AVG(TEMP) over W1 AS WAVG_TEMP FROM AGGTEST.WEATHERSTREAM WINDOW W1 AS ( PARTITION BY CITY RANGE INTERVAL '1' DAY PRECEDING );
结果
ROWTIME | CITY | WMIN_TEMP | WMAX_TEMP | WAVG_TEMP |
---|---|---|---|---|
2018-11-01 01 年 11 月 1 日 |
丹佛 |
29 |
29 |
29 |
2018-11-01 01 年 11 月 1 日 |
安克雷奇 |
2 |
2 |
2 |
2018-11-01 06:00:0 |
迈阿密 |
65 |
65 |
65 |
2018-11-01 07 年 11 月 1 日 |
丹佛 |
29 |
32 |
30 |
2018-11-01 09:00:0 |
安克雷奇 |
2 |
9 |
5 |
2018-11-01 13:00:0 |
丹佛 |
29 |
50 |
37 |
2018-11-01 17:00:0 |
安克雷奇 |
2 |
10 |
7 |
2018-11-01 18-08-00-0 |
迈阿密 |
65 |
71 |
68 |
2018-11-01 19:00:0 |
丹佛 |
29 |
50 |
38 |
2018-11-02 01:00:0 |
安克雷奇 |
2 |
10 |
6 |
2018-11-02 01:00:0 |
丹佛 |
29 |
50 |
38 |
2018-11-02 07 年 11 月 2 日 |
丹佛 |
32 |
50 |
42 |
2018-11-02 09:00:0 |
安克雷奇 |
3 |
10 |
6 |
2018-11-02 13:00:0 |
丹佛 |
39 |
56 |
46 |
2018-11-02 17:00:0 |
安克雷奇 |
2 |
10 |
4 |
2018-11-02 19:00:0 |
丹佛 |
39 |
56 |
46 |
2018-11-03 01:00:0 |
丹佛 |
36 |
56 |
45 |
2018-11-03 01:00:0 |
安克雷奇 |
1 |
4 |
2 |
行时边界和窗口聚合示例
这是窗口化聚合查询的示例:
SELECT STREAM ROWTIME, ticker, amount, SUM(amount) OVER ( PARTITION BY ticker RANGE INTERVAL '1' HOUR PRECEDING) AS hourlyVolume FROM Trades
因为这是对流的查询,所以行一进去就会从这个查询中弹出。例如,给定以下输入:
Trades: IBM 10 10 10:00:00 Trades: ORCL 20 10:10:00 Trades.bound: 10:15:00 Trades: ORCL 15 10:25:00 Trades: IBM 30 11:05:00 Trades.bound: 11:10:00
输出示
Trades: IBM 10 10 10:00:00 Trades: ORCL 20 20 10:10:00 Trades.bound: 10:15:00 Trades: ORCL 15 35 10:25:00 Trades: IBM 30 30 11:05:00 Trades.bound: 11:10:00
这些行仍然在后台逗留一个小时,因此第二个 ORCL 行输出的总计为 35;但原始 IBM 交易落在“上一个小时”窗口外,因此它未包含在 IBM 总计中。
示例
有些业务问题似乎需要对直播的整个历史记录进行总计,但计算起来通常不切实际。但是,此类业务问题通常可以通过查看最后一天、最后一小时或最后 N 条记录来解决。此类记录的集合称为窗口化聚合。
它们易于在流数据库中计算,可以用 ANSI (SQL: 2008) 标准 SQL 表示,如下所示:
SELECT STREAM ticker, avg(price) OVER lastHour AS avgPrice, max(price) OVER lastHour AS maxPrice FROM Bids WINDOW lastHour AS ( PARTITION BY ticker RANGE INTERVAL '1' HOUR PRECEDING)
注意
Interval_clause
必须属于以下合适的类型之一:
-
带有 ROWS 的整数文字
-
数值列上的 RANGE 的数值
-
某个范围在日期/时间/时间戳上的间隔