Talk about Flink's window operations


Preface

This article mainly studies Flink's window operations.

window

DataStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return windowAll(TumblingProcessingTimeWindows.of(size));
        } else {
            return windowAll(TumblingEventTimeWindows.of(size));
        }
    }

    public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return windowAll(SlidingProcessingTimeWindows.of(size, slide));
        } else {
            return windowAll(SlidingEventTimeWindows.of(size, slide));
        }
    }

    public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
        return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }

    public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
        return windowAll(GlobalWindows.create())
                .evictor(CountEvictor.of(size))
                .trigger(CountTrigger.of(slide));
    }

    @PublicEvolving
    public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
        return new AllWindowedStream<>(this, assigner);
    }
  • For a non-keyed DataStream, the window operations are timeWindowAll, countWindowAll and windowAll. The most important is windowAll, which the other two are built on: it takes a WindowAssigner parameter and returns an AllWindowedStream, and the resulting window operator runs with a parallelism of 1.
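
timeWindowAll does not compute windows itself; it only picks a tumbling or sliding assigner according to the stream's time characteristic. To make the assigners' behavior concrete, here is a minimal plain-Java sketch of how an element's timestamp maps to windows, assuming a zero offset and non-negative timestamps. It does not use Flink classes, and the class and method names are hypothetical; only the window-start arithmetic is meant to mirror Flink's TimeWindow.getWindowStartWithOffset.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of tumbling/sliding window assignment (hypothetical names, not Flink code).
// Windows are half-open intervals [start, end), represented here as long[] {start, end}.
class TimeWindowAssignmentSketch {

    // Start of the window of length `size` (shifted by `offset`) that contains `timestamp`
    // (for non-negative timestamps); same arithmetic as TimeWindow.getWindowStartWithOffset.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Tumbling: each element belongs to exactly one window.
    static long[] assignTumbling(long timestamp, long size) {
        long start = windowStart(timestamp, 0L, size);
        return new long[] {start, start + size};
    }

    // Sliding: each element belongs to every window of length `size`, with starts aligned
    // to `slide`, that still covers the timestamp (latest start first).
    static List<long[]> assignSliding(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStart(timestamp, 0L, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] t = assignTumbling(7_000L, 5_000L);
        System.out.println("tumbling: [" + t[0] + ", " + t[1] + ")");     // tumbling: [5000, 10000)
        for (long[] w : assignSliding(7_000L, 10_000L, 5_000L)) {
            System.out.println("sliding:  [" + w[0] + ", " + w[1] + ")");  // [5000, 15000) then [0, 10000)
        }
    }
}
```

With a 5 s tumbling window, an element at 7000 ms falls into [5000, 10000) only; with a 10 s window sliding every 5 s it falls into both [5000, 15000) and [0, 10000). When size is a multiple of slide, every element is therefore assigned to size / slide windows, which is the cost of sliding windows.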

KeyedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return window(TumblingProcessingTimeWindows.of(size));
        } else {
            return window(TumblingEventTimeWindows.of(size));
        }
    }

    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return window(SlidingProcessingTimeWindows.of(size, slide));
        } else {
            return window(SlidingEventTimeWindows.of(size, slide));
        }
    }

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
        return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
        return window(GlobalWindows.create())
                .evictor(CountEvictor.of(size))
                .trigger(CountTrigger.of(slide));
    }

    @PublicEvolving
    public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
        return new WindowedStream<>(this, assigner);
    }
  • For a KeyedStream, in addition to the window operations inherited from DataStream, the main operations are timeWindow, countWindow and window. The most important is window, which likewise takes a WindowAssigner parameter and returns a WindowedStream.
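
The countWindow variants above are compositions of GlobalWindows with a trigger and, in the sliding form, an evictor. The following plain-Java sketch models that behavior for a keyed stream, keeping one buffer and one trigger counter per key. CountWindowSketch and its methods are hypothetical, and the sketch only approximates our understanding of CountTrigger, PurgingTrigger and CountEvictor (evicting before evaluation); a non-keyed countWindowAll would behave the same way with a single buffer for the whole stream.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of KeyedStream#countWindow (hypothetical names, not Flink code).
// Each key has its own buffer (its GlobalWindow contents) and its own trigger counter.
class CountWindowSketch {
    private final long size;
    private final long slide;
    private final boolean purging;
    private final Map<String, Deque<Integer>> buffers = new HashMap<>();
    private final Map<String, Long> countSinceFire = new HashMap<>();

    private CountWindowSketch(long size, long slide, boolean purging) {
        this.size = size;
        this.slide = slide;
        this.purging = purging;
    }

    // Models countWindow(size): PurgingTrigger.of(CountTrigger.of(size)).
    static CountWindowSketch tumbling(long size) {
        return new CountWindowSketch(size, size, true);
    }

    // Models countWindow(size, slide): CountEvictor.of(size) plus CountTrigger.of(slide).
    static CountWindowSketch sliding(long size, long slide) {
        return new CountWindowSketch(size, slide, false);
    }

    // Adds one element; returns the elements handed to the window function if the
    // trigger fires for this key, or null if it does not.
    List<Integer> add(String key, int value) {
        Deque<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayDeque<>());
        buffer.addLast(value);
        long count = countSinceFire.merge(key, 1L, Long::sum);
        if (count < slide) {
            return null;                       // trigger: CONTINUE
        }
        countSinceFire.put(key, 0L);           // the count trigger resets after firing
        if (!purging) {
            while (buffer.size() > size) {     // evictor: keep only the last `size` elements
                buffer.removeFirst();
            }
        }
        List<Integer> fired = new ArrayList<>(buffer);
        if (purging) {
            buffer.clear();                    // purging trigger: FIRE_AND_PURGE
        }
        return fired;
    }

    public static void main(String[] args) {
        CountWindowSketch window = CountWindowSketch.sliding(3, 2);
        for (int v = 1; v <= 6; v++) {
            List<Integer> fired = window.add("user-1", v);
            if (fired != null) {
                System.out.println("fired: " + fired);  // [1, 2], then [2, 3, 4], then [4, 5, 6]
            }
        }
    }
}
```

Note that in the sliding form nothing is purged, so the window keeps growing between firings and the evictor trims it only at firing time; the trigger's per-key counter is what makes the window "slide" every `slide` elements.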

WindowedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/WindowedStream.java

@Public
public class WindowedStream<T, K, W extends Window> {

    /** The keyed data stream that is windowed by this stream. */
    private final KeyedStream<T, K> input;

    /** The window assigner. */
    private final WindowAssigner<? super T, W> windowAssigner;

    /** The trigger that is used for window evaluation/emission. */
    private Trigger<? super T, ? super W> trigger;

    /** The evictor that is used for evicting elements before window evaluation. */
    private Evictor<? super T, ? super W> evictor;

    /** The user-specified allowed lateness. */
    private long allowedLateness = 0L;

    /**
     * Side output {@code OutputTag} for late data. If no tag is set late data will simply be
     * dropped.
      */
    private OutputTag<T> lateDataOutputTag;

    @PublicEvolving
    public WindowedStream(KeyedStream<T, K> input,
            WindowAssigner<? super T, W> windowAssigner) {
        this.input = input;
        this.windowAssigner = windowAssigner;
        this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
    }

    @PublicEvolving
    public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
        if (windowAssigner instanceof MergingWindowAssigner && !trigger.canMerge()) {
            throw new UnsupportedOperationException("A merging window assigner cannot be used with a trigger that does not support merging.");
        }

        if (windowAssigner instanceof BaseAlignedWindowAssigner) {
            throw new UnsupportedOperationException("Cannot use a " + windowAssigner.getClass().getSimpleName() + " with a custom trigger.");
        }

        this.trigger = trigger;
        return this;
    }

    @PublicEvolving
    public WindowedStream<T, K, W> allowedLateness(Time lateness) {
        final long millis = lateness.toMilliseconds();
        checkArgument(millis >= 0, "The allowed lateness cannot be negative.");

        this.allowedLateness = millis;
        return this;
    }

    @PublicEvolving
    public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
        Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
        this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
        return this;
    }

    @PublicEvolving
    public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
        if (windowAssigner instanceof BaseAlignedWindowAssigner) {
            throw new UnsupportedOperationException("Cannot use a " + windowAssigner.getClass().getSimpleName() + " with an Evictor.");
        }
        this.evictor = evictor;
        return this;
    }

    // ------------------------------------------------------------------------
    //  Operations on the keyed windows
    // ------------------------------------------------------------------------

    //......
}
  • WindowedStream holds several fields. The constructor requires the input stream and the windowAssigner (and takes its default trigger from the assigner); trigger, evictor, allowedLateness and the late-data OutputTag are optional and can be set afterwards. A window function must also be applied to compute results, mainly a ReduceFunction, AggregateFunction, FoldFunction (deprecated) or ProcessWindowFunction.
  • The WindowAssigner determines which window(s) each element is assigned to. The built-in assigners are TumblingEventTimeWindows/TumblingProcessingTimeWindows, SlidingEventTimeWindows/SlidingProcessingTimeWindows, EventTimeSessionWindows/ProcessingTimeSessionWindows and GlobalWindows.
  • The Trigger decides when a window fires (is evaluated and emitted); the Evictor removes elements from the window when it fires; allowedLateness specifies how long after the watermark has passed the end of a window late elements are still accepted, beyond which they are dropped (this applies only to event-time windows); and the OutputTag passed to sideOutputLateData routes late data to a side output, which can be retrieved with SingleOutputStreamOperator.getSideOutput(OutputTag).
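
To illustrate how allowedLateness and sideOutputLateData interact for event-time windows, here is a simplified plain-Java model of the decision made for an incoming element, using a tumbling window. It is a sketch under assumptions: LatenessSketch, Outcome and route are hypothetical names, and the rule it applies (a window stops accepting elements once its last timestamp plus the allowed lateness is at or below the current watermark) is our reading of Flink's window operator, not a copy of it.

```java
// Plain-Java model of the allowedLateness / sideOutputLateData decision for an
// event-time tumbling window (hypothetical names, simplified, not Flink code).
class LatenessSketch {

    enum Outcome { ADD_TO_WINDOW, SIDE_OUTPUT, DROP }

    // Start of the tumbling window containing `timestamp` (zero offset, non-negative timestamps).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp + size) % size;
    }

    // Time until which a window is kept: its last timestamp plus the allowed lateness,
    // clamped to Long.MAX_VALUE if the addition overflows.
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanup = windowMaxTimestamp + allowedLateness;
        return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    // Decides what happens to an element that arrives when the watermark is `currentWatermark`.
    static Outcome route(long timestamp, long size, long allowedLateness,
                         long currentWatermark, boolean hasLateDataOutputTag) {
        long maxTimestamp = windowStart(timestamp, size) + size - 1;   // window end is exclusive
        if (cleanupTime(maxTimestamp, allowedLateness) > currentWatermark) {
            return Outcome.ADD_TO_WINDOW;   // window still open, or within allowed lateness
        }
        return hasLateDataOutputTag ? Outcome.SIDE_OUTPUT : Outcome.DROP;
    }

    public static void main(String[] args) {
        // 5 s windows, 1 s allowed lateness; the element at 7000 ms belongs to [5000, 10000).
        System.out.println(route(7_000L, 5_000L, 1_000L, 10_500L, true));  // ADD_TO_WINDOW
        System.out.println(route(7_000L, 5_000L, 1_000L, 11_000L, true));  // SIDE_OUTPUT
        System.out.println(route(7_000L, 5_000L, 1_000L, 11_000L, false)); // DROP
    }
}
```

In this model an element arriving at watermark 10500 is still added (in Flink, a window that has already fired may then fire again with the late element), while the same element arriving at watermark 11000 goes to the side output if a tag was set and is silently dropped otherwise.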

AllWindowedStream's properties and operations are largely similar to WindowedStream's, so they are not covered in detail here.

Summary

  • Window operations are at the core of processing unbounded data streams: they split a stream into finite-size buckets on which computations can then be performed. Flink's window operations fall into two categories: those on a KeyedStream (window) and those on a non-keyed stream (windowAll).
  • A window operation takes several parameters, of which the WindowAssigner is mandatory. The built-in assigners are TumblingEventTimeWindows/TumblingProcessingTimeWindows, SlidingEventTimeWindows/SlidingProcessingTimeWindows, EventTimeSessionWindows/ProcessingTimeSessionWindows and GlobalWindows. In addition, a window function must be applied, mainly a ReduceFunction, AggregateFunction, FoldFunction (deprecated) or ProcessWindowFunction.
  • Trigger, Evictor, allowedLateness and OutputTag are optional. The Trigger decides when a window fires; the Evictor removes elements from the window when it fires; allowedLateness specifies how long after the watermark has passed the end of a window late elements are still accepted before being dropped (event-time windows only); and the OutputTag routes late data to a side output, which can be retrieved with SingleOutputStreamOperator.getSideOutput(OutputTag).
