A Look at Storm's Window Trigger


Preface

This article examines how Storm's window trigger works.

WindowTridentProcessor.prepare

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java

    public void prepare(Map stormConf, TopologyContext context, TridentContext tridentContext) {
        this.topologyContext = context;
        List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
        if (parents.size() != 1) {
            throw new RuntimeException("Aggregation related operation can only have one parent");
        }

        Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf);

        this.tridentContext = tridentContext;
        collector = new FreshCollector(tridentContext);
        projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);

        windowStore = windowStoreFactory.create(stormConf);
        windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
        windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);

        tridentWindowManager = storeTuplesInStore ?
                new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields)
                : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector());

        tridentWindowManager.prepare();
    }
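  • prepare creates the window manager (StoreBasedTridentWindowManager when storeTuplesInStore is true, otherwise InMemoryTridentWindowManager) and then calls tridentWindowManager.prepare().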

AbstractTridentWindowManager.prepare

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

    public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore,
                                        Aggregator aggregator, BatchOutputCollector delegateCollector) {
        this.windowTaskId = windowTaskId;
        this.windowStore = windowStore;
        this.aggregator = aggregator;
        this.delegateCollector = delegateCollector;

        windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;

        windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());

        WindowStrategy<T> windowStrategy = windowConfig.getWindowStrategy();
        EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
        windowManager.setEvictionPolicy(evictionPolicy);
        triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
        windowManager.setTriggerPolicy(triggerPolicy);
    }

    public void prepare() {
        preInitialize();

        initialize();

        postInitialize();
    }

    private void postInitialize() {
        // start trigger once the initialization is done.
        triggerPolicy.start();
    }
  • The AbstractTridentWindowManager constructor calls windowStrategy.getTriggerPolicy to obtain the TriggerPolicy. The prepare method ends by calling postInitialize, which invokes triggerPolicy.start().
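The ordering described above can be modelled in a few lines. This is only a sketch: PrepareLifecycleSketch, its calls list, and the triggerStart stand-in are hypothetical names used for illustration and are not part of Storm. It shows nothing more than the fact that the trigger is started as the last step of prepare.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the prepare() lifecycle; none of these names are Storm classes.
class PrepareLifecycleSketch {
    // Records the order in which the lifecycle steps run.
    final List<String> calls = new ArrayList<>();

    // Stand-in for the trigger policy that the constructor would obtain from the strategy.
    private final Runnable triggerStart = () -> calls.add("triggerPolicy.start");

    void prepare() {
        preInitialize();
        initialize();
        postInitialize();
    }

    private void preInitialize() {
        calls.add("preInitialize");
    }

    private void initialize() {
        calls.add("initialize");
    }

    private void postInitialize() {
        // The trigger starts only after initialization is done.
        triggerStart.run();
    }

    public static void main(String[] args) {
        PrepareLifecycleSketch manager = new PrepareLifecycleSketch();
        manager.prepare();
        System.out.println(manager.calls);
    }
}
```

Starting the trigger last means that no scheduled callback can fire against a manager that is only half initialized.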

SlidingDurationWindowStrategy.getTriggerPolicy

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java

    /**
     * Returns a {@code TriggerPolicy} which triggers for every configured sliding window duration.
     *
     * @param triggerHandler
     * @param evictionPolicy
     * @return
     */
    @Override
    public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
        return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
    }
  • Taking SlidingDurationWindowStrategy as an example: it creates a TimeTriggerPolicy whose duration is windowConfig.getSlidingLength() and whose triggerHandler is the WindowManager.

TimeTriggerPolicy.start

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/TimeTriggerPolicy.java

    public void start() {
        executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
    }

    private Runnable newTriggerTask() {
        return new Runnable() {
            @Override
            public void run() {
                // do not process current timestamp since tuples might arrive while the trigger is executing
                long now = System.currentTimeMillis() - 1;
                try {
                    /*
                     * set the current timestamp as the reference time for the eviction policy
                     * to evict the events
                     */
                    if (evictionPolicy != null) {
                        evictionPolicy.setContext(new DefaultEvictionContext(now, null, null, duration));
                    }
                    handler.onTrigger();
                } catch (Throwable th) {
                    LOG.error("handler.onTrigger failed ", th);
                    /*
                     * propagate it so that task gets canceled and the exception
                     * can be retrieved from executorFuture.get()
                     */
                    throw th;
                }
            }
        };
    }
  • The start method schedules a task that fires once every duration milliseconds (windowConfig.getSlidingLength() in this example). Its run method calls handler.onTrigger(), that is, WindowManager.onTrigger().
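The comment in the trigger task says that rethrowing the exception cancels the task and makes the exception retrievable from executorFuture.get(). A minimal sketch of that JDK behaviour follows. FixedRateTriggerSketch, runUntilFailure, and the artificial failure on a chosen run are assumptions made for illustration; only the ScheduledExecutorService calls are real JDK API.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: a stand-in for the time trigger, built on the JDK scheduler.
class FixedRateTriggerSketch {

    /**
     * Schedules a trigger every periodMs and makes the failAtRun-th run throw.
     * Returns how many times the trigger ran before the scheduler stopped it.
     */
    static int runUntilFailure(long periodMs, int failAtRun) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable triggerTask = () -> {
            // Stand-in for handler.onTrigger(); the failure is hypothetical.
            if (runs.incrementAndGet() == failAtRun) {
                throw new IllegalStateException("onTrigger failed");
            }
        };
        // Same shape as start(): the initial delay equals the period.
        ScheduledFuture<?> future =
                executor.scheduleAtFixedRate(triggerTask, periodMs, periodMs, TimeUnit.MILLISECONDS);
        try {
            // Blocks until the periodic task ends, here because one run threw.
            future.get();
        } catch (ExecutionException e) {
            // The propagated exception is available as the cause.
            System.out.println("trigger stopped: " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs before the task stopped: " + runUntilFailure(20, 3));
    }
}
```

Once a run throws, scheduleAtFixedRate suppresses all later executions, so a failing onTrigger silently ends the periodic trigger unless something inspects the future.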

WindowManager.onTrigger

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

    /**
     * The callback invoked by the trigger policy.
     */
    @Override
    public boolean onTrigger() {
        List<Event<T>> windowEvents = null;
        List<T> expired = null;
        try {
            lock.lock();
            /*
             * scan the entire window to handle out of order events in
             * the case of time based windows.
             */
            windowEvents = scanEvents(true);
            expired = new ArrayList<>(expiredEvents);
            expiredEvents.clear();
        } finally {
            lock.unlock();
        }
        List<T> events = new ArrayList<>();
        List<T> newEvents = new ArrayList<>();
        for (Event<T> event : windowEvents) {
            events.add(event.get());
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event.get());
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
            windowLifecycleListener.onActivation(events, newEvents, expired);
        } else {
            LOG.debug("No events in the window, skipping onActivation");
        }
        triggerPolicy.reset();
        return !events.isEmpty();
    }
  • When the window is not empty, onTrigger calls windowLifecycleListener.onActivation(events, newEvents, expired). Here the windowLifecycleListener is the TridentWindowLifeCycleListener defined in AbstractTridentWindowManager.
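The way onTrigger separates newEvents from the full window contents can be sketched in isolation. The class below is a simplified model, not Storm code: OnTriggerSketch and lastNewEvents are hypothetical names, events are plain strings, and the locking, eviction scan, and listener callback are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model of the new-event bookkeeping in onTrigger; names are hypothetical.
class OnTriggerSketch {
    // Events that were in the window at the previous activation.
    private final Set<String> prevWindowEvents = new HashSet<>();
    // Events of the latest trigger that were not in the previous window.
    final List<String> lastNewEvents = new ArrayList<>();

    /** Returns true when the window is non-empty, i.e. when an activation would happen. */
    boolean onTrigger(List<String> windowEvents) {
        lastNewEvents.clear();
        for (String event : windowEvents) {
            if (!prevWindowEvents.contains(event)) {
                lastNewEvents.add(event);
            }
        }
        // Remember the current window so the next trigger can compute its own difference.
        prevWindowEvents.clear();
        if (!windowEvents.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
        }
        return !windowEvents.isEmpty();
    }

    public static void main(String[] args) {
        OnTriggerSketch window = new OnTriggerSketch();
        window.onTrigger(Arrays.asList("a", "b"));
        System.out.println(window.lastNewEvents);
        window.onTrigger(Arrays.asList("b", "c"));
        System.out.println(window.lastNewEvents);
        System.out.println(window.onTrigger(Collections.<String>emptyList()));
    }
}
```

With a sliding window, consecutive windows overlap, so only the events that entered since the previous trigger are reported as new.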

TridentWindowLifeCycleListener.onActivation

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

    /**
     * Listener to reeive any activation/expiry of windowing events and take further action on them.
     */
    class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {

        @Override
        public void onExpiry(List<T> expiredEvents) {
            LOG.debug("onExpiry is invoked");
            onTuplesExpired(expiredEvents);
        }

        @Override
        public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
            LOG.debug("onActivation is invoked with events size: [{}]", events.size());
            // trigger occurred, create an aggregation and keep them in store
            int currentTriggerId = triggerId.incrementAndGet();
            execAggregatorAndStoreResult(currentTriggerId, events);
        }
    }

    private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
        List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);

        // run aggregator to compute the result
        AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
        Object state = aggregator.init(currentTriggerId, collector);
        for (TridentTuple resultTuple : resultTuples) {
            aggregator.aggregate(state, resultTuple, collector);
        }
        aggregator.complete(state, collector);

        List<List<Object>> resultantAggregatedValue = collector.values;

        ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
                new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
        windowStore.putAll(entries);

        pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
    }
  • onActivation mainly delegates to execAggregatorAndStoreResult.
  • execAggregatorAndStoreResult calls the aggregator's init, aggregate, and complete methods in turn, then writes the updated trigger count and the aggregated values to windowStore.
  • Finally, it adds the TriggerResult to pendingTriggers.
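The init, aggregate, complete sequence above can be illustrated with a trimmed-down analogue. SimpleAggregator, SumAggregator, and execAggregator are hypothetical names invented for this sketch. Trident's real Aggregator works with TridentTuple and a TridentCollector, which are replaced here by int values and a plain list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the aggregator call order; a simplified analogue, not Trident's Aggregator.
class AggregatorOrderSketch {

    // Hypothetical interface: the "collector" is just a list that receives emitted values.
    interface SimpleAggregator<S> {
        S init(int triggerId, List<Object> collector);
        void aggregate(S state, int tuple, List<Object> collector);
        void complete(S state, List<Object> collector);
    }

    // Sums all tuples of one window and emits the total in complete().
    static class SumAggregator implements SimpleAggregator<long[]> {
        public long[] init(int triggerId, List<Object> collector) {
            return new long[1]; // fresh state for every trigger
        }
        public void aggregate(long[] state, int tuple, List<Object> collector) {
            state[0] += tuple;
        }
        public void complete(long[] state, List<Object> collector) {
            collector.add(state[0]);
        }
    }

    // Mirrors the order used per trigger: init once, aggregate per tuple, complete once.
    static <S> List<Object> execAggregator(SimpleAggregator<S> aggregator, int triggerId,
                                           List<Integer> windowTuples) {
        List<Object> collector = new ArrayList<>();
        S state = aggregator.init(triggerId, collector);
        for (int tuple : windowTuples) {
            aggregator.aggregate(state, tuple, collector);
        }
        aggregator.complete(state, collector);
        return collector;
    }

    public static void main(String[] args) {
        System.out.println(execAggregator(new SumAggregator(), 1, Arrays.asList(1, 2, 3)));
    }
}
```

Because init runs once per trigger, every window activation starts from fresh state, and the values emitted by complete are what gets stored for that trigger.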

Summary

  • Storm registers the timed trigger task in TimeTriggerPolicy.start. With SlidingDurationWindowStrategy, for example, the scheduling interval is windowConfig.getSlidingLength().
  • The trigger task periodically invokes WindowManager.onTrigger, which calls back WindowLifecycleListener.onActivation.
  • AbstractTridentWindowManager supplies TridentWindowLifeCycleListener, whose onActivation mainly calls execAggregatorAndStoreResult. That method drives the Aggregator: it calls init first, then aggregate once for each of the resultTuples, and finally complete. This makes the calling order of the Aggregator interface's methods explicit.
