本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
加入条款
SELECT 语句中的 JOIN 子句合并了来自一个或多个流或引用表中的列。
流-流联接
Amazon Kinesis Data Analytics 支持使用 SQL 将应用程序内数据流与另一个应用程序内数据流连接起来,从而将这一重要的传统数据库功能引入流环境中。
本节介绍了 Kinesis Data Analytics 支持的联接类型,包括基于时间和基于行的窗口联接,以及有关流式联接的详细信息。
联接类型
有五种连接类型:
内部加入(或者直接加入) |
返回来自左侧和来自右侧并且联接条件的计算结果为 TRUE 的所有行对。 |
左外连接(或仅向左连接) |
作为 INNER JOIN,但即使左侧的行不与右侧的任何行匹配,也会保留左侧的行。在右侧生成 NULL 值。 |
右外连接(或刚好右连接) |
作为 INNER JOIN,但即使右侧的行不与左侧的任何行匹配,也会保留右侧的行。在左侧为这些行生成 NULL 值。 |
完整的外部连接(或者只是 FULL JOIN) |
作为 INNER JOIN,但即使两侧的行不与任一侧的任何行匹配,也会保留两侧的行。在另一侧为这些行生成 NULL 值。 |
CROSS JOIN |
返回输入的笛卡尔积:左侧的每个行均与右侧的每个行配对。 |
基于时间的窗口与基于行的窗口联接
将左侧流的整个历史记录与右侧流的整个历史记录相联接是不切实际的。因此,您必须使用 OVER 子句将至少一个流限制到一个时间窗口。OVER 子句定义了一个在给定时间考虑加入的行窗口。
该窗口可以是基于时间的,也可以是基于行的:
-
基于时间的窗口使用 RANGE 关键字。它将窗口定义为其 ROWTIME 列落在查询的当前时间的特定时间间隔内的一组行。
例如,以下子句指定窗口包含其 ROWTIME 在流当前时间之前的那个小时内的所有行:
OVER (RANGE INTERVAL '1' HOUR PRECEDING)
-
基于行的窗口使用 ROWS 关键字。它将窗口定义为具有当前时间戳的行之前或之后的给定行计数。
例如,以下子句指定窗口中仅包含最新的 10 行:
OVER (ROWS 10 PRECEDING)
注意
如果在联接的一侧没有指定时间窗口或基于行的窗口,则只有该侧的当前行参与联接计算。
流到流联接的示例
以下示例演示了应用程序内 stream-to-stream 联接的工作原理、何时返回联接结果以及联接结果的行次。
主题
示例数据集
本节中的示例基于以下数据集和流定义:
Orders 数据示例
{ "orderid":"101", "orders":"1" }
Shipments 数据示例
{ "orderid":"101", "shipments":"2" }
创建 ORDERS_STREAM 应用程序内流
CREATE OR REPLACE STREAM "ORDERS_STREAM" ("orderid" int, "orderrowtime" timestamp); CREATE OR REPLACE PUMP "ORDERS_STREAM_PUMP" AS INSERT INTO "ORDERS_STREAM" SELECT STREAM "orderid", "ROWTIME" FROM "SOURCE_SQL_STREAM_001" WHERE "orders" = 1;
创建 SHIPMENTS_STREAM 应用程序内流
CREATE OR REPLACE STREAM "SHIPMENTS_STREAM" ("orderid" int, "shipmentrowtime" timestamp); CREATE OR REPLACE PUMP "SHIPMENTS_STREAM_PUMP" AS INSERT INTO "SHIPMENTS_STREAM" SELECT STREAM "orderid", "ROWTIME" FROM "SOURCE_SQL_STREAM_001" WHERE "shipments" = 2;
示例 1: JOIN (INNER JOIN) 一侧的时间窗口
此示例演示了一个查询,该查询返回在最后一分钟执行发货的所有订单。
联接查询
CREATE OR REPLACE STREAM "OUTPUT_STREAM" ("resultrowtime" timestamp, "orderid" int, "shipmenttime" timestamp, "ordertime" timestamp); CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS INSERT INTO "OUTPUT_STREAM" SELECT STREAM ROWTIME as "resultrowtime", s."orderid", s.rowtime as "shipmenttime", o.ROWTIME as "ordertime" FROM ORDERS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS o JOIN SHIPMENTS_STREAM AS s ON o."orderid" = s."orderid";
查询结果
ORDERS_STREAM | SHIPMENTS_STREAM | OUTPUT_STREAM | |||||
---|---|---|---|---|---|---|---|
ROWTIME | orderid | ROWTIME | orderid | resultrowtime | orderid | shipmenttime | OrderTime |
10:00:00 | 101 | 10:00:00 | 100 | ||||
10:00:20 | 102 | ||||||
10:00:30 | 103 | ||||||
10:00:40 | 104 | ||||||
10:00:45 | 104 | ||||||
10:00:45 | 100* | 10:00:45 | 104 | 10:00:45 | 10:00:40 | ||
10:00:50 | 105 |
* - orderid = 100 的记录是 Orders 流中的较迟事件。
联接的可视化表示
下图表示一个查询,该查询返回在最后一分钟执行发货的所有订单。
触发结果
下面介绍了触发查询结果的事件。
由于未在 Shipments 流上指定时间或行窗口,因此只有Shipments 流的当前行参与联接。
由于对 Orders 流的查询指定了前一分钟的窗口,因此 Orders 流中最后一分钟带有 ROWTIME 的行将参与联接。
当 Shipments 流中的记录对于 orderid 104 在 10:00:45到达时,将触发 JOIN 结果,因为前一分钟在 Orders 流中对于 orderid 存在匹配项。
Orders 流中 Orderid 为 100 的记录到达较迟,因此 Shipments 流中的对应记录不是最新记录。由于未在 Shipments 流上指定任何窗口,因此只有Shipments 流的当前行参与联接。因此,JOIN 语句不会针对 orderid 100 返回任何记录。有关在 JOIN 语句中包含较迟行的信息,请参阅 示例 2。
因为在 Shipments 流中对于 Orderid 105 没有匹配的记录,所以不会发出任何结果,并且该记录将被忽略。
结果的 ROWTIME
输出流中记录的 ROWTIME 是与联接匹配的行的 ROWTIME 的较迟者。
示例 2: JOIN (INNER JOIN) 两侧的时间窗口
此示例演示了一个查询,该查询返回最后一分钟执行的所有订单以及最后一分钟执行的发货。
联接查询
CREATE OR REPLACE STREAM "OUTPUT_STREAM" ("resultrowtime" timestamp, "orderid" int, "shipmenttime" timestamp, "ordertime" timestamp); CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS INSERT INTO "OUTPUT_STREAM" SELECT STREAM ROWTIME as "resultrowtime", s."orderid", s.rowtime as "shipmenttime", o.ROWTIME as "ordertime" FROM ORDERS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS o JOIN SHIPMENTS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS s ON o."orderid" = s."orderid";
查询结果
ORDERS_STREAM | SHIPMENTS_STREAM | OUTPUT_STREAM | |||||
---|---|---|---|---|---|---|---|
ROWTIME | orderid | ROWTIME | orderid | resultrowtime | orderid | shipmenttime | OrderTime |
10:00:00 | 101 | 10:00:00 | 100 | ||||
10:00:20 | 102 | ||||||
10:00:30 | 103 | ||||||
10:00:40 | 104 | ||||||
10:00:45 | 104 | ||||||
10:00:45 | 100* | 10:00:45 | 104 | 10:00:45 | 10:00:40 | ||
10:00:45 | 100 | 10:00:00 | 10:00:45 | ||||
10:00:50 | 105 |
* - orderid = 100 的记录是 Orders 流中的较迟事件。
联接的可视化表示
下图表示一个查询,该查询返回最后一分钟执行的所有订单以及最后一分钟执行的发货。
触发结果
下面介绍了触发查询结果的事件。
在联接的两侧指定窗口。因此,Orders 流和 Shipments 流的当前行之前的一分钟内的所有行都参与联接。
当 Shipments 流中与 orderid 104 对应的记录到达时,Orders 流中的对应记录在一分钟窗口内。因此,一条记录返回到 Output 流。
即使 orderid 100 的订单事件在 Orders 流中到达较晚,也返回了联接结果。这是因为 Shipments 流中的窗口包括过去一分钟的订单,其中包括对应的记录。
在联接的两侧设置一个窗口有助于在联接的两侧包含迟到的记录;例如,如果订单或发货记录延迟收到或出现问题。
结果的 ROWTIME
输出流中记录的 ROWTIME 是与联接匹配的行的 ROWTIME 的较迟者。
示例 3:RIGHT JOIN (RIGHT OUTER JOIN) 一侧的时间窗口
此示例演示了一个查询,该查询返回最后一分钟执行的所有发货,无论最后一分钟是否存在对应的订单。
联接查询
CREATE OR REPLACE STREAM "OUTPUT_STREAM" ("resultrowtime" timestamp, "orderid" int, "ordertime" timestamp); CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS INSERT INTO "OUTPUT_STREAM" SELECT STREAM ROWTIME as "resultrowtime", s."orderid", o.ROWTIME as "ordertime" FROM ORDERS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS o RIGHT JOIN SHIPMENTS_STREAM AS s ON o."orderid" = s."orderid";
查询结果
ORDERS_STREAM | SHIPMENTS_STREAM | OUTPUT_STREAM | ||||
---|---|---|---|---|---|---|
ROWTIME | orderid | ROWTIME | orderid | resultrowtime | orderid | OrderTime |
10:00:00 | 101 | 10:00:00 | 100 | |||
10:00:00 | 100 | null | ||||
10:00:20 | 102 | |||||
10:00:30 | 103 | |||||
10:00:40 | 104 | |||||
10:00:45 | 104 | |||||
10:00:45 | 100* | 10:00:45 | 104 | 10:00:40 | ||
10:00:50 | 105 | |||||
10:00:50 | 105 | null |
* - orderid = 100 的记录是 Orders 流中的较迟事件。
联接的可视化表示
下图表示一个查询,该查询返回最后一分钟执行的所有装运,无论是否在最后一分钟内存在对应的订单。
触发结果
下面介绍了触发查询结果的事件。
当 Shipments 流中与 orderid 104 对应的记录到达时,将在 Output 流中发出结果。
只要 Shipments 流中与 orderid 105 对应的记录到达,就在 Output 流中发出一条记录。但是,订单流中没有匹配的记录,因此该 OrderTime 值为空。
结果的 ROWTIME
输出流中记录的 ROWTIME 是与联接匹配的行的 ROWTIME 的较迟者。
因为联接(Shipments 流)的右侧没有窗口,所以具有不匹配联接的结果的 ROWTIME 是不匹配行的 ROWTIME。
示例 4:RIGHT JOIN (RIGHT OUTER JOIN) 两侧的时间窗口
此示例演示一个查询,该查询返回最后一分钟执行的所有发货,无论它们是否具有对应的订单。
联接查询
CREATE OR REPLACE STREAM "OUTPUT_STREAM" ("resultrowtime" timestamp, "orderid" int, "shipmenttime" timestamp, "ordertime" timestamp); CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS INSERT INTO "OUTPUT_STREAM" SELECT STREAM ROWTIME as "resultrowtime", s."orderid", s.ROWTIME as "shipmenttime", o.ROWTIME as "ordertime" FROM ORDERS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS o RIGHT JOIN SHIPMENTS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS s ON o."orderid" = s."orderid";
查询结果
ORDERS_STREAM | SHIPMENTS_STREAM | OUTPUT_STREAM | |||||
---|---|---|---|---|---|---|---|
ROWTIME | orderid | ROWTIME | orderid | resultrowtime | orderid | shipmenttime | OrderTime |
10:00:00 | 101 | 10:00:00 | 100 | ||||
10:00:20 | 102 | ||||||
10:00:30 | 103 | ||||||
10:00:40 | 104 | ||||||
10:00:45 | 104 | ||||||
10:00:45 | 100* | 10:00:45 | 104 | 10:00:40 | 10:00:45 | ||
10:00:45 | 100 | 10:00:45 | 10:00:00 | ||||
10:00:50 | 105 | ||||||
10:01:50 | 105 | 10:00:50 | null |
* - orderid = 100 的记录是 Orders 流中的较迟事件。
联接的可视化表示
下图表示一个查询,该查询返回最后一分钟执行的所有发货,无论它们是否具有对应的订单。
触发结果
下面介绍了触发查询结果的事件。
当 Shipments 流中与 orderid 104 对应的记录到达时,将在 Output 流中发出结果。
即使 orderid 100 的订单事件在 Orders 流中到达较晚,也会返回联接结果。这是因为 Shipments 流中的窗口包括过去一分钟的订单,其中包括对应的记录。
对于未找到其订单的发货(对于 orderid 105),直到 Shipments 流上的一分钟窗口结束时,结果才会发送到 Output 流。
结果的 ROWTIME
输出流中记录的 ROWTIME 是与联接匹配的行的 ROWTIME 的较迟者。
对于没有匹配的订单记录的发货记录,结果的 ROWTIME 是窗口末尾的 ROWTIME。这是因为联接的右侧(来自 Shipments 流)现在是事件的一分钟窗口,并且服务正在等待窗口结束以确定是否有任何匹配的记录到达。当窗口结束且没有找到匹配的记录时,将以与窗口结束对应的 ROWTIME 发出结果。
示例 5:LEFT JOIN (LEFT OUTER JOIN) 一侧的时间窗口
此示例演示了一个查询,该查询返回最后一分钟执行的所有订单,无论最后一分钟是否存在对应的发货。
联接查询
CREATE OR REPLACE STREAM "OUTPUT_STREAM" ("resultrowtime" timestamp, "orderid" int, "ordertime" timestamp); CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS INSERT INTO "OUTPUT_STREAM" SELECT STREAM ROWTIME as "resultrowtime", o."orderid", o.ROWTIME as "ordertime" FROM ORDERS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS o LEFT JOIN SHIPMENTS_STREAM AS s ON o."orderid" = s."orderid";
查询结果
ORDERS_STREAM | SHIPMENTS_STREAM | OUTPUT_STREAM | ||||
---|---|---|---|---|---|---|
ROWTIME | orderid | ROWTIME | orderid | resultrowtime | orderid | OrderTime |
10:00:00 | 101 | 10:00:00 | 100 | |||
10:00:20 | 102 | |||||
10:00:30 | 103 | |||||
10:00:40 | 104 | |||||
10:00:45 | 104 | |||||
10:00:45 | 100* | 10:00:45 | 104 | 10:00:40 | ||
10:00:50 | 105 | |||||
10:01:00 | 101 | 10:00:00 | ||||
10:01:20 | 102 | 10:00:20 | ||||
10:01:30 | 103 | 10:00:30 | ||||
10:01:40 | 104 | 10:00:40 | ||||
10:01:45 | 100 | 10:00:45 |
* - orderid = 100 的记录是 Orders 流中的较迟事件。
联接的可视化表示
下图表示一个查询,该查询返回最后一分钟执行的所有订单,无论最后一分钟是否存在对应的发货。
触发结果
下面介绍了触发查询结果的事件。
当 Shipments 流中与 orderid 104 对应的记录到达时,将在 Output 流中发出结果。
对于位于 Orders 流中但在 Shipments 流中没有相应记录的记录,直到一分钟窗口结束时才会将记录发出到 Output 流。这是因为服务正在等待直到窗口结束以获取匹配的记录。
结果的 ROWTIME
输出流中记录的 ROWTIME 是与联接匹配的行的 ROWTIME 的较迟者。
对于位于 Orders 流中但在 Shipments 流中没有相应记录的记录,结果的 ROWTIME 是当前窗口结束的 ROWTIME。
示例 6:LEFT JOIN (LEFT OUTER JOIN) 两侧的时间窗口
此示例演示一个查询,该查询返回最后一分钟执行的所有订单,无论它们是否具有对应的发货。
联接查询
CREATE OR REPLACE STREAM "OUTPUT_STREAM" ("resultrowtime" timestamp, "orderid" int, "shipmenttime" timestamp, "ordertime" timestamp); CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS INSERT INTO "OUTPUT_STREAM" SELECT STREAM ROWTIME as "resultrowtime", s."orderid", s.ROWTIME as "shipmenttime", o.ROWTIME as "ordertime" FROM ORDERS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS o LEFT JOIN SHIPMENTS_STREAM OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS s ON o."orderid" = s."orderid";
查询结果
ORDERS_STREAM | SHIPMENTS_STREAM | OUTPUT_STREAM | |||||
---|---|---|---|---|---|---|---|
ROWTIME | orderid | ROWTIME | orderid | resultrowtime | orderid | shipmenttime | OrderTime |
10:00:00 | 101 | 10:00:00 | 100 | ||||
10:00:20 | 102 | ||||||
10:00:30 | 103 | ||||||
10:00:40 | 104 | ||||||
10:00:45 | 104 | ||||||
10:00:45 | 100* | 10:00:45 | 104 | 10:00:40 | 10:00:45 | ||
10:00:50 | 105 | 10:00:45 | 100 | 10:00:00 | 10:00:45 | ||
10:01:00 | 101 | null | 10:00:00 | ||||
10:01:20 | 102 | null | 10:00:20 | ||||
10:01:30 | 103 | null | 10:00:30 | ||||
10:01:40 | 104 | null | 10:00:40 | ||||
10:01:45 | 100 | null | 10:00:45 |
* - orderid = 100 的记录是 Orders 流中的较迟事件。
联接的可视化表示
下图表示一个查询,该查询返回最后一分钟执行的所有订单,无论它们是否具有对应的发货。
触发结果
下面介绍了触发查询结果的事件。
当 Shipments 流中与 orderid 104 和 100 对应的记录到达时,将在 Output 流中发出结果。即使 Orders 流中与 orderid 100 对应的记录较迟到达,也会发生这种情况。
对于位于 Orders 流中但在 Shipments 流中没有相应记录的记录,将在一分钟窗口结束时在 Output 流中发出。这是因为该服务一直等到窗口结束,以获取 Shipments 流中的对应记录。
结果的 ROWTIME
输出流中记录的 ROWTIME 是与联接匹配的行的 ROWTIME 的较迟者。
对于位于 Orders 流中但在 Shipments 流中没有相应记录的记录,订单的 ROWTIME 是与窗口结束相对应的 ROWTIME。
摘要
Kinesis Data Analytics 始终按照 ROWTIME 的升序返回连接中的行。
对于内部联接,输出行的 ROWTIME 是两个输入行的 ROWTIME 的较迟者。这也适用于在其中发现了匹配项的输入行的外部联接。
对于未发现其匹配项的外部联接,输出行的 ROWTIME 是以下两个时间的较迟者:
未发现其匹配项的输入行的 ROWTIME。
在已发现任何可能的匹配项的时间点,另一个输入流的窗口的较迟边界。
流-表联接
如果其中一个关系是流而另一个关系是有限关系,则将其称为流表连接。对于流中的每一行,查询会查找表中与连接条件相匹配的一行或多行。
例如,订单是一个数据流 PriceList ,是一个表。联接的效果是向订单添加价目表信息。
有关创建参考数据源和将数据流加入参考表的信息,请参阅 Amazon Kinesis 数据分析开发者指南中的示例:添加参考数据源。