A Look at Flink’s Triggers

Preface

This article examines Flink’s Triggers by reading through their source code.

Trigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/Trigger.java

@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {

    private static final long serialVersionUID = -4104633972991191369L;

    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;

    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;

    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

    public boolean canMerge() {
        return false;
    }

    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }

    public abstract void clear(W window, TriggerContext ctx) throws Exception;

    // ------------------------------------------------------------------------

    public interface TriggerContext {

        long getCurrentProcessingTime();

        MetricGroup getMetricGroup();

        long getCurrentWatermark();

        void registerProcessingTimeTimer(long time);

        void registerEventTimeTimer(long time);

        void deleteProcessingTimeTimer(long time);

        void deleteEventTimeTimer(long time);

        <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);

        @Deprecated
        <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);

        @Deprecated
        <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
    }

    public interface OnMergeContext extends TriggerContext {
        <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
    }
}
  • Trigger takes two type parameters: the element type T and the window type W. It defines the methods onElement, onProcessingTime, onEventTime, canMerge, onMerge and clear; of these, onElement, onProcessingTime and onEventTime each return a TriggerResult.
  • onElement is called for every element added to a window; onProcessingTime is called when a registered processing-time timer fires; onEventTime is called when a registered event-time timer fires.
  • canMerge indicates whether the trigger supports merging of its state and returns false by default; onMerge is called when several windows are merged into one and throws UnsupportedOperationException by default; clear removes whatever the trigger still holds for the window, such as registered timers and state obtained through the TriggerContext.
  • Trigger also defines the nested interfaces TriggerContext and OnMergeContext. TriggerContext provides methods for registering and deleting event-time and processing-time timers, plus getCurrentProcessingTime, getMetricGroup, getCurrentWatermark, getPartitionedState and two deprecated getKeyValueState overloads.
  • OnMergeContext extends TriggerContext and adds the mergePartitionedState method.
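
To make the timer callbacks concrete, here is a minimal plain-Java sketch of how registered event-time timers could lead to onEventTime calls. It is not Flink code: TimerModel and advanceWatermark are hypothetical names, and the sketch assumes that timers are deduplicated by timestamp and fire once the watermark reaches or passes them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for the event-time timer part of TriggerContext.
// Assumption: timers are deduplicated by timestamp and fire once the
// watermark reaches or passes them.
class TimerModel {
    private final TreeSet<Long> eventTimeTimers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;

    long getCurrentWatermark() {
        return currentWatermark;
    }

    void registerEventTimeTimer(long time) {
        eventTimeTimers.add(time);
    }

    void deleteEventTimeTimer(long time) {
        eventTimeTimers.remove(time);
    }

    // Advances the watermark and returns the timestamps whose timers fire,
    // in ascending order; an operator would call onEventTime for each one.
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.first() <= currentWatermark) {
            fired.add(eventTimeTimers.pollFirst());
        }
        return fired;
    }
}
```

Registering the same timestamp twice leaves a single timer in this model, and a deleted timer never fires, which is the behaviour the clear method of a trigger relies on.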

TriggerResult

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java

public enum TriggerResult {

    CONTINUE(false, false),

    FIRE_AND_PURGE(true, true),

    FIRE(true, false),

    PURGE(false, true);

    // ------------------------------------------------------------------------

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return fire;
    }

    public boolean isPurge() {
        return purge;
    }
}
  • TriggerResult is the enum of actions a Trigger returns from its onElement, onProcessingTime and onEventTime callbacks. It has two attributes, fire and purge, and four constants: CONTINUE, FIRE_AND_PURGE, FIRE and PURGE.
  • fire indicates whether the window computation should be triggered; purge indicates whether the contents of the window should be discarded.
  • CONTINUE does nothing to the window; FIRE_AND_PURGE triggers the window computation and then discards the window contents; FIRE triggers the computation but keeps the contents; PURGE discards the contents without triggering the computation.
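
The effect of the two flags can be sketched with a small plain-Java model. This is an illustration under assumptions, not Flink's window operator: Result is a local copy of the four constants, WindowBuffer is a hypothetical class, and summing the buffered integers stands in for the window computation.

```java
import java.util.ArrayList;
import java.util.List;

// Local copy of the four constants so the sketch compiles without Flink.
enum Result {
    CONTINUE(false, false),
    FIRE_AND_PURGE(true, true),
    FIRE(true, false),
    PURGE(false, true);

    final boolean fire;
    final boolean purge;

    Result(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }
}

// Hypothetical window buffer: evaluates on fire, drops its contents on purge.
class WindowBuffer {
    private final List<Integer> contents = new ArrayList<>();
    private final List<Integer> emitted = new ArrayList<>();

    void add(int value) {
        contents.add(value);
    }

    // Applies a trigger decision: emit the sum if firing, then clear if purging.
    void apply(Result result) {
        if (result.fire) {
            int sum = 0;
            for (int v : contents) {
                sum += v;
            }
            emitted.add(sum);
        }
        if (result.purge) {
            contents.clear();
        }
    }

    List<Integer> emitted() {
        return emitted;
    }

    int size() {
        return contents.size();
    }
}
```

Note that in this model a FIRE followed by further elements re-evaluates everything buffered so far, whereas FIRE_AND_PURGE starts the next evaluation from an empty buffer.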

EventTimeTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
            OnMergeContext ctx) {
        // only register a timer if the watermark is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the watermark is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}
  • EventTimeTrigger extends Trigger with element type Object and window type TimeWindow. SlidingEventTimeWindows, TumblingEventTimeWindows, EventTimeSessionWindows and DynamicEventTimeSessionWindows all use EventTimeTrigger by default.
  • onElement returns TriggerResult.FIRE when window.maxTimestamp() is less than or equal to ctx.getCurrentWatermark(); otherwise it calls ctx.registerEventTimeTimer(window.maxTimestamp()) and returns TriggerResult.CONTINUE. onEventTime returns TriggerResult.FIRE when time equals window.maxTimestamp() and TriggerResult.CONTINUE otherwise. onProcessingTime always returns TriggerResult.CONTINUE.
  • canMerge returns true; onMerge calls ctx.registerEventTimeTimer(windowMaxTimestamp) only when window.maxTimestamp() is greater than ctx.getCurrentWatermark(); clear calls ctx.deleteEventTimeTimer(window.maxTimestamp()).
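
The decisions described above can be traced with a small plain-Java model. It is a sketch, not Flink code: EventTimeModel, Decision and setWatermark are hypothetical names, a single window is assumed, and maxTimestamp() is modelled as the window end minus one, which is how TimeWindow defines it.

```java
import java.util.TreeSet;

// Plain-Java model of the EventTimeTrigger decisions for one window; not Flink code.
class EventTimeModel {
    enum Decision { CONTINUE, FIRE }

    private final long windowEnd;             // exclusive end of the window
    private long watermark = Long.MIN_VALUE;  // current watermark
    private final TreeSet<Long> timers = new TreeSet<>();

    EventTimeModel(long windowEnd) {
        this.windowEnd = windowEnd;
    }

    // Last timestamp that still belongs to the window.
    long maxTimestamp() {
        return windowEnd - 1;
    }

    void setWatermark(long watermark) {
        this.watermark = watermark;
    }

    // Mirrors onElement: fire at once if the watermark has already passed
    // the window, otherwise register a timer and wait.
    Decision onElement() {
        if (maxTimestamp() <= watermark) {
            return Decision.FIRE;
        }
        timers.add(maxTimestamp());
        return Decision.CONTINUE;
    }

    // Mirrors onEventTime: fire only for the timer at maxTimestamp().
    Decision onEventTime(long time) {
        return time == maxTimestamp() ? Decision.FIRE : Decision.CONTINUE;
    }

    int timerCount() {
        return timers.size();
    }
}
```

For a window ending at 1000, every on-time element registers the same timer at 999, so only one timer exists; an element that arrives after the watermark has reached 999 makes the window fire again immediately.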

ProcessingTimeTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java

@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {}

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
            OnMergeContext ctx) {
        // only register a timer if the time is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the time is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }

}
  • ProcessingTimeTrigger extends Trigger with element type Object and window type TimeWindow. SlidingProcessingTimeWindows, TumblingProcessingTimeWindows, ProcessingTimeSessionWindows and DynamicProcessingTimeSessionWindows use ProcessingTimeTrigger by default.
  • onElement calls ctx.registerProcessingTimeTimer(window.maxTimestamp()) and returns TriggerResult.CONTINUE; onEventTime returns TriggerResult.CONTINUE; onProcessingTime returns TriggerResult.FIRE.
  • canMerge returns true; onMerge calls ctx.registerProcessingTimeTimer(windowMaxTimestamp) only when window.maxTimestamp() is greater than ctx.getCurrentProcessingTime(); clear calls ctx.deleteProcessingTimeTimer(window.maxTimestamp()).
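
The timer handling, in particular the guard in onMerge, can be modelled in a few lines of plain Java. This is a simplified sketch and not Flink code: ProcessingTimeModel is a hypothetical class, the current processing time is passed in as a parameter instead of being read from a context, and the timer set is assumed to deduplicate by timestamp.

```java
import java.util.TreeSet;

// Plain-Java model of ProcessingTimeTrigger's timer handling; not Flink code.
class ProcessingTimeModel {
    private final TreeSet<Long> timers = new TreeSet<>();

    // Mirrors onElement: always register the timer; the set deduplicates it.
    void onElement(long windowMaxTimestamp) {
        timers.add(windowMaxTimestamp);
    }

    // Mirrors onMerge: register a timer only if the merged window
    // has not ended yet according to the current processing time.
    void onMerge(long mergedMaxTimestamp, long currentProcessingTime) {
        if (mergedMaxTimestamp > currentProcessingTime) {
            timers.add(mergedMaxTimestamp);
        }
    }

    // Mirrors clear: drop the timer that belongs to the window.
    void clear(long windowMaxTimestamp) {
        timers.remove(windowMaxTimestamp);
    }

    TreeSet<Long> timers() {
        return timers;
    }
}
```

With a merged session window whose maxTimestamp is 899, a merge at processing time 600 registers a timer at 899, while a merge at processing time 1500 of a window ending at 1199 registers nothing.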

NeverTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java

    @Internal
    public static class NeverTrigger extends Trigger<Object, GlobalWindow> {
        private static final long serialVersionUID = 1L;

        @Override
        public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}

        @Override
        public void onMerge(GlobalWindow window, OnMergeContext ctx) {
        }
    }
  • NeverTrigger's onElement, onEventTime and onProcessingTime all return TriggerResult.CONTINUE, and its clear and onMerge methods are empty, so a window using it never fires on its own. GlobalWindows uses NeverTrigger by default.

CountTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java

@PublicEvolving
public class CountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;

    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private CountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }

    @Override
    public String toString() {
        return "CountTrigger(" +  maxCount + ")";
    }

    public static <W extends Window> CountTrigger<W> of(long maxCount) {
        return new CountTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }

    }
}
  • CountTrigger extends Trigger with element type Object. It defines maxCount and a ReducingStateDescriptor that counts the elements of a window using its own Sum reduce function. In onElement, the count is incremented; when it is greater than or equal to maxCount, the count is cleared and TriggerResult.FIRE is returned, otherwise TriggerResult.CONTINUE is returned. onEventTime and onProcessingTime both return TriggerResult.CONTINUE; canMerge returns true; onMerge calls ctx.mergePartitionedState(stateDesc); clear calls ctx.getPartitionedState(stateDesc).clear().
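
The counting logic can be followed in a plain-Java model. This is a sketch rather than Flink code: CountModel, mergeFrom and firingPositions are hypothetical names, and a plain long field stands in for the ReducingState named "count".

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of CountTrigger's counting logic; not Flink code.
class CountModel {
    private final long maxCount;
    private long count = 0; // stands in for the ReducingState<Long> named "count"

    CountModel(long maxCount) {
        this.maxCount = maxCount;
    }

    // Mirrors onElement: add one, and fire (resetting the count) at maxCount.
    boolean onElement() {
        count += 1;
        if (count >= maxCount) {
            count = 0;
            return true;  // corresponds to TriggerResult.FIRE
        }
        return false;     // corresponds to TriggerResult.CONTINUE
    }

    // Mirrors onMerge with the Sum reduce function: merged counts add up.
    void mergeFrom(CountModel other) {
        count += other.count;
        other.count = 0;
    }

    long count() {
        return count;
    }

    // Returns the 1-based positions of the elements at which the trigger fires.
    static List<Integer> firingPositions(long maxCount, int elements) {
        CountModel model = new CountModel(maxCount);
        List<Integer> positions = new ArrayList<>();
        for (int i = 1; i <= elements; i++) {
            if (model.onElement()) {
                positions.add(i);
            }
        }
        return positions;
    }
}
```

With maxCount 3 and seven elements, the model fires at the third and sixth element and is left with a count of one. Because the trigger returns FIRE and not FIRE_AND_PURGE, only the count is reset; the window contents themselves are kept.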

PurgingTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java

@PublicEvolving
public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;

    private Trigger<T, W> nestedTrigger;

    private  PurgingTrigger(Trigger<T, W> nestedTrigger) {
        this.nestedTrigger = nestedTrigger;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        nestedTrigger.clear(window, ctx);
    }

    @Override
    public boolean canMerge() {
        return nestedTrigger.canMerge();
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        nestedTrigger.onMerge(window, ctx);
    }

    @Override
    public String toString() {
        return "PurgingTrigger(" + nestedTrigger.toString() + ")";
    }

    public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
        return new PurgingTrigger<>(nestedTrigger);
    }

    @VisibleForTesting
    public Trigger<T, W> getNestedTrigger() {
        return nestedTrigger;
    }
}
  • PurgingTrigger is a wrapper trigger around a nestedTrigger. Its onElement, onEventTime and onProcessingTime call the corresponding method of nestedTrigger and return TriggerResult.FIRE_AND_PURGE when triggerResult.isFire() is true; otherwise they pass the nested result through unchanged. canMerge, onMerge and clear are delegated to nestedTrigger.
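
The mapping applied by the three callbacks is small enough to show as a single function. The following plain-Java sketch is not Flink code: NestedResult is a local copy of the four TriggerResult constants and PurgingModel.wrap is a hypothetical helper.

```java
// Local copy of the four TriggerResult constants so the sketch compiles without Flink.
enum NestedResult {
    CONTINUE(false, false),
    FIRE_AND_PURGE(true, true),
    FIRE(true, false),
    PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    NestedResult(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }

    boolean isFire() {
        return fire;
    }

    boolean isPurge() {
        return purge;
    }
}

class PurgingModel {
    // Mirrors the mapping in onElement, onEventTime and onProcessingTime:
    // any firing result becomes FIRE_AND_PURGE, everything else is unchanged.
    static NestedResult wrap(NestedResult nested) {
        return nested.isFire() ? NestedResult.FIRE_AND_PURGE : nested;
    }
}
```

Only FIRE actually changes under this mapping; CONTINUE, PURGE and FIRE_AND_PURGE come back as they went in. KeyedStream.countWindow(size), for instance, combines the two triggers as PurgingTrigger.of(CountTrigger.of(size)) on GlobalWindows, so each batch of elements is discarded after it has been evaluated.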

Summary

  • Trigger takes two type parameters, the element type and the window type. It defines the methods onElement, onProcessingTime, onEventTime, canMerge, onMerge and clear, of which onElement, onProcessingTime and onEventTime each return a TriggerResult. TriggerResult is the enum of actions returned from these callbacks. It has two attributes, fire (whether to trigger the window computation) and purge (whether to discard the window contents), and four constants: CONTINUE, FIRE_AND_PURGE, FIRE and PURGE.
  • SlidingEventTimeWindows, TumblingEventTimeWindows, EventTimeSessionWindows and DynamicEventTimeSessionWindows use EventTimeTrigger by default; SlidingProcessingTimeWindows, TumblingProcessingTimeWindows, ProcessingTimeSessionWindows and DynamicProcessingTimeSessionWindows use ProcessingTimeTrigger by default; GlobalWindows uses NeverTrigger by default.
  • CountTrigger is mainly used for count-based windows. It keeps a per-window count through a ReducingStateDescriptor; in onElement, when the count is greater than or equal to maxCount, the count is cleared and TriggerResult.FIRE is returned, otherwise TriggerResult.CONTINUE is returned. PurgingTrigger is a wrapper trigger around a nestedTrigger: its onElement, onEventTime and onProcessingTime return TriggerResult.FIRE_AND_PURGE when the nested result's isFire() is true and pass the nested result through otherwise, while canMerge, onMerge and clear are delegated to nestedTrigger.
