时间:2021-07-01 10:21:17 帮助过:17人阅读
表查询示例:
SELECT *
FROM Orders;
rowtime | productId | orderId | units
----------+-----------+---------+-------
08:30:00 | 10 | 1 | 3
08:45:10 | 20 | 2 | 1
09:12:21 | 10 | 3 | 10
09:27:44 | 30 | 4 | 2
4 records returned.
流和表的查询不能混用,否则会报错
SELECT * FROM Shipments;
ERROR: Cannot convert stream ‘SHIPMENTS‘ to a table
SELECT STREAM * FROM Products;
ERROR: Cannot convert table ‘PRODUCTS‘ to a stream
SELECT STREAM rowtime, productId
FROM (
SELECT TUMBLE_END(rowtime, INTERVAL ‘1‘ HOUR) AS rowtime,
productId,
COUNT(*) AS c,
SUM(units) AS su
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL ‘1‘ HOUR), productId)
WHERE c > 2 OR su > 10;
rowtime | productId
----------+-----------
10:00:00 | 30
11:00:00 | 10
11:00:00 | 40
两个窗口之间数据没有重叠。
SELECT STREAM CEIL(rowtime TO HOUR) AS rowtime,
productId,
COUNT(*) AS c,
SUM(units) AS units
FROM Orders
GROUP BY CEIL(rowtime TO HOUR), productId;
rowtime | productId | c | units
----------+-----------+---------+-------
11:00:00 | 30 | 2 | 24
11:00:00 | 10 | 1 | 1
11:00:00 | 20 | 1 | 7
12:00:00 | 10 | 3 | 11
12:00:00 | 40 | 1 | 12
该示例每个小时结束的时候,输出这个小时的统计结果,11点整,输出1点的统计结果。以事件为驱动,内部不包含定时器。
下面的例子和上面的例子等价
SELECT STREAM TUMBLE_END(rowtime, INTERVAL ‘1‘ HOUR) AS rowtime,
productId,
COUNT(*) AS c,
SUM(units) AS units
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL ‘1‘ HOUR), productId;
rowtime | productId | c | units
----------+-----------+---------+-------
11:00:00 | 30 | 2 | 24
11:00:00 | 10 | 1 | 1
11:00:00 | 20 | 1 | 7
12:00:00 | 10 | 3 | 11
12:00:00 | 40 | 1 | 12
又比如,需要没半个小时输出一次结果,以12分钟为对齐时间,
SELECT STREAM
TUMBLE_END(rowtime, INTERVAL ‘30‘ MINUTE, TIME ‘0:12‘) AS rowtime,
productId,
COUNT(*) AS c,
SUM(units) AS units
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL ‘30‘ MINUTE, TIME ‘0:12‘),
productId;
rowtime | productId | c | units
----------+-----------+---------+-------
10:42:00 | 30 | 2 | 24
10:42:00 | 10 | 1 | 1
10:42:00 | 20 | 1 | 7
11:12:00 | 10 | 2 | 7
11:12:00 | 40 | 1 | 12
11:42:00 | 10 | 1 | 4
两个窗口之间的数据有一定重叠。
跳动窗口是广义化的翻滚窗,允许数据在窗口中保存更长时间。
比如下面的例子,数据输出时间为11:00,但是其中还包含08:00到11:00的数据,以及09:00到12:00的数据,一个输入行对应三个输出行。
SELECT STREAM
HOP_END(rowtime, INTERVAL ‘1‘ HOUR, INTERVAL ‘3‘ HOUR) AS rowtime,
COUNT(*) AS c,
SUM(units) AS units
FROM Orders
GROUP BY HOP(rowtime, INTERVAL ‘1‘ HOUR, INTERVAL ‘3‘ HOUR);
rowtime | c | units
----------+----------+-------
11:00:00 | 4 | 27
12:00:00 | 8 | 50
Calcite中的滑动窗采用标准的Over方式,直接套用了标准SQL中的分析函数。
SELECT STREAM rowtime,
productId,
units,
SUM(units) OVER (ORDER BY rowtime RANGE INTERVAL ‘1‘ HOUR PRECEDING) unitsLastHour
FROM Orders;
下面的一个例子展示了过去10分钟的平均订单大小和上周平均订单的比较数据
SELECT STREAM *
FROM (
SELECT STREAM rowtime,
productId,
units,
AVG(units) OVER product (RANGE INTERVAL ‘10‘ MINUTE PRECEDING) AS m10,
AVG(units) OVER product (RANGE INTERVAL ‘7‘ DAY PRECEDING) AS d7
FROM Orders
WINDOW product AS (
ORDER BY rowtime
PARTITION BY productId))
WHERE m10 > d7;
在这个例子中,使用Window子句来定义部分窗口,然后在每个OVER子句中在进行细化。初次之外,还可以在window子句中定义所有的窗口。
这种实现方式中,在后台同时维护了两张表,10分钟和7天的窗口数据。你可以直接访问到这些表,不需要做显示的查询。
这种语法还可以实现其他一些功能:
* 行组窗口
* 引用尚未到达的行,流将等待,直到它们到达。
* 可以支持其他RANK等统计分析函数
下面的查询显示了这样一个场景,返回每个记录的统计结果,但是该结果会在固定时间被重置。
SELECT STREAM rowtime,
productId,
units,
SUM(units) OVER (PARTITION BY FLOOR(rowtime TO HOUR)) AS unitsSinceTopOfHour
FROM Orders;
这种方式类似滑窗的查询,但是单调表达式发生在Partition by子句中。随着时间从10:59:59到11:00:00,Floor从10:00:00变为11:00:00,因此,一个新的分组开始产生了。sum的统一结果开始重置。
Calcite知道旧分组永远不会再次使用,因此会从内部存储中删除该分组的所有统计结果。
使用Window语法和Over方式可以做到。
这是作者在Calcite的StreamSQL中提出的概念。
如果一个列或者表达式是递增或者递减的,那么就成为是单调的。
如果列或者表达式是乱序的,并且有一种机制(比如标点符号或者水印)来生成特定值永远不会被看到,那么这列或者表达式就是准单调的。
概念很南理解,但是其实就是要求流上的数据是全局有序的。可以是事件顺序,或者事件id的顺序。一般情况下,我们会自动为事件补齐时间。
有了这种顺序,我们就能很容易实现水印这样的功能了。
CREATE VIEW HourlyOrderTotals (rowtime, productId, c, su) AS
SELECT TUMBLE_END(rowtime, INTERVAL ‘1‘ HOUR),
productId,
COUNT(*),
SUM(units)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL ‘1‘ HOUR), productId;
SELECT STREAM rowtime, productId
FROM HourlyOrderTotals
WHERE c > 2 OR su > 10;
rowtime | productId
----------+-----------
10:00:00 | 30
11:00:00 | 10
11:00:00 | 40
看看上面的视图,这个是一张表还是一个流?
因为它没有使用Stream关键字,所以必然是一个关系,是一张表,但是它是可以被转化为流的表。你可以在流和关系的查询中使用它。
和它等价的查询还有:
WITH HourlyOrderTotals (rowtime, productId, c, su) AS (
SELECT TUMBLE_END(rowtime, INTERVAL ‘1‘ HOUR),
productId,
COUNT(*),
SUM(units)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL ‘1‘ HOUR), productId)
SELECT STREAM rowtime, productId
FROM HourlyOrderTotals
WHERE c > 2 OR su > 10;
rowtime | productId
----------+-----------
10:00:00 | 30
11:00:00 | 10
11:00:00 | 40
这种方法不限于子查询和视图,流式SQL中的每个查询都被定义为关系查询,并且使用最顶层Select子句中的Stream关键字被转换为流。
流上的Join分为两种,流和表的Join以及流和流的Join
流上的Join实际都是窗口和窗口的JOin,或者窗口和表的Join,本质上都是表之间的Join,因为窗口就是一张表。
比如下面一个流和表之间的Join
SELECT STREAM o.rowtime, o.productId, o.orderId, o.units,
p.name, p.unitPrice
FROM Orders AS o
JOIN Products AS p
ON o.productId = p.productId;
rowtime | productId | orderId | units | name | unitPrice
----------+-----------+---------+-------+ -------+-----------
10:17:00 | 30 | 5 | 4 | Cheese | 17
10:17:05 | 10 | 6 | 1 | Beer | 0.25
10:18:05 | 20 | 7 | 2 | Wine | 6
10:18:07 | 30 | 8 | 20 | Cheese | 17
11:02:00 | 10 | 9 | 6 | Beer | 0.25
11:04:00 | 10 | 10 | 1 | Beer | 0.25
11:09:30 | 40 | 11 | 12 | Bread | 100
11:24:11 | 10 | 12 | 4 | Beer | 0.25
Order是流,Products是表。两个Join之后结果肯定是流,然后,因为没有窗口,所以默认情况下应该是一个仅仅保存当前数据的长度为1的窗口,当前Order数据和Products做Join。
流和流的Join如下:
SELECT STREAM o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
FROM Orders AS o
JOIN Shipments AS s
ON o.orderId = s.orderId
AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL ‘1‘ HOUR;
rowtime | productId | orderId | shipTime
----------+-----------+---------+----------
10:17:00 | 30 | 5 | 10:55:00
10:17:05 | 10 | 6 | 10:20:00
11:02:00 | 10 | 9 | 11:58:00
11:24:11 | 10 | 12 | 11:44:00
这个查询中没有显式的定义窗口,但是实际上已经通过where条件来锁定了数据范围。也就是说,会自动将数据保存在一个窗口中。
可以使用Create View语句来创建视图,上面已经可以看到,同时,也可以使用Insert AS select的方式将流上的数据导入其他流。
比如:
CREATE VIEW LargeOrders AS
SELECT STREAM * FROM Orders WHERE units > 1000;
INSERT INTO LargeOrders
SELECT STREAM * FROM Orders WHERE units > 1000;
还可以通过Upsert语句来维护窗口数据
UPSERT INTO OrdersSummary
SELECT STREAM productId,
COUNT(*) OVER lastHour AS c
FROM Orders
WINDOW lastHour AS (
PARTITION BY productId
ORDER BY rowtime
RANGE INTERVAL ‘1‘ HOUR PRECEDING)
> SELECT STREAM * FROM (VALUES (1, ‘abc‘));
ERROR: Cannot stream VALUES
引用:
http://calcite.apache.org/docs/stream.html
Calcite中的流式SQL
标签:视图 text 生成 趋势 pretty 场景 情况 转换 部分