Windowed Aggregation on Streams - Amazon Kinesis Data Analytics SQL Reference

Windowed Aggregation on Streams

To illustrate how windowed aggregation works on Amazon Kinesis data streams, assume that the data in the following table is flowing through a stream called WEATHERSTREAM.

ROWTIME                 CITY        TEMP
2018-11-01 01:00:00.0   Denver      29
2018-11-01 01:00:00.0   Anchorage   2
2018-11-01 06:00:00.0   Miami       65
2018-11-01 07:00:00.0   Denver      32
2018-11-01 09:00:00.0   Anchorage   9
2018-11-01 13:00:00.0   Denver      50
2018-11-01 17:00:00.0   Anchorage   10
2018-11-01 18:00:00.0   Miami       71
2018-11-01 19:00:00.0   Denver      43
2018-11-02 01:00:00.0   Anchorage   4
2018-11-02 01:00:00.0   Denver      39
2018-11-02 07:00:00.0   Denver      46
2018-11-02 09:00:00.0   Anchorage   3
2018-11-02 13:00:00.0   Denver      56
2018-11-02 17:00:00.0   Anchorage   2
2018-11-02 19:00:00.0   Denver      50
2018-11-03 01:00:00.0   Denver      36
2018-11-03 01:00:00.0   Anchorage   1

Suppose that you want to find the minimum and maximum temperature recorded in the 24-hour period prior to any given reading, globally, regardless of city. To do this, you define a window of RANGE INTERVAL '1' DAY PRECEDING, and use it in the OVER clause for the MIN and MAX analytic functions:

SELECT STREAM
    ROWTIME,
    MIN(TEMP) OVER W1 AS WMIN_TEMP,
    MAX(TEMP) OVER W1 AS WMAX_TEMP
FROM WEATHERSTREAM
WINDOW W1 AS (
    RANGE INTERVAL '1' DAY PRECEDING
);

Results

ROWTIME                 WMIN_TEMP   WMAX_TEMP
2018-11-01 01:00:00.0   29          29
2018-11-01 01:00:00.0   2           29
2018-11-01 06:00:00.0   2           65
2018-11-01 07:00:00.0   2           65
2018-11-01 09:00:00.0   2           65
2018-11-01 13:00:00.0   2           65
2018-11-01 17:00:00.0   2           65
2018-11-01 18:00:00.0   2           71
2018-11-01 19:00:00.0   2           71
2018-11-02 01:00:00.0   2           71
2018-11-02 01:00:00.0   2           71
2018-11-02 07:00:00.0   4           71
2018-11-02 09:00:00.0   3           71
2018-11-02 13:00:00.0   3           71
2018-11-02 17:00:00.0   2           71
2018-11-02 19:00:00.0   2           56
2018-11-03 01:00:00.0   2           56
2018-11-03 01:00:00.0   1           56
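
The window semantics behind these results can be checked with a short simulation outside the service. The following Python sketch is not Kinesis Data Analytics code. It replays the WEATHERSTREAM rows and, for each row, takes the minimum and maximum over the rows already seen whose ROWTIME falls within the preceding 24 hours. The function name windowed_min_max is hypothetical, and the inclusive lower bound is an assumption inferred from the results, where the 2018-11-02 01:00:00.0 rows still report the minimum of 2 recorded exactly 24 hours earlier.

```python
from datetime import datetime, timedelta

# Rows of the WEATHERSTREAM example: (ROWTIME, CITY, TEMP), in arrival order.
# The trailing ".0" fraction of each ROWTIME is dropped for parsing.
ROWS = [
    ("2018-11-01 01:00:00", "Denver", 29),
    ("2018-11-01 01:00:00", "Anchorage", 2),
    ("2018-11-01 06:00:00", "Miami", 65),
    ("2018-11-01 07:00:00", "Denver", 32),
    ("2018-11-01 09:00:00", "Anchorage", 9),
    ("2018-11-01 13:00:00", "Denver", 50),
    ("2018-11-01 17:00:00", "Anchorage", 10),
    ("2018-11-01 18:00:00", "Miami", 71),
    ("2018-11-01 19:00:00", "Denver", 43),
    ("2018-11-02 01:00:00", "Anchorage", 4),
    ("2018-11-02 01:00:00", "Denver", 39),
    ("2018-11-02 07:00:00", "Denver", 46),
    ("2018-11-02 09:00:00", "Anchorage", 3),
    ("2018-11-02 13:00:00", "Denver", 56),
    ("2018-11-02 17:00:00", "Anchorage", 2),
    ("2018-11-02 19:00:00", "Denver", 50),
    ("2018-11-03 01:00:00", "Denver", 36),
    ("2018-11-03 01:00:00", "Anchorage", 1),
]


def parse(ts):
    """Parse a ROWTIME string into a datetime."""
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")


def windowed_min_max(rows, span=timedelta(days=1)):
    """Return one (rowtime, min_temp, max_temp) tuple per input row."""
    out = []
    for i, (ts, _city, _temp) in enumerate(rows):
        now = parse(ts)
        # Window: rows that have arrived so far (including this one) whose
        # ROWTIME lies in [now - span, now]. The inclusive lower bound is
        # an assumption inferred from the results table.
        temps = [t for (s, _c, t) in rows[: i + 1] if now - parse(s) <= span]
        out.append((ts, min(temps), max(temps)))
    return out
```

Calling windowed_min_max(ROWS) returns one tuple per input row, in the same order as the results. The maximum drops from 71 to 56 at 2018-11-02 19:00:00 because the Miami reading of 71 from 18:00 the previous day is by then more than 24 hours old.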

Now, assume that you want to find the minimum, maximum, and average temperature recorded in the 24-hour period prior to any given reading, broken down by city. To do this, you add a PARTITION BY clause on CITY to the window specification, and add the AVG analytic function over the same window to the selection list:

SELECT STREAM
    ROWTIME,
    CITY,
    MIN(TEMP) OVER W1 AS WMIN_TEMP,
    MAX(TEMP) OVER W1 AS WMAX_TEMP,
    AVG(TEMP) OVER W1 AS WAVG_TEMP
FROM WEATHERSTREAM
WINDOW W1 AS (
    PARTITION BY CITY
    RANGE INTERVAL '1' DAY PRECEDING
);

Results

ROWTIME                 CITY        WMIN_TEMP   WMAX_TEMP   WAVG_TEMP
2018-11-01 01:00:00.0   Denver      29          29          29
2018-11-01 01:00:00.0   Anchorage   2           2           2
2018-11-01 06:00:00.0   Miami       65          65          65
2018-11-01 07:00:00.0   Denver      29          32          30
2018-11-01 09:00:00.0   Anchorage   2           9           5
2018-11-01 13:00:00.0   Denver      29          50          37
2018-11-01 17:00:00.0   Anchorage   2           10          7
2018-11-01 18:00:00.0   Miami       65          71          68
2018-11-01 19:00:00.0   Denver      29          50          38
2018-11-02 01:00:00.0   Anchorage   2           10          6
2018-11-02 01:00:00.0   Denver      29          50          38
2018-11-02 07:00:00.0   Denver      32          50          42
2018-11-02 09:00:00.0   Anchorage   3           10          6
2018-11-02 13:00:00.0   Denver      39          56          46
2018-11-02 17:00:00.0   Anchorage   2           10          4
2018-11-02 19:00:00.0   Denver      39          56          46
2018-11-03 01:00:00.0   Denver      36          56          45
2018-11-03 01:00:00.0   Anchorage   1           4           2
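
The effect of PARTITION BY can be illustrated with a small Python simulation (not Kinesis Data Analytics code) that keeps a separate 24-hour window per CITY. The WAVG_TEMP values in these results are consistent with the average of the integer temperatures being truncated to an integer (for example, Denver's 30.5 at 07:00 appears as 30), so the sketch uses integer division. That reading is inferred from the results rather than stated by the reference, and the name partitioned_stats is hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Rows of the WEATHERSTREAM example: (ROWTIME, CITY, TEMP), in arrival order.
ROWS = [
    ("2018-11-01 01:00:00", "Denver", 29),
    ("2018-11-01 01:00:00", "Anchorage", 2),
    ("2018-11-01 06:00:00", "Miami", 65),
    ("2018-11-01 07:00:00", "Denver", 32),
    ("2018-11-01 09:00:00", "Anchorage", 9),
    ("2018-11-01 13:00:00", "Denver", 50),
    ("2018-11-01 17:00:00", "Anchorage", 10),
    ("2018-11-01 18:00:00", "Miami", 71),
    ("2018-11-01 19:00:00", "Denver", 43),
    ("2018-11-02 01:00:00", "Anchorage", 4),
    ("2018-11-02 01:00:00", "Denver", 39),
    ("2018-11-02 07:00:00", "Denver", 46),
    ("2018-11-02 09:00:00", "Anchorage", 3),
    ("2018-11-02 13:00:00", "Denver", 56),
    ("2018-11-02 17:00:00", "Anchorage", 2),
    ("2018-11-02 19:00:00", "Denver", 50),
    ("2018-11-03 01:00:00", "Denver", 36),
    ("2018-11-03 01:00:00", "Anchorage", 1),
]


def partitioned_stats(rows, span=timedelta(days=1)):
    """Return (rowtime, city, min, max, avg) per row, windowed per city."""
    windows = defaultdict(list)  # CITY -> [(rowtime, temp), ...] still in window
    out = []
    for ts, city, temp in rows:
        now = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
        # Keep this city's rows from the last 24 hours (lower bound assumed
        # inclusive), then add the new reading.
        kept = [(t, v) for (t, v) in windows[city] if now - t <= span]
        kept.append((now, temp))
        windows[city] = kept
        temps = [v for _, v in kept]
        # Floor division equals truncation here: no temperature is negative.
        avg = sum(temps) // len(temps)
        out.append((ts, city, min(temps), max(temps), avg))
    return out
```

Because each city has its own window, a Miami reading never affects the Denver or Anchorage aggregates, which is why the Denver maximum stays at 50 on the first day even after Miami reports 65 and 71.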

Examples of Rowtime Bounds and Windowed Aggregation

This is an example of a windowed aggregate query:

SELECT STREAM
    ROWTIME,
    ticker,
    amount,
    SUM(amount) OVER (
        PARTITION BY ticker
        RANGE INTERVAL '1' HOUR PRECEDING) AS hourlyVolume
FROM Trades

Because this is a query on a stream, each input row produces an output row as soon as it arrives. For example, given the following inputs:

Trades: IBM 10 10:00:00
Trades: ORCL 20 10:10:00
Trades.bound: 10:15:00
Trades: ORCL 15 10:25:00
Trades: IBM 30 11:05:00
Trades.bound: 11:10:00

In this example, the output is as follows:

Trades: IBM 10 10 10:00:00
Trades: ORCL 20 20 10:10:00
Trades.bound: 10:15:00
Trades: ORCL 15 35 10:25:00
Trades: IBM 30 30 11:05:00
Trades.bound: 11:10:00

Rows are retained internally for an hour, so the second ORCL row reports a total of 35 (20 + 15). By the time the second IBM trade arrives at 11:05:00, the original IBM trade from 10:00:00 falls outside the "hour preceding" window, and so it is excluded from the IBM sum.
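
This retention behavior can be sketched as a small Python simulation (not Kinesis Data Analytics code). Each ticker keeps a queue of its recent trades: a new trade is appended, trades more than an hour older than it are dropped, and the remaining amounts are summed. Trades.bound markers are simply forwarded unchanged, which is an assumption made to reproduce the output listing. The event tuples and the name hourly_volume are likewise hypothetical.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta

HOUR = timedelta(hours=1)

# (kind, ticker, amount, time): trade rows and rowtime bounds, in arrival order.
EVENTS = [
    ("row", "IBM", 10, "10:00:00"),
    ("row", "ORCL", 20, "10:10:00"),
    ("bound", None, None, "10:15:00"),
    ("row", "ORCL", 15, "10:25:00"),
    ("row", "IBM", 30, "11:05:00"),
    ("bound", None, None, "11:10:00"),
]


def hourly_volume(events):
    """Yield one output line per event, formatted like the listing."""
    retained = defaultdict(deque)  # ticker -> queue of (time, amount)
    for kind, ticker, amount, ts in events:
        if kind == "bound":
            # Assumption: bounds pass through without producing a sum.
            yield f"Trades.bound: {ts}"
            continue
        now = datetime.strptime(ts, "%H:%M:%S")
        window = retained[ticker]
        window.append((now, amount))
        # Drop trades that are more than one hour older than this one.
        while now - window[0][0] > HOUR:
            window.popleft()
        total = sum(a for _, a in window)
        yield f"Trades: {ticker} {amount} {total} {ts}"


for line in hourly_volume(EVENTS):
    print(line)
```

The queue for IBM still holds the 10:00:00 trade until the 11:05:00 trade arrives. At that point the older trade is 65 minutes old, so it is evicted before the sum is taken.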

Example

Some business problems seem to need totals over the whole history of a stream, but such totals are usually not practical to compute. However, these problems can often be solved by looking at only the last day, the last hour, or the last N records. Aggregates computed over such sets of records are called windowed aggregates.

They are easy to compute in a stream database, and can be expressed in ANSI (SQL:2008) standard SQL as follows:

SELECT STREAM
    ticker,
    avg(price) OVER lastHour AS avgPrice,
    max(price) OVER lastHour AS maxPrice
FROM Bids
WINDOW lastHour AS (
    PARTITION BY ticker
    RANGE INTERVAL '1' HOUR PRECEDING)

Note

The Interval_clause must be one of the following types:

  • Integer literal with ROWS

  • Numeric value for RANGE over a numeric column

  • INTERVAL for a RANGE over a date/time/timestamp
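
The ROWS form in this note is count-based rather than time-based. As a minimal Python sketch, assuming the standard SQL reading in which ROWS n PRECEDING covers the current row plus the n rows before it, a bounded queue gives a moving maximum over the last n + 1 values. The input is the first six Denver temperatures from the WEATHERSTREAM example, and the name rows_preceding_max is hypothetical.

```python
from collections import deque


def rows_preceding_max(values, n):
    """Return the maximum over each value and up to n values before it."""
    # maxlen=n + 1 makes the queue discard its oldest entry automatically
    # once it holds the current value plus n preceding ones.
    window = deque(maxlen=n + 1)
    out = []
    for value in values:
        window.append(value)
        out.append(max(window))
    return out


# First six Denver readings from the WEATHERSTREAM example.
denver = [29, 32, 50, 43, 39, 46]
print(rows_preceding_max(denver, 2))  # prints [29, 32, 50, 50, 50, 46]
```

Unlike a RANGE window, this window always holds a fixed number of rows once it has filled, no matter how far apart the readings are in time.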