A look at the join operation of Flink DataStream


Preface

This article examines the join operation of Flink's DataStream API.

Example

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)
  • Here, join is first called with the other stream and returns a JoinedStreams. JoinedStreams.where builds a Where object, Where.equalTo builds an EqualTo object, and EqualTo.window builds a WithWindow object. On WithWindow you can set the windowAssigner, trigger, evictor and allowedLateness, and it provides the apply operation that produces the joined stream.
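To make the semantics of this chain concrete, here is a minimal in-memory sketch that does not use Flink at all. It assumes tumbling event-time windows with no offset, non-negative timestamps, and elements that carry their own key and timestamp; WindowJoinSketch, Event, windowStart and join are hypothetical names, not Flink API. Two elements are joined only if they have the same key and fall into the same window, and the join function is called once per matching pair, roughly what a tumbling event-time window assigner with a 5-second size would produce. The output order is simply this sketch's iteration order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical, Flink-free model of join(..).where(..).equalTo(..).window(tumbling).apply(..)
class WindowJoinSketch {

    // Hypothetical element type: a key, an event timestamp in ms and a payload.
    static final class Event {
        final String key;
        final long timestamp;
        final String value;

        Event(String key, long timestamp, String value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Start of the tumbling window that contains a non-negative timestamp (offset 0).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Inner window join: emit fn(l, r) for every pair that shares both key and window.
    static <R> List<R> join(List<Event> left, List<Event> right, long windowSize,
                            BiFunction<Event, Event, R> fn) {
        // Bucket the right side by (window start, key).
        Map<String, List<Event>> buckets = new LinkedHashMap<>();
        for (Event r : right) {
            String bucket = windowStart(r.timestamp, windowSize) + "|" + r.key;
            buckets.computeIfAbsent(bucket, b -> new ArrayList<>()).add(r);
        }
        // Probe with each left element; unmatched elements produce nothing.
        List<R> out = new ArrayList<>();
        for (Event l : left) {
            String bucket = windowStart(l.timestamp, windowSize) + "|" + l.key;
            for (Event r : buckets.getOrDefault(bucket, List.of())) {
                out.add(fn.apply(l, r));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> orders = List.of(
                new Event("a", 1_000, "order1"),
                new Event("a", 6_000, "order2"),
                new Event("b", 2_000, "order3"));
        List<Event> payments = List.of(
                new Event("a", 3_000, "pay1"),
                new Event("b", 7_000, "pay2"));
        // 5-second tumbling windows: [0, 5000), [5000, 10000), ...
        System.out.println(join(orders, payments, 5_000, (o, p) -> o.value + "+" + p.value));
    }
}
```

Running main prints [order1+pay1]: order2 shares key a with pay1 but falls into the next window, and order3 (key b) has no payment in its window, since pay2 lands in [5000, 10000).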

DataStream.join

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {
    //......

    /**
     * Creates a join operation. See {@link JoinedStreams} for an example of how the keys
     * and window can be specified.
     */
    public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
        return new JoinedStreams<>(this, otherStream);
    }

    //......
}
  • DataStream provides a join method for performing join operations; it returns a JoinedStreams.

JoinedStreams

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

@Public
public class JoinedStreams<T1, T2> {

    /** The first input stream. */
    private final DataStream<T1> input1;

    /** The second input stream. */
    private final DataStream<T2> input2;

    public JoinedStreams(DataStream<T1> input1, DataStream<T2> input2) {
        this.input1 = requireNonNull(input1);
        this.input2 = requireNonNull(input2);
    }

    public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
        requireNonNull(keySelector);
        final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
        return where(keySelector, keyType);
    }

    public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType)  {
        requireNonNull(keySelector);
        requireNonNull(keyType);
        return new Where<>(input1.clean(keySelector), keyType);
    }

    //......
}
  • JoinedStreams mainly provides the where operation, which builds a Where object.

Where

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

    @Public
    public class Where<KEY> {

        private final KeySelector<T1, KEY> keySelector1;
        private final TypeInformation<KEY> keyType;

        Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
            this.keySelector1 = keySelector1;
            this.keyType = keyType;
        }

        public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
            requireNonNull(keySelector);
            final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
            return equalTo(keySelector, otherKey);
        }

        public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType)  {
            requireNonNull(keySelector);
            requireNonNull(keyType);

            if (!keyType.equals(this.keyType)) {
                throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
                        "first key = " + this.keyType + " , second key = " + keyType);
            }

            return new EqualTo(input2.clean(keySelector));
        }

        //......

    }
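  • The Where object mainly provides the equalTo operation, which builds an EqualTo object. If the key type of the second KeySelector differs from that of the first, equalTo throws an IllegalArgumentException.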
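The check in equalTo means a mismatch between the two key types is reported when equalTo is called, before the job runs. A minimal sketch of that rule, assuming Java Class objects as a stand-in for Flink's TypeInformation; KeyCheckSketch is hypothetical, and its second key type is deliberately a separate type parameter K2 so that a mismatch compiles and can be demonstrated:

```java
import java.util.function.Function;

// Hypothetical model of the key-type check in JoinedStreams.Where.equalTo.
// Class objects stand in for Flink's TypeInformation.
class KeyCheckSketch<T1, T2, KEY> {
    private final Function<T1, KEY> keySelector1;
    private final Class<KEY> keyType;

    KeyCheckSketch(Function<T1, KEY> keySelector1, Class<KEY> keyType) {
        this.keySelector1 = keySelector1;
        this.keyType = keyType;
    }

    // K2 is separate from KEY only so that a mismatching selector compiles in this model.
    <K2> Function<T2, K2> equalTo(Function<T2, K2> keySelector2, Class<K2> otherKeyType) {
        if (!otherKeyType.equals(keyType)) {
            throw new IllegalArgumentException("The keys for the two inputs are not equal: "
                    + "first key = " + keyType.getSimpleName()
                    + " , second key = " + otherKeyType.getSimpleName());
        }
        return keySelector2;
    }

    public static void main(String[] args) {
        // First input: String elements keyed by their length (Integer keys); second input: Integers.
        KeyCheckSketch<String, Integer, Integer> where = new KeyCheckSketch<>(String::length, Integer.class);
        where.equalTo(i -> i, Integer.class); // same key type: accepted
        try {
            where.equalTo(i -> i.toString(), String.class); // Integer vs String: rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```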

EqualTo

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

        @Public
        public class EqualTo {

            private final KeySelector<T2, KEY> keySelector2;

            EqualTo(KeySelector<T2, KEY> keySelector2) {
                this.keySelector2 = requireNonNull(keySelector2);
            }

            /**
             * Specifies the window on which the join operation works.
             */
            @PublicEvolving
            public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
                return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
            }
        }
  • The EqualTo object provides the window operation, which builds a WithWindow object; trigger, evictor and allowedLateness start out as null.

WithWindow

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

    @Public
    public static class WithWindow<T1, T2, KEY, W extends Window> {

        private final DataStream<T1> input1;
        private final DataStream<T2> input2;

        private final KeySelector<T1, KEY> keySelector1;
        private final KeySelector<T2, KEY> keySelector2;
        private final TypeInformation<KEY> keyType;

        private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;

        private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

        private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

        private final Time allowedLateness;

        private CoGroupedStreams.WithWindow<T1, T2, KEY, W> coGroupedWindowedStream;

        @PublicEvolving
        protected WithWindow(DataStream<T1> input1,
                DataStream<T2> input2,
                KeySelector<T1, KEY> keySelector1,
                KeySelector<T2, KEY> keySelector2,
                TypeInformation<KEY> keyType,
                WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
                Time allowedLateness) {

            this.input1 = requireNonNull(input1);
            this.input2 = requireNonNull(input2);

            this.keySelector1 = requireNonNull(keySelector1);
            this.keySelector2 = requireNonNull(keySelector2);
            this.keyType = requireNonNull(keyType);

            this.windowAssigner = requireNonNull(windowAssigner);

            this.trigger = trigger;
            this.evictor = evictor;

            this.allowedLateness = allowedLateness;
        }

        @PublicEvolving
        public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
            return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                    windowAssigner, newTrigger, evictor, allowedLateness);
        }

        @PublicEvolving
        public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
            return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                    windowAssigner, trigger, newEvictor, allowedLateness);
        }

        @PublicEvolving
        public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
            return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                windowAssigner, trigger, evictor, newLateness);
        }

        public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
            TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
                function,
                JoinFunction.class,
                0,
                1,
                2,
                TypeExtractor.NO_INDEX,
                input1.getType(),
                input2.getType(),
                "Join",
                false);

            return apply(function, resultType);
        }

        @PublicEvolving
        @Deprecated
        public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function) {
            return (SingleOutputStreamOperator<T>) apply(function);
        }

        public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            //clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            coGroupedWindowedStream = input1.coGroup(input2)
                .where(keySelector1)
                .equalTo(keySelector2)
                .window(windowAssigner)
                .trigger(trigger)
                .evictor(evictor)
                .allowedLateness(allowedLateness);

            return coGroupedWindowedStream
                    .apply(new FlatJoinCoGroupFunction<>(function), resultType);
        }

        @PublicEvolving
        @Deprecated
        public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            return (SingleOutputStreamOperator<T>) apply(function, resultType);
        }

        public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
            TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
                function,
                FlatJoinFunction.class,
                0,
                1,
                2,
                new int[]{2, 0},
                input1.getType(),
                input2.getType(),
                "Join",
                false);

            return apply(function, resultType);
        }

        @PublicEvolving
        @Deprecated
        public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function) {
            return (SingleOutputStreamOperator<T>) apply(function);
        }

        public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            //clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            coGroupedWindowedStream = input1.coGroup(input2)
                .where(keySelector1)
                .equalTo(keySelector2)
                .window(windowAssigner)
                .trigger(trigger)
                .evictor(evictor)
                .allowedLateness(allowedLateness);

            return coGroupedWindowedStream
                    .apply(new JoinCoGroupFunction<>(function), resultType);
        }

        @PublicEvolving
        @Deprecated
        public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            return (SingleOutputStreamOperator<T>) apply(function, resultType);
        }

        @VisibleForTesting
        Time getAllowedLateness() {
            return allowedLateness;
        }

        @VisibleForTesting
        CoGroupedStreams.WithWindow<T1, T2, KEY, W> getCoGroupedWindowedStream() {
            return coGroupedWindowedStream;
        }
    }
  • WithWindow lets you set windowAssigner, trigger, evictor and allowedLateness, and provides the apply operation (the with operations are deprecated).
  • The apply operation accepts a JoinFunction or a FlatJoinFunction. Internally it calls DataStream's coGroup method to create a CoGroupedStreams, passes its own where/equalTo keySelectors, windowAssigner, trigger, evictor and allowedLateness on to it, and finally calls the apply method of CoGroupedStreams.WithWindow.
  • The apply method of CoGroupedStreams.WithWindow takes a different parameter type from that of JoinedStreams.WithWindow: it accepts a CoGroupFunction. Therefore the JoinFunction or FlatJoinFunction is first wrapped into a CoGroupFunction (JoinCoGroupFunction wraps a JoinFunction, FlatJoinCoGroupFunction wraps a FlatJoinFunction) and then passed to CoGroupedStreams.WithWindow.apply.
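Because trigger, evictor and allowedLateness each return a new WithWindow rather than modifying the current one, a partially configured join can safely serve as the base for several variants. The following is a hypothetical sketch of that copy-on-write builder, with strings and a Long standing in for Flink's Trigger, Evictor and Time objects; null keeps the meaning it has in the real class, where CoGroupedStreams only applies a setting when it is non-null.

```java
// Hypothetical model of the immutable JoinedStreams.WithWindow builder.
// Strings and a Long stand in for Flink's Trigger, Evictor and Time objects.
final class WithWindowSketch {
    final String windowAssigner;
    final String trigger;        // null: keep the window assigner's default trigger
    final String evictor;        // null: no evictor
    final Long allowedLateness;  // null: not set

    WithWindowSketch(String windowAssigner, String trigger, String evictor, Long allowedLateness) {
        this.windowAssigner = windowAssigner;
        this.trigger = trigger;
        this.evictor = evictor;
        this.allowedLateness = allowedLateness;
    }

    // Like EqualTo.window(...): only the assigner is set, the rest starts as null.
    static WithWindowSketch window(String assigner) {
        return new WithWindowSketch(assigner, null, null, null);
    }

    // Each setter copies every other field into a fresh instance.
    WithWindowSketch trigger(String newTrigger) {
        return new WithWindowSketch(windowAssigner, newTrigger, evictor, allowedLateness);
    }

    WithWindowSketch evictor(String newEvictor) {
        return new WithWindowSketch(windowAssigner, trigger, newEvictor, allowedLateness);
    }

    WithWindowSketch allowedLateness(long newLateness) {
        return new WithWindowSketch(windowAssigner, trigger, evictor, newLateness);
    }

    @Override
    public String toString() {
        return windowAssigner + "/" + trigger + "/" + evictor + "/" + allowedLateness;
    }

    public static void main(String[] args) {
        WithWindowSketch base = window("tumbling-5s");
        WithWindowSketch withLateness = base.allowedLateness(1_000L);
        WithWindowSketch withTrigger = base.trigger("count-10");
        System.out.println(base);         // tumbling-5s/null/null/null
        System.out.println(withLateness); // tumbling-5s/null/null/1000
        System.out.println(withTrigger);  // tumbling-5s/count-10/null/null
    }
}
```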

JoinFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/JoinFunction.java

@Public
@FunctionalInterface
public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {

    /**
     * The join method, called once per joined pair of elements.
     *
     * @param first The element from first input.
     * @param second The element from second input.
     * @return The resulting element.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    OUT join(IN1 first, IN2 second) throws Exception;
}
  • JoinFunction extends Function and Serializable and defines the join operation, which has inner join semantics: it is only called for pairs that actually match. If an outer join is needed, use a CoGroupFunction instead.
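JoinFunction extends Serializable because Flink serializes user functions and ships them to the TaskManagers that run the parallel task instances. In Java, a lambda whose target type is a Serializable functional interface is itself serializable, which is why a plain lambda can be used as a JoinFunction. The sketch below demonstrates this with a locally defined JoinFn interface that only mirrors JoinFunction's shape (a hypothetical stand-in, not the Flink type) and a Java serialization round trip that roughly approximates shipping the function:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializableJoinSketch {

    // Hypothetical stand-in for Flink's JoinFunction: a functional interface that is also Serializable.
    @FunctionalInterface
    interface JoinFn<IN1, IN2, OUT> extends Serializable {
        OUT join(IN1 first, IN2 second) throws Exception;
    }

    // Round-trips an object through Java serialization.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // The lambda's target type is Serializable, so the lambda object is serializable too.
        JoinFn<String, Integer, String> fn = (name, amount) -> name + ":" + amount;
        JoinFn<String, Integer, String> copy = roundTrip(fn);
        System.out.println(copy.join("alice", 42)); // alice:42
    }
}
```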

FlatJoinFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/FlatJoinFunction.java

@Public
@FunctionalInterface
public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable {

    /**
     * The join method, called once per joined pair of elements.
     *
     * @param first The element from first input.
     * @param second The element from second input.
     * @param out The collector used to return zero, one, or more elements.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    void join (IN1 first, IN2 second, Collector<OUT> out) throws Exception;
}
  • FlatJoinFunction also extends Function and Serializable and defines the join operation with inner join semantics; if an outer join is needed, use a CoGroupFunction. Unlike JoinFunction.join, FlatJoinFunction.join takes an additional Collector parameter that can emit 0, 1 or more records, hence the name Flat.
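The extra Collector parameter is what allows one matched pair to produce zero, one or several results. A minimal sketch, assuming locally defined Collector and FlatJoinFn interfaces that only mimic the shape of Flink's Collector and FlatJoinFunction (hypothetical stand-ins); the function pairs an order id with a comma-separated item list and emits one record per item, or nothing if the list is empty:

```java
import java.util.ArrayList;
import java.util.List;

class FlatJoinSketch {

    // Hypothetical stand-in for org.apache.flink.util.Collector.
    interface Collector<T> {
        void collect(T record);
    }

    // Hypothetical stand-in for FlatJoinFunction: results go to the collector instead of being returned.
    @FunctionalInterface
    interface FlatJoinFn<IN1, IN2, OUT> {
        void join(IN1 first, IN2 second, Collector<OUT> out) throws Exception;
    }

    // Emits one "order:item" record per non-empty item; an empty item list emits nothing.
    static final FlatJoinFn<String, String, String> EXPLODE_ITEMS = (order, items, out) -> {
        for (String item : items.split(",")) {
            if (!item.isEmpty()) {
                out.collect(order + ":" + item);
            }
        }
    };

    // Runs the function on a single matched pair and gathers whatever it emitted.
    static List<String> joinPair(String order, String items) throws Exception {
        List<String> results = new ArrayList<>();
        EXPLODE_ITEMS.join(order, items, results::add);
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(joinPair("o1", "apple,pear")); // [o1:apple, o1:pear]
        System.out.println(joinPair("o2", ""));           // []
    }
}
```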

CoGroupedStreams

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

@Public
public class CoGroupedStreams<T1, T2> {
    //......

    @Public
    public static class WithWindow<T1, T2, KEY, W extends Window> {
        private final DataStream<T1> input1;
        private final DataStream<T2> input2;

        private final KeySelector<T1, KEY> keySelector1;
        private final KeySelector<T2, KEY> keySelector2;

        private final TypeInformation<KEY> keyType;

        private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;

        private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

        private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

        private final Time allowedLateness;

        private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;

        protected WithWindow(DataStream<T1> input1,
                DataStream<T2> input2,
                KeySelector<T1, KEY> keySelector1,
                KeySelector<T2, KEY> keySelector2,
                TypeInformation<KEY> keyType,
                WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
                Time allowedLateness) {
            this.input1 = input1;
            this.input2 = input2;

            this.keySelector1 = keySelector1;
            this.keySelector2 = keySelector2;
            this.keyType = keyType;

            this.windowAssigner = windowAssigner;
            this.trigger = trigger;
            this.evictor = evictor;

            this.allowedLateness = allowedLateness;
        }

        @PublicEvolving
        public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
            return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                    windowAssigner, newTrigger, evictor, allowedLateness);
        }

        @PublicEvolving
        public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
            return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                    windowAssigner, trigger, newEvictor, allowedLateness);
        }

        @PublicEvolving
        public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
            return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                    windowAssigner, trigger, evictor, newLateness);
        }

        public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {

            TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes(
                function,
                input1.getType(),
                input2.getType(),
                "CoGroup",
                false);

            return apply(function, resultType);
        }

        public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            //clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
            UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);

            DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
                    .map(new Input1Tagger<T1, T2>())
                    .setParallelism(input1.getParallelism())
                    .returns(unionType);
            DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
                    .map(new Input2Tagger<T1, T2>())
                    .setParallelism(input2.getParallelism())
                    .returns(unionType);

            DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

            // we explicitly create the keyed stream to manually pass the key type information in
            windowedStream =
                    new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
                    .window(windowAssigner);

            if (trigger != null) {
                windowedStream.trigger(trigger);
            }
            if (evictor != null) {
                windowedStream.evictor(evictor);
            }
            if (allowedLateness != null) {
                windowedStream.allowedLateness(allowedLateness);
            }

            return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
        }

        //......

    }

    //......
}
  • The overall class structure of CoGroupedStreams is very similar to that of JoinedStreams: CoGroupedStreams provides the where operation to build a Where object; Where provides equalTo to build an EqualTo object; EqualTo provides window to build a WithWindow object; and WithWindow lets you set windowAssigner, trigger, evictor and allowedLateness and provides the apply operation. The main difference is that the apply operation of CoGroupedStreams.WithWindow takes a CoGroupFunction, whereas the apply operation of JoinedStreams.WithWindow takes a JoinFunction or a FlatJoinFunction.
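Under the hood, CoGroupedStreams.WithWindow.apply turns two differently typed streams into one: each element is wrapped in a TaggedUnion by Input1Tagger or Input2Tagger, the two tagged streams are unioned, keyed with a UnionKeySelector that dispatches to the matching key selector, and windowed; CoGroupWindowFunction then splits each window's contents back into two groups for the user's CoGroupFunction. The sketch below models only the tag, union, key and split steps on in-memory lists, with no windows and no Flink; Tagged, Group and coGroupByKey are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, window-free model of how CoGroupedStreams.WithWindow.apply combines its two inputs.
class TaggedUnionSketch {

    // Stand-in for CoGroupedStreams.TaggedUnion: exactly one of the two fields is set.
    static final class Tagged<T1, T2> {
        final T1 one;
        final T2 two;

        private Tagged(T1 one, T2 two) {
            this.one = one;
            this.two = two;
        }

        static <A, B> Tagged<A, B> ofOne(A value) { return new Tagged<>(value, null); }

        static <A, B> Tagged<A, B> ofTwo(B value) { return new Tagged<>(null, value); }

        boolean isOne() { return one != null; } // so input elements must be non-null
    }

    // One key's contents after the union has been split back by tag.
    static final class Group<T1, T2> {
        final List<T1> first = new ArrayList<>();
        final List<T2> second = new ArrayList<>();
    }

    static <T1, T2, K> Map<K, Group<T1, T2>> coGroupByKey(List<T1> input1, List<T2> input2,
            Function<T1, K> key1, Function<T2, K> key2) {
        // Tag each side (like Input1Tagger / Input2Tagger) and union the results.
        List<Tagged<T1, T2>> union = new ArrayList<>();
        for (T1 v : input1) union.add(Tagged.ofOne(v));
        for (T2 v : input2) union.add(Tagged.ofTwo(v));

        Map<K, Group<T1, T2>> groups = new LinkedHashMap<>();
        for (Tagged<T1, T2> t : union) {
            // Like UnionKeySelector: use the key selector that matches the element's tag.
            K key = t.isOne() ? key1.apply(t.one) : key2.apply(t.two);
            Group<T1, T2> g = groups.computeIfAbsent(key, k -> new Group<>());
            // Like CoGroupWindowFunction: split the tagged elements back into two groups.
            if (t.isOne()) {
                g.first.add(t.one);
            } else {
                g.second.add(t.two);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> users = List.of("alice", "bob");
        List<String> clicks = List.of("alice/home", "alice/cart", "carol/home");
        Map<String, Group<String, String>> groups =
                coGroupByKey(users, clicks, u -> u, c -> c.substring(0, c.indexOf('/')));
        groups.forEach((k, g) -> System.out.println(k + " " + g.first + " " + g.second));
        // alice [alice] [alice/home, alice/cart]
        // bob [bob] []
        // carol [] [carol/home]
    }
}
```

Note that bob and carol still get a group even though one side is empty; that is what the user's CoGroupFunction sees in the real operator as well, per key and window.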

CoGroupFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/CoGroupFunction.java

@Public
@FunctionalInterface
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {

    /**
     * This method must be implemented to provide a user implementation of a
     * coGroup. It is called for each pair of element groups where the elements share the
     * same key.
     *
     * @param first The records from the first input.
     * @param second The records from the second.
     * @param out A collector to return elements.
     *
     * @throws Exception The function may throw Exceptions, which will cause the program to cancel,
     *                   and may trigger the recovery logic.
     */
    void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
  • CoGroupFunction extends Function and Serializable and defines the coGroup operation, which can be used to implement outer joins. Its parameters are Iterables, whereas the join methods of JoinFunction and FlatJoinFunction take single elements.
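In a windowed coGroup, coGroup is invoked for every key that has elements from at least one input in the window, so one of the two Iterables may be empty; that is what makes outer joins possible. A minimal sketch of a left outer join over one such key group, written against locally defined CoGroupFn and Collector interfaces that only mirror Flink's CoGroupFunction and Collector (hypothetical stand-ins). In Flink the same body would live in a CoGroupFunction passed to coGroup(...).where(...).equalTo(...).window(...).apply(...).

```java
import java.util.ArrayList;
import java.util.List;

class LeftOuterJoinSketch {

    // Hypothetical stand-in for org.apache.flink.util.Collector.
    interface Collector<T> {
        void collect(T record);
    }

    // Mirrors the shape of CoGroupFunction: whole groups in, any number of records out.
    @FunctionalInterface
    interface CoGroupFn<IN1, IN2, O> {
        void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
    }

    // Left outer join: every left element is emitted, paired with "null" when the right group is empty.
    static final CoGroupFn<String, String, String> LEFT_OUTER = (first, second, out) -> {
        for (String left : first) {
            boolean matched = false;
            for (String right : second) {
                matched = true;
                out.collect(left + "=" + right);
            }
            if (!matched) {
                out.collect(left + "=null");
            }
        }
    };

    // Calls the function for one key group and gathers the results.
    static List<String> run(List<String> first, List<String> second) throws Exception {
        List<String> results = new ArrayList<>();
        LEFT_OUTER.coGroup(first, second, results::add);
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(List.of("alice"), List.of("home"))); // [alice=home]
        System.out.println(run(List.of("bob"), List.of()));         // [bob=null]
    }
}
```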

WrappingFunction

flink-java-1.7.0-sources.jar!/org/apache/flink/api/java/operators/translation/WrappingFunction.java

@Internal
public abstract class WrappingFunction<T extends Function> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    protected T wrappedFunction;

    protected WrappingFunction(T wrappedFunction) {
        this.wrappedFunction = wrappedFunction;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        FunctionUtils.openFunction(this.wrappedFunction, parameters);
    }

    @Override
    public void close() throws Exception {
        FunctionUtils.closeFunction(this.wrappedFunction);
    }

    @Override
    public void setRuntimeContext(RuntimeContext t) {
        super.setRuntimeContext(t);

        FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
    }

    public T getWrappedFunction () {
        return this.wrappedFunction;
    }
}
  • WrappingFunction extends AbstractRichFunction and overrides the parent class's open, close and setRuntimeContext methods so that each call is forwarded to wrappedFunction.
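The point of this delegation is that a user function keeps its lifecycle after being wrapped: when the runtime opens or closes the wrapper, the wrapper opens or closes the wrapped function. Flink's FunctionUtils.openFunction and closeFunction only act when the wrapped function is a RichFunction. A dependency-free sketch of that pattern, with hypothetical Fn, RichFn, Wrapper and RecordingFn types standing in for Function, RichFunction, WrappingFunction and a user function:

```java
import java.util.ArrayList;
import java.util.List;

class WrappingSketch {

    // Stand-in for Function: a marker interface.
    interface Fn { }

    // Stand-in for RichFunction: a function with lifecycle hooks.
    interface RichFn extends Fn {
        void open();
        void close();
    }

    // Stand-in for WrappingFunction: forwards lifecycle calls only when the wrapped function is rich.
    static class Wrapper<T extends Fn> implements RichFn {
        protected final T wrapped;

        Wrapper(T wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public void open() {
            if (wrapped instanceof RichFn) {
                ((RichFn) wrapped).open();
            }
        }

        @Override
        public void close() {
            if (wrapped instanceof RichFn) {
                ((RichFn) wrapped).close();
            }
        }
    }

    // A rich user function that records its lifecycle events.
    static class RecordingFn implements RichFn {
        final List<String> events = new ArrayList<>();

        @Override
        public void open() { events.add("open"); }

        @Override
        public void close() { events.add("close"); }
    }

    public static void main(String[] args) {
        RecordingFn user = new RecordingFn();
        Wrapper<RecordingFn> wrapper = new Wrapper<>(user);
        wrapper.open();  // the runtime only ever sees the wrapper
        wrapper.close();
        System.out.println(user.events); // [open, close]

        new Wrapper<Fn>(new Fn() { }).open(); // plain function: nothing to forward, no error
    }
}
```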

JoinCoGroupFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

    /**
     * CoGroup function that does a nested-loop join to get the join result.
     */
    private static class JoinCoGroupFunction<T1, T2, T>
            extends WrappingFunction<JoinFunction<T1, T2, T>>
            implements CoGroupFunction<T1, T2, T> {
        private static final long serialVersionUID = 1L;

        public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
            super(wrappedFunction);
        }

        @Override
        public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
            for (T1 val1: first) {
                for (T2 val2: second) {
                    out.collect(wrappedFunction.join(val1, val2));
                }
            }
        }
    }
  • JoinCoGroupFunction extends WrappingFunction and implements the coGroup method of the CoGroupFunction interface as a nested loop: it iterates over the first group and, for each element, over the second group, calls wrappedFunction.join on each pair and emits the result.
  • JoinedStreams defines JoinCoGroupFunction as a private static class; the apply method of JoinedStreams.WithWindow uses it to wrap the JoinFunction before calling the apply method of CoGroupedStreams.WithWindow.
  • The join method of JoinFunction takes two individual elements, while the coGroup method of JoinCoGroupFunction takes two Iterables.
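Because the adapter is a plain nested loop, for one key and window it calls the join function once per pair, that is (size of first) times (size of second) times, and if either side is empty it emits nothing, which is exactly inner join semantics. A self-contained sketch of the same adapter idea, using hypothetical local JoinFn, CoGroupFn and Collector interfaces rather than Flink's types:

```java
import java.util.ArrayList;
import java.util.List;

class JoinAdapterSketch {

    interface Collector<T> { void collect(T record); }

    interface JoinFn<A, B, R> { R join(A first, B second) throws Exception; }

    interface CoGroupFn<A, B, R> {
        void coGroup(Iterable<A> first, Iterable<B> second, Collector<R> out) throws Exception;
    }

    // Same idea as JoinedStreams.JoinCoGroupFunction: turn a pairwise join into a group-wise coGroup.
    static <A, B, R> CoGroupFn<A, B, R> asCoGroup(JoinFn<A, B, R> join) {
        return (first, second, out) -> {
            for (A a : first) {
                for (B b : second) {
                    out.collect(join.join(a, b)); // exactly one output per pair
                }
            }
        };
    }

    static List<String> run(List<String> first, List<Integer> second) throws Exception {
        CoGroupFn<String, Integer, String> fn = asCoGroup((s, i) -> s + i);
        List<String> results = new ArrayList<>();
        fn.coGroup(first, second, results::add);
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(List.of("a", "b"), List.of(1, 2, 3))); // [a1, a2, a3, b1, b2, b3]
        System.out.println(run(List.of("a", "b"), List.of()));        // []
    }
}
```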

FlatJoinCoGroupFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java

    /**
     * CoGroup function that does a nested-loop join to get the join result. (FlatJoin version)
     */
    private static class FlatJoinCoGroupFunction<T1, T2, T>
            extends WrappingFunction<FlatJoinFunction<T1, T2, T>>
            implements CoGroupFunction<T1, T2, T> {
        private static final long serialVersionUID = 1L;

        public FlatJoinCoGroupFunction(FlatJoinFunction<T1, T2, T> wrappedFunction) {
            super(wrappedFunction);
        }

        @Override
        public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
            for (T1 val1: first) {
                for (T2 val2: second) {
                    wrappedFunction.join(val1, val2, out);
                }
            }
        }
    }
  • FlatJoinCoGroupFunction extends WrappingFunction and implements the coGroup method of the CoGroupFunction interface with the same nested loop: it iterates over the first group and, for each element, over the second group, calling wrappedFunction.join on each pair and passing the Collector through so that the wrapped function emits the results itself.
  • JoinedStreams defines FlatJoinCoGroupFunction as a private static class; the apply method of JoinedStreams.WithWindow uses it to wrap the FlatJoinFunction before calling the apply method of CoGroupedStreams.WithWindow.
  • The join method of FlatJoinFunction takes two individual elements (plus a Collector), while the coGroup method of FlatJoinCoGroupFunction takes two Iterables.
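The only change in the flat variant is that the Collector is handed straight to the user function, so each pair can contribute any number of records instead of exactly one. A minimal sketch mirroring FlatJoinCoGroupFunction with hypothetical local interfaces; the example function emits the left element as many times as the right element says, so a count of 0 contributes nothing:

```java
import java.util.ArrayList;
import java.util.List;

class FlatJoinAdapterSketch {

    interface Collector<T> { void collect(T record); }

    interface FlatJoinFn<A, B, R> { void join(A first, B second, Collector<R> out) throws Exception; }

    interface CoGroupFn<A, B, R> {
        void coGroup(Iterable<A> first, Iterable<B> second, Collector<R> out) throws Exception;
    }

    // Same idea as JoinedStreams.FlatJoinCoGroupFunction: the collector is passed through unchanged.
    static <A, B, R> CoGroupFn<A, B, R> asCoGroup(FlatJoinFn<A, B, R> join) {
        return (first, second, out) -> {
            for (A a : first) {
                for (B b : second) {
                    join.join(a, b, out); // zero, one or more outputs per pair
                }
            }
        };
    }

    static List<String> run(List<String> first, List<Integer> second) throws Exception {
        // Emits the left value `count` times for each pair.
        CoGroupFn<String, Integer, String> fn = asCoGroup((s, count, out) -> {
            for (int k = 0; k < count; k++) {
                out.collect(s);
            }
        });
        List<String> results = new ArrayList<>();
        fn.coGroup(first, second, results::add);
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(List.of("x"), List.of(2, 0, 1))); // [x, x, x]
    }
}
```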

Summary

  • DataStream provides a join method for performing join operations, which returns a JoinedStreams. JoinedStreams mainly provides the where operation to build a Where object; Where provides equalTo to build an EqualTo object; EqualTo provides window to build a WithWindow object; and WithWindow lets you set windowAssigner, trigger, evictor and allowedLateness and provides the apply operation.
  • The apply operation accepts a JoinFunction or a FlatJoinFunction. Internally it uses DataStream's coGroup method to create a CoGroupedStreams, passes its own where/equalTo keySelectors, windowAssigner, trigger, evictor and allowedLateness on to it, and finally calls the apply method of CoGroupedStreams.WithWindow. JoinFunction and FlatJoinFunction both extend Function and Serializable and define the join operation with inner join semantics; if an outer join is needed, use a CoGroupFunction. FlatJoinFunction differs from JoinFunction in that its join method takes an extra Collector parameter, which can emit 0, 1 or more records, hence the name Flat.
  • The apply method of CoGroupedStreams.WithWindow takes a CoGroupFunction rather than a JoinFunction or FlatJoinFunction, so the user function is wrapped first (JoinCoGroupFunction wraps a JoinFunction, FlatJoinCoGroupFunction wraps a FlatJoinFunction) and then passed to CoGroupedStreams.WithWindow.apply. JoinCoGroupFunction and FlatJoinCoGroupFunction both extend WrappingFunction (which extends AbstractRichFunction and overrides open, close and setRuntimeContext to forward them to wrappedFunction) and implement the coGroup method of the CoGroupFunction interface. The difference is that one wraps a JoinFunction and the other a FlatJoinFunction: JoinCoGroupFunction collects the value returned by join, while FlatJoinCoGroupFunction passes the Collector out through to the wrapped join method.
