Talk about the pendingTriggers of StoreBasedTridentWindowManager


Preface

This article looks at the pendingTriggers of Storm's StoreBasedTridentWindowManager.

TridentBoltExecutor.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

    private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
        boolean success = true;
        try {
            _bolt.finishBatch(tracked.info);
            String stream = COORD_STREAM(tracked.info.batchGroup);
            for(Integer task: tracked.condition.targetTasks) {
                _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
            }
            if(tracked.delayedAck!=null) {
                _collector.ack(tracked.delayedAck);
                tracked.delayedAck = null;
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
            success = false;
        }
        _batches.remove(tracked.info.batchId.getId());
        return success;
    }
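  • Here the finishBatch method of _bolt is called. _bolt has two implementations: TridentSpoutExecutor for spouts and SubtopologyBolt for regular bolts. Afterwards, a tuple carrying the batchId and the number of tuples emitted to each target task is sent directly to every target task on the coordination stream, any delayed ack is acked, and the batch is removed from _batches whether or not it failed.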
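To make the control flow easier to follow, here is a minimal sketch of what finishBatch does, assuming a plain in-memory model. FinishBatchSketch, BatchBolt and BatchFailedException are hypothetical stand-ins for the executor, the _bolt field and FailedException; delayed acks are left out, and coordination emits are recorded as strings instead of going through a collector.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Storm's FailedException.
class BatchFailedException extends RuntimeException {
    BatchFailedException(String message) { super(message); }
}

// Hypothetical, simplified model of the control flow in TridentBoltExecutor.finishBatch.
// Delayed acks and real stream emission are left out; emits are recorded as strings.
final class FinishBatchSketch {
    interface BatchBolt { void finishBatch(String batchId); }

    // batchId -> (target task -> number of tuples emitted to that task in this batch)
    final Map<String, Map<Integer, Integer>> batches = new HashMap<>();
    final List<String> coordEmits = new ArrayList<>(); // "task:batchId:count"
    final List<String> failedBatches = new ArrayList<>();

    boolean finishBatch(BatchBolt bolt, String batchId, List<Integer> targetTasks) {
        boolean success = true;
        try {
            bolt.finishBatch(batchId);
            Map<Integer, Integer> emitted = batches.getOrDefault(batchId, Collections.emptyMap());
            for (Integer task : targetTasks) {
                // like Utils.get(tracked.taskEmittedTuples, task, 0): tasks that got nothing report 0
                coordEmits.add(task + ":" + batchId + ":" + emitted.getOrDefault(task, 0));
            }
        } catch (BatchFailedException e) {
            failedBatches.add(batchId); // stands in for failBatch(tracked, e)
            success = false;
        }
        batches.remove(batchId); // the batch is forgotten on both paths
        return success;
    }
}
```

The point the sketch isolates is that the batch is dropped from the tracked batches on both the success and the failure path, and that every target task receives a count, even a zero one.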

SubtopologyBolt.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java

    public void finishBatch(BatchInfo batchInfo) {
        for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
            p.finishBatch((ProcessorContext) batchInfo.state);
        }
    }
  • finishBatch calls finishBatch on each TridentProcessor registered for the batch group, in topological order.
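A minimal sketch of the same idea, assuming processors are plain callbacks keyed by batch group name (SubtopologySketch and Processor are hypothetical names, not Storm types):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of SubtopologyBolt.finishBatch: each batch group maps to
// its processors, already sorted in topological order.
final class SubtopologySketch {
    interface Processor { void finishBatch(String batchId); }

    final Map<String, List<Processor>> topologicallyOrdered = new HashMap<>();

    void finishBatch(String batchGroup, String batchId) {
        // getOrDefault is a simplification; the Storm code calls get() directly
        for (Processor p : topologicallyOrdered.getOrDefault(batchGroup, new ArrayList<>())) {
            p.finishBatch(batchId); // upstream processors finish before downstream ones
        }
    }
}
```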

WindowTridentProcessor.finishBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java

    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        // add tuple to the batch state
        Object state = processorContext.state[tridentContext.getStateIndex()];
        ((List<TridentTuple>) state).add(projection.create(tuple));
    }

    public void finishBatch(ProcessorContext processorContext) {

        Object batchId = processorContext.batchId;
        Object batchTxnId = getBatchTxnId(batchId);

        LOG.debug("Received finishBatch of : [{}] ", batchId);
        // get all the tuples in a batch and add it to trident-window-manager
        List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
        tridentWindowManager.addTuplesBatch(batchId, tuples);

        List<Integer> pendingTriggerIds = null;
        List<String> triggerKeys = new ArrayList<>();
        Iterable<Object> triggerValues = null;

        if (retriedAttempt(batchId)) {
            pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
            if (pendingTriggerIds != null) {
                for (Integer pendingTriggerId : pendingTriggerIds) {
                    triggerKeys.add(triggerKey(pendingTriggerId));
                }
                triggerValues = windowStore.get(triggerKeys);
            }
        }

        // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
        if(triggerValues == null) {
            pendingTriggerIds = new ArrayList<>();
            Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
            LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
            try {
                Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
                List<Object> values = new ArrayList<>();
                StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
                while (pendingTriggersIter.hasNext()) {
                    triggerResult = pendingTriggersIter.next();
                    for (List<Object> aggregatedResult : triggerResult.result) {
                        String triggerKey = triggerKey(triggerResult.id);
                        triggerKeys.add(triggerKey);
                        values.add(aggregatedResult);
                        pendingTriggerIds.add(triggerResult.id);
                    }
                    pendingTriggersIter.remove();
                }
                triggerValues = values;
            } finally {
                // store inprocess triggers of a batch in store for batch retries for any failures
                if (!pendingTriggerIds.isEmpty()) {
                    windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
                }
            }
        }

        collector.setContext(processorContext);
        int i = 0;
        for (Object resultValue : triggerValues) {
            collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
        }
        collector.setContext(null);
    }
  • finishBatch is invoked once all tuples of a batch have been acked in the bolt that hosts WindowTridentProcessor.
  • WindowTridentProcessor.execute receives each tuple, projects it, and accumulates it in processorContext.state.
  • In finishBatch, the batch's tuples are taken out of processorContext.state and passed to tridentWindowManager.addTuplesBatch(batchId, tuples).
  • It then drains tridentWindowManager.getPendingTriggers(): the aggregated results become triggerValues, and the trigger ids are stored in windowStore under inprocessTriggerKey(batchTxnId). If the batch is retried, finishBatch reads those ids and the stored trigger values back from windowStore instead, so the same triggers are emitted again.
  • Finally, each trigger value is emitted as a ConsList of TriggerInfo(windowTaskId, triggerId) followed by the resultValue.
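The pending-trigger handling is the subtle part, so here is a simplified sketch under stated assumptions: PendingTriggerSketch is hypothetical, a HashMap stands in for WindowsStore, the key formats are made up, and each trigger is assumed to carry a single aggregated value (in Storm a trigger can carry several, each emitted with the same trigger id).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified model of how WindowTridentProcessor.finishBatch drains
// pendingTriggers and records in-process trigger ids so a retried batch re-emits them.
final class PendingTriggerSketch {
    static final class TriggerResult {
        final int id;
        final Object result; // simplification: one aggregated value per trigger
        TriggerResult(int id, Object result) { this.id = id; this.result = result; }
    }

    final Queue<TriggerResult> pendingTriggers = new ConcurrentLinkedQueue<>();
    final Map<String, Object> store = new HashMap<>(); // stands in for WindowsStore

    static String triggerKey(int id) { return "trigger-" + id; }          // hypothetical format
    static String inprocessKey(long txId) { return "inprocess-" + txId; } // hypothetical format

    /** Returns what would be emitted, as "triggerId=value" strings. */
    List<String> finishBatch(long txId, boolean retriedAttempt) {
        List<Integer> ids = null;
        List<Object> values = null;
        if (retriedAttempt) {
            @SuppressWarnings("unchecked")
            List<Integer> saved = (List<Integer>) store.get(inprocessKey(txId));
            if (saved != null) {
                ids = saved;
                values = new ArrayList<>();
                for (Integer id : saved) {
                    values.add(store.get(triggerKey(id))); // result persisted at trigger time
                }
            }
        }
        if (values == null) { // new batch, or nothing recorded by earlier attempts
            ids = new ArrayList<>();
            values = new ArrayList<>();
            Iterator<TriggerResult> it = pendingTriggers.iterator();
            while (it.hasNext()) {
                TriggerResult r = it.next();
                ids.add(r.id);
                values.add(r.result);
                it.remove(); // drained: later batches will not see this trigger
            }
            if (!ids.isEmpty()) {
                store.put(inprocessKey(txId), ids); // remember what this batch carries
            }
        }
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            emitted.add(ids.get(i) + "=" + values.get(i));
        }
        return emitted;
    }
}
```

The design choice worth noticing is that the first attempt empties the queue, so a retry cannot rely on pendingTriggers and has to recover the ids recorded under the in-process key.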

StoreBasedTridentWindowManager.addTuplesBatch

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/StoreBasedTridentWindowManager.java

    public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
        LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);
        List<WindowsStore.Entry> entries = new ArrayList<>();
        for (int i = 0; i < tuples.size(); i++) {
            String key = keyOf(batchId);
            TridentTuple tridentTuple = tuples.get(i);
            entries.add(new WindowsStore.Entry(key+i, tridentTuple.select(inputFields)));
        }

        // tuples should be available in store before they are added to window manager
        windowStore.putAll(entries);

        for (int i = 0; i < tuples.size(); i++) {
            String key = keyOf(batchId);
            TridentTuple tridentTuple = tuples.get(i);
            addToWindowManager(i, key, tridentTuple);
        }

    }

    private void addToWindowManager(int tupleIndex, String effectiveBatchId, TridentTuple tridentTuple) {
        TridentTuple actualTuple = null;
        if (maxCachedTuplesSize == null || currentCachedTuplesSize.get() < maxCachedTuplesSize) {
            actualTuple = tridentTuple;
        }
        currentCachedTuplesSize.incrementAndGet();
        windowManager.add(new TridentBatchTuple(effectiveBatchId, System.currentTimeMillis(), tupleIndex, actualTuple));
    }
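  • StoreBasedTridentWindowManager.addTuplesBatch first writes the tuples to windowStore, keyed by keyOf(batchId) plus the tuple index, and only then adds each one to windowManager through addToWindowManager. Once currentCachedTuplesSize reaches maxCachedTuplesSize, the TridentBatchTuple is created with a null tuple, so only its batch key and index are kept in memory.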
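A minimal sketch of the store-first ordering and the cache limit, assuming tuples are plain strings; AddTuplesBatchSketch, BatchTuple and the keyOf format are hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of StoreBasedTridentWindowManager.addTuplesBatch:
// tuples are persisted first, then tracked in memory; past the cache limit only the
// store key and index are kept and the tuple itself is left null.
final class AddTuplesBatchSketch {
    static final class BatchTuple {
        final String key;
        final int index;
        final String tuple; // null when the cache limit was reached
        BatchTuple(String key, int index, String tuple) {
            this.key = key;
            this.index = index;
            this.tuple = tuple;
        }
    }

    final Map<String, String> store = new LinkedHashMap<>(); // stands in for WindowsStore
    final List<BatchTuple> window = new ArrayList<>();        // stands in for the window queue
    final Integer maxCachedTuplesSize;                        // null means "no limit"
    final AtomicInteger currentCachedTuplesSize = new AtomicInteger();

    AddTuplesBatchSketch(Integer maxCachedTuplesSize) { this.maxCachedTuplesSize = maxCachedTuplesSize; }

    static String keyOf(String batchId) { return batchId + "|"; } // hypothetical key format

    void addTuplesBatch(String batchId, List<String> tuples) {
        String key = keyOf(batchId);
        for (int i = 0; i < tuples.size(); i++) {
            store.put(key + i, tuples.get(i)); // persist before the window can see the tuple
        }
        for (int i = 0; i < tuples.size(); i++) {
            String cached = null;
            if (maxCachedTuplesSize == null || currentCachedTuplesSize.get() < maxCachedTuplesSize) {
                cached = tuples.get(i);
            }
            currentCachedTuplesSize.incrementAndGet();
            window.add(new BatchTuple(key, i, cached));
        }
    }
}
```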

WindowManager.add

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

    private final ConcurrentLinkedQueue<Event<T>> queue;

    /**
     * Add an event into the window, with {@link System#currentTimeMillis()} as
     * the tracking ts.
     *
     * @param event the event to add
     */
    public void add(T event) {
        add(event, System.currentTimeMillis());
    }

    /**
     * Add an event into the window, with the given ts as the tracking ts.
     *
     * @param event the event to track
     * @param ts    the timestamp
     */
    public void add(T event, long ts) {
        add(new EventImpl<T>(event, ts));
    }

    /**
     * Tracks a window event
     *
     * @param windowEvent the window event to track
     */
    public void add(Event<T> windowEvent) {
        // watermark events are not added to the queue.
        if (!windowEvent.isWatermark()) {
            queue.add(windowEvent);
        } else {
            LOG.debug("Got watermark event with ts {}", windowEvent.getTimestamp());
        }
        track(windowEvent);
        compactWindow();
    }
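  • add wraps the tuple in an EventImpl with the current timestamp and appends it to the ConcurrentLinkedQueue; watermark events are not queued. Every event is then passed to track, and compactWindow is called.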
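The queueing rule can be sketched like this, assuming events carry a String value; WindowAddSketch and its Event class are hypothetical and reduce track() and compactWindow() to recording timestamps:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified model of WindowManager.add. Every event is tracked, but only
// non-watermark events are queued. track() and compactWindow() are reduced to a list here.
final class WindowAddSketch {
    static final class Event {
        final String value;
        final long timestamp;
        final boolean watermark;
        Event(String value, long timestamp, boolean watermark) {
            this.value = value;
            this.timestamp = timestamp;
            this.watermark = watermark;
        }
    }

    final ConcurrentLinkedQueue<Event> queue = new ConcurrentLinkedQueue<>();
    final List<Long> trackedTimestamps = new ArrayList<>();

    void add(String value) { add(value, System.currentTimeMillis()); } // tracking ts = now
    void add(String value, long ts) { add(new Event(value, ts, false)); }

    void add(Event event) {
        if (!event.watermark) {
            queue.add(event); // watermarks never enter the queue
        }
        trackedTimestamps.add(event.timestamp); // stands in for track(windowEvent)
    }
}
```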

WindowManager.onTrigger

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

    /**
     * The callback invoked by the trigger policy.
     */
    @Override
    public boolean onTrigger() {
        List<Event<T>> windowEvents = null;
        List<T> expired = null;
        try {
            lock.lock();
            /*
             * scan the entire window to handle out of order events in
             * the case of time based windows.
             */
            windowEvents = scanEvents(true);
            expired = new ArrayList<>(expiredEvents);
            expiredEvents.clear();
        } finally {
            lock.unlock();
        }
        List<T> events = new ArrayList<>();
        List<T> newEvents = new ArrayList<>();
        for (Event<T> event : windowEvents) {
            events.add(event.get());
            if (!prevWindowEvents.contains(event)) {
                newEvents.add(event.get());
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
            windowLifecycleListener.onActivation(events, newEvents, expired);
        } else {
            LOG.debug("No events in the window, skipping onActivation");
        }
        triggerPolicy.reset();
        return !events.isEmpty();
    }
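  • onTrigger first calls scanEvents(true) to get windowEvents (expired events are collected separately), then builds events (everything in the current window) and newEvents (events that were not in the previous window), and calls back windowLifecycleListener.onActivation(events, newEvents, expired). Finally it resets the trigger policy and returns whether the window was non-empty.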
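The events/newEvents split can be sketched as follows, assuming events are distinct strings and that eviction has already happened (the list passed to onTrigger plays the role of scanEvents(true)'s result). OnTriggerSketch is a hypothetical name, and the expired list is omitted:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the bookkeeping in WindowManager.onTrigger:
// "events" is the whole current window, "newEvents" is what was absent at the previous trigger.
final class OnTriggerSketch {
    final Set<String> prevWindowEvents = new HashSet<>();
    final List<String> lastEvents = new ArrayList<>();
    final List<String> lastNewEvents = new ArrayList<>();
    int activations;

    boolean onTrigger(List<String> windowEvents) {
        List<String> events = new ArrayList<>();
        List<String> newEvents = new ArrayList<>();
        for (String e : windowEvents) {
            events.add(e);
            if (!prevWindowEvents.contains(e)) {
                newEvents.add(e);
            }
        }
        prevWindowEvents.clear();
        if (!events.isEmpty()) {
            prevWindowEvents.addAll(windowEvents);
            activations++; // stands in for windowLifecycleListener.onActivation(...)
            lastEvents.clear();
            lastEvents.addAll(events);
            lastNewEvents.clear();
            lastNewEvents.addAll(newEvents);
        }
        return !events.isEmpty();
    }
}
```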

WindowManager.scanEvents

storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java

    /**
     * Scan events in the queue, using the expiration policy to check
     * if the event should be evicted or not.
     *
     * @param fullScan if set, will scan the entire queue; if not set, will stop
     *                 as soon as an event not satisfying the expiration policy is found
     * @return the list of events to be processed as a part of the current window
     */
    private List<Event<T>> scanEvents(boolean fullScan) {
        LOG.debug("Scan events, eviction policy {}", evictionPolicy);
        List<T> eventsToExpire = new ArrayList<>();
        List<Event<T>> eventsToProcess = new ArrayList<>();
        try {
            lock.lock();
            Iterator<Event<T>> it = queue.iterator();
            while (it.hasNext()) {
                Event<T> windowEvent = it.next();
                Action action = evictionPolicy.evict(windowEvent);
                if (action == EXPIRE) {
                    eventsToExpire.add(windowEvent.get());
                    it.remove();
                } else if (!fullScan || action == STOP) {
                    break;
                } else if (action == PROCESS) {
                    eventsToProcess.add(windowEvent);
                }
            }
            expiredEvents.addAll(eventsToExpire);
        } finally {
            lock.unlock();
        }
        eventsSinceLastExpiry.set(0);
        LOG.debug("[{}] events expired from window.", eventsToExpire.size());
        if (!eventsToExpire.isEmpty()) {
            LOG.debug("invoking windowLifecycleListener.onExpiry");
            windowLifecycleListener.onExpiry(eventsToExpire);
        }
        return eventsToProcess;
    }
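  • scanEvents walks the ConcurrentLinkedQueue and asks the eviction policy what to do with each event: EXPIRE removes it from the queue and collects it as expired, PROCESS adds it to eventsToProcess, and STOP (or any non-expired event when fullScan is false) ends the scan. Expired events are appended to expiredEvents and passed to windowLifecycleListener.onExpiry, and eventsToProcess is returned.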
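A minimal sketch of the scan, assuming events are bare timestamps and the eviction policy is any function from timestamp to action. ScanEventsSketch is hypothetical; its Action enum has the EXPIRE, PROCESS and STOP actions from the code above plus a KEEP action, which the code above implies by letting an event fall through all three branches. Locking and the onExpiry callback are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.LongFunction;

// Hypothetical, simplified model of WindowManager.scanEvents over timestamp-only events.
final class ScanEventsSketch {
    enum Action { EXPIRE, PROCESS, KEEP, STOP }

    final ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<>();
    final List<Long> expiredEvents = new ArrayList<>();

    List<Long> scanEvents(boolean fullScan, LongFunction<Action> evictionPolicy) {
        List<Long> eventsToExpire = new ArrayList<>();
        List<Long> eventsToProcess = new ArrayList<>();
        Iterator<Long> it = queue.iterator();
        while (it.hasNext()) {
            long ts = it.next();
            Action action = evictionPolicy.apply(ts);
            if (action == Action.EXPIRE) {
                eventsToExpire.add(ts);
                it.remove();             // expired events leave the queue for good
            } else if (!fullScan || action == Action.STOP) {
                break;                   // partial scan, or the policy says stop here
            } else if (action == Action.PROCESS) {
                eventsToProcess.add(ts); // part of the current window
            }                            // KEEP: stays queued but is not in this window
        }
        expiredEvents.addAll(eventsToExpire);
        return eventsToProcess;
    }
}
```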

TridentWindowLifeCycleListener.onActivation

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java

    /**
     * Listener to receive any activation/expiry of windowing events and take further action on them.
     */
    class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {

        @Override
        public void onExpiry(List<T> expiredEvents) {
            LOG.debug("onExpiry is invoked");
            onTuplesExpired(expiredEvents);
        }

        @Override
        public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
            LOG.debug("onActivation is invoked with events size: [{}]", events.size());
            // trigger occurred, create an aggregation and keep them in store
            int currentTriggerId = triggerId.incrementAndGet();
            execAggregatorAndStoreResult(currentTriggerId, events);
        }
    }

    private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
        List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);

        // run aggregator to compute the result
        AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
        Object state = aggregator.init(currentTriggerId, collector);
        for (TridentTuple resultTuple : resultTuples) {
            aggregator.aggregate(state, resultTuple, collector);
        }
        aggregator.complete(state, collector);

        List<List<Object>> resultantAggregatedValue = collector.values;

        ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
                new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
        windowStore.putAll(entries);

        pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
    }
  • onActivation increments triggerId and calls execAggregatorAndStoreResult, which runs the window's aggregator (init, aggregate for every tuple, complete), writes the next trigger count and resultantAggregatedValue to windowStore, and adds resultantAggregatedValue to pendingTriggers as a TriggerResult.
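Here is a minimal sketch of that trigger-side path, assuming integer tuples and a collector modeled as a list of emitted value lists. AggregateOnTriggerSketch and its Aggregator interface are hypothetical; the interface only mirrors the init/aggregate/complete shape and is not Storm's Aggregator type, and the store keys are made up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of onActivation + execAggregatorAndStoreResult.
final class AggregateOnTriggerSketch {
    interface Aggregator<S> {
        S init(int triggerId, List<List<Object>> out);
        void aggregate(S state, Integer tuple, List<List<Object>> out);
        void complete(S state, List<List<Object>> out);
    }

    static final class TriggerResult {
        final int id;
        final List<List<Object>> result;
        TriggerResult(int id, List<List<Object>> result) { this.id = id; this.result = result; }
    }

    final AtomicInteger triggerId = new AtomicInteger();
    final Map<String, Object> store = new HashMap<>(); // stands in for WindowsStore
    final Queue<TriggerResult> pendingTriggers = new ConcurrentLinkedQueue<>();

    <S> void onActivation(List<Integer> events, Aggregator<S> aggregator) {
        int currentTriggerId = triggerId.incrementAndGet();
        List<List<Object>> out = new ArrayList<>(); // plays the role of the collector's values
        S state = aggregator.init(currentTriggerId, out);
        for (Integer tuple : events) {
            aggregator.aggregate(state, tuple, out);
        }
        aggregator.complete(state, out);
        store.put("trigger-count", currentTriggerId + 1); // hypothetical key
        store.put("trigger-" + currentTriggerId, out);    // hypothetical key format
        pendingTriggers.add(new TriggerResult(currentTriggerId, out)); // picked up by finishBatch
    }
}
```

With a sum aggregator that emits one value in complete, a window of 1, 2 and 3 leaves a TriggerResult with id 1 and result [[6]] in pendingTriggers, waiting for the next WindowTridentProcessor.finishBatch to drain it.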

Summary

  • The TridentBoltExecutor that hosts WindowTridentProcessor receives tuples from the spout and calls each processor's execute method, which caches the tuple in the ProcessorContext; once the chain of processors' execute methods has run, the tuple is acked.
  • Once all tuples of a batch have been acked by the TridentBoltExecutor that hosts WindowTridentProcessor, checkFinish fires and finishBatch runs. finishBatch in turn calls finishBatch on a series of TridentProcessors (for example, WindowTridentProcessor -> ProjectedProcessor -> PartitionPersistProcessor -> EachProcessor -> AggregateProcessor).
  • WindowTridentProcessor.finishBatch takes this batch's tuples out of processorContext.state and calls tridentWindowManager.addTuplesBatch(batchId, tuples), which writes the tuples to windowStore and then adds them to the WindowManager's ConcurrentLinkedQueue. It then calls tridentWindowManager.getPendingTriggers() to drain the pending triggers, records their ids in windowStore for retries, and emits each trigger value as a TriggerInfo followed by its resultValue.
  • WindowManager.onTrigger is called when the window's trigger policy fires. It takes the window events from the WindowManager's ConcurrentLinkedQueue and passes them to TridentWindowLifeCycleListener.onActivation.
  • TridentWindowLifeCycleListener.onActivation runs the init, aggregate and complete methods of the window's aggregator to get resultantAggregatedValue and puts it into pendingTriggers. This is what connects the window trigger to WindowTridentProcessor: the next finishBatch drains pendingTriggers and emits the results.
