A Look at Flink’s SpoutWrapper

  flink, storm

Preface

This article mainly examines Flink’s SpoutWrapper.

SpoutWrapper

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/SpoutWrapper.java

/**
 * A {@link SpoutWrapper} wraps an {@link IRichSpout} in order to execute it within a Flink Streaming program. It
 * takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
 * {@link SpoutCollector} for supported types).<br>
 * <br>
 * Per default, {@link SpoutWrapper} calls the wrapped spout's {@link IRichSpout#nextTuple() nextTuple()} method in
 * an infinite loop.<br>
 * Alternatively, {@link SpoutWrapper} can call {@link IRichSpout#nextTuple() nextTuple()} for a finite number of
 * times and terminate automatically afterwards (for finite input streams). The number of {@code nextTuple()} calls can
 * be specified as a certain number of invocations or can be undefined. In the undefined case, {@link SpoutWrapper}
 * terminates if no record was emitted to the output collector for the first time during a call to
 * {@link IRichSpout#nextTuple() nextTuple()}.<br>
 * If the given spout implements {@link FiniteSpout} interface and {@link #numberOfInvocations} is not provided or
 * is {@code null}, {@link SpoutWrapper} calls {@link IRichSpout#nextTuple() nextTuple()} method until
 * {@link FiniteSpout#reachedEnd()} returns true.
 */
public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> implements StoppableFunction {
    //......

    /** The number of {@link IRichSpout#nextTuple()} calls. */
    private Integer numberOfInvocations; // do not use int -> null indicates an infinite loop

    /**
     * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of
     * the given {@link IRichSpout spout} a finite number of times. The output type will be one of {@link Tuple0} to
     * {@link Tuple25} depending on the spout's declared number of attributes.
     *
     * @param spout
     *            The {@link IRichSpout spout} to be used.
     * @param numberOfInvocations
     *            The number of calls to {@link IRichSpout#nextTuple()}. If value is negative, {@link SpoutWrapper}
     *            terminates if no tuple was emitted for the first time. If value is {@code null}, finite invocation is
     *            disabled.
     * @throws IllegalArgumentException
     *             If the number of declared output attributes is not with range [0;25].
     */
    public SpoutWrapper(final IRichSpout spout, final Integer numberOfInvocations)
            throws IllegalArgumentException {
        this(spout, (Collection<String>) null, numberOfInvocations);
    }

    /**
     * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of
     * the given {@link IRichSpout spout} in an infinite loop. The output type will be one of {@link Tuple0} to
     * {@link Tuple25} depending on the spout's declared number of attributes.
     *
     * @param spout
     *            The {@link IRichSpout spout} to be used.
     * @throws IllegalArgumentException
     *             If the number of declared output attributes is not with range [0;25].
     */
    public SpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
        this(spout, (Collection<String>) null, null);
    }

    @Override
    public final void run(final SourceContext<OUT> ctx) throws Exception {
        final GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig()
                .getGlobalJobParameters();
        StormConfig stormConfig = new StormConfig();

        if (config != null) {
            if (config instanceof StormConfig) {
                stormConfig = (StormConfig) config;
            } else {
                stormConfig.putAll(config.toMap());
            }
        }

        final TopologyContext stormTopologyContext = WrapperSetupHelper.createTopologyContext(
                (StreamingRuntimeContext) super.getRuntimeContext(), this.spout, this.name,
                this.stormTopology, stormConfig);

        SpoutCollector<OUT> collector = new SpoutCollector<OUT>(this.numberOfAttributes,
                stormTopologyContext.getThisTaskId(), ctx);

        this.spout.open(stormConfig, stormTopologyContext, new SpoutOutputCollector(collector));
        this.spout.activate();

        if (numberOfInvocations == null) {
            if (this.spout instanceof FiniteSpout) {
                final FiniteSpout finiteSpout = (FiniteSpout) this.spout;

                while (this.isRunning && !finiteSpout.reachedEnd()) {
                    finiteSpout.nextTuple();
                }
            } else {
                while (this.isRunning) {
                    this.spout.nextTuple();
                }
            }
        } else {
            int counter = this.numberOfInvocations;
            if (counter >= 0) {
                while ((--counter >= 0) && this.isRunning) {
                    this.spout.nextTuple();
                }
            } else {
                do {
                    collector.tupleEmitted = false;
                    this.spout.nextTuple();
                } while (collector.tupleEmitted && this.isRunning);
            }
        }
    }

    /**
     * {@inheritDoc}
     *
     * <p>Sets the {@link #isRunning} flag to {@code false}.
     */
    @Override
    public void cancel() {
        this.isRunning = false;
    }

    /**
     * {@inheritDoc}
     *
     * <p>Sets the {@link #isRunning} flag to {@code false}.
     */
    @Override
    public void stop() {
        this.isRunning = false;
    }

    @Override
    public void close() throws Exception {
        this.spout.close();
    }
}
  • SpoutWrapper extends the RichParallelSourceFunction class and implements the stop method of the StoppableFunction interface.
  • The run method of SpoutWrapper creates Flink’s SpoutCollector, passes it as the constructor argument of Storm’s SpoutOutputCollector, and then calls the spout’s open method with that wrapped collector, so that every tuple the spout emits flows through Flink’s SpoutCollector.
  • The spout’s nextTuple() method is called in a loop governed by the numberOfInvocations parameter, which controls how many times nextTuple is invoked. It can be set in the constructor when the SpoutWrapper is created; if a constructor without the numberOfInvocations parameter is used, the value is null, indicating an infinite loop.
  • Flink also provides the FiniteSpout interface, whose reachedEnd() method reports whether all data has been emitted; implementing it turns a Storm spout into a finite source (see the sketch after this list). With an unmodified Storm spout, nextTuple is simply called in an infinite loop.
  • If numberOfInvocations is set and is greater than or equal to 0, nextTuple is called the specified number of times; if it is negative, the loop terminates as soon as a call to nextTuple emits no tuple, which is detected via collector.tupleEmitted.
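
The following is a minimal sketch of how a spout can be wrapped and run as a Flink source, assuming flink-storm 1.6.2 on the classpath. CountSpout is a hypothetical spout written for this illustration; SpoutWrapper, FiniteSpout, and addSource(SourceFunction, TypeInformation) are the APIs discussed above.

import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.storm.util.FiniteSpout;
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class SpoutWrapperDemo {

    /** A toy finite spout: emits "0".."9" and then reports end-of-stream. */
    public static class CountSpout extends BaseRichSpout implements FiniteSpout {
        private SpoutOutputCollector collector;
        private int counter = 0;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            collector.emit(new Values(Integer.toString(counter++)));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // one declared attribute -> SpoutWrapper's output type is Tuple1<String>
            declarer.declare(new Fields("number"));
        }

        @Override
        public boolean reachedEnd() {
            // numberOfInvocations is null below, so SpoutWrapper keeps calling
            // nextTuple() until this returns true
            return counter >= 10;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(
                new SpoutWrapper<Tuple1<String>>(new CountSpout()),
                TypeExtractor.getForObject(new Tuple1<>("")))
            .print();

        env.execute("spout-wrapper-demo");
    }
}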

SpoutCollector

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/SpoutCollector.java

/**
 * A {@link SpoutCollector} is used by {@link SpoutWrapper} to provided an Storm
 * compatible output collector to the wrapped spout. It transforms the emitted Storm tuples into
 * Flink tuples and emits them via the provide {@link SourceContext} object.
 */
class SpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutOutputCollector {

    /** The Flink source context object. */
    private final SourceContext<OUT> flinkContext;

    /**
     * Instantiates a new {@link SpoutCollector} that emits Flink tuples to the given Flink source context. If the
     * number of attributes is specified as zero, any output type is supported. If the number of attributes is between 0
     * to 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
     *
     * @param numberOfAttributes
     *            The number of attributes of the emitted tuples.
     * @param taskId
     *            The ID of the producer task (negative value for unknown).
     * @param flinkContext
     *            The Flink source context to be used.
     * @throws UnsupportedOperationException
     *             if the specified number of attributes is greater than 25
     */
    SpoutCollector(final HashMap<String, Integer> numberOfAttributes, final int taskId,
            final SourceContext<OUT> flinkContext) throws UnsupportedOperationException {
        super(numberOfAttributes, taskId);
        assert (flinkContext != null);
        this.flinkContext = flinkContext;
    }

    @Override
    protected List<Integer> doEmit(final OUT flinkTuple) {
        this.flinkContext.collect(flinkTuple);
        // TODO
        return null;
    }

    @Override
    public void reportError(final Throwable error) {
        // not sure, if Flink can support this
    }

    @Override
    public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) {
        return this.tansformAndEmit(streamId, tuple);
    }

    @Override
    public void emitDirect(final int taskId, final String streamId, final List<Object> tuple, final Object messageId) {
        throw new UnsupportedOperationException("Direct emit is not supported by Flink");
    }

    public long getPendingCount() {
        return 0;
    }

}
  • SpoutCollector implements Storm’s ISpoutOutputCollector interface, providing the emit, emitDirect, getPendingCount, and reportError methods it defines. Flink does not support emitDirect (it throws UnsupportedOperationException); getPendingCount always returns 0; and reportError is a no-op.
  • doEmit calls flinkContext.collect(flinkTuple) to emit the data; this method is protected and is invoked mainly from tansformAndEmit.
  • The tansformAndEmit method (the misspelling is in the Flink source itself) is provided by the parent class AbstractStormCollector, as illustrated by the sketch below.
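
To make the emit path concrete, here is a hypothetical spout (written for this article, not taken from the Flink sources) annotated with what happens when it runs under SpoutWrapper:

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/** Hypothetical spout illustrating the emit path under SpoutWrapper. */
public class GreetingSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Under SpoutWrapper, this SpoutOutputCollector wraps Flink's SpoutCollector
        // (see SpoutWrapper.run: new SpoutOutputCollector(collector)).
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Forwarded to SpoutCollector.emit(streamId, tuple, messageId), which
        // delegates to tansformAndEmit and finally to doEmit, i.e.
        // flinkContext.collect(...) with a Flink Tuple2<String, Integer>.
        collector.emit(new Values("hello", 42));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // two declared attributes -> SpoutCollector produces Tuple2 outputs
        declarer.declare(new Fields("greeting", "count"));
    }
}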

AbstractStormCollector.tansformAndEmit

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/AbstractStormCollector.java

    /**
     * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)}
     * to the specified output stream.
     *
     * @param The
     *            The output stream id.
     * @param tuple
     *            The Storm tuple to be emitted.
     * @return the return value of {@link #doEmit(Object)}
     */
    @SuppressWarnings("unchecked")
    protected final List<Integer> tansformAndEmit(final String streamId, final List<Object> tuple) {
        List<Integer> taskIds;

        int numAtt = this.numberOfAttributes.get(streamId);
        int taskIdIdx = numAtt;
        if (this.taskId >= 0 && numAtt < 0) {
            numAtt = 1;
            taskIdIdx = 0;
        }
        if (numAtt >= 0) {
            assert (tuple.size() == numAtt);
            Tuple out = this.outputTuple.get(streamId);
            for (int i = 0; i < numAtt; ++i) {
                out.setField(tuple.get(i), i);
            }
            if (this.taskId >= 0) {
                out.setField(this.taskId, taskIdIdx);
            }
            if (this.split) {
                this.splitTuple.streamId = streamId;
                this.splitTuple.value = out;

                taskIds = doEmit((OUT) this.splitTuple);
            } else {
                taskIds = doEmit((OUT) out);
            }

        } else {
            assert (tuple.size() == 1);
            if (this.split) {
                this.splitTuple.streamId = streamId;
                this.splitTuple.value = tuple.get(0);

                taskIds = doEmit((OUT) this.splitTuple);
            } else {
                taskIds = doEmit((OUT) tuple.get(0));
            }
        }
        this.tupleEmitted = true;

        return taskIds;
    }
  • AbstractStormCollector.tansformAndEmit mainly handles the split scenario, i.e. a spout that declares multiple output streams; in every case the data is ultimately emitted through the subclass’s SpoutCollector.doEmit.
  • If split is true, splitTuple (a SplitStreamType, which records the streamId together with the value) is passed to the doEmit method.
  • If split is false, only the plain Tuple (equivalent to the value field of SplitStreamType) is passed to the doEmit method, so the streamId information that SplitStreamType carries is absent; a sketch of the split case follows below.
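
On the Flink side, the split case can be consumed with StormStreamSelector and SplitStreamMapper from org.apache.flink.storm.util, following the pattern of flink-storm’s SpoutSplitExample. The sketch below is an illustration under those assumptions; MultiStreamSpout is a hypothetical spout declaring two raw streams, "even" and "odd", each carrying a single Integer.

import java.util.Map;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.storm.util.SplitStreamMapper;
import org.apache.flink.storm.util.SplitStreamType;
import org.apache.flink.storm.util.StormStreamSelector;
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class SplitSpoutDemo {

    /** Hypothetical spout with two streams: even numbers go to "even", odd ones to "odd". */
    public static class MultiStreamSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private int n = 0;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            collector.emit((n % 2 == 0) ? "even" : "odd", new Values(n));
            n++;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream("even", new Fields("number"));
            declarer.declareStream("odd", new Fields("number"));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Marking both streams as raw outputs keeps each payload as a plain Integer
        // inside SplitStreamType, rather than a Tuple1<Integer>.
        String[] rawOutputs = new String[] { "even", "odd" };

        // Because the spout declares two streams, split == true in
        // AbstractStormCollector, and every record arrives as a SplitStreamType.
        DataStream<SplitStreamType<Integer>> raw = env.addSource(
                new SpoutWrapper<SplitStreamType<Integer>>(new MultiStreamSpout(), rawOutputs),
                TypeExtractor.getForObject(new SplitStreamType<Integer>()));

        // StormStreamSelector routes each record by its SplitStreamType.streamId field.
        SplitStream<SplitStreamType<Integer>> split = raw.split(new StormStreamSelector<Integer>());

        // SplitStreamMapper unwraps SplitStreamType.value back to the payload type.
        DataStream<Integer> evens = split.select("even")
                .map(new SplitStreamMapper<Integer>())
                .returns(BasicTypeInfo.INT_TYPE_INFO);
        evens.print();

        env.execute("split-spout-demo");
    }
}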

Task.run

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
    }
}
  • The run method of a Task calls invokable.invoke(); the invokable here is a StreamTask.

StreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

/**
 * Base class for all streaming tasks. A task is the unit of local processing that is deployed
 * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
 * the Task's operator chain. Operators that are chained together execute synchronously in the
 * same thread and hence on the same stream partition. A common case for these chains
 * are successive map/flatmap/filter tasks.
 *
 * <p>The task chain contains one "head" operator and multiple chained operators.
 * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
 * as well as for sources, iteration heads and iteration tails.
 *
 * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
 * produced by the operators at the ends of the operator chain. Note that the chain may fork and
 * thus have multiple ends.
 *
 * <p>The life cycle of the task is set up as follows:
 * <pre>{@code
 *  -- setInitialState -> provides state of all operators in the chain
 *
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }</pre>
 *
 * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
 * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
 * are called concurrently.
 *
 * @param <OUT>
 * @param <OP>
 */
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

        //......

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            //......

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            //......
        }
        finally {
            // clean up everything we initialized
            isRunning = false;

            //......
        }
    }
}
  • In the invoke method of StreamTask, the run method of the subclass is called, where the subclass is StoppableSourceStreamTask.

StoppableSourceStreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java

/**
 * Stoppable task for executing stoppable streaming sources.
 *
 * @param <OUT> Type of the produced elements
 * @param <SRC> Stoppable source function
 */
public class StoppableSourceStreamTask<OUT, SRC extends SourceFunction<OUT> & StoppableFunction>
    extends SourceStreamTask<OUT, SRC, StoppableStreamSource<OUT, SRC>> implements StoppableTask {

    private volatile boolean stopped;

    public StoppableSourceStreamTask(Environment environment) {
        super(environment);
    }

    @Override
    protected void run() throws Exception {
        if (!stopped) {
            super.run();
        }
    }

    @Override
    public void stop() {
        stopped = true;
        if (this.headOperator != null) {
            this.headOperator.stop();
        }
    }
}
  • StoppableSourceStreamTask extends SourceStreamTask and mainly implements the stop method of the StoppableTask interface; its run method delegates to its direct parent class SourceStreamTask.

SourceStreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

/**
 * {@link StreamTask} for executing a {@link StreamSource}.
 *
 * <p>One important aspect of this is that the checkpointing and the emission of elements must never
 * occur at the same time. The execution must be serial. This is achieved by having the contract
 * with the StreamFunction that it must only modify its state or emit elements in
 * a synchronized block that locks on the lock Object. Also, the modification of the state
 * and the emission of elements must happen in the same block of code that is protected by the
 * synchronized block.
 *
 * @param <OUT> Type of the output elements of this source.
 * @param <SRC> Type of the source function for the stream source operator
 * @param <OP> Type of the stream source operator
 */
@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
    extends StreamTask<OUT, OP> {

    //......

    @Override
    protected void run() throws Exception {
        headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
    }
}
  • The run method of SourceStreamTask calls the run method of the head operator, which here is a StreamSource.

StreamSource

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

/**
 * {@link StreamOperator} for streaming sources.
 *
 * @param <OUT> Type of the output elements
 * @param <SRC> Type of the source function of this stream source operator
 */
@Internal
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
        extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {

    //......    

    public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
        run(lockingObject, streamStatusMaintainer, output);
    }

    public void run(final Object lockingObject,
            final StreamStatusMaintainer streamStatusMaintainer,
            final Output<StreamRecord<OUT>> collector) throws Exception {

        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

        final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
        final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
            ? getExecutionConfig().getLatencyTrackingInterval()
            : configuration.getLong(MetricOptions.LATENCY_INTERVAL);

        LatencyMarksEmitter<OUT> latencyEmitter = null;
        if (latencyTrackingInterval > 0) {
            latencyEmitter = new LatencyMarksEmitter<>(
                getProcessingTimeService(),
                collector,
                latencyTrackingInterval,
                this.getOperatorID(),
                getRuntimeContext().getIndexOfThisSubtask());
        }

        final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

        this.ctx = StreamSourceContexts.getSourceContext(
            timeCharacteristic,
            getProcessingTimeService(),
            lockingObject,
            streamStatusMaintainer,
            collector,
            watermarkInterval,
            -1);

        try {
            userFunction.run(ctx);

            // if we get here, then the user function either exited after being done (finite source)
            // or the function was canceled or stopped. For the finite source case, we should emit
            // a final watermark that indicates that we reached the end of event-time
            if (!isCanceledOrStopped()) {
                ctx.emitWatermark(Watermark.MAX_WATERMARK);
            }
        } finally {
            // make sure that the context is closed in any case
            ctx.close();
            if (latencyEmitter != null) {
                latencyEmitter.close();
            }
        }
    }
  • StreamSource.run calls userFunction.run(ctx); the userFunction here is the SpoutWrapper, which triggers the spout’s nextTuple.

Summary

  • Flink uses SpoutWrapper to wrap Storm’s original spout. In its run method it creates Flink’s SpoutCollector, passes it as the constructor argument of Storm’s SpoutOutputCollector, and then calls the spout’s open method with that wrapped collector, so that the data the spout emits flows through Flink’s SpoutCollector. The spout’s nextTuple() method is called according to the numberOfInvocations parameter, which controls how many times nextTuple is invoked and can be set in the constructor when the SpoutWrapper is created; if a constructor without the numberOfInvocations parameter is used, the value is null, indicating an infinite loop.
  • SpoutCollector’s emit method internally calls AbstractStormCollector.tansformAndEmit, which ultimately emits via SpoutCollector.doEmit. For spouts with multiple output streams, the tuple is wrapped in a SplitStreamType before being passed to doEmit; if there is only one stream, the plain tuple is passed to doEmit.
  • Flink’s Task.run method calls StreamTask.invoke, which calls the run method of the subclass (here StoppableSourceStreamTask, whose run delegates to its direct parent SourceStreamTask). SourceStreamTask.run calls the run method of StreamSource, which in turn calls userFunction.run(ctx). The userFunction here is the SpoutWrapper, which drives the spout’s nextTuple and emits the data through Flink’s SpoutCollector.
