A look at Storm’s WindowedBoltExecutor


Preface

This article takes a look at Storm’s WindowedBoltExecutor.

WindowedBoltExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

/**
 * An {@link IWindowedBolt} wrapper that does the windowing of tuples.
 */
public class WindowedBoltExecutor implements IRichBolt {
    public static final String LATE_TUPLE_FIELD = "late_tuple";
    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
    private static final int DEFAULT_MAX_LAG_MS = 0; // no lag
    private final IWindowedBolt bolt;
    // package level for unit tests
    transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;
    private transient WindowedOutputCollector windowedOutputCollector;
    private transient WindowLifecycleListener<Tuple> listener;
    private transient WindowManager<Tuple> windowManager;
    private transient int maxLagMs;
    private TimestampExtractor timestampExtractor;
    private transient String lateTupleStream;
    private transient TriggerPolicy<Tuple, ?> triggerPolicy;
    private transient EvictionPolicy<Tuple, ?> evictionPolicy;
    private transient Duration windowLengthDuration;

    public WindowedBoltExecutor(IWindowedBolt bolt) {
        this.bolt = bolt;
        timestampExtractor = bolt.getTimestampExtractor();
    }

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        doPrepare(topoConf, context, collector, new ConcurrentLinkedQueue<>(), false);
    }

    // NOTE: the queue has to be thread safe.
    protected void doPrepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector,
                             Collection<Event<Tuple>> queue, boolean stateful) {
        Objects.requireNonNull(topoConf);
        Objects.requireNonNull(context);
        Objects.requireNonNull(collector);
        Objects.requireNonNull(queue);
        this.windowedOutputCollector = new WindowedOutputCollector(collector);
        bolt.prepare(topoConf, context, windowedOutputCollector);
        this.listener = newWindowLifecycleListener();
        this.windowManager = initWindowManager(listener, topoConf, context, queue, stateful);
        start();
        LOG.info("Initialized window manager {} ", windowManager);
    }

    @Override
    public void execute(Tuple input) {
        if (isTupleTs()) {
            long ts = timestampExtractor.extractTimestamp(input);
            if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
                windowManager.add(input, ts);
            } else {
                if (lateTupleStream != null) {
                    windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
                } else {
                    LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
                }
                windowedOutputCollector.ack(input);
            }
        } else {
            windowManager.add(input);
        }
    }

    @Override
    public void cleanup() {
        if (waterMarkEventGenerator != null) {
            waterMarkEventGenerator.shutdown();
        }
        windowManager.shutdown();
        bolt.cleanup();
    }

    // for unit tests
    WindowManager<Tuple> getWindowManager() {
        return windowManager;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        String lateTupleStream = (String) getComponentConfiguration().get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
        if (lateTupleStream != null) {
            declarer.declareStream(lateTupleStream, new Fields(LATE_TUPLE_FIELD));
        }
        bolt.declareOutputFields(declarer);
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return bolt.getComponentConfiguration();
    }

    //......
}
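  • WindowedBoltExecutor implements the IRichBolt interface. In prepare it initializes windowedOutputCollector, listener and windowManager and calls bolt.prepare. In cleanup it shuts down waterMarkEventGenerator and windowManager and calls bolt.cleanup. When TopologyBuilder.setBolt is given an IWindowedBolt, it wraps the original implementation in a WindowedBoltExecutor and registers that instead.
  • declareOutputFields first declares the late-tuple stream (if Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM is set) and then delegates to bolt.declareOutputFields(declarer). getComponentConfiguration likewise returns bolt.getComponentConfiguration().
  • The execute method mainly adds tuples to the windowManager. When tuple timestamps are used, a tuple that is older than the current watermark is late. It is never added to the window. It is emitted to the late-tuple stream if one is configured, or only logged otherwise, and in both cases it is acked immediately.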
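To make the late-tuple branch concrete, here is a minimal, self-contained sketch that reduces tuples to their timestamps. The class LateTupleRouter and its fields are hypothetical. The check `ts >= lastWaterMarkTs` is an assumption about what WaterMarkEventGenerator.track reports. The three lists stand in for windowManager.add, the emit to the late-tuple stream and the ack.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of WindowedBoltExecutor.execute() when tuple timestamps are used.
// Tuples are reduced to their timestamps; the lists record what the real code would do.
class LateTupleRouter {
    final List<Long> window = new ArrayList<>();     // stands in for windowManager.add(input, ts)
    final List<Long> lateStream = new ArrayList<>(); // stands in for emit(lateTupleStream, input, ...)
    final List<Long> acked = new ArrayList<>();      // stands in for windowedOutputCollector.ack(input)
    final String lateTupleStream;                    // null when no late-tuple stream is configured
    long lastWaterMarkTs;                            // assumed: last watermark produced by the generator

    LateTupleRouter(String lateTupleStream) {
        this.lateTupleStream = lateTupleStream;
    }

    void execute(long ts) {
        if (ts >= lastWaterMarkTs) {
            // on time: buffered in the window, acked later when it expires
            window.add(ts);
        } else {
            if (lateTupleStream != null) {
                lateStream.add(ts);
            } else {
                System.out.println("Received a late tuple with ts " + ts + ". This will not be processed.");
            }
            // late tuples are acked right away in either case
            acked.add(ts);
        }
    }
}
```

With lastWaterMarkTs = 1000, a tuple at 1500 is buffered in the window and is only acked when it later expires. A tuple at 900 goes to the late stream and is acked immediately.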

WindowedOutputCollector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

    /**
     * Creates an {@link OutputCollector} wrapper that automatically anchors the tuples to inputTuples while emitting.
     */
    private static class WindowedOutputCollector extends OutputCollector {
        private List<Tuple> inputTuples;

        WindowedOutputCollector(IOutputCollector delegate) {
            super(delegate);
        }

        void setContext(List<Tuple> inputTuples) {
            this.inputTuples = inputTuples;
        }

        @Override
        public List<Integer> emit(String streamId, List<Object> tuple) {
            return emit(streamId, inputTuples, tuple);
        }

        @Override
        public void emitDirect(int taskId, String streamId, List<Object> tuple) {
            emitDirect(taskId, streamId, inputTuples, tuple);
        }
    }
  • WindowedOutputCollector extends OutputCollector and overrides the un-anchored emit and emitDirect methods. As a result, every tuple emitted through them is anchored by default to inputTuples, which are the tuples of the current window.
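The anchoring trick can be shown without Storm on the classpath. In the sketch below, RecordingCollector and AnchoringCollector are hypothetical stand-ins for IOutputCollector and WindowedOutputCollector. The only point is that the un-anchored emit is rewritten into an anchored emit that uses the current window's tuples.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical minimal collector: records (stream, anchors, values) for each emit.
class RecordingCollector {
    final List<String> log = new ArrayList<>();

    void emit(String streamId, List<String> anchors, List<Object> values) {
        log.add(streamId + " anchors=" + anchors + " values=" + values);
    }
}

// Sketch of the WindowedOutputCollector idea: the un-anchored emit delegates to the
// anchored one, using whatever window of input tuples was set as the context.
class AnchoringCollector {
    private final RecordingCollector delegate;
    private List<String> inputTuples = Collections.emptyList();

    AnchoringCollector(RecordingCollector delegate) {
        this.delegate = delegate;
    }

    void setContext(List<String> inputTuples) {
        this.inputTuples = inputTuples;
    }

    void emit(String streamId, List<Object> values) {
        delegate.emit(streamId, inputTuples, values);
    }
}
```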

WindowLifecycleListener

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowLifecycleListener.java

/**
 * A callback for expiry, activation of events tracked by the {@link WindowManager}
 *
 * @param <T> The type of Event in the window (e.g. Tuple).
 */
public interface WindowLifecycleListener<T> {
    /**
     * Called on expiry of events from the window due to {@link EvictionPolicy}
     *
     * @param events the expired events
     */
    void onExpiry(List<T> events);

    /**
     * Called on activation of the window due to the {@link TriggerPolicy}
     *
     * @param events        the list of current events in the window.
     * @param newEvents     the newly added events since last activation.
     * @param expired       the expired events since last activation.
     * @param referenceTime the reference (event or processing) time that resulted in activation
     */
    default void onActivation(List<T> events, List<T> newEvents, List<T> expired, Long referenceTime) {
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * Called on activation of the window due to the {@link TriggerPolicy}. This is typically invoked when the windows are persisted in
     * state and is huge to be loaded entirely in memory.
     *
     * @param eventsIt      a supplier of iterator over the list of current events in the window
     * @param newEventsIt   a supplier of iterator over the newly added events since the last ativation
     * @param expiredIt     a supplier of iterator over the expired events since the last activation
     * @param referenceTime the reference (event or processing) time that resulted in activation
     */
    default void onActivation(Supplier<Iterator<T>> eventsIt, Supplier<Iterator<T>> newEventsIt, Supplier<Iterator<T>> expiredIt,
                              Long referenceTime) {
        throw new UnsupportedOperationException("Not implemented");
    }
}
  • WindowLifecycleListener defines the callback methods onExpiry and onActivation (the latter in two overloads).
  • They are triggered by EvictionPolicy and TriggerPolicy respectively.

EvictionPolicy

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/EvictionPolicy.java

/**
 * Eviction policy tracks events and decides whether an event should be evicted from the window or not.
 *
 * @param <T> the type of event that is tracked.
 */
public interface EvictionPolicy<T, S> {
    /**
     * Decides if an event should be expired from the window, processed in the current window or kept for later processing.
     *
     * @param event the input event
     * @return the {@link org.apache.storm.windowing.EvictionPolicy.Action} to be taken based on the input event
     */
    Action evict(Event<T> event);

    /**
     * Tracks the event to later decide whether {@link EvictionPolicy#evict(Event)} should evict it or not.
     *
     * @param event the input event to be tracked
     */
    void track(Event<T> event);

    /**
     * Returns the current context that is part of this eviction policy.
     *
     * @return the eviction context
     */
    EvictionContext getContext();

    /**
     * Sets a context in the eviction policy that can be used while evicting the events. E.g. For TimeEvictionPolicy, this could be used to
     * set the reference timestamp.
     *
     * @param context the eviction context
     */
    void setContext(EvictionContext context);

    /**
     * Resets the eviction policy.
     */
    void reset();

    /**
     * Return runtime state to be checkpointed by the framework for restoring the eviction policy in case of failures.
     *
     * @return the state
     */
    S getState();

    /**
     * Restore the eviction policy from the state that was earlier checkpointed by the framework.
     *
     * @param state the state
     */
    void restoreState(S state);

    /**
     * The action to be taken when {@link EvictionPolicy#evict(Event)} is invoked.
     */
    public enum Action {
        /**
         * expire the event and remove it from the queue.
         */
        EXPIRE,
        /**
         * process the event in the current window of events.
         */
        PROCESS,
        /**
         * don't include in the current window but keep the event in the queue for evaluating as a part of future windows.
         */
        KEEP,
        /**
         * stop processing the queue, there cannot be anymore events satisfying the eviction policy.
         */
        STOP
    }
}
  • EvictionPolicy is mainly responsible for tracking the event and then determining whether the event should be removed from the window.
  • EvictionPolicy has several implementation classes: CountEvictionPolicy, TimeEvictionPolicy, WatermarkCountEvictionPolicy, WatermarkTimeEvictionPolicy
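As an illustration of the evict/track contract, the following is a simplified count-based policy loosely modeled on the idea behind CountEvictionPolicy. It is a sketch, not Storm’s class. It counts non-watermark events in track and answers EXPIRE while more than `threshold` events remain, so scanning the queue oldest-first drops the surplus. The runDemo helper is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Simplified count-based eviction: keep the last `threshold` events, EXPIRE older ones.
class CountEvictionSketch {
    enum Action { EXPIRE, PROCESS, KEEP, STOP }

    private final int threshold;
    private int currentCount; // tracked events still considered part of the window

    CountEvictionSketch(int threshold) {
        this.threshold = threshold;
    }

    void track(boolean isWatermark) {
        if (!isWatermark) { // watermark events are not counted
            currentCount++;
        }
    }

    Action evict() {
        if (currentCount > threshold) {
            currentCount--; // this event leaves the window
            return Action.EXPIRE;
        }
        return Action.PROCESS;
    }

    // Hypothetical demo: enqueue events, then scan oldest-first and stop at the first non-expired one.
    static List<String> runDemo(int threshold, String... events) {
        CountEvictionSketch policy = new CountEvictionSketch(threshold);
        Deque<String> queue = new ArrayDeque<>();
        for (String e : events) {
            queue.add(e);
            policy.track(false);
        }
        List<String> expired = new ArrayList<>();
        Iterator<String> it = queue.iterator();
        while (it.hasNext()) {
            String e = it.next();
            if (policy.evict() == Action.EXPIRE) {
                expired.add(e);
                it.remove();
            } else {
                break;
            }
        }
        return expired;
    }
}
```

With a threshold of 3 and five events a..e, the scan expires a and b and stops at c.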

TriggerPolicy

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/TriggerPolicy.java

/**
 * Triggers the window calculations based on the policy.
 *
 * @param <T> the type of the event that is tracked
 */
public interface TriggerPolicy<T, S> {
    /**
     * Tracks the event and could use this to invoke the trigger.
     *
     * @param event the input event
     */
    void track(Event<T> event);

    /**
     * resets the trigger policy.
     */
    void reset();

    /**
     * Starts the trigger policy. This can be used during recovery to start the triggers after recovery is complete.
     */
    void start();

    /**
     * Any clean up could be handled here.
     */
    void shutdown();

    /**
     * Return runtime state to be checkpointed by the framework for restoring the trigger policy in case of failures.
     *
     * @return the state
     */
    S getState();

    /**
     * Restore the trigger policy from the state that was earlier checkpointed by the framework.
     *
     * @param state the state
     */
    void restoreState(S state);
}
  • TriggerPolicy decides when the window fires, that is, when the window calculation runs.
  • TriggerPolicy has several implementation classes: CountTriggerPolicy, TimeTriggerPolicy, WatermarkCountTriggerPolicy, WatermarkTimeTriggerPolicy
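A count-based trigger is the simplest TriggerPolicy to picture. The sketch below is a simplified take on that idea and uses a hypothetical TriggerCallback interface in place of Storm’s TriggerHandler. Every `count` non-watermark events, it calls onTrigger and starts counting again.

```java
// Hypothetical callback playing the role of TriggerHandler (WindowManager.onTrigger()).
interface TriggerCallback {
    boolean onTrigger();
}

// Sketch of a count-based trigger: fire every `count` non-watermark events.
class CountTriggerSketch {
    private final int count;
    private final TriggerCallback handler;
    private int currentCount;
    private boolean started;

    CountTriggerSketch(int count, TriggerCallback handler) {
        this.count = count;
        this.handler = handler;
    }

    void start() {
        started = true;
    }

    void track(boolean isWatermark) {
        if (started && !isWatermark && ++currentCount >= count) {
            handler.onTrigger();
            reset(); // in Storm, WindowManager.onTrigger() itself calls triggerPolicy.reset()
        }
    }

    void reset() {
        currentCount = 0;
    }
}
```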

WindowedBoltExecutor.newWindowLifecycleListener

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

    protected WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
        return new WindowLifecycleListener<Tuple>() {
            @Override
            public void onExpiry(List<Tuple> tuples) {
                for (Tuple tuple : tuples) {
                    windowedOutputCollector.ack(tuple);
                }
            }

            @Override
            public void onActivation(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
                windowedOutputCollector.setContext(tuples);
                boltExecute(tuples, newTuples, expiredTuples, timestamp);
            }

        };
    }

    protected void boltExecute(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
        bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples, getWindowStartTs(timestamp), timestamp));
    }
  • An anonymous WindowLifecycleListener implementation is created here.
  • onExpiry acks the expired tuples one by one. onActivation sets the collector’s anchor context to the window’s tuples and calls boltExecute, which constructs a TupleWindowImpl and passes it to bolt.execute.
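The wiring of the anonymous listener can be mimicked with plain lists. ListenerSketch is hypothetical. Acks are recorded instead of sent, and the bolt.execute(new TupleWindowImpl(...)) call is replaced by recording a one-line summary of the window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the listener built by newWindowLifecycleListener().
class ListenerSketch {
    final List<String> acked = new ArrayList<>();
    final List<String> executed = new ArrayList<>();
    List<String> anchorContext = new ArrayList<>();

    void onExpiry(List<String> tuples) {
        for (String t : tuples) {
            acked.add(t); // windowedOutputCollector.ack(tuple)
        }
    }

    void onActivation(List<String> tuples, List<String> newTuples, List<String> expired, Long ts) {
        anchorContext = tuples; // windowedOutputCollector.setContext(tuples)
        // stands in for bolt.execute(new TupleWindowImpl(...)): record a summary of the window
        executed.add("window@" + ts + " size=" + tuples.size() + " new=" + newTuples.size()
                + " expired=" + expired.size());
    }
}
```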

WindowedBoltExecutor.initWindowManager

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map<String, Object> topoConf,
                                                   TopologyContext context, Collection<Event<Tuple>> queue, boolean stateful) {

        WindowManager<Tuple> manager = stateful ?
            new StatefulWindowManager<>(lifecycleListener, queue)
            : new WindowManager<>(lifecycleListener, queue);

        Count windowLengthCount = null;
        Duration slidingIntervalDuration = null;
        Count slidingIntervalCount = null;
        // window length
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
            windowLengthCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
            windowLengthDuration = new Duration(
                ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
                TimeUnit.MILLISECONDS);
        }
        // sliding interval
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
            slidingIntervalCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
            slidingIntervalDuration =
                new Duration(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS);
        } else {
            // default is a sliding window of count 1
            slidingIntervalCount = new Count(1);
        }
        // tuple ts
        if (timestampExtractor != null) {
            // late tuple stream
            lateTupleStream = (String) topoConf.get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
            if (lateTupleStream != null) {
                if (!context.getThisStreams().contains(lateTupleStream)) {
                    throw new IllegalArgumentException(
                        "Stream for late tuples must be defined with the builder method withLateTupleStream");
                }
            }
            // max lag
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
                maxLagMs = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
            } else {
                maxLagMs = DEFAULT_MAX_LAG_MS;
            }
            // watermark interval
            int watermarkInterval;
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) {
                watermarkInterval = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue();
            } else {
                watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
            }
            waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
                                                                    maxLagMs, getComponentStreams(context));
        } else {
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
                throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
            }
        }
        // validate
        validate(topoConf, windowLengthCount, windowLengthDuration,
                 slidingIntervalCount, slidingIntervalDuration);
        evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration);
        triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
                                         manager, evictionPolicy);
        manager.setEvictionPolicy(evictionPolicy);
        manager.setTriggerPolicy(triggerPolicy);
        return manager;
    }

    private EvictionPolicy<Tuple, ?> getEvictionPolicy(Count windowLengthCount, Duration windowLengthDuration) {
        if (windowLengthCount != null) {
            if (isTupleTs()) {
                return new WatermarkCountEvictionPolicy<>(windowLengthCount.value);
            } else {
                return new CountEvictionPolicy<>(windowLengthCount.value);
            }
        } else {
            if (isTupleTs()) {
                return new WatermarkTimeEvictionPolicy<>(windowLengthDuration.value, maxLagMs);
            } else {
                return new TimeEvictionPolicy<>(windowLengthDuration.value);
            }
        }
    }

    private TriggerPolicy<Tuple, ?> getTriggerPolicy(Count slidingIntervalCount, Duration slidingIntervalDuration,
                                                     WindowManager<Tuple> manager, EvictionPolicy<Tuple, ?> evictionPolicy) {
        if (slidingIntervalCount != null) {
            if (isTupleTs()) {
                return new WatermarkCountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy, manager);
            } else {
                return new CountTriggerPolicy<>(slidingIntervalCount.value, manager, evictionPolicy);
            }
        } else {
            if (isTupleTs()) {
                return new WatermarkTimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy, manager);
            } else {
                return new TimeTriggerPolicy<>(slidingIntervalDuration.value, manager, evictionPolicy);
            }
        }
    }
  • WindowedBoltExecutor passes stateful = false, so a plain WindowManager is created here. StatefulWindowManager is used only when stateful is true.
  • DEFAULT_MAX_LAG_MS is 0, meaning no lag is allowed by default. DEFAULT_WATERMARK_EVENT_INTERVAL_MS is 1000, i.e. 1 second.
  • The EvictionPolicy and TriggerPolicy are chosen according to two things: whether windowLength and slidingInterval are given as a Count or a Duration, and whether tuple timestamps are used. For a bolt configured with a timestamp field whose window length and sliding interval are both Durations, WatermarkTimeEvictionPolicy and WatermarkTimeTriggerPolicy are created.
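The 2x2 choice made by getEvictionPolicy and getTriggerPolicy can be summarized as a small lookup. PolicySelector is purely illustrative and returns class names rather than policy instances.

```java
// Illustrative lookup mirroring getEvictionPolicy()/getTriggerPolicy():
// Count vs. Duration, and whether tuple timestamps (a TimestampExtractor) are in use.
class PolicySelector {
    static String evictionPolicy(boolean lengthIsCount, boolean tupleTs) {
        if (lengthIsCount) {
            return tupleTs ? "WatermarkCountEvictionPolicy" : "CountEvictionPolicy";
        }
        return tupleTs ? "WatermarkTimeEvictionPolicy" : "TimeEvictionPolicy";
    }

    static String triggerPolicy(boolean slidingIsCount, boolean tupleTs) {
        if (slidingIsCount) {
            return tupleTs ? "WatermarkCountTriggerPolicy" : "CountTriggerPolicy";
        }
        return tupleTs ? "WatermarkTimeTriggerPolicy" : "TimeTriggerPolicy";
    }
}
```

The two choices are made independently, so mixed combinations occur. For example, a Duration window length with no sliding interval configured falls back to Count(1). With a timestamp field, that yields WatermarkTimeEvictionPolicy together with WatermarkCountTriggerPolicy.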

WindowManager

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java

/**
 * Tracks a window of events and fires {@link WindowLifecycleListener} callbacks on expiry of events or activation of the window due to
 * {@link TriggerPolicy}.
 *
 * @param <T> the type of event in the window.
 */
public class WindowManager<T> implements TriggerHandler {

    protected final Collection<Event<T>> queue;

    private final AtomicInteger eventsSinceLastExpiry;

    //......
    /**
     * Add an event into the window, with the given ts as the tracking ts.
     *
     * @param event the event to track
     * @param ts    the timestamp
     */
    public void add(T event, long ts) {
        add(new EventImpl<T>(event, ts));
    }

    /**
     * Tracks a window event
     *
     * @param windowEvent the window event to track
     */
    public void add(Event<T> windowEvent) {
        // watermark events are not added to the queue.
        if (!windowEvent.isWatermark()) {
            queue.add(windowEvent);
        } else {
            LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp());
        }
        track(windowEvent);
        compactWindow();
    }

    /**
     * feed the event to the eviction and trigger policies for bookkeeping and optionally firing the trigger.
     */
    private void track(Event<T> windowEvent) {
        evictionPolicy.track(windowEvent);
        triggerPolicy.track(windowEvent);
    }

    /**
     * expires events that fall out of the window every EXPIRE_EVENTS_THRESHOLD so that the window does not grow too big.
     */
    protected void compactWindow() {
        if (eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
            scanEvents(false);
        }
    }

    /**
     * Scan events in the queue, using the expiration policy to check if the event should be evicted or not.
     *
     * @param fullScan if set, will scan the entire queue; if not set, will stop as soon as an event not satisfying the expiration policy is
     *                 found
     * @return the list of events to be processed as a part of the current window
     */
    private List<Event<T>> scanEvents(boolean fullScan) {
        LOG.debug("Scan events, eviction policy {}", evictionPolicy);
        List<T> eventsToExpire = new ArrayList<>();
        List<Event<T>> eventsToProcess = new ArrayList<>();
        try {
            lock.lock();
            Iterator<Event<T>> it = queue.iterator();
            while (it.hasNext()) {
                Event<T> windowEvent = it.next();
                Action action = evictionPolicy.evict(windowEvent);
                if (action == EXPIRE) {
                    eventsToExpire.add(windowEvent.get());
                    it.remove();
                } else if (!fullScan || action == STOP) {
                    break;
                } else if (action == PROCESS) {
                    eventsToProcess.add(windowEvent);
                }
            }
            expiredEvents.addAll(eventsToExpire);
        } finally {
            lock.unlock();
        }
        eventsSinceLastExpiry.set(0);
        LOG.debug("[{}] events expired from window.", eventsToExpire.size());
        if (!eventsToExpire.isEmpty()) {
            LOG.debug("invoking windowLifecycleListener.onExpiry");
            windowLifecycleListener.onExpiry(eventsToExpire);
        }
        return eventsToProcess;
    }

    //......
}
  • WindowedBoltExecutor.execute mainly adds tuples to the windowManager.
  • For an EventImpl, isWatermark returns false, so the event is added to the queue. add then performs the track and compactWindow operations.
  • track delegates to evictionPolicy and triggerPolicy. Once the number of events since the last expiry reaches EXPIRE_EVENTS_THRESHOLD, compactWindow calls scanEvents(false). Because this is not a full scan, the traversal stops at the first event that is not expired. Finally, if eventsToExpire is not empty, windowLifecycleListener.onExpiry(eventsToExpire) is invoked.
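The difference between the compaction scan and the full scan is easiest to see with an out-of-order event. ScanSketch is a simplified, hypothetical model over plain timestamps. Its eviction rule approximates a time-based policy and is an assumption:

- EXPIRE when `referenceTs - ts >= windowLength`.
- KEEP when the event is newer than the reference time.
- PROCESS otherwise.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

// Simplified model of WindowManager.scanEvents() over event timestamps.
// The eviction rule is a hypothetical time window (referenceTs - windowLength, referenceTs].
class ScanSketch {
    enum Action { EXPIRE, PROCESS, KEEP, STOP } // STOP is never produced by this simplified rule

    final List<Long> queue = new LinkedList<>();
    final List<Long> expired = new ArrayList<>();
    final long windowLength;
    long referenceTs;

    ScanSketch(long windowLength) {
        this.windowLength = windowLength;
    }

    Action evict(long ts) {
        long diff = referenceTs - ts;
        if (diff >= windowLength) return Action.EXPIRE;
        if (diff < 0) return Action.KEEP; // newer than the reference time: keep for a later window
        return Action.PROCESS;
    }

    List<Long> scanEvents(boolean fullScan) {
        List<Long> toProcess = new ArrayList<>();
        Iterator<Long> it = queue.iterator();
        while (it.hasNext()) {
            long ts = it.next();
            Action action = evict(ts);
            if (action == Action.EXPIRE) {
                expired.add(ts);
                it.remove();
            } else if (!fullScan || action == Action.STOP) {
                break; // compaction stops at the first event that is not expired
            } else if (action == Action.PROCESS) {
                toProcess.add(ts);
            }
        }
        return toProcess;
    }
}
```

Take referenceTs = 2000, windowLength = 1000 and the queue [100, 1500, 400, 2500]. The non-full scan expires only 100 and stops at 1500. The full scan then also finds the out-of-order 400, which is why onTrigger scans the whole queue.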

WaterMarkEventGenerator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java

/**
 * Tracks tuples across input streams and periodically emits watermark events. Watermark event timestamp is the minimum of the latest tuple
 * timestamps across all the input streams (minus the lag). Once a watermark event is emitted any tuple coming with an earlier timestamp can
 * be considered as late events.
 */
public class WaterMarkEventGenerator<T> implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(WaterMarkEventGenerator.class);
    private final WindowManager<T> windowManager;
    private final int eventTsLag;
    private final Set<GlobalStreamId> inputStreams;
    private final Map<GlobalStreamId, Long> streamToTs;
    private final ScheduledExecutorService executorService;
    private final int interval;
    private ScheduledFuture<?> executorFuture;
    private volatile long lastWaterMarkTs;

    //......

    public void start() {
        this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        try {
            long waterMarkTs = computeWaterMarkTs();
            if (waterMarkTs > lastWaterMarkTs) {
                this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
                lastWaterMarkTs = waterMarkTs;
            }
        } catch (Throwable th) {
            LOG.error("Failed while processing watermark event ", th);
            throw th;
        }
    }
}
  • WindowedBoltExecutor calls WaterMarkEventGenerator’s start method when it starts.
  • start schedules the WaterMarkEventGenerator task at a fixed rate of once every watermarkInterval milliseconds.
  • run computes the watermark: the minimum, across all input streams, of the latest tuple timestamp seen on each stream, minus the lag. If the result is greater than lastWaterMarkTs, run adds a WaterMarkEvent (whose isWatermark returns true) to the windowManager and updates lastWaterMarkTs.
  • windowManager.add(new WaterMarkEvent<>(waterMarkTs)) does not put the watermark into the queue. It only runs track (evictionPolicy.track and triggerPolicy.track) and compactWindow.
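The watermark arithmetic can be sketched in isolation. WatermarkSketch is a hypothetical, single-threaded model with three simplifications:

- Stream ids are strings instead of GlobalStreamId.
- run() is called by hand instead of by the ScheduledExecutorService.
- Adding the WaterMarkEvent to the window manager is left as a comment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified, single-threaded model of WaterMarkEventGenerator's bookkeeping.
class WatermarkSketch {
    private final Set<String> inputStreams;
    private final long eventTsLag;
    private final Map<String, Long> streamToTs = new HashMap<>();
    long lastWaterMarkTs;

    WatermarkSketch(Set<String> inputStreams, long eventTsLag) {
        this.inputStreams = inputStreams;
        this.eventTsLag = eventTsLag;
    }

    // Remember the latest ts per stream; report whether the tuple is on time.
    boolean track(String stream, long ts) {
        streamToTs.merge(stream, ts, Long::max);
        return ts >= lastWaterMarkTs;
    }

    // Min over streams of the latest ts seen on each, minus the lag,
    // but only once every input stream has delivered at least one tuple.
    long computeWaterMarkTs() {
        long ts = 0;
        if (streamToTs.size() >= inputStreams.size()) {
            ts = Long.MAX_VALUE;
            for (long v : streamToTs.values()) {
                ts = Math.min(ts, v);
            }
        }
        return ts - eventTsLag;
    }

    // One scheduled run: advance the watermark only if it moved forward.
    boolean run() {
        long waterMarkTs = computeWaterMarkTs();
        if (waterMarkTs > lastWaterMarkTs) {
            // Storm would also call windowManager.add(new WaterMarkEvent<>(waterMarkTs)) here
            lastWaterMarkTs = waterMarkTs;
            return true;
        }
        return false;
    }
}
```

Consider streams s1 and s2 with a lag of 100 ms:

1. No watermark is produced until both streams have reported.
2. After s1 reaches 6000 and s2 reaches 3000, the watermark becomes 3000 - 100 = 2900.
3. A later tuple on s2 at 2500 is reported as late.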

WatermarkTimeTriggerPolicy.track

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java

    @Override
    public void track(Event<T> event) {
        if (started && event.isWatermark()) {
            handleWaterMarkEvent(event);
        }
    }

    /**
     * Invokes the trigger all pending windows up to the watermark timestamp. The end ts of the window is set in the eviction policy context
     * so that the events falling within that window can be processed.
     */
    private void handleWaterMarkEvent(Event<T> event) {
        long watermarkTs = event.getTimestamp();
        long windowEndTs = nextWindowEndTs;
        LOG.debug("Window end ts {} Watermark ts {}", windowEndTs, watermarkTs);
        while (windowEndTs <= watermarkTs) {
            long currentCount = windowManager.getEventCount(windowEndTs);
            evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount));
            if (handler.onTrigger()) {
                windowEndTs += slidingIntervalMs;
            } else {
                /*
                 * No events were found in the previous window interval.
                 * Scan through the events in the queue to find the next
                 * window intervals based on event ts.
                 */
                long ts = getNextAlignedWindowTs(windowEndTs, watermarkTs);
                LOG.debug("Next aligned window end ts {}", ts);
                if (ts == Long.MAX_VALUE) {
                    LOG.debug("No events to process between {} and watermark ts {}", windowEndTs, watermarkTs);
                    break;
                }
                windowEndTs = ts;
            }
        }
        nextWindowEndTs = windowEndTs;
    }

    /**
     * Computes the next window by scanning the events in the window and finds the next aligned window between the startTs and endTs. Return
     * the end ts of the next aligned window, i.e. the ts when the window should fire.
     *
     * @param startTs the start timestamp (excluding)
     * @param endTs   the end timestamp (including)
     * @return the aligned window end ts for the next window or Long.MAX_VALUE if there are no more events to be processed.
     */
    private long getNextAlignedWindowTs(long startTs, long endTs) {
        long nextTs = windowManager.getEarliestEventTs(startTs, endTs);
        if (nextTs == Long.MAX_VALUE || (nextTs % slidingIntervalMs == 0)) {
            return nextTs;
        }
        return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs));
    }
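  • handleWaterMarkEvent calls handler.onTrigger() (the handler is the WindowManager) for every pending window whose end ts is not beyond the watermark. When a window turns out to be empty, it jumps to the next window end aligned to slidingIntervalMs, computed from the earliest remaining event.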
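The following sketch replays handleWaterMarkEvent on plain timestamps under stated assumptions. Events never leave the list. onTrigger is approximated as "does any event fall in (end - windowLength, end]", which stands in for the eviction-policy scan. getNextAlignedWindowTs mirrors the method above. getEarliestEventTs is a hypothetical helper that returns the earliest event ts in (startTs, endTs].

```java
import java.util.ArrayList;
import java.util.List;

// Simplified replay of WatermarkTimeTriggerPolicy.handleWaterMarkEvent() over timestamps.
class WatermarkTriggerSketch {
    final List<Long> events = new ArrayList<>();
    final List<Long> firedWindowEnds = new ArrayList<>();
    final long windowLengthMs;
    final long slidingIntervalMs;
    long nextWindowEndTs;

    WatermarkTriggerSketch(long windowLengthMs, long slidingIntervalMs, long firstWindowEndTs) {
        this.windowLengthMs = windowLengthMs;
        this.slidingIntervalMs = slidingIntervalMs;
        this.nextWindowEndTs = firstWindowEndTs;
    }

    // Stand-in for handler.onTrigger(): fires (and returns true) if the window has any events.
    boolean onTrigger(long windowEndTs) {
        for (long ts : events) {
            if (ts > windowEndTs - windowLengthMs && ts <= windowEndTs) {
                firedWindowEnds.add(windowEndTs);
                return true;
            }
        }
        return false;
    }

    // Earliest event ts in (startTs, endTs], or Long.MAX_VALUE if there is none.
    long getEarliestEventTs(long startTs, long endTs) {
        long minTs = Long.MAX_VALUE;
        for (long ts : events) {
            if (ts > startTs && ts <= endTs) {
                minTs = Math.min(minTs, ts);
            }
        }
        return minTs;
    }

    // Round the next event ts up to a multiple of the sliding interval.
    long getNextAlignedWindowTs(long startTs, long endTs) {
        long nextTs = getEarliestEventTs(startTs, endTs);
        if (nextTs == Long.MAX_VALUE || nextTs % slidingIntervalMs == 0) {
            return nextTs;
        }
        return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs));
    }

    void handleWaterMark(long watermarkTs) {
        long windowEndTs = nextWindowEndTs;
        while (windowEndTs <= watermarkTs) {
            if (onTrigger(windowEndTs)) {
                windowEndTs += slidingIntervalMs;
            } else {
                long ts = getNextAlignedWindowTs(windowEndTs, watermarkTs);
                if (ts == Long.MAX_VALUE) {
                    break; // no events left to process up to the watermark
                }
                windowEndTs = ts;
            }
        }
        nextWindowEndTs = windowEndTs;
    }
}
```

Take windowLength = 2000, slidingInterval = 1000, events at 1500 and 5200, and a watermark of 6000:

1. The first window end, 1000, is empty, so the loop jumps to the aligned end 2000 (1500 rounded up).
2. Windows 2000 and 3000 fire.
3. Window 4000 is empty, so the loop jumps to 6000 (5200 rounded up), which fires.
4. nextWindowEndTs ends at 7000.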

WindowManager.onTrigger

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WindowManager.java

    /**
     * The callback invoked by the trigger policy.
     */
    @Override
    public boolean onTrigger() {
        List<Event<T>> windowEvents = null;
        List<T> expired = null;
        try {
            lock.lock();
            /*
             * scan the entire window to handle out of order events in
             * the case of time based windows.
             */
            windowEvents = scanEvents(true);
            expired = new ArrayList<>(expiredEvents);
            expiredEvents.clear();
        } finally {
            lock.unlock();
        }
        List<T> events = new ArrayList<>();
        List<T> newEvents = new ArrayList<>();
        for (Event<T> event : windowEvents) {
            events.add(event.get());
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event.get());
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
            windowLifecycleListener.onActivation(events, newEvents, expired, evictionPolicy.getContext().getReferenceTime());
        } else {
            LOG.debug("No events in the window, skipping onActivation");
        }
        triggerPolicy.reset();
        return !events.isEmpty();
    }
  • The onTrigger method mainly computes three lists: events, expired and newEvents.
  • When events is not empty, windowLifecycleListener.onActivation is invoked, which ultimately calls the bolt’s execute method.
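The following sketch shows how newEvents is derived from the previous activation, with strings standing in for Event objects. ActivationSketch is hypothetical and returns the three lists instead of calling onActivation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model of how WindowManager.onTrigger() builds events, newEvents and expired.
class ActivationSketch {
    private final Set<String> prevWindowEvents = new HashSet<>();

    // Returns [events, newEvents, expired], or null when the window is empty.
    List<List<String>> onTrigger(List<String> windowEvents, List<String> expiredSinceLast) {
        List<String> events = new ArrayList<>(windowEvents);
        List<String> newEvents = new ArrayList<>();
        for (String e : windowEvents) {
            if (!prevWindowEvents.contains(e)) {
                newEvents.add(e); // not part of the previous activation
            }
        }
        prevWindowEvents.clear();
        if (events.isEmpty()) {
            return null; // onActivation is skipped
        }
        prevWindowEvents.addAll(windowEvents);
        List<List<String>> result = new ArrayList<>();
        result.add(events);
        result.add(newEvents);
        result.add(new ArrayList<>(expiredSinceLast));
        return result;
    }
}
```

Suppose the first window is [e1, e2]. A second window [e2, e3], with e1 expired, yields newEvents = [e3] and expired = [e1]. An empty window returns null, which corresponds to the skipped onActivation.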

Summary

  • WindowedBoltExecutor implements the IRichBolt interface, so it is a bolt. When TopologyBuilder.setBolt is given an IWindowedBolt, it wraps the user’s implementation in a WindowedBoltExecutor, which changes how execute behaves. execute calls windowManager.add to put the tuple into the window and acks tuples that are discarded as late. The real bolt execute has to wait until the window is triggered.
  • WindowLifecycleListener has two callbacks: onExpiry, triggered by EvictionPolicy, and onActivation, triggered by TriggerPolicy.
  • The windowLength and slidingInterval parameters can each be a Duration or a Count, so EvictionPolicy and TriggerPolicy come in these two dimensions as well. Combined with the watermark (tuple timestamp) variant, each policy has four implementation classes:
    • EvictionPolicy: CountEvictionPolicy, TimeEvictionPolicy, WatermarkCountEvictionPolicy, WatermarkTimeEvictionPolicy.
    • TriggerPolicy: CountTriggerPolicy, TimeTriggerPolicy, WatermarkCountTriggerPolicy, WatermarkTimeTriggerPolicy.
  • Besides storing the tuple, windowManager.add calls the track methods of both the eviction policy and the trigger policy, then performs compactWindow. WatermarkTimeEvictionPolicy’s track currently does nothing. WatermarkTimeTriggerPolicy’s track fires the window when the event is a WaterMarkEvent by calling WindowManager’s onTrigger method. onTrigger selects the window’s data and invokes windowLifecycleListener.onActivation, which finally calls the windowed bolt’s execute method.
  • WindowManager’s onTrigger method and add method (through compactWindow) both call scanEvents. The difference is that the former does a full scan and the latter does not. scanEvents calls evictionPolicy.evict to decide whether each tuple should be evicted, then triggers windowLifecycleListener.onExpiry, which acks those tuples, so expired tuples are acked automatically. In theory every tuple eventually expires and is acked this way. Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS therefore needs to be larger than windowLength + slidingInterval; otherwise tuples may time out before they are acked.
  • WindowedBoltExecutor starts the WaterMarkEventGenerator when it starts. The generator registers a scheduled task that computes the watermark: the minimum, across input streams, of the latest tuple timestamp on each stream, minus the lag. When the watermark is greater than lastWaterMarkTs, the task updates lastWaterMarkTs and adds a WaterMarkEvent (whose isWatermark is true). When tuple timestamps are used, this is what ultimately drives WindowManager’s onTrigger method, i.e. the windowLifecycleListener.onActivation operation.
  • Regarding acks, there are two cases:
    • In WindowedBoltExecutor.execute, tuples that cannot enter the window queue (late tuples) are acked immediately. If Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM is configured, they are first emitted to that stream.
    • All other tuples are acked when they expire from the window.
    WindowedBoltExecutor uses WindowedOutputCollector, which extends OutputCollector and anchors emitted tuples to the input tuples of the current window.
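The message-timeout rule of thumb from the summary can be written as a one-line check. TimeoutCheck is a hypothetical helper, not part of Storm. It assumes a tuple can stay in the window for roughly windowLength + slidingInterval before it expires and is acked.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical check for the rule of thumb: the message timeout must exceed
// windowLength + slidingInterval, or tuples may time out before they are acked on expiry.
class TimeoutCheck {
    static boolean timeoutIsSafe(int messageTimeoutSecs, long windowLengthMs, long slidingIntervalMs) {
        long timeoutMs = TimeUnit.SECONDS.toMillis(messageTimeoutSecs);
        return timeoutMs > windowLengthMs + slidingIntervalMs;
    }
}
```

For example, a 10 s window sliding every 5 s is safe with a 30 s timeout but not with a 10 s timeout.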

doc