网站首页 > 博客文章 正文
时间语义,要配合窗口操作才能发挥作用。最主要的用途,当然就是开窗口然后根据时间段做计算了。下面我们就来看看Table API和SQL中,怎么利用时间字段做窗口操作。
在Table API和SQL中,主要有两种窗口:Group Windows和Over Windows。
一、Table API中使用窗口
1. Group Windows
分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数。
Table API中的Group Windows都是使用.Window(w:GroupWindow)子句定义的,并且必须由as子句指定一个别名。为了按窗口对表进行分组,窗口的别名必须在group by子句中,像常规的分组字段一样引用。
滚动窗口:
public class Flink08_TableApi_Window_1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> waterSensorStream = env
.fromElements(new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((element, recordTimestamp) -> element.getTs())
);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv
.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));
table
.window(Tumble.over(lit(10).second()).on($("ts")).as("w")) // 定义滚动窗口并给窗口起一个别名
.groupBy($("id"), $("w")) // 窗口必须出现的分组字段中
.select($("id"), $("w").start(), $("w").end(), $("vc").sum())
.execute()
.print();
env.execute();
}
}
开窗四部曲:
- 窗口类型
- 窗口相关参数:比如窗口大小
- 指定时间字段
- 窗口别名
滑动窗口:
.window(Slide.over(lit(10).second()).every(lit(5).second()).on($("ts")).as("w"))
会话窗口:
.window(Session.withGap(lit(6).second()).on($("ts")).as("w")
2. Over Windows
Over window聚合是标准SQL中已有的(Over子句),可以在查询的SELECT子句中定义。Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合。
Table API提供了Over类,来配置Over窗口的属性。可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义Over windows。
无界的over window是使用常量指定的。也就是说,时间间隔要指定UNBOUNDED_RANGE,或者行计数间隔要指定UNBOUNDED_ROW。而有界的over window是用间隔的大小指定的。
Unbounded Over Windows:
public class Flink09_TableApi_OverWindow_1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> waterSensorStream = env
.fromElements(new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((element, recordTimestamp) -> element.getTs())
);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv
.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));
table
.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_ROW).as("w"))
.select($("id"), $("ts"), $("vc").sum().over($("w")).as("sum_vc"))
.execute()
.print();
env.execute();
}
}
# 使用UNBOUNDED_RANGE
.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_RANGE).as("w"))
说明:
Bounded Over Windows
// 当事件时间向前算3s得到一个窗口
.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(lit(3).second()).as("w"))
// 当行向前推算2行算一个窗口
.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(rowInterval(2L)).as("w"))
二、SQL API中使用窗口
1. Group Windows
SQL查询的分组窗口是通过GROUP BY子句定义的。类似于使用常规GROUP BY语句的查询,窗口分组语句的GROUP BY子句中带有一个窗口函数为每个分组计算出一个结果。以下是批处理表和流处理表支持的分组窗口函数:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 作为事件时间的字段必须是 timestamp 类型, 所以根据 long 类型的 ts 计算出来一个 t
tEnv.executeSql("create table sensor(" +
"id string," +
"ts bigint," +
"vc int, " +
"t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
"watermark for t as t - interval '5' second)" +
"with("
+ "'connector' = 'filesystem',"
+ "'path' = 'input/sensor-sql.txt',"
+ "'format' = 'csv'"
+ ")");
tEnv
.sqlQuery(
"SELECT id, " +
" TUMBLE_START(t, INTERVAL '1' minute) as wStart, " +
" TUMBLE_END(t, INTERVAL '1' minute) as wEnd, " +
" SUM(vc) sum_vc " +
"FROM sensor " +
"GROUP BY TUMBLE(t, INTERVAL '1' minute), id"
)
.execute()
.print();
tEnv
.sqlQuery(
"SELECT id, " +
" hop_start(t, INTERVAL '1' minute, INTERVAL '1' hour) as wStart, " +
" hop_end(t, INTERVAL '1' minute, INTERVAL '1' hour) as wEnd, " +
" SUM(vc) sum_vc " +
"FROM sensor " +
"GROUP BY hop(t, INTERVAL '1' minute, INTERVAL '1' hour), id"
)
.execute()
.print();
2. Over Windows
tEnv
.sqlQuery(
"select " +
"id," +
"vc," +
"sum(vc) over(partition by id order by t rows between 1 PRECEDING and current row)"
+ "from sensor"
)
.execute()
.print();
tEnv
.sqlQuery(
"select " +
"id," +
"vc," +
"count(vc) over w, " +
"sum(vc) over w " +
"from sensor " +
"window w as (partition by id order by t rows between 1 PRECEDING and current row)"
)
.execute()
.print();
总结
Flink Table API和SQL中,主要有:Group Windows和Over Windows两种窗口。分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数。Over window聚合是标准SQL中已有的(Over子句),可以在查询的SELECT子句中定义。Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合。无界的Over window是使用常量指定的。也就是说,时间间隔要指定UNBOUNDED_RANGE,或者行计数间隔要指定UNBOUNDED_ROW。而有界的Over window是用间隔的大小指定的。
猜你喜欢
- 2024-10-03 FlinkSQL全面指南(flinksql udf)
- 2024-10-03 Apache Flink学习笔记(六)Table API
- 2024-10-03 Flink Table API & SQL 聚合性能调优
- 2024-10-03 美团点评基于 Flink 的实时数仓平台实践
- 2024-10-03 Flink SQL 动态表 & 连续查询详解(建议收藏)
- 2024-10-03 Flink 1.11 与 Hive 批流一体数仓实践
- 2024-10-03 Flink SQL中的动态表和临时表(flink sql动态查询)
- 2024-10-03 大数据_Flink_Java版_Table API 和 Flink SQL(1)_基本介绍和简单示例
- 2024-10-03 新一代大数据计算引擎 Flink从入门到实战
- 2024-10-03 4位资深专家多年大厂经验分享出Flink技术内幕架构设计与实现原理
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- ifneq (61)
- 字符串长度在线 (61)
- googlecloud (64)
- messagesource (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)