A look at Storm's IWaitStrategy


Preface

This article takes a look at Storm's IWaitStrategy.

IWaitStrategy

storm-2.0.0/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java

public interface IWaitStrategy {
    static IWaitStrategy createBackPressureWaitStrategy(Map<String, Object> topologyConf) {
        IWaitStrategy producerWaitStrategy =
            ReflectionUtils.newInstance((String) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
        producerWaitStrategy.prepare(topologyConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
        return producerWaitStrategy;
    }

    void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation);

    /**
     * Implementations of this method should be thread-safe (preferably no side-effects and lock-free)
     * <p>
     * Supports static or dynamic backoff. Dynamic backoff relies on idleCounter to estimate how long caller has been idling.
     * <p>
     * <pre>
     * <code>
     *  int idleCounter = 0;
     *  int consumeCount = consumeFromQ();
     *  while (consumeCount==0) {
     *     idleCounter = strategy.idle(idleCounter);
     *     consumeCount = consumeFromQ();
     *  }
     * </code>
     * </pre>
     *
     * @param idleCounter managed by the idle method until reset
     * @return new counter value to be used on subsequent idle cycle
     */
    int idle(int idleCounter) throws InterruptedException;

    enum WAIT_SITUATION {SPOUT_WAIT, BOLT_WAIT, BACK_PRESSURE_WAIT}

}
  • This interface provides a static factory method, createBackPressureWaitStrategy, which reads the topology.backpressure.wait.strategy parameter, instantiates the configured class via reflection as producerWaitStrategy, and prepares it with WAIT_SITUATION.BACK_PRESSURE_WAIT.
  • WAIT_SITUATION has three values: SPOUT_WAIT, BOLT_WAIT and BACK_PRESSURE_WAIT.
  • The interface defines an int idle(int idleCounter) method that supports static or dynamic backoff: the caller passes in the counter returned by the previous call, so a dynamic strategy can use it to estimate how long the caller has been idle.
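The idle contract can be exercised without Storm on the classpath. The sketch below assumes a hypothetical WaitStrategy interface that mirrors only IWaitStrategy.idle(int), a hypothetical CappedBackoffStrategy whose park time grows with the counter (dynamic backoff), and a simulated queue in IdleLoopDemo; none of these are Storm classes. The loop itself follows the Javadoc example, feeding each returned counter into the next idle call.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical stand-in mirroring only the idle(int) method of Storm's IWaitStrategy.
interface WaitStrategy {
    int idle(int idleCounter) throws InterruptedException;
}

// Hypothetical dynamic-backoff strategy: the park time doubles with each idle cycle, up to a cap.
class CappedBackoffStrategy implements WaitStrategy {
    private final long maxParkNanos;

    CappedBackoffStrategy(long maxParkNanos) {
        this.maxParkNanos = maxParkNanos;
    }

    @Override
    public int idle(int idleCounter) {
        // 2^idleCounter ns, with the shift bounded to avoid overflow, then capped.
        long parkNanos = Math.min(maxParkNanos, 1L << Math.min(idleCounter, 30));
        LockSupport.parkNanos(parkNanos);
        return idleCounter + 1; // handed back by the caller on the next idle cycle
    }
}

// Simulated consumer whose queue stays empty for the first `emptyPolls` polls.
class IdleLoopDemo {
    private final int emptyPolls;
    private int polls = 0;

    IdleLoopDemo(int emptyPolls) {
        this.emptyPolls = emptyPolls;
    }

    int consumeFromQ() {
        return polls++ < emptyPolls ? 0 : 1;
    }

    // The loop from the Javadoc: idle, feeding back the counter, until something is consumed.
    int runUntilConsumed(WaitStrategy strategy) throws InterruptedException {
        int idleCounter = 0;
        int consumeCount = consumeFromQ();
        while (consumeCount == 0) {
            idleCounter = strategy.idle(idleCounter);
            consumeCount = consumeFromQ();
        }
        return idleCounter; // a real caller would reset this to 0 after consuming
    }
}
```

With five empty polls, the loop calls idle five times and ends with a counter of 5.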

SpoutExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

public class SpoutExecutor extends Executor {

    private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class);

    private final IWaitStrategy spoutWaitStrategy;
    private final IWaitStrategy backPressureWaitStrategy;
    private final AtomicBoolean lastActive;
    private final MutableLong emittedCount;
    private final MutableLong emptyEmitStreak;
    private final SpoutThrottlingMetrics spoutThrottlingMetrics;
    private final boolean hasAckers;
    private final SpoutExecutorStats stats;
    private final BuiltinMetrics builtInMetrics;
    SpoutOutputCollectorImpl spoutOutputCollector;
    private Integer maxSpoutPending;
    private List<ISpout> spouts;
    private List<SpoutOutputCollector> outputCollectors;
    private RotatingMap<Long, TupleInfo> pending;
    private long threadId = 0;

    public SpoutExecutor(final WorkerState workerData, final List<Long> executorId, Map<String, String> credentials) {
        super(workerData, executorId, credentials, ClientStatsUtil.SPOUT);
        this.spoutWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
        this.spoutWaitStrategy.prepare(topoConf, WAIT_SITUATION.SPOUT_WAIT);
        this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
        this.backPressureWaitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
        //......
    }

    //......
}
  • Two wait strategies are created here: spoutWaitStrategy and backPressureWaitStrategy.
  • spoutWaitStrategy reads the topology.spout.wait.strategy parameter; its value in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive.
  • backPressureWaitStrategy reads the topology.backpressure.wait.strategy parameter; its value in defaults.yaml is also org.apache.storm.policy.WaitStrategyProgressive.
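Both executors instantiate each strategy from a class name in the topology config and then call prepare with the matching WAIT_SITUATION. The sketch below assumes that ReflectionUtils.newInstance amounts to loading the named class and invoking its no-arg constructor; SketchWaitStrategy, SketchNoopStrategy and StrategyFactory are hypothetical stand-ins, not Storm classes.

```java
import java.util.Map;

// Hypothetical stand-in for IWaitStrategy, reduced to prepare() and its situation enum.
interface SketchWaitStrategy {
    enum Situation { SPOUT_WAIT, BOLT_WAIT, BACK_PRESSURE_WAIT }

    void prepare(Map<String, Object> conf, Situation situation);
}

class SketchNoopStrategy implements SketchWaitStrategy {
    Situation preparedFor; // records which situation prepare() was called with

    public SketchNoopStrategy() { } // no-arg constructor required for reflective instantiation

    @Override
    public void prepare(Map<String, Object> conf, Situation situation) {
        this.preparedFor = situation;
    }
}

class StrategyFactory {
    // Assumed equivalent of ReflectionUtils.newInstance(...) followed by prepare(...):
    // look up the class named under `key`, call its no-arg constructor, then prepare it.
    static SketchWaitStrategy create(Map<String, Object> conf, String key,
                                     SketchWaitStrategy.Situation situation) {
        String className = (String) conf.get(key);
        try {
            SketchWaitStrategy strategy = (SketchWaitStrategy) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
            strategy.prepare(conf, situation);
            return strategy;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}
```

The two-step pattern (instantiate, then prepare) is why a strategy class needs a public no-arg constructor: all configuration arrives later through prepare.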

BoltExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java

public class BoltExecutor extends Executor {

    private static final Logger LOG = LoggerFactory.getLogger(BoltExecutor.class);

    private final BooleanSupplier executeSampler;
    private final boolean isSystemBoltExecutor;
    private final IWaitStrategy consumeWaitStrategy;       // employed when no incoming data
    private final IWaitStrategy backPressureWaitStrategy;  // employed when outbound path is congested
    private final BoltExecutorStats stats;
    private final BuiltinMetrics builtInMetrics;
    private BoltOutputCollectorImpl outputCollector;

    public BoltExecutor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) {
        super(workerData, executorId, credentials, ClientStatsUtil.BOLT);
        this.executeSampler = ConfigUtils.mkStatsSampler(topoConf);
        this.isSystemBoltExecutor = (executorId == Constants.SYSTEM_EXECUTOR_ID);
        if (isSystemBoltExecutor) {
            this.consumeWaitStrategy = makeSystemBoltWaitStrategy();
        } else {
            this.consumeWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BOLT_WAIT_STRATEGY));
            this.consumeWaitStrategy.prepare(topoConf, WAIT_SITUATION.BOLT_WAIT);
        }
        this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
        this.backPressureWaitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
        this.stats = new BoltExecutorStats(ConfigUtils.samplingRate(this.getTopoConf()),
                                           ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
        this.builtInMetrics = new BuiltinBoltMetrics(stats);
    }

    private static IWaitStrategy makeSystemBoltWaitStrategy() {
        WaitStrategyPark ws = new WaitStrategyPark();
        Map<String, Object> conf = new HashMap<>();
        conf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 5000);
        ws.prepare(conf, WAIT_SITUATION.BOLT_WAIT);
        return ws;
    }
    //......
}
  • Two wait strategies are created here: consumeWaitStrategy and backPressureWaitStrategy.
  • For ordinary bolts, consumeWaitStrategy reads the topology.bolt.wait.strategy parameter, whose value in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive; for the SystemBolt executor it is a WaitStrategyPark with TOPOLOGY_BOLT_WAIT_PARK_MICROSEC set to 5000.
  • backPressureWaitStrategy reads the topology.backpressure.wait.strategy parameter, whose value in defaults.yaml is org.apache.storm.policy.WaitStrategyProgressive.

WaitStrategyPark

storm-2.0.0/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java

public class WaitStrategyPark implements IWaitStrategy {
    private long parkTimeNanoSec;

    public WaitStrategyPark() { // required for instantiation via reflection. must call prepare() thereafter
    }

    // Convenience alternative to prepare() for use in Tests
    public WaitStrategyPark(long microsec) {
        parkTimeNanoSec = microsec * 1_000;
    }

    @Override
    public void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation) {
        if (waitSituation == WAIT_SITUATION.SPOUT_WAIT) {
            parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PARK_MICROSEC));
        } else if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
            parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC));
        } else if (waitSituation == WAIT_SITUATION.BACK_PRESSURE_WAIT) {
            parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PARK_MICROSEC));
        } else {
            throw new IllegalArgumentException("Unknown wait situation : " + waitSituation);
        }
    }

    @Override
    public int idle(int idleCounter) throws InterruptedException {
        if (parkTimeNanoSec == 0) {
            return 1;
        }
        LockSupport.parkNanos(parkTimeNanoSec);
        return idleCounter + 1;
    }
}
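  • This strategy simply calls LockSupport.parkNanos(parkTimeNanoSec). prepare reads the park time in microseconds from TOPOLOGY_SPOUT_WAIT_PARK_MICROSEC, TOPOLOGY_BOLT_WAIT_PARK_MICROSEC or TOPOLOGY_BACKPRESSURE_WAIT_PARK_MICROSEC, depending on the WAIT_SITUATION, and converts it to nanoseconds; if the park time is 0, idle returns 1 immediately without parking.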
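A standalone sketch of the same configuration logic, assuming the string keys behind Config.TOPOLOGY_SPOUT_WAIT_PARK_MICROSEC, TOPOLOGY_BOLT_WAIT_PARK_MICROSEC and TOPOLOGY_BACKPRESSURE_WAIT_PARK_MICROSEC are topology.spout.wait.park.microsec, topology.bolt.wait.park.microsec and topology.backpressure.wait.park.microsec. ParkSketch and its Situation enum are hypothetical. It shows the microsecond-to-nanosecond conversion (the SystemBolt's 5000 microseconds becomes a 5 ms park) and the zero-park shortcut.

```java
import java.util.Map;
import java.util.concurrent.locks.LockSupport;

// Hypothetical standalone version of WaitStrategyPark's prepare()/idle() logic.
class ParkSketch {
    enum Situation { SPOUT_WAIT, BOLT_WAIT, BACK_PRESSURE_WAIT }

    private final long parkTimeNanos;

    ParkSketch(Map<String, Object> conf, Situation situation) {
        String key; // assumed config key for each situation
        switch (situation) {
            case SPOUT_WAIT:
                key = "topology.spout.wait.park.microsec";
                break;
            case BOLT_WAIT:
                key = "topology.bolt.wait.park.microsec";
                break;
            case BACK_PRESSURE_WAIT:
                key = "topology.backpressure.wait.park.microsec";
                break;
            default:
                throw new IllegalArgumentException("Unknown wait situation : " + situation);
        }
        // Config values are in microseconds; LockSupport.parkNanos() takes nanoseconds.
        this.parkTimeNanos = 1_000L * ((Number) conf.get(key)).longValue();
    }

    long parkTimeNanos() {
        return parkTimeNanos;
    }

    // The throws clause is dropped here because parking never throws InterruptedException.
    int idle(int idleCounter) {
        if (parkTimeNanos == 0) {
            return 1; // zero park time: return at once without blocking
        }
        LockSupport.parkNanos(parkTimeNanos);
        return idleCounter + 1;
    }
}
```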

WaitStrategyProgressive

storm-2.0.0/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java

/**
 * A Progressive Wait Strategy
 * <p> Has three levels of idling. Stays in each level for a configured number of iterations before entering the next level.
 * Level 1 - No idling. Returns immediately. Stays in this level for `level1Count` iterations. Level 2 - Calls LockSupport.parkNanos(1).
 * Stays in this level for `level2Count` iterations Level 3 - Calls Thread.sleep(). Stays in this level until wait situation changes.
 *
 * <p>
 * The initial spin can be useful to prevent downstream bolt from repeatedly sleeping/parking when the upstream component is a bit
 * relatively slower. Allows downstream bolt can enter deeper wait states only if the traffic to it appears to have reduced.
 * <p>
 */
public class WaitStrategyProgressive implements IWaitStrategy {
    private int level1Count;
    private int level2Count;
    private long level3SleepMs;

    @Override
    public void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation) {
        if (waitSituation == WAIT_SITUATION.SPOUT_WAIT) {
            level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL1_COUNT));
            level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL2_COUNT));
            level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_SPOUT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
        } else if (waitSituation == WAIT_SITUATION.BOLT_WAIT) {
            level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT));
            level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT));
            level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
        } else if (waitSituation == WAIT_SITUATION.BACK_PRESSURE_WAIT) {
            level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL1_COUNT));
            level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL2_COUNT));
            level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS));
        } else {
            throw new IllegalArgumentException("Unknown wait situation : " + waitSituation);
        }
    }

    @Override
    public int idle(int idleCounter) throws InterruptedException {
        if (idleCounter < level1Count) {                     // level 1 - no waiting
            ++idleCounter;
        } else if (idleCounter < level1Count + level2Count) { // level 2 - parkNanos(1L)
            ++idleCounter;
            LockSupport.parkNanos(1L);
        } else {                                      // level 3 - longer idling with Thread.sleep()
            Thread.sleep(level3SleepMs);
        }
        return idleCounter;
    }
}
  • WaitStrategyProgressive is a progressive wait strategy with three levels of idling.
  • Level 1 does no idling and returns immediately; after level1Count iterations it moves on to level 2.
  • Level 2 calls LockSupport.parkNanos(1L); after a further level2Count iterations it moves on to level 3.
  • Level 3 calls Thread.sleep(level3SleepMs) and no longer increments idleCounter, so it stays there until the wait situation changes and the caller resets the counter to 0.
  • Each WAIT_SITUATION reads its own LEVEL1_COUNT, LEVEL2_COUNT and LEVEL3_SLEEP_MILLIS parameters. In defaults.yaml they are 0, 0 and 1 for spout; 1, 1000 and 1 for bolt; and 1, 1000 and 1 for back pressure. With the spout defaults, every idle call therefore goes straight to a 1 ms sleep.
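To see how these defaults play out, the hypothetical ProgressiveTrace below replays WaitStrategyProgressive's branch structure without actually parking or sleeping, and records which level each consecutive idle call would use, starting from a counter of 0. It is an illustration, not Storm code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical dry run of WaitStrategyProgressive's level selection (no real waiting).
class ProgressiveTrace {
    enum Level { NO_WAIT, PARK, SLEEP }

    // Same branch order as WaitStrategyProgressive.idle().
    static Level levelFor(int idleCounter, int level1Count, int level2Count) {
        if (idleCounter < level1Count) {
            return Level.NO_WAIT;
        }
        if (idleCounter < level1Count + level2Count) {
            return Level.PARK;
        }
        return Level.SLEEP;
    }

    // Level used on each of `calls` consecutive idle() calls, starting from counter 0.
    static List<Level> trace(int level1Count, int level2Count, int calls) {
        List<Level> levels = new ArrayList<>();
        int idleCounter = 0;
        for (int i = 0; i < calls; i++) {
            Level level = levelFor(idleCounter, level1Count, level2Count);
            levels.add(level);
            if (level != Level.SLEEP) {
                idleCounter++; // level 3 leaves the counter unchanged, so it stays at SLEEP
            }
        }
        return levels;
    }
}
```

With the bolt defaults, trace(1, 1000, 1002) returns immediately on the first call, parks on the next 1000 calls and sleeps from the 1002nd call on; with the spout defaults, trace(0, 0, n) sleeps on every call.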

SpoutExecutor.call

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

    @Override
    public Callable<Long> call() throws Exception {
        init(idToTask, idToTaskBase);
        return new Callable<Long>() {
            final int recvqCheckSkipCountMax = getSpoutRecvqCheckSkipCount();
            int recvqCheckSkips = 0;
            int swIdleCount = 0; // counter for spout wait strategy
            int bpIdleCount = 0; // counter for back pressure wait strategy
            int rmspCount = 0;

            @Override
            public Long call() throws Exception {
                int receiveCount = 0;
                if (recvqCheckSkips++ == recvqCheckSkipCountMax) {
                    receiveCount = receiveQueue.consume(SpoutExecutor.this);
                    recvqCheckSkips = 0;
                }
                long currCount = emittedCount.get();
                boolean reachedMaxSpoutPending = (maxSpoutPending != 0) && (pending.size() >= maxSpoutPending);
                boolean isActive = stormActive.get();

                if (!isActive) {
                    inactiveExecute();
                    return 0L;
                }

                if (!lastActive.get()) {
                    lastActive.set(true);
                    activateSpouts();
                }
                boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
                boolean noEmits = true;
                long emptyStretch = 0;

                if (!reachedMaxSpoutPending && pendingEmitsIsEmpty) {
                    for (int j = 0; j < spouts.size(); j++) { // in critical path. don't use iterators.
                        spouts.get(j).nextTuple();
                    }
                    noEmits = (currCount == emittedCount.get());
                    if (noEmits) {
                        emptyEmitStreak.increment();
                    } else {
                        emptyStretch = emptyEmitStreak.get();
                        emptyEmitStreak.set(0);
                    }
                }
                if (reachedMaxSpoutPending) {
                    if (rmspCount == 0) {
                        LOG.debug("Reached max spout pending");
                    }
                    rmspCount++;
                } else {
                    if (rmspCount > 0) {
                        LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount);
                    }
                    rmspCount = 0;
                }

                if (receiveCount > 1) {
                    // continue without idling
                    return 0L;
                }
                if (!pendingEmits.isEmpty()) { // then facing backpressure
                    backPressureWaitStrategy();
                    return 0L;
                }
                bpIdleCount = 0;
                if (noEmits) {
                    spoutWaitStrategy(reachedMaxSpoutPending, emptyStretch);
                    return 0L;
                }
                swIdleCount = 0;
                return 0L;
            }

            private void backPressureWaitStrategy() throws InterruptedException {
                long start = Time.currentTimeMillis();
                if (bpIdleCount == 0) { // check avoids multiple log msgs when in a idle loop
                    LOG.debug("Experiencing Back Pressure from downstream components. Entering BackPressure Wait.");
                }
                bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
                spoutThrottlingMetrics.skippedBackPressureMs(Time.currentTimeMillis() - start);
            }

            private void spoutWaitStrategy(boolean reachedMaxSpoutPending, long emptyStretch) throws InterruptedException {
                emptyEmitStreak.increment();
                long start = Time.currentTimeMillis();
                swIdleCount = spoutWaitStrategy.idle(swIdleCount);
                if (reachedMaxSpoutPending) {
                    spoutThrottlingMetrics.skippedMaxSpoutMs(Time.currentTimeMillis() - start);
                } else {
                    if (emptyStretch > 0) {
                        LOG.debug("Ending Spout Wait Stretch of {}", emptyStretch);
                    }
                }
            }

            // returns true if pendingEmits is empty
            private boolean tryFlushPendingEmits() {
                for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
                    if (executorTransfer.tryTransfer(t, null)) {
                        pendingEmits.poll();
                    } else { // to avoid reordering of emits, stop at first failure
                        return false;
                    }
                }
                return true;
            }
        };
    }
  • The spout executor maintains the pendingEmits queue, which holds tuples whose transfer failed and that are still waiting to be emitted, and the pending RotatingMap, which holds the ids and data of tuples waiting to be acked.
  • The spout reads the topology.max.spout.pending setting (Config.TOPOLOGY_MAX_SPOUT_PENDING) and computes maxSpoutPending = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(). The setting defaults to null, so maxSpoutPending is 0, which disables the limit (reachedMaxSpoutPending is then always false).
  • nextTuple is called only when !reachedMaxSpoutPending && pendingEmitsIsEmpty. After that, if receiveCount > 1 the call returns without idling; otherwise backPressureWaitStrategy is triggered when pendingEmits is not empty, and spoutWaitStrategy is triggered when noEmits (currCount == emittedCount.get()) is true. Because noEmits starts out true, skipping nextTuple after reaching max spout pending also leads to the spout wait strategy.
  • Each call records currCount = emittedCount.get() before calling nextTuple; if nextTuple emits, emittedCount is updated in methods such as emit or emitDirect of SpoutOutputCollectorImpl. Afterwards, noEmits = (currCount == emittedCount.get()) tells whether any data was emitted.
  • The spout maintains bpIdleCount and swIdleCount, passed to backPressureWaitStrategy.idle(bpIdleCount) and spoutWaitStrategy.idle(swIdleCount) respectively; bpIdleCount is reset to 0 once pendingEmits is empty, and swIdleCount once the spout emits again.
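The decision at the end of SpoutExecutor's call() can be condensed into a pure function. SpoutDecision and SpoutAction are hypothetical names; maxSpoutPending models ObjectReader.getInt(..., 0) by treating a null setting as 0, and decide keeps the source's order of checks (receiveCount > 1, then pendingEmits, then noEmits). This is a sketch of the control flow, not Storm's code.

```java
// Hypothetical condensation of SpoutExecutor's idle decision; not Storm code.
enum SpoutAction { CONTINUE, BACK_PRESSURE_WAIT, SPOUT_WAIT }

class SpoutDecision {
    // topology.max.spout.pending (null when unset) multiplied by the number of spout tasks.
    static int maxSpoutPending(Integer configured, int numTasks) {
        return (configured == null ? 0 : configured) * numTasks;
    }

    // A maxSpoutPending of 0 disables the limit.
    static boolean reachedMaxSpoutPending(int maxSpoutPending, int pendingSize) {
        return maxSpoutPending != 0 && pendingSize >= maxSpoutPending;
    }

    // receiveCount: tuples consumed from the receive queue in this call
    // pendingEmitsEmpty: pendingEmits.isEmpty() after nextTuple ran (or was skipped)
    // nextTupleCalled / emitted: whether nextTuple ran and whether emittedCount changed
    static SpoutAction decide(int receiveCount, boolean pendingEmitsEmpty,
                              boolean nextTupleCalled, boolean emitted) {
        if (receiveCount > 1) {
            return SpoutAction.CONTINUE;            // keep going without idling
        }
        if (!pendingEmitsEmpty) {
            return SpoutAction.BACK_PRESSURE_WAIT;  // downstream is not accepting tuples
        }
        boolean noEmits = !nextTupleCalled || !emitted; // noEmits starts out true
        return noEmits ? SpoutAction.SPOUT_WAIT : SpoutAction.CONTINUE;
    }
}
```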

BoltExecutor.call

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java

    @Override
    public Callable<Long> call() throws Exception {
        init(idToTask, idToTaskBase);

        return new Callable<Long>() {
            int bpIdleCount = 0;
            int consumeIdleCounter = 0;
            private final ExitCondition tillNoPendingEmits = () -> pendingEmits.isEmpty();

            @Override
            public Long call() throws Exception {
                boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
                if (pendingEmitsIsEmpty) {
                    if (bpIdleCount != 0) {
                        LOG.debug("Ending Back Pressure Wait stretch : {}", bpIdleCount);
                    }
                    bpIdleCount = 0;
                    int consumeCount = receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits);
                    if (consumeCount == 0) {
                        if (consumeIdleCounter == 0) {
                            LOG.debug("Invoking consume wait strategy");
                        }
                        consumeIdleCounter = consumeWaitStrategy.idle(consumeIdleCounter);
                        if (Thread.interrupted()) {
                            throw new InterruptedException();
                        }
                    } else {
                        if (consumeIdleCounter != 0) {
                            LOG.debug("Ending consume wait stretch : {}", consumeIdleCounter);
                        }
                        consumeIdleCounter = 0;
                    }
                } else {
                    if (bpIdleCount == 0) { // check avoids multiple log msgs when spinning in a idle loop
                        LOG.debug("Experiencing Back Pressure. Entering BackPressure Wait. PendingEmits = {}", pendingEmits.size());
                    }
                    bpIdleCount = backPressureWaitStrategy.idle(bpIdleCount);
                }

                return 0L;
            }

            // returns true if pendingEmits is empty
            private boolean tryFlushPendingEmits() {
                for (AddressedTuple t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
                    if (executorTransfer.tryTransfer(t, null)) {
                        pendingEmits.poll();
                    } else { // to avoid reordering of emits, stop at first failure
                        return false;
                    }
                }
                return true;
            }

        };
    }
  • The bolt executor also maintains pendingEmits; when it is not empty, backPressureWaitStrategy.idle(bpIdleCount) is triggered.
  • When pendingEmits is empty, the bolt consumes from its receive queue via receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits); if the returned consumeCount is 0, consumeWaitStrategy.idle(consumeIdleCounter) is triggered.
  • The bolt executor maintains bpIdleCount and consumeIdleCounter, used for backPressureWaitStrategy.idle(bpIdleCount) and consumeWaitStrategy.idle(consumeIdleCounter) respectively.
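A generic sketch of the bolt side: tryFlush has the same shape as tryFlushPendingEmits, with a hypothetical Predicate standing in for executorTransfer.tryTransfer, and decide condenses the branches of BoltExecutor's call(). BoltLoopSketch is an illustrative name, not a Storm class.

```java
import java.util.Deque;
import java.util.function.Predicate;

// Hypothetical sketch of BoltExecutor's flush-then-decide logic; not Storm code.
class BoltLoopSketch {
    enum Action { BACK_PRESSURE_WAIT, CONSUME_WAIT, CONTINUE }

    // Same shape as tryFlushPendingEmits(): transfer from the head and stop at the first
    // failure so emits are never reordered. Returns true if the queue was fully drained.
    static <T> boolean tryFlush(Deque<T> pendingEmits, Predicate<T> tryTransfer) {
        for (T t = pendingEmits.peek(); t != null; t = pendingEmits.peek()) {
            if (tryTransfer.test(t)) {
                pendingEmits.poll();
            } else {
                return false;
            }
        }
        return true;
    }

    // Condensed branches of call(); consumeCount only matters when the flush succeeded,
    // since the real code consumes from the receive queue only in that case.
    static Action decide(boolean pendingEmitsEmpty, int consumeCount) {
        if (!pendingEmitsEmpty) {
            return Action.BACK_PRESSURE_WAIT;
        }
        return consumeCount == 0 ? Action.CONSUME_WAIT : Action.CONTINUE;
    }
}
```

Stopping at the first failed transfer is what keeps emits in order: the tuple that could not be sent stays at the head of the queue and is retried first on the next call.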

Summary

  • Both spout and bolt executors use backPressureWaitStrategy, which reads the topology.backpressure.wait.strategy parameter (employed by any producer, i.e. spout, bolt or transfer thread, when the downstream queue is full); the default implementation class is org.apache.storm.policy.WaitStrategyProgressive. It is the strategy applied when the receive queue of a downstream component is full, and that condition is detected through the pendingEmits queue: on every call, the spout or bolt executor first calls tryFlushPendingEmits to try to send the pending tuples, and if the downstream accepts them all, pendingEmits becomes empty. In this way the executor judges the downstream load dynamically and decides whether to enter back-pressure wait.
  • The spout uses spoutWaitStrategy, which reads the topology.spout.wait.strategy parameter (employed when there is no data to produce); the default implementation class is org.apache.storm.policy.WaitStrategyProgressive. It applies when the spout emitted nothing, which is detected through emittedCount.
  • The bolt uses consumeWaitStrategy, which reads the topology.bolt.wait.strategy parameter (employed when there is no data in its receive buffer to process); the default implementation class is org.apache.storm.policy.WaitStrategyProgressive. It applies when the receive queue yields nothing to process, which is detected through the consumeCount returned by receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits).
  • The spout differs from bolts in that, besides pendingEmitsIsEmpty, it also checks reachedMaxSpoutPending to decide whether to keep generating data, whereas a bolt relies on pendingEmitsIsEmpty alone to decide whether to keep consuming.
  • IWaitStrategy has two implementations, WaitStrategyProgressive and WaitStrategyPark; WaitStrategyPark is the one used by the SystemBolt executor.
