A look at Group Windows in Flink Table (flink table api window)

baijin 2024-10-03 17:36:32

This article takes a look at Group Windows in the Flink Table API.

Examples

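// Assumes `input` is a Table with fields a and b and an event-time attribute rowtime.
// Any Group Window (Tumble, Slide or Session) can be passed to window();
// a 10-minute tumbling event-time window is used here as an example.
Table table1 = input
    .window(Tumble.over("10.minutes").on("rowtime").as("w")) // define window with alias w
    .groupBy("w")                                            // group the table by window w
    .select("b.sum");                                        // aggregate

Table table2 = input
    .window(Tumble.over("10.minutes").on("rowtime").as("w")) // define window with alias w
    .groupBy("w, a")                                         // group the table by attribute a and window w
    .select("a, b.sum");                                     // aggregate

Table table3 = input
    .window(Tumble.over("10.minutes").on("rowtime").as("w")) // define window with alias w
    .groupBy("w, a")                                         // group the table by attribute a and window w
    .select("a, w.start, w.end, w.rowtime, b.count");        // aggregate and add window start, end, and rowtime timestamps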
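  • The window operation gives the Window an alias (w above) that groupBy and select can then reference. A window exposes the properties start, end and rowtime: start and rowtime are the inclusive lower and upper bounds of the window, while end is its exclusive upper bound.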
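To make the inclusive/exclusive bounds concrete: the Flink documentation's example is a 30-minute tumbling window starting at 14:00, whose start is 14:00:00.000, rowtime is 14:29:59.999 and end is 14:30:00.000. The Java sketch below reproduces that arithmetic for an arbitrary timestamp. It is not Flink code; the class name TumblingWindowBounds is hypothetical, and it assumes millisecond epoch timestamps and no window offset.

```java
// Hypothetical helper, not part of Flink: computes the start, end and rowtime
// properties of the tumbling window that contains a given timestamp.
// Assumes millisecond timestamps and windows aligned to the epoch (no offset).
final class TumblingWindowBounds {
    private TumblingWindowBounds() {}

    // Inclusive lower bound: the timestamp rounded down to a multiple of the size.
    // Math.floorMod keeps the result aligned for negative timestamps as well.
    static long start(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Exclusive upper bound: first millisecond that belongs to the next window.
    static long end(long timestampMs, long sizeMs) {
        return start(timestampMs, sizeMs) + sizeMs;
    }

    // Inclusive upper bound: one millisecond before the exclusive end.
    static long rowtime(long timestampMs, long sizeMs) {
        return end(timestampMs, sizeMs) - 1;
    }
}
```

For example, with a 30-minute size (1,800,000 ms) and a row at 14:10 UTC on 1970-01-01 (51,000,000 ms), start returns 50,400,000 (14:00), end returns 52,200,000 (14:30) and rowtime returns 52,199,999.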

Tumbling Windows examples

// Tumbling Event-time Window
.window(Tumble.over("10.minutes").on("rowtime").as("w"));

// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.minutes").on("proctime").as("w"));

// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.rows").on("proctime").as("w"));
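  • A tumbling window moves forward by its full size, so consecutive windows do not overlap and every row belongs to exactly one window. over specifies the window size, which can be defined in event time, processing time or as a row count; on names the time attribute the window is based on.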
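Because a tumbling window advances by its full size, each row maps to exactly one window. The hypothetical TumblingSum class below is an in-memory model of `.window(Tumble.over("10.minutes").on("rowtime").as("w")).groupBy("w").select("b.sum")`, assuming millisecond rowtime values; it only illustrates the semantics and says nothing about how Flink actually executes the query.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of a tumbling-window sum, not Flink code.
// Each row falls into exactly one window, so summing per window start never double-counts.
final class TumblingSum {
    private TumblingSum() {}

    // A row with an event-time timestamp (ms) and a numeric field b.
    static final class Row {
        final long rowtime;
        final long b;

        Row(long rowtime, long b) {
            this.rowtime = rowtime;
            this.b = b;
        }
    }

    // Returns window start -> sum of b, ordered by window start.
    static Map<Long, Long> sumPerWindow(List<Row> rows, long sizeMs) {
        Map<Long, Long> sums = new TreeMap<>();
        for (Row r : rows) {
            // The single window containing this row starts at rowtime rounded down to the size.
            long start = r.rowtime - Math.floorMod(r.rowtime, sizeMs);
            sums.merge(start, r.b, Long::sum);
        }
        return sums;
    }
}
```

Rows at minute 0 (b = 1), minute 5 (b = 2) and minute 12 (b = 3) yield two windows: start 0 with sum 3, and start 600,000 (minute 10) with sum 3.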

Sliding Windows examples

// Sliding Event-time Window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"));

// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"));

// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));
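  • When the slide interval is smaller than the window size, sliding windows overlap, so a row can belong to several windows. over specifies the window size, which can be defined in event time, processing time or as a row count; every specifies the slide interval.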
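The overlap becomes visible when listing, for a single timestamp, every sliding window that contains it. The sketch below (hypothetical SlidingAssigner, millisecond timestamps, no offset) rounds the timestamp down to the slide and walks backwards one slide at a time while the window still covers the timestamp; when the size is a multiple of the slide, a row therefore lands in size / slide windows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink code: which sliding windows [start, start + sizeMs)
// contain a given timestamp, for windows advancing every slideMs (no offset assumed).
final class SlidingAssigner {
    private SlidingAssigner() {}

    // Returns the start of every window containing the timestamp, latest window first.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // The latest window that can contain the timestamp starts at the
        // timestamp rounded down to a multiple of the slide.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

With a 10-minute window sliding every 5 minutes, a row at minute 7 (420,000 ms) lands in the windows starting at minute 5 and minute 0, returned as [300000, 0]. Setting the slide equal to the size degenerates to a tumbling window with a single result.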

Session Windows examples

// Session Event-time Window
.window(Session.withGap("10.minutes").on("rowtime").as("w"));

// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session.withGap("10.minutes").on("proctime").as("w"));
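  • Session windows have no fixed size; a session is closed after a period of inactivity. withGap specifies, as a time interval, the gap of inactivity that separates two sessions. Session windows can only be defined on event time or processing time, not on row counts.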
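Session boundaries depend on the data rather than on a fixed grid. The hypothetical SessionMerger below models this: every row opens a window [ts, ts + gap), and a row that arrives before the current session's end extends it. It is a sketch, not Flink's merging code; in particular it treats a row arriving exactly gap after the previous one as the start of a new session, and Flink's behaviour at that exact boundary may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not Flink code: each row opens a window [ts, ts + gap)
// and overlapping windows are merged into one session.
final class SessionMerger {
    private SessionMerger() {}

    // Returns sessions as {start, exclusiveEnd} pairs ordered by start.
    static List<long[]> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] current = result.isEmpty() ? null : result.get(result.size() - 1);
            if (current != null && ts < current[1]) {
                // The row arrives before the session times out: extend the session.
                // Input is sorted, so ts + gapMs never moves the end backwards.
                current[1] = ts + gapMs;
            } else {
                // Inactivity of at least gapMs (assumption for the boundary case): new session.
                result.add(new long[] {ts, ts + gapMs});
            }
        }
        return result;
    }
}
```

With a 10-minute gap (600,000 ms), rows at minutes 0, 3, 20 and 25 form two sessions: [0, 780,000) and [1,200,000, 2,100,000), that is, minute 0 to 13 and minute 20 to 35.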

Table.window

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {

  //......

  def window(window: Window): WindowedTable = {
    new WindowedTable(this, window)
  }

  //......
}
  • Table provides the window operation, which takes a Window argument and returns a WindowedTable.
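The chain Table → WindowedTable → WindowGroupedTable is a staged builder: each type exposes only the next legal call, so calling select directly on the result of window does not compile. The Java sketch below models only that shape, using hypothetical MiniTable, MiniWindowedTable and MiniWindowGroupedTable classes that build a plan description string; the real classes build logical plan nodes and validate them.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, heavily simplified model of the Table -> WindowedTable ->
// WindowGroupedTable chain. Each stage exposes only the next legal call.
final class MiniTable {
    final String plan;

    MiniTable(String plan) {
        this.plan = plan;
    }

    // Like Table.window: the only way to reach the windowed stage.
    MiniWindowedTable window(String windowAlias) {
        return new MiniWindowedTable(this, windowAlias);
    }
}

final class MiniWindowedTable {
    final MiniTable table;
    final String alias;

    MiniWindowedTable(MiniTable table, String alias) {
        this.table = table;
        this.alias = alias;
    }

    // Like WindowedTable.groupBy: the only operation offered at this stage.
    MiniWindowGroupedTable groupBy(String... fields) {
        return new MiniWindowGroupedTable(table, alias, Arrays.asList(fields));
    }
}

final class MiniWindowGroupedTable {
    final MiniTable table;
    final String alias;
    final List<String> groupBy;

    MiniWindowGroupedTable(MiniTable table, String alias, List<String> groupBy) {
        this.table = table;
        this.alias = alias;
        this.groupBy = groupBy;
    }

    // Like WindowGroupedTable.select: returns a plain table again, wrapping the
    // input plan in a window aggregate and a projection.
    MiniTable select(String... fields) {
        return new MiniTable("Project(" + Arrays.asList(fields) + ", WindowAggregate(window="
            + alias + ", groupBy=" + groupBy + ", " + table.plan + "))");
    }
}
```

For instance, `new MiniTable("Scan(input)").window("w").groupBy("w", "a").select("a", "b.sum").plan` is `Project([a, b.sum], WindowAggregate(window=w, groupBy=[w, a], Scan(input)))`.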

WindowedTable

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class WindowedTable(
    private[flink] val table: Table,
    private[flink] val window: Window) {

  def groupBy(fields: Expression*): WindowGroupedTable = {
    val fieldsWithoutWindow = fields.filterNot(window.alias.equals(_))
    if (fields.size != fieldsWithoutWindow.size + 1) {
      throw new ValidationException("GroupBy must contain exactly one window alias.")
    }

    new WindowGroupedTable(table, fieldsWithoutWindow, window)
  }

  def groupBy(fields: String): WindowGroupedTable = {
    val fieldsExpr = ExpressionParser.parseExpressionList(fields)
    groupBy(fieldsExpr: _*)
  }
}
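  • WindowedTable offers only the groupBy operation. groupBy accepts either a String or Expression arguments; the String overload parses the string into Expressions and delegates to the Expression overload. The fields must contain the window alias exactly once, otherwise a ValidationException is thrown. In a streaming environment, if groupBy references only the window alias and no other attribute, the window aggregate cannot be computed in parallel and runs on a single, non-parallel task. groupBy returns a WindowGroupedTable.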
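The alias check in groupBy(fields: Expression*) is plain arithmetic: after filtering out the window alias, exactly one field must have been removed. A hypothetical Java port over strings looks like this (the real method works on Expression objects and throws Flink's ValidationException); the runsNonParallel helper only encodes the streaming parallelism rule stated above and is not a Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Java port of the check in WindowedTable.groupBy, using strings
// instead of Flink Expressions: the window alias must appear exactly once and
// the remaining fields become the grouping keys.
final class WindowGroupByCheck {
    private WindowGroupByCheck() {}

    static List<String> groupKeysWithoutWindow(String windowAlias, List<String> fields) {
        List<String> withoutWindow = new ArrayList<>();
        for (String f : fields) {
            if (!f.equals(windowAlias)) {
                withoutWindow.add(f);
            }
        }
        // Same condition as the Scala code: exactly one field may have been the alias.
        if (fields.size() != withoutWindow.size() + 1) {
            throw new IllegalArgumentException("GroupBy must contain exactly one window alias.");
        }
        return withoutWindow;
    }

    // No keys besides the window: in a streaming job this aggregate runs non-parallel.
    static boolean runsNonParallel(List<String> groupKeys) {
        return groupKeys.isEmpty();
    }
}
```

groupKeysWithoutWindow("w", [w, a]) returns [a], while [a] and [w, w] both throw, because zero or two fields were removed rather than one.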

WindowGroupedTable

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class WindowGroupedTable(
    private[flink] val table: Table,
    private[flink] val groupKeys: Seq[Expression],
    private[flink] val window: Window) {

  def select(fields: Expression*): Table = {
    val expandedFields = expandProjectList(fields, table.logicalPlan, table.tableEnv)
    val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, table.tableEnv)

    val projectsOnAgg = replaceAggregationsAndProperties(
      expandedFields, table.tableEnv, aggNames, propNames)

    val projectFields = extractFieldReferences(expandedFields ++ groupKeys :+ window.timeField)

    new Table(table.tableEnv,
      Project(
        projectsOnAgg,
        WindowAggregate(
          groupKeys,
          window.toLogicalWindow,
          propNames.map(a => Alias(a._1, a._2)).toSeq,
          aggNames.map(a => Alias(a._1, a._2)).toSeq,
          Project(projectFields, table.logicalPlan).validate(table.tableEnv)
        ).validate(table.tableEnv),
        // required for proper resolution of the time attribute in multi-windows
        explicitAlias = true
      ).validate(table.tableEnv))
  }

  def select(fields: String): Table = {
    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    //get the correct expression for AggFunctionCall
    val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv))
    select(withResolvedAggFunctionCall: _*)
  }
}
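  • WindowGroupedTable offers only the select operation. select likewise accepts either a String or Expression arguments; the String overload parses the fields (resolving aggregate function calls along the way) and delegates to the Expression overload. select builds a new Table whose logical plan is a Project whose child is a WindowAggregate; the WindowAggregate in turn reads from a Project over the original plan that keeps only the fields referenced by the select list, the group keys and the window's time field.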
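Before building Project(WindowAggregate(Project(input))), select separates the expanded select list into aggregations and window properties (extractAggregationsAndProperties) and rewrites the outer projection to refer to them. The sketch below is a strongly simplified, string-based stand-in for that split (hypothetical WindowSelectSplitter). It assumes aggregates are written with suffixes such as .sum or .count, and its rejection of plain fields that are not group keys is an assumption modelled on SQL GROUP BY rules rather than a quote of Flink's validation.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, string-based simplification of the select-list split done in
// WindowGroupedTable.select; not Flink code.
final class WindowSelectSplitter {
    private WindowSelectSplitter() {}

    enum Kind { WINDOW_PROPERTY, AGGREGATE, GROUP_KEY }

    // Assumption for this sketch: aggregates are written as "field.sum", "field.count", etc.
    private static final List<String> AGG_SUFFIXES =
        Arrays.asList(".sum", ".count", ".avg", ".min", ".max");

    // Classifies each comma-separated select field, preserving the select order.
    static Map<String, Kind> classify(String selectList, String windowAlias, List<String> groupKeys) {
        Map<String, Kind> kinds = new LinkedHashMap<>();
        for (String raw : selectList.split(",")) {
            String field = raw.trim();
            if (field.startsWith(windowAlias + ".")) {
                kinds.put(field, Kind.WINDOW_PROPERTY); // e.g. w.start, w.end, w.rowtime
            } else if (isAggregate(field)) {
                kinds.put(field, Kind.AGGREGATE);       // e.g. b.count
            } else if (groupKeys.contains(field)) {
                kinds.put(field, Kind.GROUP_KEY);       // plain grouping attribute, e.g. a
            } else {
                throw new IllegalArgumentException(
                    field + " is neither a group key, an aggregate nor a window property");
            }
        }
        return kinds;
    }

    private static boolean isAggregate(String field) {
        for (String suffix : AGG_SUFFIXES) {
            if (field.endsWith(suffix)) {
                return true;
            }
        }
        return false;
    }
}
```

For "a, w.start, w.end, w.rowtime, b.count" with alias w and group key a, it classifies a as GROUP_KEY, the three w.* fields as WINDOW_PROPERTY and b.count as AGGREGATE.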

Summary

  • The window operation gives the Window an alias that groupBy and select can reference. A window exposes start, end and rowtime: start and rowtime are inclusive bounds, end is exclusive.
  • Tumbling windows advance by their full size and never overlap. Sliding windows overlap when the slide interval (every) is smaller than the window size (over), so a row can belong to several windows. For both, the size can be defined in event time, processing time or as a row count.
  • Session windows have no fixed size and close after a period of inactivity whose length is set with withGap; they can only be defined on event time or processing time.
  • Table.window returns a WindowedTable, which offers only groupBy. groupBy requires exactly one window alias and returns a WindowGroupedTable, which offers only select. In a streaming job, grouping by the window alias alone means the aggregate runs on a single, non-parallel task.
  • Both groupBy and select have String overloads that parse their argument into Expressions and delegate to the Expression overloads. select produces a new Table whose plan is a Project over a WindowAggregate.

doc

  • Group Windows
