A Look at Flink’s Tumbling Window


Preface

This article takes a look at Flink’s Tumbling Window.

WindowAssigner

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java

@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Returns a {@code Collection} of windows that should be assigned to the element.
     *
     * @param element The element to which windows should be assigned.
     * @param timestamp The timestamp of the element.
     * @param context The {@link WindowAssignerContext} in which the assigner operates.
     */
    public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);

    /**
     * Returns the default trigger associated with this {@code WindowAssigner}.
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * Returns a {@link TypeSerializer} for serializing windows that are assigned by
     * this {@code WindowAssigner}.
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

    /**
     * Returns {@code true} if elements are assigned to windows based on event time,
     * {@code false} otherwise.
     */
    public abstract boolean isEventTime();

    /**
     * A context provided to the {@link WindowAssigner} that allows it to query the
     * current processing time.
     *
     * <p>This is provided to the assigner by its containing
     * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
     * which, in turn, gets it from the containing
     * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
     */
    public abstract static class WindowAssignerContext {

        /**
         * Returns the current processing time.
         */
        public abstract long getCurrentProcessingTime();

    }
}
  • WindowAssigner defines four abstract methods, assignWindows, getDefaultTrigger, getWindowSerializer and isEventTime, as well as the abstract static class WindowAssignerContext, which lets an assigner query the current processing time. It has two type parameters: T is the element type and W is the window type (W extends Window).
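
To make the shape of this contract concrete, here is a small Flink-free sketch. AssignerSketch, its nested Context and SingleWindowAssigner are hypothetical names, not Flink classes; triggers and serializers are omitted, and the window type is just a String label. The sample assigner puts every element into one shared window, loosely in the spirit of Flink's GlobalWindows.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical, Flink-free model of the WindowAssigner shape:
// T = element type, W = window type. Trigger and serializer methods are omitted.
abstract class AssignerSketch<T, W> {

    /** Plays the role of WindowAssignerContext: exposes the current processing time. */
    abstract static class Context {
        abstract long getCurrentProcessingTime();
    }

    abstract Collection<W> assignWindows(T element, long timestamp, Context context);

    abstract boolean isEventTime();
}

/** Assigns every element to one shared window; W is a String label here. */
class SingleWindowAssigner extends AssignerSketch<Object, String> {
    static final String THE_WINDOW = "global";

    @Override
    Collection<String> assignWindows(Object element, long timestamp, Context context) {
        // Neither the element, its timestamp nor the processing time matters here.
        return Collections.singletonList(THE_WINDOW);
    }

    @Override
    boolean isEventTime() {
        return false;
    }

    public static void main(String[] args) {
        Context clock = new Context() {
            @Override
            long getCurrentProcessingTime() {
                return System.currentTimeMillis();
            }
        };
        SingleWindowAssigner assigner = new SingleWindowAssigner();
        System.out.println(assigner.assignWindows("a", 1L, clock));  // [global]
        System.out.println(assigner.assignWindows(42, 99L, clock));  // [global]
    }
}
```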

Window

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/Window.java

@PublicEvolving
public abstract class Window {

    /**
     * Gets the largest timestamp that still belongs to this window.
     *
     * @return The largest timestamp that still belongs to this window.
     */
    public abstract long maxTimestamp();
}
  • A Window is one of the finite buckets into which an infinite stream is divided. Its maxTimestamp method returns the largest timestamp that still belongs to the window. Window has two subclasses: GlobalWindow and TimeWindow.
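
The maxTimestamp contract can be illustrated with a minimal sketch that does not depend on Flink. SketchWindow, SketchTimeWindow and SketchGlobalWindow are hypothetical stand-ins for Window, TimeWindow and GlobalWindow; the Long.MAX_VALUE returned by the global variant mirrors what Flink's GlobalWindow.maxTimestamp() returns, since a global window never ends.

```java
// Hypothetical, Flink-free stand-ins for Window, TimeWindow and GlobalWindow.
abstract class SketchWindow {
    /** Largest timestamp that still belongs to this window. */
    abstract long maxTimestamp();
}

final class SketchTimeWindow extends SketchWindow {
    final long start; // inclusive
    final long end;   // exclusive

    SketchTimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    long maxTimestamp() {
        return end - 1; // last millisecond inside [start, end)
    }
}

final class SketchGlobalWindow extends SketchWindow {
    @Override
    long maxTimestamp() {
        return Long.MAX_VALUE; // a global window never ends
    }
}

class WindowSketchDemo {
    public static void main(String[] args) {
        System.out.println(new SketchTimeWindow(0, 5000).maxTimestamp()); // 4999
        System.out.println(new SketchGlobalWindow().maxTimestamp());      // 9223372036854775807
    }
}
```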

TimeWindow

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java

@PublicEvolving
public class TimeWindow extends Window {

    private final long start;
    private final long end;

    public TimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Gets the starting timestamp of the window. This is the first timestamp that belongs
     * to this window.
     *
     * @return The starting timestamp of this window.
     */
    public long getStart() {
        return start;
    }

    /**
     * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it
     * is the first timestamp that does not belong to this window any more.
     *
     * @return The exclusive end timestamp of this window.
     */
    public long getEnd() {
        return end;
    }

    /**
     * Gets the largest timestamp that still belongs to this window.
     *
     * <p>This timestamp is identical to {@code getEnd() - 1}.
     *
     * @return The largest timestamp that still belongs to this window.
     *
     * @see #getEnd()
     */
    @Override
    public long maxTimestamp() {
        return end - 1;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        TimeWindow window = (TimeWindow) o;

        return end == window.end && start == window.start;
    }

    @Override
    public int hashCode() {
        return MathUtils.longToIntWithBitMixing(start + end);
    }

    @Override
    public String toString() {
        return "TimeWindow{" +
                "start=" + start +
                ", end=" + end +
                '}';
    }

    /**
     * Returns {@code true} if this window intersects the given window.
     */
    public boolean intersects(TimeWindow other) {
        return this.start <= other.end && this.end >= other.start;
    }

    /**
     * Returns the minimal window covers both this window and the given window.
     */
    public TimeWindow cover(TimeWindow other) {
        return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));
    }

    // ------------------------------------------------------------------------
    // Serializer
    // ------------------------------------------------------------------------

    //......

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    /**
     * Merge overlapping {@link TimeWindow}s. For use by merging
     * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}.
     */
    public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {

        // sort the windows by the start time and then merge overlapping windows

        List<TimeWindow> sortedWindows = new ArrayList<>(windows);

        Collections.sort(sortedWindows, new Comparator<TimeWindow>() {
            @Override
            public int compare(TimeWindow o1, TimeWindow o2) {
                return Long.compare(o1.getStart(), o2.getStart());
            }
        });

        List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>();
        Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null;

        for (TimeWindow candidate: sortedWindows) {
            if (currentMerge == null) {
                currentMerge = new Tuple2<>();
                currentMerge.f0 = candidate;
                currentMerge.f1 = new HashSet<>();
                currentMerge.f1.add(candidate);
            } else if (currentMerge.f0.intersects(candidate)) {
                currentMerge.f0 = currentMerge.f0.cover(candidate);
                currentMerge.f1.add(candidate);
            } else {
                merged.add(currentMerge);
                currentMerge = new Tuple2<>();
                currentMerge.f0 = candidate;
                currentMerge.f1 = new HashSet<>();
                currentMerge.f1.add(candidate);
            }
        }

        if (currentMerge != null) {
            merged.add(currentMerge);
        }

        for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) {
            if (m.f1.size() > 1) {
                c.merge(m.f1, m.f0);
            }
        }
    }

    /**
     * Method to get the window start for a timestamp.
     *
     * @param timestamp epoch millisecond to get the window start.
     * @param offset The offset which window start would be shifted by.
     * @param windowSize The size of the generated windows.
     * @return window start
     */
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }
}
  • TimeWindow has start and end fields; start is inclusive and end is exclusive, so maxTimestamp returns end - 1. It also overrides equals and hashCode.
  • TimeWindow provides an intersects method that reports whether this window intersects the given window; because it compares with <= and >=, two windows that merely touch (one's end equals the other's start) also count as intersecting. The cover method returns the smallest window that covers both this window and the given one.
  • TimeWindow also provides the static methods mergeWindows and getWindowStartWithOffset. The former sorts the windows by start time, merges overlapping ones and calls the MergeCallback for every group of more than one window; the latter computes the window start for a given timestamp, offset and windowSize.
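
The following sketch re-implements these helpers outside Flink so their behavior can be tried directly. The Win class and the mergeOverlapping method are hypothetical; unlike TimeWindow.mergeWindows, mergeOverlapping returns every resulting window (including ones that were not merged) instead of invoking a MergeCallback. windowStart uses the same formula as getWindowStartWithOffset in 1.7.0. One caveat follows from Java semantics: % keeps the sign of the dividend, so the formula only yields a window containing the timestamp while timestamp - offset + windowSize is non-negative.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, Flink-free re-implementation of the TimeWindow helpers.
final class TimeWindowMath {

    static final class Win {
        final long start; // inclusive
        final long end;   // exclusive

        Win(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Same comparison as TimeWindow.intersects: touching windows count as intersecting.
        boolean intersects(Win other) {
            return this.start <= other.end && this.end >= other.start;
        }

        // Smallest window covering both windows, as in TimeWindow.cover.
        Win cover(Win other) {
            return new Win(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Same formula as TimeWindow.getWindowStartWithOffset in Flink 1.7.0.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Sort by start, then keep widening the current window while the next one intersects it.
    static List<Win> mergeOverlapping(List<Win> windows) {
        List<Win> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((Win w) -> w.start));
        List<Win> merged = new ArrayList<>();
        Win current = null;
        for (Win candidate : sorted) {
            if (current == null) {
                current = candidate;
            } else if (current.intersects(candidate)) {
                current = current.cover(candidate);
            } else {
                merged.add(current);
                current = candidate;
            }
        }
        if (current != null) {
            merged.add(current);
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(windowStart(12345, 0, 5000));    // 10000
        System.out.println(windowStart(12345, 1000, 5000)); // 11000
        List<Win> input = new ArrayList<>();
        input.add(new Win(0, 10));
        input.add(new Win(25, 30));
        input.add(new Win(5, 12));
        input.add(new Win(12, 20)); // touches [5, 12), so it is merged too
        System.out.println(mergeOverlapping(input)); // [[0, 20), [25, 30)]
    }
}
```

With offset 1000 the 5-second windows are shifted to [1000, 6000), [6000, 11000), [11000, 16000) and so on, which is why 12345 maps to 11000.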

TumblingEventTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java

@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final long size;

    private final long offset;

    protected TumblingEventTimeWindows(long size, long offset) {
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
        }

        this.size = size;
        this.offset = offset;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            // Long.MIN_VALUE is currently assigned when no timestamp is present
            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "TumblingEventTimeWindows(" + size + ")";
    }

    public static TumblingEventTimeWindows of(Time size) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
    }

    public static TumblingEventTimeWindows of(Time size, Time offset) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
  • TumblingEventTimeWindows extends WindowAssigner, with element type Object and window type TimeWindow. It has two parameters, size and offset, which must satisfy 0 <= offset < size; otherwise the constructor throws an IllegalArgumentException.
  • assignWindows returns a single window [start, start + size), where start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size). If the timestamp is Long.MIN_VALUE, the marker for a record without a timestamp, it throws a RuntimeException. getDefaultTrigger returns EventTimeTrigger, getWindowSerializer returns new TimeWindow.Serializer(), and isEventTime returns true.
  • TumblingEventTimeWindows provides two static factory methods named of: one takes only size (offset 0), the other takes size and offset.
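
The event-time assignment logic reduces to a few lines. In this sketch TumblingEventSketch is a hypothetical class, windows are plain long[] {start, end} pairs instead of TimeWindow objects, and the trigger and serializer are left out; it only reproduces the offset check, the Long.MIN_VALUE check and the start computation.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, Flink-free sketch of TumblingEventTimeWindows.assignWindows.
final class TumblingEventSketch {
    private final long size;
    private final long offset;

    TumblingEventSketch(long size, long offset) {
        // Same guard as the TumblingEventTimeWindows constructor.
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("parameters must satisfy 0 <= offset < size");
        }
        this.size = size;
        this.offset = offset;
    }

    /** Returns exactly one window as {start, end}, end exclusive. */
    List<long[]> assignWindows(long timestamp) {
        if (timestamp == Long.MIN_VALUE) {
            // Long.MIN_VALUE marks a record that carries no timestamp.
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker)");
        }
        long start = timestamp - (timestamp - offset + size) % size;
        return Collections.singletonList(new long[] {start, start + size});
    }

    public static void main(String[] args) {
        TumblingEventSketch fiveSeconds = new TumblingEventSketch(5000, 0);
        for (long ts : new long[] {4999, 5000, 12345}) {
            long[] w = fiveSeconds.assignWindows(ts).get(0);
            System.out.println(ts + " -> [" + w[0] + ", " + w[1] + ")");
        }
        // 4999 -> [0, 5000)
        // 5000 -> [5000, 10000)
        // 12345 -> [10000, 15000)
    }
}
```

A timestamp of 4999 still belongs to [0, 5000) while 5000 opens the next window, which follows directly from end being exclusive.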

TumblingProcessingTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final long size;

    private final long offset;

    private TumblingProcessingTimeWindows(long size, long offset) {
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy  0 <= offset < size");
        }

        this.size = size;
        this.offset = offset;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        final long now = context.getCurrentProcessingTime();
        long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    }

    public long getSize() {
        return size;
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "TumblingProcessingTimeWindows(" + size + ")";
    }

    public static TumblingProcessingTimeWindows of(Time size) {
        return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);
    }

    public static TumblingProcessingTimeWindows of(Time size, Time offset) {
        return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return false;
    }
}
  • TumblingProcessingTimeWindows also extends WindowAssigner, with element type Object and window type TimeWindow, and takes the same size and offset parameters with the same constraint 0 <= offset < size.
  • assignWindows also returns a single window [start, start + size), but here start = TimeWindow.getWindowStartWithOffset(now, offset, size), where now = context.getCurrentProcessingTime(). Unlike TumblingEventTimeWindows, it ignores the timestamp parameter and uses the current processing time instead. getDefaultTrigger returns ProcessingTimeTrigger and isEventTime returns false.
  • TumblingProcessingTimeWindows likewise provides of static factory methods that accept size and, optionally, offset.
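
To highlight the difference, this sketch gives the assigner a LongSupplier clock in place of WindowAssignerContext.getCurrentProcessingTime(). TumblingProcessingSketch and the clock parameter are hypothetical; a fixed clock is used so the output is deterministic, and it shows two elements with very different timestamps falling into the same window.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical, Flink-free sketch of TumblingProcessingTimeWindows.assignWindows.
final class TumblingProcessingSketch {
    private final long size;
    private final long offset;
    private final LongSupplier clock; // stands in for context.getCurrentProcessingTime()

    TumblingProcessingSketch(long size, long offset, LongSupplier clock) {
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("parameters must satisfy 0 <= offset < size");
        }
        this.size = size;
        this.offset = offset;
        this.clock = clock;
    }

    /** The element timestamp is deliberately ignored; only "now" matters. */
    List<long[]> assignWindows(long ignoredTimestamp) {
        long now = clock.getAsLong();
        long start = now - (now - offset + size) % size;
        return Collections.singletonList(new long[] {start, start + size});
    }

    public static void main(String[] args) {
        // A fixed clock: "now" is always 12345 ms.
        TumblingProcessingSketch assigner = new TumblingProcessingSketch(5000, 0, () -> 12345L);
        long[] a = assigner.assignWindows(1L).get(0);
        long[] b = assigner.assignWindows(999_999L).get(0);
        // prints [10000, 15000) [10000, 15000)
        System.out.println("[" + a[0] + ", " + a[1] + ") [" + b[0] + ", " + b[1] + ")");
    }
}
```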

Summary

  • Flink’s Tumbling Window comes in two flavors, TumblingEventTimeWindows and TumblingProcessingTimeWindows. Both extend WindowAssigner with element type Object and window type TimeWindow, and both take size and offset parameters that must satisfy 0 <= offset < size.
  • WindowAssigner defines the abstract methods assignWindows, getDefaultTrigger, getWindowSerializer and isEventTime, plus the abstract static class WindowAssignerContext. Its two type parameters are T, the element type, and W, the window type. Both tumbling assigners use TimeWindow, whose start is inclusive and end exclusive, so maxTimestamp returns end - 1. TimeWindow also provides the static methods mergeWindows, which merges overlapping time windows, and getWindowStartWithOffset, which computes the window start for a given timestamp, offset and windowSize.
  • The two assigners differ in assignWindows, getDefaultTrigger and isEventTime: TumblingEventTimeWindows computes the window from the element's timestamp, returns EventTimeTrigger and reports isEventTime as true, while TumblingProcessingTimeWindows computes it from the current processing time, returns ProcessingTimeTrigger and reports false.
