The window coGroup operation of Flink DataStream

Preface

This article walks through the source code behind the window coGroup operation of Flink's DataStream API.

Example

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new CoGroupFunction() {...});
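  • This is the basic usage of DataStream’s window coGroup operation: where and equalTo choose the key for each stream, window assigns the window, and apply takes the CoGroupFunction.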

DataStream.coGroup

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {

    //......

    public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
        return new CoGroupedStreams<>(this, otherStream);
    }

    //......
}
  • DataStream’s coGroup method creates a CoGroupedStreams object from the current stream and otherStream.

CoGroupedStreams

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

@Public
public class CoGroupedStreams<T1, T2> {

    private final DataStream<T1> input1;

    private final DataStream<T2> input2;

    public CoGroupedStreams(DataStream<T1> input1, DataStream<T2> input2) {
        this.input1 = requireNonNull(input1);
        this.input2 = requireNonNull(input2);
    }

    public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
        Preconditions.checkNotNull(keySelector);
        final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
        return where(keySelector, keyType);
    }

    public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType)  {
        Preconditions.checkNotNull(keySelector);
        Preconditions.checkNotNull(keyType);
        return new Where<>(input1.clean(keySelector), keyType);
    }

    //.......
}
  • CoGroupedStreams provides a where operation that specifies the keySelector for input1; it creates and returns a Where object.

Where

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

    @Public
    public class Where<KEY> {

        private final KeySelector<T1, KEY> keySelector1;
        private final TypeInformation<KEY> keyType;

        Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
            this.keySelector1 = keySelector1;
            this.keyType = keyType;
        }

        public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
            Preconditions.checkNotNull(keySelector);
            final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
            return equalTo(keySelector, otherKey);
        }

        public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType)  {
            Preconditions.checkNotNull(keySelector);
            Preconditions.checkNotNull(keyType);

            if (!keyType.equals(this.keyType)) {
                throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
                        "first key = " + this.keyType + " , second key = " + keyType);
            }

            return new EqualTo(input2.clean(keySelector));
        }

        //......
    }    
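  • The Where object provides an equalTo operation that specifies the keySelector for input2. It checks that the key type of input2 equals the key type given to where, throwing an IllegalArgumentException if it does not, and then creates and returns an EqualTo object.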
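
The key type check in equalTo can be pictured with a small Flink-free sketch. Everything in it is a hypothetical stand-in: KeyTypeCheckSketch and accepts are invented for illustration, and a plain Class object plays the role of TypeInformation<KEY>. It only shows that a mismatch is rejected while the pipeline is being assembled, before any element is processed.

```java
// Simplified model of the check in Where.equalTo; not Flink code.
final class KeyTypeCheckSketch {

    // Hypothetical stand-in for CoGroupedStreams.Where: it only remembers the key type.
    static final class Where {
        private final Class<?> keyType;

        Where(Class<?> keyType) {
            this.keyType = keyType;
        }

        // Like equalTo(keySelector, keyType): both inputs must declare the same key type.
        Where equalTo(Class<?> otherKeyType) {
            if (!otherKeyType.equals(this.keyType)) {
                throw new IllegalArgumentException("The keys for the two inputs are not equal: "
                        + "first key = " + this.keyType + " , second key = " + otherKeyType);
            }
            return this;
        }
    }

    // Returns true when the two key types pass the check, false when equalTo rejects them.
    static boolean accepts(Class<?> firstKeyType, Class<?> secondKeyType) {
        try {
            new Where(firstKeyType).equalTo(secondKeyType);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(accepts(String.class, String.class));
        System.out.println(accepts(String.class, Integer.class));
    }
}
```

In this model the second call is rejected because String and Integer differ, which mirrors the keyType.equals(this.keyType) test in the listing above.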

EqualTo

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

        @Public
        public class EqualTo {

            private final KeySelector<T2, KEY> keySelector2;

            EqualTo(KeySelector<T2, KEY> keySelector2) {
                this.keySelector2 = requireNonNull(keySelector2);
            }

            @PublicEvolving
            public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
                return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
            }
        }
  • The EqualTo object provides a window operation that creates and returns a WithWindow object.

WithWindow

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

    @Public
    public static class WithWindow<T1, T2, KEY, W extends Window> {
        private final DataStream<T1> input1;
        private final DataStream<T2> input2;

        private final KeySelector<T1, KEY> keySelector1;
        private final KeySelector<T2, KEY> keySelector2;

        private final TypeInformation<KEY> keyType;

        private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;

        private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

        private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

        private final Time allowedLateness;

        private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;

        protected WithWindow(DataStream<T1> input1,
                DataStream<T2> input2,
                KeySelector<T1, KEY> keySelector1,
                KeySelector<T2, KEY> keySelector2,
                TypeInformation<KEY> keyType,
                WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
                Time allowedLateness) {
            this.input1 = input1;
            this.input2 = input2;

            this.keySelector1 = keySelector1;
            this.keySelector2 = keySelector2;
            this.keyType = keyType;

            this.windowAssigner = windowAssigner;
            this.trigger = trigger;
            this.evictor = evictor;

            this.allowedLateness = allowedLateness;
        }

        @PublicEvolving
        public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
            return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                    windowAssigner, newTrigger, evictor, allowedLateness);
        }

        @PublicEvolving
        public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
            return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                    windowAssigner, trigger, newEvictor, allowedLateness);
        }

        @PublicEvolving
        public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
            return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                    windowAssigner, trigger, evictor, newLateness);
        }

        public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {

            TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes(
                function,
                input1.getType(),
                input2.getType(),
                "CoGroup",
                false);

            return apply(function, resultType);
        }

        @PublicEvolving
        @Deprecated
        public <T> SingleOutputStreamOperator<T> with(CoGroupFunction<T1, T2, T> function) {
            return (SingleOutputStreamOperator<T>) apply(function);
        }

        public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            //clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
            UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);

            DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
                    .map(new Input1Tagger<T1, T2>())
                    .setParallelism(input1.getParallelism())
                    .returns(unionType);
            DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
                    .map(new Input2Tagger<T1, T2>())
                    .setParallelism(input2.getParallelism())
                    .returns(unionType);

            DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

            // we explicitly create the keyed stream to manually pass the key type information in
            windowedStream =
                    new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
                    .window(windowAssigner);

            if (trigger != null) {
                windowedStream.trigger(trigger);
            }
            if (evictor != null) {
                windowedStream.evictor(evictor);
            }
            if (allowedLateness != null) {
                windowedStream.allowedLateness(allowedLateness);
            }

            return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
        }

        @PublicEvolving
        @Deprecated
        public <T> SingleOutputStreamOperator<T> with(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            return (SingleOutputStreamOperator<T>) apply(function, resultType);
        }

        @VisibleForTesting
        Time getAllowedLateness() {
            return allowedLateness;
        }

        @VisibleForTesting
        WindowedStream<TaggedUnion<T1, T2>, KEY, W> getWindowedStream() {
            return windowedStream;
        }
    }
  • WithWindow holds the windowAssigner and lets you set a trigger, an evictor, and allowedLateness; each of these setters returns a new WithWindow. It also provides the apply operation (the with operation is marked as deprecated).
  • The apply operation receives a CoGroupFunction. It first creates a UnionKeySelector from the two keySelectors, then maps the two input streams into streams of TaggedUnion objects using Input1Tagger and Input2Tagger respectively. It then calls taggedInput1.union(taggedInput2) to obtain unionStream, builds a KeyedStream from unionStream with the UnionKeySelector, and applies the window operation to that KeyedStream. The configured windowAssigner, trigger, evictor, and allowedLateness are all passed on to the resulting WindowedStream. Finally, the user-defined CoGroupFunction is wrapped in a CoGroupWindowFunction and passed to windowedStream.apply.
  • In other words, the WindowedStream built inside apply has TaggedUnion as its element type; the KeyedStream underneath it uses UnionKeySelector as its KeySelector and is based on a DataStream of TaggedUnion produced by taggedInput1.union(taggedInput2); and taggedInput1 and taggedInput2 are the results of map operations on the original input streams, with Input1Tagger and Input2Tagger as the MapFunctions.
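
The tag, union, and key steps described above can be traced on plain collections. The following is a minimal sketch, not Flink code: TagUnionKeySketch, Tagged, tagUnionAndKey, and demo are hypothetical names, lists stand in for streams, a java.util.function.Function stands in for KeySelector, and windowing is left out entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Models tagging, union, and keying on in-memory lists; not Flink code.
final class TagUnionKeySketch {

    // Hypothetical counterpart of TaggedUnion: exactly one of the two fields is set.
    static final class Tagged<A, B> {
        final A one;
        final B two;

        Tagged(A one, B two) {
            this.one = one;
            this.two = two;
        }

        boolean isOne() {
            return one != null;
        }

        @Override
        public String toString() {
            return isOne() ? "one(" + one + ")" : "two(" + two + ")";
        }
    }

    static <A, B, K> Map<K, List<Tagged<A, B>>> tagUnionAndKey(
            List<A> input1, List<B> input2,
            Function<A, K> keySelector1, Function<B, K> keySelector2) {
        // Tag both inputs so they share one element type, then concatenate them.
        List<Tagged<A, B>> union = new ArrayList<>();
        for (A a : input1) {
            union.add(new Tagged<A, B>(a, null));
        }
        for (B b : input2) {
            union.add(new Tagged<A, B>(null, b));
        }

        // Key the union, dispatching to the selector that matches the tag.
        Map<K, List<Tagged<A, B>>> keyed = new LinkedHashMap<>();
        for (Tagged<A, B> t : union) {
            K key = t.isOne() ? keySelector1.apply(t.one) : keySelector2.apply(t.two);
            keyed.computeIfAbsent(key, k -> new ArrayList<>()).add(t);
        }
        return keyed;
    }

    // Example data: strings keyed by their first letter, integers keyed by a size bucket.
    static Map<String, List<Tagged<String, Integer>>> demo() {
        List<String> input1 = Arrays.asList("a:apple", "b:banana");
        List<Integer> input2 = Arrays.asList(10, 25);
        return tagUnionAndKey(input1, input2,
                s -> s.substring(0, 1),
                n -> n < 20 ? "a" : "b");
    }

    public static void main(String[] args) {
        // prints {a=[one(a:apple), two(10)], b=[one(b:banana), two(25)]}
        System.out.println(demo());
    }
}
```

Elements of different types end up under the same key only because both were first wrapped in the common Tagged type, which is the role TaggedUnion plays in the listing above.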

CoGroupFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/CoGroupFunction.java

@Public
@FunctionalInterface
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {

    void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
  • CoGroupFunction extends Function and defines the coGroup method, which receives two Iterable collections of elements, one per input, plus a Collector for the output.

Input1Tagger and Input2Tagger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

    private static class Input1Tagger<T1, T2> implements MapFunction<T1, TaggedUnion<T1, T2>> {
        private static final long serialVersionUID = 1L;

        @Override
        public TaggedUnion<T1, T2> map(T1 value) throws Exception {
            return TaggedUnion.one(value);
        }
    }

    private static class Input2Tagger<T1, T2> implements MapFunction<T2, TaggedUnion<T1, T2>> {
        private static final long serialVersionUID = 1L;

        @Override
        public TaggedUnion<T1, T2> map(T2 value) throws Exception {
            return TaggedUnion.two(value);
        }
    }
  • Input1Tagger and Input2Tagger both implement MapFunction, and their map methods return a TaggedUnion.

TaggedUnion

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

    @Internal
    public static class TaggedUnion<T1, T2> {
        private final T1 one;
        private final T2 two;

        private TaggedUnion(T1 one, T2 two) {
            this.one = one;
            this.two = two;
        }

        public boolean isOne() {
            return one != null;
        }

        public boolean isTwo() {
            return two != null;
        }

        public T1 getOne() {
            return one;
        }

        public T2 getTwo() {
            return two;
        }

        public static <T1, T2> TaggedUnion<T1, T2> one(T1 one) {
            return new TaggedUnion<>(one, null);
        }

        public static <T1, T2> TaggedUnion<T1, T2> two(T2 two) {
            return new TaggedUnion<>(null, two);
        }
    }
  • TaggedUnion has two fields, one and two, and provides two static factory methods, also named one and two. Each factory sets exactly one field and leaves the other null, so the two fields can never both hold a value at the same time.
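
One consequence follows directly from the listing: isOne and isTwo test for non-null, so a null element wrapped by a factory method is indistinguishable from "not this side". The sketch below uses a local, reduced copy of the class (TaggedUnionNullSketch and describe are hypothetical names) to show this; how Flink treats null stream elements elsewhere is outside what the listing shows.

```java
// Demonstrates the null-based tagging of TaggedUnion using a local, reduced copy of the class.
final class TaggedUnionNullSketch {

    static final class TaggedUnion<T1, T2> {
        private final T1 one;
        private final T2 two;

        private TaggedUnion(T1 one, T2 two) {
            this.one = one;
            this.two = two;
        }

        boolean isOne() {
            return one != null;
        }

        boolean isTwo() {
            return two != null;
        }

        static <T1, T2> TaggedUnion<T1, T2> one(T1 one) {
            return new TaggedUnion<>(one, null);
        }

        static <T1, T2> TaggedUnion<T1, T2> two(T2 two) {
            return new TaggedUnion<>(null, two);
        }
    }

    static String describe(TaggedUnion<?, ?> u) {
        return "isOne=" + u.isOne() + ", isTwo=" + u.isTwo();
    }

    public static void main(String[] args) {
        TaggedUnion<String, Integer> left = TaggedUnion.one("a");
        TaggedUnion<String, Integer> right = TaggedUnion.two(1);
        TaggedUnion<String, Integer> nullLeft = TaggedUnion.one(null);

        System.out.println(describe(left));     // isOne=true, isTwo=false
        System.out.println(describe(right));    // isOne=false, isTwo=true
        System.out.println(describe(nullLeft)); // isOne=false, isTwo=false
    }
}
```

In code that branches on isOne() with a plain else, such as the UnionKeySelector and CoGroupWindowFunction listings in this article, a value like nullLeft would take the else branch.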

UnionKeySelector

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

    private static class UnionKeySelector<T1, T2, KEY> implements KeySelector<TaggedUnion<T1, T2>, KEY> {
        private static final long serialVersionUID = 1L;

        private final KeySelector<T1, KEY> keySelector1;
        private final KeySelector<T2, KEY> keySelector2;

        public UnionKeySelector(KeySelector<T1, KEY> keySelector1,
                KeySelector<T2, KEY> keySelector2) {
            this.keySelector1 = keySelector1;
            this.keySelector2 = keySelector2;
        }

        @Override
        public KEY getKey(TaggedUnion<T1, T2> value) throws Exception{
            if (value.isOne()) {
                return keySelector1.getKey(value.getOne());
            } else {
                return keySelector2.getKey(value.getTwo());
            }
        }
    }
  • UnionKeySelector holds two KeySelector fields. Its getKey method dispatches on the TaggedUnion: if the value is a "one" it returns keySelector1.getKey(value.getOne()), otherwise it returns keySelector2.getKey(value.getTwo()).

DataStream.union

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {

    //......

    @SafeVarargs
    public final DataStream<T> union(DataStream<T>... streams) {
        List<StreamTransformation<T>> unionedTransforms = new ArrayList<>();
        unionedTransforms.add(this.transformation);

        for (DataStream<T> newStream : streams) {
            if (!getType().equals(newStream.getType())) {
                throw new IllegalArgumentException("Cannot union streams of different types: "
                        + getType() + " and " + newStream.getType());
            }

            unionedTransforms.add(newStream.getTransformation());
        }
        return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms));
    }

    //......
}
  • The union operation of DataStream uses UnionTransformation to create a new DataStream. Note that union requires all streams to have the same element type, which is why the apply operation of WithWindow uses Input1Tagger and Input2Tagger to map the two input streams to TaggedUnion objects: this unifies the element types of the two streams.

CoGroupWindowFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

    private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
            extends WrappingFunction<CoGroupFunction<T1, T2, T>>
            implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {

        private static final long serialVersionUID = 1L;

        public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
            super(userFunction);
        }

        @Override
        public void apply(KEY key,
                W window,
                Iterable<TaggedUnion<T1, T2>> values,
                Collector<T> out) throws Exception {

            List<T1> oneValues = new ArrayList<>();
            List<T2> twoValues = new ArrayList<>();

            for (TaggedUnion<T1, T2> val: values) {
                if (val.isOne()) {
                    oneValues.add(val.getOne());
                } else {
                    twoValues.add(val.getTwo());
                }
            }
            wrappedFunction.coGroup(oneValues, twoValues, out);
        }
    }
  • CoGroupWindowFunction extends WrappingFunction (which extends AbstractRichFunction and overrides its open, close, and setRuntimeContext methods to manage wrappedFunction) and implements the WindowFunction interface. Its apply method splits the Iterable of TaggedUnion elements into oneValues and twoValues, then calls the coGroup method of the user-defined CoGroupFunction.
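
The split performed by apply can be reproduced without Flink. In this minimal sketch SplitAndCallSketch, CoGroup, Tagged, applyWindow, and demo are hypothetical names; CoGroup stands in for CoGroupFunction and a List replaces the Collector. It shows that the whole window content for one key is copied into two in-memory lists before the user function runs once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Models the split-then-call step of CoGroupWindowFunction.apply; not Flink code.
final class SplitAndCallSketch {

    // Hypothetical stand-in for CoGroupFunction, collecting output into a List.
    interface CoGroup<A, B, O> {
        void coGroup(Iterable<A> first, Iterable<B> second, List<O> out);
    }

    // Hypothetical counterpart of TaggedUnion: exactly one field is non-null.
    static final class Tagged<A, B> {
        final A one;
        final B two;

        Tagged(A one, B two) {
            this.one = one;
            this.two = two;
        }
    }

    static <A, B, O> List<O> applyWindow(Iterable<Tagged<A, B>> values, CoGroup<A, B, O> userFunction) {
        List<A> oneValues = new ArrayList<>();
        List<B> twoValues = new ArrayList<>();
        // Route every element of the window back to the side it came from.
        for (Tagged<A, B> value : values) {
            if (value.one != null) {
                oneValues.add(value.one);
            } else {
                twoValues.add(value.two);
            }
        }
        // The user function is called once, with both sides fully materialized.
        List<O> out = new ArrayList<>();
        userFunction.coGroup(oneValues, twoValues, out);
        return out;
    }

    static List<String> demo() {
        List<Tagged<String, Integer>> window = Arrays.asList(
                new Tagged<String, Integer>("click", null),
                new Tagged<String, Integer>(null, 7),
                new Tagged<String, Integer>("view", null));
        return applyWindow(window,
                (first, second, out) -> out.add("first=" + first + ", second=" + second));
    }

    public static void main(String[] args) {
        // prints [first=[click, view], second=[7]]
        System.out.println(demo());
    }
}
```

Note that the relative order within each side is preserved, while the interleaving between the two sides is lost once the elements are split.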

Summary

  • DataStream provides a coGroup method for window coGroup operations, which returns CoGroupedStreams. CoGroupedStreams provides the where operation to build a Where object; Where provides the equalTo operation to build an EqualTo object; EqualTo provides the window operation to build a WithWindow object. WithWindow holds the windowAssigner, lets you set a trigger, an evictor, and allowedLateness, and provides the apply operation.
  • The apply operation of CoGroupedStreams’ WithWindow receives a CoGroupFunction. Internally it creates a UnionKeySelector from the two keySelectors, then maps the two input streams with Input1Tagger and Input2Tagger respectively to convert them into streams of TaggedUnion objects, and calls taggedInput1.union(taggedInput2) to obtain unionStream. It then uses the UnionKeySelector to turn unionStream into a KeyedStream and applies the window operation to it, passing on the configured windowAssigner, trigger, evictor, and allowedLateness. Finally, the user-defined CoGroupFunction is wrapped in a CoGroupWindowFunction and passed to windowedStream.apply.
  • The apply operation of CoGroupedStreams’ WithWindow merges the two streams with DataStream’s union operation and then converts the result into a KeyedStream. The two key classes here are TaggedUnion and UnionKeySelector. TaggedUnion has two fields, one and two, and two static factory methods of the same names; each factory leaves the other field null, so the two fields can never both hold a value. UnionKeySelector holds two KeySelector fields, and its getKey method dispatches on the TaggedUnion: for a "one" it returns keySelector1.getKey(value.getOne()), otherwise keySelector2.getKey(value.getTwo()). The TaggedUnion class unifies the element types of the two streams so that the union operation can be applied.
  • CoGroupWindowFunction extends WrappingFunction (which extends AbstractRichFunction and overrides its open, close, and setRuntimeContext methods to manage wrappedFunction) and implements the WindowFunction interface. Its apply method splits the Iterable of TaggedUnion elements into oneValues and twoValues, then calls the coGroup method of the user-defined CoGroupFunction.
  • CoGroupFunction extends Function and defines the coGroup method, which receives two Iterable collections of elements. The apply method of JoinedStreams’ WithWindow wraps a JoinFunction or FlatJoinFunction as a CoGroupFunction (JoinFunction is wrapped in JoinCoGroupFunction, FlatJoinFunction in FlatJoinCoGroupFunction) and then calls the apply method of CoGroupedStreams’ WithWindow. JoinCoGroupFunction and FlatJoinCoGroupFunction both extend WrappingFunction and implement the coGroup method defined by the CoGroupFunction interface. Their coGroup iterates over the first collection and, for each element, iterates over the second collection, calling the join method of the JoinFunction or FlatJoinFunction. Nothing is emitted when either collection is empty, so the result is an inner join; users can write their own CoGroupFunction to implement an outer join with the coGroup operation.
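
The difference between the nested-loop pairing and a custom outer join, mentioned in the last point, can be seen in a short plain-Java sketch. JoinViaCoGroupSketch, innerJoin, and leftOuterJoin are hypothetical names; each method plays the role of a coGroup body for one key and one window, with lists in place of the Iterable inputs and the Collector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Contrasts nested-loop pairing with a hand-written left outer join; not Flink code.
final class JoinViaCoGroupSketch {

    // Nested loops over both sides: an empty side yields no pairs, hence an inner join.
    static List<String> innerJoin(List<String> first, List<Integer> second) {
        List<String> out = new ArrayList<>();
        for (String a : first) {
            for (Integer b : second) {
                out.add(a + "-" + b);
            }
        }
        return out;
    }

    // Custom coGroup logic: keep every element of the first side even without a match.
    static List<String> leftOuterJoin(List<String> first, List<Integer> second) {
        List<String> out = new ArrayList<>();
        for (String a : first) {
            if (second.isEmpty()) {
                out.add(a + "-null");
            } else {
                for (Integer b : second) {
                    out.add(a + "-" + b);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> first = Arrays.asList("a", "b");
        List<Integer> none = Collections.emptyList();

        System.out.println(innerJoin(first, Arrays.asList(1))); // [a-1, b-1]
        System.out.println(innerJoin(first, none));             // []
        System.out.println(leftOuterJoin(first, none));         // [a-null, b-null]
    }
}
```

Because coGroup hands the function both collections at once, including an empty one, the function can decide for itself what to emit for unmatched elements; a join function, which only ever sees matched pairs, cannot.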