Talk about Flink's Evictors


Preface

This article takes a look at Flink's Evictors.

Evictor

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/evictors/Evictor.java

@PublicEvolving
public interface Evictor<T, W extends Window> extends Serializable {

    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

    interface EvictorContext {

        long getCurrentProcessingTime();

        MetricGroup getMetricGroup();

        long getCurrentWatermark();
    }
}
  • Evictor takes two type parameters: T, the element type, and W, the window type. It defines two methods, evictBefore (called before the window function is applied) and evictAfter (called after the window function is applied). Both receive the window's elements, their count (size), the window itself, and an EvictorContext. EvictorContext defines the getCurrentProcessingTime, getMetricGroup, and getCurrentWatermark methods.
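To make the before/after distinction concrete, here is a stdlib-only Java sketch of a single window firing. It is a simplified model, not Flink code. SimpleEvictor, keepLast, dropFirst, and fireWindow are hypothetical names. Elements are plain values in a List rather than TimestampedValue, and the EvictorContext is left out. The sketch assumes the order described above: evictBefore runs first, then the window function, then evictAfter, and whatever survives stays in the buffer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class EvictionOrderSketch {

    /** Hypothetical stand-in for Evictor: works on a mutable List instead of Iterable<TimestampedValue<T>>. */
    interface SimpleEvictor<T> {
        void evictBefore(List<T> elements, int size);
        void evictAfter(List<T> elements, int size);
    }

    /** Hypothetical evictor that keeps the last `keep` elements, using the same doEvictAfter switch as the built-ins. */
    static <T> SimpleEvictor<T> keepLast(int keep, boolean doEvictAfter) {
        return new SimpleEvictor<T>() {
            public void evictBefore(List<T> elements, int size) {
                if (!doEvictAfter) dropFirst(elements, size - keep);
            }
            public void evictAfter(List<T> elements, int size) {
                if (doEvictAfter) dropFirst(elements, size - keep);
            }
        };
    }

    // Removes the first n elements in iteration order (no-op when n <= 0).
    static <T> void dropFirst(List<T> elements, int n) {
        Iterator<T> it = elements.iterator();
        for (int i = 0; i < n && it.hasNext(); i++) {
            it.next();
            it.remove();
        }
    }

    /** Hypothetical firing routine: evictBefore, then the window function, then evictAfter. */
    static <T, R> R fireWindow(List<T> buffer, SimpleEvictor<T> evictor, Function<List<T>, R> windowFunction) {
        evictor.evictBefore(buffer, buffer.size());
        R result = windowFunction.apply(new ArrayList<>(buffer)); // the function sees the post-evictBefore contents
        evictor.evictAfter(buffer, buffer.size());
        return result;
    }

    static int sum(List<Integer> xs) {
        return xs.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> before = new ArrayList<>(Arrays.asList(1, 2, 3));
        int sumBefore = fireWindow(before, keepLast(2, false), EvictionOrderSketch::sum);
        List<Integer> after = new ArrayList<>(Arrays.asList(1, 2, 3));
        int sumAfter = fireWindow(after, keepLast(2, true), EvictionOrderSketch::sum);
        System.out.println("evictBefore: sum=" + sumBefore + ", remaining=" + before); // sum=5, remaining=[2, 3]
        System.out.println("evictAfter:  sum=" + sumAfter + ", remaining=" + after);   // sum=6, remaining=[2, 3]
    }
}
```

With evictBefore, the window function sees only the two surviving elements, so the sum is 5. With evictAfter, it sees all three elements, so the sum is 6. In that case the eviction only changes what remains in the buffer afterwards.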

CountEvictor

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java

@PublicEvolving
public class CountEvictor<W extends Window> implements Evictor<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;
    private final boolean doEvictAfter;

    private CountEvictor(long count, boolean doEvictAfter) {
        this.maxCount = count;
        this.doEvictAfter = doEvictAfter;
    }

    private CountEvictor(long count) {
        this.maxCount = count;
        this.doEvictAfter = false;
    }

    @Override
    public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (!doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    @Override
    public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        if (size <= maxCount) {
            return;
        } else {
            int evictedCount = 0;
            for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
                iterator.next();
                evictedCount++;
                if (evictedCount > size - maxCount) {
                    break;
                } else {
                    iterator.remove();
                }
            }
        }
    }

    public static <W extends Window> CountEvictor<W> of(long maxCount) {
        return new CountEvictor<>(maxCount);
    }

    public static <W extends Window> CountEvictor<W> of(long maxCount, boolean doEvictAfter) {
        return new CountEvictor<>(maxCount, doEvictAfter);
    }
}
  • CountEvictor implements the Evictor interface with Object as the element type. It has two fields, maxCount and doEvictAfter. doEvictAfter selects where eviction runs: in evictBefore when it is false (the default used by of(maxCount)), or in evictAfter when it is true. maxCount is the maximum number of elements the window keeps. If the window holds more than maxCount elements, the first size - maxCount elements in iteration order are removed.
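The following stdlib-only Java sketch reproduces CountEvictor's evict loop outside Flink so it can be run directly. Stamped is a hypothetical stand-in for TimestampedValue, and evictByCount is a hypothetical method name. The loop body follows the source above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class CountEvictionSketch {

    /** Hypothetical, minimal stand-in for Flink's TimestampedValue. */
    static final class Stamped<T> {
        final T value;
        final long timestamp;

        Stamped(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return value + "@" + timestamp;
        }
    }

    /** Same loop as CountEvictor.evict: removes the first (size - maxCount) elements in iteration order. */
    static <T> void evictByCount(Iterable<Stamped<T>> elements, int size, long maxCount) {
        if (size <= maxCount) {
            return; // nothing to evict
        }
        int evictedCount = 0;
        for (Iterator<Stamped<T>> iterator = elements.iterator(); iterator.hasNext(); ) {
            iterator.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                break; // the remaining maxCount elements are kept
            }
            iterator.remove();
        }
    }

    public static void main(String[] args) {
        List<Stamped<String>> window = new ArrayList<>(Arrays.asList(
                new Stamped<>("a", 1000), new Stamped<>("b", 2000), new Stamped<>("c", 3000),
                new Stamped<>("d", 4000), new Stamped<>("e", 5000)));
        evictByCount(window, window.size(), 3);
        System.out.println(window); // [c@3000, d@4000, e@5000]
    }
}
```

Here "first" means first in iteration order. Flink does not guarantee that this order matches arrival order.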

DeltaEvictor

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java

@PublicEvolving
public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
    private static final long serialVersionUID = 1L;

    DeltaFunction<T> deltaFunction;
    private double threshold;
    private final boolean doEvictAfter;

    private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction) {
        this.deltaFunction = deltaFunction;
        this.threshold = threshold;
        this.doEvictAfter = false;
    }

    private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction, boolean doEvictAfter) {
        this.deltaFunction = deltaFunction;
        this.threshold = threshold;
        this.doEvictAfter = doEvictAfter;
    }

    @Override
    public void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext ctx) {
        if (!doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    @Override
    public void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext ctx) {
        if (doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
        TimestampedValue<T> lastElement = Iterables.getLast(elements);
        for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){
            TimestampedValue<T> element = iterator.next();
            if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
                iterator.remove();
            }
        }
    }

    @Override
    public String toString() {
        return "DeltaEvictor(" +  deltaFunction + ", " + threshold + ")";
    }

    public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
        return new DeltaEvictor<>(threshold, deltaFunction);
    }

    public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction, boolean doEvictAfter) {
        return new DeltaEvictor<>(threshold, deltaFunction, doEvictAfter);
    }
}
  • DeltaEvictor implements the Evictor interface and has three fields: deltaFunction, threshold, and doEvictAfter. doEvictAfter selects whether eviction runs in evictBefore or in evictAfter. During eviction, deltaFunction.getDelta computes the delta between each element and the last element of the window (lastElement). Every element whose delta is greater than or equal to threshold is removed.
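Below is a stdlib-only Java sketch of the same delta rule, assuming plain values instead of TimestampedValue, since DeltaEvictor only compares values. Delta is a hypothetical interface shaped like the getDelta call used above, and evictByDelta is a hypothetical method name. The sketch also adds an explicit empty-list guard.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class DeltaEvictionSketch {

    /** Hypothetical stand-in for a delta function: a distance between an element and the last element. */
    interface Delta<T> {
        double getDelta(T oldDataPoint, T newDataPoint);
    }

    /** Same rule as DeltaEvictor.evict: drop every element whose delta to the last element is >= threshold. */
    static <T> void evictByDelta(List<T> elements, double threshold, Delta<T> deltaFunction) {
        if (elements.isEmpty()) {
            return; // guard added in this sketch
        }
        T lastElement = elements.get(elements.size() - 1);
        for (Iterator<T> iterator = elements.iterator(); iterator.hasNext(); ) {
            T element = iterator.next();
            if (deltaFunction.getDelta(element, lastElement) >= threshold) {
                iterator.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Double> window = new ArrayList<>(Arrays.asList(1.0, 5.0, 9.0, 10.0));
        // Absolute difference as the delta, threshold 3.0: deltas to 10.0 are 9, 5, 1, 0.
        evictByDelta(window, 3.0, (a, b) -> Math.abs(a - b));
        System.out.println(window); // [9.0, 10.0]
    }
}
```

With a distance-like delta, the last element has a delta of 0 to itself. It therefore survives whenever threshold is positive.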

TimeEvictor

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java

@PublicEvolving
public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long windowSize;
    private final boolean doEvictAfter;

    public TimeEvictor(long windowSize) {
        this.windowSize = windowSize;
        this.doEvictAfter = false;
    }

    public TimeEvictor(long windowSize, boolean doEvictAfter) {
        this.windowSize = windowSize;
        this.doEvictAfter = doEvictAfter;
    }

    @Override
    public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (!doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    @Override
    public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        if (!hasTimestamp(elements)) {
            return;
        }

        long currentTime = getMaxTimestamp(elements);
        long evictCutoff = currentTime - windowSize;

        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
            TimestampedValue<Object> record = iterator.next();
            if (record.getTimestamp() <= evictCutoff) {
                iterator.remove();
            }
        }
    }

    private boolean hasTimestamp(Iterable<TimestampedValue<Object>> elements) {
        Iterator<TimestampedValue<Object>> it = elements.iterator();
        if (it.hasNext()) {
            return it.next().hasTimestamp();
        }
        return false;
    }

    private long getMaxTimestamp(Iterable<TimestampedValue<Object>> elements) {
        long currentTime = Long.MIN_VALUE;
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
            TimestampedValue<Object> record = iterator.next();
            currentTime = Math.max(currentTime, record.getTimestamp());
        }
        return currentTime;
    }

    @Override
    public String toString() {
        return "TimeEvictor(" + windowSize + ")";
    }

    @VisibleForTesting
    public long getWindowSize() {
        return windowSize;
    }

    public static <W extends Window> TimeEvictor<W> of(Time windowSize) {
        return new TimeEvictor<>(windowSize.toMilliseconds());
    }

    public static <W extends Window> TimeEvictor<W> of(Time windowSize, boolean doEvictAfter) {
        return new TimeEvictor<>(windowSize.toMilliseconds(), doEvictAfter);
    }
}
  • TimeEvictor implements the Evictor interface with Object as the element type. It has two fields, windowSize and doEvictAfter. doEvictAfter selects whether eviction runs in evictBefore or in evictAfter. windowSize is the time span of elements to keep, in milliseconds when built via of(Time). The evictor takes the maximum timestamp among the window's elements and computes evictCutoff = maxTimestamp - windowSize. It then removes every element whose timestamp is less than or equal to evictCutoff. If the first element carries no timestamp, nothing is evicted.
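A stdlib-only Java sketch of TimeEvictor's rule follows. Stamped and evictByTime are hypothetical names, and windowSize is a raw millisecond value. The input is deliberately out of timestamp order to show that the cutoff is measured from the largest timestamp, not from the last element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TimeEvictionSketch {

    /** Hypothetical stand-in for TimestampedValue; hasTimestamp is false when no timestamp was assigned. */
    static final class Stamped {
        final String value;
        final long timestamp;
        final boolean hasTimestamp;

        Stamped(String value, long timestamp) {
            this(value, timestamp, true);
        }

        private Stamped(String value, long timestamp, boolean hasTimestamp) {
            this.value = value;
            this.timestamp = timestamp;
            this.hasTimestamp = hasTimestamp;
        }

        static Stamped withoutTimestamp(String value) {
            return new Stamped(value, Long.MIN_VALUE, false);
        }

        @Override
        public String toString() {
            return hasTimestamp ? value + "@" + timestamp : value + "@-";
        }
    }

    static void evictByTime(List<Stamped> elements, long windowSize) {
        // Like TimeEvictor.hasTimestamp: only the first element is inspected.
        if (elements.isEmpty() || !elements.get(0).hasTimestamp) {
            return;
        }
        // Like getMaxTimestamp: the cutoff is relative to the largest timestamp.
        long currentTime = Long.MIN_VALUE;
        for (Stamped s : elements) {
            currentTime = Math.max(currentTime, s.timestamp);
        }
        long evictCutoff = currentTime - windowSize;
        elements.removeIf(s -> s.timestamp <= evictCutoff);
    }

    public static void main(String[] args) {
        List<Stamped> window = new ArrayList<>(Arrays.asList(
                new Stamped("a", 1000), new Stamped("b", 5000),
                new Stamped("c", 2500), new Stamped("d", 4000)));
        evictByTime(window, 2000); // evictCutoff = 5000 - 2000 = 3000
        System.out.println(window); // [b@5000, d@4000]
    }
}
```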

Summary

  • Evictor takes two type parameters, the element type T and the window type W. It defines evictBefore (called before the window function) and evictAfter (called after the window function). Both receive an EvictorContext, which provides getCurrentProcessingTime, getMetricGroup, and getCurrentWatermark.
  • Flink ships three built-in implementations: CountEvictor, DeltaEvictor, and TimeEvictor. CountEvictor evicts based on the number of elements in the window. TimeEvictor evicts elements older than windowSize, measured from the largest timestamp in the window. DeltaEvictor evicts elements whose delta to the last element is greater than or equal to the specified threshold.
  • Specifying an evictor prevents any pre-aggregation, because all elements of the window have to be passed to the evictor before the window function is applied (evictBefore). In addition, Flink does not guarantee the order of elements within a window. An evictor that removes elements from the beginning or end of the window may therefore not remove the elements that actually arrived first or last.
