Talking about Flink Table's where and filter operations


Preface

This article takes a look at the where and filter operations of Flink's Table.

Table

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {

  //......

  def where(predicate: String): Table = {
    filter(predicate)
  }

  def where(predicate: Expression): Table = {
    filter(predicate)
  }

  def filter(predicate: String): Table = {
    val predicateExpr = ExpressionParser.parseExpression(predicate)
    filter(predicateExpr)
  }

  def filter(predicate: Expression): Table = {
    new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv))
  }

  //......
}
  • Table provides two overloads each of where and filter: one takes a String, the other an Expression. where simply delegates to filter. filter(Expression) builds a new Table from Filter(predicate, logicalPlan).validate(tableEnv), leaving the original Table unchanged; filter(String) first converts the string into an Expression with ExpressionParser.parseExpression and then calls filter(Expression).
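To make the delegation concrete, here is a minimal Java sketch that mimics it without Flink on the classpath. MiniTable, Expr, GreaterThan and MiniParser are hypothetical stand-ins, not Flink classes, and the toy parser only understands predicates of the form "field > int"; the real ExpressionParser handles the full expression grammar.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Runnable demo; placed first so the single-file launcher finds main().
final class WhereFilterDemo {
  public static void main(String[] args) {
    MiniTable orders = MiniTable.scan("Orders");
    MiniTable viaWhere = orders.where("amount > 10");                    // String overload
    MiniTable viaFilter = orders.filter(new GreaterThan("amount", 10)); // Expression overload
    System.out.println(viaWhere.plan());
    System.out.println(viaFilter.plan());
    System.out.println(orders.plan()); // the original table is untouched
  }
}

/** Hypothetical stand-in for Flink's Expression. */
interface Expr {}

/** Hypothetical expression "field > value". */
final class GreaterThan implements Expr {
  final String field;
  final int value;

  GreaterThan(String field, int value) {
    this.field = field;
    this.value = value;
  }

  @Override public String toString() { return field + " > " + value; }
}

/** Hypothetical toy parser standing in for ExpressionParser.parseExpression. */
final class MiniParser {
  static Expr parse(String s) {
    String[] parts = s.trim().split("\\s*>\\s*");
    if (parts.length != 2) {
      throw new IllegalArgumentException("unsupported predicate: " + s);
    }
    return new GreaterThan(parts[0], Integer.parseInt(parts[1]));
  }
}

/** Hypothetical immutable table whose "logical plan" is a list of node descriptions. */
final class MiniTable {
  private final List<String> plan;

  private MiniTable(List<String> plan) { this.plan = Collections.unmodifiableList(plan); }

  static MiniTable scan(String name) {
    List<String> p = new ArrayList<>();
    p.add("Scan(" + name + ")");
    return new MiniTable(p);
  }

  // where(...) only forwards to filter(...), as Table.where does.
  MiniTable where(String predicate) { return filter(predicate); }
  MiniTable where(Expr predicate) { return filter(predicate); }

  // The String overload parses first, then reuses the Expr overload.
  MiniTable filter(String predicate) { return filter(MiniParser.parse(predicate)); }

  // Appends a Filter node to a copy of the plan and returns a new table.
  MiniTable filter(Expr predicate) {
    List<String> p = new ArrayList<>(plan);
    p.add("Filter(" + predicate + ")");
    return new MiniTable(p);
  }

  String plan() { return String.join(" -> ", plan); }
}
```

Returning a fresh MiniTable on every call echoes how Table.filter wraps the current logicalPlan in a new Table instead of mutating the receiver.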

Filter

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Filter(condition: Expression, child: LogicalNode) extends UnaryNode {
  override def output: Seq[Attribute] = child.output

  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    child.construct(relBuilder)
    relBuilder.filter(condition.toRexNode(relBuilder))
  }

  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    val resolvedFilter = super.validate(tableEnv).asInstanceOf[Filter]
    if (resolvedFilter.condition.resultType != BOOLEAN_TYPE_INFO) {
      failValidation(s"Filter operator requires a boolean expression as input," +
        s" but ${resolvedFilter.condition} is of type ${resolvedFilter.condition.resultType}")
    }
    resolvedFilter
  }
}
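  • Filter is a case class that extends UnaryNode and overrides output, construct and validate. output simply returns the child's output attributes. validate first resolves the node through super.validate and then fails validation unless the condition's result type is BOOLEAN_TYPE_INFO. construct first builds the child on the RelBuilder, converts the Flink Expression into an Apache Calcite RexNode via Expression.toRexNode, and then calls Calcite's RelBuilder.filter with it.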
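The boolean check in Filter.validate can be sketched in isolation as follows. This is a simplified model, not Flink code: DataType, TypedExpr and FilterNode are hypothetical, the expressions are assumed to be already resolved, and a plain IllegalStateException stands in for failValidation. The error message mirrors the one in the Scala source above.

```java
import java.util.Objects;

final class FilterValidationDemo {
  public static void main(String[] args) {
    TypedExpr ok = new TypedExpr("amount > 10", DataType.BOOLEAN);
    TypedExpr bad = new TypedExpr("amount + 1", DataType.INT);
    System.out.println("valid: " + new FilterNode(ok).validate().condition);
    try {
      new FilterNode(bad).validate();
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}

/** Hypothetical result types standing in for Flink's TypeInformation. */
enum DataType { BOOLEAN, INT, STRING }

/** Hypothetical expression that already knows its (resolved) result type. */
final class TypedExpr {
  final String text;
  final DataType resultType;

  TypedExpr(String text, DataType resultType) {
    this.text = text;
    this.resultType = resultType;
  }

  @Override public String toString() { return text; }
}

/** Hypothetical filter node reproducing only the boolean check of Filter.validate. */
final class FilterNode {
  final TypedExpr condition;

  FilterNode(TypedExpr condition) { this.condition = Objects.requireNonNull(condition); }

  FilterNode validate() {
    if (condition.resultType != DataType.BOOLEAN) {
      // IllegalStateException stands in for Flink's failValidation(...)
      throw new IllegalStateException("Filter operator requires a boolean expression as input,"
          + " but " + condition + " is of type " + condition.resultType);
    }
    return this;
  }
}
```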

RexNode

calcite-core-1.18.0-sources.jar!/org/apache/calcite/rex/RexNode.java

public abstract class RexNode {
  //~ Instance fields --------------------------------------------------------

  // Effectively final. Set in each sub-class constructor, and never re-set.
  protected String digest;

  //~ Methods ----------------------------------------------------------------

  public abstract RelDataType getType();

  public boolean isAlwaysTrue() {
    return false;
  }

  public boolean isAlwaysFalse() {
    return false;
  }

  public boolean isA(SqlKind kind) {
    return getKind() == kind;
  }

  public boolean isA(Collection<SqlKind> kinds) {
    return getKind().belongsTo(kinds);
  }

  public SqlKind getKind() {
    return SqlKind.OTHER;
  }

  public String toString() {
    return digest;
  }

  public abstract <R> R accept(RexVisitor<R> visitor);

  public abstract <R, P> R accept(RexBiVisitor<R, P> visitor, P arg);

  @Override public abstract boolean equals(Object obj);

  @Override public abstract int hashCode();
}
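  • RexNode represents a row expression and is normally created through RexBuilder. It has many subclasses, such as RexCall, RexVariable and RexFieldAccess. By default isAlwaysTrue and isAlwaysFalse return false and getKind returns SqlKind.OTHER; subclasses override these as appropriate.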
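For intuition, here is a much simplified Java model of the RexNode contract shown above: isAlwaysTrue and isAlwaysFalse default to false, getKind defaults to OTHER, and isA compares kinds. MiniRex, MiniLiteral, MiniInputRef, MiniCall and Kind are hypothetical; they only loosely echo Calcite's RexNode, RexLiteral, RexInputRef, RexCall and SqlKind, and they omit types, digests and visitors.

```java
import java.util.Arrays;
import java.util.List;

final class RexDemo {
  public static void main(String[] args) {
    // Roughly "$0 > 10": compare the first input field with a literal.
    MiniRex cond = new MiniCall(Kind.GREATER_THAN,
        Arrays.asList(new MiniInputRef(0), new MiniLiteral(10)));
    System.out.println(cond + " alwaysTrue=" + cond.isAlwaysTrue());
    System.out.println("TRUE literal alwaysTrue=" + MiniLiteral.TRUE.isAlwaysTrue());
  }
}

/** Hypothetical subset of Calcite's SqlKind. */
enum Kind { LITERAL, INPUT_REF, GREATER_THAN, OTHER }

/** Hypothetical, simplified analogue of RexNode: an immutable expression tree node. */
abstract class MiniRex {
  boolean isAlwaysTrue() { return false; }   // conservative default, as in RexNode
  boolean isAlwaysFalse() { return false; }
  Kind getKind() { return Kind.OTHER; }
  boolean isA(Kind kind) { return getKind() == kind; }
}

/** Constant value; only a boolean TRUE/FALSE literal is known to be always true/false. */
final class MiniLiteral extends MiniRex {
  static final MiniLiteral TRUE = new MiniLiteral(Boolean.TRUE);
  static final MiniLiteral FALSE = new MiniLiteral(Boolean.FALSE);
  final Object value;

  MiniLiteral(Object value) { this.value = value; }

  @Override boolean isAlwaysTrue() { return Boolean.TRUE.equals(value); }
  @Override boolean isAlwaysFalse() { return Boolean.FALSE.equals(value); }
  @Override Kind getKind() { return Kind.LITERAL; }
  @Override public String toString() { return String.valueOf(value); }
}

/** Reference to the i-th field of the input row, printed like "$0". */
final class MiniInputRef extends MiniRex {
  final int index;

  MiniInputRef(int index) { this.index = index; }

  @Override Kind getKind() { return Kind.INPUT_REF; }
  @Override public String toString() { return "$" + index; }
}

/** Operator applied to operands; its kind is the operator's kind. */
final class MiniCall extends MiniRex {
  final Kind op;
  final List<MiniRex> operands;

  MiniCall(Kind op, List<MiniRex> operands) {
    this.op = op;
    this.operands = operands;
  }

  @Override Kind getKind() { return op; }
  @Override public String toString() { return op + operands.toString(); }
}
```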

RelBuilder.filter

calcite-core-1.18.0-sources.jar!/org/apache/calcite/tools/RelBuilder.java

public class RelBuilder {
  protected final RelOptCluster cluster;
  protected final RelOptSchema relOptSchema;
  private final RelFactories.FilterFactory filterFactory;
  private final RelFactories.ProjectFactory projectFactory;
  private final RelFactories.AggregateFactory aggregateFactory;
  private final RelFactories.SortFactory sortFactory;
  private final RelFactories.ExchangeFactory exchangeFactory;
  private final RelFactories.SortExchangeFactory sortExchangeFactory;
  private final RelFactories.SetOpFactory setOpFactory;
  private final RelFactories.JoinFactory joinFactory;
  private final RelFactories.SemiJoinFactory semiJoinFactory;
  private final RelFactories.CorrelateFactory correlateFactory;
  private final RelFactories.ValuesFactory valuesFactory;
  private final RelFactories.TableScanFactory scanFactory;
  private final RelFactories.MatchFactory matchFactory;
  private final Deque<Frame> stack = new ArrayDeque<>();
  private final boolean simplify;
  private final RexSimplify simplifier;

  protected RelBuilder(Context context, RelOptCluster cluster,
      RelOptSchema relOptSchema) {
    this.cluster = cluster;
    this.relOptSchema = relOptSchema;
    if (context == null) {
      context = Contexts.EMPTY_CONTEXT;
    }
    this.simplify = Hook.REL_BUILDER_SIMPLIFY.get(true);
    this.aggregateFactory =
        Util.first(context.unwrap(RelFactories.AggregateFactory.class),
            RelFactories.DEFAULT_AGGREGATE_FACTORY);
    this.filterFactory =
        Util.first(context.unwrap(RelFactories.FilterFactory.class),
            RelFactories.DEFAULT_FILTER_FACTORY);
    this.projectFactory =
        Util.first(context.unwrap(RelFactories.ProjectFactory.class),
            RelFactories.DEFAULT_PROJECT_FACTORY);
    this.sortFactory =
        Util.first(context.unwrap(RelFactories.SortFactory.class),
            RelFactories.DEFAULT_SORT_FACTORY);
    this.exchangeFactory =
        Util.first(context.unwrap(RelFactories.ExchangeFactory.class),
            RelFactories.DEFAULT_EXCHANGE_FACTORY);
    this.sortExchangeFactory =
        Util.first(context.unwrap(RelFactories.SortExchangeFactory.class),
            RelFactories.DEFAULT_SORT_EXCHANGE_FACTORY);
    this.setOpFactory =
        Util.first(context.unwrap(RelFactories.SetOpFactory.class),
            RelFactories.DEFAULT_SET_OP_FACTORY);
    this.joinFactory =
        Util.first(context.unwrap(RelFactories.JoinFactory.class),
            RelFactories.DEFAULT_JOIN_FACTORY);
    this.semiJoinFactory =
        Util.first(context.unwrap(RelFactories.SemiJoinFactory.class),
            RelFactories.DEFAULT_SEMI_JOIN_FACTORY);
    this.correlateFactory =
        Util.first(context.unwrap(RelFactories.CorrelateFactory.class),
            RelFactories.DEFAULT_CORRELATE_FACTORY);
    this.valuesFactory =
        Util.first(context.unwrap(RelFactories.ValuesFactory.class),
            RelFactories.DEFAULT_VALUES_FACTORY);
    this.scanFactory =
        Util.first(context.unwrap(RelFactories.TableScanFactory.class),
            RelFactories.DEFAULT_TABLE_SCAN_FACTORY);
    this.matchFactory =
        Util.first(context.unwrap(RelFactories.MatchFactory.class),
            RelFactories.DEFAULT_MATCH_FACTORY);
    final RexExecutor executor =
        Util.first(context.unwrap(RexExecutor.class),
            Util.first(cluster.getPlanner().getExecutor(), RexUtil.EXECUTOR));
    final RelOptPredicateList predicates = RelOptPredicateList.EMPTY;
    this.simplifier =
        new RexSimplify(cluster.getRexBuilder(), predicates, executor);
  }

  public RelBuilder filter(RexNode... predicates) {
    return filter(ImmutableList.copyOf(predicates));
  }

  public RelBuilder filter(Iterable<? extends RexNode> predicates) {
    final RexNode simplifiedPredicates =
        simplifier.simplifyFilterPredicates(predicates);
    if (simplifiedPredicates == null) {
      return empty();
    }

    if (!simplifiedPredicates.isAlwaysTrue()) {
      final Frame frame = stack.pop();
      final RelNode filter = filterFactory.createFilter(frame.rel, simplifiedPredicates);
      stack.push(new Frame(filter, frame.fields));
    }
    return this;
  }

  //......

}
  • In its constructor, RelBuilder sets up a RelFactories.FilterFactory, taken from the Context if one is registered and otherwise RelFactories.DEFAULT_FILTER_FACTORY. It offers two filter methods: one takes RexNode varargs and the other an Iterable<? extends RexNode>; the varargs version delegates to the Iterable version. filter first calls simplifier.simplifyFilterPredicates to combine and simplify the predicates into a single RexNode; if that returns null (the condition is always false), it returns empty(). Otherwise, unless the simplified predicate is always true, it pops the top Frame off the stack (an ArrayDeque used as a LIFO, Last-In-First-Out, stack), calls filterFactory.createFilter on frame.rel to create a new RelNode, wraps it in a new Frame with the same fields, and pushes that back onto the head of the deque.
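The stack discipline of RelBuilder.filter can be sketched with an ArrayDeque as below. This is a toy under stated assumptions: predicates are plain strings, the simplify method is a crude stand-in for RexSimplify.simplifyFilterPredicates that only drops literal TRUE conjuncts and returns null on a literal FALSE, and empty() just wraps the top relation instead of building an empty Values node. MiniRelBuilder and its nested Frame are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

final class RelBuilderStackDemo {
  public static void main(String[] args) {
    MiniRelBuilder b = new MiniRelBuilder().scan("Orders");
    b.filter("TRUE");                 // always true: stack left unchanged
    System.out.println(b.peek());
    b.filter("amount > 10", "TRUE");  // the TRUE conjunct is dropped
    System.out.println(b.peek());
    b.filter("FALSE");                // always false: empty()
    System.out.println(b.peek());
  }
}

/** Hypothetical, string-based imitation of RelBuilder's frame stack. */
final class MiniRelBuilder {
  /** One stack entry: a relational expression plus its fields (cf. RelBuilder.Frame). */
  static final class Frame {
    final String rel;
    final List<String> fields;

    Frame(String rel, List<String> fields) {
      this.rel = rel;
      this.fields = fields;
    }
  }

  private final Deque<Frame> stack = new ArrayDeque<>();

  MiniRelBuilder scan(String table) {
    stack.push(new Frame("Scan(" + table + ")", Collections.singletonList("amount")));
    return this;
  }

  /** Crude stand-in for RexSimplify.simplifyFilterPredicates; null means "always false". */
  private static String simplify(String... predicates) {
    List<String> kept = new ArrayList<>();
    for (String p : predicates) {
      if (p.equals("FALSE")) {
        return null;       // one FALSE conjunct makes the whole AND false
      }
      if (!p.equals("TRUE")) {
        kept.add(p);       // TRUE conjuncts contribute nothing
      }
    }
    if (kept.isEmpty()) {
      return "TRUE";
    }
    return kept.size() == 1 ? kept.get(0) : "AND(" + String.join(", ", kept) + ")";
  }

  MiniRelBuilder filter(String... predicates) {
    String simplified = simplify(predicates);
    if (simplified == null) {
      return empty();
    }
    if (!simplified.equals("TRUE")) {
      Frame frame = stack.pop();                                  // current top (LIFO)
      String filter = "Filter(" + simplified + ", " + frame.rel + ")";
      stack.push(new Frame(filter, frame.fields));                // same fields, new rel
    }
    return this;
  }

  /** Replaces the top relation with a "no rows" marker (toy version of empty()). */
  MiniRelBuilder empty() {
    Frame frame = stack.pop();
    stack.push(new Frame("Empty(" + frame.rel + ")", frame.fields));
    return this;
  }

  String peek() { return stack.peek().rel; }
}
```

Because every builder call works on the top of the stack, calls such as scan(...).filter(...) can be chained without passing intermediate relational expressions around by hand.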

Frame

calcite-core-1.18.0-sources.jar!/org/apache/calcite/tools/RelBuilder.java

  private static class Frame {
    final RelNode rel;
    final ImmutableList<Field> fields;

    private Frame(RelNode rel, ImmutableList<Field> fields) {
      this.rel = rel;
      this.fields = fields;
    }

    private Frame(RelNode rel) {
      String tableAlias = deriveAlias(rel);
      ImmutableList.Builder<Field> builder = ImmutableList.builder();
      ImmutableSet<String> aliases = tableAlias == null
          ? ImmutableSet.of()
          : ImmutableSet.of(tableAlias);
      for (RelDataTypeField field : rel.getRowType().getFieldList()) {
        builder.add(new Field(aliases, field));
      }
      this.rel = rel;
      this.fields = builder.build();
    }

    private static String deriveAlias(RelNode rel) {
      if (rel instanceof TableScan) {
        final List<String> names = rel.getTable().getQualifiedName();
        if (!names.isEmpty()) {
          return Util.last(names);
        }
      }
      return null;
    }

    List<RelDataTypeField> fields() {
      return Pair.right(fields);
    }
  }
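  • Frames are the entries of the ArrayDeque stack. Each Frame holds a relational expression (rel), i.e. the result of the previous operation, together with a list of fields that records how table aliases map onto the row type. When a Frame is created from a RelNode alone, deriveAlias uses the last component of the table's qualified name if the node is a TableScan, and no alias otherwise.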
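Alias derivation for a Frame can be modeled like this. MiniRel, AliasedField and MiniFrame are hypothetical simplifications: a MiniRel is treated as a table scan when it carries a qualified name, and fields are plain strings rather than RelDataTypeField.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

final class FrameAliasDemo {
  public static void main(String[] args) {
    MiniRel scan = new MiniRel(Arrays.asList("catalog", "db", "Orders"), Arrays.asList("id", "amount"));
    MiniRel derived = new MiniRel(null, Arrays.asList("total"));
    System.out.println(MiniFrame.of(scan).fields);
    System.out.println(MiniFrame.of(derived).fields);
  }
}

/** Hypothetical relational node; qualifiedName is non-null only for table scans. */
final class MiniRel {
  final List<String> qualifiedName;
  final List<String> fieldNames;

  MiniRel(List<String> qualifiedName, List<String> fieldNames) {
    this.qualifiedName = qualifiedName;
    this.fieldNames = fieldNames;
  }

  boolean isTableScan() { return qualifiedName != null; }
}

/** A field tagged with the table aliases it can be referenced by. */
final class AliasedField {
  final Set<String> aliases;
  final String name;

  AliasedField(Set<String> aliases, String name) {
    this.aliases = aliases;
    this.name = name;
  }

  @Override public String toString() { return aliases + "." + name; }
}

/** Hypothetical counterpart of RelBuilder.Frame. */
final class MiniFrame {
  final MiniRel rel;
  final List<AliasedField> fields;

  private MiniFrame(MiniRel rel, List<AliasedField> fields) {
    this.rel = rel;
    this.fields = fields;
  }

  /** Like Frame(RelNode): every field gets the derived alias, if there is one. */
  static MiniFrame of(MiniRel rel) {
    String alias = deriveAlias(rel);
    Set<String> aliases = alias == null ? Collections.<String>emptySet() : Collections.singleton(alias);
    List<AliasedField> fields = new ArrayList<>();
    for (String name : rel.fieldNames) {
      fields.add(new AliasedField(aliases, name));
    }
    return new MiniFrame(rel, Collections.unmodifiableList(fields));
  }

  /** Last component of the qualified name for a table scan, otherwise null. */
  static String deriveAlias(MiniRel rel) {
    if (rel.isTableScan() && !rel.qualifiedName.isEmpty()) {
      return rel.qualifiedName.get(rel.qualifiedName.size() - 1);
    }
    return null;
  }
}
```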

RelFactories.FilterFactory.createFilter

calcite-core-1.18.0-sources.jar!/org/apache/calcite/rel/core/RelFactories.java

  public interface FilterFactory {
    /** Creates a filter. */
    RelNode createFilter(RelNode input, RexNode condition);
  }

  private static class FilterFactoryImpl implements FilterFactory {
    public RelNode createFilter(RelNode input, RexNode condition) {
      return LogicalFilter.create(input, condition);
    }
  }
  • FilterFactoryImpl implements the FilterFactory interface; its createFilter method calls LogicalFilter.create(input, condition), where input is a RelNode (the rel of the popped Frame) and condition is a RexNode.
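The RelBuilder constructor's pattern of taking a factory from the Context when one is registered and otherwise falling back to a default, Util.first(context.unwrap(...), RelFactories.DEFAULT_FILTER_FACTORY), is what lets a caller plug in its own filter node type. The sketch below imitates that pattern; MiniFilterFactory, MiniFilterFactoryImpl, MiniContext and MiniBuilderConfig are hypothetical, and Strings stand in for RelNode and RexNode.

```java
import java.util.HashMap;
import java.util.Map;

final class FactoryDemo {
  public static void main(String[] args) {
    MiniContext empty = new MiniContext();
    MiniContext custom = new MiniContext().with(MiniFilterFactory.class,
        (input, condition) -> "CustomFilter(" + condition + ", " + input + ")");
    System.out.println(MiniBuilderConfig.filterFactory(empty).createFilter("Scan(T)", "$0 > 10"));
    System.out.println(MiniBuilderConfig.filterFactory(custom).createFilter("Scan(T)", "$0 > 10"));
  }
}

/** Hypothetical analogue of RelFactories.FilterFactory (Strings replace RelNode/RexNode). */
interface MiniFilterFactory {
  String createFilter(String input, String condition);
}

/** Default implementation, playing the role of FilterFactoryImpl calling LogicalFilter.create. */
final class MiniFilterFactoryImpl implements MiniFilterFactory {
  static final MiniFilterFactory DEFAULT = new MiniFilterFactoryImpl();

  @Override public String createFilter(String input, String condition) {
    return "LogicalFilter(" + condition + ", " + input + ")";
  }
}

/** Hypothetical context: a typed lookup similar in spirit to Calcite's Context.unwrap. */
final class MiniContext {
  private final Map<Class<?>, Object> entries = new HashMap<>();

  <T> MiniContext with(Class<T> type, T value) {
    entries.put(type, value);
    return this;
  }

  <T> T unwrap(Class<T> type) { return type.cast(entries.get(type)); } // null if absent
}

final class MiniBuilderConfig {
  /** First non-null wins, like Util.first(context.unwrap(...), DEFAULT_FILTER_FACTORY). */
  static MiniFilterFactory filterFactory(MiniContext context) {
    MiniFilterFactory fromContext = context.unwrap(MiniFilterFactory.class);
    return fromContext != null ? fromContext : MiniFilterFactoryImpl.DEFAULT;
  }
}
```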

LogicalFilter

calcite-core-1.18.0-sources.jar!/org/apache/calcite/rel/logical/LogicalFilter.java

public final class LogicalFilter extends Filter {
  private final ImmutableSet<CorrelationId> variablesSet;

  /** Creates a LogicalFilter. */
  public static LogicalFilter create(final RelNode input, RexNode condition) {
    return create(input, condition, ImmutableSet.of());
  }

  /** Creates a LogicalFilter. */
  public static LogicalFilter create(final RelNode input, RexNode condition,
      ImmutableSet<CorrelationId> variablesSet) {
    final RelOptCluster cluster = input.getCluster();
    final RelMetadataQuery mq = cluster.getMetadataQuery();
    final RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE)
        .replaceIfs(RelCollationTraitDef.INSTANCE,
            () -> RelMdCollation.filter(mq, input))
        .replaceIf(RelDistributionTraitDef.INSTANCE,
            () -> RelMdDistribution.filter(mq, input));
    return new LogicalFilter(cluster, traitSet, input, condition, variablesSet);
  }

  //......
}
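  • LogicalFilter.create builds a trait set with Convention.NONE, derives collation and distribution from the input through metadata (RelMdCollation.filter and RelMdDistribution.filter), and constructs the LogicalFilter. In the class hierarchy, LogicalFilter extends the abstract class Filter, Filter extends SingleRel, SingleRel extends the abstract class AbstractRelNode, and AbstractRelNode implements the RelNode interface.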
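The hierarchy can be mirrored with a few hypothetical Java types to show where each responsibility sits: the interface defines the contract, the abstract base holds shared state (here only a convention name), the single-input class owns the input, the filter class owns the condition, and the logical subclass fixes the convention to NONE in create(). None of these classes are Calcite's; traits, metadata and the variablesSet parameter are omitted.

```java
import java.util.Collections;
import java.util.List;

final class HierarchyDemo {
  public static void main(String[] args) {
    MiniLogicalFilter f = MiniLogicalFilter.create(new MiniTableScan("Orders"), "$1 > 10");
    System.out.println(f.explain());
    System.out.println("inputs: " + f.getInputs().size());
  }
}

/** Hypothetical counterpart of the RelNode interface. */
interface MiniRelNode {
  List<MiniRelNode> getInputs();
  String explain();
}

/** Shared plumbing, in the role of AbstractRelNode. */
abstract class MiniAbstractRelNode implements MiniRelNode {
  final String convention;

  MiniAbstractRelNode(String convention) { this.convention = convention; }

  @Override public List<MiniRelNode> getInputs() { return Collections.emptyList(); }
}

/** Exactly one input, in the role of SingleRel. */
abstract class MiniSingleRel extends MiniAbstractRelNode {
  final MiniRelNode input;

  MiniSingleRel(String convention, MiniRelNode input) {
    super(convention);
    this.input = input;
  }

  @Override public List<MiniRelNode> getInputs() { return Collections.singletonList(input); }
}

/** Convention-agnostic filter, in the role of core Filter. */
abstract class MiniFilter extends MiniSingleRel {
  final String condition;

  MiniFilter(String convention, MiniRelNode input, String condition) {
    super(convention, input);
    this.condition = condition;
  }

  @Override public String explain() {
    return getClass().getSimpleName() + "(condition=" + condition
        + ", convention=" + convention + ") <- " + input.explain();
  }
}

/** Logical (not yet executable) filter; create() fixes the convention to NONE. */
final class MiniLogicalFilter extends MiniFilter {
  private MiniLogicalFilter(MiniRelNode input, String condition) { super("NONE", input, condition); }

  static MiniLogicalFilter create(MiniRelNode input, String condition) {
    return new MiniLogicalFilter(input, condition);
  }
}

/** Leaf node used as the filter's input. */
final class MiniTableScan extends MiniAbstractRelNode {
  final String table;

  MiniTableScan(String table) {
    super("NONE");
    this.table = table;
  }

  @Override public String explain() { return "MiniTableScan(" + table + ")"; }
}
```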

Summary

  • Table provides two overloads each of where and filter, one taking a String and one taking an Expression; where simply delegates to filter. filter(Expression) creates a new Table from Filter(predicate, logicalPlan).validate(tableEnv), and filter(String) first converts the string into an Expression with ExpressionParser.parseExpression.
  • Filter extends UnaryNode and overrides output, construct and validate. Its construct method converts the Flink Expression into a Calcite RexNode via Expression.toRexNode (RexNode is a row expression, created through RexBuilder, with subclasses such as RexCall, RexVariable and RexFieldAccess) and then calls Apache Calcite's RelBuilder.filter.
  • RelBuilder sets up a RelFactories.FilterFactory in its constructor and offers two filter overloads, one taking RexNode varargs and one taking an Iterable<? extends RexNode>. filter first calls simplifier.simplifyFilterPredicates to combine the predicates into a single simplified RexNode, returning empty() if the result is always false. Unless the simplified predicate is always true, it pops the top Frame off the ArrayDeque stack (LIFO), calls filterFactory.createFilter to build a new RelNode, and pushes a new Frame holding it back onto the head of the deque. A Frame holds the relational expression produced by the previous operation together with the mapping of table aliases onto the row type.
  • FilterFactoryImpl implements FilterFactory; its createFilter method calls LogicalFilter.create(input, condition), where input is the RelNode taken from the Frame's rel and condition is a RexNode.
  • LogicalFilter extends the abstract class Filter, Filter extends SingleRel, SingleRel extends the abstract class AbstractRelNode, and AbstractRelNode implements the RelNode interface.
