网站首页 > 博客文章 正文
序
本文主要研究一下flink Table的Group Windows
实例
Table table = input .window([Window w].as("w")) // define window with alias w .groupBy("w") // group the table by window w .select("b.sum"); // aggregate ? Table table = input .window([Window w].as("w")) // define window with alias w .groupBy("w, a") // group the table by attribute a and window w .select("a, b.sum"); // aggregate ? Table table = input .window([Window w].as("w")) // define window with alias w .groupBy("w, a") // group the table by attribute a and window w .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps
- window操作可以对Window进行别名,然后可以在groupBy及select中引用,window有start、end、rowtime属性可以用,其中start及rowtime是inclusive的,而end为exclusive
Tumbling Windows实例
// Tumbling Event-time Window .window(Tumble.over("10.minutes").on("rowtime").as("w")); ? // Tumbling Processing-time Window (assuming a processing-time attribute "proctime") .window(Tumble.over("10.minutes").on("proctime").as("w")); ? // Tumbling Row-count Window (assuming a processing-time attribute "proctime") .window(Tumble.over("10.rows").on("proctime").as("w"));
- Tumbling Windows按固定窗口大小来移动,因而窗口不重叠;over方法用于指定窗口大小;窗口大小可以基于event-time、processing-time、row-count来定义
Sliding Windows实例
// Sliding Event-time Window .window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w")); ? // Sliding Processing-time window (assuming a processing-time attribute "proctime") .window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w")); ? // Sliding Row-count window (assuming a processing-time attribute "proctime") .window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));
- Sliding Windows在slide interval小于window size的时候,窗口会有重叠,因而rows可能归属多个窗口;over方法用于指定窗口大小,窗口大小可以基于event-time、processing-time、row-count来定义;every方法用于指定slide interval
Session Windows实例
// Session Event-time Window .window(Session.withGap("10.minutes").on("rowtime").as("w")); ? // Session Processing-time Window (assuming a processing-time attribute "proctime") .window(Session.withGap("10.minutes").on("proctime").as("w"));
- Session Windows没有固定的窗口大小,它基于inactivity的程度来关闭窗口,withGap方法用于指定两个窗口的gap,作为time interval;Session Windows只能使用event-time或者processing-time
Table.window
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
class Table( private[flink] val tableEnv: TableEnvironment, private[flink] val logicalPlan: LogicalNode) { ? //...... def window(window: Window): WindowedTable = { new WindowedTable(this, window) } //...... }
- Table提供了window操作,接收Window参数,创建的是WindowedTable
WindowedTable
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
class WindowedTable( private[flink] val table: Table, private[flink] val window: Window) { ? def groupBy(fields: Expression*): WindowGroupedTable = { val fieldsWithoutWindow = fields.filterNot(window.alias.equals(_)) if (fields.size != fieldsWithoutWindow.size + 1) { throw new ValidationException("GroupBy must contain exactly one window alias.") } ? new WindowGroupedTable(table, fieldsWithoutWindow, window) } ? def groupBy(fields: String): WindowGroupedTable = { val fieldsExpr = ExpressionParser.parseExpressionList(fields) groupBy(fieldsExpr: _*) } ? }
- WindowedTable只提供groupBy操作,其中groupBy可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最后调用的是Expression类型参数的groupBy方法;如果groupBy除了window没有其他属性,则其parallelism为1,只会在单一task上执行;groupBy方法创建的是WindowGroupedTable
WindowGroupedTable
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
class WindowGroupedTable( private[flink] val table: Table, private[flink] val groupKeys: Seq[Expression], private[flink] val window: Window) { ? def select(fields: Expression*): Table = { val expandedFields = expandProjectList(fields, table.logicalPlan, table.tableEnv) val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, table.tableEnv) ? val projectsOnAgg = replaceAggregationsAndProperties( expandedFields, table.tableEnv, aggNames, propNames) ? val projectFields = extractFieldReferences(expandedFields ++ groupKeys :+ window.timeField) ? new Table(table.tableEnv, Project( projectsOnAgg, WindowAggregate( groupKeys, window.toLogicalWindow, propNames.map(a => Alias(a._1, a._2)).toSeq, aggNames.map(a => Alias(a._1, a._2)).toSeq, Project(projectFields, table.logicalPlan).validate(table.tableEnv) ).validate(table.tableEnv), // required for proper resolution of the time attribute in multi-windows explicitAlias = true ).validate(table.tableEnv)) } ? def select(fields: String): Table = { val fieldExprs = ExpressionParser.parseExpressionList(fields) //get the correct expression for AggFunctionCall val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv)) select(withResolvedAggFunctionCall: _*) } }
- WindowGroupedTable只提供select操作,其中select可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最后调用的是Expression类型参数的select方法;select方法创建了新的Table,其Project的child为WindowAggregate
小结
- window操作可以对Window进行别名,然后可以在groupBy及select中引用,window有start、end、rowtime属性可以用,其中start及rowtime是inclusive的,而end为exclusive
- Tumbling Windows按固定窗口大小来移动,因而窗口不重叠;over方法用于指定窗口大小;窗口大小可以基于event-time、processing-time、row-count来定义;Sliding Windows在slide interval小于window size的时候,窗口会有重叠,因而rows可能归属多个窗口;over方法用于指定窗口大小,窗口大小可以基于event-time、processing-time、row-count来定义;every方法用于指定slide interval;Session Windows没有固定的窗口大小,它基于inactivity的程度来关闭窗口,withGap方法用于指定两个窗口的gap,作为time interval;Session Windows只能使用event-time或者processing-time
- Table提供了window操作,接收Window参数,创建的是WindowedTable;WindowedTable只提供groupBy操作,其中groupBy可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最后调用的是Expression类型参数的groupBy方法;如果groupBy除了window没有其他属性,则其parallelism为1,只会在单一task上执行;groupBy方法创建的是WindowGroupedTable;WindowGroupedTable只提供select操作,其中select可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最后调用的是Expression类型参数的select方法;select方法创建了新的Table,其Project的child为WindowAggregate
doc
- Group Windows
猜你喜欢
- 2024-10-03 FlinkSQL全面指南(flinksql udf)
- 2024-10-03 Apache Flink学习笔记(六)Table API
- 2024-10-03 Flink Table API & SQL 聚合性能调优
- 2024-10-03 美团点评基于 Flink 的实时数仓平台实践
- 2024-10-03 Flink SQL 动态表 & 连续查询详解(建议收藏)
- 2024-10-03 Flink 1.11 与 Hive 批流一体数仓实践
- 2024-10-03 Flink SQL中的动态表和临时表(flink sql动态查询)
- 2024-10-03 三分钟速懂大数据Flink | 窗口操作
- 2024-10-03 大数据_Flink_Java版_Table API 和 Flink SQL(1)_基本介绍和简单示例
- 2024-10-03 新一代大数据计算引擎 Flink从入门到实战
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- ifneq (61)
- 字符串长度在线 (61)
- googlecloud (64)
- messagesource (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)