Joins in the Flink Table API


Preface

This article takes a look at joins in the Flink Table API.

Examples

Inner Join

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.join(right).where("a = d").select("a, b, e");
  • Calling the join method creates an inner join; in this example the join condition is supplied afterwards through where.
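To make the semantics concrete, here is a minimal plain-Java sketch (no Flink involved) of what left.join(right).where("a = d").select("a, b, e") computes. The class InnerJoinSketch and its row helper are hypothetical, rows are modelled as List<Object>, and the nested loop is chosen only for readability; it says nothing about how Flink physically executes the join.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model of left.join(right).where("a = d").select("a, b, e").
class InnerJoinSketch {
    // Convenience helper to build a row.
    static List<Object> row(Object... values) {
        return Arrays.asList(values);
    }

    // Left rows are (a, b, c), right rows are (d, e, f); result rows are (a, b, e).
    static List<List<Object>> innerJoin(List<List<Object>> left, List<List<Object>> right) {
        List<List<Object>> result = new ArrayList<>();
        for (List<Object> l : left) {
            for (List<Object> r : right) {
                Object a = l.get(0);
                Object d = r.get(0);
                // Equi-join predicate a = d; as in SQL, a null key never matches.
                if (a != null && a.equals(d)) {
                    result.add(row(a, l.get(1), r.get(1)));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Object>> left = Arrays.asList(row(1, "x", 10), row(2, "y", 20));
        List<List<Object>> right = Arrays.asList(row(1, "p", 100), row(3, "q", 300));
        // Only a = 1 has a partner on the right, so a single row survives.
        System.out.println(innerJoin(left, right)); // [[1, x, p]]
    }
}
```

Rows without a partner on the other side simply disappear, which is the defining property of an inner join.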

Outer Join

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");

Table leftOuterResult = left.leftOuterJoin(right, "a = d").select("a, b, e");
Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e");
Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
  • Outer joins come in three variants, leftOuterJoin, rightOuterJoin and fullOuterJoin, each of which takes the join predicate as its second argument.
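The difference between the three variants is which unmatched rows are kept and padded with nulls. The following plain-Java sketch models that behavior for the predicate a = d followed by select("a, b, e"); OuterJoinSketch, its Kind enum and the row helper are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model of leftOuterJoin / rightOuterJoin / fullOuterJoin
// with the predicate "a = d" followed by select("a, b, e").
class OuterJoinSketch {
    enum Kind { LEFT, RIGHT, FULL }

    static List<Object> row(Object... values) {
        return Arrays.asList(values);
    }

    // Left rows are (a, b, c), right rows are (d, e, f); result rows are (a, b, e).
    static List<List<Object>> outerJoin(List<List<Object>> left, List<List<Object>> right, Kind kind) {
        List<List<Object>> result = new ArrayList<>();
        boolean[] rightMatched = new boolean[right.size()];
        for (List<Object> l : left) {
            boolean leftMatched = false;
            for (int i = 0; i < right.size(); i++) {
                List<Object> r = right.get(i);
                // a = d; a null key never matches
                if (l.get(0) != null && l.get(0).equals(r.get(0))) {
                    leftMatched = true;
                    rightMatched[i] = true;
                    result.add(row(l.get(0), l.get(1), r.get(1)));
                }
            }
            // LEFT and FULL keep unmatched left rows, padding the right-side column e with null.
            if (!leftMatched && kind != Kind.RIGHT) {
                result.add(row(l.get(0), l.get(1), null));
            }
        }
        // RIGHT and FULL keep unmatched right rows, padding the left-side columns a and b with null.
        if (kind != Kind.LEFT) {
            for (int i = 0; i < right.size(); i++) {
                if (!rightMatched[i]) {
                    result.add(row(null, null, right.get(i).get(1)));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Object>> left = Arrays.asList(row(1, "x", 10), row(2, "y", 20));
        List<List<Object>> right = Arrays.asList(row(1, "p", 100), row(3, "q", 300));
        System.out.println(outerJoin(left, right, Kind.LEFT));  // [[1, x, p], [2, y, null]]
        System.out.println(outerJoin(left, right, Kind.RIGHT)); // [[1, x, p], [null, null, q]]
        System.out.println(outerJoin(left, right, Kind.FULL));  // [[1, x, p], [2, y, null], [null, null, q]]
    }
}
```

The row order produced by this model is incidental; a Flink join makes no ordering guarantee.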

Time-windowed Join

Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime");
Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime");

Table result = left.join(right)
  .where("a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes")
  .select("a, b, e, ltime");
  • A time-windowed join requires at least one equi-join predicate plus a condition that bounds the time attributes of both sides, expressed with the comparison operators <, <=, >= or >.
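Read as an interval, the condition above means a left row joins a right row with the same key when ltime lies in [rtime - 5 min, rtime + 10 min). The following plain-Java sketch evaluates exactly that predicate, with rowtime values assumed to be epoch milliseconds; TimeWindowedJoinSketch and joins are hypothetical names.

```java
import java.time.Duration;

// Hypothetical model of the predicate
//   a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes
// with rowtime values represented as epoch milliseconds.
class TimeWindowedJoinSketch {
    static final long LOWER_BOUND_MS = Duration.ofMinutes(5).toMillis();
    static final long UPPER_BOUND_MS = Duration.ofMinutes(10).toMillis();

    static boolean joins(int a, long ltime, int d, long rtime) {
        boolean equiJoin = a == d;                            // the required equi-join predicate
        boolean afterLower = ltime >= rtime - LOWER_BOUND_MS; // inclusive lower bound
        boolean beforeUpper = ltime < rtime + UPPER_BOUND_MS; // exclusive upper bound
        return equiJoin && afterLower && beforeUpper;
    }

    public static void main(String[] args) {
        long rtime = Duration.ofHours(1).toMillis(); // right row at 01:00
        long min = Duration.ofMinutes(1).toMillis();
        System.out.println(joins(1, rtime - 5 * min, 1, rtime));  // true  (00:55, lower bound is inclusive)
        System.out.println(joins(1, rtime - 6 * min, 1, rtime));  // false (00:54, too early)
        System.out.println(joins(1, rtime + 9 * min, 1, rtime));  // true  (01:09)
        System.out.println(joins(1, rtime + 10 * min, 1, rtime)); // false (01:10, upper bound is exclusive)
        System.out.println(joins(2, rtime, 1, rtime));            // false (keys differ)
    }
}
```

Because the window is bounded on both sides, a streaming runtime can eventually discard rows that can no longer find a partner, which is why the time bounds are required.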

Inner Join with Table Function

// register User-Defined Table Function
TableFunction<String> split = new MySplitUDTF();
tableEnv.registerFunction("split", split);

// join
Table orders = tableEnv.scan("Orders");
Table result = orders
    .join(new Table(tableEnv, "split(c)").as("s", "t", "v"))
    .select("a, b, s, t, v");
  • A table can also be inner-joined with a table function (UDTF). The function is evaluated for each row of the table; if it returns an empty result for a row, that row is discarded.
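The per-row evaluation and the dropping of rows with an empty function result can be sketched in plain Java as follows. The body of MySplitUDTF is not shown in the article, so the split method below is a hypothetical stand-in that splits c on commas, and the result is simplified to a single column s instead of s, t, v.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of orders.join(<table function call>): a correlated (lateral) inner join.
class UdtfInnerJoinSketch {
    static List<Object> row(Object... values) {
        return Arrays.asList(values);
    }

    // Hypothetical stand-in for MySplitUDTF (its real logic is not shown in the article):
    // emits one value per non-empty comma-separated token of c.
    static List<String> split(String c) {
        List<String> out = new ArrayList<>();
        if (c == null) {
            return out;
        }
        for (String token : c.split(",")) {
            if (!token.isEmpty()) {
                out.add(token);
            }
        }
        return out;
    }

    // Orders rows are (a, b, c); result rows are (a, b, s).
    static List<List<Object>> join(List<List<Object>> orders) {
        List<List<Object>> result = new ArrayList<>();
        for (List<Object> order : orders) {
            // The function is evaluated once per order, with that order's c as its argument.
            for (String s : split((String) order.get(2))) {
                result.add(row(order.get(0), order.get(1), s));
            }
            // When split emits nothing, the order contributes no output row.
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Object>> orders = Arrays.asList(row(1, "u1", "x,y"), row(2, "u2", ""));
        System.out.println(join(orders)); // [[1, u1, x], [1, u1, y]]
    }
}
```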

Left Outer Join with Table Function

// register User-Defined Table Function
TableFunction<String> split = new MySplitUDTF();
tableEnv.registerFunction("split", split);

// join
Table orders = tableEnv.scan("Orders");
Table result = orders
    .leftOuterJoin(new Table(tableEnv, "split(c)").as("s", "t", "v"))
    .select("a, b, s, t, v");
  • A table can also be left-outer-joined with a table function. If the table function returns an empty result for a row, the row is still kept, and the columns produced by the table function are filled with null.
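The only change from the inner variant is what happens to a row whose function result is empty. In this plain-Java sketch the table function is passed in as a java.util.function.Function; UdtfLeftOuterJoinSketch and the lambda used in main are hypothetical, and the output is again simplified to a single function column s.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of orders.leftOuterJoin(<table function call>).
class UdtfLeftOuterJoinSketch {
    static List<Object> row(Object... values) {
        return Arrays.asList(values);
    }

    // Orders rows are (a, b, c); result rows are (a, b, s).
    static List<List<Object>> leftOuterJoin(List<List<Object>> orders, Function<String, List<String>> udtf) {
        List<List<Object>> result = new ArrayList<>();
        for (List<Object> order : orders) {
            List<String> produced = udtf.apply((String) order.get(2));
            for (String s : produced) {
                result.add(row(order.get(0), order.get(1), s));
            }
            // Unlike the inner join, an empty function result keeps the order and fills s with null.
            if (produced.isEmpty()) {
                result.add(row(order.get(0), order.get(1), null));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical table function: emits c itself, or nothing when c is empty.
        Function<String, List<String>> nonEmpty =
            c -> c.isEmpty() ? Collections.<String>emptyList() : Collections.singletonList(c);
        List<List<Object>> orders = Arrays.asList(row(1, "u1", "x"), row(2, "u2", ""));
        System.out.println(leftOuterJoin(orders, nonEmpty)); // [[1, u1, x], [2, u2, null]]
    }
}
```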

Join with Temporal Table

Table ratesHistory = tableEnv.scan("RatesHistory");

// register temporal table function with a time attribute and primary key
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
    "r_proctime",
    "r_currency");
tableEnv.registerFunction("rates", rates);

// join with "Orders" based on the time attribute and key
Table orders = tableEnv.scan("Orders");
Table result = orders
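    .join(new Table(tableEnv, "rates(o_proctime)"), "o_currency = r_currency");
  • A table can also be joined with a temporal table. The temporal table is accessed through a temporal table function, created here by Table's createTemporalTableFunction from a time attribute (r_proctime) and a primary key (r_currency). Currently only inner joins are supported.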
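Conceptually, the temporal table function returns, for each key, the version of the row that is valid at the given point in time. The plain-Java sketch below models the rates history as a per-currency TreeMap of version start time to rate and looks up the latest version not after the order's time; TemporalJoinSketch, addRate and rateAt are hypothetical names. The example above uses a processing-time attribute, in which case the lookup effectively returns the latest version; the explicit timestamps here only make the versioning visible.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of joining orders with a temporal table function keyed by currency.
class TemporalJoinSketch {
    // r_currency -> (version start time -> rate)
    private final Map<String, TreeMap<Long, Double>> ratesHistory = new HashMap<>();

    void addRate(String currency, long time, double rate) {
        ratesHistory.computeIfAbsent(currency, k -> new TreeMap<>()).put(time, rate);
    }

    // Rate version valid at the given time, or null when no version exists (yet).
    // With an inner join, an order whose lookup returns null produces no output row.
    Double rateAt(String currency, long time) {
        TreeMap<Long, Double> versions = ratesHistory.get(currency);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, Double> version = versions.floorEntry(time);
        return version == null ? null : version.getValue();
    }

    public static void main(String[] args) {
        TemporalJoinSketch rates = new TemporalJoinSketch();
        rates.addRate("EUR", 0L, 1.10);
        rates.addRate("EUR", 10L, 1.20);
        System.out.println(rates.rateAt("EUR", 5L));  // 1.1
        System.out.println(rates.rateAt("EUR", 12L)); // 1.2
        System.out.println(rates.rateAt("EUR", -1L)); // null: no version yet, the order is dropped
        System.out.println(rates.rateAt("JPY", 5L));  // null: unknown currency, the order is dropped
    }
}
```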

Table

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {
  //......

  def join(right: Table): Table = {
    join(right, None, JoinType.INNER)
  }

  def join(right: Table, joinPredicate: String): Table = {
    join(right, joinPredicate, JoinType.INNER)
  }

  def join(right: Table, joinPredicate: Expression): Table = {
    join(right, Some(joinPredicate), JoinType.INNER)
  }

  def leftOuterJoin(right: Table): Table = {
    join(right, None, JoinType.LEFT_OUTER)
  }

  def leftOuterJoin(right: Table, joinPredicate: String): Table = {
    join(right, joinPredicate, JoinType.LEFT_OUTER)
  }

  def leftOuterJoin(right: Table, joinPredicate: Expression): Table = {
    join(right, Some(joinPredicate), JoinType.LEFT_OUTER)
  }

  def rightOuterJoin(right: Table, joinPredicate: String): Table = {
    join(right, joinPredicate, JoinType.RIGHT_OUTER)
  }

  def rightOuterJoin(right: Table, joinPredicate: Expression): Table = {
    join(right, Some(joinPredicate), JoinType.RIGHT_OUTER)
  }

  def fullOuterJoin(right: Table, joinPredicate: String): Table = {
    join(right, joinPredicate, JoinType.FULL_OUTER)
  }

  def fullOuterJoin(right: Table, joinPredicate: Expression): Table = {
    join(right, Some(joinPredicate), JoinType.FULL_OUTER)
  }

  private def join(right: Table, joinPredicate: String, joinType: JoinType): Table = {
    val joinPredicateExpr = ExpressionParser.parseExpression(joinPredicate)
    join(right, Some(joinPredicateExpr), joinType)
  }

  private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {

    // check if we join with a table or a table function
    if (!containsUnboundedUDTFCall(right.logicalPlan)) {
      // regular table-table join

      // check that the TableEnvironment of right table is not null
      // and right table belongs to the same TableEnvironment
      if (right.tableEnv != this.tableEnv) {
        throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
      }

      new Table(
        tableEnv,
        Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false)
          .validate(tableEnv))

    } else {
      // join with a table function

      // check join type
      if (joinType != JoinType.INNER && joinType != JoinType.LEFT_OUTER) {
        throw new ValidationException(
          "TableFunctions are currently supported for join and leftOuterJoin.")
      }

      val udtf = right.logicalPlan.asInstanceOf[LogicalTableFunctionCall]
      val udtfCall = LogicalTableFunctionCall(
        udtf.functionName,
        udtf.tableFunction,
        udtf.parameters,
        udtf.resultType,
        udtf.fieldNames,
        this.logicalPlan
      ).validate(tableEnv)

      new Table(
        tableEnv,
        Join(this.logicalPlan, udtfCall, joinType, joinPredicate, correlated = true)
          .validate(tableEnv))
    }
  }

  //......
}
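  • Table defines the join, leftOuterJoin, rightOuterJoin and fullOuterJoin methods, all of which end up calling the private join method. JoinType expresses the join type: INNER, LEFT_OUTER, RIGHT_OUTER or FULL_OUTER. The join predicate can be passed either as a String or as an Expression; a String is parsed into an Expression by ExpressionParser.parseExpression. Only join and leftOuterJoin have overloads without a predicate, which is what makes the table function examples above possible. The private join method then checks whether the right side is a table function call: for a regular table-table join it requires both tables to belong to the same TableEnvironment, while for a table function it only accepts INNER and LEFT_OUTER. In both cases it wraps a validated Join logical node (with correlated = true for table functions) in a new Table.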
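The branching in the private join method can be summarized as a small decision function. This plain-Java sketch mirrors only the two checks shown in the Scala source and returns the correlated flag that would be passed to Join; JoinDispatchSketch is hypothetical, and IllegalArgumentException stands in for Flink's ValidationException.

```java
// Hypothetical model of the checks in Table's private join(right, joinPredicate, joinType).
// IllegalArgumentException stands in for Flink's ValidationException.
class JoinDispatchSketch {
    enum JoinType { INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER }

    // Returns the value of the 'correlated' flag that would be passed to the Join node.
    static boolean dispatch(boolean rightIsTableFunctionCall, boolean sameTableEnvironment, JoinType joinType) {
        if (!rightIsTableFunctionCall) {
            // Regular table-table join: both sides must come from the same TableEnvironment.
            if (!sameTableEnvironment) {
                throw new IllegalArgumentException("Only tables from the same TableEnvironment can be joined.");
            }
            return false;
        }
        // Join with a table function: only inner and left outer joins are accepted.
        if (joinType != JoinType.INNER && joinType != JoinType.LEFT_OUTER) {
            throw new IllegalArgumentException("TableFunctions are currently supported for join and leftOuterJoin.");
        }
        return true;
    }

    // True if dispatch rejects the given combination.
    static boolean rejects(boolean udtf, boolean sameEnv, JoinType type) {
        try {
            dispatch(udtf, sameEnv, type);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(false, true, JoinType.FULL_OUTER)); // false: plain join, not correlated
        System.out.println(dispatch(true, true, JoinType.LEFT_OUTER));  // true: correlated join with a UDTF
        System.out.println(rejects(true, true, JoinType.RIGHT_OUTER));  // true: UDTFs only support join/leftOuterJoin
        System.out.println(rejects(false, false, JoinType.INNER));      // true: different TableEnvironments
    }
}
```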

Join

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Join(
    left: LogicalNode,
    right: LogicalNode,
    joinType: JoinType,
    condition: Option[Expression],
    correlated: Boolean) extends BinaryNode {

  override def output: Seq[Attribute] = {
    left.output ++ right.output
  }

  private case class JoinFieldReference(
    name: String,
    resultType: TypeInformation[_],
    left: LogicalNode,
    right: LogicalNode) extends Attribute {

    val isFromLeftInput: Boolean = left.output.map(_.name).contains(name)

    val (indexInInput, indexInJoin) = if (isFromLeftInput) {
      val indexInLeft = left.output.map(_.name).indexOf(name)
      (indexInLeft, indexInLeft)
    } else {
      val indexInRight = right.output.map(_.name).indexOf(name)
      (indexInRight, indexInRight + left.output.length)
    }

    override def toString = s"'$name"

    override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
      // look up type of field
      val fieldType = relBuilder.field(2, if (isFromLeftInput) 0 else 1, name).getType
      // create a new RexInputRef with index offset
      new RexInputRef(indexInJoin, fieldType)
    }

    override def withName(newName: String): Attribute = {
      if (newName == name) {
        this
      } else {
        JoinFieldReference(newName, resultType, left, right)
      }
    }
  }

  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
    val node = super.resolveExpressions(tableEnv).asInstanceOf[Join]
    val partialFunction: PartialFunction[Expression, Expression] = {
      case field: ResolvedFieldReference => JoinFieldReference(
        field.name,
        field.resultType,
        left,
        right)
    }
    val resolvedCondition = node.condition.map(_.postOrderTransform(partialFunction))
    Join(node.left, node.right, node.joinType, resolvedCondition, correlated)
  }

  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    left.construct(relBuilder)
    right.construct(relBuilder)

    val corSet = mutable.Set[CorrelationId]()
    if (correlated) {
      corSet += relBuilder.peek().getCluster.createCorrel()
    }

    relBuilder.join(
      convertJoinType(joinType),
      condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true)),
      corSet.asJava)
  }

  private def convertJoinType(joinType: JoinType) = joinType match {
    case JoinType.INNER => JoinRelType.INNER
    case JoinType.LEFT_OUTER => JoinRelType.LEFT
    case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
    case JoinType.FULL_OUTER => JoinRelType.FULL
  }

  private def ambiguousName: Set[String] =
    left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet)

  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
    if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) {
      failValidation(s"Filter operator requires a boolean expression as input, " +
        s"but ${resolvedJoin.condition} is of type ${resolvedJoin.joinType}")
    } else if (ambiguousName.nonEmpty) {
      failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
    }

    resolvedJoin.condition.foreach(testJoinCondition)
    resolvedJoin
  }

  private def testJoinCondition(expression: Expression): Unit = {

    def checkIfJoinCondition(exp: BinaryComparison) = exp.children match {
      case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil
        if x.isFromLeftInput != y.isFromLeftInput => true
      case _ => false
    }

    def checkIfFilterCondition(exp: BinaryComparison) = exp.children match {
      case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil => false
      case (x: JoinFieldReference) :: (_) :: Nil => true
      case (_) :: (y: JoinFieldReference) :: Nil => true
      case _ => false
    }

    var equiJoinPredicateFound = false
    // Whether the predicate is literal true.
    val alwaysTrue = expression match {
      case x: Literal if x.value.equals(true) => true
      case _ => false
    }

    def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = exp match {
      case x: And => x.children.foreach(validateConditions(_, isAndBranch))
      case x: Or => x.children.foreach(validateConditions(_, isAndBranch = false))
      case x: EqualTo =>
        if (isAndBranch && checkIfJoinCondition(x)) {
          equiJoinPredicateFound = true
        }
      case x: BinaryComparison =>
      // The boolean literal should be a valid condition type.
      case x: Literal if x.resultType == Types.BOOLEAN =>
      case x => failValidation(
        s"Unsupported condition type: ${x.getClass.getSimpleName}. Condition: $x")
    }

    validateConditions(expression, isAndBranch = true)

    // Due to a bug in Apache Calcite (see CALCITE-2004 and FLINK-7865) we cannot accept join
    // predicates except literal true for TableFunction left outer join.
    if (correlated && right.isInstanceOf[LogicalTableFunctionCall] && joinType != JoinType.INNER ) {
      if (!alwaysTrue) failValidation("TableFunction left outer join predicate can only be " +
        "empty or literal true.")
    } else {
      if (!equiJoinPredicateFound) {
        failValidation(
          s"Invalid join condition: $expression. At least one equi-join predicate is " +
            s"required.")
      }
    }
  }
}
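  • Join extends BinaryNode. Its output is the left input's fields followed by the right input's fields, which is why JoinFieldReference offsets right-side indices by left.output.length. convertJoinType maps Flink's JoinType to Calcite's JoinRelType, and construct builds the join with relBuilder.join, falling back to literal true when there is no condition.
  • validate requires the condition to be boolean and the two inputs to share no field names. testJoinCondition then requires at least one equi-join predicate comparing a left field with a right field on a pure AND path; for a left outer join with a table function, only an empty or literal-true predicate is accepted because of a Calcite bug (CALCITE-2004, FLINK-7865).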
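The equi-join search in testJoinCondition is easy to misread, so here is a simplified plain-Java model of it. The expression classes (Field, Lit, Cmp, And, Or) are hypothetical and only loosely modelled on Flink's Expression tree, and the special case for table function left outer joins is omitted. It shows why a = d AND b > 5 passes while a = d OR b = e fails: anything below an OR is visited with isAndBranch = false and can never count.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of Join.testJoinCondition (equi-join search only).
class JoinConditionSketch {
    interface Expr {}

    // A field reference that knows which join input it comes from (like JoinFieldReference).
    static final class Field implements Expr {
        final String name;
        final boolean fromLeft;
        Field(String name, boolean fromLeft) { this.name = name; this.fromLeft = fromLeft; }
    }

    static final class Lit implements Expr {
        final Object value;
        Lit(Object value) { this.value = value; }
    }

    // A binary comparison; op is one of "=", "<", "<=", ">", ">=".
    static final class Cmp implements Expr {
        final String op;
        final Expr left;
        final Expr right;
        Cmp(String op, Expr left, Expr right) { this.op = op; this.left = left; this.right = right; }
    }

    static final class And implements Expr {
        final List<Expr> children;
        And(Expr... children) { this.children = Arrays.asList(children); }
    }

    static final class Or implements Expr {
        final List<Expr> children;
        Or(Expr... children) { this.children = Arrays.asList(children); }
    }

    // Like checkIfJoinCondition: two field references taken from different inputs.
    static boolean comparesBothInputs(Cmp c) {
        return c.left instanceof Field && c.right instanceof Field
            && ((Field) c.left).fromLeft != ((Field) c.right).fromLeft;
    }

    // Like validateConditions: true if an equality between both inputs sits on a pure AND path.
    static boolean findEquiJoin(Expr e, boolean isAndBranch) {
        if (e instanceof And) {
            boolean found = false;
            for (Expr child : ((And) e).children) {
                found |= findEquiJoin(child, isAndBranch);
            }
            return found;
        }
        if (e instanceof Or) {
            for (Expr child : ((Or) e).children) {
                findEquiJoin(child, false); // still validated, but nothing under an OR counts
            }
            return false;
        }
        if (e instanceof Cmp) {
            Cmp c = (Cmp) e;
            return isAndBranch && c.op.equals("=") && comparesBothInputs(c);
        }
        if (e instanceof Lit && ((Lit) e).value instanceof Boolean) {
            return false; // boolean literals are allowed but are not equi-join predicates
        }
        throw new IllegalArgumentException("Unsupported condition type: " + e.getClass().getSimpleName());
    }

    static boolean isValidJoinCondition(Expr condition) {
        return findEquiJoin(condition, true);
    }

    public static void main(String[] args) {
        Field a = new Field("a", true), b = new Field("b", true);
        Field d = new Field("d", false), e = new Field("e", false);
        System.out.println(isValidJoinCondition(new Cmp("=", a, d)));                                       // true
        System.out.println(isValidJoinCondition(new And(new Cmp("=", a, d), new Cmp(">", b, new Lit(5))))); // true
        System.out.println(isValidJoinCondition(new Or(new Cmp("=", a, d), new Cmp("=", b, e))));           // false
        System.out.println(isValidJoinCondition(new Cmp("=", a, b)));                                       // false
    }
}
```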

Summary

  • The Table API supports several kinds of join: inner join, outer join, time-windowed join, inner join with a table function, left outer join with a table function, and join with a temporal table.
  • Table's join, leftOuterJoin, rightOuterJoin and fullOuterJoin methods all delegate to a private join method. The join type is expressed by JoinType (INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER); a String predicate is parsed into an Expression, and a validated Join logical node is wrapped in a new Table.
  • Join extends BinaryNode. It converts Flink's JoinType into Calcite's JoinRelType, builds the relational join with relBuilder.join in construct, and during validation requires at least one equi-join predicate for regular joins with a condition.
