Talk about storm’s messageTimeout.

  storm

Order

This article mainly studies storm’s messageTimeout.

TOPOLOGY_MESSAGE_TIMEOUT_SECS

storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java

    /**
     * True if Storm should timeout messages or not. Defaults to true. This is meant to be used in unit tests to prevent tuples from being
     * accidentally timed out during the test.
     */
    @isBoolean
    public static final String TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS = "topology.enable.message.timeouts";

    /**
     * The maximum amount of time given to the topology to fully process a message emitted by a spout. If the message is not acked within
     * this time frame, Storm will fail the message on the spout. Some spouts implementations will then replay the message at a later time.
     */
    @isInteger
    @isPositiveNumber
    @NotNull
    public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";

    /**
     * How often a tick tuple from the "__system" component and "__tick" stream should be sent to tasks. Meant to be used as a
     * component-specific configuration.
     */
    @isInteger
    public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS = "topology.tick.tuple.freq.secs";
  • In defaults.yaml, topology.enable.message.timeout defaults to true.
  • In defaults.yaml, topology.message.timeout.secs defaults to 30.
  • The default value of topology.tick.tuple.freq.secs in defaults.yaml is null, which is actually the value of topology.message.timeout.secs

StormCommon.addAcker

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java

    public static void addAcker(Map<String, Object> conf, StormTopology topology) {
        int ackerNum =
            ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
        Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);

        Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
        outputStreams.put(Acker.ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
        outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
        outputStreams.put(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));

        Map<String, Object> ackerConf = new HashMap<>();
        ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
        ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));

        Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);

        for (Bolt bolt : topology.get_bolts().values()) {
            ComponentCommon common = bolt.get_common();
            common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
            common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
            common.put_to_streams(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
        }

        for (SpoutSpec spout : topology.get_spouts().values()) {
            ComponentCommon common = spout.get_common();
            Map<String, Object> spoutConf = componentConf(spout);
            spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
                          ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
            common.set_json_conf(JSONValue.toJSONString(spoutConf));
            common.put_to_streams(Acker.ACKER_INIT_STREAM_ID,
                                  Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID),
                                 Thrift.prepareDirectGrouping());
            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID),
                                 Thrift.prepareDirectGrouping());
            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
                                 Thrift.prepareDirectGrouping());
        }

        topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
    }
  • Storm used the value of config.topology _ message _ timeout _ secs as config.topology _ tick _ tuple _ freq _ secs when adding addAcker.

