Talk about storm’s maxSpoutPending

  storm

Preface

This article mainly studies storm’s maxSpoutPending

TOPOLOGY_MAX_SPOUT_PENDING

storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java

    /**
     * The maximum number of tuples that can be pending on a spout task at any given time. This config applies to individual tasks, not to
     * spouts or topologies as a whole.
     *
     * A pending tuple is one that has been emitted from a spout but has not been acked or failed yet. Note that this config parameter has
     * no effect for unreliable spouts that don't tag their tuples with a message id.
     */
    @isInteger
    @isPositiveNumber
    public static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending";
  • TOPOLOGY_MAX_SPOUT_PENDING sets the maximum number of tuples that a spout task may have emitted but not yet acked or failed; it only takes effect for reliable emits, i.e. tuples emitted with a msgId.
  • The default value of topology.max.spout.pending in defaults.yaml is null (see the configuration sketch after this list).
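For reference, here is a minimal sketch (not from the original article) of how this setting is typically supplied when building a topology; the value 1000 is only illustrative. Config.setMaxSpoutPending and Config.setNumAckers are standard helpers on org.apache.storm.Config.

    import org.apache.storm.Config;

    public class MaxSpoutPendingConfig {
        public static Config build() {
            Config conf = new Config();
            // helper for topology.max.spout.pending; equivalent to
            // conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1000)
            conf.setMaxSpoutPending(1000);
            // the limit only applies to reliable emits, so ackers must be enabled
            conf.setNumAckers(1);
            return conf;
        }
    }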

SpoutExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

    public void init(final ArrayList<Task> idToTask, int idToTaskBase) {
        this.threadId = Thread.currentThread().getId();
        executorTransfer.initLocalRecvQueues();
        while (!stormActive.get()) {
            Utils.sleep(100);
        }

        LOG.info("Opening spout {}:{}", componentId, taskIds);
        this.idToTask = idToTask;
        this.maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size();
        //......
    }

    public Callable<Long> call() throws Exception {
        init(idToTask, idToTaskBase);
        return new Callable<Long>() {
            final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
            int recvqCheckSkips = 0;
            int swIdleCount = 0; // counter for spout wait strategy
            int bpIdleCount = 0; // counter for back pressure wait strategy
            int rmspCount = 0;

            @Override
            public Long call() throws Exception {
                int receiveCount = 0;
                if (recvqCheckSkips++ == recvqCheckSkipCountMax) {
                    receiveCount = receiveQueue.consume(SpoutExecutor.this);
                    recvqCheckSkips = 0;
                }
                long currCount = emittedCount.get();
                boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
                boolean isActive = stormActive.get();

                if (!isActive) {
                    inactiveExecute();
                    return 0L;
                }

                if (!lastActive.get()) {
                    lastActive.set(true);
                    activateSpouts();
                }
                boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
                boolean noEmits = true;
                long emptyStretch = 0;

                if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
                    for (int j = 0; j < spouts.size(); j++) { // in critical path. don't use iterators.
                        spouts.get(j).nextTuple();
                    }
                    noEmits = (currCount == emittedCount.get());
                    if (noEmits) {
                        emptyEmitStreak.increment();
                    } else {
                        emptyStretch = emptyEmitStreak.get();
                        emptyEmitStreak.set(0);
                    }
                }
                if (reachedMaxSpoutPending) {
                    if (rmspCount == 0) {
                        LOG.debug("Reached max spout pending");
                    }
                    rmspCount++;
                } else {
                    if (rmspCount > 0) {
                        LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
                    }
                    rmspCount = 0;
                }

                if (receiveCount > 1) {
                    // continue without idling
                    return 0L;
                }
                if (!pendingEmits.isEmpty()) { // then facing backpressure
                    backPressureWaitStrategy();
                    return 0L;
                }
                bpIdleCount = 0;
                if (noEmits) {
                    spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
                    return 0L;
                }
                swIdleCount = 0;
                return 0L;
            }

            private void backPressureWaitStrategy() throws InterruptedException {
                long start = Time.currentTimeMillis();
                if (bpIdleCount == 0) { // check avoids multiple log msgs when in a idle loop
                    LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
                }
                bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
                spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
            }

            private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException {
                emptyEmitStreak.increment();
                long start = Time.currentTimeMillis();
                swIdleCount = spoutWaitStrategy.idle(swIdleCount);
                if (reachedMaxSpoutPending) {
                    spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
                } else {
                    if (emptyStretch > 0) {
                        LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
                    }
                }
            }

            // returns true if pendingEmits is empty
            private boolean tryFlushPendingEmits() {
                for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
                    if (executorTransfer.tryTransfer(t, null)) {
                        pendingEmits.poll();
                    } else { // to avoid reordering of emits, stop at first failure
                        return false;
                    }
                }
                return true;
            }
        };
    }
  • Here Config.TOPOLOGY_MAX_SPOUT_PENDING is read from topoConf; if it is not set, 0 is used. The value is then multiplied by the number of tasks in the executor to obtain maxSpoutPending.
  • maxSpoutPending only influences the reachedMaxSpoutPending flag in the call method; nextTuple is invoked to emit data only when !reachedMaxSpoutPending && pendingEmitsIsEmpty holds.
  • Note that reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending), i.e. the check is based on the size of the pending map (a simplified sketch of this gating follows the list).
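To make the gating easier to see, the decision boils down to the following simplified sketch (not the actual SpoutExecutor code; names and types are abbreviated for illustration):

    import java.util.Map;
    import java.util.Queue;

    // Simplified sketch of the throttle decision inside SpoutExecutor.call():
    // 'pending' holds tuples emitted but not yet acked/failed,
    // 'pendingEmits' holds tuples waiting for downstream queue capacity.
    final class SpoutThrottleSketch {
        static boolean mayCallNextTuple(int maxSpoutPending,
                                        Map<Long, Object> pending,
                                        Queue<Object> pendingEmits) {
            boolean reachedMaxSpoutPending =
                (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
            boolean pendingEmitsIsEmpty = pendingEmits.isEmpty();
            // nextTuple() is only invoked when neither condition blocks the spout
            return !reachedMaxSpoutPending && pendingEmitsIsEmpty;
        }
    }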

SpoutOutputCollectorImpl

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java

    @Override
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        try {
            return sendSpoutMsg(streamId, tuple, messageId, null);
        } catch (InterruptedException e) {
            LOG.warn("Spout thread interrupted during emit().");
            throw new RuntimeException(e);
        }
    }

    @Override
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
        try {
            sendSpoutMsg(streamId, tuple, messageId, taskId);
        } catch (InterruptedException e) {
            LOG.warn("Spout thread interrupted during emitDirect().");
            throw new RuntimeException(e);
        }
    }

    private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) throws
        InterruptedException {
        emittedCount.increment();

        List<Integer> outTasks;
        if (outTaskId != null) {
            outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
        } else {
            outTasks = taskData.getOutgoingTasks(stream, values);
        }

        final boolean needAck = (messageId != null) && hasAckers;

        final List<Long> ackSeq = needAck ? new ArrayList<>() : null;

        final long rootId = needAck ? MessageId.generateId(random) : 0;

        for (int i = 0; i < outTasks.size(); i++) { // perf critical path. don't use iterators.
            Integer t = outTasks.get(i);
            MessageId msgId;
            if (needAck) {
                long as = MessageId.generateId(random);
                msgId = MessageId.makeRootId(rootId, as);
                ackSeq.add(as);
            } else {
                msgId = MessageId.makeUnanchored();
            }

            final TupleImpl tuple =
                new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), this.taskId, stream, msgId);
            AddressedTuple adrTuple = new AddressedTuple(t, tuple);
            executor.getExecutorTransfer().tryTransfer(adrTuple, executor.getPendingEmits());
        }
        if (isEventLoggers) {
            taskData.sendToEventLogger(executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits());
        }

        if (needAck) {
            boolean sample = executor.samplerCheck();
            TupleInfo info = new TupleInfo();
            info.setTaskId(this.taskId);
            info.setStream(stream);
            info.setMessageId(messageId);
            if (isDebug) {
                info.setValues(values);
            }
            if (sample) {
                info.setTimestamp(System.currentTimeMillis());
            }

            pending.put(rootId, info);
            List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
            taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits());
        } else if (messageId != null) {
            // Reusing TupleInfo object as we directly call executor.ackSpoutMsg() & are not sending msgs. perf critical
            if (isDebug) {
                if (spoutExecutorThdId != Thread.currentThread().getId()) {
                    throw new RuntimeException("Detected background thread emitting tuples for the spout. " +
                                               "Spout Output Collector should only emit from the main spout executor thread.");
                }
            }
            globalTupleInfo.clear();
            globalTupleInfo.setStream(stream);
            globalTupleInfo.setValues(values);
            globalTupleInfo.setMessageId(messageId);
            globalTupleInfo.setTimestamp(0);
            globalTupleInfo.setId("0:");
            Long timeDelta = 0L;
            executor.ackSpoutMsg(executor, taskData, timeDelta, globalTupleInfo);
        }
        return outTasks;
    }
  • The emit and emitDirect methods of this collector both delegate to sendSpoutMsg.
  • sendSpoutMsg computes needAck = (messageId != null) && hasAckers; if there is no messageId or no ackers are configured, needAck is false.
  • When needAck is false nothing is added to the pending map, so in SpoutExecutor the condition reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending) never becomes true and the maxSpoutPending mechanism is not triggered (a reliable-spout example follows this list).
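As an illustration, a hypothetical reliable spout along these lines would actually exercise the limit: because emit is given a messageId (and ackers are enabled), each tuple ends up in the pending map until it is acked or failed.

    import java.util.Map;
    import java.util.UUID;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    // Hypothetical reliable spout for illustration: emitting with a msgId makes
    // each tuple count toward topology.max.spout.pending until ack()/fail().
    public class ReliableWordSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;

        @Override
        public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            String msgId = UUID.randomUUID().toString();
            collector.emit(new Values("hello"), msgId);   // reliable emit: msgId != null
        }

        @Override
        public void ack(Object msgId) { /* drop the message from any replay buffer */ }

        @Override
        public void fail(Object msgId) { /* re-emit or log the failed message */ }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }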

MasterBatchCoordinator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java

    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        _throttler = new WindowedTimeThrottler((Number) conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
        for (String spoutId : _managedSpoutIds) {
            _states.add(TransactionalState.newCoordinatorState(conf, spoutId));
        }
        _currTransaction = getStoredCurrTransaction();

        _collector = collector;
        Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
        if (active == null) {
            _maxTransactionActive = 1;
        } else {
            _maxTransactionActive = active.intValue();
        }
        _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);


        for (int i = 0; i < _spouts.size(); i++) {
            String txId = _managedSpoutIds.get(i);
            _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
        }
        LOG.debug("Opened {}", this);
    }

    private void sync() {
        // note that sometimes the tuples active may be less than max_spout_pending, e.g.
        // max_spout_pending = 3
        // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
        // and there won't be a batch for tx 4 because there's max_spout_pending tx active
        TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
        if (maybeCommit != null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
            LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
        }

        if (_active) {
            if (_activeTx.size() < _maxTransactionActive) {
                Long curr = _currTransaction;
                for (int i = 0; i < _maxTransactionActive; i++) {
                    if (!_activeTx.containsKey(curr) && isReady(curr)) {
                        // by using a monotonically increasing attempt id, downstream tasks
                        // can be memory efficient by clearing out state for old attempts
                        // as soon as they see a higher attempt id for a transaction
                        Integer attemptId = _attemptIds.get(curr);
                        if (attemptId == null) {
                            attemptId = 0;
                        } else {
                            attemptId++;
                        }
                        _attemptIds.put(curr, attemptId);
                        for (TransactionalState state : _states) {
                            state.setData(CURRENT_ATTEMPTS, _attemptIds);
                        }

                        TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                        final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                        _activeTx.put(curr, newTransactionStatus);
                        _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                        LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt,
                                  newTransactionStatus, this);
                        _throttler.markEvent();
                    }
                    curr = nextTransactionId(curr);
                }
            }
        }
    }
  • The open method of MasterBatchCoordinator reads the Config.TOPOLOGY_MAX_SPOUT_PENDING setting from conf into _maxTransactionActive; if it is null, the value defaults to 1.
  • sync() emits a new batch to BATCH_STREAM_ID only while _activeTx.size() < _maxTransactionActive (see the configuration sketch below).
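A minimal sketch of the corresponding Trident-side configuration (values are illustrative): the same topology.max.spout.pending key becomes _maxTransactionActive, i.e. the number of batches allowed in flight at once.

    import org.apache.storm.Config;

    public class TridentPendingConfig {
        public static Config build() {
            Config conf = new Config();
            // bounds the number of batches (transactions) in flight at once;
            // MasterBatchCoordinator defaults to 1 when this is unset
            conf.setMaxSpoutPending(3);
            // interval consumed by the WindowedTimeThrottler in MasterBatchCoordinator.open()
            conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 500);
            return conf;
        }
    }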

Summary

  • Config.TOPOLOGY_MAX_SPOUT_PENDING (topology.max.spout.pending) defaults to null and only works for spouts that emit reliable messages (with a msgId). For an ordinary spout, if SpoutOutputCollectorImpl determines that acking is not enabled it adds nothing to the pending map, so reachedMaxSpoutPending stays false and the maxSpoutPending mechanism is never triggered. Trident spouts, by contrast, use TransactionAttempt.getTransactionId() as the batchId by default and track each transaction.
  • For an ordinary spout, the value is the maximum number of emitted tuples waiting for an ack; once it is reached, SpoutExecutor stops calling the spout's nextTuple to emit data.
  • For a Trident spout, it is the number of batches processed at the same time; only after these batches succeed or fail can the next batches proceed.
