Talk about Flink’s consecutive windowed operations


Preface

This article takes a look at Flink’s consecutive windowed operations.

Example

DataStream<Integer> input = ...;

DataStream<Integer> resultsPerKey = input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer());

DataStream<Integer> globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction());
  • This example first partitions the stream by key and aggregates (here, sums) each key’s elements in a 5-second tumbling event-time window, then applies windowAll to the resulting DataStream using the same time WindowAssigner. Each result of the first window operation carries the timestamp window.maxTimestamp() of the window that produced it, so it falls into the same time window in the second operation. The net effect is a per-key aggregation followed by a global aggregation within the same time window, which can be used to solve problems such as finding the top-k elements.
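To make the two-stage idea concrete, here is a minimal, Flink-free sketch that models the example’s semantics in plain Java. It assumes a tumbling window with offset 0, ignores watermarks and arrival order, and all names (ConsecutiveWindowsSketch, Event, KeyedSum, perKeySums, topKPerWindow) are hypothetical. The point it shows is that stamping each per-key result with the window’s maxTimestamp (end - 1) makes the second stage, using the same window arithmetic, regroup those results into the same window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical, Flink-free model: keyBy + tumbling window + sum, then windowAll + top-k.
public class ConsecutiveWindowsSketch {

    static final class Event {
        final String key; final int value; final long timestamp;
        Event(String key, int value, long timestamp) {
            this.key = key; this.value = value; this.timestamp = timestamp;
        }
    }

    static final class KeyedSum {
        final String key; final int sum; final long timestamp;
        KeyedSum(String key, int sum, long timestamp) {
            this.key = key; this.sum = sum; this.timestamp = timestamp;
        }
    }

    // Tumbling-window start for offset 0; both stages use the same arithmetic.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Stage 1: sum per (key, window); each result is stamped with the
    // window's maxTimestamp (start + size - 1).
    static List<KeyedSum> perKeySums(List<Event> events, long size) {
        Map<String, Map<Long, Integer>> sums = new TreeMap<>();
        for (Event e : events) {
            sums.computeIfAbsent(e.key, k -> new TreeMap<>())
                .merge(windowStart(e.timestamp, size), e.value, Integer::sum);
        }
        List<KeyedSum> out = new ArrayList<>();
        sums.forEach((key, byWindow) -> byWindow.forEach(
            (start, sum) -> out.add(new KeyedSum(key, sum, start + size - 1))));
        return out;
    }

    // Stage 2: regroup all keyed sums by window and keep the k largest keys.
    static Map<Long, List<String>> topKPerWindow(List<KeyedSum> sums, long size, int k) {
        Map<Long, List<KeyedSum>> byWindow = new TreeMap<>();
        for (KeyedSum s : sums) {
            byWindow.computeIfAbsent(windowStart(s.timestamp, size), w -> new ArrayList<>()).add(s);
        }
        Map<Long, List<String>> result = new TreeMap<>();
        byWindow.forEach((start, list) -> result.put(start, list.stream()
            .sorted(Comparator.comparingInt((KeyedSum s) -> s.sum).reversed())
            .limit(k)
            .map(s -> s.key)
            .collect(Collectors.toList())));
        return result;
    }

    static Map<Long, List<String>> demo() {
        long size = 5_000L;
        List<Event> events = Arrays.asList(
            new Event("a", 1, 1_000), new Event("b", 5, 2_000), new Event("a", 7, 4_999),
            new Event("c", 2, 3_000), new Event("b", 1, 6_000), new Event("c", 9, 7_000));
        return topKPerWindow(perKeySums(events, size), size, 2);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {0=[a, b], 5000=[c, b]}
    }
}
```

Stamping with maxTimestamp rather than the window start is what keeps a result inside its own window: start + size - 1 is the last millisecond the window covers, so windowStart maps it back to the same start.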

TimestampsAndPeriodicWatermarksOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java

public class TimestampsAndPeriodicWatermarksOperator<T>
        extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
        implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

    private static final long serialVersionUID = 1L;

    private transient long watermarkInterval;

    private transient long currentWatermark;

    public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
        super(assigner);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();

        currentWatermark = Long.MIN_VALUE;
        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();

        if (watermarkInterval > 0) {
            long now = getProcessingTimeService().getCurrentProcessingTime();
            getProcessingTimeService().registerTimer(now + watermarkInterval, this);
        }
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

        output.collect(element.replace(element.getValue(), newTimestamp));
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        // register next timer
        Watermark newWatermark = userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark);
        }

        long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }

    /**
     * Override the base implementation to completely ignore watermarks propagated from
     * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
     * watermarks from here).
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // if we receive a Long.MAX_VALUE watermark we forward it since it is used
        // to signal the end of input and to not block watermark progress downstream
        if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            output.emitWatermark(mark);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();

        // emit a final watermark
        Watermark newWatermark = userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark);
        }
    }
}
  • When assignTimestampsAndWatermarks is called with an AssignerWithPeriodicWatermarks, a TimestampsAndPeriodicWatermarksOperator is created. In open(), if watermarkInterval is greater than 0, it registers a delayed task (a processing-time timer) that fires watermarkInterval milliseconds later.
  • The delayed task calls back onProcessingTime, which calls the getCurrentWatermark method of AssignerWithPeriodicWatermarks to obtain a Watermark and then registers the next delayed task for getProcessingTimeService().getCurrentProcessingTime() + watermarkInterval. Here watermarkInterval is the value set by env.getConfig().setAutoWatermarkInterval().
  • onProcessingTime therefore does two things: it re-registers the delayed task, which makes it fire periodically, and it emits the watermark returned by getCurrentWatermark, but only if that watermark is non-null and greater than currentWatermark. Watermarks arriving from upstream are ignored by processWatermark, except for Long.MAX_VALUE, which is forwarded to signal the end of input.
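The following is a simplified, single-threaded model of this operator’s behavior, written in plain Java rather than against the Flink API. The assigner is hypothetical and only loosely modeled on the bounded-out-of-orderness idea (watermark = highest timestamp seen minus a fixed bound); onProcessingTime stands for one timer tick. It shows that only strictly increasing watermarks are emitted, and that upstream watermarks other than Long.MAX_VALUE are dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Flink code) of TimestampsAndPeriodicWatermarksOperator.
public class PeriodicWatermarkSketch {

    // Hypothetical assigner: watermark = highest timestamp seen minus a fixed bound.
    static final class BoundedAssigner {
        private final long maxOutOfOrderness;
        private long currentMaxTimestamp;

        BoundedAssigner(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
            // start low enough that the subtraction below cannot underflow
            this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
        }

        long extractTimestamp(long eventTime) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
            return eventTime;
        }

        long getCurrentWatermark() {
            return currentMaxTimestamp - maxOutOfOrderness;
        }
    }

    private final BoundedAssigner assigner;
    private long currentWatermark = Long.MIN_VALUE; // initial value, as in open()
    final List<Long> emitted = new ArrayList<>();

    PeriodicWatermarkSketch(BoundedAssigner assigner) {
        this.assigner = assigner;
    }

    void processElement(long eventTime) {
        assigner.extractTimestamp(eventTime);
    }

    // One timer tick (every autoWatermarkInterval ms in Flink).
    void onProcessingTime() {
        long candidate = assigner.getCurrentWatermark();
        if (candidate > currentWatermark) { // only strictly increasing watermarks go out
            currentWatermark = candidate;
            emitted.add(candidate);
        }
    }

    // Upstream watermarks are ignored, except the end-of-input Long.MAX_VALUE.
    void processWatermark(long upstream) {
        if (upstream == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            emitted.add(upstream);
        }
    }

    static List<Long> demo() {
        PeriodicWatermarkSketch op = new PeriodicWatermarkSketch(new BoundedAssigner(1_000));
        op.onProcessingTime();               // nothing seen yet: nothing emitted
        op.processElement(5_000);
        op.onProcessingTime();               // emits 4000
        op.processElement(3_000);            // out-of-order element: max stays 5000
        op.onProcessingTime();               // still 4000: suppressed
        op.processWatermark(9_000);          // upstream watermark ignored
        op.processElement(8_000);
        op.onProcessingTime();               // emits 7000
        op.processWatermark(Long.MAX_VALUE); // forwarded
        return op.emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [4000, 7000, 9223372036854775807]
    }
}
```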

SystemProcessingTimeService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

public class SystemProcessingTimeService extends ProcessingTimeService {

    private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);

    private static final int STATUS_ALIVE = 0;
    private static final int STATUS_QUIESCED = 1;
    private static final int STATUS_SHUTDOWN = 2;

    // ------------------------------------------------------------------------

    /** The containing task that owns this time service provider. */
    private final AsyncExceptionHandler task;

    /** The lock that timers acquire upon triggering. */
    private final Object checkpointLock;

    /** The executor service that schedules and calls the triggers of this task. */
    private final ScheduledThreadPoolExecutor timerService;

    private final AtomicInteger status;

    public SystemProcessingTimeService(AsyncExceptionHandler failureHandler, Object checkpointLock) {
        this(failureHandler, checkpointLock, null);
    }

    public SystemProcessingTimeService(
            AsyncExceptionHandler task,
            Object checkpointLock,
            ThreadFactory threadFactory) {

        this.task = checkNotNull(task);
        this.checkpointLock = checkNotNull(checkpointLock);

        this.status = new AtomicInteger(STATUS_ALIVE);

        if (threadFactory == null) {
            this.timerService = new ScheduledThreadPoolExecutor(1);
        } else {
            this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);
        }

        // tasks should be removed if the future is canceled
        this.timerService.setRemoveOnCancelPolicy(true);

        // make sure shutdown removes all pending tasks
        this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }

    @Override
    public long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }

    @Override
    public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {

        // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
        // T says we won't see elements in the future with a timestamp smaller or equal to T.
        // With processing time, we therefore need to delay firing the timer by one ms.
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;

        // we directly try to register the timer and only react to the status on exception
        // that way we save unnecessary volatile accesses for each timer
        try {
            return timerService.schedule(
                    new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
        }
        catch (RejectedExecutionException e) {
            final int status = this.status.get();
            if (status == STATUS_QUIESCED) {
                return new NeverCompleteFuture(delay);
            }
            else if (status == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            }
            else {
                // something else happened, so propagate the exception
                throw e;
            }
        }
    }

    //......
}
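  • The registerTimer method of SystemProcessingTimeService schedules a delayed TriggerTask for the specified timestamp, adding 1 ms to the delay so that processing-time timers follow the same "smaller than or equal to T" semantics as watermarks. timerService is the JDK’s ScheduledThreadPoolExecutor, created with a single thread. When it runs, TriggerTask acquires checkpointLock and invokes the onProcessingTime method of the ProcessingTimeCallback, which here is TimestampsAndPeriodicWatermarksOperator.onProcessingTime.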
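The register/callback/re-register loop can be reproduced with only the JDK. This sketch is an assumption-laden stand-in, not Flink code: TimerLoopSketch and its methods are hypothetical, it reuses the same delay rule (max(timestamp - now, 0) + 1), and it omits the checkpoint lock and the status handling that SystemProcessingTimeService performs. The callback counts a tick and schedules the next timer, just as onProcessingTime does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the registerTimer/onProcessingTime loop using only the JDK.
public class TimerLoopSketch {

    // Same delay rule as registerTimer: fire 1 ms after `timestamp`, never earlier than now.
    static long delayFor(long timestamp, long now) {
        return Math.max(timestamp - now, 0) + 1;
    }

    private final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
    private final long interval;
    private final CountDownLatch ticks;

    TimerLoopSketch(long interval, int expectedTicks) {
        this.interval = interval;
        this.ticks = new CountDownLatch(expectedTicks);
        timerService.setRemoveOnCancelPolicy(true);
    }

    void registerTimer(long timestamp) {
        long delay = delayFor(timestamp, System.currentTimeMillis());
        timerService.schedule(() -> onProcessingTime(timestamp), delay, TimeUnit.MILLISECONDS);
    }

    // Plays the operator's role: do the periodic work, then register the next timer.
    void onProcessingTime(long timestamp) {
        ticks.countDown();
        if (ticks.getCount() > 0) {
            registerTimer(System.currentTimeMillis() + interval);
        }
    }

    // Runs three ticks 20 ms apart; returns true if all of them fired within 5 s.
    static boolean demo() {
        TimerLoopSketch loop = new TimerLoopSketch(20, 3);
        loop.registerTimer(System.currentTimeMillis() + 20);
        try {
            return loop.ticks.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            loop.timerService.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("delay for a past timestamp: " + delayFor(100, 200)); // 1
        System.out.println("three ticks fired: " + demo());
    }
}
```

Re-registering from inside the callback, rather than using scheduleAtFixedRate, mirrors the operator’s design: each tick computes the next due time from the current processing time.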

WindowOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
    extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {

    //......
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {

            //......

        } else {
            for (W window: elementWindows) {

                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }

        // side output input event if
        // element not handled by any window
        // late arriving tag has been set
        // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null){
                sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

    /**
     * Emits the contents of the given window using the {@link InternalWindowFunction}.
     */
    @SuppressWarnings("unchecked")
    private void emitWindowContents(W window, ACC contents) throws Exception {
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
        processContext.window = window;
        userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
    }

    //......
}
  • WindowOperator’s processElement method assigns the element to its windows and, for each window that is not already late, adds the element to windowState (here a HeapAggregatingState, which accumulates in memory). It then calls triggerContext.onElement, which delegates to trigger.onElement (here the trigger is EventTimeTrigger), to obtain a TriggerResult. If the result says fire, emitWindowContents is called; if it says purge, windowState is cleared. emitWindowContents sets the output timestamp to window.maxTimestamp() and calls userFunction.process to run the user-defined window function.
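The per-element path (assign, accumulate, ask the trigger, fire and/or purge) can be modeled in a few lines. This is a hypothetical single-key sketch, not the Flink API: WindowOperatorSketch, its Result enum (mirroring the fire/purge flags of TriggerResult) and its Trigger interface are illustrative names, and lateness checks, cleanup timers and side outputs are left out. Running it with a trigger that always fires, then with one that always fires and purges, shows how purge resets the accumulator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical, single-key model of WindowOperator.processElement (non-merging windows only).
public class WindowOperatorSketch {

    // Mirrors the fire/purge flags of Flink's TriggerResult.
    enum Result {
        CONTINUE(false, false), FIRE(true, false), FIRE_AND_PURGE(true, true), PURGE(false, true);
        final boolean fire;
        final boolean purge;
        Result(boolean fire, boolean purge) { this.fire = fire; this.purge = purge; }
    }

    interface Trigger {
        Result onElement(long timestamp, long windowMaxTimestamp);
    }

    private final long size;
    private final BinaryOperator<Integer> reduceFunction;
    private final Trigger trigger;
    private final Map<Long, Integer> windowState = new HashMap<>(); // window start -> accumulator
    final List<String> emitted = new ArrayList<>();

    WindowOperatorSketch(long size, BinaryOperator<Integer> reduceFunction, Trigger trigger) {
        this.size = size;
        this.reduceFunction = reduceFunction;
        this.trigger = trigger;
    }

    void processElement(int value, long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size); // assign a tumbling window
        long maxTimestamp = start + size - 1;                     // window.maxTimestamp()
        windowState.merge(start, value, reduceFunction);          // windowState.add(value)
        Result result = trigger.onElement(timestamp, maxTimestamp);
        if (result.fire) {
            Integer contents = windowState.get(start);
            if (contents != null) {
                emitWindowContents(maxTimestamp, contents);
            }
        }
        if (result.purge) {
            windowState.remove(start);                            // windowState.clear()
        }
    }

    // The emitted record carries window.maxTimestamp() as its timestamp.
    private void emitWindowContents(long maxTimestamp, int contents) {
        emitted.add(contents + "@" + maxTimestamp);
    }

    static List<String> run(Trigger trigger) {
        WindowOperatorSketch op = new WindowOperatorSketch(5_000, Integer::sum, trigger);
        op.processElement(1, 1_000);
        op.processElement(2, 2_000);
        op.processElement(4, 6_000);
        return op.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run((ts, max) -> Result.FIRE));           // [1@4999, 3@4999, 4@9999]
        System.out.println(run((ts, max) -> Result.FIRE_AND_PURGE)); // [1@4999, 2@4999, 4@9999]
    }
}
```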

EventTimeTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
            OnMergeContext ctx) {
        // only register a timer if the watermark is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the watermark is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}
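  • The onElement method of EventTimeTrigger returns TriggerResult.FIRE if window.maxTimestamp() <= ctx.getCurrentWatermark(), telling WindowOperator to call emitWindowContents. Otherwise it registers an event-time timer at window.maxTimestamp() and returns TriggerResult.CONTINUE; once the watermark reaches that timestamp, the timer fires and onEventTime returns TriggerResult.FIRE.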
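Here is a small model of these two paths for a single window, again in plain Java rather than the Flink API (EventTimeTriggerSketch and advanceWatermark are hypothetical names). Timers are kept in a sorted set, and advancing the watermark fires every timer at or below it. The last element arrives after the watermark has passed the window and fires immediately; note that in WindowOperator such an element only reaches the trigger if allowed lateness keeps the window from being considered late, otherwise it is dropped or sent to the late-data side output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of EventTimeTrigger for one window, driven by a watermark.
public class EventTimeTriggerSketch {

    private final long windowMaxTimestamp;
    private long currentWatermark = Long.MIN_VALUE;
    // A set, so registering the same timestamp twice keeps a single timer.
    private final TreeSet<Long> eventTimeTimers = new TreeSet<>();
    final List<String> log = new ArrayList<>();

    EventTimeTriggerSketch(long windowMaxTimestamp) {
        this.windowMaxTimestamp = windowMaxTimestamp;
    }

    // Mirrors EventTimeTrigger.onElement.
    void onElement(long timestamp) {
        if (windowMaxTimestamp <= currentWatermark) {
            log.add("FIRE on element " + timestamp);
        } else {
            eventTimeTimers.add(windowMaxTimestamp);
            log.add("CONTINUE");
        }
    }

    // Advancing the watermark fires every timer at or below it; onEventTime
    // returns FIRE only for the timer at window.maxTimestamp().
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.first() <= watermark) {
            long time = eventTimeTimers.pollFirst();
            if (time == windowMaxTimestamp) {
                log.add("FIRE on watermark " + watermark);
            }
        }
    }

    static List<String> demo() {
        EventTimeTriggerSketch trigger = new EventTimeTriggerSketch(4_999); // window [0, 5000)
        trigger.onElement(1_000);
        trigger.onElement(3_000);
        trigger.advanceWatermark(4_000); // 4999 > 4000: nothing fires
        trigger.advanceWatermark(5_000); // timer at 4999 fires
        trigger.onElement(4_500);        // watermark already past the window: fires at once
        return trigger.log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```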

Summary

  • Flink supports consecutive windowed operations. For example, you can key the stream and aggregate each key within a time window, then apply windowAll to the resulting DataStream with the same time WindowAssigner. This yields a per-key aggregation followed by a global aggregation within the same time window, which can be used to solve problems such as finding the top-k elements.
  • AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks serve two purposes: extracting a timestamp from each element to use as its event time, and emitting watermarks. Elements do not necessarily arrive in event-time order, so a watermark tells the window that all elements with an event time less than or equal to the watermark can be considered to have arrived. This keeps late data out of the window and prevents the window from waiting indefinitely for elements that might still belong to it; with the help of the trigger, the window compares the watermark with its own time range to decide whether it can be closed and its data processed. In consecutive windowed operations, upstream watermarks are forwarded to the downstream operators.
  • The Trigger tells WindowOperator when to fire the window and start processing its data (by returning TriggerResult.FIRE). For EventTimeTrigger, the decision in onElement depends on the watermark: if window.maxTimestamp() <= ctx.getCurrentWatermark(), it returns TriggerResult.FIRE.