Executor.setupTicks

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    protected void setupTicks(boolean isSpout) {
        final Integer tickTimeSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
        if (tickTimeSecs != null) {
            boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
            if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) && Utils.isSystemId(componentId))
                || (!enableMessageTimeout && isSpout)) {
                LOG.info("Timeouts disabled for executor {}:{}", componentId, executorId);
            } else {
                StormTimer timerTask = workerData.getUserTimer();
                timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs,
                                            () -> {
                                                TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs),
                                                                                Constants.SYSTEM_COMPONENT_ID,
                                                                                (int) Constants.SYSTEM_TASK_ID,
                                                                                Constants.SYSTEM_TICK_STREAM_ID);
                                                AddressedTuple tickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
                                                try {
                                                    receiveQueue.publish(tickTuple);
                                                    receiveQueue.flush(); // avoid buffering
                                                } catch (InterruptedException e) {
                                                    LOG.warn("Thread interrupted when emitting tick tuple. Setting interrupt flag.");
                                                    Thread.currentThread().interrupt();
                                                    return;
                                                }
                                            }
                );
            }
        }
    }
  • Executor uses config.topology _ tick _ tuple _ freq _ secs as tickTimeSecs, i.e. tickTuple’s scheduling interval, when setupTicks.
  • One of the prerequisites for scheduling tickTuple is to open config.topology _ enable _ message _ timeout.
  • The timing task sends a tickTuple every tickTimeSecs, and the srcComponent of the tuple is set to Constants.SYSTEM_COMPONENT_ID (__systemSystem _ task _ id (-1System _ tick _ stream _ id (__tick)

SpoutExecutor.tupleActionFn

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

    public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
        String streamId = tuple.getSourceStreamId();
        if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
            spoutOutputCollector.flush();
        } else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            pending.rotate();
        } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
            metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
        } else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
            Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject();
            if (spoutObj instanceof ICredentialsListener) {
                ((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
            }
        } else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
            Long id = (Long) tuple.getValue(0);
            TupleInfo pendingForId = pending.get(id);
            if (pendingForId != null) {
                pending.put(id, pendingForId);
            }
        } else {
            Long id = (Long) tuple.getValue(0);
            Long timeDeltaMs = (Long) tuple.getValue(1);
            TupleInfo tupleInfo = pending.remove(id);
            if (tupleInfo != null && tupleInfo.getMessageId() != null) {
                if (taskId != tupleInfo.getTaskId()) {
                    throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
                }
                Long timeDelta = null;
                if (hasAckers) {
                    long startTimeMs = tupleInfo.getTimestamp();
                    if (startTimeMs != 0) {
                        timeDelta = timeDeltaMs;
                    }
                }
                if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
                    ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
                } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
                    failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
                }
            }
        }
    }
  • SpoutExecutor triggers the pending.rotate () method when the tupleActionFn method receives the tickTuple of constants.system _ tick _ stream _ id

RotatingMap.rotate

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java

    public Map<K, V> rotate() {
        Map<K, V> dead = _buckets.removeLast();
        _buckets.addFirst(new HashMap<K, V>());
        if (_callback != null) {
            for (Entry<K, V> entry : dead.entrySet()) {
                _callback.expire(entry.getKey(), entry.getValue());
            }
        }
        return dead;
    }
  • The rotate method triggers expire

SpoutExecutor.RotatingMap.ExpiredCallback

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

    public void init(final ArrayList<Task> idToTask, int idToTaskBase) {
        this.threadId = Thread.currentThread().getId();
        executorTransfer.initLocalRecvQueues();
        while (!stormActive.get()) {
            Utils.sleep(100);
        }

        LOG.info("Opening spout {}:{}", componentId, taskIds);
        this.idToTask = idToTask;
        this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
        this.spouts = new ArrayList<>();
        for (Task task : idToTask) {
            if (task != null) {
                this.spouts.add((ISpout) task.getTaskObject());
            }
        }
        this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() {
            @Override
            public void expire(Long key, TupleInfo tupleInfo) {
                Long timeDelta = null;
                if (tupleInfo.getTimestamp() != 0) {
                    timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
                }
                failSpoutMsg(SpoutExecutor.this, idToTask.get(tupleInfo.getTaskId() - idToTaskBase), timeDelta, tupleInfo, "TIMEOUT");
            }
        });
        //......
    }
  • SpoutExecutor registered pending Rotational RotatingMap.ExpiredCallback when init, in which failSpoutMsg was called on expired tuple

Summary

  • The messageTimeout related parameter of spout is config.topology _ enable _ message _ timeout (Message.timeout defaults to true.)、Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS(Secs default 30)
  • StormCommon takes config.topology _ message _ timeout _ secs as the config.topology _ tick _ tuple _ freq _ secs value when adding addAcker and as the scheduling interval of tickTimeSecs, i.e. tickTuple of ack
  • When SpoutExecutor receives the tickTuple, it triggers the rotate operation of the RotatingMap to perform expire callback, while SpoutExecutor registers the RotatingMap.ExpiredCallback when init, performs failSpoutMsg operation on the expired tuple, and callbacks the fail method of spout, thus completing the function of messageTimeout (For spout that does not open a msgId message, its pending queue is empty, so rotate and ExpiredCallback here are equivalent to no effect.)

doc