A look at Storm's WindowedBolt

Preface

This article looks at Storm's WindowedBolt.

Example

    @Test
    public void testSlidingTupleTsTopology() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("integer", new RandomIntegerSpout(), 1);
        BaseWindowedBolt baseWindowedBolt = new SlidingWindowSumBolt()
                //windowLength , slidingInterval
                .withWindow(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS))
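                //withTimestampField designates a field of the tuple as that tuple's timestamp
                .withTimestampField("timestamp")
                //watermark = (minimum, across input streams, of each stream's latest tuple timestamp) - lag
                //withWatermarkInterval sets how often a watermark event is generated; the default is 1 second
                //when a watermark is triggered, windows whose tuples have timestamps earlier than the watermark are evaluated
                .withWatermarkInterval(new BaseWindowedBolt.Duration(1, TimeUnit.SECONDS))
                //withLag handles out-of-order data: a tuple whose timestamp is less than lastWaterMarkTs
                //(the largest watermark computed so far) is treated as late and discarded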
                .withLag(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS));
        builder.setBolt("slidingSum", baseWindowedBolt, 1).shuffleGrouping("integer");
        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingSum");
        SubmitHelper.submitRemote("slideWindowTopology",builder.createTopology());
    }
  • The example configures the bolt through withWindow, withTimestampField, withWatermarkInterval and withLag.
  • SlidingWindowSumBolt is defined as follows:
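import java.util.List;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlidingWindowSumBolt extends BaseWindowedBolt {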
    private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowSumBolt.class);

    private int sum = 0;
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        /*
         * The inputWindow gives a view of
         * (a) all the events in the window
         * (b) events that expired since last activation of the window
         * (c) events that newly arrived since last activation of the window
         */
        List<Tuple> tuplesInWindow = inputWindow.get();
        List<Tuple> newTuples = inputWindow.getNew();
        List<Tuple> expiredTuples = inputWindow.getExpired();

        LOG.debug("Events in current window: " + tuplesInWindow.size());
        /*
         * Instead of iterating over all the tuples in the window to compute
         * the sum, the values for the new events are added and old events are
         * subtracted. Similar optimizations might be possible in other
         * windowing computations.
         */
        for (Tuple tuple : newTuples) {
            sum += (int) tuple.getValue(0);
        }
        for (Tuple tuple : expiredTuples) {
            sum -= (int) tuple.getValue(0);
        }
        collector.emit(new Values(sum));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sum"));
    }
}
  • TupleWindow exposes three views: all tuples in the current window (get), the tuples that arrived since the last activation of the window (getNew), and the tuples that expired since the last activation (getExpired).
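
The incremental update used by SlidingWindowSumBolt can be tried without a Storm cluster. The following is a minimal sketch, not Storm code: IncrementalSumSketch and its activate method are hypothetical names, and a plain count-based deque stands in for TupleWindow. It only illustrates that adding the new values and subtracting the expired ones yields the same sum as re-scanning the whole window.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Storm-free model of a count-based window (hypothetical class, for illustration only). */
class IncrementalSumSketch {
    private final int windowLength;
    private final Deque<Integer> window = new ArrayDeque<>();
    private int sum = 0;

    IncrementalSumSketch(int windowLength) {
        this.windowLength = windowLength;
    }

    /** One window activation: add the new values, evict the oldest ones, return the running sum. */
    int activate(List<Integer> newValues) {
        for (int v : newValues) {
            window.addLast(v);
            sum += v; // plays the role of iterating over getNew()
        }
        List<Integer> expired = new ArrayList<>();
        while (window.size() > windowLength) {
            expired.add(window.removeFirst());
        }
        for (int v : expired) {
            sum -= v; // plays the role of iterating over getExpired()
        }
        return sum;
    }

    public static void main(String[] args) {
        IncrementalSumSketch sketch = new IncrementalSumSketch(3);
        System.out.println(sketch.activate(Arrays.asList(1, 2, 3))); // window is [1, 2, 3]
        System.out.println(sketch.activate(Arrays.asList(4)));       // window is [2, 3, 4]
    }
}
```

The cost per activation depends on the number of new and expired values rather than on the window size, which is the point of the comment in the bolt's execute method.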

WindowedBolt

IWindowedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IWindowedBolt.java

/**
 * A bolt abstraction for supporting time and count based sliding & tumbling windows.
 */
public interface IWindowedBolt extends IComponent {
    /**
     * This is similar to the {@link org.apache.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector)} except that while emitting,
     * the tuples are automatically anchored to the tuples in the inputWindow.
     */
    void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector);

    /**
     * Process the tuple window and optionally emit new tuples based on the tuples in the input window.
     */
    void execute(TupleWindow inputWindow);

    void cleanup();

    /**
     * Return a {@link TimestampExtractor} for extracting timestamps from a tuple for event time based processing, or null for processing
     * time.
     *
     * @return the timestamp extractor
     */
    TimestampExtractor getTimestampExtractor();
}
  • IWindowedBolt is stateless: the window's data is kept only in memory.
  • The IWindowedBolt interface has an abstract implementation class, BaseWindowedBolt, whose subclasses include BaseStatefulWindowedBolt and JoinBolt.

IStatefulWindowedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IStatefulWindowedBolt.java

/**
 * A windowed bolt abstraction for supporting windowing operation with state.
 */
public interface IStatefulWindowedBolt<T extends State> extends IStatefulComponent<T>, IWindowedBolt {
    /**
     * If the stateful windowed bolt should have its windows persisted in state and maintain a subset of events in memory.
     * <p>
     * The default is to keep all the window events in memory.
     * </p>
     *
     * @return true if the windows should be persisted
     */
    default boolean isPersistent() {
        return false;
    }

    /**
     * The maximum number of window events to keep in memory.
     */
    default long maxEventsInMemory() {
        return 1_000_000L; // default
    }
}
  • In version 1.2.2 IStatefulWindowedBolt defines no methods; version 2.0.0 adds two default methods, isPersistent and maxEventsInMemory.
  • isPersistent determines whether a PersistentWindowedBoltExecutor or a StatefulWindowedBoltExecutor is created.
  • maxEventsInMemory determines how many events WindowState keeps in memory; the rest are moved to a KeyValueState (HBaseKeyValueState, InMemoryKeyValueState or RedisKeyValueState).
  • The IStatefulWindowedBolt interface has an abstract implementation class, BaseStatefulWindowedBolt.

withWindow and withTumblingWindow

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java

    /**
     * Tuple count based sliding window configuration.
     *
     * @param windowLength    the number of tuples in the window
     * @param slidingInterval the number of tuples after which the window slides
     */
    public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) {
        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    }

    /**
     * Time duration based sliding window configuration.
     *
     * @param windowLength    the time duration of the window
     * @param slidingInterval the time duration after which the window slides
     */
    public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) {
        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    }

    /**
     * A time duration based tumbling window.
     *
     * @param duration the time duration after which the window tumbles
     */
    public BaseWindowedBolt withTumblingWindow(Duration duration) {
        return withWindowLength(duration).withSlidingInterval(duration);
    }

    /**
     * A count based tumbling window.
     *
     * @param count the number of tuples after which the window tumbles
     */
    public BaseWindowedBolt withTumblingWindow(Count count) {
        return withWindowLength(count).withSlidingInterval(count);
    }
  • The abstract class BaseWindowedBolt defines several withWindow overloads. They set two parameters, windowLength and slidingInterval, and each parameter can be expressed in one of two dimensions: Duration or Count.
  • withWindow configures a sliding window, while withTumblingWindow configures a tumbling window.
  • As the method bodies show, withTumblingWindow sets windowLength and slidingInterval to the same value.
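
To make the difference concrete, here is a small Storm-free sketch. WindowSpanSketch, spans and overlapMs are hypothetical helpers, and the model is deliberately simplified: it assumes each activation fires at a multiple of the sliding interval and covers the preceding windowLength milliseconds. Storm's real boundaries are decided by its eviction and trigger policies, so treat the numbers as an illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of time-based window spans (hypothetical class, not part of Storm). */
class WindowSpanSketch {
    /** Returns {start, end} in milliseconds for the first few activations. */
    static List<long[]> spans(long windowLengthMs, long slidingIntervalMs, int activations) {
        List<long[]> result = new ArrayList<>();
        for (int i = 1; i <= activations; i++) {
            long end = i * slidingIntervalMs;
            long start = Math.max(0L, end - windowLengthMs); // clamp the first windows at 0
            result.add(new long[] {start, end});
        }
        return result;
    }

    /** Time shared by two consecutive windows; 0 when length equals interval. */
    static long overlapMs(long windowLengthMs, long slidingIntervalMs) {
        return Math.max(0L, windowLengthMs - slidingIntervalMs);
    }

    public static void main(String[] args) {
        // withWindow(5s, 3s): consecutive windows share 2 seconds of data
        for (long[] s : spans(5000, 3000, 3)) {
            System.out.println("sliding  " + s[0] + ".." + s[1]);
        }
        // withTumblingWindow(5s): length == interval, so windows do not overlap
        for (long[] s : spans(5000, 5000, 3)) {
            System.out.println("tumbling " + s[0] + ".." + s[1]);
        }
    }
}
```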

WindowedBoltExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

public class WindowedBoltExecutor implements IRichBolt {
    public static final String LATE_TUPLE_FIELD = "late_tuple";
    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
    private static final int DEFAULT_MAX_LAG_MS = 0; // no lag

    //......

    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map<String, Object> topoConf,
                                                   TopologyContext context, Collection<Event<Tuple>> queue, boolean stateful) {

        WindowManager<Tuple> manager = stateful ?
            new StatefulWindowManager<>(lifecycleListener, queue)
            : new WindowManager<>(lifecycleListener, queue);

        Count windowLengthCount = null;
        Duration slidingIntervalDuration = null;
        Duration windowLengthDuration = null;
        Count slidingIntervalCount = null;
        // window length
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
            windowLengthCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
            windowLengthDuration = new Duration(
                ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
                TimeUnit.MILLISECONDS);
        }
        // sliding interval
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
            slidingIntervalCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
            slidingIntervalDuration =
                new Duration(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS);
        } else {
            // default is a sliding window of count 1
            slidingIntervalCount = new Count(1);
        }
        // tuple ts
        if (timestampExtractor != null) {
            // late tuple stream
            lateTupleStream = (String) topoConf.get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
            if (lateTupleStream != null) {
                if (!context.getThisStreams().contains(lateTupleStream)) {
                    throw new IllegalArgumentException(
                        "Stream for late tuples must be defined with the builder method withLateTupleStream");
                }
            }
            // max lag
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
                maxLagMs = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
            } else {
                maxLagMs = DEFAULT_MAX_LAG_MS;
            }
            // watermark interval
            int watermarkInterval;
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) {
                watermarkInterval = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue();
            } else {
                watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
            }
            waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
                                                                    maxLagMs, getComponentStreams(context));
        } else {
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
                throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
            }
        }
        // validate
        validate(topoConf, windowLengthCount, windowLengthDuration,
                 slidingIntervalCount, slidingIntervalDuration);
        evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration);
        triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
                                         manager, evictionPolicy);
        manager.setEvictionPolicy(evictionPolicy);
        manager.setTriggerPolicy(triggerPolicy);
        return manager;
    }

    @Override
    public void execute(Tuple input) {
        if (isTupleTs()) {
            long ts = timestampExtractor.extractTimestamp(input);
            if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
                windowManager.add(input, ts);
            } else {
                if (lateTupleStream != null) {
                    windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
                } else {
                    LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
                }
                windowedOutputCollector.ack(input);
            }
        } else {
            windowManager.add(input);
        }
    }
  • When a timestamp extractor is present, initWindowManager reads maxLagMs, which defaults to 0 (no lag), and passes it to the WaterMarkEventGenerator constructor.
  • If waterMarkEventGenerator.track returns false, the tuple is late. When Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM is configured the tuple is emitted to that stream; otherwise a log line of the form "Received a late tuple {} with ts {}. This will not be processed." is written. In both cases the tuple is acked.
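
The branching in execute can be reduced to three outcomes. The sketch below is a Storm-free model, not the library's code: LateTupleRoutingSketch is a hypothetical class, strings stand in for tuples, lists stand in for the window manager, the late stream and the log, and the onTime flag stands in for the result of waterMarkEventGenerator.track.

```java
import java.util.ArrayList;
import java.util.List;

/** Models where a tuple ends up in event-time mode (hypothetical class, for illustration only). */
class LateTupleRoutingSketch {
    final List<String> windowed = new ArrayList<>();   // stands in for windowManager.add(...)
    final List<String> lateStream = new ArrayList<>(); // stands in for emitting to the late tuple stream
    final List<String> logged = new ArrayList<>();     // stands in for the LOG.info(...) call
    private final boolean lateStreamConfigured;

    LateTupleRoutingSketch(boolean lateStreamConfigured) {
        this.lateStreamConfigured = lateStreamConfigured;
    }

    /** onTime plays the role of the boolean returned by track(). */
    void execute(String tuple, boolean onTime) {
        if (onTime) {
            windowed.add(tuple);
        } else if (lateStreamConfigured) {
            lateStream.add(tuple);
        } else {
            logged.add(tuple);
        }
    }

    public static void main(String[] args) {
        LateTupleRoutingSketch withStream = new LateTupleRoutingSketch(true);
        withStream.execute("t1", true);
        withStream.execute("t2", false);
        System.out.println(withStream.windowed + " " + withStream.lateStream + " " + withStream.logged);
    }
}
```

Without a late tuple stream, late data is only visible in the logs, so configuring the stream is the way to keep such tuples available to downstream bolts.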

WaterMarkEventGenerator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java

public class WaterMarkEventGenerator<T> implements Runnable {

    /**
     * Creates a new WatermarkEventGenerator.
     *
     * @param windowManager The window manager this generator will submit watermark events to
     * @param intervalMs    The generator will check if it should generate a watermark event with this interval
     * @param eventTsLagMs  The max allowed lag behind the last watermark event before an event is considered late
     * @param inputStreams  The input streams this generator is expected to handle
     */
    public WaterMarkEventGenerator(WindowManager<T> windowManager, int intervalMs,
                                   int eventTsLagMs, Set<GlobalStreamId> inputStreams) {
        this.windowManager = windowManager;
        streamToTs = new ConcurrentHashMap<>();

        ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("watermark-event-generator-%d")
            .setDaemon(true)
            .build();
        executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);

        this.interval = intervalMs;
        this.eventTsLag = eventTsLagMs;
        this.inputStreams = inputStreams;
    }

    public void start() {
        this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
    }

    //......

    /**
     * Tracks the timestamp of the event in the stream, returns true if the event can be considered for processing or false if its a late
     * event.
     */
    public boolean track(GlobalStreamId stream, long ts) {
        Long currentVal = streamToTs.get(stream);
        if (currentVal == null || ts > currentVal) {
            streamToTs.put(stream, ts);
        }
        checkFailures();
        return ts >= lastWaterMarkTs;
    }

    @Override
    public void run() {
        try {
            long waterMarkTs = computeWaterMarkTs();
            if (waterMarkTs > lastWaterMarkTs) {
                this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
                lastWaterMarkTs = waterMarkTs;
            }
        } catch (Throwable th) {
            LOG.error("Failed while processing watermark event ", th);
            throw th;
        }
    }

    /**
     * Computes the min ts across all streams.
     */
    private long computeWaterMarkTs() {
        long ts = 0;
        // only if some data has arrived on each input stream
        if (streamToTs.size() >= inputStreams.size()) {
            ts = Long.MAX_VALUE;
            for (Map.Entry<GlobalStreamId, Long> entry : streamToTs.entrySet()) {
                ts = Math.min(ts, entry.getValue());
            }
        }
        return ts - eventTsLag;
    }
}
  • The track method records the latest timestamp seen on each stream in streamToTs and decides whether a tuple should be processed by comparing its timestamp with lastWaterMarkTs: it returns true when ts >= lastWaterMarkTs.
  • lastWaterMarkTs is updated in the run method. computeWaterMarkTs takes the minimum, across all input streams, of the latest timestamps stored in streamToTs and subtracts eventTsLag; the result is waterMarkTs. Until every input stream has delivered at least one tuple, the base value is 0.
  • If waterMarkTs is greater than lastWaterMarkTs, a WaterMarkEvent is added to the window manager and lastWaterMarkTs is updated. Repeated runs therefore keep lastWaterMarkTs at the largest watermark computed so far.
  • The start method schedules the generator at a fixed rate, so run executes once every watermarkInterval milliseconds.
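
The interplay of track, computeWaterMarkTs and run is easier to follow with concrete numbers. The following is a single-threaded sketch that mirrors the logic of the excerpt above; it is not the Storm class. WatermarkSketch and tick are hypothetical names, stream ids are plain strings, and tick stands in for one scheduled call to run.

```java
import java.util.HashMap;
import java.util.Map;

/** Single-threaded model of the watermark bookkeeping (hypothetical class, for illustration only). */
class WatermarkSketch {
    private final Map<String, Long> streamToTs = new HashMap<>();
    private final int inputStreamCount;
    private final long lagMs;
    private long lastWaterMarkTs = 0;

    WatermarkSketch(int inputStreamCount, long lagMs) {
        this.inputStreamCount = inputStreamCount;
        this.lagMs = lagMs;
    }

    /** Remembers the highest timestamp per stream; returns false for a late tuple. */
    boolean track(String stream, long ts) {
        streamToTs.merge(stream, ts, Long::max);
        return ts >= lastWaterMarkTs;
    }

    /** One scheduled run: recompute the watermark and keep the largest value seen so far. */
    long tick() {
        long ts = 0;
        if (streamToTs.size() >= inputStreamCount) { // only once every stream has delivered data
            ts = Long.MAX_VALUE;
            for (long latest : streamToTs.values()) {
                ts = Math.min(ts, latest);
            }
        }
        long waterMarkTs = ts - lagMs;
        if (waterMarkTs > lastWaterMarkTs) {
            lastWaterMarkTs = waterMarkTs;
        }
        return lastWaterMarkTs;
    }

    public static void main(String[] args) {
        WatermarkSketch sketch = new WatermarkSketch(2, 5000);
        sketch.track("a", 10_000);
        System.out.println(sketch.tick());            // stream "b" has no data yet
        sketch.track("b", 20_000);
        System.out.println(sketch.tick());            // min(10000, 20000) - 5000
        System.out.println(sketch.track("a", 4_000)); // behind the watermark, so late
    }
}
```

With a lag of 5 seconds, a tuple may arrive up to 5 seconds behind the slowest stream's latest timestamp and still be accepted; with the default lag of 0, any tuple older than that latest timestamp is late once the next watermark has been computed.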

Summary

  • Storm's windowed bolts come in two kinds: IWindowedBolt, which is stateless, and IStatefulWindowedBolt, which is stateful.
  • A window has two important parameters, windowLength and slidingInterval. Each can be expressed in one of two dimensions, Duration or Count.
  • BaseWindowedBolt's withTumblingWindow method sets windowLength and slidingInterval to the same value. A tumbling window is therefore a special case of a sliding window in which consecutive windows do not overlap.
  • The WaterMarkEventGenerator runs a scheduled task that computes waterMarkTs (the minimum, across input streams, of each stream's latest tuple timestamp, minus the lag) and updates lastWaterMarkTs when the new value is greater.
  • The WaterMarkEventGenerator.track method decides whether a tuple should be processed. If the tuple's timestamp is less than lastWaterMarkTs, it returns false. In that case the tuple is sent to the late tuple stream when Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM is configured; otherwise a log line is written.
