A look at Flink’s Session Window

Preface

This article walks through the window assigners behind Flink’s Session Window.

MergingWindowAssigner

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/MergingWindowAssigner.java

@PublicEvolving
public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> {
    private static final long serialVersionUID = 1L;

    /**
     * Determines which windows (if any) should be merged.
     *
     * @param windows The window candidates.
     * @param callback A callback that can be invoked to signal which windows should be merged.
     */
    public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);

    /**
     * Callback to be used in {@link #mergeWindows(Collection, MergeCallback)} for specifying which
     * windows should be merged.
     */
    public interface MergeCallback<W> {

        /**
         * Specifies that the given windows should be merged into the result window.
         *
         * @param toBeMerged The list of windows that should be merged into one window.
         * @param mergeResult The resulting merged window.
         */
        void merge(Collection<W> toBeMerged, W mergeResult);
    }
}
  • MergingWindowAssigner extends WindowAssigner and declares the abstract mergeWindows method. That method takes a MergeCallback parameter; the MergeCallback interface defines a merge method, which receives the collection of windows to be merged (toBeMerged) and the resulting merged window (mergeResult).
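
To make the callback contract more concrete, here is a minimal, Flink-free sketch of how an implementation of mergeWindows might drive a MergeCallback. The Win class is a hypothetical stand-in for TimeWindow, and the merging rule (sort by start, fold together windows that overlap or touch) is an assumption about what TimeWindow.mergeWindows does, not a copy of it.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

class MergeCallbackSketch {

    /** Hypothetical stand-in for Flink's TimeWindow: the interval [start, end). */
    static final class Win {
        final long start;
        final long end;

        Win(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Same shape as MergingWindowAssigner.MergeCallback. */
    interface MergeCallback<W> {
        void merge(Collection<W> toBeMerged, W mergeResult);
    }

    /**
     * Sorts the candidates by start time, groups windows that overlap or touch,
     * and reports every group of two or more windows through the callback.
     */
    static void mergeWindows(Collection<Win> windows, MergeCallback<Win> callback) {
        List<Win> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((Win w) -> w.start));

        List<Win> group = new ArrayList<>();
        Win cover = null; // smallest window covering the current group
        for (Win w : sorted) {
            if (cover != null && w.start <= cover.end) {
                // Overlaps (or touches) the current group: extend the cover.
                cover = new Win(cover.start, Math.max(cover.end, w.end));
                group.add(w);
            } else {
                // Gap found: flush the previous group, start a new one.
                if (group.size() > 1) {
                    callback.merge(group, cover);
                }
                group = new ArrayList<>();
                group.add(w);
                cover = w;
            }
        }
        if (group.size() > 1) {
            callback.merge(group, cover);
        }
    }

    public static void main(String[] args) {
        List<Win> windows = new ArrayList<>();
        windows.add(new Win(0, 10));
        windows.add(new Win(5, 15));
        windows.add(new Win(40, 50));
        // Only the first two windows overlap, so the callback fires once.
        mergeWindows(windows, (toBeMerged, result) ->
                System.out.println(toBeMerged + " -> " + result));
    }
}
```

In this sketch a window that overlaps nothing is never reported, so the callback only hears about actual merges.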

EventTimeSessionWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java

public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    protected long sessionTimeout;

    protected EventTimeSessionWindows(long sessionTimeout) {
        if (sessionTimeout <= 0) {
            throw new IllegalArgumentException("EventTimeSessionWindows parameters must satisfy 0 < size");
        }

        this.sessionTimeout = sessionTimeout;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "EventTimeSessionWindows(" + sessionTimeout + ")";
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the element timestamp.
     *
     * @param size The session timeout, i.e. the time gap between sessions
     * @return The policy.
     */
    public static EventTimeSessionWindows withGap(Time size) {
        return new EventTimeSessionWindows(size.toMilliseconds());
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the element timestamp.
     *
     * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
     * @return The policy.
     */
    @PublicEvolving
    public static <T> DynamicEventTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        return new DynamicEventTimeSessionWindows<>(sessionWindowTimeGapExtractor);
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    /**
     * Merge overlapping {@link TimeWindow}s.
     */
    public void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
        TimeWindow.mergeWindows(windows, c);
    }

}
  • EventTimeSessionWindows extends MergingWindowAssigner. Its constructor takes a sessionTimeout, which must be greater than 0. The assignWindows method returns a single TimeWindow whose start is timestamp and whose end is timestamp + sessionTimeout.
  • The getDefaultTrigger method returns an EventTimeTrigger; getWindowSerializer returns a TimeWindow.Serializer; isEventTime returns true; mergeWindows delegates to TimeWindow.mergeWindows.
  • EventTimeSessionWindows defines two static factory methods, withGap and withDynamicGap: withGap creates an EventTimeSessionWindows, while withDynamicGap creates a DynamicEventTimeSessionWindows.
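
The following sketch shows how per-element windows of the form [timestamp, timestamp + sessionTimeout) can add up to sessions once overlapping windows are merged. It uses plain Java and no Flink classes. The class and method names are hypothetical, and sorting the timestamps up front is a simplification for illustration; it is not how Flink processes a stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FixedGapSessionSketch {

    /** Mirrors assignWindows: one window [timestamp, timestamp + gap) per element. */
    static long[] assignWindow(long timestamp, long gapMillis) {
        if (gapMillis <= 0) {
            throw new IllegalArgumentException("gap must satisfy 0 < gap");
        }
        return new long[] {timestamp, timestamp + gapMillis};
    }

    /** Assigns a window to each timestamp and merges windows that overlap or touch. */
    static List<long[]> sessionize(long[] timestamps, long gapMillis) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted); // simplification: handle elements in timestamp order

        List<long[]> sessions = new ArrayList<>();
        long[] current = null;
        for (long ts : sorted) {
            long[] w = assignWindow(ts, gapMillis);
            if (current != null && w[0] <= current[1]) {
                // The new window reaches into the open session: extend its end.
                current[1] = Math.max(current[1], w[1]);
            } else {
                // The gap was exceeded: start a new session.
                current = w;
                sessions.add(current);
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 2_500L, 4_000L, 12_000L};
        for (long[] s : sessionize(timestamps, 3_000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // prints [1000, 7000) and then [12000, 15000)
    }
}
```

The first three elements are each less than 3000 ms apart, so their windows chain into one session; the fourth arrives after the gap and opens a new one.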

ProcessingTimeSessionWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java

public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    protected long sessionTimeout;

    protected ProcessingTimeSessionWindows(long sessionTimeout) {
        if (sessionTimeout <= 0) {
            throw new IllegalArgumentException("ProcessingTimeSessionWindows parameters must satisfy 0 < size");
        }

        this.sessionTimeout = sessionTimeout;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        long currentProcessingTime = context.getCurrentProcessingTime();
        return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "ProcessingTimeSessionWindows(" + sessionTimeout + ")";
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the element timestamp.
     *
     * @param size The session timeout, i.e. the time gap between sessions
     * @return The policy.
     */
    public static ProcessingTimeSessionWindows withGap(Time size) {
        return new ProcessingTimeSessionWindows(size.toMilliseconds());
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the element timestamp.
     *
     * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
     * @return The policy.
     */
    @PublicEvolving
    public static <T> DynamicProcessingTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        return new DynamicProcessingTimeSessionWindows<>(sessionWindowTimeGapExtractor);
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return false;
    }

    /**
     * Merge overlapping {@link TimeWindow}s.
     */
    public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
        TimeWindow.mergeWindows(windows, c);
    }

}
  • ProcessingTimeSessionWindows extends MergingWindowAssigner. Its constructor takes a sessionTimeout, which must be greater than 0. The assignWindows method returns a single TimeWindow whose start is currentProcessingTime (obtained from context.getCurrentProcessingTime()) and whose end is currentProcessingTime + sessionTimeout; the timestamp parameter is not used.
  • The getDefaultTrigger method returns a ProcessingTimeTrigger; getWindowSerializer returns a TimeWindow.Serializer; isEventTime returns false; mergeWindows delegates to TimeWindow.mergeWindows.
  • ProcessingTimeSessionWindows defines two static factory methods, withGap and withDynamicGap: withGap creates a ProcessingTimeSessionWindows, while withDynamicGap creates a DynamicProcessingTimeSessionWindows.
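
The only difference in window assignment between the two fixed-gap assigners is where the window start comes from. The small sketch below puts the two side by side. It is Flink-free: the LongSupplier clock is a hypothetical stand-in for context.getCurrentProcessingTime(), and the class and method names are made up for illustration.

```java
import java.util.function.LongSupplier;

class ProcessingTimeAssignSketch {

    /** Event-time variant: the window is anchored at the element's own timestamp. */
    static long[] assignEventTime(long elementTimestamp, long sessionTimeout) {
        return new long[] {elementTimestamp, elementTimestamp + sessionTimeout};
    }

    /**
     * Processing-time variant: the element timestamp is ignored and the window
     * is anchored at whatever the clock reports when the element arrives.
     */
    static long[] assignProcessingTime(long elementTimestamp, long sessionTimeout, LongSupplier clock) {
        long currentProcessingTime = clock.getAsLong();
        return new long[] {currentProcessingTime, currentProcessingTime + sessionTimeout};
    }

    public static void main(String[] args) {
        LongSupplier fixedClock = () -> 50_000L; // fixed clock, for a repeatable demo
        long[] byEventTime = assignEventTime(1_000L, 3_000L);
        long[] byProcessingTime = assignProcessingTime(1_000L, 3_000L, fixedClock);
        System.out.println("event time:      [" + byEventTime[0] + ", " + byEventTime[1] + ")");
        System.out.println("processing time: [" + byProcessingTime[0] + ", " + byProcessingTime[1] + ")");
    }
}
```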

SessionWindowTimeGapExtractor

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java

@PublicEvolving
public interface SessionWindowTimeGapExtractor<T> extends Serializable {
    /**
     * Extracts the session time gap.
     * @param element The input element.
     * @return The session time gap in milliseconds.
     */
    long extract(T element);
}
  • The SessionWindowTimeGapExtractor interface defines the extract method, which derives the session gap (the sessionTimeout, in milliseconds) from an element.

DynamicEventTimeSessionWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java

@PublicEvolving
public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
    private static final long serialVersionUID = 1L;

    protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;

    protected DynamicEventTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
    }

    @Override
    public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
        long sessionTimeout = sessionWindowTimeGapExtractor.extract(element);
        if (sessionTimeout <= 0) {
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
    }

    @SuppressWarnings("unchecked")
    @Override
    public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return (Trigger<T, TimeWindow>) EventTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "DynamicEventTimeSessionWindows()";
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the element timestamp.
     *
     * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
     * @return The policy.
     */
    public static <T> DynamicEventTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        return new DynamicEventTimeSessionWindows<>(sessionWindowTimeGapExtractor);
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    /**
     * Merge overlapping {@link TimeWindow}s.
     */
    public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
        TimeWindow.mergeWindows(windows, c);
    }

}
  • DynamicEventTimeSessionWindows also extends MergingWindowAssigner. Unlike EventTimeSessionWindows, its constructor takes a SessionWindowTimeGapExtractor.
  • The assignWindows method first uses sessionWindowTimeGapExtractor to extract sessionTimeout from the element (throwing IllegalArgumentException if the value is not greater than 0) and then returns a TimeWindow(timestamp, timestamp + sessionTimeout). The getDefaultTrigger method returns an EventTimeTrigger; isEventTime returns true; mergeWindows delegates to TimeWindow.mergeWindows.
  • DynamicEventTimeSessionWindows defines one static factory method, withDynamicGap, which creates a DynamicEventTimeSessionWindows.
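
As an illustration of a dynamic gap, the sketch below pairs an extractor with the same validation that the dynamic assigner performs. It is self-contained plain Java: the nested interface only mirrors the shape of SessionWindowTimeGapExtractor, and the Event class, its deviceType field, and the gap values are hypothetical, chosen purely for the example.

```java
import java.io.Serializable;

class DynamicGapSketch {

    /** Same shape as Flink's SessionWindowTimeGapExtractor. */
    interface SessionWindowTimeGapExtractor<T> extends Serializable {
        long extract(T element);
    }

    /** Hypothetical element type carrying the data the gap depends on. */
    static final class Event {
        final String deviceType;
        final long timestamp;

        Event(String deviceType, long timestamp) {
            this.deviceType = deviceType;
            this.timestamp = timestamp;
        }
    }

    /** Assumed policy: mobile events get a 30 s gap, everything else 5 s. */
    static final SessionWindowTimeGapExtractor<Event> GAP_BY_DEVICE =
            e -> "mobile".equals(e.deviceType) ? 30_000L : 5_000L;

    /** Mirrors the dynamic assignWindows: extract the gap, validate it, build the window. */
    static <T> long[] assignWindow(T element, long timestamp, SessionWindowTimeGapExtractor<T> extractor) {
        long sessionTimeout = extractor.extract(element);
        if (sessionTimeout <= 0) {
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        return new long[] {timestamp, timestamp + sessionTimeout};
    }

    public static void main(String[] args) {
        Event mobile = new Event("mobile", 1_000L);
        Event desktop = new Event("desktop", 1_000L);
        long[] m = assignWindow(mobile, mobile.timestamp, GAP_BY_DEVICE);
        long[] d = assignWindow(desktop, desktop.timestamp, GAP_BY_DEVICE);
        System.out.println("mobile:  [" + m[0] + ", " + m[1] + ")");
        System.out.println("desktop: [" + d[0] + ", " + d[1] + ")");
    }
}
```

Because the gap is computed per element, two elements with the same timestamp can yield windows of different lengths, which a fixed-gap assigner cannot do.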

DynamicProcessingTimeSessionWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java

@PublicEvolving
public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
    private static final long serialVersionUID = 1L;

    protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;

    protected DynamicProcessingTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
    }

    @Override
    public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
        long currentProcessingTime = context.getCurrentProcessingTime();
        long sessionTimeout = sessionWindowTimeGapExtractor.extract(element);
        if (sessionTimeout <= 0) {
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
    }

    @SuppressWarnings("unchecked")
    @Override
    public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return (Trigger<T, TimeWindow>) ProcessingTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "DynamicProcessingTimeSessionWindows()";
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the element timestamp.
     *
     * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
     * @return The policy.
     */
    public static <T> DynamicProcessingTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        return new DynamicProcessingTimeSessionWindows<>(sessionWindowTimeGapExtractor);
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return false;
    }

    /**
     * Merge overlapping {@link TimeWindow}s.
     */
    public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
        TimeWindow.mergeWindows(windows, c);
    }

}
  • DynamicProcessingTimeSessionWindows also extends MergingWindowAssigner. Unlike ProcessingTimeSessionWindows, its constructor takes a SessionWindowTimeGapExtractor.
  • The assignWindows method first uses sessionWindowTimeGapExtractor to extract sessionTimeout from the element (throwing IllegalArgumentException if the value is not greater than 0) and then returns a TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout), where currentProcessingTime comes from context.getCurrentProcessingTime(). The getDefaultTrigger method returns a ProcessingTimeTrigger; isEventTime returns false; mergeWindows delegates to TimeWindow.mergeWindows.
  • DynamicProcessingTimeSessionWindows defines one static factory method, withDynamicGap, which creates a DynamicProcessingTimeSessionWindows.

Summary

  • Flink’s session window assigners are EventTimeSessionWindows, DynamicEventTimeSessionWindows, ProcessingTimeSessionWindows, and DynamicProcessingTimeSessionWindows. All of them extend MergingWindowAssigner, which declares the abstract mergeWindows method.
  • The constructors of EventTimeSessionWindows and ProcessingTimeSessionWindows both take a sessionTimeout. The difference is that in assignWindows, ProcessingTimeSessionWindows uses context.getCurrentProcessingTime() rather than the timestamp parameter to compute the TimeWindow. The getDefaultTrigger method returns an EventTimeTrigger for the former and a ProcessingTimeTrigger for the latter; isEventTime returns true for the former and false for the latter.
  • DynamicEventTimeSessionWindows and DynamicProcessingTimeSessionWindows differ from their non-dynamic counterparts in that their constructors take a SessionWindowTimeGapExtractor. The SessionWindowTimeGapExtractor interface defines the extract method, which derives the sessionTimeout from each element. In a non-dynamic session window, by contrast, sessionTimeout is fixed once it has been passed to the constructor.
