Talk about Flink's PrintSinkFunction


Preface

This article takes a look at Flink's PrintSinkFunction.

DataStream.print

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    /**
     * Writes a DataStream to the standard output stream (stdout).
     *
     * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
     *
     * <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink
     * worker.
     *
     * @return The closed DataStream.
     */
    @PublicEvolving
    public DataStreamSink<T> print() {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
        return addSink(printFunction).name("Print to Std. Out");
    }

    /**
     * Writes a DataStream to the standard output stream (stderr).
     *
     * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
     *
     * <p>NOTE: This will print to stderr on the machine where the code is executed, i.e. the Flink
     * worker.
     *
     * @return The closed DataStream.
     */
    @PublicEvolving
    public DataStreamSink<T> printToErr() {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(true);
        return addSink(printFunction).name("Print to Std. Err");
    }

    /**
     * Writes a DataStream to the standard output stream (stdout).
     *
     * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
     *
     * <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink
     * worker.
     *
     * @param sinkIdentifier The string to prefix the output with.
     * @return The closed DataStream.
     */
    @PublicEvolving
    public DataStreamSink<T> print(String sinkIdentifier) {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, false);
        return addSink(printFunction).name("Print to Std. Out");
    }

    /**
     * Writes a DataStream to the standard output stream (stderr).
     *
     * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
     *
     * <p>NOTE: This will print to stderr on the machine where the code is executed, i.e. the Flink
     * worker.
     *
     * @param sinkIdentifier The string to prefix the output with.
     * @return The closed DataStream.
     */
    @PublicEvolving
    public DataStreamSink<T> printToErr(String sinkIdentifier) {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, true);
        return addSink(printFunction).name("Print to Std. Err");
    }

    /**
     * Adds the given sink to this DataStream. Only streams with sinks added
     * will be executed once the {@link StreamExecutionEnvironment#execute()}
     * method is called.
     *
     * @param sinkFunction
     *            The object containing the sink's invoke function.
     * @return The closed DataStream.
     */
    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        // configure the type if needed
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
        }

        StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

        DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

        getExecutionEnvironment().addOperator(sink.getTransformation());
        return sink;
    }
  • DataStream provides several print-prefixed methods: print() and printToErr(), plus overloads of each that take a sinkIdentifier. Each of them creates a PrintSinkFunction (with stdErr set to true for the printToErr variants) and adds it through addSink.
  • The Javadoc of addSink notes that only streams that have sinks added will be executed once StreamExecutionEnvironment.execute() is called.
  • Inside addSink, the SinkFunction is first wrapped in a StreamSink operator, which is in turn wrapped in a DataStreamSink; finally, the transformation returned by DataStreamSink.getTransformation() is registered with the StreamExecutionEnvironment via addOperator.
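
To make the lazy registration concrete, here is a minimal sketch in plain Java. Every class in it (MiniSinkFunction, MiniStreamSink, MiniDataStreamSink, MiniEnvironment, MiniDataStream) is a hypothetical, heavily simplified stand-in, not Flink's API; it only mirrors the wrapping order described above and the fact that addSink registers work that runs later, on execute().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, heavily simplified stand-ins for Flink's classes (NOT Flink's API),
// used only to show that addSink registers work that runs later, on execute().
public class LazySinkChainSketch {

    /** Stand-in for SinkFunction. */
    interface MiniSinkFunction<T> {
        void invoke(T value);
    }

    /** Stand-in for StreamSink: the operator that wraps the user's sink function. */
    static final class MiniStreamSink<T> {
        private final MiniSinkFunction<T> userFunction;
        MiniStreamSink(MiniSinkFunction<T> userFunction) { this.userFunction = userFunction; }
        void processElement(T value) { userFunction.invoke(value); }
    }

    /** Stand-in for DataStreamSink: exposes the transformation to register. */
    static final class MiniDataStreamSink<T> {
        private final Runnable transformation;
        MiniDataStreamSink(List<T> input, MiniStreamSink<T> operator) {
            this.transformation = () -> input.forEach(operator::processElement);
        }
        Runnable getTransformation() { return transformation; }
    }

    /** Stand-in for StreamExecutionEnvironment: collects operators until execute(). */
    static final class MiniEnvironment {
        private final List<Runnable> operators = new ArrayList<>();
        void addOperator(Runnable transformation) { operators.add(transformation); }
        void execute() { operators.forEach(Runnable::run); }
    }

    /** Stand-in for DataStream over a fixed list of elements. */
    static final class MiniDataStream<T> {
        private final MiniEnvironment env;
        private final List<T> elements;
        MiniDataStream(MiniEnvironment env, List<T> elements) { this.env = env; this.elements = elements; }

        MiniDataStreamSink<T> addSink(MiniSinkFunction<T> sinkFunction) {
            MiniStreamSink<T> sinkOperator = new MiniStreamSink<>(sinkFunction);           // first wrapper
            MiniDataStreamSink<T> sink = new MiniDataStreamSink<>(elements, sinkOperator); // second wrapper
            env.addOperator(sink.getTransformation());                                    // register only
            return sink;
        }
    }

    public static void main(String[] args) {
        MiniEnvironment env = new MiniEnvironment();
        MiniDataStream<String> stream = new MiniDataStream<>(env, Arrays.asList("a", "b"));
        List<String> received = new ArrayList<>();
        stream.addSink(received::add);
        System.out.println("before execute: " + received); // before execute: []
        env.execute();
        System.out.println("after execute: " + received);  // after execute: [a, b]
    }
}
```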

SinkFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SinkFunction.java

/**
 * Interface for implementing user defined sink functionality.
 *
 * @param <IN> Input type parameter.
 */
@Public
public interface SinkFunction<IN> extends Function, Serializable {

    /**
     * @deprecated Use {@link #invoke(Object, Context)}.
     */
    @Deprecated
    default void invoke(IN value) throws Exception {}

    /**
     * Writes the given value to the sink. This function is called for every record.
     *
     * <p>You have to override this method when implementing a {@code SinkFunction}, this is a
     * {@code default} method for backward compatibility with the old-style method only.
     *
     * @param value The input record.
     * @param context Additional context about the input record.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    default void invoke(IN value, Context context) throws Exception {
        invoke(value);
    }

    /**
     * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
     * an input record.
     *
     * <p>The context is only valid for the duration of a
     * {@link SinkFunction#invoke(Object, Context)} call. Do not store the context and use
     * afterwards!
     *
     * @param <T> The type of elements accepted by the sink.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface Context<T> {

        /** Returns the current processing time. */
        long currentProcessingTime();

        /** Returns the current event-time watermark. */
        long currentWatermark();

        /**
         * Returns the timestamp of the current input record or {@code null} if the element does not
         * have an assigned timestamp.
         */
        Long timestamp();
    }
}
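  • The SinkFunction interface defines the invoke method, which is called for every record. The current variant, invoke(IN value, Context context), also receives a Context that exposes three methods: currentProcessingTime, currentWatermark, and timestamp (which returns null if the record has no assigned timestamp). Its default implementation simply delegates to the deprecated invoke(IN value), for backward compatibility with old-style sinks.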
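
The default-method bridge is what lets a sink override only the old one-argument invoke and still receive records. The sketch below uses hypothetical, simplified versions of the interface (MiniSinkFunction and MiniContext, with Context's generic parameter dropped); it is not Flink code, but the delegation follows the source above: calling the two-argument invoke on a sink that only overrides invoke(value) ends up in invoke(value).

```java
// Hypothetical mini versions of SinkFunction and its Context (NOT Flink's API).
public class SinkBridgeSketch {

    interface MiniContext {
        long currentProcessingTime();
        long currentWatermark();
        Long timestamp(); // null when the record has no assigned timestamp
    }

    interface MiniSinkFunction<IN> {
        /** Old-style method, kept only for backward compatibility. */
        @Deprecated
        default void invoke(IN value) throws Exception {}

        /** New-style method; by default it just forwards to the old one. */
        default void invoke(IN value, MiniContext context) throws Exception {
            invoke(value);
        }
    }

    /** Overrides only the old one-argument method. */
    static final class OldStyleSink implements MiniSinkFunction<String> {
        final StringBuilder seen = new StringBuilder();
        @Override
        public void invoke(String value) { seen.append(value).append(';'); }
    }

    /** Overrides the new method and reads the record timestamp from the context. */
    static final class NewStyleSink implements MiniSinkFunction<String> {
        final StringBuilder seen = new StringBuilder();
        @Override
        public void invoke(String value, MiniContext context) {
            Long ts = context.timestamp();
            seen.append(value).append('@').append(ts == null ? "none" : ts.toString()).append(';');
        }
    }

    /** Builds a fixed context for the demo; the watermark value is arbitrary. */
    static MiniContext contextWithTimestamp(Long ts) {
        return new MiniContext() {
            public long currentProcessingTime() { return System.currentTimeMillis(); }
            public long currentWatermark() { return Long.MIN_VALUE; }
            public Long timestamp() { return ts; }
        };
    }

    public static void main(String[] args) throws Exception {
        OldStyleSink oldSink = new OldStyleSink();
        NewStyleSink newSink = new NewStyleSink();
        // The caller in this sketch always uses the two-argument variant.
        oldSink.invoke("x", contextWithTimestamp(42L));
        newSink.invoke("x", contextWithTimestamp(42L));
        newSink.invoke("y", contextWithTimestamp(null));
        System.out.println(oldSink.seen); // x;
        System.out.println(newSink.seen); // x@42;y@none;
    }
}
```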

RichSinkFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java

/**
 * A {@link org.apache.flink.api.common.functions.RichFunction} version of {@link SinkFunction}.
 */
@Public
public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {

    private static final long serialVersionUID = 1L;
}
  • The abstract class RichSinkFunction extends AbstractRichFunction and implements the SinkFunction interface. Most built-in sink functions extend RichSinkFunction. AbstractRichFunction mainly provides access to the RuntimeContext (information about the function's runtime environment, such as its subtask index and the parallelism), plus empty default implementations of the open and close lifecycle methods.
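
The lifecycle that RichSinkFunction inherits can be sketched as follows. MiniRuntimeContext, MiniRichSink, TaggingSink and runSubtask are hypothetical simplifications (open takes no Configuration, and there is no real task), assuming the usual rich-function order: the runtime context is set first, open runs once per parallel instance before any record, invoke runs once per record, and close runs at the end.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical mini versions of RuntimeContext / AbstractRichFunction (NOT Flink's API).
public class RichSinkLifecycleSketch {

    /** Stand-in for the runtime context the framework injects. */
    static final class MiniRuntimeContext {
        final int indexOfThisSubtask;
        final int numberOfParallelSubtasks;
        MiniRuntimeContext(int index, int parallelism) {
            this.indexOfThisSubtask = index;
            this.numberOfParallelSubtasks = parallelism;
        }
    }

    /** Stand-in for AbstractRichFunction + SinkFunction: context holder plus lifecycle hooks. */
    abstract static class MiniRichSink<IN> {
        private MiniRuntimeContext runtimeContext;
        void setRuntimeContext(MiniRuntimeContext ctx) { this.runtimeContext = ctx; }
        MiniRuntimeContext getRuntimeContext() { return runtimeContext; }
        void open() {}   // called once before the first record
        void close() {}  // called once after the last record
        abstract void invoke(IN value);
    }

    /** A sink that tags every record with its 1-based subtask number, computed in open(). */
    static final class TaggingSink extends MiniRichSink<String> {
        private final List<String> out;
        private String tag;
        TaggingSink(List<String> out) { this.out = out; }
        @Override void open() { tag = (getRuntimeContext().indexOfThisSubtask + 1) + "> "; }
        @Override void invoke(String value) { out.add(tag + value); }
    }

    /** Drives one parallel instance through the lifecycle, roughly like a task would. */
    static void runSubtask(MiniRichSink<String> sink, int index, int parallelism, List<String> records) {
        sink.setRuntimeContext(new MiniRuntimeContext(index, parallelism));
        sink.open();
        records.forEach(sink::invoke);
        sink.close();
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        runSubtask(new TaggingSink(out), 0, 2, Arrays.asList("a"));
        runSubtask(new TaggingSink(out), 1, 2, Arrays.asList("b"));
        System.out.println(out); // [1> a, 2> b]
    }
}
```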

PrintSinkFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java

/**
 * Implementation of the SinkFunction writing every tuple to the standard
 * output or standard error stream.
 *
 * <p>
 * Four possible format options:
 *    {@code sinkIdentifier}:taskId> output  <- {@code sinkIdentifier} provided, parallelism > 1
 *    {@code sinkIdentifier}> output         <- {@code sinkIdentifier} provided, parallelism == 1
 *  taskId> output                            <- no {@code sinkIdentifier} provided, parallelism > 1
 *  output                                    <- no {@code sinkIdentifier} provided, parallelism == 1
 * </p>
 *
 * @param <IN> Input record type
 */
@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    private final PrintSinkOutputWriter<IN> writer;

    /**
     * Instantiates a print sink function that prints to standard out.
     */
    public PrintSinkFunction() {
        writer = new PrintSinkOutputWriter<>(false);
    }

    /**
     * Instantiates a print sink function that prints to standard out.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     */
    public PrintSinkFunction(final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(stdErr);
    }

    /**
     * Instantiates a print sink function that prints to standard out and gives a sink identifier.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     * @param sinkIdentifier Message that identify sink and is prefixed to the output of the value
     */
    public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(IN record) {
        writer.write(record);
    }

    @Override
    public String toString() {
        return writer.toString();
    }
}
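  • PrintSinkFunction extends RichSinkFunction and delegates all output to a PrintSinkOutputWriter. Its open method passes the subtask index and the number of parallel subtasks from the StreamingRuntimeContext to writer.open, and its invoke method calls writer.write for every record. It overrides the one-argument invoke(IN record); this still receives every record because the default two-argument invoke delegates to it.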
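
One design detail worth noting: the writer is created in PrintSinkFunction's constructor, which normally runs on the client, while open runs on the worker after the function has been Java-serialized and shipped there. That is why PrintSinkOutputWriter keeps its PrintStream in a transient field and only assigns it in open. The hypothetical MiniWriter and NonTransientWriter classes below are not Flink code; they only reproduce that field layout to show what Java serialization does with it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.io.Serializable;

// Hypothetical classes (NOT Flink code) mimicking PrintSinkOutputWriter's field layout.
public class TransientStreamSketch {

    /** Like PrintSinkOutputWriter: the configuration is serialized, the stream is transient. */
    static final class MiniWriter implements Serializable {
        private static final long serialVersionUID = 1L;
        private final boolean stdErr;          // travels with the serialized object
        private transient PrintStream stream;  // skipped by serialization
        MiniWriter(boolean stdErr) { this.stdErr = stdErr; }
        void open() { stream = stdErr ? System.err : System.out; }
        boolean isOpen() { return stream != null; }
    }

    /** Counter-example: grabs the stream eagerly in a non-transient field. */
    static final class NonTransientWriter implements Serializable {
        private static final long serialVersionUID = 1L;
        private final PrintStream stream;      // PrintStream is not Serializable
        NonTransientWriter(boolean stdErr) { stream = stdErr ? System.err : System.out; }
    }

    /** Serializes and deserializes an object, roughly what shipping it to a worker involves. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        MiniWriter writer = new MiniWriter(false);
        writer.open();                          // stream set locally...
        MiniWriter shipped = roundTrip(writer); // ...but not carried across serialization
        System.out.println(shipped.isOpen());   // false
        shipped.open();                         // the "worker-side" open() assigns it again
        System.out.println(shipped.isOpen());   // true

        try {
            roundTrip(new NonTransientWriter(false));
        } catch (NotSerializableException e) {
            System.out.println("not serializable: " + e.getMessage());
        }
    }
}
```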

PrintSinkOutputWriter

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java

/**
 * Print sink output writer for DataStream and DataSet print API.
 */
@Internal
public class PrintSinkOutputWriter<IN> implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final boolean STD_OUT = false;
    private static final boolean STD_ERR = true;

    private final boolean target;
    private transient PrintStream stream;
    private final String sinkIdentifier;
    private transient String completedPrefix;

    public PrintSinkOutputWriter() {
        this("", STD_OUT);
    }

    public PrintSinkOutputWriter(final boolean stdErr) {
        this("", stdErr);
    }

    public PrintSinkOutputWriter(final String sinkIdentifier, final boolean stdErr) {
        this.target = stdErr;
        this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier);
    }

    public void open(int subtaskIndex, int numParallelSubtasks) {
        // get the target stream
        stream = target == STD_OUT ? System.out : System.err;

        completedPrefix = sinkIdentifier;

        if (numParallelSubtasks > 1) {
            if (!completedPrefix.isEmpty()) {
                completedPrefix += ":";
            }
            completedPrefix += (subtaskIndex + 1);
        }

        if (!completedPrefix.isEmpty()) {
            completedPrefix += "> ";
        }
    }

    public void write(IN record) {
        stream.println(completedPrefix + record.toString());
    }

    @Override
    public String toString() {
        return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
    }
}
  • The PrintSinkOutputWriter constructor takes at most two parameters, sinkIdentifier and stdErr. sinkIdentifier is the prefix of the output (a null identifier is treated as an empty string), and stdErr indicates whether to write to System.err instead of System.out.
  • The open method does the preparatory work. It is called from PrintSinkFunction's open method, which obtains subtaskIndex and numParallelSubtasks from the StreamingRuntimeContext returned by getRuntimeContext() (provided by AbstractRichFunction). Here, open selects the target stream and builds completedPrefix from sinkIdentifier, subtaskIndex, and numParallelSubtasks: the 1-based subtask number is appended only when the parallelism is greater than 1 (separated from a non-empty identifier by ":"), and "> " is appended whenever the prefix is non-empty.
  • The write method calls println on the selected stream (System.out or System.err), printing completedPrefix followed by record.toString().
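
The prefix rules are easiest to check in isolation. The completedPrefix method below is a standalone restatement of the logic in PrintSinkOutputWriter.open (with the constructor's null handling folded in), not a Flink API; the identifier "mySink" is made up for the example. The four calls in main correspond to the four format options listed in the PrintSinkFunction Javadoc.

```java
// A standalone restatement of the prefix rules in PrintSinkOutputWriter.open (sketch, NOT Flink code).
public class PrintPrefixSketch {

    /** Builds the prefix the same way PrintSinkOutputWriter's constructor and open() do. */
    static String completedPrefix(String sinkIdentifier, int subtaskIndex, int numParallelSubtasks) {
        String prefix = sinkIdentifier == null ? "" : sinkIdentifier;
        if (numParallelSubtasks > 1) {
            if (!prefix.isEmpty()) {
                prefix += ":";
            }
            prefix += (subtaskIndex + 1);   // subtask indices are 0-based, printed 1-based
        }
        if (!prefix.isEmpty()) {
            prefix += "> ";
        }
        return prefix;
    }

    public static void main(String[] args) {
        // The four format options from the PrintSinkFunction Javadoc:
        System.out.println(completedPrefix("mySink", 2, 4) + "hello"); // mySink:3> hello
        System.out.println(completedPrefix("mySink", 0, 1) + "hello"); // mySink> hello
        System.out.println(completedPrefix("", 2, 4) + "hello");       // 3> hello
        System.out.println(completedPrefix("", 0, 1) + "hello");       // hello
    }
}
```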

Summary

  • PrintSinkFunction is created inside DataStream's print-prefixed methods, which then call addSink to register it with the StreamExecutionEnvironment: the function is first wrapped in a StreamSink, then in a DataStreamSink, and finally the transformation from DataStreamSink.getTransformation() is added to the environment as an operator.
  • SinkFunction is the basic interface for sinks. It mainly defines the invoke method, which also receives a Context. Most built-in sink functions, however, extend RichSinkFunction, which extends AbstractRichFunction and therefore gives the function access to its runtime context while it runs.
  • PrintSinkFunction extends RichSinkFunction and delegates to PrintSinkOutputWriter, calling its write method for every record it receives in invoke.
