Talk about flink’s SourceFunction

  flink

Preface

This article mainly studies flink’s SourceFunction.

Example

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> dataStreamSource = env.addSource(new RandomWordSource());

        dataStreamSource.map(new UpperCaseMapFunc()).print();

        env.execute("sourceFunctionDemo");
  • A custom SourceFunction is added here through the addSource method; the stream is then transformed with an UpperCaseMapFunc (sketched below) and printed.
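
The UpperCaseMapFunc used above is not shown in the article, so here is a minimal sketch of what it plausibly looks like (only the class name comes from the example; the body is an assumption):

import org.apache.flink.api.common.functions.MapFunction;

public class UpperCaseMapFunc implements MapFunction<String, String> {

    // upper-case each incoming word; a guess at the demo's intended behavior
    @Override
    public String map(String value) throws Exception {
        return value.toUpperCase();
    }
}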

SourceFunction

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/api/functions/source/SourceFunction.java

/**
 * Base interface for all stream data sources in Flink. The contract of a stream source
 * is the following: When the source should start emitting elements, the {@link #run} method
 * is called with a {@link SourceContext} that can be used for emitting elements.
 * The run method can run for as long as necessary. The source must, however, react to an
 * invocation of {@link #cancel()} by breaking out of its main loop.
 *
 * <h3>CheckpointedFunction Sources</h3>
 *
 * <p>Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
 * interface must ensure that state checkpointing, updating of internal state and emission of
 * elements are not done concurrently. This is achieved by using the provided checkpointing lock
 * object to protect update of state and emission of elements in a synchronized block.
 *
 * <p>This is the basic pattern one should follow when implementing a checkpointed source:
 *
 * <pre>{@code
 *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
 *      private long count = 0L;
 *      private volatile boolean isRunning = true;
 *
 *      private transient ListState<Long> checkpointedCount;
 *
 *      public void run(SourceContext<T> ctx) {
 *          while (isRunning && count < 1000) {
 *              // this synchronized block ensures that state checkpointing,
 *              // internal state updates and emission of elements are an atomic operation
 *              synchronized (ctx.getCheckpointLock()) {
 *                  ctx.collect(count);
 *                  count++;
 *              }
 *          }
 *      }
 *
 *      public void cancel() {
 *          isRunning = false;
 *      }
 *
 *      public void initializeState(FunctionInitializationContext context) {
 *          this.checkpointedCount = context
 *              .getOperatorStateStore()
 *              .getListState(new ListStateDescriptor<>("count", Long.class));
 *
 *          if (context.isRestored()) {
 *              for (Long count : this.checkpointedCount.get()) {
 *                  this.count = count;
 *              }
 *          }
 *      }
 *
 *      public void snapshotState(FunctionSnapshotContext context) {
 *          this.checkpointedCount.clear();
 *          this.checkpointedCount.add(count);
 *      }
 * }
 * }</pre>
 *
 *
 * <h3>Timestamps and watermarks:</h3>
 * Sources may assign timestamps to elements and may manually emit watermarks.
 * However, these are only interpreted if the streaming program runs on
 * {@link TimeCharacteristic#EventTime}. On other time characteristics
 * ({@link TimeCharacteristic#IngestionTime} and {@link TimeCharacteristic#ProcessingTime}),
 * the watermarks from the source function are ignored.
 *
 * <h3>Gracefully Stopping Functions</h3>
 * Functions may additionally implement the {@link org.apache.flink.api.common.functions.StoppableFunction}
 * interface. "Stopping" a function, in contrast to "canceling" means a graceful exit that leaves the
 * state and the emitted elements in a consistent state.
 *
 * <p>When a source is stopped, the executing thread is not interrupted, but expected to leave the
 * {@link #run(SourceContext)} method in reasonable time on its own, preserving the atomicity
 * of state updates and element emission.
 *
 * @param <T> The type of the elements produced by this source.
 *
 * @see org.apache.flink.api.common.functions.StoppableFunction
 * @see org.apache.flink.streaming.api.TimeCharacteristic
 */
@Public
public interface SourceFunction<T> extends Function, Serializable {

    /**
     * Starts the source. Implementations can use the {@link SourceContext} emit
     * elements.
     *
     * <p>Sources that implement {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
     * must lock on the checkpoint lock (using a synchronized block) before updating internal
     * state and emitting elements, to make both an atomic operation:
     *
     * <pre>{@code
     *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
     *      private long count = 0L;
     *      private volatile boolean isRunning = true;
     *
     *      private transient ListState<Long> checkpointedCount;
     *
     *      public void run(SourceContext<T> ctx) {
     *          while (isRunning && count < 1000) {
     *              // this synchronized block ensures that state checkpointing,
     *              // internal state updates and emission of elements are an atomic operation
     *              synchronized (ctx.getCheckpointLock()) {
     *                  ctx.collect(count);
     *                  count++;
     *              }
     *          }
     *      }
     *
     *      public void cancel() {
     *          isRunning = false;
     *      }
     *
     *      public void initializeState(FunctionInitializationContext context) {
     *          this.checkpointedCount = context
     *              .getOperatorStateStore()
     *              .getListState(new ListStateDescriptor<>("count", Long.class));
     *
     *          if (context.isRestored()) {
     *              for (Long count : this.checkpointedCount.get()) {
     *                  this.count = count;
     *              }
     *          }
     *      }
     *
     *      public void snapshotState(FunctionSnapshotContext context) {
     *          this.checkpointedCount.clear();
     *          this.checkpointedCount.add(count);
     *      }
     * }
     * }</pre>
     *
     * @param ctx The context to emit elements to and for accessing locks.
     */
    void run(SourceContext<T> ctx) throws Exception;

    /**
     * Cancels the source. Most sources will have a while loop inside the
     * {@link #run(SourceContext)} method. The implementation needs to ensure that the
     * source will break out of that loop after this method is called.
     *
     * <p>A typical pattern is to have an {@code "volatile boolean isRunning"} flag that is set to
     * {@code false} in this method. That flag is checked in the loop condition.
     *
     * <p>When a source is canceled, the executing thread will also be interrupted
     * (via {@link Thread#interrupt()}). The interruption happens strictly after this
     * method has been called, so any interruption handler can rely on the fact that
     * this method has completed. It is good practice to make any flags altered by
     * this method "volatile", in order to guarantee the visibility of the effects of
     * this method to any interruption handler.
     */
    void cancel();

    // ------------------------------------------------------------------------
    //  source context
    // ------------------------------------------------------------------------

    /**
     * Interface that source functions use to emit elements, and possibly watermarks.
     *
     * @param <T> The type of the elements produced by the source.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface SourceContext<T> {

        //......
    }
}
  • SourceFunction is the base interface for all Flink stream data sources. The interface defines the run and cancel methods, and also declares the nested SourceContext interface. (A sketch of a source that additionally supports graceful stopping, as described in the javadoc above, follows.)
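
The javadoc's "Gracefully Stopping Functions" section mentions StoppableFunction; here is a hedged sketch of a source implementing it (the class name and 100 ms pacing are made up for illustration):

import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class StoppableCounterSource implements SourceFunction<Long>, StoppableFunction {

    // volatile, so changes made by cancel()/stop() are visible to the loop in run()
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long value = 0L;
        while (isRunning) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(value++);
            }
            Thread.sleep(100);
        }
    }

    // cancel(): hard cancellation; the executing thread may additionally be interrupted
    @Override
    public void cancel() {
        isRunning = false;
    }

    // stop(): graceful exit that leaves state and emitted elements consistent
    @Override
    public void stop() {
        isRunning = false;
    }
}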

SourceContext

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/api/functions/source/SourceFunction.java

    /**
     * Interface that source functions use to emit elements, and possibly watermarks.
     *
     * @param <T> The type of the elements produced by the source.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface SourceContext<T> {

        /**
         * Emits one element from the source, without attaching a timestamp. In most cases,
         * this is the default way of emitting elements.
         *
         * <p>The timestamp that the element will get assigned depends on the time characteristic of
         * the streaming program:
         * <ul>
         *     <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
         *     <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
         *         current time as the timestamp.</li>
         *     <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
         *         It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
         *         operation (like time windows).</li>
         * </ul>
         *
         * @param element The element to emit
         */
        void collect(T element);

        /**
         * Emits one element from the source, and attaches the given timestamp. This method
         * is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
         * sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
         * on the stream.
         *
         * <p>On certain time characteristics, this timestamp may be ignored or overwritten.
         * This allows programs to switch between the different time characteristics and behaviors
         * without changing the code of the source functions.
         * <ul>
         *     <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
         *         because processing time never works with element timestamps.</li>
         *     <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
         *         system's current time, to realize proper ingestion time semantics.</li>
         *     <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
         * </ul>
         *
         * @param element The element to emit
         * @param timestamp The timestamp in milliseconds since the Epoch
         */
        @PublicEvolving
        void collectWithTimestamp(T element, long timestamp);

        /**
         * Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
         * elements with a timestamp {@code t' <= t} will occur any more. If further such
         * elements will be emitted, those elements are considered <i>late</i>.
         *
         * <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
         * On {@link TimeCharacteristic#ProcessingTime},Watermarks will be ignored. On
         * {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
         * automatic ingestion time watermarks.
         *
         * @param mark The Watermark to emit
         */
        @PublicEvolving
        void emitWatermark(Watermark mark);

        /**
         * Marks the source to be temporarily idle. This tells the system that this source will
         * temporarily stop emitting records and watermarks for an indefinite amount of time. This
         * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
         * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
         * watermarks without the need to wait for watermarks from this source while it is idle.
         *
         * <p>Source functions should make a best effort to call this method as soon as they
         * acknowledge themselves to be idle. The system will consider the source to resume activity
         * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
         * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
         */
        @PublicEvolving
        void markAsTemporarilyIdle();

        /**
         * Returns the checkpoint lock. Please refer to the class-level comment in
         * {@link SourceFunction} for details about how to write a consistent checkpointed
         * source.
         *
         * @return The object to use as the lock
         */
        Object getCheckpointLock();

        /**
         * This method is called by the system to shut down the context.
         */
        void close();
    }
  • SourceContext defines the methods a source uses to emit data. The primary one is collect, which emits an element without a timestamp: under TimeCharacteristic.EventTime such an element must later be assigned a timestamp via a TimestampAssigner before any time-dependent operation; under TimeCharacteristic.IngestionTime the system automatically assigns the current system time, so nothing needs to be specified. Besides collect there is collectWithTimestamp, which emits an element together with a given timestamp and is meant to be used with TimeCharacteristic.EventTime.
  • The emitWatermark method is also defined; it bounds which time range of data is still considered when handling out-of-order data. It only takes effect with TimeCharacteristic.EventTime: with TimeCharacteristic.ProcessingTime watermarks are ignored, and with TimeCharacteristic.IngestionTime they are replaced by automatically generated ingestion-time watermarks.
  • The markAsTemporarilyIdle method tells the system that this source will temporarily stop emitting data for an indefinite time. It is only relevant with TimeCharacteristic.IngestionTime or TimeCharacteristic.EventTime. Once SourceContext.collect(T), SourceContext.collectWithTimestamp(T, long), or SourceContext.emitWatermark(Watermark) is called, the system considers the source active again (see the combined sketch after this list).
  • The getCheckpointLock method returns the checkpoint lock, so the source can make state updates and element emission atomic with respect to checkpointing.
  • The close method is called by the system to shut down the context and release its resources.
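
Combining several of these methods, here is a hedged sketch of a source that assigns timestamps itself and emits watermarks, intended to run under TimeCharacteristic.EventTime (the class name, the 500 ms emission pace, and the one-watermark-per-element scheme with a 1-second lag are all assumptions for illustration, not from the article):

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class TimestampedCounterSource implements SourceFunction<Long> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0L;
        while (isRunning) {
            long eventTime = System.currentTimeMillis();
            synchronized (ctx.getCheckpointLock()) {
                // attach the timestamp ourselves instead of relying on a TimestampAssigner
                ctx.collectWithTimestamp(counter++, eventTime);
                // declare that no elements with timestamp <= eventTime - 1000 will follow
                ctx.emitWatermark(new Watermark(eventTime - 1000));
            }
            Thread.sleep(500);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}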

Task.run(Upstream)

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
    }
}
  • The run method of Task calls invokable.invoke(); the invokable here is a StreamTask (on the source side, a SourceStreamTask).

StreamTask.invoke

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/runtime/tasks/StreamTask.java

/**
 * Base class for all streaming tasks. A task is the unit of local processing that is deployed
 * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
 * the Task's operator chain. Operators that are chained together execute synchronously in the
 * same thread and hence on the same stream partition. A common case for these chains
 * are successive map/flatmap/filter tasks.
 *
 * <p>The task chain contains one "head" operator and multiple chained operators.
 * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
 * as well as for sources, iteration heads and iteration tails.
 *
 * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
 * produced by the operators at the ends of the operator chain. Note that the chain may fork and
 * thus have multiple ends.
 *
 * <p>The life cycle of the task is set up as follows:
 * <pre>{@code
 *  -- setInitialState -> provides state of all operators in the chain
 *
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }</pre>
 *
 * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
 * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
 * are called concurrently.
 *
 * @param <OUT>
 * @param <OP>
 */
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

        //......

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            //......

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            //......
        }
        finally {
            // clean up everything we initialized
            isRunning = false;

            //......
        }
    }
}
  • In the invoke method of StreamTask, the run method of the subclass is called, where the subclass is SourceStreamTask.

SourceStreamTask.run

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

    @Override
    protected void run() throws Exception {
        headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
    }
  • The run method of SourceStreamTask mainly calls the run method of the headOperator, which here is a StreamSource.

StreamSource.run

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/api/operators/StreamSource.java

    public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
        run(lockingObject, streamStatusMaintainer, output);
    }

    public void run(final Object lockingObject,
            final StreamStatusMaintainer streamStatusMaintainer,
            final Output<StreamRecord<OUT>> collector) throws Exception {

        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

        final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
        final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
            ? getExecutionConfig().getLatencyTrackingInterval()
            : configuration.getLong(MetricOptions.LATENCY_INTERVAL);

        LatencyMarksEmitter<OUT> latencyEmitter = null;
        if (latencyTrackingInterval > 0) {
            latencyEmitter = new LatencyMarksEmitter<>(
                getProcessingTimeService(),
                collector,
                latencyTrackingInterval,
                this.getOperatorID(),
                getRuntimeContext().getIndexOfThisSubtask());
        }

        final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

        this.ctx = StreamSourceContexts.getSourceContext(
            timeCharacteristic,
            getProcessingTimeService(),
            lockingObject,
            streamStatusMaintainer,
            collector,
            watermarkInterval,
            -1);

        try {
            userFunction.run(ctx);

            // if we get here, then the user function either exited after being done (finite source)
            // or the function was canceled or stopped. For the finite source case, we should emit
            // a final watermark that indicates that we reached the end of event-time
            if (!isCanceledOrStopped()) {
                ctx.emitWatermark(Watermark.MAX_WATERMARK);
            }
        } finally {
            // make sure that the context is closed in any case
            ctx.close();
            if (latencyEmitter != null) {
                latencyEmitter.close();
            }
        }
    }
  • In StreamSource's run method, the SourceFunction.SourceContext is first constructed via StreamSourceContexts.getSourceContext, and then userFunction.run(ctx) is called; the userFunction here is RandomWordSource, i.e. the user-defined SourceFunction. (Note that before userFunction.run(ctx) is called, a LatencyMarksEmitter is also created if latencyTrackingInterval is greater than 0; see the configuration sketch below.)
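
As an aside, the ExecutionConfig path for latency tracking can be exercised from the job itself; a one-line sketch, assuming the env from the Example section (the 1000 ms value is an arbitrary example, not from the article):

    // Configuring the interval on the ExecutionConfig makes isLatencyTrackingConfigured()
    // return true, so this value takes precedence over the metrics.latency.interval default.
    env.getConfig().setLatencyTrackingInterval(1000L); // 1000 ms is an arbitrary example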

RandomWordSource.run

public class RandomWordSource implements SourceFunction<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RandomWordSource.class);

    private volatile boolean isRunning = true;

    private static final String[] words = new String[]{"The", "brown", "fox", "quick", "jump", "sucky", "5dolla"};

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            Thread.sleep(300);
            int rnd = (int) (Math.random() * words.length); // uniform random index into words
            LOGGER.info("emit word: {}", words[rnd]);
            ctx.collect(words[rnd]);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
  • RandomWordSource's run method loops continuously, emitting a random word every 300 milliseconds until cancel() sets isRunning to false.

StreamSource.LatencyMarksEmitter

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/api/operators/StreamSource.java

    private static class LatencyMarksEmitter<OUT> {
        private final ScheduledFuture<?> latencyMarkTimer;

        public LatencyMarksEmitter(
                final ProcessingTimeService processingTimeService,
                final Output<StreamRecord<OUT>> output,
                long latencyTrackingInterval,
                final OperatorID operatorId,
                final int subtaskIndex) {

            latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
                new ProcessingTimeCallback() {
                    @Override
                    public void onProcessingTime(long timestamp) throws Exception {
                        try {
                            // ProcessingTimeService callbacks are executed under the checkpointing lock
                            output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
                        } catch (Throwable t) {
                            // we catch the Throwables here so that we don't trigger the processing
                            // timer services async exception handler
                            LOG.warn("Error while emitting latency marker.", t);
                        }
                    }
                },
                0L,
                latencyTrackingInterval);
        }

        public void close() {
            latencyMarkTimer.cancel(true);
        }
    }
  • The LatencyMarksEmitter is created in StreamSource's run method, before userFunction.run is called, if latencyTrackingInterval > 0. That interval is determined by first calling getExecutionConfig().isLatencyTrackingConfigured(): if the ExecutionConfig has a value configured, getExecutionConfig().getLatencyTrackingInterval() is used; otherwise the value of configuration.getLong(MetricOptions.LATENCY_INTERVAL) is used, whose default is 2000L (the latter, 2000, is what applies in this demo).
  • The constructor of LatencyMarksEmitter calls ProcessingTimeService.scheduleAtFixedRate to register a fixed-rate timer task whose scheduling interval is latencyTrackingInterval.
  • The work of that timer task lives in the onProcessingTime method of the ProcessingTimeCallback, which calls output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)) to emit a LatencyMarker. The ProcessingTimeService here is a SystemProcessingTimeService, and the Output is an AbstractStreamOperator.CountingOutput.

SystemProcessingTimeService.scheduleAtFixedRate

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
        long nextTimestamp = getCurrentProcessingTime() + initialDelay;

        // we directly try to register the timer and only react to the status on exception
        // that way we save unnecessary volatile accesses for each timer
        try {
            return timerService.scheduleAtFixedRate(
                new RepeatedTriggerTask(status, task, checkpointLock, callback, nextTimestamp, period),
                initialDelay,
                period,
                TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            final int status = this.status.get();
            if (status == STATUS_QUIESCED) {
                return new NeverCompleteFuture(initialDelay);
            }
            else if (status == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            }
            else {
                // something else happened, so propagate the exception
                throw e;
            }
        }
    }

    @Override
    public long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }
  • SystemProcessingTimeService's scheduleAtFixedRate is actually implemented by delegating to timerService's scheduleAtFixedRate, where timerService is a ScheduledThreadPoolExecutor with a corePoolSize of 1; the scheduled task is a RepeatedTriggerTask. (A standalone sketch of this scheduling pattern follows.)
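
For intuition, here is a standalone analog of what timerService does (plain JDK code, not Flink's): a single-threaded ScheduledThreadPoolExecutor firing a task at a fixed rate. The 2000 ms period mirrors the default latency interval; the printed message is illustrative only.

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedRateDemo {
    public static void main(String[] args) {
        // corePoolSize of 1, like the timerService above
        ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
        long period = 2000L; // analogous to latencyTrackingInterval, in milliseconds
        timerService.scheduleAtFixedRate(
            () -> System.out.println("fire at " + System.currentTimeMillis()),
            0L,      // initialDelay
            period,
            TimeUnit.MILLISECONDS);
        // runs until the process is killed; a real service would shut the pool down
    }
}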

RepeatedTriggerTask

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

    /**
     * Internal task which is repeatedly called by the processing time service.
     */
    private static final class RepeatedTriggerTask implements Runnable {

        private final AtomicInteger serviceStatus;
        private final Object lock;
        private final ProcessingTimeCallback target;
        private final long period;
        private final AsyncExceptionHandler exceptionHandler;

        private long nextTimestamp;

        private RepeatedTriggerTask(
                final AtomicInteger serviceStatus,
                final AsyncExceptionHandler exceptionHandler,
                final Object lock,
                final ProcessingTimeCallback target,
                final long nextTimestamp,
                final long period) {

            this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
            this.lock = Preconditions.checkNotNull(lock);
            this.target = Preconditions.checkNotNull(target);
            this.period = period;
            this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);

            this.nextTimestamp = nextTimestamp;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    if (serviceStatus.get() == STATUS_ALIVE) {
                        target.onProcessingTime(nextTimestamp);
                    }

                    nextTimestamp += period;
                } catch (Throwable t) {
                    TimerException asyncException = new TimerException(t);
                    exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException);
                }
            }
        }
    }
  • RepeatedTriggerTask calls the ProcessingTimeCallback's onProcessingTime only while serviceStatus is STATUS_ALIVE. The nextTimestamp passed to the callback starts at getCurrentProcessingTime() + initialDelay and is then incremented by period on each run, so the callback receives the scheduled time rather than the actual firing time.

AbstractStreamOperator.CountingOutput.emitLatencyMarker

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

    /**
     * Wrapping {@link Output} that updates metrics on the number of emitted elements.
     */
    public static class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
        private final Output<StreamRecord<OUT>> output;
        private final Counter numRecordsOut;

        public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
            this.output = output;
            this.numRecordsOut = counter;
        }

        @Override
        public void emitWatermark(Watermark mark) {
            output.emitWatermark(mark);
        }

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            output.emitLatencyMarker(latencyMarker);
        }

        @Override
        public void collect(StreamRecord<OUT> record) {
            numRecordsOut.inc();
            output.collect(record);
        }

        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
            numRecordsOut.inc();
            output.collect(outputTag, record);
        }

        @Override
        public void close() {
            output.close();
        }
    }
  • CountingOutput wraps another Output and updates the numRecordsOut metric on each collect; here the wrapped Output is a RecordWriterOutput.

RecordWriterOutput.emitLatencyMarker

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/runtime/io/RecordWriterOutput.java

/**
 * Implementation of {@link Output} that sends data using a {@link RecordWriter}.
 */
@Internal
public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {

    private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;

    private SerializationDelegate<StreamElement> serializationDelegate;

    //......

    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        serializationDelegate.setInstance(latencyMarker);

        try {
            recordWriter.randomEmit(serializationDelegate);
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
  • emitLatencyMarker here mainly calls StreamRecordWriter's randomEmit (actually implemented in its parent class, RecordWriter) to send the LatencyMarker.

RecordWriter

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

    /**
     * This is used to send LatencyMarks to a random target channel.
     */
    public void randomEmit(T record) throws IOException, InterruptedException {
        sendToTarget(record, rng.nextInt(numChannels));
    }

    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];

        SerializationResult result = serializer.addRecord(record);

        while (result.isFullBuffer()) {
            if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
                // If this was a full record, we are done. Not breaking
                // out of the loop at this point will lead to another
                // buffer request before breaking out (that would not be
                // a problem per se, but it can lead to stalls in the
                // pipeline).
                if (result.isFullRecord()) {
                    break;
                }
            }
            BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

            result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
        }
        checkState(!serializer.hasSerializedData(), "All data should be written at once");

        if (flushAlways) {
            targetPartition.flush(targetChannel);
        }
    }
  • RecordWriter's randomEmit picks a random targetChannel via rng.nextInt(numChannels) and sends the record to it with sendToTarget.

Task.run(Downstream)

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
    }
}
  • The run method of the downstream Task calls invokable.invoke(); here the invokable is a OneInputStreamTask.

OneInputStreamTask

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java

    @Override
    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }
  • The run method of the Task calls the invoke method of StreamTask, and invoke in turn calls the run method of OneInputStreamTask, which mainly loops over inputProcessor.processInput(); the inputProcessor here is a StreamInputProcessor.

StreamInputProcessor

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/runtime/io/StreamInputProcessor.java

    public boolean processInput() throws Exception {
        if (isFinished) {
            return false;
        }
        if (numRecordsIn == null) {
            try {
                numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                numRecordsIn = new SimpleCounter();
            }
        }

        while (true) {
            if (currentRecordDeserializer != null) {
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();

                    if (recordOrMark.isWatermark()) {
                        // handle watermark
                        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                        continue;
                    } else if (recordOrMark.isStreamStatus()) {
                        // handle stream status
                        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                        continue;
                    } else if (recordOrMark.isLatencyMarker()) {
                        // handle latency marker
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        // now we can do the actual processing
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }

            final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
            if (bufferOrEvent != null) {
                if (bufferOrEvent.isBuffer()) {
                    currentChannel = bufferOrEvent.getChannelIndex();
                    currentRecordDeserializer = recordDeserializers[currentChannel];
                    currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
                }
                else {
                    // Event received
                    final AbstractEvent event = bufferOrEvent.getEvent();
                    if (event.getClass() != EndOfPartitionEvent.class) {
                        throw new IOException("Unexpected event: " + event);
                    }
                }
            }
            else {
                isFinished = true;
                if (!barrierHandler.isEmpty()) {
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
                return false;
            }
        }
    }
  • The processInput method first calls currentRecordDeserializer.getNextRecord(deserializationDelegate) to obtain the next record, and only processes it once result.isFullRecord() is true.
  • Processing then branches on the type of the StreamElement: watermark, streamStatus, latencyMarker, or a normal data record.
  • For normal data, streamOperator.processElement(record) is called; the streamOperator here is a StreamMap.

StreamMap.processElement

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/api/operators/StreamMap.java

/**
 * A {@link StreamOperator} for executing {@link MapFunction MapFunctions}.
 */
@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}
  • Here userFunction.map(element.getValue()) is called to perform the map operation; the userFunction is the UpperCaseMapFunc from the example.

Summary

  • SourceFunction is the base interface for Flink stream data sources. It defines the run and cancel methods and declares the SourceContext interface. SourceContext mainly defines the collect and collectWithTimestamp methods for emitting data, and also provides emitWatermark for emitting watermarks.
  • On the emitting side, the call chain is Task.run -> StreamTask.invoke -> SourceStreamTask.run -> StreamSource.run -> userFunction.run(ctx) (RandomWordSource.run). Before calling userFunction.run, StreamSource.run checks whether latencyTrackingInterval is greater than 0; if so, it creates a LatencyMarksEmitter, which registers a fixed-rate timer task whose ProcessingTimeCallback.onProcessingTime periodically triggers output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)).
  • The downstream therefore receives both the user data emitted by userFunction.run and the LatencyMarkers emitted by the timer task. Its call chain is Task.run -> StreamTask.invoke -> OneInputStreamTask.run -> StreamInputProcessor.processInput -> statusWatermarkValve.inputWatermark, statusWatermarkValve.inputStreamStatus, streamOperator.processLatencyMarker, or streamOperator.processElement. In other words, StreamInputProcessor.processInput dispatches on the type of the incoming data; for user data it calls streamOperator.processElement, i.e. StreamMap.processElement -> userFunction.map (UpperCaseMapFunc.map).
