Talk about how flink is compatible with StormTopology.

  flink, storm


This article mainly studies how flink is compatible with StormTopology.


    public void testStormWordCount() throws Exception {
        //NOTE 1 build Topology the Storm way
        final TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomWordSpout(), 1);
        builder.setBolt("count", new WordCountBolt(), 5)
                .fieldsGrouping("spout", new Fields("word"));
        builder.setBolt("print", new PrintBolt(), 1)
                .shuffleGrouping("count");

        //NOTE 2 convert StormTopology to FlinkTopology
        FlinkTopology flinkTopology = FlinkTopology.createTopology(builder);

        //NOTE 3 execute program locally using FlinkLocalCluster
        Config conf = new Config();
        // only required to stabilize integration test
        conf.put(FlinkLocalCluster.SUBMIT_BLOCKING, true);

        final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        cluster.submitTopology("stormWordCount", conf, flinkTopology);

        cluster.shutdown();
    }
  • FlinkLocalCluster.getLocalCluster() is used here to create or obtain a FlinkLocalCluster, then FlinkLocalCluster.submitTopology is called to submit the topology, and at the end FlinkLocalCluster.shutdown is used to shut the cluster down.
  • The RandomWordSpout built here extends storm's BaseRichSpout, WordCountBolt extends storm's BaseBasicBolt, and PrintBolt extends storm's BaseRichBolt. (Because flink uses its own checkpoint mechanism, it does not translate storm's ack operations, so there is no special requirement on whether BaseBasicBolt or BaseRichBolt is used here.)
  • The topology passed to submitTopology is the FlinkTopology converted from the StormTopology.
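The fieldsGrouping("spout", new Fields("word")) call above is what later becomes a keyBy in Flink. As a hedged illustration of the guarantee it provides, here is a standalone plain-Java sketch (no storm or flink dependency; taskFor is a made-up helper, not part of either API):

```java
// Minimal sketch of what fieldsGrouping("spout", new Fields("word")) guarantees:
// tuples with the same value of the grouping field always reach the same
// downstream task.
public class FieldsGroupingSketch {

    // Pick a task the way a hash-based fields grouping would: hash the
    // grouping field, modulo the bolt parallelism (5 in the test above).
    static int taskFor(String word, int parallelism) {
        return Math.floorMod(word.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 5; // WordCountBolt parallelism in the test above
        // The same word is always routed to the same task, so per-task counts stay consistent.
        System.out.println(taskFor("storm", parallelism) == taskFor("storm", parallelism)); // true
    }
}
```

This determinism is exactly why a per-task word count works without any cross-task merging.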



    // ------------------------------------------------------------------------
    //  Access to default local cluster
    // ------------------------------------------------------------------------

    // A different {@link FlinkLocalCluster} to be used for execution of ITCases
    private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();

    /**
     * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by
     * {@link #initialize(LocalClusterFactory)} in advance, a new {@link FlinkLocalCluster} is returned.
     *
     * @return a {@link FlinkLocalCluster} to be used for execution
     */
    public static FlinkLocalCluster getLocalCluster() {
        return currentFactory.createLocalCluster();
    }

    /**
     * Sets a different factory for FlinkLocalClusters to be used for execution.
     *
     * @param clusterFactory
     *         The LocalClusterFactory to create the local clusters for execution.
     */
    public static void initialize(LocalClusterFactory clusterFactory) {
        currentFactory = Objects.requireNonNull(clusterFactory);
    }

    // ------------------------------------------------------------------------
    //  Cluster factory
    // ------------------------------------------------------------------------

    /**
     * A factory that creates local clusters.
     */
    public interface LocalClusterFactory {

        /**
         * Creates a local Flink cluster.
         *
         * @return A local Flink cluster.
         */
        FlinkLocalCluster createLocalCluster();
    }

    /**
     * A factory that instantiates a FlinkLocalCluster.
     */
    public static class DefaultLocalClusterFactory implements LocalClusterFactory {

        @Override
        public FlinkLocalCluster createLocalCluster() {
            return new FlinkLocalCluster();
        }
    }
  • Flink provides a static method getLocalCluster in FlinkLocalCluster to obtain a FlinkLocalCluster; it creates the FlinkLocalCluster through a LocalClusterFactory.
  • LocalClusterFactory defaults to the DefaultLocalClusterFactory implementation, whose createLocalCluster method directly news up a FlinkLocalCluster.
  • As far as the current implementation is concerned, every time FlinkLocalCluster.getLocalCluster is called, a new FlinkLocalCluster is created, which callers should pay attention to.
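The new-instance-per-call behavior is easy to see in a stripped-down reproduction of the factory indirection (a standalone sketch; the names mirror Flink's classes, but nothing here is the real implementation):

```java
// Standalone sketch of the LocalClusterFactory pattern shown above; a plain
// Object stands in for FlinkLocalCluster so the sketch has no flink dependency.
public class LocalClusterFactorySketch {

    interface LocalClusterFactory {
        Object createLocalCluster(); // stands in for FlinkLocalCluster
    }

    static class DefaultLocalClusterFactory implements LocalClusterFactory {
        @Override
        public Object createLocalCluster() {
            return new Object(); // a fresh "cluster" on every call
        }
    }

    private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();

    static Object getLocalCluster() {
        return currentFactory.createLocalCluster();
    }

    public static void main(String[] args) {
        // Two calls yield two distinct instances -- the caveat noted above.
        System.out.println(getLocalCluster() != getLocalCluster()); // true
    }
}
```

The factory indirection exists so tests can swap in a different cluster via initialize; the default simply constructs a new cluster each time.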



    /**
     * Creates a Flink program that uses the specified spouts and bolts.
     *
     * @param stormBuilder The Storm topology builder to use for creating the Flink topology.
     * @return A {@link FlinkTopology} which contains the translated Storm topology and may be executed.
     */
    public static FlinkTopology createTopology(TopologyBuilder stormBuilder) {
        return new FlinkTopology(stormBuilder);
    }

    private FlinkTopology(TopologyBuilder builder) {
        this.builder = builder;
        this.stormTopology = builder.createTopology();
        // extract the spouts and bolts
        this.spouts = getPrivateField("_spouts");
        this.bolts = getPrivateField("_bolts");

        this.env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kick off the translation immediately
        translateTopology();
    }
  • FlinkTopology provides a static factory method createTopology to create a FlinkTopology.
  • FlinkTopology first saves the TopologyBuilder, then uses getPrivateField (which calls getDeclaredField reflectively) to obtain the builder's private _spouts and _bolts properties and saves them, which makes converting the topology later convenient.
  • After that, the StreamExecutionEnvironment is obtained, and finally translateTopology is called to convert the whole StormTopology.
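The getPrivateField trick boils down to standard Java reflection. A standalone sketch, with StandInBuilder as a made-up stand-in for storm's TopologyBuilder:

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of reading a builder's private map via reflection, the way
// FlinkTopology reads _spouts and _bolts; StandInBuilder is a made-up class.
public class PrivateFieldSketch {

    static class StandInBuilder {
        private final Map<String, String> _spouts = new HashMap<>();
        StandInBuilder() {
            _spouts.put("spout", "RandomWordSpout");
        }
    }

    @SuppressWarnings("unchecked")
    static <T> T getPrivateField(Object target, String fieldName) {
        try {
            Field f = target.getClass().getDeclaredField(fieldName);
            f.setAccessible(true); // bypass the private modifier
            return (T) f.get(target);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> spouts = getPrivateField(new StandInBuilder(), "_spouts");
        System.out.println(spouts.get("spout")); // RandomWordSpout
    }
}
```

Reflection is needed because TopologyBuilder does not expose its spout/bolt registries through any public getter.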



    /**
     * Creates a Flink program that uses the specified spouts and bolts.
     */
    private void translateTopology() {


        // Storm defaults to parallelism 1

        /* Translation of topology */

        for (final Entry<String, IRichSpout> spout : spouts.entrySet()) {
            final String spoutId = spout.getKey();
            final IRichSpout userSpout = spout.getValue();

            final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
            final HashMap<String, Fields> sourceStreams = declarer.outputStreams;
            this.outputStreams.put(spoutId, sourceStreams);
            declarers.put(spoutId, declarer);

            final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>();
            final DataStreamSource<?> source;

            if (sourceStreams.size() == 1) {
                final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout, spoutId, null, null);

                final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];

                DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId,
                        declarer.getOutputType(outputStreamId));
                outputStreams.put(outputStreamId, src);
                source = src;
            } else {
                final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
                        userSpout, spoutId, null, null);

                @SuppressWarnings({ "unchecked", "rawtypes" })
                DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource(
                        spoutWrapperMultipleOutputs, spoutId,
                        (TypeInformation) TypeExtractor.getForClass(SplitStreamType.class));

                SplitStream<SplitStreamType<Tuple>> splitSource = multiSource
                        .split(new StormStreamSelector<Tuple>());
                for (String streamId : sourceStreams.keySet()) {
                    SingleOutputStreamOperator<Tuple> outStream = splitSource.select(streamId)
                            .map(new SplitStreamMapper<Tuple>());
                    outStream.getTransformation().setOutputType(declarer.getOutputType(streamId));
                    outputStreams.put(streamId, outStream);
                }
                source = multiSource;
            }
            availableInputs.put(spoutId, outputStreams);

            final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
            if (common.is_set_parallelism_hint()) {
                int dop = common.get_parallelism_hint();
                source.setParallelism(dop);
            } else {
                common.set_parallelism_hint(1);
            }
        }
        /**
         * 1. Connect all spout streams with bolts streams
         * 2. Then proceed with the bolts stream already connected
         *
         * <p>Because we do not know the order in which an iterator steps over a set, we might process a consumer before
         * its producer
         * ->thus, we might need to repeat multiple times
         */
        boolean makeProgress = true;
        while (bolts.size() > 0) {
            if (!makeProgress) {
                final StringBuilder strBld = new StringBuilder();
                strBld.append("Unable to build Topology. Could not connect the following bolts:");
                for (String boltId : bolts.keySet()) {
                    strBld.append("\n  ");
                    strBld.append(boltId);
                    strBld.append(": missing input streams [");
                    for (Entry<GlobalStreamId, Grouping> streams : unprocessdInputsPerBolt
                            .get(boltId)) {
                        strBld.append("'");
                        strBld.append(streams.getKey().get_streamId());
                        strBld.append("' from '");
                        strBld.append(streams.getKey().get_componentId());
                        strBld.append("'; ");
                    }
                    strBld.append("]");
                }

                throw new RuntimeException(strBld.toString());
            }
            makeProgress = false;

            final Iterator<Entry<String, IRichBolt>> boltsIterator = bolts.entrySet().iterator();
            while (boltsIterator.hasNext()) {

                final Entry<String, IRichBolt> bolt = boltsIterator.next();
                final String boltId = bolt.getKey();
                final IRichBolt userBolt = copyObject(bolt.getValue());

                final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();

                Set<Entry<GlobalStreamId, Grouping>> unprocessedBoltInputs = unprocessdInputsPerBolt.get(boltId);
                if (unprocessedBoltInputs == null) {
                    unprocessedBoltInputs = new HashSet<>();
                    unprocessedBoltInputs.addAll(common.get_inputs().entrySet());
                    unprocessdInputsPerBolt.put(boltId, unprocessedBoltInputs);
                }

                // check if all inputs are available
                final int numberOfInputs = unprocessedBoltInputs.size();
                int inputsAvailable = 0;
                for (Entry<GlobalStreamId, Grouping> entry : unprocessedBoltInputs) {
                    final String producerId = entry.getKey().get_componentId();
                    final String streamId = entry.getKey().get_streamId();
                    final HashMap<String, DataStream<Tuple>> streams = availableInputs.get(producerId);
                    if (streams != null && streams.get(streamId) != null) {
                        inputsAvailable++;
                    }
                }
                if (inputsAvailable != numberOfInputs) {
                    // traverse other bolts first until inputs are available
                    continue;
                } else {
                    makeProgress = true;
                    boltsIterator.remove();
                }

                final Map<GlobalStreamId, DataStream<Tuple>> inputStreams = new HashMap<>(numberOfInputs);

                for (Entry<GlobalStreamId, Grouping> input : unprocessedBoltInputs) {
                    final GlobalStreamId streamId = input.getKey();
                    final Grouping grouping = input.getValue();

                    final String producerId = streamId.get_componentId();

                    final Map<String, DataStream<Tuple>> producer = availableInputs.get(producerId);

                    inputStreams.put(streamId, processInput(boltId, userBolt, streamId, grouping, producer));
                }

                final SingleOutputStreamOperator<?> outputStream = createOutput(boltId,
                        userBolt, inputStreams);

                if (common.is_set_parallelism_hint()) {
                    int dop = common.get_parallelism_hint();
                    outputStream.setParallelism(dop);
                } else {
                    common.set_parallelism_hint(1);
                }
            }
        }
    }

  • The whole conversion translates the spouts first and then the bolts. The spouts and bolts information it relies on was obtained from storm's TopologyBuilder object via reflection in the constructor.
  • Flink uses FlinkOutputFieldsDeclarer (which implements storm's OutputFieldsDeclarer interface) to carry the declareOutputFields information configured on storm's IRichSpout and IRichBolt; note, however, that flink does not support direct emit. Here, through the userSpout.declareOutputFields method, the declare information of the original spout is set on the FlinkOutputFieldsDeclarer.
  • Flink uses SpoutWrapper to wrap the spout and convert it into a RichParallelSourceFunction, handling the cases of one versus multiple output streams differently. The RichParallelSourceFunction is then passed to StreamExecutionEnvironment.addSource to create a flink DataStreamSource, which is added to availableInputs; afterwards the parallelism of the DataStreamSource is set according to the spout's parallelism hint.
  • For the transformation of the bolts, unprocessdInputsPerBolt is maintained, keyed by boltId, with the GlobalStreamId and Grouping the bolt connects to as value. Since a map is used for traversal, the bolts may be visited out of order: if a bolt's input GlobalStreamId is already available, the bolt is transformed and then removed from bolts; if not, it is skipped and left in bolts. Because the outer loop runs while the size of bolts is greater than 0, this mechanism handles the out-of-order traversal.
  • An important method in the bolt conversion is processInput, which converts the bolt's grouping into the corresponding operation on the producer's DataStream (for example, shuffleGrouping becomes the rebalance operation, fieldsGrouping becomes keyBy, globalGrouping becomes global, and allGrouping becomes broadcast). After that, createOutput is called to convert the bolt's execution logic: it uses BoltWrapper or MergedInputsBoltWrapper to convert the bolt into flink's OneInputStreamOperator, calls transform on the stream with that operator to obtain flink's SingleOutputStreamOperator, adds the converted SingleOutputStreamOperator to availableInputs, and finally sets the parallelism of this SingleOutputStreamOperator according to the bolt's parallelism hint.
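The grouping translation described above can be summarized as a small lookup table. This standalone sketch only names the mapping (flinkOperationFor is made up for illustration); the real processInput operates on DataStream objects:

```java
// Standalone sketch of the grouping-to-DataStream-operation mapping performed
// by processInput; it returns the name of the DataStream call, nothing more.
public class GroupingMappingSketch {

    static String flinkOperationFor(String stormGrouping) {
        switch (stormGrouping) {
            case "shuffleGrouping": return "rebalance"; // round-robin redistribution
            case "fieldsGrouping":  return "keyBy";     // hash-partition by field
            case "globalGrouping":  return "global";    // everything to one task
            case "allGrouping":     return "broadcast"; // everything to every task
            default: throw new IllegalArgumentException("unsupported grouping: " + stormGrouping);
        }
    }

    public static void main(String[] args) {
        System.out.println(flinkOperationFor("fieldsGrouping")); // keyBy
    }
}
```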

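The repeat-until-connected loop for the bolts is essentially a worklist algorithm. A standalone sketch with strings standing in for bolts and streams (translate and its inputs are made up for illustration):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Standalone sketch of the bolt-translation worklist: a bolt can only be
// translated once all of its input streams are available, so the remaining
// bolts are swept repeatedly until a sweep makes no progress.
public class WorklistSketch {

    static List<String> translate(Map<String, List<String>> boltInputs, Set<String> available) {
        List<String> order = new ArrayList<>();
        Map<String, List<String>> bolts = new HashMap<>(boltInputs);
        boolean makeProgress = true;
        while (!bolts.isEmpty()) {
            if (!makeProgress) { // a full sweep connected nothing: the topology is broken
                throw new RuntimeException("Unable to build Topology. Could not connect: " + bolts.keySet());
            }
            makeProgress = false;
            Iterator<Map.Entry<String, List<String>>> it = bolts.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, List<String>> bolt = it.next();
                if (available.containsAll(bolt.getValue())) {
                    order.add(bolt.getKey());     // "translate" the bolt
                    available.add(bolt.getKey()); // its output is now available to consumers
                    it.remove();
                    makeProgress = true;
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> bolts = new HashMap<>();
        bolts.put("print", List.of("count")); // print consumes count's output
        bolts.put("count", List.of("spout")); // count consumes the spout
        System.out.println(translate(bolts, new HashSet<>(List.of("spout")))); // [count, print]
    }
}
```

Whatever order the map iterator visits the bolts in, "count" is always translated before "print", because "print" is skipped until its input exists.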

flink-storm_2.11-1.6.2-sources.jar! /org/apache/flink/storm/api/

/**
 * {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
 */
public class FlinkLocalCluster {

    /** The log used by this mini cluster. */
    private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);

    /** The Flink mini cluster on which to execute the programs. */
    private FlinkMiniCluster flink;

    /** Configuration key to submit topology in blocking mode if flag is set to {@code true}. */
    public static final String SUBMIT_BLOCKING = "SUBMIT_STORM_TOPOLOGY_BLOCKING";

    public FlinkLocalCluster() {
    }

    public FlinkLocalCluster(FlinkMiniCluster flink) {
        this.flink = Objects.requireNonNull(flink);
    }

    public void submitTopology(final String topologyName, final Map conf, final FlinkTopology topology)
            throws Exception {
        this.submitTopologyWithOpts(topologyName, conf, topology, null);
    }

    public void submitTopologyWithOpts(final String topologyName, final Map conf, final FlinkTopology topology, final SubmitOptions submitOpts) throws Exception {
        LOG.info("Running Storm topology on FlinkLocalCluster");

        boolean submitBlocking = false;
        if (conf != null) {
            Object blockingFlag = conf.get(SUBMIT_BLOCKING);
            if (blockingFlag instanceof Boolean) {
                submitBlocking = ((Boolean) blockingFlag).booleanValue();
            }
        }

        FlinkClient.addStormConfigToTopology(topology, conf);

        StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();

        JobGraph jobGraph = streamGraph.getJobGraph();

        if (this.flink == null) {
            Configuration configuration = new Configuration();

            configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
            configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

            this.flink = new LocalFlinkMiniCluster(configuration, true);
            this.flink.start();
        }
        if (submitBlocking) {
            this.flink.submitJobAndWait(jobGraph, false);
        } else {
            this.flink.submitJobDetached(jobGraph);
        }
    }

    public void killTopology(final String topologyName) {
        this.killTopologyWithOpts(topologyName, null);
    }

    public void killTopologyWithOpts(final String name, final KillOptions options) {
    }

    public void activate(final String topologyName) {
    }

    public void deactivate(final String topologyName) {
    }

    public void rebalance(final String name, final RebalanceOptions options) {
    }

    public void shutdown() {
        if (this.flink != null) {
            this.flink.stop();
            this.flink = null;
        }
    }

  • FlinkLocalCluster's submitTopology method delegates to submitTopologyWithOpts, which mainly sets some parameters, calls topology.getExecutionEnvironment().getStreamGraph() to generate the StreamGraph from the transformations, and obtains the JobGraph from it; it then creates and starts a LocalFlinkMiniCluster, and finally submits the whole JobGraph using LocalFlinkMiniCluster's submitJobAndWait or submitJobDetached.


  • Flink provides some compatibility with storm through FlinkTopology, which is very helpful for migrating storm to flink.
  • To run storm's topology on flink there are mainly these steps: build storm's native TopologyBuilder, convert storm's topology into a FlinkTopology via FlinkTopology.createTopology(builder), and finally submit the FlinkTopology through FlinkLocalCluster (local mode) or FlinkSubmitter (remote submission).
  • FlinkTopology is the core of flink's storm compatibility. It is responsible for converting a StormTopology into the corresponding flink structures: it uses SpoutWrapper to convert the spout into a RichParallelSourceFunction and adds it to the StreamExecutionEnvironment to create a DataStream; it converts the bolt's grouping into the corresponding operation on the producer's DataStream (for example, shuffleGrouping becomes the rebalance operation, fieldsGrouping becomes keyBy, globalGrouping becomes global, and allGrouping becomes broadcast); it then uses BoltWrapper or MergedInputsBoltWrapper to convert the bolt into flink's OneInputStreamOperator and transforms the stream with that operator.
  • After the FlinkTopology is built, it is submitted for local execution using FlinkLocalCluster, or for remote execution using FlinkSubmitter.
  • FlinkLocalCluster's submitTopology method mainly generates a StreamGraph through the StreamExecutionEnvironment inside the FlinkTopology, obtains the JobGraph from it, then creates and starts a LocalFlinkMiniCluster, and finally submits the JobGraph through the LocalFlinkMiniCluster.