A Look at OrderBy and Limit in the Flink Table API


Preface

This article looks at the orderBy and Limit (offset and fetch) operations of the Flink Table API.

Example

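// OrderBy: sorts the table by field a in ascending order
Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy("a.asc");

// Offset and Fetch (a separate snippet, so `in` is declared again)
Table in = tableEnv.fromDataSet(ds, "a, b, c");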

// returns the first 5 records from the sorted result
Table result1 = in.orderBy("a.asc").fetch(5); 

// skips the first 3 records and returns all following records from the sorted result
Table result2 = in.orderBy("a.asc").offset(3);

// skips the first 10 records and returns the next 5 records from the sorted result
Table result3 = in.orderBy("a.asc").offset(10).fetch(5);
  • The orderBy method is the counterpart of SQL's ORDER BY; Limit is expressed through the offset and fetch methods, the counterparts of SQL's OFFSET and FETCH clauses.
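The semantics of the three Limit variants above can be illustrated without Flink. The following is a minimal plain-Java analogy, not Flink API code: the class and method names are made up for this sketch, and a negative fetch is assumed to mean "no fetch specified".

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java analogy only; OrderByLimitSketch and orderByOffsetFetch are hypothetical names.
class OrderByLimitSketch {

    // Sort ascending, skip `offset` elements, then keep at most `fetch` elements.
    // A negative `fetch` means "no fetch specified": everything after the offset is returned.
    static List<Integer> orderByOffsetFetch(List<Integer> rows, int offset, int fetch) {
        Stream<Integer> sorted = rows.stream().sorted().skip(offset);
        if (fetch >= 0) {
            sorted = sorted.limit(fetch);
        }
        return sorted.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(7, 3, 9, 1, 5, 8, 2);
        System.out.println(orderByOffsetFetch(a, 0, 5));  // analogous to orderBy("a.asc").fetch(5)
        System.out.println(orderByOffsetFetch(a, 3, -1)); // analogous to orderBy("a.asc").offset(3)
        System.out.println(orderByOffsetFetch(a, 2, 3));  // analogous to orderBy("a.asc").offset(2).fetch(3)
    }
}
```

In each call the sort comes first, mirroring the examples, which always call orderBy before offset or fetch.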

Table

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {

  //......

  def orderBy(fields: String): Table = {
    val parsedFields = ExpressionParser.parseExpressionList(fields)
    orderBy(parsedFields: _*)
  }

  def orderBy(fields: Expression*): Table = {
    val order: Seq[Ordering] = fields.map {
      case o: Ordering => o
      case e => Asc(e)
    }
    new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv))
  }

  def offset(offset: Int): Table = {
    new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
  }

  def fetch(fetch: Int): Table = {
    if (fetch < 0) {
      throw new ValidationException("FETCH count must be equal or larger than 0.")
    }
    this.logicalPlan match {
      case Limit(o, -1, c) =>
        // replace LIMIT without FETCH by LIMIT with FETCH
        new Table(tableEnv, Limit(o, fetch, c).validate(tableEnv))
      case Limit(_, _, _) =>
        throw new ValidationException("FETCH is already defined.")
      case _ =>
        new Table(tableEnv, Limit(0, fetch, logicalPlan).validate(tableEnv))
    }
  }

  //......
}
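  • Table's orderBy method accepts either a String or Expression arguments; the String form is first parsed into Expressions. Any expression that is not already an Ordering is wrapped in Asc, and the method returns a new Table whose logical plan is a Sort node. The offset and fetch methods return a new Table whose plan is a Limit node: offset creates a Limit with fetch set to -1 (no fetch defined); fetch fills in the fetch value of an existing Limit that has none, throws a ValidationException if the count is negative or a fetch is already defined, and otherwise creates a Limit with an offset of 0.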
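The branching in offset and fetch can be modelled in a few lines. The sketch below is a simplified, hypothetical stand-in written in plain Java (LimitPlanSketch and its members are not Flink classes). It assumes the plan ends in a Sort node before any Limit is added, and it uses standard Java exceptions in place of ValidationException.

```java
// Simplified, hypothetical model of how offset() and fetch() combine into one Limit node.
// It mirrors the branching of Table.offset and Table.fetch; it is not Flink code.
class LimitPlanSketch {
    static final int NO_FETCH = -1;

    final boolean limited;  // true once the plan has been wrapped in a Limit
    final int offsetValue;
    final int fetchValue;   // NO_FETCH until fetch() has been called

    private LimitPlanSketch(boolean limited, int offsetValue, int fetchValue) {
        this.limited = limited;
        this.offsetValue = offsetValue;
        this.fetchValue = fetchValue;
    }

    // Stands for a table whose plan currently ends in a Sort node.
    static LimitPlanSketch sorted() {
        return new LimitPlanSketch(false, 0, NO_FETCH);
    }

    LimitPlanSketch offset(int newOffset) {
        if (limited) {
            // A second Limit would sit on top of a Limit rather than a Sort and fail validation.
            throw new IllegalStateException("Limit operator must be preceded by an OrderBy operator.");
        }
        if (newOffset < 0) {
            throw new IllegalArgumentException("Offset should be greater than or equal to zero.");
        }
        return new LimitPlanSketch(true, newOffset, NO_FETCH);
    }

    LimitPlanSketch fetch(int newFetch) {
        if (newFetch < 0) {
            throw new IllegalArgumentException("FETCH count must be equal or larger than 0.");
        }
        if (limited && fetchValue == NO_FETCH) {
            // Replace the Limit without FETCH by a Limit with FETCH, keeping the offset.
            return new LimitPlanSketch(true, offsetValue, newFetch);
        }
        if (limited) {
            throw new IllegalStateException("FETCH is already defined.");
        }
        // No offset was given before, so the Limit starts at offset 0.
        return new LimitPlanSketch(true, 0, newFetch);
    }

    public static void main(String[] args) {
        LimitPlanSketch plan = sorted().offset(10).fetch(5);
        System.out.println("offset=" + plan.offsetValue + ", fetch=" + plan.fetchValue);
    }
}
```

The model makes one consequence of the source easy to see: offset followed by fetch yields a single Limit node, whereas calling fetch twice is rejected.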

Sort

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {
  override def output: Seq[Attribute] = child.output

  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    child.construct(relBuilder)
    relBuilder.sort(order.map(_.toRexNode(relBuilder)).asJava)
  }

  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
      failValidation(s"Sort on stream tables is currently not supported.")
    }
    super.validate(tableEnv)
  }
}
  • Sort extends UnaryNode; its constructor takes a Seq[Ordering] together with the child node, its construct method builds the sort through relBuilder.sort, and its validate method rejects sorting on stream tables.

Ordering

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/expressions/ordering.scala

abstract class Ordering extends UnaryExpression {
  override private[flink] def validateInput(): ValidationResult = {
    if (!child.isInstanceOf[NamedExpression]) {
      ValidationFailure(s"Sort should only based on field reference")
    } else {
      ValidationSuccess
    }
  }
}

case class Asc(child: Expression) extends Ordering {
  override def toString: String = s"($child).asc"

  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    child.toRexNode
  }

  override private[flink] def resultType: TypeInformation[_] = child.resultType
}

case class Desc(child: Expression) extends Ordering {
  override def toString: String = s"($child).desc"

  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    relBuilder.desc(child.toRexNode)
  }

  override private[flink] def resultType: TypeInformation[_] = child.resultType
}
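  • Ordering is an abstract class that extends UnaryExpression and has two subclasses, Asc and Desc. Its validateInput method requires the child to be a NamedExpression (a field reference). Asc.toRexNode returns the child's RexNode unchanged, while Desc.toRexNode wraps it in relBuilder.desc.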
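To make the effect of an ordered sequence of Asc and Desc keys concrete, here is a minimal plain-Java sketch. It is an analogy under stated assumptions rather than Flink code: rows are modelled as int arrays, and the Ordering class below is a hypothetical stand-in that holds a field index and a direction flag.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java analogy; OrderingSketch and its nested Ordering class are hypothetical names.
class OrderingSketch {

    // Stand-in for Asc/Desc: which field to sort on and in which direction.
    static final class Ordering {
        final int field;
        final boolean descending;

        Ordering(int field, boolean descending) {
            this.field = field;
            this.descending = descending;
        }
    }

    static Ordering asc(int field) { return new Ordering(field, false); }

    static Ordering desc(int field) { return new Ordering(field, true); }

    // Chains one comparator per key, in list order, so earlier keys take precedence.
    static Comparator<int[]> toComparator(List<Ordering> order) {
        Comparator<int[]> result = (x, y) -> 0;
        for (Ordering o : order) {
            Comparator<int[]> byField = Comparator.comparingInt(row -> row[o.field]);
            result = result.thenComparing(o.descending ? byField.reversed() : byField);
        }
        return result;
    }

    // Returns a sorted copy and leaves the input array untouched.
    static int[][] sort(int[][] rows, List<Ordering> order) {
        int[][] copy = rows.clone();
        Arrays.sort(copy, toComparator(order));
        return copy;
    }

    public static void main(String[] args) {
        int[][] rows = {{1, 2}, {1, 5}, {0, 3}};
        int[][] sorted = sort(rows, Arrays.asList(asc(0), desc(1)));
        System.out.println(Arrays.deepToString(sorted)); // prints [[0, 3], [1, 5], [1, 2]]
    }
}
```

Because the keys are held in a sequence, their position matters: swapping asc(0) and desc(1) would sort primarily by the second field.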

Limit

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Limit(offset: Int, fetch: Int = -1, child: LogicalNode) extends UnaryNode {
  override def output: Seq[Attribute] = child.output

  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    child.construct(relBuilder)
    relBuilder.limit(offset, fetch)
  }

  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
      failValidation(s"Limit on stream tables is currently not supported.")
    }
    if (!child.isInstanceOf[Sort]) {
      failValidation(s"Limit operator must be preceded by an OrderBy operator.")
    }
    if (offset < 0) {
      failValidation(s"Offset should be greater than or equal to zero.")
    }
    super.validate(tableEnv)
  }
}
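  • Limit extends UnaryNode; its constructor takes offset and fetch (which defaults to -1), and its construct method passes both to relBuilder.limit. Its validate method rejects stream tables, requires the child to be a Sort (so offset and fetch must follow orderBy), and rejects a negative offset.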
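The order of the checks in Limit.validate determines which error a caller sees first. The following plain-Java sketch reproduces that order under simplifying assumptions: the environment type and the child node type are reduced to two boolean flags, and the method returns the first failure message instead of throwing. All names in it are hypothetical.

```java
// Hypothetical reduction of Limit.validate to its three checks, in source order.
class LimitValidationSketch {

    // Returns the first validation failure message, or null if the Limit is valid.
    // `streaming` stands for tableEnv being a StreamTableEnvironment,
    // `childIsSort` for the child node being a Sort.
    static String firstFailure(boolean streaming, boolean childIsSort, int offset) {
        if (streaming) {
            return "Limit on stream tables is currently not supported.";
        }
        if (!childIsSort) {
            return "Limit operator must be preceded by an OrderBy operator.";
        }
        if (offset < 0) {
            return "Offset should be greater than or equal to zero.";
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(firstFailure(false, true, 3));   // valid, prints null
        System.out.println(firstFailure(false, false, 3));  // missing orderBy
        System.out.println(firstFailure(true, false, -1));  // the stream check wins
    }
}
```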

Summary

  • Table's orderBy method is the counterpart of SQL's ORDER BY; Limit is expressed through the offset and fetch methods, the counterparts of SQL's OFFSET and FETCH clauses.
  • orderBy accepts either a String or Expression arguments, and the String form is first parsed into Expressions. Expressions that are not already an Ordering are wrapped in Asc, and the method returns a new Table backed by a Sort node. offset and fetch return a new Table backed by a Limit node: offset creates a Limit with fetch set to -1, and fetch either completes such a Limit or, if no offset was specified, creates a Limit with an offset of 0.
  • Sort extends UnaryNode, takes a Seq[Ordering], and builds the sort through relBuilder.sort in its construct method. Ordering is an abstract class with the subclasses Asc and Desc. Limit extends UnaryNode, takes offset and fetch, passes both to relBuilder.limit in its construct method, and must be preceded by a Sort. Both Sort and Limit reject stream tables during validation.
