A Look at Flink’s TableFunction

Preface

This article looks at how Flink’s TableFunction is defined and how the runtime invokes it.

Example

// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
public class Split extends TableFunction<Tuple2<String, Integer>> {
    private String separator = " ";
    
    public Split(String separator) {
        this.separator = separator;
    }
    
    public void eval(String str) {
        for (String s : str.split(separator)) {
            // use collect(...) to emit a row
            collect(new Tuple2<String, Integer>(s, s.length()));
        }
    }
}

BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table myTable = ...         // table schema: [a: String]

// Register the function.
tableEnv.registerFunction("split", new Split("#"));

// Use the table function in the Java Table API. "as" specifies the field names of the table.
myTable.join(new Table(tableEnv, "split(a) as (word, length)"))
    .select("a, word, length");
myTable.leftOuterJoin(new Table(tableEnv, "split(a) as (word, length)"))
    .select("a, word, length");

// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
  • This example defines a custom Split function, registers it with TableEnvironment.registerFunction, and then uses it both in the Table API and in TableEnvironment.sqlQuery. Split defines a public eval method, which emits each output row through collect.
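The difference between join and leftOuterJoin only shows up when the table function emits no rows for an input row. The following is a minimal plain-Java sketch of those semantics, not Flink code: LateralJoinSketch and its methods are hypothetical names, and unlike the article's Split the stand-in split method deliberately emits nothing for an empty string. It also wraps the separator in Pattern.quote, because String.split interprets its argument as a regular expression.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Plain-Java model of the two join flavours; none of these types are Flink classes.
final class LateralJoinSketch {

    // Stand-in for Split.eval: returns (word, length) pairs instead of calling collect(...).
    // Assumption for illustration: an empty input emits no rows at all.
    static List<Object[]> split(String str, String separator) {
        List<Object[]> rows = new ArrayList<>();
        if (str.isEmpty()) {
            return rows;
        }
        // String.split takes a regular expression; Pattern.quote keeps the separator literal.
        for (String s : str.split(Pattern.quote(separator))) {
            rows.add(new Object[] {s, s.length()});
        }
        return rows;
    }

    // Models join / LATERAL TABLE: input rows without any emitted row are dropped.
    static List<String> join(List<String> table, String separator) {
        return correlate(table, separator, false);
    }

    // Models leftOuterJoin / LEFT JOIN LATERAL ... ON TRUE: such rows are kept, padded with nulls.
    static List<String> leftOuterJoin(List<String> table, String separator) {
        return correlate(table, separator, true);
    }

    private static List<String> correlate(List<String> table, String separator, boolean outer) {
        List<String> result = new ArrayList<>();
        for (String a : table) {
            List<Object[]> emitted = split(a, separator);
            if (emitted.isEmpty() && outer) {
                result.add(a + "|null|null");
            }
            for (Object[] row : emitted) {
                result.add(a + "|" + row[0] + "|" + row[1]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> table = Arrays.asList("ab#c", "");
        System.out.println(join(table, "#"));          // [ab#c|ab|2, ab#c|c|1]
        System.out.println(leftOuterJoin(table, "#")); // [ab#c|ab|2, ab#c|c|1, |null|null]
    }
}
```

In the sketch the empty input row disappears from the join result but survives in the left outer join result with null in the word and length columns.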

UserDefinedFunction

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/UserDefinedFunction.scala

abstract class UserDefinedFunction extends Serializable {
  /**
    * Setup method for user-defined function. It can be used for initialization work.
    *
    * By default, this method does nothing.
    */
  @throws(classOf[Exception])
  def open(context: FunctionContext): Unit = {}

  /**
    * Tear-down method for user-defined function. It can be used for clean up work.
    *
    * By default, this method does nothing.
    */
  @throws(classOf[Exception])
  def close(): Unit = {}

  /**
    * @return true if and only if a call to this function is guaranteed to always return
    *         the same result given the same parameters; true is assumed by default
    *         if user's function is not pure functional, like random(), date(), now()...
    *         isDeterministic must return false
    */
  def isDeterministic: Boolean = true

  final def functionIdentifier: String = {
    val md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)))
    getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
  }

  /**
    * Returns the name of the UDF that is used for plan explain and logging.
    */
  override def toString: String = getClass.getSimpleName

}
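  • UserDefinedFunction defines the open and close lifecycle hooks, isDeterministic (true by default), and the final functionIdentifier method, which combines the class name with an MD5 hash of the serialized instance.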
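Because functionIdentifier hashes the serialized instance, two instances of the same class with different constructor arguments should end up with different identifiers. The sketch below imitates that idea with JDK classes only. It assumes that the encoding step is Java serialization followed by Base64, which is an approximation of EncodingUtils and not a copy of it; FunctionIdentifierSketch and SplitLike are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Approximation of functionIdentifier using only the JDK; not Flink's EncodingUtils.
final class FunctionIdentifierSketch {

    // Hypothetical stand-in for a user-defined function that carries constructor state.
    static final class SplitLike implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String separator;

        SplitLike(String separator) {
            this.separator = separator;
        }
    }

    static String functionIdentifier(Serializable function) {
        try {
            // Serialize the instance, so constructor state becomes part of the hash input.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(function);
            }
            String encoded = Base64.getEncoder().encodeToString(bytes.toByteArray());

            // MD5 of the encoded string, rendered as lower-case hex.
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(encoded.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }

            // Same shape as the Scala code: canonical name with '.' -> '$', then "$" + md5.
            return function.getClass().getCanonicalName().replace('.', '$') + "$" + hex;
        } catch (IOException | NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(functionIdentifier(new SplitLike("#")));
        System.out.println(functionIdentifier(new SplitLike(",")));
    }
}
```

The two printed identifiers share the class-name prefix and differ only in the hash suffix, which is how instances such as new Split("#") and new Split(",") can be told apart.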

TableFunction

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/TableFunction.scala

abstract class TableFunction[T] extends UserDefinedFunction {

  // ----------------------------------------------------------------------------------------------

  /**
    * Emit an output row.
    *
    * @param row the output row
    */
  protected def collect(row: T): Unit = {
    collector.collect(row)
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * The code generated collector used to emit row.
    */
  private var collector: Collector[T] = _

  /**
    * Internal use. Sets the current collector.
    */
  private[flink] final def setCollector(collector: Collector[T]): Unit = {
    this.collector = collector
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Returns the result type of the evaluation method with a given signature.
    *
    * This method needs to be overridden in case Flink's type extraction facilities are not
    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
    * method. Flink's type extraction facilities can handle basic types or
    * simple POJOs but might be wrong for more complex, custom, or composite types.
    *
    * @return [[TypeInformation]] of result type or null if Flink should determine the type
    */
  def getResultType: TypeInformation[T] = null

  /**
    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
    * signature.
    *
    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
    * By default Flink's type extraction facilities are used for this but might be wrong for
    * more complex, custom, or composite types.
    *
    * @param signature signature of the method the operand types need to be determined
    * @return [[TypeInformation]] of operand types
    */
  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
    signature.map { c =>
      try {
        TypeExtractor.getForClass(c)
      } catch {
        case ite: InvalidTypesException =>
          throw new ValidationException(
            s"Parameter types of table function '${this.getClass.getCanonicalName}' cannot be " +
            s"automatically determined. Please provide type information manually.")
      }
    }
  }

}
  • TableFunction extends UserDefinedFunction and adds collect, setCollector, getResultType, and getParameterTypes. collect hands each row to a collector that the runtime injects through setCollector.
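The collect/setCollector pair is a small injection pattern: user code emits rows without knowing where they go, and the runtime decides the destination. Here is a minimal plain-Java sketch of that pattern, with java.util.function.Consumer standing in for Flink's Collector; MiniTableFunction and Words are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of how a collector is injected into a table function.
final class CollectorInjectionSketch {

    // Simplified stand-in for TableFunction[T]; Consumer<T> plays the role of Collector[T].
    abstract static class MiniTableFunction<T> {
        private Consumer<T> collector;

        // Mirrors setCollector: the runtime, not the user, decides where rows go.
        final void setCollector(Consumer<T> collector) {
            this.collector = collector;
        }

        // Mirrors collect: user code emits a row without knowing the destination.
        protected void collect(T row) {
            collector.accept(row);
        }
    }

    // Hypothetical user function: emits one row per whitespace-separated word.
    static final class Words extends MiniTableFunction<String> {
        public void eval(String str) {
            for (String s : str.split(" ")) {
                collect(s);
            }
        }
    }

    // Plays the role of the runtime: inject a collector, then call eval once per input.
    static List<String> run(String input) {
        List<String> sink = new ArrayList<>();
        Words function = new Words();
        function.setCollector(sink::add);
        function.eval(input);
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run("a bb ccc")); // [a, bb, ccc]
    }
}
```

Swapping the sink for another Consumer changes where the rows end up without touching eval, which is the point of keeping the collector private and settable only by the runtime.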

ProcessOperator

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/operators/ProcessOperator.java

@Internal
public class ProcessOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    private transient TimestampedCollector<OUT> collector;

    private transient ContextImpl context;

    /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
    private long currentWatermark = Long.MIN_VALUE;

    public ProcessOperator(ProcessFunction<IN, OUT> function) {
        super(function);

        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);

        context = new ContextImpl(userFunction, getProcessingTimeService());
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        context.element = element;
        userFunction.processElement(element.getValue(), context, collector);
        context.element = null;
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        this.currentWatermark = mark.getTimestamp();
    }

    //......
}
  • ProcessOperator’s processElement method sets the element’s timestamp on the collector and then calls userFunction.processElement; for a table function, userFunction is a CRowCorrelateProcessRunner.
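The order of calls in processElement matters: the collector receives the input element's timestamp before the user function runs, so every row emitted for that element carries the same timestamp. The sketch below models this in plain Java. Record, Collector, and StampingCollector are simplified stand-ins for StreamRecord, Collector, and TimestampedCollector, and the context argument is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the processElement flow; these nested types only imitate Flink's.
final class TimestampPropagationSketch {

    // Stand-in for StreamRecord: a value plus its timestamp.
    static final class Record<T> {
        final T value;
        final long timestamp;

        Record(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return value + "@" + timestamp;
        }
    }

    interface Collector<T> {
        void collect(T value);
    }

    // Stand-in for TimestampedCollector: stamps every emitted value with the current timestamp.
    static final class StampingCollector<T> implements Collector<T> {
        private final List<Record<T>> output;
        private long timestamp;

        StampingCollector(List<Record<T>> output) {
            this.output = output;
        }

        void setTimestamp(Record<?> element) {
            this.timestamp = element.timestamp;
        }

        @Override
        public void collect(T value) {
            output.add(new Record<T>(value, timestamp));
        }
    }

    // Stand-in for the user function; here it splits a line into words.
    static void splitWords(String line, Collector<String> out) {
        for (String word : line.split(" ")) {
            out.collect(word);
        }
    }

    // Mirrors the order of calls in processElement: set the timestamp, then run the user function.
    static List<Record<String>> process(List<Record<String>> input) {
        List<Record<String>> output = new ArrayList<>();
        StampingCollector<String> collector = new StampingCollector<>(output);
        for (Record<String> element : input) {
            collector.setTimestamp(element);
            splitWords(element.value, collector);
        }
        return output;
    }

    public static void main(String[] args) {
        List<Record<String>> input = Arrays.asList(
                new Record<String>("a b", 10L), new Record<String>("c", 20L));
        System.out.println(process(input)); // [a@10, b@10, c@20]
    }
}
```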

CRowCorrelateProcessRunner

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala

class CRowCorrelateProcessRunner(
    processName: String,
    processCode: String,
    collectorName: String,
    collectorCode: String,
    @transient var returnType: TypeInformation[CRow])
  extends ProcessFunction[CRow, CRow]
  with ResultTypeQueryable[CRow]
  with Compiler[Any]
  with Logging {

  private var function: ProcessFunction[Row, Row] = _
  private var collector: TableFunctionCollector[_] = _
  private var cRowWrapper: CRowWrappingCollector = _

  override def open(parameters: Configuration): Unit = {
    LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
    LOG.debug("Instantiating TableFunctionCollector.")
    collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
    this.cRowWrapper = new CRowWrappingCollector()

    LOG.debug(s"Compiling ProcessFunction: $processName \n\n Code:\n$processCode")
    val processClazz = compile(getRuntimeContext.getUserCodeClassLoader, processName, processCode)
    val constructor = processClazz.getConstructor(classOf[TableFunctionCollector[_]])
    LOG.debug("Instantiating ProcessFunction.")
    function = constructor.newInstance(collector).asInstanceOf[ProcessFunction[Row, Row]]
    FunctionUtils.setFunctionRuntimeContext(collector, getRuntimeContext)
    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
    FunctionUtils.openFunction(collector, parameters)
    FunctionUtils.openFunction(function, parameters)
  }

  override def processElement(
      in: CRow,
      ctx: ProcessFunction[CRow, CRow]#Context,
      out: Collector[CRow])
    : Unit = {

    cRowWrapper.out = out
    cRowWrapper.setChange(in.change)

    collector.setCollector(cRowWrapper)
    collector.setInput(in.row)
    collector.reset()

    function.processElement(
      in.row,
      ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
      cRowWrapper)
  }

  override def getProducedType: TypeInformation[CRow] = returnType

  override def close(): Unit = {
    FunctionUtils.closeFunction(collector)
    FunctionUtils.closeFunction(function)
  }
}
  • CRowCorrelateProcessRunner’s processElement method calls function.processElement, where function is the ProcessFunction compiled from processCode in open; that generated function is what calls Split’s eval method.
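Before delegating, processElement copies the change flag of the incoming CRow onto the wrapping collector, so every row emitted for that input carries the same flag. A minimal plain-Java sketch of that wrapping step follows. It assumes that a true flag marks an insertion and false a retraction; ChangeRow and WrappingCollector are hypothetical stand-ins for CRow and CRowWrappingCollector, and the hard-coded split replaces the generated function.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of change-flag propagation; not Flink's CRow classes.
final class ChangeFlagSketch {

    // Stand-in for CRow: a row plus a change flag (assumed: true = insert, false = retract).
    static final class ChangeRow {
        final String row;
        final boolean change;

        ChangeRow(String row, boolean change) {
            this.row = row;
            this.change = change;
        }

        @Override
        public String toString() {
            return (change ? "+" : "-") + "[" + row + "]";
        }
    }

    // Stand-in for CRowWrappingCollector: wraps plain rows with the current change flag.
    static final class WrappingCollector {
        List<ChangeRow> out;
        private boolean change;

        void setChange(boolean change) {
            this.change = change;
        }

        void collect(String row) {
            out.add(new ChangeRow(row, change));
        }
    }

    // Mirrors processElement: set the output and flag first, then run the table function.
    static void processElement(ChangeRow in, WrappingCollector wrapper, List<ChangeRow> out) {
        wrapper.out = out;
        wrapper.setChange(in.change);
        for (String word : in.row.split("#")) {
            wrapper.collect(in.row + ", " + word);
        }
    }

    static List<ChangeRow> demo() {
        List<ChangeRow> out = new ArrayList<>();
        WrappingCollector wrapper = new WrappingCollector();
        processElement(new ChangeRow("a#b", true), wrapper, out);
        processElement(new ChangeRow("a#b", false), wrapper, out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [+[a#b, a], +[a#b, b], -[a#b, a], -[a#b, b]]
    }
}
```

In this model a retraction of an input row produces retractions of exactly the rows that the insertion produced earlier.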

Summary

  • TableFunction extends UserDefinedFunction and adds collect, setCollector, getResultType, and getParameterTypes; UserDefinedFunction itself defines open, close, isDeterministic, and functionIdentifier.
  • A custom table function must extend TableFunction and define a public eval method. The parameter types of eval depend on how the function is called: in this example split is called with field a of the table, which is a String, so eval takes a String parameter.
  • ProcessOperator’s processElement calls userFunction.processElement, where userFunction is a CRowCorrelateProcessRunner; its processElement in turn calls function.processElement, and function calls Split’s eval method.
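Since eval is not declared on TableFunction itself, it has to be located by name and by argument types. The sketch below is a rough reflection-based model of such a lookup and of why eval must be public. It is an assumption for illustration and not Flink's actual resolution logic; EvalLookupSketch, Repeat, findEval, and call are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Rough model of locating an eval method by name and argument types via reflection.
final class EvalLookupSketch {

    // Hypothetical function with two public eval overloads and one non-public one.
    static final class Repeat {
        final List<String> emitted = new ArrayList<>();

        public void eval(String s) {
            emitted.add(s);
        }

        public void eval(String s, Integer times) {
            for (int i = 0; i < times; i++) {
                emitted.add(s);
            }
        }

        void eval(Long ignored) {
            // Not public, so the lookup below never sees it.
        }
    }

    static Method findEval(Class<?> functionClass, Class<?>... argumentTypes) {
        // getMethods() returns public members only.
        for (Method m : functionClass.getMethods()) {
            Class<?>[] params = m.getParameterTypes();
            if (!m.getName().equals("eval") || params.length != argumentTypes.length) {
                continue;
            }
            boolean matches = true;
            for (int i = 0; i < params.length; i++) {
                matches &= params[i].isAssignableFrom(argumentTypes[i]);
            }
            if (matches) {
                return m;
            }
        }
        throw new IllegalArgumentException("no public eval method matches the argument types");
    }

    // Looks up the overload that fits the runtime argument types and invokes it.
    static void call(Object function, Object... args) {
        Class<?>[] types = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            types[i] = args[i].getClass();
        }
        Method eval = findEval(function.getClass(), types);
        try {
            eval.invoke(function, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Repeat repeat = new Repeat();
        call(repeat, "x", 3);
        call(repeat, "y");
        System.out.println(repeat.emitted); // [x, x, x, y]
    }
}
```

Calling call(repeat, 5L) in this sketch throws IllegalArgumentException, because the only eval that accepts a Long is not public.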
