Talk about storm’s tickTuple

Preface

This article mainly studies storm’s tickTuple

Example

TickWordCountBolt

public class TickWordCountBolt extends BaseBasicBolt {

    private static final Logger LOGGER = LoggerFactory.getLogger(TickWordCountBolt.class);

    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        if(TupleUtils.isTick(input)){
            //execute tick logic
            LOGGER.info("execute tick tuple, emit and clear counts");
            counts.entrySet().stream()
                    .forEach(entry -> collector.emit(new Values(entry.getKey(), entry.getValue())));
            counts.clear();
        }else{
            String word = input.getString(0);
            Integer count = counts.get(word);
            if (count == null) count = 0;
            count++;
            counts.put(word, count);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
  • With tick tuples enabled, you need to distinguish the tuple type yourself in the execute method and branch to the corresponding processing (a BaseRichBolt variant is sketched below).
  • The example here overrides the getComponentConfiguration method, creating a Config directly and setting the Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS parameter in it.
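
The listing below is not from the original example; it is a minimal sketch, assuming the same word count is written against BaseRichBolt instead of BaseBasicBolt (imports omitted, as in the listings above). With IRichBolt the collector does not ack automatically, so acking every tuple, tick tuples included, is generally the bolt's own responsibility.

public class TickRichWordCountBolt extends BaseRichBolt {

    private OutputCollector collector;
    private final Map<String, Integer> counts = new HashMap<>();

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        if (TupleUtils.isTick(input)) {
            // periodic logic: emit and reset the counters
            counts.forEach((word, count) -> collector.emit(new Values(word, count)));
            counts.clear();
        } else {
            counts.merge(input.getString(0), 1, Integer::sum);
        }
        // with IRichBolt we ack manually, including the tick tuple
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}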

tickTopology

    @Test
    public void testTickTuple() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();
    // parallelism of 10
        builder.setSpout("spout", new TestWordSpout(), 10);
        builder.setBolt("count", new TickWordCountBolt(), 5)
//                .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3)
                .fieldsGrouping("spout", new Fields("word"));
        builder.setBolt("print", new PrintBolt(), 1)
                .shuffleGrouping("count");
        SubmitHelper.submitRemote("tickDemo",builder);
    }
  • In addition to overriding the getComponentConfiguration method to set the Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS parameter, you can also call the addConfiguration method after TopologyBuilder.setBolt; that configuration overrides the one from getComponentConfiguration.
  • Besides the bolt-level configuration, it is also possible to set Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS on the conf passed to StormSubmitter.submitTopology. However, that configuration is global and applies to every bolt of the topology. When both a global configuration and a bolt-level configuration are present, the one with the narrower scope wins (see the sketch below).
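
For reference, a minimal sketch of the topology-level (global) alternative, reusing the builder from the test above but submitting through StormSubmitter directly instead of the SubmitHelper used there:

    Config conf = new Config();
    // applies to every bolt of the topology unless a bolt-level setting overrides it
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
    StormSubmitter.submitTopology("tickDemo", conf, builder.createTopology());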

Source code analysis

TupleUtils.isTick

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/TupleUtils.java

    public static boolean isTick(Tuple tuple) {
        return tuple != null
               && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
               && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }
  • isTick decides by checking the tuple’s sourceComponent and sourceStreamId against the system component id and the system tick stream id.
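
For reference, the two constants compared here are defined in org.apache.storm.Constants roughly as follows (excerpt; the same values can also be read off the setupTicks analysis further below):

    public static final String SYSTEM_COMPONENT_ID = "__system";
    public static final String SYSTEM_TICK_STREAM_ID = "__tick";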

TopologyBuilder.setBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java

    /**
     * Define a new bolt in this topology with the specified amount of parallelism.
     *
     * @param id               the id of this component. This id is referenced by other components that want to consume this bolt's
     *                         outputs.
     * @param bolt             the bolt
     * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process
     *                         somewhere around the cluster.
     * @return use the returned object to declare the inputs to this component
     *
     * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
     */
    public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
        validateUnusedId(id);
        initCommon(id, bolt, parallelism_hint);
        _bolts.put(id, bolt);
        return new BoltGetter(id);
    }

    private void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException {
        ComponentCommon common = new ComponentCommon();
        common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
        if (parallelism != null) {
            int dop = parallelism.intValue();
            if (dop < 1) {
                throw new IllegalArgumentException("Parallelism must be positive.");
            }
            common.set_parallelism_hint(dop);
        }
        Map<String, Object> conf = component.getComponentConfiguration();
        if (conf != null) {
            common.set_json_conf(JSONValue.toJSONString(conf));
        }
        commons.put(id, common);
    }
  • setBolt calls initCommon, which invokes the bolt’s getComponentConfiguration and writes the returned configuration (serialized to JSON) into commons.

BoltGetter.addConfiguration

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java

    protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements BoltDeclarer {
        //......
    }
  • The addConfiguration method is inherited from BaseConfigurationDeclarer; a usage sketch follows below.
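
For concreteness, this is the fluent form that the commented-out line in the test above corresponds to:

    builder.setBolt("count", new TickWordCountBolt(), 5)
            .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3)
            .fieldsGrouping("spout", new Fields("word"));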

BaseConfigurationDeclarer

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java

public abstract class BaseConfigurationDeclarer<T extends ComponentConfigurationDeclarer> implements ComponentConfigurationDeclarer<T> {
    @Override
    public T addConfiguration(String config, Object value) {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(config, value);
        return addConfigurations(configMap);
    }
    //......
}
  • addConfiguration wraps the single key/value pair into a new map and then calls the subclass’s addConfigurations; here the subclass is ConfigGetter.

ConfigGetter.addConfigurations

    protected class ConfigGetter<T extends ComponentConfigurationDeclarer> extends BaseConfigurationDeclarer<T> {
        String id;

        public ConfigGetter(String id) {
            this.id = id;
        }

        @SuppressWarnings("unchecked")
        @Override
        public T addConfigurations(Map<String, Object> conf) {
            if (conf != null) {
                if (conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
                    throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
                }
                if (!conf.isEmpty()) {
                    String currConf = commons.get(id).get_json_conf();
                    commons.get(id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
                }
            }
            return (T) this;
        }
        //......
    }

    private static String mergeIntoJson(Map<String, Object> into, Map<String, Object> newMap) {
        Map<String, Object> res = new HashMap<>(into);
        res.putAll(newMap);
        return JSONValue.toJSONString(res);
    }
  • Here the component’s current configuration is read from commons and the new map is merged into it, that is, configuration items set via addConfiguration overwrite the bolt’s configuration from the getComponentConfiguration method (illustrated below).
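
To make the precedence concrete, here is a small standalone illustration (plain HashMap code, not Storm internals) of why the res.putAll(newMap) in mergeIntoJson lets addConfiguration win:

    Map<String, Object> fromGetComponentConfiguration = new HashMap<>();
    fromGetComponentConfiguration.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);

    Map<String, Object> fromAddConfiguration = new HashMap<>();
    fromAddConfiguration.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3);

    // same semantics as mergeIntoJson: the later putAll overwrites the earlier value
    Map<String, Object> merged = new HashMap<>(fromGetComponentConfiguration);
    merged.putAll(fromAddConfiguration);
    // merged.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS) is now 3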

Executor.normalizedComponentConf

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    private Map<String, Object> normalizedComponentConf(
        Map<String, Object> topoConf, WorkerTopologyContext topologyContext, String componentId) {
        List<String> keysToRemove = retrieveAllConfigKeys();
        keysToRemove.remove(Config.TOPOLOGY_DEBUG);
        keysToRemove.remove(Config.TOPOLOGY_MAX_SPOUT_PENDING);
        keysToRemove.remove(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
        keysToRemove.remove(Config.TOPOLOGY_TRANSACTIONAL_ID);
        keysToRemove.remove(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
        keysToRemove.remove(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS);
        keysToRemove.remove(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME);
        keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER);
        keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER_CONFIG);
        keysToRemove.remove(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);

        Map<String, Object> componentConf;
        String specJsonConf = topologyContext.getComponentCommon(componentId).get_json_conf();
        if (specJsonConf != null) {
            try {
                componentConf = (Map<String, Object>) JSONValue.parseWithException(specJsonConf);
            } catch (ParseException e) {
                throw new RuntimeException(e);
            }
            for (Object p : keysToRemove) {
                componentConf.remove(p);
            }
        } else {
            componentConf = new HashMap<>();
        }

        Map<String, Object> ret = new HashMap<>();
        ret.putAll(topoConf);
        ret.putAll(componentConf);

        return ret;
    }
  • Executor calls normalizedComponentConf in its constructor to merge the configurations.
  • A number of topology-level keys are first removed from componentConf; the return value is then built with ret.putAll(topoConf) followed by ret.putAll(componentConf). It follows that if Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS is present in both, componentConf overwrites the value from topoConf.

Executor.setupTicks

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    protected void setupTicks(boolean isSpout) {
        final Integer tickTimeSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
        if (tickTimeSecs != null) {
            boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
            if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) && Utils.isSystemId(componentId))
                || (!enableMessageTimeout && isSpout)) {
                LOG.info("Timeouts disabled for executor {}:{}", componentId, executorId);
            } else {
                StormTimer timerTask = workerData.getUserTimer();
                timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs,
                                            () -> {
                                                TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
                                                                                Constants.SYSTEM_COMPONENT_ID,
                                                                                (int) Constants.SYSTEM_TASK_ID,
                                                                                Constants.SYSTEM_TICK_STREAM_ID);
                                                AddressedTuple tickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
                                                try {
                                                    receiveQueue.publish(tickTuple);
                                                    receiveQueue.flush(); // avoid buffering
                                                } catch (InterruptedException e) {
                                                    LOG.warn("Thread interrupted when emitting tick tuple. Setting interrupt flag.");
                                                    Thread.currentThread().interrupt();
                                                    return;
                                                }
                                            }
                );
            }
        }
    }
  • The topoConf used here is the result of merging topoConf with componentConf, and a recurring timerTask is set up for the components that meet the conditions.
  • You can see that the new TupleImpl uses Constants.SYSTEM_COMPONENT_ID (__system) as the source component, Constants.SYSTEM_TASK_ID (-1) as the task id, and Constants.SYSTEM_TICK_STREAM_ID (__tick) as the stream id.
  • The timerTask calls receiveQueue.publish(tickTuple), where receiveQueue is a JCQueue.
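
Since the timer builds the tuple with new Values(tickTimeSecs), the tick tuple carries the configured frequency as its single value, so a bolt can read it back if needed; a small sketch (the LOGGER name is assumed):

    if (TupleUtils.isTick(input)) {
        Integer freqSecs = input.getInteger(0); // the single value published by setupTicks
        LOGGER.info("tick tuple received, configured frequency is {}s", freqSecs);
    }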

JCQueue.publish

    private final DirectInserter directInserter = new DirectInserter(this);

    /**
     * Blocking call. Retries till it can successfully publish the obj. Can be interrupted via Thread.interrupt().
     */
    public void publish(Object obj) throws InterruptedException {
        Inserter inserter = getInserter();
        inserter.publish(obj);
    }

    private Inserter getInserter() {
        Inserter inserter;
        if (producerBatchSz > 1) {
            inserter = thdLocalBatcher.get();
            if (inserter == null) {
                BatchInserter b = new BatchInserter(this, producerBatchSz);
                inserter = b;
                thdLocalBatcher.set(b);
            }
        } else {
            inserter = directInserter;
        }
        return inserter;
    }

    private static class DirectInserter implements Inserter {
        private JCQueue q;

        public DirectInserter(JCQueue q) {
            this.q = q;
        }

        /**
         * Blocking call, that can be interrupted via Thread.interrupt
         */
        @Override
        public void publish(Object obj) throws InterruptedException {
            boolean inserted = q.tryPublishInternal(obj);
            int idleCount = 0;
            while (!inserted) {
                q.metrics.notifyInsertFailure();
                if (idleCount == 0) { // check avoids multiple log msgs when in a idle loop
                    LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", q.getName());
                }

                idleCount = q.backPressureWaitStrategy.idle(idleCount);
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                inserted = q.tryPublishInternal(obj);
            }

        }
        //......
    }    

    // Non Blocking. returns true/false indicating success/failure. Fails if full.
    private boolean tryPublishInternal(Object obj) {
        if (recvQueue.offer(obj)) {
            metrics.notifyArrivals(1);
            return true;
        }
        return false;
    }
  • JCQueue.publish calls inserter.publish, where the inserter may be a BatchInserter or a DirectInserter; here we look at DirectInserter’s publish method.
  • DirectInserter’s publish method calls JCQueue.tryPublishInternal, which in turn calls recvQueue.offer(obj) to put the tuple into the recvQueue queue.

JCQueue.consume

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java

    /**
     * Non blocking. Returns immediately if Q is empty. Runs till Q is empty OR exitCond.keepRunning() return false. Returns number of
     * elements consumed from Q
     */
    public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) {
        try {
            return consumeImpl(consumer, exitCond);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q
     *
     * @param consumer
     * @param exitCond
     */
    private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException {
        int drainCount = 0;
        while (exitCond.keepRunning()) {
            Object tuple = recvQueue.poll();
            if (tuple == null) {
                break;
            }
            consumer.accept(tuple);
            ++drainCount;
        }

        int overflowDrainCount = 0;
        int limit = overflowQ.size();
        while (exitCond.keepRunning() && (overflowDrainCount < limit)) { // 2nd cond prevents staying stuck with consuming overflow
            Object tuple = overflowQ.poll();
            ++overflowDrainCount;
            consumer.accept(tuple);
        }
        int total = drainCount + overflowDrainCount;
        if (total > 0) {
            consumer.flush();
        }
        return total;
    }
  • In the earlier article on the storm worker’s executor and task, we saw that the executor’s asyncLoop mainly calls the Executor.call().call() method; for a bolt, BoltExecutor.call() invokes the JCQueue.consume method, which calls recvQueue.poll().
  • It can be seen that tick tuples and the bolt’s business tuples share the same queue.

Summary

  • Regarding the tick parameter, there are three configuration levels: the topology level, the BoltDeclarer level, and the bolt’s getComponentConfiguration level. The BoltDeclarer level has the highest priority, then the bolt’s getComponentConfiguration, and finally the global topology-level configuration.
  • Tick tuples are scheduled by a StormTimer; when a tick fires, the timer calls JCQueue.publish on the bolt’s receive queue, which boils down to recvQueue.offer(obj). Meanwhile the executor’s asyncLoop calls the Executor.call().call() method; for BoltExecutor.call this invokes the JCQueue.consume method, which calls recvQueue.poll().
  • Therefore the timer is only responsible for publishing the tickTuple to the queue; the actual trigger time is not necessarily 100% accurate, as it depends on the backlog of recvQueue and the consumption capacity of the executor.
