A look at the intervalJoin operation of Flink's KeyedStream

  flink

Preface

This article takes a look at the intervalJoin operation of Flink's KeyedStream.

Example

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });

KeyedStream.intervalJoin

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

@Public
public class KeyedStream<T, KEY> extends DataStream<T> {
    //......

    @PublicEvolving
    public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> otherStream) {
        return new IntervalJoin<>(this, otherStream);
    }

    //......
}
  • KeyedStream's intervalJoin method creates and returns an IntervalJoin object that holds the two keyed streams.

IntervalJoin

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

    @PublicEvolving
    public static class IntervalJoin<T1, T2, KEY> {

        private final KeyedStream<T1, KEY> streamOne;
        private final KeyedStream<T2, KEY> streamTwo;

        IntervalJoin(
                KeyedStream<T1, KEY> streamOne,
                KeyedStream<T2, KEY> streamTwo
        ) {
            this.streamOne = checkNotNull(streamOne);
            this.streamTwo = checkNotNull(streamTwo);
        }

        @PublicEvolving
        public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {

            TimeCharacteristic timeCharacteristic =
                streamOne.getExecutionEnvironment().getStreamTimeCharacteristic();

            if (timeCharacteristic != TimeCharacteristic.EventTime) {
                throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
            }

            checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
            checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");

            return new IntervalJoined<>(
                streamOne,
                streamTwo,
                lowerBound.toMilliseconds(),
                upperBound.toMilliseconds(),
                true,
                true
            );
        }
    }
  • IntervalJoin provides the between operation, which sets the lowerBound and upperBound of the interval. Note that between immediately throws an UnsupportedTimeCharacteristicException if the stream time characteristic is anything other than TimeCharacteristic.EventTime. Otherwise it converts both bounds to milliseconds and creates and returns an IntervalJoined, with both bounds marked inclusive.
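To make the meaning of the two bounds concrete, here is a plain-Java sketch (not Flink code; TimeMode, IntervalBounds and matches are hypothetical names, and java.time.Duration stands in for Flink's Time). It repeats the checks in between and then applies the interval as documented for Flink's interval join: with both bounds inclusive, a left element at leftTs joins a right element at rightTs iff leftTs + lowerBound <= rightTs <= leftTs + upperBound.

```java
import java.time.Duration;

// Hypothetical stand-in for Flink's TimeCharacteristic (not the real enum).
enum TimeMode { PROCESSING_TIME, INGESTION_TIME, EVENT_TIME }

// Plain-Java sketch of the interval described by between(lower, upper).
// Not Flink code: names are illustrative only.
final class IntervalBounds {
    final long lowerMs;
    final long upperMs;

    private IntervalBounds(long lowerMs, long upperMs) {
        this.lowerMs = lowerMs;
        this.upperMs = upperMs;
    }

    // Mirrors IntervalJoin.between: event time only, non-null bounds,
    // bounds converted to milliseconds.
    static IntervalBounds between(TimeMode mode, Duration lower, Duration upper) {
        if (mode != TimeMode.EVENT_TIME) {
            throw new UnsupportedOperationException(
                "Time-bounded stream joins are only supported in event time");
        }
        if (lower == null || upper == null) {
            throw new NullPointerException("Both bounds must be provided");
        }
        return new IntervalBounds(lower.toMillis(), upper.toMillis());
    }

    // Both bounds inclusive (the default after between).
    boolean matches(long leftTs, long rightTs) {
        return leftTs + lowerMs <= rightTs && rightTs <= leftTs + upperMs;
    }
}
```

For the example at the top of the article, between(Time.milliseconds(-2), Time.milliseconds(1)) therefore means that an orange element at time t is joined with green elements whose timestamps lie in [t - 2, t + 1].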

IntervalJoined

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

    @PublicEvolving
    public static class IntervalJoined<IN1, IN2, KEY> {

        private final KeyedStream<IN1, KEY> left;
        private final KeyedStream<IN2, KEY> right;

        private final long lowerBound;
        private final long upperBound;

        private final KeySelector<IN1, KEY> keySelector1;
        private final KeySelector<IN2, KEY> keySelector2;

        private boolean lowerBoundInclusive;
        private boolean upperBoundInclusive;

        public IntervalJoined(
                KeyedStream<IN1, KEY> left,
                KeyedStream<IN2, KEY> right,
                long lowerBound,
                long upperBound,
                boolean lowerBoundInclusive,
                boolean upperBoundInclusive) {

            this.left = checkNotNull(left);
            this.right = checkNotNull(right);

            this.lowerBound = lowerBound;
            this.upperBound = upperBound;

            this.lowerBoundInclusive = lowerBoundInclusive;
            this.upperBoundInclusive = upperBoundInclusive;

            this.keySelector1 = left.getKeySelector();
            this.keySelector2 = right.getKeySelector();
        }

        @PublicEvolving
        public IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
            this.upperBoundInclusive = false;
            return this;
        }

        @PublicEvolving
        public IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
            this.lowerBoundInclusive = false;
            return this;
        }

        @PublicEvolving
        public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction) {
            Preconditions.checkNotNull(processJoinFunction);

            final TypeInformation<OUT> outputType = TypeExtractor.getBinaryOperatorReturnType(
                processJoinFunction,
                ProcessJoinFunction.class,
                0,
                1,
                2,
                TypeExtractor.NO_INDEX,
                left.getType(),
                right.getType(),
                Utils.getCallLocationName(),
                true
            );

            return process(processJoinFunction, outputType);
        }

        @PublicEvolving
        public <OUT> SingleOutputStreamOperator<OUT> process(
                ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
                TypeInformation<OUT> outputType) {
            Preconditions.checkNotNull(processJoinFunction);
            Preconditions.checkNotNull(outputType);

            final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);

            final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
                new IntervalJoinOperator<>(
                    lowerBound,
                    upperBound,
                    lowerBoundInclusive,
                    upperBoundInclusive,
                    left.getType().createSerializer(left.getExecutionConfig()),
                    right.getType().createSerializer(right.getExecutionConfig()),
                    cleanedUdf
                );

            return left
                .connect(right)
                .keyBy(keySelector1, keySelector2)
                .transform("Interval Join", outputType, operator);
        }
    }
  • IntervalJoined treats lowerBound and upperBound as inclusive by default, and provides lowerBoundExclusive and upperBoundExclusive to make either bound exclusive. IntervalJoined also provides the process operation, which accepts a ProcessJoinFunction (the single-argument variant first infers outputType with TypeExtractor.getBinaryOperatorReturnType). process creates an IntervalJoinOperator and then returns the SingleOutputStreamOperator produced by left.connect(right).keyBy(keySelector1, keySelector2).transform("Interval Join", outputType, operator). In the example above, left is orangeStream and right is greenStream.
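Because timestamps are whole milliseconds, an exclusive bound is equivalent to an inclusive bound moved one millisecond inward. IntervalJoinOperator's constructor (listed later in this article) relies on exactly that, so the matching code never has to look at the inclusiveness flags again. The plain-Java sketch below restates that normalization; NormalizedBounds and matches are hypothetical names, not Flink API.

```java
// Plain-Java sketch (hypothetical names) of how IntervalJoinOperator folds the
// inclusive/exclusive flags into the bounds so later checks can always use <=.
final class NormalizedBounds {
    final long lower;
    final long upper;

    NormalizedBounds(long lowerBound, long upperBound,
                     boolean lowerInclusive, boolean upperInclusive) {
        // Same precondition as the operator constructor.
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound <= upperBound must be fulfilled");
        }
        // Integer millisecond timestamps: "> x" equals ">= x + 1",
        // and "< y" equals "<= y - 1".
        this.lower = lowerInclusive ? lowerBound : lowerBound + 1L;
        this.upper = upperInclusive ? upperBound : upperBound - 1L;
    }

    // Inclusive check on the already-normalized bounds.
    boolean matches(long leftTs, long rightTs) {
        return leftTs + lower <= rightTs && rightTs <= leftTs + upper;
    }
}
```

For instance, between(-2, 1) followed by lowerBoundExclusive() and upperBoundExclusive() ends up as the inclusive interval [-1, 0].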

ProcessJoinFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java

@PublicEvolving
public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {

    private static final long serialVersionUID = -2444626938039012398L;

    public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;

    public abstract class Context {

        public abstract long getLeftTimestamp();

        public abstract long getRightTimestamp();

        public abstract long getTimestamp();

        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
}
  • ProcessJoinFunction extends AbstractRichFunction and declares the abstract processElement method. It also defines its own abstract Context class with four abstract methods: getLeftTimestamp, getRightTimestamp, getTimestamp, and output (which emits a value to the side output identified by an OutputTag).

IntervalJoinOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java

@Internal
public class IntervalJoinOperator<K, T1, T2, OUT>
        extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>>
        implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {

    private static final long serialVersionUID = -5380774605111543454L;

    private static final Logger logger = LoggerFactory.getLogger(IntervalJoinOperator.class);

    private static final String LEFT_BUFFER = "LEFT_BUFFER";
    private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
    private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
    private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
    private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";

    private final long lowerBound;
    private final long upperBound;

    private final TypeSerializer<T1> leftTypeSerializer;
    private final TypeSerializer<T2> rightTypeSerializer;

    private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
    private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;

    private transient TimestampedCollector<OUT> collector;
    private transient ContextImpl context;

    private transient InternalTimerService<String> internalTimerService;

    public IntervalJoinOperator(
            long lowerBound,
            long upperBound,
            boolean lowerBoundInclusive,
            boolean upperBoundInclusive,
            TypeSerializer<T1> leftTypeSerializer,
            TypeSerializer<T2> rightTypeSerializer,
            ProcessJoinFunction<T1, T2, OUT> udf) {

        super(Preconditions.checkNotNull(udf));

        Preconditions.checkArgument(lowerBound <= upperBound,
            "lowerBound <= upperBound must be fulfilled");

        // Move buffer by +1 / -1 depending on inclusiveness in order not needing
        // to check for inclusiveness later on
        this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;
        this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;

        this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
        this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
    }

    @Override
    public void open() throws Exception {
        super.open();

        collector = new TimestampedCollector<>(output);
        context = new ContextImpl(userFunction);
        internalTimerService =
            getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
            LEFT_BUFFER,
            LongSerializer.INSTANCE,
            new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
        ));

        this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
            RIGHT_BUFFER,
            LongSerializer.INSTANCE,
            new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
        ));
    }

    @Override
    public void processElement1(StreamRecord<T1> record) throws Exception {
        processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    @Override
    public void processElement2(StreamRecord<T2> record) throws Exception {
        processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    @SuppressWarnings("unchecked")
    private <THIS, OTHER> void processElement(
            final StreamRecord<THIS> record,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
            final long relativeLowerBound,
            final long relativeUpperBound,
            final boolean isLeft) throws Exception {

        final THIS ourValue = record.getValue();
        final long ourTimestamp = record.getTimestamp();

        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                    "interval stream joins need to have timestamps meaningful timestamps.");
        }

        if (isLate(ourTimestamp)) {
            return;
        }

        addToBuffer(ourBuffer, ourValue, ourTimestamp);

        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
            final long timestamp  = bucket.getKey();

            if (timestamp < ourTimestamp + relativeLowerBound ||
                    timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }

            for (BufferEntry<OTHER> entry: bucket.getValue()) {
                if (isLeft) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }

        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        if (isLeft) {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }
    }

    private boolean isLate(long timestamp) {
        long currentWatermark = internalTimerService.currentWatermark();
        return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
    }

    private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
        final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

        collector.setAbsoluteTimestamp(resultTimestamp);
        context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

        userFunction.processElement(left, right, context, collector);
    }

    @Override
    public void onEventTime(InternalTimer<K, String> timer) throws Exception {

        long timerTimestamp = timer.getTimestamp();
        String namespace = timer.getNamespace();

        logger.trace("onEventTime @ {}", timerTimestamp);

        switch (namespace) {
            case CLEANUP_NAMESPACE_LEFT: {
                long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
                logger.trace("Removing from left buffer @ {}", timestamp);
                leftBuffer.remove(timestamp);
                break;
            }
            case CLEANUP_NAMESPACE_RIGHT: {
                long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
                logger.trace("Removing from right buffer @ {}", timestamp);
                rightBuffer.remove(timestamp);
                break;
            }
            default:
                throw new RuntimeException("Invalid namespace " + namespace);
        }
    }

    @Override
    public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
        // do nothing.
    }

    //......
}
  • IntervalJoinOperator extends the abstract class AbstractUdfStreamOperator and implements the TwoInputStreamOperator and Triggerable interfaces.
  • IntervalJoinOperator overrides the open and initializeState methods it inherits from AbstractUdfStreamOperator. In open, it creates an InternalTimerService and passes this as the Triggerable, so that timers call back into the operator's own onEventTime/onProcessingTime. In initializeState, it creates two MapStates, leftBuffer and rightBuffer, each mapping a timestamp to the list of buffered elements carrying that timestamp.
  • IntervalJoinOperator implements the processElement1 and processElement2 methods declared by TwoInputStreamOperator (the interface's other methods are implemented in the parent class AbstractStreamOperator). Both delegate to the private processElement method; they differ only in the relativeLowerBound, relativeUpperBound, and isLeft arguments, and in the order in which leftBuffer and rightBuffer are passed. processElement2 passes -upperBound and -lowerBound, which mirrors the interval for elements arriving from the right stream.
  • The processElement method implements the time-matching logic of intervalJoin. It throws a FlinkException for elements whose timestamp is Long.MIN_VALUE (no meaningful timestamp), then reads currentWatermark from internalTimerService to decide whether the element is late; a late element is dropped and the method returns. Otherwise the element is added to ourBuffer (leftBuffer for processElement1, rightBuffer for processElement2). It then iterates over every timestamp bucket in otherBuffer and checks whether ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound. Buckets outside that range are skipped; for each element in a matching bucket it calls collect, which sets the output timestamp to max(leftTimestamp, rightTimestamp) and invokes userFunction.processElement, i.e. the processElement method of the user-defined ProcessJoinFunction. Finally, it computes cleanupTime and calls internalTimerService.registerEventTimeTimer to register a timer that later removes the element from its buffer.
  • IntervalJoinOperator implements the onEventTime and onProcessingTime methods declared by Triggerable. onProcessingTime does nothing, while onEventTime uses the timer's namespace and timestamp to remove the corresponding entry from leftBuffer or rightBuffer.
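The matching walk-through can be reproduced outside Flink. Below is a minimal single-key sketch, assuming bounds that are already normalized to inclusive values (as the operator's fields are after its constructor). IntervalJoinSim and its methods are hypothetical; there is no keyed state, serialization or timer service, ordinary maps replace the MapStates, and buffers are never cleaned up here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-key, plain-Java simulation of IntervalJoinOperator.processElement.
// Not Flink code: hypothetical names, no keyed state, timers or cleanup.
final class IntervalJoinSim {
    private final long lowerBound;  // assumed already inclusive
    private final long upperBound;  // assumed already inclusive
    // TreeMap only to make iteration (and so output) order deterministic.
    private final Map<Long, List<String>> leftBuffer = new TreeMap<>();
    private final Map<Long, List<String>> rightBuffer = new TreeMap<>();
    private long currentWatermark = Long.MIN_VALUE;
    // Each match is recorded as "left,right@resultTimestamp".
    final List<String> output = new ArrayList<>();

    IntervalJoinSim(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
    }

    // Like processElement1: the left side uses the bounds as given.
    void processLeft(String value, long ts) {
        process(value, ts, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    // Like processElement2: the right side mirrors the interval.
    void processRight(String value, long ts) {
        process(value, ts, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    private void process(String value, long ourTs,
                         Map<Long, List<String>> ourBuffer,
                         Map<Long, List<String>> otherBuffer,
                         long relLower, long relUpper, boolean isLeft) {
        // Same lateness rule as isLate(): behind the watermark means dropped.
        if (currentWatermark != Long.MIN_VALUE && ourTs < currentWatermark) {
            return;
        }
        ourBuffer.computeIfAbsent(ourTs, k -> new ArrayList<>()).add(value);
        for (Map.Entry<Long, List<String>> bucket : otherBuffer.entrySet()) {
            long ts = bucket.getKey();
            if (ts < ourTs + relLower || ts > ourTs + relUpper) {
                continue;
            }
            for (String other : bucket.getValue()) {
                // Like collect(): result timestamp is max(leftTs, rightTs).
                long resultTs = Math.max(ourTs, ts);
                output.add(isLeft
                    ? value + "," + other + "@" + resultTs
                    : other + "," + value + "@" + resultTs);
            }
        }
    }
}
```

With bounds [-2, 1], a left element "o1" at 10 followed by right elements at 9, 12 and 8 produces "o1,g1@10" and "o1,g3@10"; the right element at 12 falls outside [8, 11] and is only joined later by a left element at 11. Mirroring the bounds on the right side is what lets both arrival orders find the same pairs.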

Summary

  • Flink's intervalJoin operation requires a KeyedStream and only works with TimeCharacteristic.EventTime. KeyedStream's intervalJoin creates and returns an IntervalJoin; IntervalJoin provides the between operation, which sets the lowerBound and upperBound of the interval and creates and returns an IntervalJoined.
  • IntervalJoined provides the process operation, which accepts a ProcessJoinFunction. process creates an IntervalJoinOperator and returns the SingleOutputStreamOperator produced by left.connect(right).keyBy(keySelector1, keySelector2).transform("Interval Join", outputType, operator).
  • IntervalJoinOperator extends AbstractUdfStreamOperator and implements TwoInputStreamOperator and Triggerable. It overrides open, where it creates an InternalTimerService with itself as the Triggerable, and initializeState, where it creates the two MapStates leftBuffer and rightBuffer. Its processElement1 and processElement2 methods both delegate to processElement, passing different relativeLowerBound, relativeUpperBound, and isLeft values and swapping the order of leftBuffer and rightBuffer.
  • IntervalJoinOperator's processElement implements the time-matching logic of intervalJoin: it drops late elements, adds the element to its own buffer, then scans otherBuffer and, for every buffered element satisfying ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound, calls collect, which invokes the processElement method of the user-defined ProcessJoinFunction. Finally it computes cleanupTime and calls internalTimerService.registerEventTimeTimer to register a timer that cleans up the element.
  • Of the Triggerable methods, onProcessingTime does nothing, while onEventTime removes elements from leftBuffer or rightBuffer based on the timer's timestamp.
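The cleanup timers only work if onEventTime can recover, from the timer's timestamp, the buffer key under which the element was stored. The plain-Java sketch below (hypothetical names, not Flink API) restates the cleanupTime computation from processElement and the two onEventTime branches from the IntervalJoinOperator source above, so the round trip can be checked: on either side, the key removed equals the element's own timestamp. Intuitively, a left element at t can only match right elements up to t + upperBound, and a right element at t can only be matched by left elements up to t - lowerBound, so each is kept until the watermark reaches that point.

```java
// Plain-Java sketch (hypothetical names) of IntervalJoinOperator's cleanup-timer
// arithmetic. Bounds are the operator's already-normalized inclusive bounds.
final class CleanupMath {
    private CleanupMath() {}

    // processElement1 passes relativeUpperBound = upperBound.
    static long leftCleanupTime(long ts, long upperBound) {
        return upperBound > 0L ? ts + upperBound : ts;
    }

    // processElement2 passes relativeUpperBound = -lowerBound.
    static long rightCleanupTime(long ts, long lowerBound) {
        long relativeUpperBound = -lowerBound;
        return relativeUpperBound > 0L ? ts + relativeUpperBound : ts;
    }

    // CLEANUP_NAMESPACE_LEFT branch of onEventTime: key removed from leftBuffer.
    static long leftKeyToRemove(long timerTs, long upperBound) {
        return upperBound <= 0L ? timerTs : timerTs - upperBound;
    }

    // CLEANUP_NAMESPACE_RIGHT branch of onEventTime: key removed from rightBuffer.
    static long rightKeyToRemove(long timerTs, long lowerBound) {
        return lowerBound <= 0L ? timerTs + lowerBound : timerTs;
    }
}
```

With the example's bounds [-2, 1], a left element at 10 is cleaned up once the watermark reaches 11, and a right element at 10 once it reaches 12.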
