A Look at ScalarFunction in the Flink Table API

Preface

This article takes a closer look at ScalarFunction in the Flink Table API.

Example

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class HashCode extends ScalarFunction {

    private int factor = 0;

    @Override
    public void open(FunctionContext context) throws Exception {
        // access "hashcode_factor" parameter
        // "12" would be the default value if parameter does not exist
        factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
    }

    public int eval(String s) {
        return s.hashCode() * factor;
    }
}

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// set job parameter
Configuration conf = new Configuration();
conf.setString("hashcode_factor", "31");
env.getConfig().setGlobalJobParameters(conf);

// register the function
tableEnv.registerFunction("hashCode", new HashCode());

// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
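  • HashCode extends ScalarFunction and defines an eval method. Its open method reads the hashcode_factor job parameter and falls back to 12 when the parameter is not set.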
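
One detail the example leaves open is what happens when the input is NULL: s.hashCode() would throw a NullPointerException if eval ever received null. The following is a minimal, Flink-free sketch of a null-safe variant. The class name NullSafeHashCodeSketch is hypothetical, and the Map passed to open is only a stand-in for FunctionContext.getJobParameter. Whether a null actually reaches eval depends on the planner's null handling, so this is best read as a defensive pattern rather than a description of Flink's behavior.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Flink-free model of the HashCode function above.
class NullSafeHashCodeSketch {

    private int factor = 0;

    // Stand-in for open(FunctionContext): the Map plays the role of the
    // global job parameters; "12" is the default when the key is missing.
    public void open(Map<String, String> jobParameters) {
        factor = Integer.parseInt(jobParameters.getOrDefault("hashcode_factor", "12"));
    }

    // Boxed return type so that a null input can map to a null result.
    public Integer eval(String s) {
        if (s == null) {
            return null;
        }
        return s.hashCode() * factor;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("hashcode_factor", "31");

        NullSafeHashCodeSketch fn = new NullSafeHashCodeSketch();
        fn.open(params);

        System.out.println(fn.eval("a"));   // "a".hashCode() is 97, so this is 97 * 31
        System.out.println(fn.eval(null));  // null instead of an exception
    }
}
```

In a real ScalarFunction the same two changes would apply: a boxed Integer return type and an explicit null check at the top of eval.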

ScalarFunction

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/ScalarFunction.scala

abstract class ScalarFunction extends UserDefinedFunction {

  /**
    * Creates a call to a [[ScalarFunction]] in Scala Table API.
    *
    * @param params actual parameters of function
    * @return [[Expression]] in form of a [[ScalarFunctionCall]]
    */
  final def apply(params: Expression*): Expression = {
    ScalarFunctionCall(this, params)
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Returns the result type of the evaluation method with a given signature.
    *
    * This method needs to be overridden in case Flink's type extraction facilities are not
    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
    * method. Flink's type extraction facilities can handle basic types or
    * simple POJOs but might be wrong for more complex, custom, or composite types.
    *
    * @param signature signature of the method the return type needs to be determined
    * @return [[TypeInformation]] of result type or null if Flink should determine the type
    */
  def getResultType(signature: Array[Class[_]]): TypeInformation[_] = null

  /**
    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
    * signature.
    *
    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
    * By default Flink's type extraction facilities are used for this but might be wrong for
    * more complex, custom, or composite types.
    *
    * @param signature signature of the method the operand types need to be determined
    * @return [[TypeInformation]] of operand types
    */
  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
    signature.map { c =>
      try {
        TypeExtractor.getForClass(c)
      } catch {
        case ite: InvalidTypesException =>
          throw new ValidationException(
            s"Parameter types of scalar function '${this.getClass.getCanonicalName}' cannot be " +
            s"automatically determined. Please provide type information manually.")
      }
    }
  }
}
  • ScalarFunction extends UserDefinedFunction and defines the apply, getResultType, and getParameterTypes methods. A ScalarFunction maps zero, one, or more scalar values to a new scalar value; the mapping itself is implemented in public eval methods written by the user. It is generally recommended to use primitive types for the declared parameters and result types, for example int instead of DATE/TIME and long instead of TIMESTAMP.
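
To make the recommendation about primitive types concrete, here is a small Flink-free sketch of eval methods that take temporal values as int or long. It assumes the usual internal encodings (DATE as days since 1970-01-01, TIME as milliseconds of the day, TIMESTAMP as milliseconds since the epoch); the article itself does not spell these out, so treat them as an assumption. All class names are hypothetical, and in a real job each nested class would extend ScalarFunction.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;

// Hypothetical holder class; each nested class models one scalar function.
class PrimitiveTemporalSketch {

    // DATE passed as int: assumed to be days since 1970-01-01.
    static final class YearOfDate {
        public int eval(int daysSinceEpoch) {
            return LocalDate.ofEpochDay(daysSinceEpoch).getYear();
        }
    }

    // TIME passed as int: assumed to be milliseconds since midnight.
    static final class HourOfTime {
        public int eval(int millisOfDay) {
            return LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L).getHour();
        }
    }

    // TIMESTAMP passed as long: assumed to be milliseconds since the epoch (UTC).
    static final class HourOfTimestamp {
        public int eval(long epochMillis) {
            return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).getHour();
        }
    }

    public static void main(String[] args) {
        System.out.println(new YearOfDate().eval(0));              // day 0 is 1970-01-01
        System.out.println(new HourOfTime().eval(3_600_000));      // one hour after midnight
        System.out.println(new HourOfTimestamp().eval(7_200_000L)); // two hours after the epoch
    }
}
```

Working on int and long avoids allocating java.sql.Date, Time, or Timestamp objects for every row, which is presumably the motivation behind the recommendation.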

CRowProcessRunner

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowProcessRunner.scala

class CRowProcessRunner(
    name: String,
    code: String,
    @transient var returnType: TypeInformation[CRow])
  extends ProcessFunction[CRow, CRow]
  with ResultTypeQueryable[CRow]
  with Compiler[ProcessFunction[Row, Row]]
  with Logging {

  private var function: ProcessFunction[Row, Row] = _
  private var cRowWrapper: CRowWrappingCollector = _

  override def open(parameters: Configuration): Unit = {
    LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code")
    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
    LOG.debug("Instantiating ProcessFunction.")
    function = clazz.newInstance()
    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
    FunctionUtils.openFunction(function, parameters)

    this.cRowWrapper = new CRowWrappingCollector()
  }

  override def processElement(
      in: CRow,
      ctx: ProcessFunction[CRow, CRow]#Context,
      out: Collector[CRow])
    : Unit = {

    cRowWrapper.out = out
    cRowWrapper.setChange(in.change)
    function.processElement(
      in.row,
      ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
      cRowWrapper)
  }

  override def getProducedType: TypeInformation[CRow] = returnType

  override def close(): Unit = {
    FunctionUtils.closeFunction(function)
  }
}
  • The processElement method of CRowProcessRunner calls function.processElement, which in turn calls the eval method of the user-defined ScalarFunction. Here function is a ProcessFunction that open compiles from the code constructor parameter of CRowProcessRunner; DataStreamCalc generates that code in its translateToPlan method when it creates the CRowProcessRunner.
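
The delegation described above can be modeled without Flink. The sketch below is a simplified approximation, not the actual runtime classes: ChangeRow stands in for CRow (a row plus a change flag), RowFunction for the compiled ProcessFunction[Row, Row], and WrappingCollector for CRowWrappingCollector. All of these names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, Flink-free model of the CRowProcessRunner delegation.
class CRowRunnerSketch {

    // Stand-in for CRow: a row together with its accumulate/retract flag.
    static final class ChangeRow {
        final Object[] row;
        final boolean change;

        ChangeRow(Object[] row, boolean change) {
            this.row = row;
            this.change = change;
        }
    }

    // Stand-in for the compiled ProcessFunction[Row, Row].
    interface RowFunction {
        void processElement(Object[] row, Consumer<Object[]> out);
    }

    // Stand-in for CRowWrappingCollector: re-attaches the change flag to each output row.
    static final class WrappingCollector implements Consumer<Object[]> {
        Consumer<ChangeRow> out;
        boolean change;

        @Override
        public void accept(Object[] row) {
            out.accept(new ChangeRow(row, change));
        }
    }

    private final RowFunction function;
    private final WrappingCollector wrapper = new WrappingCollector();

    CRowRunnerSketch(RowFunction function) {
        this.function = function;
    }

    // Unwrap the input, delegate on the plain row, wrap whatever the delegate emits.
    public void processElement(ChangeRow in, Consumer<ChangeRow> out) {
        wrapper.out = out;
        wrapper.change = in.change;
        function.processElement(in.row, wrapper);
    }

    public static void main(String[] args) {
        List<ChangeRow> collected = new ArrayList<>();
        // The lambda plays the role of generated code that calls the user's eval method.
        CRowRunnerSketch runner = new CRowRunnerSketch(
            (row, out) -> out.accept(new Object[] {row[0], ((String) row[0]).hashCode() * 31}));

        runner.processElement(new ChangeRow(new Object[] {"a"}, false), collected::add);
        System.out.println(collected.get(0).row[1] + " change=" + collected.get(0).change);
    }
}
```

The point of the wrapper is that the inner function only ever sees plain rows, while the change flag of the input is carried over to every row it emits.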

ProcessFunction

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/functions/ProcessFunction.java

@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    /**
     * Process one element from the input stream.
     *
     * <p>This function can output zero or more elements using the {@link Collector} parameter
     * and also update internal state or set timers using the {@link Context} parameter.
     *
     * @param value The input value.
     * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
     *            a {@link TimerService} for registering timers and querying the time. The
     *            context is only valid during the invocation of this method, do not store it.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    /**
     * Called when a timer set using {@link TimerService} fires.
     *
     * @param timestamp The timestamp of the firing timer.
     * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
     *            querying the {@link TimeDomain} of the firing timer and getting a
     *            {@link TimerService} for registering timers and querying the time.
     *            The context is only valid during the invocation of this method, do not store it.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

    /**
     * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
     * or {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class Context {

        /**
         * Timestamp of the element currently being processed or timestamp of a firing timer.
         *
         * <p>This might be {@code null}, for example if the time characteristic of your program
         * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
         */
        public abstract Long timestamp();

        /**
         * A {@link TimerService} for querying time and registering timers.
         */
        public abstract TimerService timerService();

        /**
         * Emits a record to the side output identified by the {@link OutputTag}.
         *
         * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
         * @param value The record to emit.
         */
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }

    /**
     * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class OnTimerContext extends Context {
        /**
         * The {@link TimeDomain} of the firing timer.
         */
        public abstract TimeDomain timeDomain();
    }

}
  • ProcessFunction extends AbstractRichFunction and declares the abstract method processElement, along with onTimer, which has an empty default implementation.

DataStreamCalc

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala

class DataStreamCalc(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    input: RelNode,
    inputSchema: RowSchema,
    schema: RowSchema,
    calcProgram: RexProgram,
    ruleDescription: String)
  extends Calc(cluster, traitSet, input, calcProgram)
  with CommonCalc
  with DataStreamRel {

  //......

  override def translateToPlan(
      tableEnv: StreamTableEnvironment,
      queryConfig: StreamQueryConfig): DataStream[CRow] = {

    val config = tableEnv.getConfig

    val inputDataStream =
      getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)

    // materialize time attributes in condition
    val condition = if (calcProgram.getCondition != null) {
      val materializedCondition = RelTimeIndicatorConverter.convertExpression(
        calcProgram.expandLocalRef(calcProgram.getCondition),
        inputSchema.relDataType,
        cluster.getRexBuilder)
      Some(materializedCondition)
    } else {
      None
    }

    // filter out time attributes
    val projection = calcProgram.getProjectList.asScala
      .map(calcProgram.expandLocalRef)

    val generator = new FunctionCodeGenerator(config, false, inputSchema.typeInfo)

    val genFunction = generateFunction(
      generator,
      ruleDescription,
      inputSchema,
      schema,
      projection,
      condition,
      config,
      classOf[ProcessFunction[CRow, CRow]])

    val inputParallelism = inputDataStream.getParallelism

    val processFunc = new CRowProcessRunner(
      genFunction.name,
      genFunction.code,
      CRowTypeInfo(schema.typeInfo))

    inputDataStream
      .process(processFunc)
      .name(calcOpName(calcProgram, getExpressionString))
      // keep parallelism to ensure order of accumulate and retract messages
      .setParallelism(inputParallelism)
  }
}
  • DataStreamCalc's translateToPlan calls the generateFunction method of CommonCalc to produce genFunction, then creates a CRowProcessRunner from genFunction.name, genFunction.code, and a CRowTypeInfo. The generated code extends ProcessFunction and implements processElement, which calls the eval method of the user-defined ScalarFunction.
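
The generated class itself is not shown in the article, so the following is only a hand-written approximation of what such code might do for a query like SELECT string, HASHCODE(string) with an optional filter. It assumes that the condition, when present, is checked before the projection is computed. The names GeneratedCalcSketch and HashCodeUdf are hypothetical, and plain Object[] and List stand in for Row and Collector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, Flink-free approximation of a generated Calc function.
class GeneratedCalcSketch {

    // Stand-in for the user-defined ScalarFunction from the example.
    static final class HashCodeUdf {
        private final int factor;

        HashCodeUdf(int factor) {
            this.factor = factor;
        }

        public int eval(String s) {
            return s.hashCode() * factor;
        }
    }

    private final HashCodeUdf udf;
    private final Predicate<Object[]> condition; // null when the query has no filter

    GeneratedCalcSketch(HashCodeUdf udf, Predicate<Object[]> condition) {
        this.udf = udf;
        this.condition = condition;
    }

    // Rough equivalent of the generated processElement: filter first, then project.
    public void processElement(Object[] in, List<Object[]> out) {
        if (condition != null && !condition.test(in)) {
            return; // row filtered out, nothing is collected
        }
        String field0 = (String) in[0];
        int hash = udf.eval(field0);          // the call into the user's eval method
        out.add(new Object[] {field0, hash}); // projected output row
    }

    public static void main(String[] args) {
        List<Object[]> out = new ArrayList<>();
        GeneratedCalcSketch calc =
            new GeneratedCalcSketch(new HashCodeUdf(31), row -> row[0] != null);

        calc.processElement(new Object[] {"a"}, out);
        calc.processElement(new Object[] {null}, out); // dropped by the filter
        System.out.println(out.size() + " row(s), hash=" + out.get(0)[1]);
    }
}
```

In the real pipeline this logic exists only as source text in genFunction.code until CRowProcessRunner compiles and instantiates it in open.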

Summary

  • ScalarFunction extends UserDefinedFunction and defines the apply, getResultType, and getParameterTypes methods. A ScalarFunction maps zero, one, or more scalar values to a new scalar value; the mapping itself is implemented in public eval methods written by the user. It is generally recommended to use primitive types for the declared parameters and result types, for example int instead of DATE/TIME and long instead of TIMESTAMP.
  • The processElement method of CRowProcessRunner calls function.processElement, which in turn calls the eval method of the user-defined ScalarFunction. Here function is a ProcessFunction that open compiles from the code constructor parameter of CRowProcessRunner; DataStreamCalc generates that code in its translateToPlan method when it creates the CRowProcessRunner. ProcessFunction extends AbstractRichFunction and declares the abstract method processElement.
  • DataStreamCalc's translateToPlan calls the generateFunction method of CommonCalc to produce genFunction, then creates a CRowProcessRunner from genFunction.name, genFunction.code, and a CRowTypeInfo. The generated code extends ProcessFunction and implements processElement, which calls the eval method of the user-defined ScalarFunction.
