Talk about Group Windows in the Flink Table API


Preface

This article takes a look at Group Windows in the Flink Table API.

Example

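// Assumes the Flink 1.7 Java Table API and an existing Table `input` with
// fields a, b and an event-time attribute rowtime. The tumbling window is only
// an example: any Tumble, Slide or Session window definition can be used.
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.Tumble;

Table table1 = input
  .window(Tumble.over("10.minutes").on("rowtime").as("w"))  // define window with alias w
  .groupBy("w")  // group the table by window w
  .select("b.sum");  // aggregate

Table table2 = input
  .window(Tumble.over("10.minutes").on("rowtime").as("w"))  // define window with alias w
  .groupBy("w, a")  // group the table by attribute a and window w
  .select("a, b.sum");  // aggregate

Table table3 = input
  .window(Tumble.over("10.minutes").on("rowtime").as("w"))  // define window with alias w
  .groupBy("w, a")  // group the table by attribute a and window w
  .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps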
  • The window operation gives the window an alias, which can then be referenced in groupBy and select. A window exposes the start, end and rowtime properties: start is the inclusive lower bound, end is the exclusive upper bound, and rowtime is the inclusive upper bound (one millisecond before end).
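The boundary semantics can be sketched in plain Java. This is not Flink code: `WindowBounds` is a hypothetical class, timestamps are assumed to be epoch milliseconds, and rowtime is modelled as end minus one millisecond, i.e. the largest timestamp still inside the window.

```java
// Hypothetical model of a time window's start/end/rowtime properties (not Flink code).
// Timestamps are epoch milliseconds.
final class WindowBounds {
    final long start; // inclusive lower bound (w.start)
    final long end;   // exclusive upper bound (w.end)

    WindowBounds(long start, long end) {
        if (end <= start) throw new IllegalArgumentException("end must be greater than start");
        this.start = start;
        this.end = end;
    }

    // w.rowtime: the inclusive upper bound, one millisecond before the exclusive end.
    long rowtime() {
        return end - 1;
    }

    // A row belongs to the window iff start <= ts < end.
    boolean contains(long ts) {
        return ts >= start && ts < end;
    }
}
```

For a 10-minute window starting at 0, `new WindowBounds(0L, 600_000L)` reports a rowtime of 599999 and does not contain the timestamp 600000, which belongs to the next window.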

Tumbling Windows example

// Tumbling Event-time Window
.window(Tumble.over("10.minutes").on("rowtime").as("w"));

// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.minutes").on("proctime").as("w"));

// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.rows").on("proctime").as("w"));
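  • Tumbling windows have a fixed size and advance by that same size, so they do not overlap. The over method specifies the window size, which can be a time interval on an event-time or processing-time attribute, or a number of rows; the on method names the time attribute to group on (time intervals) or sort on (row counts).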
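A minimal sketch of how rows map to tumbling windows, assuming epoch-millisecond timestamps and no window offset. `TumbleSketch` is hypothetical and only illustrates the arithmetic; it does not use Flink's window assigners. For row-count windows the sketch returns only complete groups of `size` rows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of tumbling-window assignment (not Flink's implementation).
final class TumbleSketch {
    // Start of the time-based tumbling window [start, start + sizeMs) that contains ts.
    // Math.floorMod keeps the result aligned for negative timestamps as well.
    static long windowStart(long ts, long sizeMs) {
        if (sizeMs <= 0) throw new IllegalArgumentException("size must be positive");
        return ts - Math.floorMod(ts, sizeMs);
    }

    // Row-count tumbling windows: consecutive, non-overlapping groups of `size` rows.
    // Only complete groups are returned; trailing rows that do not fill a group are left out.
    static <T> List<List<T>> byRowCount(List<T> rows, int size) {
        if (size <= 0) throw new IllegalArgumentException("size must be positive");
        List<List<T>> windows = new ArrayList<>();
        for (int i = 0; i + size <= rows.size(); i += size) {
            windows.add(new ArrayList<>(rows.subList(i, i + size)));
        }
        return windows;
    }
}
```

With 1-minute windows, a row at 125000 ms lands in the window starting at 120000 ms. Grouping the rows 1 to 5 into windows of two rows yields [1, 2] and [3, 4]; the fifth row does not yet form a complete window.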

Sliding Windows example

// Sliding Event-time Window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"));

// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"));

// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));
  • Sliding windows have a fixed size and advance by a slide interval. When the slide interval is smaller than the window size, the windows overlap, so a row can belong to multiple windows. The over method specifies the window size, which can again be based on event time, processing time or row count; the every method specifies the slide interval.
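The overlap can be made concrete with a sketch that lists every sliding window containing a given timestamp. It assumes epoch-millisecond timestamps and window starts aligned to multiples of the slide; `SlideSketch` is hypothetical and written in the style of a sliding-window assigner, not taken from Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Flink code): start times of every sliding window that contains ts.
final class SlideSketch {
    // Windows are [start, start + sizeMs) with starts aligned to multiples of slideMs.
    static List<Long> windowStarts(long ts, long sizeMs, long slideMs) {
        if (sizeMs <= 0 || slideMs <= 0) {
            throw new IllegalArgumentException("size and slide must be positive");
        }
        List<Long> starts = new ArrayList<>();
        // Latest aligned window start at or before ts.
        long lastStart = ts - Math.floorMod(ts, slideMs);
        // Walk back one slide at a time while the window still ends after ts.
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

For a 10-minute window sliding every 5 minutes, a row at minute 7 (420000 ms) falls into the windows starting at 300000 ms and 0 ms, so it is aggregated twice. When the slide equals the size, the list has a single entry, which is the tumbling case.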

Session Windows example

// Session Event-time Window
.window(Session.withGap("10.minutes").on("rowtime").as("w"));

// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session.withGap("10.minutes").on("proctime").as("w"));
  • Session windows have no fixed size; their boundaries are defined by periods of inactivity, and a window closes when no row arrives within the gap. The withGap method specifies that gap as a time interval. Session windows can only be defined on event time or processing time, not on row count.
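A sketch of how session windows form, assuming epoch-millisecond timestamps: each row opens a window of length gap, and a row that falls inside the current session extends it. `SessionSketch` is a hypothetical helper that processes a finished batch of timestamps, whereas a streaming engine merges windows incrementally as rows arrive. The sketch treats a row exactly one gap after the previous one as the start of a new session; Flink's exact boundary handling may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical batch sketch of session windows (not Flink code).
final class SessionSketch {
    // Returns sessions as {start, end} pairs with end exclusive. Each row at t opens
    // [t, t + gapMs); a row inside the current session's [start, end) range extends it.
    static List<long[]> sessions(long[] timestamps, long gapMs) {
        if (gapMs <= 0) throw new IllegalArgumentException("gap must be positive");
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long t : sorted) {
            long[] current = result.isEmpty() ? null : result.get(result.size() - 1);
            if (current != null && t < current[1]) {
                // Still active: input is sorted, so t + gapMs is the new, later end.
                current[1] = t + gapMs;
            } else {
                // No row for at least gapMs: the previous session is closed, open a new one.
                result.add(new long[] {t, t + gapMs});
            }
        }
        return result;
    }
}
```

With a 10 ms gap, the timestamps 0, 3, 5 and 20 produce two sessions, [0, 15) and [20, 30), even though no window size was ever specified.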

Table.window

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {

  //......
  
  def window(window: Window): WindowedTable = {
    new WindowedTable(this, window)
  }
  
  //......
}
  • Table provides the window operation, which takes a Window and returns a new WindowedTable wrapping the table and the window.

WindowedTable

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class WindowedTable(
    private[flink] val table: Table,
    private[flink] val window: Window) {

  def groupBy(fields: Expression*): WindowGroupedTable = {
    val fieldsWithoutWindow = fields.filterNot(window.alias.equals(_))
    if (fields.size != fieldsWithoutWindow.size + 1) {
      throw new ValidationException("GroupBy must contain exactly one window alias.")
    }

    new WindowGroupedTable(table, fieldsWithoutWindow, window)
  }

  def groupBy(fields: String): WindowGroupedTable = {
    val fieldsExpr = ExpressionParser.parseExpressionList(fields)
    groupBy(fieldsExpr: _*)
  }

}
  • WindowedTable only provides the groupBy operation, which accepts either a String or a list of Expressions. A String is parsed into Expressions by ExpressionParser.parseExpressionList, and the Expression-based groupBy is then called. That method requires the fields to contain the window alias exactly once (otherwise it throws a ValidationException) and uses the remaining fields as group keys. If the window alias is the only grouping field, the aggregation runs with parallelism 1 on a single task. groupBy returns a WindowGroupedTable.
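The alias check in groupBy can be tried out with a plain-Java restatement. `GroupByCheck` is hypothetical: it works on field names as strings instead of Flink Expressions, uses a naive comma split as a stand-in for `ExpressionParser.parseExpressionList`, and throws `IllegalArgumentException` where Flink throws `ValidationException`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical restatement of the WindowedTable.groupBy check, using plain strings.
final class GroupByCheck {
    // Stand-in for ExpressionParser.parseExpressionList: a naive comma split.
    static List<String> parseFieldList(String fields) {
        List<String> result = new ArrayList<>();
        for (String part : fields.split(",")) {
            result.add(part.trim());
        }
        return result;
    }

    // Drops the window alias and returns the remaining fields as group keys.
    // An empty result means the window is the only grouping attribute.
    static List<String> groupKeys(List<String> fields, String windowAlias) {
        List<String> withoutWindow = new ArrayList<>();
        for (String field : fields) {
            if (!field.equals(windowAlias)) {
                withoutWindow.add(field);
            }
        }
        // Same condition as the Scala code: the alias must appear exactly once.
        if (fields.size() != withoutWindow.size() + 1) {
            throw new IllegalArgumentException("GroupBy must contain exactly one window alias.");
        }
        return withoutWindow;
    }
}
```

`groupKeys(parseFieldList("w, a"), "w")` returns [a]; `"w"` alone yields an empty key list, the single-task case; `"a"` (alias missing) and `"w, w"` (alias repeated) are both rejected.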

WindowGroupedTable

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class WindowGroupedTable(
    private[flink] val table: Table,
    private[flink] val groupKeys: Seq[Expression],
    private[flink] val window: Window) {

  def select(fields: Expression*): Table = {
    val expandedFields = expandProjectList(fields, table.logicalPlan, table.tableEnv)
    val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, table.tableEnv)

    val projectsOnAgg = replaceAggregationsAndProperties(
      expandedFields, table.tableEnv, aggNames, propNames)

    val projectFields = extractFieldReferences(expandedFields ++ groupKeys :+ window.timeField)

    new Table(table.tableEnv,
      Project(
        projectsOnAgg,
        WindowAggregate(
          groupKeys,
          window.toLogicalWindow,
          propNames.map(a => Alias(a._1, a._2)).toSeq,
          aggNames.map(a => Alias(a._1, a._2)).toSeq,
          Project(projectFields, table.logicalPlan).validate(table.tableEnv)
        ).validate(table.tableEnv),
        // required for proper resolution of the time attribute in multi-windows
        explicitAlias = true
      ).validate(table.tableEnv))
  }

  def select(fields: String): Table = {
    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    //get the correct expression for AggFunctionCall
    val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv))
    select(withResolvedAggFunctionCall: _*)
  }
}
  • WindowGroupedTable only provides the select operation, which likewise accepts either a String or a list of Expressions. A String is parsed into Expressions, aggregate function calls are resolved with replaceAggFunctionCall, and the Expression-based select is then called. select returns a new Table whose logical plan is a Project with a WindowAggregate as its child; the WindowAggregate in turn reads from a Project of the referenced fields, the group keys and the window's time field.
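The shape of the plan that select builds can be illustrated with a few string-building helpers. Everything here is hypothetical: the node names only echo Flink's Project and WindowAggregate, the window is described by a free-form string, the alias `agg0` stands in for whatever name Flink generates for `b.sum`, and the extraction of aggregates and referenced fields is done by hand rather than by Flink's expansion logic.

```java
import java.util.List;

// Hypothetical plan-shape sketch (not Flink classes): renders the nesting that
// WindowGroupedTable.select builds, Project(WindowAggregate(Project(input))).
final class PlanSketch {
    // A projection node over `child`.
    static String project(List<String> fields, String child) {
        return "Project[" + String.join(", ", fields) + "](" + child + ")";
    }

    // A window aggregation node: group keys, window description, aggregate calls.
    static String windowAggregate(List<String> groupKeys, String window,
                                  List<String> aggregates, String child) {
        return "WindowAggregate[" + String.join(", ", groupKeys) + "; " + window + "; "
                + String.join(", ", aggregates) + "](" + child + ")";
    }

    static String selectPlan(List<String> outputFields, List<String> groupKeys, String window,
                             List<String> aggregates, List<String> referencedFields,
                             String input) {
        // Innermost: only the referenced fields, group keys and the window's time field.
        String inner = project(referencedFields, input);
        // Middle: the window aggregation over that projection.
        String agg = windowAggregate(groupKeys, window, aggregates, inner);
        // Outer: the select list, expressed over the aggregation results.
        return project(outputFields, agg);
    }
}
```

For `.groupBy("w, a").select("a, b.sum")` over a 10-minute tumbling window on rowtime, calling `selectPlan` with output fields [a, agg0], group keys [a], aggregates [sum(b) AS agg0] and referenced fields [a, b, rowtime] produces `Project[a, agg0](WindowAggregate[a; tumble(w, rowtime, 10.minutes); sum(b) AS agg0](Project[a, b, rowtime](input)))`.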

Summary

  • The window operation gives the window an alias, which can then be referenced in groupBy and select. A window exposes the start, end and rowtime properties: start and rowtime are inclusive bounds, end is exclusive.
  • Tumbling windows have a fixed size and do not overlap; over specifies the size as a time interval (event time or processing time) or as a row count.
  • Sliding windows overlap when the slide interval, set with every, is smaller than the window size, set with over, so a row can belong to several windows; the size can again be based on event time, processing time or row count.
  • Session windows have no fixed size and close after a gap of inactivity, set with withGap as a time interval; they support only event time and processing time.
  • Table.window takes a Window and returns a WindowedTable.
  • WindowedTable only provides groupBy, which accepts a String (parsed into Expressions) or Expressions, requires exactly one window alias, and returns a WindowGroupedTable; grouping by the window alone runs with parallelism 1 on a single task.
  • WindowGroupedTable only provides select, which likewise accepts a String or Expressions and returns a new Table whose logical plan is a Project with a WindowAggregate as its child.
