A look at Flink’s Allowed Lateness

Preface

This article looks at how Flink implements allowed lateness for windows.

WindowedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/WindowedStream.java

@Public
public class WindowedStream<T, K, W extends Window> {

    /** The keyed data stream that is windowed by this stream. */
    private final KeyedStream<T, K> input;

    /** The window assigner. */
    private final WindowAssigner<? super T, W> windowAssigner;

    /** The trigger that is used for window evaluation/emission. */
    private Trigger<? super T, ? super W> trigger;

    /** The evictor that is used for evicting elements before window evaluation. */
    private Evictor<? super T, ? super W> evictor;

    /** The user-specified allowed lateness. */
    private long allowedLateness = 0L;

    /**
     * Side output {@code OutputTag} for late data. If no tag is set late data will simply be
     * dropped.
      */
    private OutputTag<T> lateDataOutputTag;

    @PublicEvolving
    public WindowedStream<T, K, W> allowedLateness(Time lateness) {
        final long millis = lateness.toMilliseconds();
        checkArgument(millis >= 0, "The allowed lateness cannot be negative.");

        this.allowedLateness = millis;
        return this;
    }

    @PublicEvolving
    public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
        Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
        this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
        return this;
    }

    //......

    public <R> SingleOutputStreamOperator<R> reduce(
            ReduceFunction<T> reduceFunction,
            WindowFunction<T, R, K, W> function,
            TypeInformation<R> resultType) {

        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
        }

        //clean the closures
        function = input.getExecutionEnvironment().clean(function);
        reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);

        final String opName = generateOperatorName(windowAssigner, trigger, evictor, reduceFunction, function);
        KeySelector<T, K> keySel = input.getKeySelector();

        OneInputStreamOperator<T, R> operator;

        if (evictor != null) {
            @SuppressWarnings({"unchecked", "rawtypes"})
            TypeSerializer<StreamRecord<T>> streamRecordSerializer =
                (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

            ListStateDescriptor<StreamRecord<T>> stateDesc =
                new ListStateDescriptor<>("window-contents", streamRecordSerializer);

            operator =
                new EvictingWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
                    trigger,
                    evictor,
                    allowedLateness,
                    lateDataOutputTag);

        } else {
            ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
                reduceFunction,
                input.getType().createSerializer(getExecutionEnvironment().getConfig()));

            operator =
                new WindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    new InternalSingleValueWindowFunction<>(function),
                    trigger,
                    allowedLateness,
                    lateDataOutputTag);
        }

        return input.transform(opName, resultType, operator);
    }

    //......
}
  • WindowedStream has two fields related to allowed lateness: allowedLateness, which sets how long elements may arrive late (in milliseconds, 0 by default), and lateDataOutputTag, which names the side output that receives late elements. If no tag is set, late elements are simply dropped.
  • The reduce method creates a different operator depending on the evictor: when evictor is not null it creates an EvictingWindowOperator, and when evictor is null it creates a WindowOperator.
  • EvictingWindowOperator extends WindowOperator. Its constructor takes an additional Evictor parameter, but both constructors take the trigger, allowedLateness and lateDataOutputTag parameters.

OutputTag

flink-core-1.7.0-sources.jar!/org/apache/flink/util/OutputTag.java

@PublicEvolving
public class OutputTag<T> implements Serializable {

    private static final long serialVersionUID = 2L;

    private final String id;

    private final TypeInformation<T> typeInfo;

    public OutputTag(String id) {
        Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
        Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty.");
        this.id = id;

        try {
            this.typeInfo = TypeExtractor.createTypeInfo(this, OutputTag.class, getClass(), 0);
        }
        catch (InvalidTypesException e) {
            throw new InvalidTypesException("Could not determine TypeInformation for the OutputTag type. " +
                    "The most common reason is forgetting to make the OutputTag an anonymous inner class. " +
                    "It is also not possible to use generic type variables with OutputTags, such as 'Tuple2<A, B>'.", e);
        }
    }

    public OutputTag(String id, TypeInformation<T> typeInfo) {
        Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
        Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty.");
        this.id = id;
        this.typeInfo = Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
    }

    // ------------------------------------------------------------------------

    public String getId() {
        return id;
    }

    public TypeInformation<T> getTypeInfo() {
        return typeInfo;
    }

    // ------------------------------------------------------------------------

    @Override
    public boolean equals(Object obj) {
        return obj instanceof OutputTag
            && ((OutputTag) obj).id.equals(this.id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public String toString() {
        return "OutputTag(" + getTypeInfo() + ", " + id + ")";
    }
}
  • OutputTag identifies a side output by an id and type information; its equals and hashCode methods use the id only. Flink allows ProcessFunction, CoProcessFunction, ProcessWindowFunction and ProcessAllWindowFunction to emit to side outputs: the Context of each of these functions has an output(OutputTag<X> outputTag, X value) method for that purpose.
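
The error message in the single-argument constructor mentions "forgetting to make the OutputTag an anonymous inner class". The reason is Java type erasure: a plain new OutputTag<String>("x") keeps no record of String at runtime, while an anonymous subclass, new OutputTag<String>("x") {}, stores the type argument in its generic superclass. The following is a minimal plain-Java sketch of that mechanism. TypedTag is a hypothetical stand-in for OutputTag, and it reads the type through reflection directly rather than through Flink's TypeExtractor, so treat it as an illustration and not as Flink's implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Hypothetical stand-in for OutputTag; it only illustrates type capture. */
class TypedTag<T> {
    private final String id;

    TypedTag(String id) {
        this.id = id;
    }

    String getId() {
        return id;
    }

    /**
     * Returns the captured type argument, or null when the tag was created
     * without an anonymous subclass and the argument was therefore erased.
     */
    Type capturedType() {
        Type superType = getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            return ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
        return null;
    }

    // Like OutputTag, equality depends on the id only.
    @Override
    public boolean equals(Object obj) {
        return obj instanceof TypedTag && ((TypedTag<?>) obj).id.equals(id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}

class TypedTagDemo {
    public static void main(String[] args) {
        TypedTag<String> erased = new TypedTag<String>("late-data");
        TypedTag<String> captured = new TypedTag<String>("late-data") {};

        System.out.println(erased.capturedType());   // null
        System.out.println(captured.capturedType()); // class java.lang.String
        System.out.println(erased.equals(captured)); // true
    }
}
```

With the real OutputTag, the first form does not return null but fails in the constructor with the InvalidTypesException shown above, which is why tags are normally declared with the trailing {}.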

SingleOutputStreamOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {

    protected boolean nonParallel = false;

    private Map<OutputTag<?>, TypeInformation> requestedSideOutputs = new HashMap<>();

    private boolean wasSplitApplied = false;

    //......

    public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
        if (wasSplitApplied) {
            throw new UnsupportedOperationException("getSideOutput() and split() may not be called on the same DataStream. " +
                "As a work-around, please add a no-op map function before the split() call.");
        }

        sideOutputTag = clean(requireNonNull(sideOutputTag));

        // make a defensive copy
        sideOutputTag = new OutputTag<X>(sideOutputTag.getId(), sideOutputTag.getTypeInfo());

        TypeInformation<?> type = requestedSideOutputs.get(sideOutputTag);
        if (type != null && !type.equals(sideOutputTag.getTypeInfo())) {
            throw new UnsupportedOperationException("A side output with a matching id was " +
                    "already requested with a different type. This is not allowed, side output " +
                    "ids need to be unique.");
        }

        requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo());

        SideOutputTransformation<X> sideOutputTransformation = new SideOutputTransformation<>(this.getTransformation(), sideOutputTag);
        return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation);
    }
}
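  • SingleOutputStreamOperator provides the getSideOutput method, which returns, as a DataStream, the side output that a function emitted under the given OutputTag. Requesting an id that was already requested with a different type throws an UnsupportedOperationException.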
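
The type check in getSideOutput works because the requestedSideOutputs map is keyed by OutputTag, whose equality uses the id only, so a lookup with a same-id tag finds the type recorded earlier. A small sketch of that bookkeeping follows. SideOutputRegistry is a hypothetical class, it keys the map by the id string, and it uses Class<?> where Flink uses TypeInformation; it models only the check, not the creation of the resulting stream.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the id/type bookkeeping done in getSideOutput. */
class SideOutputRegistry {
    // Maps a side output id to the element type it was first requested with.
    private final Map<String, Class<?>> requested = new HashMap<>();

    /** Records a request; rejects a known id that comes with a different type. */
    void request(String id, Class<?> type) {
        Class<?> known = requested.get(id);
        if (known != null && !known.equals(type)) {
            throw new UnsupportedOperationException(
                "Side output '" + id + "' was already requested with type " + known.getName());
        }
        requested.put(id, type);
    }

    /** Number of distinct side output ids requested so far. */
    int size() {
        return requested.size();
    }
}

class SideOutputRegistryDemo {
    public static void main(String[] args) {
        SideOutputRegistry registry = new SideOutputRegistry();
        registry.request("late-data", String.class);
        registry.request("late-data", String.class); // same id, same type: accepted
        try {
            registry.request("late-data", Integer.class); // same id, other type: rejected
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```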

WindowOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
    extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {

    //......

    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet<W> mergingWindows = getMergingWindowSet();

            for (W window: elementWindows) {

                // adding the new window might result in a merge, in that case the actualWindow
                // is the merged window and we work with that. If we don't merge then
                // actualWindow == window
                W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                    @Override
                    public void merge(W mergeResult,
                            Collection<W> mergedWindows, W stateWindowResult,
                            Collection<W> mergedStateWindows) throws Exception {

                        if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                            throw new UnsupportedOperationException("The end timestamp of an " +
                                    "event-time window cannot become earlier than the current watermark " +
                                    "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                                    " window: " + mergeResult);
                        } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
                            throw new UnsupportedOperationException("The end timestamp of a " +
                                    "processing-time window cannot become earlier than the current processing time " +
                                    "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
                                    " window: " + mergeResult);
                        }

                        triggerContext.key = key;
                        triggerContext.window = mergeResult;

                        triggerContext.onMerge(mergedWindows);

                        for (W m: mergedWindows) {
                            triggerContext.window = m;
                            triggerContext.clear();
                            deleteCleanupTimer(m);
                        }

                        // merge the merged state windows into the newly resulting state window
                        windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                    }
                });

                // drop if the window is already late
                if (isWindowLate(actualWindow)) {
                    mergingWindows.retireWindow(actualWindow);
                    continue;
                }
                isSkippedElement = false;

                W stateWindow = mergingWindows.getStateWindow(actualWindow);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                }

                windowState.setCurrentNamespace(stateWindow);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = actualWindow;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(actualWindow, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(actualWindow);
            }

            // need to make sure to update the merging state in state
            mergingWindows.persist();
        } else {
            for (W window: elementWindows) {

                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }

        // side output input event if
        // element not handled by any window
        // late arriving tag has been set
        // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null){
                sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

    protected boolean isElementLate(StreamRecord<IN> element){
        return (windowAssigner.isEventTime()) &&
            (element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
    }

    private long cleanupTime(W window) {
        if (windowAssigner.isEventTime()) {
            long cleanupTime = window.maxTimestamp() + allowedLateness;
            return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
        } else {
            return window.maxTimestamp();
        }
    }

    //......
}
  • WindowOperator has an isElementLate method that decides, based on allowedLateness, whether an element is late: for an event-time assigner, an element is late when its timestamp plus allowedLateness is less than or equal to the current watermark. At the end of processElement, if isSkippedElement is true (no assigned window accepted the element) and isElementLate is true, the late element is sent to the side output when lateDataOutputTag is not null; otherwise numLateRecordsDropped.inc() counts it as dropped.
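
To make the comparisons concrete, here is a plain-Java sketch of the event-time checks with worked numbers. LatenessChecks is a hypothetical class, not Flink code. isElementLate and cleanupTime mirror the excerpt above; isWindowLate is called in the excerpt but its body is not shown, so the version here, which compares the cleanup time with the watermark, is an assumption.

```java
/** Plain-Java model of the event-time lateness checks; not Flink code. */
class LatenessChecks {

    /** Mirrors isElementLate for an event-time window assigner. */
    static boolean isElementLate(long elementTimestamp, long allowedLateness, long watermark) {
        return elementTimestamp + allowedLateness <= watermark;
    }

    /** Mirrors cleanupTime, including the guard against long overflow. */
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanupTime = windowMaxTimestamp + allowedLateness;
        return cleanupTime >= windowMaxTimestamp ? cleanupTime : Long.MAX_VALUE;
    }

    /** Assumed shape of isWindowLate, which the excerpt calls but does not show. */
    static boolean isWindowLate(long windowMaxTimestamp, long allowedLateness, long watermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness) <= watermark;
    }

    public static void main(String[] args) {
        long maxTimestamp = 9_999L;    // a window covering [0, 10000) has maxTimestamp 9999
        long allowedLateness = 2_000L; // two seconds of allowed lateness

        // Watermark 11000 is before the cleanup time 11999: the window still accepts elements.
        System.out.println(isWindowLate(maxTimestamp, allowedLateness, 11_000L)); // false
        // Watermark 11999 reaches the cleanup time: the window is late.
        System.out.println(isWindowLate(maxTimestamp, allowedLateness, 11_999L)); // true
        // An element with timestamp 9500 is then late as well (11500 <= 11999).
        System.out.println(isElementLate(9_500L, allowedLateness, 11_999L));      // true
        // A window ending at Long.MAX_VALUE would overflow; the guard keeps Long.MAX_VALUE.
        System.out.println(cleanupTime(Long.MAX_VALUE, allowedLateness) == Long.MAX_VALUE); // true
    }
}
```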

Summary

  • With event-time windows, Flink provides the allowedLateness method to configure how late an element may arrive; the default is 0. An element that arrives before the watermark passes the window's end time plus the allowed lateness is still added to the window; an element that arrives later is discarded, or sent to the side output if one is configured. For a window assigner such as GlobalWindows, no element is ever late, because the window's end timestamp is Long.MAX_VALUE.
  • OutputTag identifies a side output by an id and type information. Flink allows ProcessFunction, CoProcessFunction, ProcessWindowFunction and ProcessAllWindowFunction to emit to side outputs: the Context of each of these functions has an output(OutputTag<X> outputTag, X value) method for that purpose.
  • SingleOutputStreamOperator provides the getSideOutput method, which returns the side output emitted under a given OutputTag. At the end of WindowOperator's processElement method, if isSkippedElement and isElementLate are both true, the late element is sent to the side output when lateDataOutputTag is not null; otherwise it is dropped and counted in numLateRecordsDropped.
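
The three outcomes for an element (added to its window, sent to the side output, or dropped) can be traced with a small simulation. The sketch below is a hypothetical single-key model with 10-second tumbling event-time windows and non-negative timestamps. LateRoutingModel and its fields are invented for illustration, and the boolean sideOutputEnabled stands in for lateDataOutputTag being non-null; none of it is Flink code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-key model of how WindowOperator routes elements; not Flink code. */
class LateRoutingModel {
    static final long WINDOW_SIZE = 10_000L; // tumbling windows of 10 seconds

    final long allowedLateness;
    final boolean sideOutputEnabled; // stands in for lateDataOutputTag != null
    final List<Long> windowed = new ArrayList<>();
    final List<Long> sideOutput = new ArrayList<>();
    long numLateRecordsDropped = 0;

    LateRoutingModel(long allowedLateness, boolean sideOutputEnabled) {
        this.allowedLateness = allowedLateness;
        this.sideOutputEnabled = sideOutputEnabled;
    }

    /** Routes one element (timestamp >= 0 assumed) under the given watermark. */
    void process(long timestamp, long watermark) {
        long windowStart = timestamp - (timestamp % WINDOW_SIZE);
        long maxTimestamp = windowStart + WINDOW_SIZE - 1;
        boolean windowLate = maxTimestamp + allowedLateness <= watermark;

        if (!windowLate) {
            windowed.add(timestamp);                 // window still open: element is kept
        } else if (timestamp + allowedLateness <= watermark) {
            if (sideOutputEnabled) {
                sideOutput.add(timestamp);           // late element goes to the side output
            } else {
                numLateRecordsDropped++;             // late element is dropped and counted
            }
        }
    }

    public static void main(String[] args) {
        LateRoutingModel withTag = new LateRoutingModel(2_000L, true);
        withTag.process(9_500L, 9_000L);   // on time
        withTag.process(9_500L, 11_000L);  // late, but within the allowed lateness
        withTag.process(9_500L, 12_000L);  // too late: cleanup time 11999 has passed
        System.out.println(withTag.windowed);   // [9500, 9500]
        System.out.println(withTag.sideOutput); // [9500]

        LateRoutingModel withoutTag = new LateRoutingModel(2_000L, false);
        withoutTag.process(9_500L, 12_000L);
        System.out.println(withoutTag.numLateRecordsDropped); // 1
    }
}
```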