Talk about Flink Table's Over Windows


Preface

This article takes a look at Over Windows in Flink's Table API.

Example

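import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.Over;

// input is an existing streaming Table with fields a, b, c and an event-time attribute rowtime
Table table = input
  .window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w")) // define over window with alias w
  .select("a, b.sum over w, c.min over w"); // aggregate over the over window w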
  • Over windows are similar to the OVER clause in SQL and can be defined on event-time, processing-time, or row-count intervals. A concrete window is built with the Over class, on which orderBy, preceding, and as must be called (partitionBy and following are optional). Over windows fall into two categories: unbounded and bounded.
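To make the semantics concrete, here is a minimal plain-Java sketch (no Flink dependency) of what "a, b.sum over w, c.min over w" computes for an unbounded window partitioned by a and ordered by rowtime: each input row produces exactly one output row, whose aggregates cover all rows of the same partition up to and including the current one. The InRow and UnboundedOverDemo classes are hypothetical illustrations, and the sketch assumes distinct timestamps so that row and range semantics coincide.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical input row: partition key a, event time rowtime, values b and c.
final class InRow {
    final String a;
    final long rowtime;
    final long b;
    final long c;

    InRow(String a, long rowtime, long b, long c) {
        this.a = a;
        this.rowtime = rowtime;
        this.b = b;
        this.c = c;
    }
}

// Hypothetical simulation of: select a, b.sum over w, c.min over w
// where w = partitionBy a, orderBy rowtime, preceding unbounded.
final class UnboundedOverDemo {
    static List<String> sumAndMinOver(List<InRow> input) {
        List<InRow> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparingLong(r -> r.rowtime)); // process rows in rowtime order
        Map<String, long[]> state = new HashMap<>(); // per partition: {running sum of b, running min of c}
        List<String> out = new ArrayList<>();
        for (InRow r : sorted) {
            long[] acc = state.computeIfAbsent(r.a, k -> new long[] {0L, Long.MAX_VALUE});
            acc[0] += r.b;
            acc[1] = Math.min(acc[1], r.c);
            out.add(r.a + "," + acc[0] + "," + acc[1]); // one output row per input row
        }
        return out;
    }
}
```

Unlike a group window, an over window does not collapse rows: four input rows yield four output rows, each carrying the running aggregates of its own partition.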

Unbounded Over Windows examples


// Unbounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w"));

// Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_range").as("w"));

// Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w"));
 
// Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_row").as("w"));
  • unbounded_range marks an unbounded time range for event-time and processing-time windows, while unbounded_row marks an unbounded row count for row-count windows.
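The difference between the two only shows up when several rows share the same order-by timestamp. Under standard SQL frame semantics, which this sketch assumes Flink follows, a RANGE frame ending at CURRENT_RANGE includes all peers of the current row, whereas a ROWS frame ending at CURRENT_ROW stops at the current row. RangeVsRowDemo is a hypothetical helper that computes count(*) for a single, already sorted partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: count(*) over one partition whose timestamps are sorted ascending.
final class RangeVsRowDemo {
    // unbounded_row: frame is UNBOUNDED PRECEDING .. CURRENT ROW
    static List<Integer> countRows(long[] ts) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < ts.length; i++) {
            out.add(i + 1); // every earlier row plus the current one
        }
        return out;
    }

    // unbounded_range: frame is UNBOUNDED PRECEDING .. CURRENT RANGE (peers included)
    static List<Integer> countRange(long[] ts) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < ts.length; i++) {
            int last = i;
            while (last + 1 < ts.length && ts[last + 1] == ts[i]) {
                last++; // extend the frame to the last row with the same timestamp
            }
            out.add(last + 1);
        }
        return out;
    }
}
```

With timestamps 1, 2, 2, 3 the row-based counts are 1, 2, 3, 4, while the range-based counts are 1, 3, 3, 4, because the two rows at timestamp 2 fall into each other's frame.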

Bounded Over Windows examples

// Bounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))

// Bounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))

// Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))
 
// Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))
  • Bounded event-time and processing-time windows take a time interval such as 1.minutes, while bounded row-count windows take a row count such as 10.rows.
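A minimal sketch of the two bounded frames, again in plain Java over one sorted partition. It assumes standard SQL semantics: preceding("1.minutes") keeps rows whose timestamp lies within the 60,000 ms before the current timestamp (lower bound inclusive, peers included), and preceding("10.rows") keeps the 10 preceding rows plus the current row, so at most 11 rows. BoundedOverDemo is hypothetical, and the quadratic loops favor clarity over efficiency.

```java
// Hypothetical helper: sum(v) over bounded frames of one partition sorted by ts.
final class BoundedOverDemo {
    // RANGE BETWEEN intervalMs PRECEDING AND CURRENT ROW (peers of the current row included)
    static long[] rangeSum(long[] ts, long[] v, long intervalMs) {
        long[] out = new long[ts.length];
        for (int i = 0; i < ts.length; i++) {
            long sum = 0;
            for (int j = 0; j < ts.length; j++) {
                if (ts[j] >= ts[i] - intervalMs && ts[j] <= ts[i]) {
                    sum += v[j]; // row j falls inside the time range ending at row i
                }
            }
            out[i] = sum;
        }
        return out;
    }

    // ROWS BETWEEN n PRECEDING AND CURRENT ROW, i.e. at most n + 1 rows
    static long[] rowsSum(long[] v, int n) {
        long[] out = new long[v.length];
        for (int i = 0; i < v.length; i++) {
            long sum = 0;
            for (int j = Math.max(0, i - n); j <= i; j++) {
                sum += v[j];
            }
            out[i] = sum;
        }
        return out;
    }
}
```

For example, with timestamps 0, 30000, 60000, 90000, 150000 ms and values 1 to 5, a one-minute range yields sums 1, 3, 6, 9, 9, while rowsSum(v, 2) yields 1, 3, 6, 9, 12.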

Table.window

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {

  //......  

  @varargs
  def window(overWindows: OverWindow*): OverWindowedTable = {

    if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
      throw new TableException("Over-windows for batch tables are currently not supported.")
    }

    if (overWindows.size != 1) {
      throw new TableException("Over-Windows are currently only supported single window.")
    }

    new OverWindowedTable(this, overWindows.toArray)
  }

  //......

}    
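  • Table provides a window method that takes OverWindow arguments for over-window operations and returns an OverWindowedTable. In 1.7.0 it throws a TableException for batch tables and whenever anything other than exactly one over window is passed.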
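The two guards in Table.window can be exercised without Flink. This hypothetical sketch mirrors them, using IllegalStateException as a stand-in for Flink's TableException and a boolean flag in place of the isInstanceOf[BatchTableEnvironment] check; the messages are copied from the 1.7.0 source above.

```java
// Hypothetical mirror of the checks in Table.window (Flink 1.7.0).
final class WindowGuardSketch {
    static void checkOverWindows(boolean batchEnvironment, int windowCount) {
        if (batchEnvironment) {
            // stands in for tableEnv.isInstanceOf[BatchTableEnvironment]
            throw new IllegalStateException("Over-windows for batch tables are currently not supported.");
        }
        if (windowCount != 1) {
            // exactly one over window must be passed
            throw new IllegalStateException("Over-Windows are currently only supported single window.");
        }
    }
}
```

The batch check runs first, so a batch table is rejected even when exactly one window is passed.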

OverWindow

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/windows.scala

/**
  * Over window is similar to the traditional OVER SQL.
  */
case class OverWindow(
    private[flink] val alias: Expression,
    private[flink] val partitionBy: Seq[Expression],
    private[flink] val orderBy: Expression,
    private[flink] val preceding: Expression,
    private[flink] val following: Expression)
  • OverWindow defines the alias, partitionBy, orderBy, preceding, and following attributes.

Over

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/java/windows.scala

object Over {

  /**
    * Specifies the time attribute on which rows are grouped.
    *
    * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
    *
    * For batch tables, refer to a timestamp or long attribute.
    */
  def orderBy(orderBy: String): OverWindowWithOrderBy = {
    val orderByExpr = ExpressionParser.parseExpression(orderBy)
    new OverWindowWithOrderBy(Array[Expression](), orderByExpr)
  }

  /**
    * Partitions the elements on some partition keys.
    *
    * @param partitionBy some partition keys.
    * @return A partitionedOver instance that only contains the orderBy method.
    */
  def partitionBy(partitionBy: String): PartitionedOver = {
    val partitionByExpr = ExpressionParser.parseExpressionList(partitionBy).toArray
    new PartitionedOver(partitionByExpr)
  }
}

class OverWindowWithOrderBy(
  private val partitionByExpr: Array[Expression],
  private val orderByExpr: Expression) {

  /**
    * Set the preceding offset (based on time or row-count intervals) for over window.
    *
    * @param preceding preceding offset relative to the current row.
    * @return this over window
    */
  def preceding(preceding: String): OverWindowWithPreceding = {
    val precedingExpr = ExpressionParser.parseExpression(preceding)
    new OverWindowWithPreceding(partitionByExpr, orderByExpr, precedingExpr)
  }

}

class PartitionedOver(private val partitionByExpr: Array[Expression]) {

  /**
    * Specifies the time attribute on which rows are grouped.
    *
    * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
    *
    * For batch tables, refer to a timestamp or long attribute.
    */
  def orderBy(orderBy: String): OverWindowWithOrderBy = {
    val orderByExpr = ExpressionParser.parseExpression(orderBy)
    new OverWindowWithOrderBy(partitionByExpr, orderByExpr)
  }
}

class OverWindowWithPreceding(
    private val partitionBy: Seq[Expression],
    private val orderBy: Expression,
    private val preceding: Expression) {

  private[flink] var following: Expression = _

  /**
    * Assigns an alias for this window that the following `select()` clause can refer to.
    *
    * @param alias alias for this over window
    * @return over window
    */
  def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))

  /**
    * Assigns an alias for this window that the following `select()` clause can refer to.
    *
    * @param alias alias for this over window
    * @return over window
    */
  def as(alias: Expression): OverWindow = {

    // set following to CURRENT_ROW / CURRENT_RANGE if not defined
    if (null == following) {
      if (preceding.resultType.isInstanceOf[RowIntervalTypeInfo]) {
        following = CURRENT_ROW
      } else {
        following = CURRENT_RANGE
      }
    }
    OverWindow(alias, partitionBy, orderBy, preceding, following)
  }

  /**
    * Set the following offset (based on time or row-count intervals) for over window.
    *
    * @param following following offset that relative to the current row.
    * @return this over window
    */
  def following(following: String): OverWindowWithPreceding = {
    this.following(ExpressionParser.parseExpression(following))
  }

  /**
    * Set the following offset (based on time or row-count intervals) for over window.
    *
    * @param following following offset that relative to the current row.
    * @return this over window
    */
  def following(following: Expression): OverWindowWithPreceding = {
    this.following = following
    this
  }
}
  • The Over class is a helper for creating over windows. It provides two methods, orderBy and partitionBy, which create an OverWindowWithOrderBy and a PartitionedOver respectively.
  • PartitionedOver provides an orderBy method that creates an OverWindowWithOrderBy; OverWindowWithOrderBy provides a preceding method that creates an OverWindowWithPreceding.
  • OverWindowWithPreceding holds the partitionBy, orderBy, and preceding attributes. Its as method creates the OverWindow, and its following method sets the following offset; if following was never set, as defaults it to CURRENT_ROW when preceding is a row-count interval and to CURRENT_RANGE otherwise.
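The chain Over, PartitionedOver, OverWindowWithOrderBy, OverWindowWithPreceding, OverWindow is a step builder: each step returns a type that exposes only the calls allowed next, so a window cannot be built without orderBy and preceding. The hypothetical Java sketch below mirrors that structure with plain strings instead of Expressions. Splitting partitionBy on commas stands in for ExpressionParser.parseExpressionList, and recognizing a row-count interval by its string form (a ".rows" suffix or "unbounded_row") is a simplification of the RowIntervalTypeInfo check in the real as method.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical value object mirroring OverWindow; strings stand in for Expressions.
final class OverWindowSketch {
    final String alias;
    final List<String> partitionBy;
    final String orderBy;
    final String preceding;
    final String following;

    OverWindowSketch(String alias, List<String> partitionBy, String orderBy, String preceding, String following) {
        this.alias = alias;
        this.partitionBy = partitionBy;
        this.orderBy = orderBy;
        this.preceding = preceding;
        this.following = following;
    }
}

// Entry point mirroring object Over.
final class OverSketch {
    static PartitionedOverSketch partitionBy(String partitionBy) {
        // naive stand-in for ExpressionParser.parseExpressionList
        return new PartitionedOverSketch(Arrays.asList(partitionBy.trim().split("\\s*,\\s*")));
    }

    static WithOrderBySketch orderBy(String orderBy) {
        return new WithOrderBySketch(Collections.<String>emptyList(), orderBy); // no partition keys
    }
}

final class PartitionedOverSketch {
    private final List<String> partitionBy;

    PartitionedOverSketch(List<String> partitionBy) { this.partitionBy = partitionBy; }

    WithOrderBySketch orderBy(String orderBy) { return new WithOrderBySketch(partitionBy, orderBy); }
}

final class WithOrderBySketch {
    private final List<String> partitionBy;
    private final String orderBy;

    WithOrderBySketch(List<String> partitionBy, String orderBy) {
        this.partitionBy = partitionBy;
        this.orderBy = orderBy;
    }

    WithPrecedingSketch preceding(String preceding) {
        return new WithPrecedingSketch(partitionBy, orderBy, preceding);
    }
}

final class WithPrecedingSketch {
    private final List<String> partitionBy;
    private final String orderBy;
    private final String preceding;
    private String following; // null until following(...) is called

    WithPrecedingSketch(List<String> partitionBy, String orderBy, String preceding) {
        this.partitionBy = partitionBy;
        this.orderBy = orderBy;
        this.preceding = preceding;
    }

    WithPrecedingSketch following(String following) {
        this.following = following;
        return this;
    }

    OverWindowSketch as(String alias) {
        if (following == null) {
            // simplified stand-in for preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
            boolean rowInterval = preceding.endsWith(".rows") || preceding.equals("unbounded_row");
            following = rowInterval ? "current_row" : "current_range";
        }
        return new OverWindowSketch(alias, partitionBy, orderBy, preceding, following);
    }
}
```

Because OverSketch.orderBy exists alongside partitionBy, partitioning is optional, whereas orderBy, preceding, and as are all needed to reach an OverWindowSketch, matching the shape of the Scala API.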

OverWindowedTable

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class OverWindowedTable(
    private[flink] val table: Table,
    private[flink] val overWindows: Array[OverWindow]) {

  def select(fields: Expression*): Table = {
    val expandedFields = expandProjectList(
      fields,
      table.logicalPlan,
      table.tableEnv)

    if(fields.exists(_.isInstanceOf[WindowProperty])){
      throw new ValidationException(
        "Window start and end properties are not available for Over windows.")
    }

    val expandedOverFields = resolveOverWindows(expandedFields, overWindows, table.tableEnv)

    new Table(
      table.tableEnv,
      Project(
        expandedOverFields.map(UnresolvedAlias),
        table.logicalPlan,
        // required for proper projection push down
        explicitAlias = true)
        .validate(table.tableEnv)
    )
  }

  def select(fields: String): Table = {
    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    //get the correct expression for AggFunctionCall
    val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv))
    select(withResolvedAggFunctionCall: _*)
  }
}
  • The OverWindowedTable constructor takes the overWindows parameter, and the class provides only a select operation, which accepts either a String or Expression arguments. A String is parsed into Expressions (with aggregate function calls resolved by replaceAggFunctionCall) and passed to the Expression overload. That overload expands the fields with expandProjectList, rejects window start and end properties (they are not available for over windows), resolves the over windows with resolveOverWindows(expandedFields, overWindows, table.tableEnv), and creates a new Table whose Project node uses expandedOverFields.map(UnresolvedAlias) as its project list.
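The String overload of select is a thin front end over the Expression overload. The hypothetical sketch below imitates that delegation with plain strings: a naive comma split stands in for ExpressionParser.parseExpressionList, one check stands in for the WindowProperty rejection in the source above, and the other check (matching each "over" reference to the defined window alias) is an assumption about the role of resolveOverWindows, whose body is not shown here. It is not Flink's parser and does not handle nested commas or quoted strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, naive imitation of OverWindowedTable.select with plain strings.
final class OverSelectSketch {
    // String overload: parse, then delegate (mirrors select(fields: String)).
    static List<String> select(String fields, String windowAlias) {
        List<String> parsed = new ArrayList<>();
        for (String f : fields.split(",")) {
            parsed.add(f.trim()); // naive stand-in for parseExpressionList
        }
        return select(parsed, windowAlias);
    }

    // "Expression" overload: validate each field (mirrors select(fields: Expression*)).
    static List<String> select(List<String> fields, String windowAlias) {
        for (String f : fields) {
            if (f.endsWith(windowAlias + ".start") || f.endsWith(windowAlias + ".end")) {
                throw new IllegalArgumentException("Window start and end properties are not available for Over windows.");
            }
            int i = f.indexOf(" over ");
            if (i >= 0 && !f.substring(i + " over ".length()).trim().equals(windowAlias)) {
                // hypothetical message; assumed to correspond to alias resolution
                throw new IllegalArgumentException("Undefined over window alias in: " + f);
            }
        }
        return fields;
    }
}
```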

Summary

  • Over windows are similar to the OVER clause in SQL and can be defined on event-time, processing-time, or row-count intervals. A concrete window is built with the Over class, on which orderBy, preceding, and as must be called. Over windows are either unbounded or bounded: unbounded_range marks an unbounded event-time or processing-time window and unbounded_row an unbounded row-count window, while a bounded window takes an interval such as 1.minutes for event-time and processing-time or 10.rows for row-count.
  • Table provides a window method that takes OverWindow arguments and returns an OverWindowedTable. OverWindow defines the alias, partitionBy, orderBy, preceding, and following attributes. The Over class is a helper for creating over windows: its orderBy and partitionBy methods create an OverWindowWithOrderBy and a PartitionedOver respectively; PartitionedOver.orderBy creates an OverWindowWithOrderBy; OverWindowWithOrderBy.preceding creates an OverWindowWithPreceding; and OverWindowWithPreceding, which holds the partitionBy, orderBy, and preceding attributes, provides as to create the OverWindow and following to set the following offset.
  • The OverWindowedTable constructor takes the overWindows parameter, and the class provides only a select operation. select accepts either a String or Expression arguments; a String is parsed into Expressions and passed to the Expression overload, which creates a new Table whose Project node uses expandedOverFields.map(UnresolvedAlias) as its project list, where expandedOverFields is obtained from resolveOverWindows(expandedFields, overWindows, table.tableEnv).
