A look at Flink DataStream’s connect operation


Preface

This article looks at the connect operation of Flink's DataStream.

DataStream.connect

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {

    //......

    public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
        return new ConnectedStreams<>(environment, this, dataStream);
    }

    @PublicEvolving
    public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
        return new BroadcastConnectedStream<>(
                environment,
                this,
                Preconditions.checkNotNull(broadcastStream),
                broadcastStream.getBroadcastStateDescriptor());
    }

    //......
}
  • DataStream’s connect operation creates either a ConnectedStreams or a BroadcastConnectedStream. Both take two type parameters, so the two connected streams are not required to have the same element type.

ConnectedStreams

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/ConnectedStreams.java

@Public
public class ConnectedStreams<IN1, IN2> {

    protected final StreamExecutionEnvironment environment;
    protected final DataStream<IN1> inputStream1;
    protected final DataStream<IN2> inputStream2;

    protected ConnectedStreams(StreamExecutionEnvironment env, DataStream<IN1> input1, DataStream<IN2> input2) {
        this.environment = requireNonNull(env);
        this.inputStream1 = requireNonNull(input1);
        this.inputStream2 = requireNonNull(input2);
    }

    public StreamExecutionEnvironment getExecutionEnvironment() {
        return environment;
    }

    public DataStream<IN1> getFirstInput() {
        return inputStream1;
    }

    public DataStream<IN2> getSecondInput() {
        return inputStream2;
    }

    public TypeInformation<IN1> getType1() {
        return inputStream1.getType();
    }

    public TypeInformation<IN2> getType2() {
        return inputStream2.getType();
    }

    public ConnectedStreams<IN1, IN2> keyBy(int keyPosition1, int keyPosition2) {
        return new ConnectedStreams<>(this.environment, inputStream1.keyBy(keyPosition1),
                inputStream2.keyBy(keyPosition2));
    }

    public ConnectedStreams<IN1, IN2> keyBy(int[] keyPositions1, int[] keyPositions2) {
        return new ConnectedStreams<>(environment, inputStream1.keyBy(keyPositions1),
                inputStream2.keyBy(keyPositions2));
    }

    public ConnectedStreams<IN1, IN2> keyBy(String field1, String field2) {
        return new ConnectedStreams<>(environment, inputStream1.keyBy(field1),
                inputStream2.keyBy(field2));
    }

    public ConnectedStreams<IN1, IN2> keyBy(String[] fields1, String[] fields2) {
        return new ConnectedStreams<>(environment, inputStream1.keyBy(fields1),
                inputStream2.keyBy(fields2));
    }

    public ConnectedStreams<IN1, IN2> keyBy(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
        return new ConnectedStreams<>(environment, inputStream1.keyBy(keySelector1),
                inputStream2.keyBy(keySelector2));
    }

    public <KEY> ConnectedStreams<IN1, IN2> keyBy(
            KeySelector<IN1, KEY> keySelector1,
            KeySelector<IN2, KEY> keySelector2,
            TypeInformation<KEY> keyType) {
        return new ConnectedStreams<>(
            environment,
            inputStream1.keyBy(keySelector1, keyType),
            inputStream2.keyBy(keySelector2, keyType));
    }

    public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper) {

        TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
            coMapper,
            CoMapFunction.class,
            0,
            1,
            2,
            TypeExtractor.NO_INDEX,
            getType1(),
            getType2(),
            Utils.getCallLocationName(),
            true);

        return transform("Co-Map", outTypeInfo, new CoStreamMap<>(inputStream1.clean(coMapper)));

    }

    public <R> SingleOutputStreamOperator<R> flatMap(
            CoFlatMapFunction<IN1, IN2, R> coFlatMapper) {

        TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
            coFlatMapper,
            CoFlatMapFunction.class,
            0,
            1,
            2,
            TypeExtractor.NO_INDEX,
            getType1(),
            getType2(),
            Utils.getCallLocationName(),
            true);

        return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
    }

    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> process(
            CoProcessFunction<IN1, IN2, R> coProcessFunction) {

        TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
            coProcessFunction,
            CoProcessFunction.class,
            0,
            1,
            2,
            TypeExtractor.NO_INDEX,
            getType1(),
            getType2(),
            Utils.getCallLocationName(),
            true);

        return process(coProcessFunction, outTypeInfo);
    }

    @Internal
    public <R> SingleOutputStreamOperator<R> process(
            CoProcessFunction<IN1, IN2, R> coProcessFunction,
            TypeInformation<R> outputType) {

        TwoInputStreamOperator<IN1, IN2, R> operator;

        if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof KeyedStream)) {
            operator = new KeyedCoProcessOperator<>(inputStream1.clean(coProcessFunction));
        } else {
            operator = new CoProcessOperator<>(inputStream1.clean(coProcessFunction));
        }

        return transform("Co-Process", outputType, operator);
    }

    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(String functionName,
            TypeInformation<R> outTypeInfo,
            TwoInputStreamOperator<IN1, IN2, R> operator) {

        // read the output type of the input Transforms to coax out errors about MissingTypeInfo
        inputStream1.getType();
        inputStream2.getType();

        TwoInputTransformation<IN1, IN2, R> transform = new TwoInputTransformation<>(
                inputStream1.getTransformation(),
                inputStream2.getTransformation(),
                functionName,
                operator,
                outTypeInfo,
                environment.getParallelism());

        if (inputStream1 instanceof KeyedStream && inputStream2 instanceof KeyedStream) {
            KeyedStream<IN1, ?> keyedInput1 = (KeyedStream<IN1, ?>) inputStream1;
            KeyedStream<IN2, ?> keyedInput2 = (KeyedStream<IN2, ?>) inputStream2;

            TypeInformation<?> keyType1 = keyedInput1.getKeyType();
            TypeInformation<?> keyType2 = keyedInput2.getKeyType();
            if (!(keyType1.canEqual(keyType2) && keyType1.equals(keyType2))) {
                throw new UnsupportedOperationException("Key types if input KeyedStreams " +
                        "don't match: " + keyType1 + " and " + keyType2 + ".");
            }

            transform.setStateKeySelectors(keyedInput1.getKeySelector(), keyedInput2.getKeySelector());
            transform.setStateKeyType(keyType1);
        }

        @SuppressWarnings({ "unchecked", "rawtypes" })
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, transform);

        getExecutionEnvironment().addOperator(transform);

        return returnStream;
    }
}
  • ConnectedStreams provides keyBy methods for specifying the key selectors of the two streams, along with the map, flatMap, process, and transform operations; the first three all end up calling transform.
  • The transform operation takes a TwoInputStreamOperator, wraps it in a TwoInputTransformation, and returns a SingleOutputStreamOperator. When both inputs are KeyedStreams, it also checks that their key types match and throws an UnsupportedOperationException if they do not.
  • The map operation takes a CoMapFunction, the flatMap operation takes a CoFlatMapFunction, and the process operation takes a CoProcessFunction.

CoMapFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/CoMapFunction.java

@Public
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {

    OUT map1(IN1 value) throws Exception;

    OUT map2(IN2 value) throws Exception;
}
  • CoMapFunction extends Function and Serializable and defines the map1 and map2 methods, one for each input; both return the same output type OUT.
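
As a rough illustration of how the two methods divide the work, the sketch below implements an interface with the same shape as CoMapFunction. To keep it compilable without the Flink dependency, the interface is redeclared locally under the hypothetical name SketchCoMapFunction; LabelingCoMap, CoMapSketch, and the run driver are likewise made up for this example. The driver only mimics the fact that an element goes to map1 or map2 depending on which input it arrived on. It says nothing about how Flink actually interleaves the two inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in with the same shape as Flink's CoMapFunction,
// redeclared here so the sketch compiles without the Flink dependency.
interface SketchCoMapFunction<IN1, IN2, OUT> {
    OUT map1(IN1 value) throws Exception;

    OUT map2(IN2 value) throws Exception;
}

// Maps Integer elements from the first input and String elements from the
// second input to a common String output type.
class LabelingCoMap implements SketchCoMapFunction<Integer, String, String> {
    @Override
    public String map1(Integer value) {
        return "number:" + value;
    }

    @Override
    public String map2(String value) {
        return "text:" + value.toUpperCase();
    }
}

class CoMapSketch {
    // Illustrative driver (not Flink's runtime): sends the first input through
    // map1, then the second input through map2, and gathers all results.
    static List<String> run(List<Integer> numbers, List<String> texts) {
        LabelingCoMap fn = new LabelingCoMap();
        List<String> out = new ArrayList<>();
        for (Integer n : numbers) {
            out.add(fn.map1(n));
        }
        for (String t : texts) {
            out.add(fn.map2(t));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2), Arrays.asList("a")));
    }
}
```

With the real API, an implementation of this kind would be passed to ConnectedStreams.map, which is why both methods must agree on one output type.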

CoFlatMapFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java

@Public
public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {

    void flatMap1(IN1 value, Collector<OUT> out) throws Exception;

    void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
}
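  • CoFlatMapFunction extends Function and Serializable and defines the flatMap1 and flatMap2 methods. Unlike the map1 and map2 methods of CoMapFunction, they return nothing and instead take an additional Collector parameter, so each call can emit zero, one, or several output elements.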
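
The difference the Collector makes can be sketched as follows. The interfaces SketchCollector and SketchCoFlatMapFunction are hypothetical local stand-ins that copy only the shape of Flink's Collector.collect and CoFlatMapFunction, so the example runs without Flink; WordsAndPositives, CoFlatMapSketch, and the run driver are assumptions made for illustration. The first input emits several elements per call, and the second emits at most one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the collect method of Flink's Collector.
interface SketchCollector<T> {
    void collect(T record);
}

// Hypothetical stand-in with the same shape as Flink's CoFlatMapFunction.
interface SketchCoFlatMapFunction<IN1, IN2, OUT> {
    void flatMap1(IN1 value, SketchCollector<OUT> out) throws Exception;

    void flatMap2(IN2 value, SketchCollector<OUT> out) throws Exception;
}

// First input: lines of text, split into words (zero or more outputs each).
// Second input: integers, forwarded only when positive (zero or one output).
class WordsAndPositives implements SketchCoFlatMapFunction<String, Integer, String> {
    @Override
    public void flatMap1(String line, SketchCollector<String> out) {
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.collect(word);
            }
        }
    }

    @Override
    public void flatMap2(Integer value, SketchCollector<String> out) {
        if (value > 0) {
            out.collect(String.valueOf(value));
        }
    }
}

class CoFlatMapSketch {
    // Illustrative driver (not Flink's runtime): feeds each input to its own
    // method and gathers whatever the function chooses to emit.
    static List<String> run(List<String> lines, List<Integer> numbers) {
        List<String> results = new ArrayList<>();
        SketchCollector<String> collector = results::add;
        WordsAndPositives fn = new WordsAndPositives();
        for (String line : lines) {
            fn.flatMap1(line, collector);
        }
        for (Integer n : numbers) {
            fn.flatMap2(n, collector);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("hello flink", "  "), Arrays.asList(3, -1, 0)));
    }
}
```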

CoProcessFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java

@PublicEvolving
public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;

    public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}

    public abstract class Context {

        public abstract Long timestamp();

        public abstract TimerService timerService();

        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }

    public abstract class OnTimerContext extends Context {
        /**
         * The {@link TimeDomain} of the firing timer.
         */
        public abstract TimeDomain timeDomain();
    }
}
  • CoProcessFunction extends AbstractRichFunction and declares the abstract methods processElement1 and processElement2. Compared with the flatMap1 and flatMap2 methods of CoFlatMapFunction, they take an additional Context parameter.
  • CoProcessFunction defines the inner classes Context and OnTimerContext. A Context is passed to processElement1 and processElement2 and provides the timestamp, timerService, and output methods; OnTimerContext extends Context with a timeDomain method and is passed to onTimer.
  • Another difference from CoFlatMapFunction is that a CoProcessFunction can register timers through timerService and then implement the response logic in the onTimer method.
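
The timer pattern described above can be sketched with a heavily simplified model. All of the types below (SketchTimerService, SketchContext, SketchCoProcessFunction, PaymentTimeout, CoProcessSketch) are hypothetical stand-ins written for this example so that it runs without Flink. They keep only timestamp, timerService, and the three callbacks, replace the Collector with a plain List, and reuse one context type for onTimer. The run driver is an assumption too: it fires a timer once the simulated time reaches it, which only loosely imitates watermark-driven event-time timers. Note that in Flink timers are only supported on keyed streams; the sketch sidesteps keys by tracking a single order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for the timer-registration part of Flink's TimerService.
interface SketchTimerService {
    void registerEventTimeTimer(long time);
}

// Hypothetical stand-in for CoProcessFunction.Context (output method omitted).
interface SketchContext {
    Long timestamp();

    SketchTimerService timerService();
}

// Hypothetical stand-in mirroring the three callbacks of CoProcessFunction.
abstract class SketchCoProcessFunction<IN1, IN2, OUT> {
    abstract void processElement1(IN1 value, SketchContext ctx, List<OUT> out);

    abstract void processElement2(IN2 value, SketchContext ctx, List<OUT> out);

    void onTimer(long timestamp, SketchContext ctx, List<OUT> out) {}
}

// Input 1 carries order ids, input 2 carries payment ids. An order that is
// not paid before its timer fires produces a timeout record from onTimer.
class PaymentTimeout extends SketchCoProcessFunction<String, String, String> {
    static final long TIMEOUT = 10;
    private String pendingOrder; // stands in for per-key state; one order only

    @Override
    void processElement1(String orderId, SketchContext ctx, List<String> out) {
        pendingOrder = orderId;
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TIMEOUT);
    }

    @Override
    void processElement2(String paymentId, SketchContext ctx, List<String> out) {
        if (paymentId.equals(pendingOrder)) {
            pendingOrder = null;
            out.add("paid:" + paymentId);
        }
    }

    @Override
    void onTimer(long timestamp, SketchContext ctx, List<String> out) {
        if (pendingOrder != null) {
            out.add("timeout:" + pendingOrder + "@" + timestamp);
            pendingOrder = null;
        }
    }
}

class CoProcessSketch {
    // Illustrative driver: delivers an order at orderTime, optionally a payment
    // at paymentTime (pass -1 for none), and fires timers as time advances.
    static List<String> run(long orderTime, long paymentTime, long endTime) {
        List<String> out = new ArrayList<>();
        TreeSet<Long> timers = new TreeSet<>();
        long[] now = new long[1];
        SketchContext ctx = new SketchContext() {
            public Long timestamp() { return now[0]; }
            public SketchTimerService timerService() { return timers::add; }
        };
        PaymentTimeout fn = new PaymentTimeout();
        now[0] = orderTime;
        fn.processElement1("order-1", ctx, out);
        if (paymentTime >= 0) {
            fireDueTimers(timers, paymentTime, now, fn, ctx, out);
            now[0] = paymentTime;
            fn.processElement2("order-1", ctx, out);
        }
        fireDueTimers(timers, endTime, now, fn, ctx, out);
        return out;
    }

    // Fires every registered timer whose time is at or before upTo, in order.
    private static void fireDueTimers(TreeSet<Long> timers, long upTo, long[] now,
            PaymentTimeout fn, SketchContext ctx, List<String> out) {
        while (!timers.isEmpty() && timers.first() <= upTo) {
            long t = timers.pollFirst();
            now[0] = t;
            fn.onTimer(t, ctx, out);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(0, 5, 100));
        System.out.println(run(0, -1, 100));
    }
}
```

The function never cancels its timer when the payment arrives. It lets the timer fire and checks in onTimer whether the order is still pending, which keeps the sketch short.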

Summary

  • DataStream’s connect operation creates either a ConnectedStreams or a BroadcastConnectedStream. Both take two type parameters, so the two connected streams are not required to have the same element type.
  • ConnectedStreams provides keyBy methods for specifying the key selectors of the two streams, along with the map, flatMap, process, and transform operations; the first three all end up calling transform. The transform operation takes a TwoInputStreamOperator and returns a SingleOutputStreamOperator. The map operation takes a CoMapFunction, the flatMap operation takes a CoFlatMapFunction, and the process operation takes a CoProcessFunction.
  • CoFlatMapFunction differs from CoMapFunction in that its flatMap1 and flatMap2 methods take an additional Collector parameter. CoProcessFunction defines the processElement1 and processElement2 methods, which, compared with those of CoFlatMapFunction, take an additional Context parameter. Another difference from CoFlatMapFunction is that a CoProcessFunction can register timers through timerService and then implement the response logic in the onTimer method.
