
Big Data Flink in Three Minutes | Window Operations

baijin 2024-10-03 17:37:20

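Time semantics only become useful when combined with window operations. Their main use, naturally, is to open windows and compute results per time range. Let's look at how a time attribute is used for window operations in the Table API and SQL.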

The Table API and SQL offer two main kinds of windows: Group Windows and Over Windows.

I. Windows in the Table API

1. Group Windows

Group windows aggregate rows into finite groups based on time or row-count intervals, and evaluate the aggregate function once per group.

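In the Table API, group windows are defined with the .window(w: GroupWindow) clause and must be given an alias with as. To group a table by window, the window alias must be referenced in the groupBy clause just like a regular grouping field.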

Tumbling window:

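import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

// WaterSensor is a simple POJO with id, ts and vc fields (not shown in this article)
public class Flink08_TableApi_Window_1 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> waterSensorStream = env
            .fromElements(new WaterSensor("sensor_1", 1000L, 10),
                          new WaterSensor("sensor_1", 2000L, 20),
                          new WaterSensor("sensor_2", 3000L, 30),
                          new WaterSensor("sensor_1", 4000L, 40),
                          new WaterSensor("sensor_1", 5000L, 50),
                          new WaterSensor("sensor_2", 6000L, 60))
            // Event time comes from ts; tolerate up to 5 s of out-of-order data
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
            );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Declare ts as the rowtime (event-time) attribute
        Table table = tableEnv
            .fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));

        table
            .window(Tumble.over(lit(10).second()).on($("ts")).as("w")) // define a tumbling window and give it an alias
            .groupBy($("id"), $("w"))                                   // the window alias must appear among the grouping keys
            .select($("id"), $("w").start(), $("w").end(), $("vc").sum())
            .execute() // submits the job
            .print();
        // No env.execute() needed: Table.execute() already runs the query,
        // and the DataStream part has no sink of its own.
    }
}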
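The tumbling window above can be mimicked in plain Java to see how rows are grouped. The following is a minimal sketch, not Flink code: the class and method names are hypothetical, and it assumes a window offset of zero, non-negative millisecond timestamps and no late data, so a row with timestamp ts simply falls into the window starting at ts - ts % size.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only, not Flink code. Names are hypothetical.
class TumblingWindowSketch {

    // Start of the tumbling window that contains ts. Assumes a zero offset and
    // non-negative timestamps, so ts % sizeMs is the distance into the window.
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // Sums vc per (id, window); keys look like "sensor_1@[0,10000)" and keep
    // the order in which they were first seen.
    static Map<String, Integer> sumPerWindow(String[] ids, long[] ts, int[] vc, long sizeMs) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (int i = 0; i < ids.length; i++) {
            long start = windowStart(ts[i], sizeMs);
            String key = ids[i] + "@[" + start + "," + (start + sizeMs) + ")";
            sums.merge(key, vc[i], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // The six readings from the Table API example, with 10-second windows.
        String[] ids = {"sensor_1", "sensor_1", "sensor_2", "sensor_1", "sensor_1", "sensor_2"};
        long[] ts = {1000L, 2000L, 3000L, 4000L, 5000L, 6000L};
        int[] vc = {10, 20, 30, 40, 50, 60};
        System.out.println(sumPerWindow(ids, ts, vc, 10_000L));
        // prints {sensor_1@[0,10000)=120, sensor_2@[0,10000)=90}
    }
}
```

With the six sample readings every timestamp lies in [0, 10000), so the sketch yields one group per sensor: 120 for sensor_1 and 90 for sensor_2.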

A window definition has four parts:

  • the window type
  • window parameters, such as the window size
  • the time attribute field
  • the window alias

Sliding window (10-second windows that advance every 5 seconds):

.window(Slide.over(lit(10).second()).every(lit(5).second()).on($("ts")).as("w"))
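Unlike a tumbling window, a sliding window can place one row in several windows. This plain-Java sketch (hypothetical names, zero offset and non-negative timestamps assumed) enumerates the windows that contain a timestamp by stepping back from the latest window start one slide at a time; with a 10-second size and a 5-second slide each row lands in 10 / 5 = 2 windows.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only, not Flink code. Names are hypothetical.
class SlidingWindowSketch {

    // Every window [start, start + sizeMs) that contains ts.
    static List<long[]> windowsFor(long ts, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start at or before ts (zero offset, non-negative ts assumed).
        long lastStart = ts - (ts % slideMs);
        // Step back one slide at a time while the window still covers ts.
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        // 10 s windows sliding every 5 s, as in Slide.over(10 s).every(5 s).
        for (long[] w : windowsFor(6000L, 10_000L, 5_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints [5000, 15000) then [0, 10000)
    }
}
```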

Session window (a new session starts after a 6-second gap with no data for the key):

.window(Session.withGap(lit(6).second()).on($("ts")).as("w"))
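A session window has no fixed length: it keeps growing while data keeps arriving and closes once the gap passes without new data for that key. The sketch below is plain Java with hypothetical names, works on the timestamps of a single key, and treats an event exactly on the session end as the start of a new session, which is an assumption rather than a statement about Flink's boundary handling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only, not Flink code. Names are hypothetical.
// Works on the timestamps of ONE key (session windows are computed per key).
class SessionWindowSketch {

    // Returns sessions as [start, end), where end = last event in the session + gap.
    static List<long[]> sessions(long[] timestamps, long gapMs) {
        long[] ts = timestamps.clone();
        Arrays.sort(ts); // event-time order, regardless of arrival order
        List<long[]> result = new ArrayList<>();
        for (long t : ts) {
            long[] current = result.isEmpty() ? null : result.get(result.size() - 1);
            if (current != null && t < current[1]) {
                current[1] = t + gapMs; // still active: push the session end out
            } else {
                // First event, or the gap has passed: open a new session.
                // (Boundary case t == end opens a new one here; an assumption.)
                result.add(new long[] {t, t + gapMs});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (long[] s : sessions(new long[] {9000L, 1000L, 2000L}, 6000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // prints [1000, 8000) then [9000, 15000)
    }
}
```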

2. Over Windows

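Over window aggregation comes from standard SQL (the OVER clause) and is defined in the SELECT clause of a query. For every input row, an over window aggregate computes an aggregate over a range of neighboring rows.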

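The Table API provides the Over class to configure an over window. Over windows can be defined on event time or processing time, over a range given either as a time interval or as a row count.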

Unbounded over windows are specified with constants: UNBOUNDED_RANGE for a time interval, or UNBOUNDED_ROW for a row count. Bounded over windows are specified by the size of the interval.

Unbounded Over Windows:

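import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.UNBOUNDED_ROW;

public class Flink09_TableApi_OverWindow_1 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> waterSensorStream = env
            .fromElements(new WaterSensor("sensor_1", 1000L, 10),
                          new WaterSensor("sensor_1", 4000L, 40), // arrives before 2000L: out of order
                          new WaterSensor("sensor_1", 2000L, 20),
                          new WaterSensor("sensor_2", 3000L, 30),
                          new WaterSensor("sensor_1", 5000L, 50),
                          new WaterSensor("sensor_2", 6000L, 60))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                    .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
            );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        Table table = tableEnv
            .fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));

        table
            // For each row: sum vc over all earlier rows of the same id, in ts order
            .window(Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_ROW).as("w"))
            .select($("id"), $("ts"), $("vc").sum().over($("w")).as("sum_vc"))
            .execute() // submits the job; no env.execute() needed
            .print();
    }
}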
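For intuition about what the unbounded over window returns, here is a plain-Java sketch (hypothetical names, not Flink code) that computes the same kind of per-row running sum: rows are sorted by ts, partitioned by id, and each row gets the sum of vc over every earlier row of its partition plus itself. It ignores watermarks and late data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only, not Flink code. Names are hypothetical.
class UnboundedOverSketch {

    static final class Reading {
        final String id;
        final long ts;
        final int vc;

        Reading(String id, long ts, int vc) {
            this.id = id;
            this.ts = ts;
            this.vc = vc;
        }
    }

    // PARTITION BY id ORDER BY ts ROWS UNBOUNDED PRECEDING, SUM(vc):
    // one "id,ts,sum_vc" line per input row.
    static List<String> runningSums(List<Reading> input) {
        List<Reading> sorted = new ArrayList<>(input);
        // Rows are processed in event-time order, not in arrival order.
        sorted.sort(Comparator.comparingLong((Reading r) -> r.ts));
        Map<String, Integer> totals = new HashMap<>(); // running sum per partition (id)
        List<String> out = new ArrayList<>();
        for (Reading r : sorted) {
            int sum = totals.merge(r.id, r.vc, Integer::sum);
            out.add(r.id + "," + r.ts + "," + sum);
        }
        return out;
    }

    public static void main(String[] args) {
        // The readings from the over-window example; 4000 arrives before 2000.
        List<Reading> input = Arrays.asList(
            new Reading("sensor_1", 1000L, 10), new Reading("sensor_1", 4000L, 40),
            new Reading("sensor_1", 2000L, 20), new Reading("sensor_2", 3000L, 30),
            new Reading("sensor_1", 5000L, 50), new Reading("sensor_2", 6000L, 60));
        runningSums(input).forEach(System.out::println);
    }
}
```

For the sample readings the sketch prints sensor_1,1000,10, sensor_1,2000,30, sensor_2,3000,30, sensor_1,4000,70, sensor_1,5000,120 and sensor_2,6000,90; the out-of-order 2000 row is placed before 4000.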

Using UNBOUNDED_RANGE instead:

.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(UNBOUNDED_RANGE).as("w"))

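Note: UNBOUNDED_ROW counts rows, so each row's result covers every earlier row of its partition plus the row itself. UNBOUNDED_RANGE works on the ts value instead: rows with the same ts are peers, and each of them gets the same result, which includes all of those peers. In the sample data no two rows of the same id share a ts, so both variants return the same sums.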
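The difference between ROWS and RANGE frames only shows up when several rows of one partition share the same ts. This plain-Java sketch (hypothetical names, one partition already sorted by ts) computes both unbounded-preceding sums: ROWS adds rows one at a time, while RANGE adds every row whose ts is less than or equal to the current one, so rows with equal ts get equal results.

```java
import java.util.Arrays;

// Plain-Java illustration only, not Flink code. Names are hypothetical.
// Input: ONE partition, already sorted by ts.
class RowsVsRangeSketch {

    // ROWS UNBOUNDED PRECEDING: every row up to and including this position.
    static int[] rowsSum(int[] vc) {
        int[] out = new int[vc.length];
        int total = 0;
        for (int i = 0; i < vc.length; i++) {
            total += vc[i];
            out[i] = total;
        }
        return out;
    }

    // RANGE UNBOUNDED PRECEDING: every row whose ts <= this row's ts,
    // so rows sharing a timestamp (peers) all see the same total.
    static int[] rangeSum(long[] ts, int[] vc) {
        int[] out = new int[vc.length];
        for (int i = 0; i < vc.length; i++) {
            int total = 0;
            for (int j = 0; j < vc.length; j++) {
                if (ts[j] <= ts[i]) {
                    total += vc[j];
                }
            }
            out[i] = total;
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {1000L, 2000L, 2000L, 3000L}; // two rows share ts = 2000
        int[] vc = {10, 20, 30, 40};
        System.out.println(Arrays.toString(rowsSum(vc)));      // [10, 30, 60, 100]
        System.out.println(Arrays.toString(rangeSum(ts, vc))); // [10, 60, 60, 100]
    }
}
```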

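Bounded Over Windows:

// Time-based frame: rows of the same id whose event time lies within 3 s before the current row

.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(lit(3).second()).as("w"))

// Row-based frame: the current row plus the 2 rows before it

.window(Over.partitionBy($("id")).orderBy($("ts")).preceding(rowInterval(2L)).as("w"))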
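The two bounded frames can be compared the same way. In this plain-Java sketch (hypothetical names, one partition sorted by ts), the time-based frame includes rows whose ts lies within 3 seconds before the current row, taking the lower bound as inclusive per SQL RANGE semantics (an assumption here); the row-based frame includes the current row and at most 2 rows before it. The timestamps are chosen so the two frames differ on the last row.

```java
import java.util.Arrays;

// Plain-Java illustration only, not Flink code. Names are hypothetical.
// Input: ONE partition, already sorted by ts.
class BoundedOverSketch {

    // Like preceding(lit(3).second()): rows with ts in [current ts - rangeMs, current ts].
    // The inclusive lower bound is an assumption based on SQL RANGE semantics.
    static int[] rangeSum(long[] ts, int[] vc, long rangeMs) {
        int[] out = new int[vc.length];
        for (int i = 0; i < vc.length; i++) {
            int total = 0;
            for (int j = 0; j < vc.length; j++) {
                if (ts[j] >= ts[i] - rangeMs && ts[j] <= ts[i]) {
                    total += vc[j];
                }
            }
            out[i] = total;
        }
        return out;
    }

    // Like preceding(rowInterval(n)): the current row plus at most n rows before it.
    static int[] rowsSum(int[] vc, int n) {
        int[] out = new int[vc.length];
        for (int i = 0; i < vc.length; i++) {
            int total = 0;
            for (int j = Math.max(0, i - n); j <= i; j++) {
                total += vc[j];
            }
            out[i] = total;
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {1000L, 1500L, 2000L, 9000L};
        int[] vc = {1, 2, 3, 4};
        System.out.println(Arrays.toString(rangeSum(ts, vc, 3000L))); // [1, 3, 6, 4]
        System.out.println(Arrays.toString(rowsSum(vc, 2)));          // [1, 3, 6, 9]
    }
}
```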

II. Windows in SQL

1. Group Windows

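In SQL, group windows are defined in the GROUP BY clause. As with a query that uses a regular GROUP BY, a query whose GROUP BY clause contains a window function computes one result per group. The group window functions supported for both batch and streaming tables are:

  • TUMBLE(time_attr, interval): tumbling window
  • HOP(time_attr, slide_interval, size_interval): sliding (hopping) window
  • SESSION(time_attr, gap_interval): session window

The window bounds are read with auxiliary functions such as TUMBLE_START / TUMBLE_END and HOP_START / HOP_END.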

// Inside main(); StreamExecutionEnvironment and StreamTableEnvironment are the same classes used in the Table API examples
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// An event-time field must be of TIMESTAMP type, so derive a column t from the BIGINT (epoch milliseconds) ts
tEnv.executeSql("create table sensor(" +
        "id string," +
        "ts bigint," +
        "vc int, " +
        "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
        "watermark for t as t - interval '5' second) " +
        "with("
        + "'connector' = 'filesystem',"
        + "'path' = 'input/sensor-sql.txt',"
        + "'format' = 'csv'"
        + ")");

// Tumbling window: 1-minute windows per id
tEnv
    .sqlQuery(
        "SELECT id, " +
        " TUMBLE_START(t, INTERVAL '1' minute) as wStart, " +
        " TUMBLE_END(t, INTERVAL '1' minute) as wEnd, " +
        " SUM(vc) sum_vc " +
        "FROM sensor " +
        "GROUP BY TUMBLE(t, INTERVAL '1' minute), id"
    )
    .execute()
    .print();

// Sliding (hop) window: HOP(time_attr, slide, size), i.e. 1-hour windows that slide every minute
tEnv
    .sqlQuery(
        "SELECT id, " +
        " hop_start(t, INTERVAL '1' minute, INTERVAL '1' hour) as wStart, " +
        " hop_end(t, INTERVAL '1' minute, INTERVAL '1' hour) as wEnd, " +
        " SUM(vc) sum_vc " +
        "FROM sensor " +
        "GROUP BY hop(t, INTERVAL '1' minute, INTERVAL '1' hour), id"
    )
    .execute()
    .print();
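The sensor DDL derives t from ts and puts a 5-second watermark on it. The plain-Java sketch below (hypothetical names, not Flink code) mirrors those expressions with java.time: FROM_UNIXTIME works in whole seconds, so milliseconds are dropped; the start of a 1-minute tumble is the event time truncated to the minute; and the watermark trails the largest event time seen by 5 seconds. UTC stands in for the session time zone, which is an assumption.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

// Plain-Java illustration only, not Flink code. Names are hypothetical, and UTC
// stands in for the session time zone that FROM_UNIXTIME would actually use.
class SensorTimeSketch {

    // t AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')):
    // whole seconds only, so the milliseconds are dropped.
    static LocalDateTime eventTime(long tsMillis) {
        return LocalDateTime.ofEpochSecond(tsMillis / 1000, 0, ZoneOffset.UTC);
    }

    // TUMBLE_START(t, INTERVAL '1' minute): truncating to the minute matches
    // only because the window size is exactly one minute.
    static LocalDateTime tumbleStart(long tsMillis) {
        return eventTime(tsMillis).truncatedTo(ChronoUnit.MINUTES);
    }

    // WATERMARK FOR t AS t - INTERVAL '5' SECOND: the watermark trails the
    // largest event time seen so far by 5 seconds.
    static LocalDateTime watermark(long[] tsMillisSeen) {
        LocalDateTime max = null;
        for (long ts : tsMillisSeen) {
            LocalDateTime t = eventTime(ts);
            if (max == null || t.isAfter(max)) {
                max = t;
            }
        }
        return max == null ? null : max.minusSeconds(5);
    }

    public static void main(String[] args) {
        System.out.println(eventTime(61_500L));                     // 1970-01-01T00:01:01
        System.out.println(tumbleStart(61_500L));                   // 1970-01-01T00:01
        System.out.println(watermark(new long[] {1000L, 61_500L})); // 1970-01-01T00:00:56
    }
}
```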

2. Over Windows

// Sum of the current row and the one before it, per id, in event-time order
tEnv
    .sqlQuery(
        "select " +
        "id," +
        "vc," +
        "sum(vc) over(partition by id order by t rows between 1 PRECEDING and current row) as sum_vc " +
        "from sensor"
    )
    .execute()
    .print();

// Define the frame once with a WINDOW clause and reuse it for several aggregates
tEnv
    .sqlQuery(
        "select " +
        "id," +
        "vc," +
        "count(vc) over w, " +
        "sum(vc) over w " +
        "from sensor " +
        "window w as (partition by id order by t rows between 1 PRECEDING and current row)"
    )
    .execute()
    .print();
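The named window w defines its frame once and lets COUNT and SUM share it. As a plain-Java analogy (hypothetical names and vc values, one partition sorted by t), a single frameStart method plays the role of the WINDOW clause and both aggregates reuse it.

```java
import java.util.Arrays;

// Plain-Java illustration only, not Flink code. Names and values are hypothetical.
// One frame definition, "ROWS BETWEEN 1 PRECEDING AND CURRENT ROW", shared by
// COUNT and SUM. Input: ONE partition, already sorted by t.
class NamedWindowSketch {

    // First row index of the frame that ends at row i (1 preceding row + current).
    static int frameStart(int i) {
        return Math.max(0, i - 1);
    }

    static long[] countOverW(int[] vc) {
        long[] out = new long[vc.length];
        for (int i = 0; i < vc.length; i++) {
            out[i] = i - frameStart(i) + 1;
        }
        return out;
    }

    static int[] sumOverW(int[] vc) {
        int[] out = new int[vc.length];
        for (int i = 0; i < vc.length; i++) {
            for (int j = frameStart(i); j <= i; j++) {
                out[i] += vc[j];
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] vc = {10, 20, 40, 50}; // hypothetical vc values of one id, ordered by t
        System.out.println(Arrays.toString(countOverW(vc))); // [1, 2, 2, 2]
        System.out.println(Arrays.toString(sumOverW(vc)));   // [10, 30, 60, 90]
    }
}
```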

Summary

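The Flink Table API and SQL offer two main kinds of windows: Group Windows and Over Windows. Group windows aggregate rows into finite groups based on time or row-count intervals and evaluate the aggregate function once per group. Over window aggregation comes from standard SQL (the OVER clause) and is defined in the SELECT clause; for every input row it computes an aggregate over a range of neighboring rows. Unbounded over windows are specified with constants (UNBOUNDED_RANGE for a time interval, UNBOUNDED_ROW for a row count), while bounded over windows are specified by the size of the interval.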
