[case47] Talk about flink’s BoltWrapper

  flink, storm

Preface

This article mainly studies flink’s BoltWrapper.

BoltWrapper

flink-storm_2.11-1.6.2-sources.jar! /org/apache/flink/storm/wrappers/BoltWrapper.java

/**
 * A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming program.
 * It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the bolt can
 * process. Furthermore, it takes the bolt's output tuples and transforms them into Flink tuples of type {@code OUT}
 * (see {@link AbstractStormCollector} for supported types).<br/>
 * <br/>
 * <strong>Works for single input streams only! See {@link MergedInputsBoltWrapper} for multi-input stream
 * Bolts.</strong>
 */
public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {

    @Override
    public void open() throws Exception {
        super.open();

        this.flinkCollector = new TimestampedCollector<>(this.output);

        GlobalJobParameters config = getExecutionConfig().getGlobalJobParameters();
        StormConfig stormConfig = new StormConfig();

        if (config != null) {
            if (config instanceof StormConfig) {
                stormConfig = (StormConfig) config;
            } else {
                stormConfig.putAll(config.toMap());
            }
        }

        this.topologyContext = WrapperSetupHelper.createTopologyContext(
                getRuntimeContext(), this.bolt, this.name, this.stormTopology, stormConfig);

        final OutputCollector stormCollector = new OutputCollector(new BoltCollector<OUT>(
                this.numberOfAttributes, this.topologyContext.getThisTaskId(), this.flinkCollector));

        if (this.stormTopology != null) {
            Map<GlobalStreamId, Grouping> inputs = this.topologyContext.getThisSources();

            for (GlobalStreamId inputStream : inputs.keySet()) {
                for (Integer tid : this.topologyContext.getComponentTasks(inputStream
                        .get_componentId())) {
                    this.inputComponentIds.put(tid, inputStream.get_componentId());
                    this.inputStreamIds.put(tid, inputStream.get_streamId());
                    this.inputSchemas.put(tid,
                            this.topologyContext.getComponentOutputFields(inputStream));
                }
            }
        }

        this.bolt.prepare(stormConfig, this.topologyContext, stormCollector);
    }

    @Override
    public void dispose() throws Exception {
        super.dispose();
        this.bolt.cleanup();
    }

    @Override
    public void processElement(final StreamRecord<IN> element) throws Exception {
        this.flinkCollector.setTimestamp(element);

        IN value = element.getValue();

        if (this.stormTopology != null) {
            Tuple tuple = (Tuple) value;
            Integer producerTaskId = tuple.getField(tuple.getArity() - 1);

            this.bolt.execute(new StormTuple<>(value, this.inputSchemas.get(producerTaskId),
                    producerTaskId, this.inputStreamIds.get(producerTaskId), this.inputComponentIds
                    .get(producerTaskId), MessageId.makeUnanchored()));

        } else {
            this.bolt.execute(new StormTuple<>(value, this.inputSchemas.get(null), -1, null, null,
                    MessageId.makeUnanchored()));
        }
    }


}
  • Flink uses BoltWrapper to wrap storm’s IRichBolt; the wrapper implements the OneInputStreamOperator interface and extends the AbstractStreamOperator class.
  • The OneInputStreamOperator interface extends the StreamOperator interface and additionally defines three methods: processElement, processWatermark and processLatencyMarker.
  • The AbstractStreamOperator class implements the StreamOperator interface and already provides implementations of processWatermark and processLatencyMarker, so BoltWrapper only needs to supply processElement.
  • BoltWrapper therefore mainly implements the processElement method of the OneInputStreamOperator interface, and in addition overrides the open and dispose lifecycle methods defined by the StreamOperator interface.
  • The main point of the open method is that it calls the bolt’s prepare method and passes in an OutputCollector that wraps a BoltCollector; the BoltCollector collects the data emitted by the bolt into flink, using flink’s TimestampedCollector (a usage sketch follows this list).
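Since BoltWrapper is just a OneInputStreamOperator, a storm bolt can be plugged into a flink streaming job via DataStream.transform. Below is a minimal sketch under assumptions: UpperBolt is a hypothetical IRichBolt written only for this example (it upper-cases incoming strings), and because it declares exactly one output field, the wrapper’s output type is Tuple1<String> (per the Tuple0..Tuple25 mapping described in BoltCollector below).

import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class BoltWrapperDemo {

    // hypothetical bolt for this sketch: emits the upper-cased form of each input string
    public static class UpperBolt implements IRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            // under BoltWrapper, this collector wraps flink's BoltCollector (see below)
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            collector.emit(new Values(input.getString(0).toUpperCase()));
        }

        @Override
        public void cleanup() {
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.fromElements("hello", "world");

        // BoltWrapper is a OneInputStreamOperator, so transform() accepts it directly;
        // open() will call UpperBolt.prepare(...) as shown in the source above
        DataStream<Tuple1<String>> upper = text.transform(
                "storm-bolt",
                TypeInformation.of(new TypeHint<Tuple1<String>>() {}),
                new BoltWrapper<String, Tuple1<String>>(new UpperBolt()));

        upper.print();
        env.execute("bolt-wrapper-demo");
    }
}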

BoltCollector

flink-storm_2.11-1.6.2-sources.jar! /org/apache/flink/storm/wrappers/BoltCollector.java

/**
 * A {@link BoltCollector} is used by {@link BoltWrapper} to provide a Storm compatible
 * output collector to the wrapped bolt. It transforms the emitted Storm tuples into Flink tuples
 * and emits them via the provided {@link Output} object.
 */
class BoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputCollector {

    /** The Flink output Collector. */
    private final Collector<OUT> flinkOutput;

    /**
     * Instantiates a new {@link BoltCollector} that emits Flink tuples to the given Flink output object. If the
     * number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is
     * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
     *
     * @param numberOfAttributes
     *            The number of attributes of the emitted tuples per output stream.
     * @param taskId
     *            The ID of the producer task (negative value for unknown).
     * @param flinkOutput
     *            The Flink output object to be used.
     * @throws UnsupportedOperationException
     *             if the specified number of attributes is greater than 25
     */
    BoltCollector(final HashMap<String, Integer> numberOfAttributes, final int taskId,
            final Collector<OUT> flinkOutput) throws UnsupportedOperationException {
        super(numberOfAttributes, taskId);
        assert (flinkOutput != null);
        this.flinkOutput = flinkOutput;
    }

    @Override
    protected List<Integer> doEmit(final OUT flinkTuple) {
        this.flinkOutput.collect(flinkTuple);
        // TODO
        return null;
    }

    @Override
    public void reportError(final Throwable error) {
        // not sure, if Flink can support this
    }

    @Override
    public List<Integer> emit(final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
        return this.tansformAndEmit(streamId, tuple);
    }

    @Override
    public void emitDirect(final int taskId, final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
        throw new UnsupportedOperationException("Direct emit is not supported by Flink");
    }

    @Override
    public void ack(final Tuple input) {}

    @Override
    public void fail(final Tuple input) {}

    @Override
    public void resetTimeout(Tuple var1) {}

}
  • BoltCollector implements storm’s IOutputCollector interface, but its ack, fail, resetTimeout and reportError operations are all no-ops, and the emitDirect operation is not supported (it throws UnsupportedOperationException).
  • The doEmit method calls flinkOutput.collect(flinkTuple).
  • The emit method calls tansformAndEmit(streamId, tuple) (sic, the typo is in the Flink source), which is implemented by the parent class AbstractStormCollector. The full path of an emitted tuple is therefore: bolt.execute → OutputCollector.emit → BoltCollector.emit → tansformAndEmit → doEmit → flinkOutput.collect.

TimestampedCollector.collect

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/api/operators/TimestampedCollector.java

/**
 * Wrapper around an {@link Output} for user functions that expect a {@link Collector}.
 * Before giving the {@link TimestampedCollector} to a user function you must set
 * the timestamp that should be attached to emitted elements. Most operators
 * would set the timestamp of the incoming
 * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord} here.
 *
 * @param <T> The type of the elements that can be emitted.
 */
@Internal
public class TimestampedCollector<T> implements Collector<T> {

    private final Output<StreamRecord<T>> output;

    private final StreamRecord<T> reuse;

    /**
     * Creates a new {@link TimestampedCollector} that wraps the given {@link Output}.
     */
    public TimestampedCollector(Output<StreamRecord<T>> output) {
        this.output = output;
        this.reuse = new StreamRecord<T>(null);
    }

    @Override
    public void collect(T record) {
        output.collect(reuse.replace(record));
    }

    public void setTimestamp(StreamRecord<?> timestampBase) {
        if (timestampBase.hasTimestamp()) {
            reuse.setTimestamp(timestampBase.getTimestamp());
        } else {
            reuse.eraseTimestamp();
        }
    }

    public void setAbsoluteTimestamp(long timestamp) {
        reuse.setTimestamp(timestamp);
    }

    public void eraseTimestamp() {
        reuse.eraseTimestamp();
    }

    @Override
    public void close() {
        output.close();
    }
}
  • TimestampedCollector implements flink’s Collector interface and additionally provides the setTimestamp, setAbsoluteTimestamp and eraseTimestamp methods.
  • It reuses a single StreamRecord object, which carries three attributes (value, timestamp and hasTimestamp) and thereby associates a value with its timestamp.
  • The collect method emits the object returned by StreamRecord’s replace; replace only swaps the value reference and leaves the timestamp untouched, so the timestamp set via setTimestamp is attached to every element collected afterwards (see the sketch below).
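A small standalone sketch of this reuse pattern (only the StreamRecord class is involved; the values are made up for illustration):

import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class StreamRecordReuseDemo {
    public static void main(String[] args) {
        // the same trick TimestampedCollector uses: one record instance is reused
        StreamRecord<String> reuse = new StreamRecord<String>(null);
        reuse.setTimestamp(42L);

        // replace() swaps only the value and returns the same instance
        StreamRecord<String> first = reuse.replace("first");
        System.out.println(first == reuse);        // true
        System.out.println(first.getValue());      // first
        System.out.println(first.getTimestamp());  // 42

        // the timestamp survives a second replace() untouched
        StreamRecord<String> second = reuse.replace("second");
        System.out.println(second.getTimestamp()); // still 42
    }
}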

AbstractStormCollector.tansformAndEmit

flink-storm_2.11-1.6.2-sources.jar! /org/apache/flink/storm/wrappers/AbstractStormCollector.java

    /**
     * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)}
     * to the specified output stream.
     *
 * @param streamId
 *            The output stream id.
     * @param tuple
     *            The Storm tuple to be emitted.
     * @return the return value of {@link #doEmit(Object)}
     */
    @SuppressWarnings("unchecked")
    protected final List<Integer> tansformAndEmit(final String streamId, final List<Object> tuple) {
        List<Integer> taskIds;

        int numAtt = this.numberOfAttributes.get(streamId);
        int taskIdIdx = numAtt;
        if (this.taskId >= 0 && numAtt < 0) {
            numAtt = 1;
            taskIdIdx = 0;
        }
        if (numAtt >= 0) {
            assert (tuple.size() == numAtt);
            Tuple out = this.outputTuple.get(streamId);
            for (int i = 0; i < numAtt; ++i) {
                out.setField(tuple.get(i), i);
            }
            if (this.taskId >= 0) {
                out.setField(this.taskId, taskIdIdx);
            }
            if (this.split) {
                this.splitTuple.streamId = streamId;
                this.splitTuple.value = out;

                taskIds = doEmit((OUT) this.splitTuple);
            } else {
                taskIds = doEmit((OUT) out);
            }

        } else {
            assert (tuple.size() == 1);
            if (this.split) {
                this.splitTuple.streamId = streamId;
                this.splitTuple.value = tuple.get(0);

                taskIds = doEmit((OUT) this.splitTuple);
            } else {
                taskIds = doEmit((OUT) tuple.get(0));
            }
        }
        this.tupleEmitted = true;

        return taskIds;
    }
  • AbstractStormCollector.tansformAndEmit mainly handles the split scenario, i.e. a bolt that declares multiple output streams; in every case the data is finally emitted through the subclass method BoltCollector.doEmit (see the sketch after this list).
  • If split is true, splitTuple (a SplitStreamType, which records the streamId together with the value) is passed to the doEmit method.
  • If split is false, only the plain Tuple (the equivalent of the value field of SplitStreamType) is passed to the doEmit method, so it carries no streamId information.
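A minimal sketch of the two shapes that reach doEmit (SplitStreamType lives in flink-storm’s org.apache.flink.storm.util package; the stream id and values here are made up):

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.storm.util.SplitStreamType;

public class SplitShapesDemo {
    public static void main(String[] args) {
        // split == true: the stream id travels together with the value
        SplitStreamType<Tuple1<String>> wrapped = new SplitStreamType<>();
        wrapped.streamId = "stream-a";
        wrapped.value = Tuple1.of("hello");
        System.out.println(wrapped.streamId + " -> " + wrapped.value);

        // split == false: only the bare tuple is emitted, the stream id is dropped
        Tuple1<String> bare = Tuple1.of("hello");
        System.out.println(bare);
    }
}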

Task.run

flink-runtime_2.11-1.6.2-sources.jar! /org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
    }
}
  • The run method of a Task calls invokable.invoke(), where the invokable here is a StreamTask.

StreamTask.invoke

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/runtime/tasks/StreamTask.java

/**
 * Base class for all streaming tasks. A task is the unit of local processing that is deployed
 * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
 * the Task's operator chain. Operators that are chained together execute synchronously in the
 * same thread and hence on the same stream partition. A common case for these chains
 * are successive map/flatmap/filter tasks.
 *
 * <p>The task chain contains one "head" operator and multiple chained operators.
 * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
 * as well as for sources, iteration heads and iteration tails.
 *
 * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
 * produced by the operators at the ends of the operator chain. Note that the chain may fork and
 * thus have multiple ends.
 *
 * <p>The life cycle of the task is set up as follows:
 * <pre>{@code
 *  -- setInitialState -> provides state of all operators in the chain
 *
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }</pre>
 *
 * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
 * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
 * are called concurrently.
 *
 * @param <OUT>
 * @param <OP>
 */
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

        //......

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            //......

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            //......
        }
        finally {
            // clean up everything we initialized
            isRunning = false;

            //......
        }
    }
}
  • The invoke method of StreamTask calls the run method of its subclass; the subclass here is OneInputStreamTask.

OneInputStreamTask.run

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java

    @Override
    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }
  • The run method mainly loops over inputProcessor.processInput(), where inputProcessor is a StreamInputProcessor.

StreamInputProcessor.processInput

flink-streaming-java_2.11-1.6.2-sources.jar! /org/apache/flink/streaming/runtime/io/StreamInputProcessor.java

    public boolean processInput() throws Exception {
        if (isFinished) {
            return false;
        }
        if (numRecordsIn == null) {
            try {
                numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                numRecordsIn = new SimpleCounter();
            }
        }

        while (true) {
            if (currentRecordDeserializer != null) {
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();

                    if (recordOrMark.isWatermark()) {
                        // handle watermark
                        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                        continue;
                    } else if (recordOrMark.isStreamStatus()) {
                        // handle stream status
                        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                        continue;
                    } else if (recordOrMark.isLatencyMarker()) {
                        // handle latency marker
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        // now we can do the actual processing
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }

            //......
        }
    }
  • The processInput method first obtains the next record via currentRecordDeserializer.getNextRecord(deserializationDelegate), and then, for a data record, calls streamOperator.processElement(record); the streamOperator here is BoltWrapper. The toy model below mimics this chain.
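To see the whole chain Task.run → StreamTask.invoke → OneInputStreamTask.run → StreamInputProcessor.processInput → BoltWrapper.processElement in one place, here is a toy model of the driving-loop pattern (all names are hypothetical; it has no Flink dependencies):

import java.util.ArrayDeque;
import java.util.Queue;

// toy model of the driving loop: a dedicated task thread pulls one record
// at a time from the input and hands it to the operator
public class DrivingLoopSketch {

    interface Operator<T> { void processElement(T record); }

    static class InputProcessor<T> {
        private final Queue<T> channel;      // stands in for the network input buffers
        private final Operator<T> operator;  // stands in for BoltWrapper

        InputProcessor(Queue<T> channel, Operator<T> operator) {
            this.channel = channel;
            this.operator = operator;
        }

        // mirrors StreamInputProcessor.processInput(): false once input is exhausted
        boolean processInput() {
            T record = channel.poll();
            if (record == null) {
                return false;
            }
            operator.processElement(record); // mirrors streamOperator.processElement(record)
            return true;
        }
    }

    public static void main(String[] args) {
        Queue<String> channel = new ArrayDeque<>();
        channel.add("a");
        channel.add("b");

        InputProcessor<String> processor =
                new InputProcessor<>(channel, r -> System.out.println("processed " + r));

        // mirrors OneInputStreamTask.run(): all the work happens in processInput()
        while (processor.processInput()) {
            // loop until the input is fully consumed
        }
    }
}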

Summary

  • Flink uses BoltWrapper to wrap storm’s IRichBolt. BoltWrapper implements the processElement method of the OneInputStreamOperator interface, in which bolt.execute is invoked; in addition, its open method (overriding the StreamOperator lifecycle) calls the bolt’s prepare method and passes in an OutputCollector that wraps a BoltCollector. The BoltCollector collects the data emitted by bolt.execute into flink, using flink’s TimestampedCollector.
  • BoltCollector’s emit method internally calls AbstractStormCollector.tansformAndEmit, which finally calls BoltCollector.doEmit to do the actual emitting. For a bolt with multiple output streams, the tuple is wrapped in a SplitStreamType before being passed to doEmit; if there is only one stream, the plain tuple is passed to doEmit.
  • Flink’s Task.run method calls StreamTask.invoke, and StreamTask.invoke calls the run method of its subclass (here OneInputStreamTask). OneInputStreamTask.run continuously calls inputProcessor.processInput(), where inputProcessor is a StreamInputProcessor. processInput() obtains the next record via currentRecordDeserializer.getNextRecord(deserializationDelegate) and then, for a data record, calls streamOperator.processElement(record). The streamOperator here is BoltWrapper, whose processElement simply calls the storm bolt’s execute method to run the bolt logic and emits the results through flink’s BoltCollector.
