Talk about Flink’s Sliding Window


Preface

This article looks at Flink’s sliding window assigners, SlidingEventTimeWindows and SlidingProcessingTimeWindows.

SlidingEventTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java

@PublicEvolving
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final long size;

    private final long slide;

    private final long offset;

    protected SlidingEventTimeWindows(long size, long slide, long offset) {
        if (offset < 0 || offset >= slide || size <= 0) {
            throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
        }

        this.size = size;
        this.slide = slide;
        this.offset = offset;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
            long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
            for (long start = lastStart;
                start > timestamp - size;
                start -= slide) {
                windows.add(new TimeWindow(start, start + size));
            }
            return windows;
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    public long getSize() {
        return size;
    }

    public long getSlide() {
        return slide;
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "SlidingEventTimeWindows(" + size + ", " + slide + ")";
    }

    public static SlidingEventTimeWindows of(Time size, Time slide) {
        return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
    }

    public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
        return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
            offset.toMilliseconds() % slide.toMilliseconds());
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
  • SlidingEventTimeWindows extends WindowAssigner with element type Object and window type TimeWindow. It has three parameters, size, slide and offset, which must satisfy 0 <= offset < slide and size > 0; otherwise the constructor throws an IllegalArgumentException.
  • The assignWindows method first rejects records whose timestamp is Long.MIN_VALUE (no timestamp assigned) with a RuntimeException. Otherwise it calls TimeWindow.getWindowStartWithOffset(timestamp, offset, slide), using slide as the window size, to get lastStart, the latest slide-aligned start at or before the timestamp. It then walks backwards from lastStart, subtracting slide on each iteration, and adds a TimeWindow(start, start + size) for as long as start > timestamp - size (that is, start + size > timestamp, so the window still contains the timestamp). The getDefaultTrigger method returns EventTimeTrigger, getWindowSerializer returns a TimeWindow.Serializer, and isEventTime returns true.
  • SlidingEventTimeWindows provides two static factory methods named of: one takes size and slide (offset defaults to 0), the other also takes an offset. Each Time argument is converted to milliseconds, and the offset is reduced modulo slide.toMilliseconds() to give the final offset value.
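The assignment logic described above can be sketched without any Flink dependency. The names SlidingAssignSketch, Window, windowStartWithOffset and assign are hypothetical; windowStartWithOffset is assumed to match the formula in Flink 1.7’s TimeWindow.getWindowStartWithOffset, and assign mirrors the loop in assignWindows. This is an illustration, not the Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free sketch of SlidingEventTimeWindows.assignWindows.
// It is not the Flink API; names are illustrative only.
public class SlidingAssignSketch {

    // Half-open interval [start, end), like Flink's TimeWindow.
    public static final class Window {
        public final long start; // inclusive
        public final long end;   // exclusive

        public Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Assumed to be the same formula as TimeWindow.getWindowStartWithOffset in Flink 1.7.
    public static long windowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static List<Window> assign(long timestamp, long size, long slide, long offset) {
        // Same parameter check as the constructor.
        if (offset < 0 || offset >= slide || size <= 0) {
            throw new IllegalArgumentException("parameters must satisfy 0 <= offset < slide and size > 0");
        }
        List<Window> windows = new ArrayList<>((int) (size / slide));
        // Latest slide-aligned start at or before the timestamp.
        long lastStart = windowStartWithOffset(timestamp, offset, slide);
        // Step back by slide while [start, start + size) still contains the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    public static void main(String[] args) {
        // size = 10 s, slide = 5 s, offset = 0, event at 7 s
        System.out.println(assign(7_000, 10_000, 5_000, 0)); // [[5000, 15000), [0, 10000)]
        // of(size, slide, offset) stores offset % slide, e.g. 7 s % 5 s = 2 s
        System.out.println(assign(7_000, 10_000, 5_000, 7_000 % 5_000)); // [[7000, 17000), [2000, 12000)]
    }
}
```

With a 10 s window sliding every 5 s, the event at 7 s lands in size / slide = 2 windows, which matches the initial capacity that assignWindows gives its ArrayList (when size is not a multiple of slide, that capacity is only a hint).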

SlidingProcessingTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java

public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final long size;

    private final long offset;

    private final long slide;

    private SlidingProcessingTimeWindows(long size, long slide, long offset) {
        if (offset < 0 || offset >= slide || size <= 0) {
            throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
        }

        this.size = size;
        this.slide = slide;
        this.offset = offset;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        timestamp = context.getCurrentProcessingTime();
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart;
            start > timestamp - size;
            start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    }

    public long getSize() {
        return size;
    }

    public long getSlide() {
        return slide;
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")";
    }

    public static SlidingProcessingTimeWindows of(Time size, Time slide) {
        return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
    }

    public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) {
        return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
            offset.toMilliseconds() % slide.toMilliseconds());
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return false;
    }
}
  • SlidingProcessingTimeWindows also extends WindowAssigner with element type Object and window type TimeWindow. It takes the same three parameters, size, slide and offset, with the same constraints: 0 <= offset < slide and size > 0.
  • The assignWindows method computes lastStart the same way, using slide as the window size. Unlike SlidingEventTimeWindows, however, it ignores the timestamp passed in and overwrites it with context.getCurrentProcessingTime(). It then walks backwards from lastStart, subtracting slide each time, and adds a TimeWindow(start, start + size) while start > timestamp - size. The getDefaultTrigger method returns ProcessingTimeTrigger, getWindowSerializer returns a TimeWindow.Serializer, and isEventTime returns false.
  • SlidingProcessingTimeWindows provides the same two static of factory methods; each Time argument is converted to milliseconds, and the offset, if given, is reduced modulo slide.toMilliseconds() to give the final offset value.
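To make the processing-time difference concrete, the variant below takes the clock as a parameter. ProcessingTimeAssignSketch is a hypothetical name, and the LongSupplier processingClock is an assumption standing in for WindowAssignerContext.getCurrentProcessingTime(); it is not a Flink type. The element’s own timestamp is accepted but ignored, just as the real assigner overwrites it, so no Long.MIN_VALUE check is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch of SlidingProcessingTimeWindows.assignWindows.
// The LongSupplier stands in for context.getCurrentProcessingTime(); it is not a Flink type.
public class ProcessingTimeAssignSketch {

    // Returns each window as {start, end} with start inclusive and end exclusive.
    public static List<long[]> assign(long elementTimestamp, long size, long slide, long offset,
                                      LongSupplier processingClock) {
        if (offset < 0 || offset >= slide || size <= 0) {
            throw new IllegalArgumentException("parameters must satisfy 0 <= offset < slide and size > 0");
        }
        // elementTimestamp is deliberately unused: the real assigner overwrites the timestamp too.
        long timestamp = processingClock.getAsLong();
        List<long[]> windows = new ArrayList<>((int) (size / slide));
        // Same arithmetic as TimeWindow.getWindowStartWithOffset(timestamp, offset, slide).
        long lastStart = timestamp - (timestamp - offset + slide) % slide;
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // The element carries t = 7 s, but the "machine clock" reads 23 s.
        for (long[] w : assign(7_000, 10_000, 5_000, 0, () -> 23_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // Prints [20000, 30000) then [15000, 25000)
    }
}
```

Because the result depends only on when the element is processed, replaying the same input at a different time assigns it to different windows, which is the usual trade-off of processing time against event time.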

Summary

  • Flink’s sliding windows come in two assigners, SlidingEventTimeWindows and SlidingProcessingTimeWindows. Both extend WindowAssigner with element type Object and window type TimeWindow, and both take size, slide and offset parameters that must satisfy 0 <= offset < slide and size > 0.
  • WindowAssigner defines the abstract methods assignWindows, getDefaultTrigger, getWindowSerializer and isEventTime, plus the abstract static class WindowAssignerContext. It has two type parameters: T is the element type and W is the window type. Both sliding assigners use TimeWindow, which has a start and an end, where start is inclusive and end is exclusive; maxTimestamp returns end - 1. TimeWindow also provides two static methods: mergeWindows, which merges overlapping time windows, and getWindowStartWithOffset, which computes the window start for a given timestamp, offset and windowSize.
  • SlidingEventTimeWindows and SlidingProcessingTimeWindows differ in assignWindows, getDefaultTrigger and isEventTime. The former’s assignWindows uses the timestamp argument, while the latter uses context.getCurrentProcessingTime(); the former’s getDefaultTrigger returns EventTimeTrigger, the latter’s returns ProcessingTimeTrigger; and the former’s isEventTime returns true, the latter’s returns false.
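The TimeWindow semantics in the summary (half-open interval, maxTimestamp = end - 1, merging of overlapping windows) can be modelled in a few lines. TimeWindowSketch and its Window class are hypothetical; intersects and cover are assumed to behave like TimeWindow’s methods of the same name, and mergeWindows is simplified to return the merged list, whereas Flink’s version reports merges through a MergeCallback. Sliding assigners themselves never merge windows; merging is used by merging assigners such as session windows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of TimeWindow semantics; not the Flink class.
public class TimeWindowSketch {

    public static final class Window {
        public final long start; // inclusive
        public final long end;   // exclusive

        public Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Largest timestamp that still belongs to the window.
        public long maxTimestamp() {
            return end - 1;
        }

        // Overlap test; windows that merely touch (end == start) also count.
        public boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        // Smallest window covering both.
        public Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Sort by start, then fold each window into the current group while they overlap.
    public static List<Window> mergeWindows(List<Window> windows) {
        List<Window> sorted = new ArrayList<>(windows);
        sorted.sort((a, b) -> Long.compare(a.start, b.start));
        List<Window> merged = new ArrayList<>();
        Window current = null;
        for (Window w : sorted) {
            if (current == null) {
                current = w;
            } else if (current.intersects(w)) {
                current = current.cover(w);
            } else {
                merged.add(current);
                current = w;
            }
        }
        if (current != null) {
            merged.add(current);
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(new Window(0, 10_000).maxTimestamp()); // 9999
        List<Window> input = List.of(
                new Window(5_000, 15_000), new Window(0, 10_000), new Window(20_000, 30_000));
        System.out.println(mergeWindows(input)); // [[0, 15000), [20000, 30000)]
    }
}
```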
