Talk about Flink’s Broadcast State


Preface

This article takes a look at Flink’s Broadcast State, based on the Flink 1.7.0 sources.

Example

    @Test
    public void testBroadcastState() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> originStream = env.addSource(new RandomWordSource());

        MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>("dynamicConfig", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        BroadcastStream<Tuple2<String,String>> configStream = env.addSource(new DynamicConfigSource()).broadcast(descriptor);

        BroadcastConnectedStream<String, Tuple2<String,String>> connectStream = originStream.connect(configStream);
        connectStream.process(new BroadcastProcessFunction<String, Tuple2<String,String>, Void>() {
            @Override
            public void processElement(String value, ReadOnlyContext ctx, Collector<Void> out) throws Exception {
                ReadOnlyBroadcastState<String,String> config = ctx.getBroadcastState(descriptor);
                String configValue = config.get("demoConfigKey");
                // do some processing based on the config
                LOGGER.info("process value:{},config:{}",value,configValue);
            }

            @Override
            public void processBroadcastElement(Tuple2<String, String> value, Context ctx, Collector<Void> out) throws Exception {
                LOGGER.info("receive config item:{}",value);
                // update the broadcast state (only processBroadcastElement has write access)
                ctx.getBroadcastState(descriptor).put(value.f0, value.f1);
            }
        });

        env.execute("testBroadcastState");
    }

public class DynamicConfigSource implements SourceFunction<Tuple2<String,String>> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
        long idx = 1;
        while (isRunning){
            ctx.collect(Tuple2.of("demoConfigKey","value" + idx));
            idx++;
            TimeUnit.SECONDS.sleep(10);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
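  • Here we simulate a configuration source (DynamicConfigSource) that emits a new value for demoConfigKey every 10 seconds. The config stream is broadcast to every parallel task of the process operator; each task stores the latest value in its broadcast state and reads it when processing the words from RandomWordSource.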
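To make the data flow of the example concrete without running a cluster, here is a simplified, Flink-free Java model. It is a sketch under stated assumptions, not Flink's implementation: Task is a hypothetical stand-in for one parallel instance of the BroadcastProcessFunction, a HashMap stands in for its broadcast state, and words are routed round-robin (an arbitrary choice; the real routing depends on the job's parallelism and partitioning).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Flink-free model: each parallel task keeps its own copy of the broadcast map. */
public class BroadcastStateModel {

    /** Stand-in for one parallel instance of the BroadcastProcessFunction. */
    static final class Task {
        final int index;
        final Map<String, String> broadcastState = new HashMap<>();

        Task(int index) {
            this.index = index;
        }

        // Mirrors processBroadcastElement: the only place the state is written.
        void processBroadcastElement(String key, String value) {
            broadcastState.put(key, value);
        }

        // Mirrors processElement: reads the config, never writes it.
        String processElement(String word) {
            String configValue = broadcastState.get("demoConfigKey");
            return "task" + index + " process value:" + word + ",config:" + configValue;
        }
    }

    final List<Task> tasks = new ArrayList<>();
    private int next = 0;

    BroadcastStateModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            tasks.add(new Task(i));
        }
    }

    // Broadcast partitioning: a config record reaches every task.
    void broadcast(String key, String value) {
        for (Task task : tasks) {
            task.processBroadcastElement(key, value);
        }
    }

    // Non-broadcast input: each word goes to exactly one task (round-robin is an assumption).
    String send(String word) {
        Task task = tasks.get(next);
        next = (next + 1) % tasks.size();
        return task.processElement(word);
    }

    public static void main(String[] args) {
        BroadcastStateModel model = new BroadcastStateModel(3);
        System.out.println(model.send("alpha"));   // no config broadcast yet
        model.broadcast("demoConfigKey", "value1");
        System.out.println(model.send("beta"));
        model.broadcast("demoConfigKey", "value2");
        System.out.println(model.send("gamma"));
    }
}
```

The first word is processed with config:null. Nothing orders the two inputs relative to each other, so in the real job words can likewise arrive before the first config item, and processElement should tolerate a missing value.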

MapStateDescriptor

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MapStateDescriptor.java

@PublicEvolving
public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {

    private static final long serialVersionUID = 1L;

    /**
     * Create a new {@code MapStateDescriptor} with the given name and the given type serializers.
     *
     * @param name The name of the {@code MapStateDescriptor}.
     * @param keySerializer The type serializer for the keys in the state.
     * @param valueSerializer The type serializer for the values in the state.
     */
    public MapStateDescriptor(String name, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) {
        super(name, new MapSerializer<>(keySerializer, valueSerializer), null);
    }

    /**
     * Create a new {@code MapStateDescriptor} with the given name and the given type information.
     *
     * @param name The name of the {@code MapStateDescriptor}.
     * @param keyTypeInfo The type information for the keys in the state.
     * @param valueTypeInfo The type information for the values in the state.
     */
    public MapStateDescriptor(String name, TypeInformation<UK> keyTypeInfo, TypeInformation<UV> valueTypeInfo) {
        super(name, new MapTypeInfo<>(keyTypeInfo, valueTypeInfo), null);
    }

    /**
     * Create a new {@code MapStateDescriptor} with the given name and the given type information.
     *
     * <p>If this constructor fails (because it is not possible to describe the type via a class),
     * consider using the {@link #MapStateDescriptor(String, TypeInformation, TypeInformation)} constructor.
     *
     * @param name The name of the {@code MapStateDescriptor}.
     * @param keyClass The class of the type of keys in the state.
     * @param valueClass The class of the type of values in the state.
     */
    public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass) {
        super(name, new MapTypeInfo<>(keyClass, valueClass), null);
    }

    @Override
    public Type getType() {
        return Type.MAP;
    }

    /**
     * Gets the serializer for the keys in the state.
     *
     * @return The serializer for the keys in the state.
     */
    public TypeSerializer<UK> getKeySerializer() {
        final TypeSerializer<Map<UK, UV>> rawSerializer = getSerializer();
        if (!(rawSerializer instanceof MapSerializer)) {
            throw new IllegalStateException("Unexpected serializer type.");
        }

        return ((MapSerializer<UK, UV>) rawSerializer).getKeySerializer();
    }

    /**
     * Gets the serializer for the values in the state.
     *
     * @return The serializer for the values in the state.
     */
    public TypeSerializer<UV> getValueSerializer() {
        final TypeSerializer<Map<UK, UV>> rawSerializer = getSerializer();
        if (!(rawSerializer instanceof MapSerializer)) {
            throw new IllegalStateException("Unexpected serializer type.");
        }

        return ((MapSerializer<UK, UV>) rawSerializer).getValueSerializer();
    }
}
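  • MapStateDescriptor extends StateDescriptor, with MapState<UK, UV> as the state type and Map<UK, UV> as the value type. Its three constructors accept the key and value types as TypeSerializers, as TypeInformation, or as Classes.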
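The descriptor's name is what identifies the state: processElement and processBroadcastElement in the example both pass the same descriptor and therefore reach the same map. The following hypothetical model (MapDescriptor and getBroadcastState here are stand-ins, not Flink classes) sketches name-based lookup. Its key/value class check on re-registration is an illustrative simplification, not a claim about Flink's exact behavior.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a state backend that resolves broadcast state by descriptor name. */
public class DescriptorModel {

    /** Stand-in for MapStateDescriptor: a name plus key/value types. */
    static final class MapDescriptor<K, V> {
        final String name;
        final Class<K> keyClass;
        final Class<V> valueClass;

        MapDescriptor(String name, Class<K> keyClass, Class<V> valueClass) {
            this.name = name;
            this.keyClass = keyClass;
            this.valueClass = valueClass;
        }
    }

    private final Map<String, MapDescriptor<?, ?>> registered = new HashMap<>();
    private final Map<String, Map<?, ?>> states = new HashMap<>();

    /** Looks the state up by descriptor name, creating an empty map on first access. */
    @SuppressWarnings("unchecked")
    <K, V> Map<K, V> getBroadcastState(MapDescriptor<K, V> descriptor) {
        MapDescriptor<?, ?> previous = registered.putIfAbsent(descriptor.name, descriptor);
        if (previous != null
                && (!previous.keyClass.equals(descriptor.keyClass)
                    || !previous.valueClass.equals(descriptor.valueClass))) {
            throw new IllegalStateException("state '" + descriptor.name + "' already registered with other types");
        }
        return (Map<K, V>) states.computeIfAbsent(descriptor.name, n -> new HashMap<K, V>());
    }

    public static void main(String[] args) {
        DescriptorModel backend = new DescriptorModel();
        MapDescriptor<String, String> descriptor = new MapDescriptor<>("dynamicConfig", String.class, String.class);
        backend.getBroadcastState(descriptor).put("demoConfigKey", "value1");

        // A second descriptor with the same name and types resolves to the same map.
        MapDescriptor<String, String> sameName = new MapDescriptor<>("dynamicConfig", String.class, String.class);
        System.out.println(backend.getBroadcastState(sameName).get("demoConfigKey"));

        // Reusing the name with a different value type is rejected in this model.
        try {
            backend.getBroadcastState(new MapDescriptor<>("dynamicConfig", String.class, Long.class));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```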

DataStream.broadcast

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    /**
     * Sets the partitioning of the {@link DataStream} so that the output elements
     * are broadcasted to every parallel instance of the next operation. In addition,
     * it implicitly as many {@link org.apache.flink.api.common.state.BroadcastState broadcast states}
     * as the specified descriptors which can be used to store the element of the stream.
     *
     * @param broadcastStateDescriptors the descriptors of the broadcast states to create.
     * @return A {@link BroadcastStream} which can be used in the {@link #connect(BroadcastStream)} to
     * create a {@link BroadcastConnectedStream} for further processing of the elements.
     */
    @PublicEvolving
    public BroadcastStream<T> broadcast(final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
        Preconditions.checkNotNull(broadcastStateDescriptors);
        final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
        return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
    }

    /**
     * Internal function for setting the partitioner for the DataStream.
     *
     * @param partitioner
     *            Partitioner to set.
     * @return The modified DataStream.
     */
    protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
        return new DataStream<>(this.getExecutionEnvironment(), new PartitionTransformation<>(this.getTransformation(), partitioner));
    }

    /**
     * Sets the partitioning of the {@link DataStream} so that the output elements
     * are broadcast to every parallel instance of the next operation.
     *
     * @return The DataStream with broadcast partitioning set.
     */
    public DataStream<T> broadcast() {
        return setConnectionType(new BroadcastPartitioner<T>());
    }
  • DataStream’s broadcast(MapStateDescriptor...) method first calls setConnectionType with a BroadcastPartitioner, then wraps the resulting DataStream together with the given MapStateDescriptors in a BroadcastStream and returns it. DataStream also has a parameterless broadcast() method, which only calls setConnectionType and returns a plain DataStream with broadcast partitioning.
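The difference between broadcast partitioning and ordinary partitioning comes down to how many output channels each record is written to. As a Flink-free sketch (the class names are hypothetical, and the round-robin variant only approximates what rebalance() does), a broadcast-style partitioner returns every channel index for each record, while a round-robin partitioner returns exactly one.

```java
import java.util.Arrays;

/** Hypothetical, Flink-free comparison of broadcast vs. round-robin channel selection. */
public class PartitionerModel {

    interface Partitioner {
        int[] selectChannels(int numberOfChannels);
    }

    /** Broadcast: every record goes to all downstream channels; the index array is cached. */
    static final class BroadcastLike implements Partitioner {
        private int[] all = new int[0];

        public int[] selectChannels(int numberOfChannels) {
            if (all.length != numberOfChannels) {
                all = new int[numberOfChannels];
                for (int i = 0; i < numberOfChannels; i++) {
                    all[i] = i;
                }
            }
            return all;
        }
    }

    /** Round-robin: every record goes to exactly one channel, cycling through them. */
    static final class RoundRobinLike implements Partitioner {
        private int next = -1;

        public int[] selectChannels(int numberOfChannels) {
            next = (next + 1) % numberOfChannels;
            return new int[] {next};
        }
    }

    public static void main(String[] args) {
        Partitioner broadcast = new BroadcastLike();
        Partitioner roundRobin = new RoundRobinLike();
        for (int record = 0; record < 3; record++) {
            System.out.println("record " + record
                    + " broadcast -> " + Arrays.toString(broadcast.selectChannels(4))
                    + ", round-robin -> " + Arrays.toString(roundRobin.selectChannels(4)));
        }
    }
}
```

Both broadcast() overloads use the same partitioning; the descriptors passed to broadcast(MapStateDescriptor...) only declare which broadcast states the downstream operator will create.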

DataStream.connect

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    /**
     * Creates a new {@link ConnectedStreams} by connecting
     * {@link DataStream} outputs of (possible) different types with each other.
     * The DataStreams connected using this operator can be used with
     * CoFunctions to apply joint transformations.
     *
     * @param dataStream
     *            The DataStream with which this stream will be connected.
     * @return The {@link ConnectedStreams}.
     */
    public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
        return new ConnectedStreams<>(environment, this, dataStream);
    }

    /**
     * Creates a new {@link BroadcastConnectedStream} by connecting the current
     * {@link DataStream} or {@link KeyedStream} with a {@link BroadcastStream}.
     *
     * <p>The latter can be created using the {@link #broadcast(MapStateDescriptor[])} method.
     *
     * <p>The resulting stream can be further processed using the {@code BroadcastConnectedStream.process(MyFunction)}
     * method, where {@code MyFunction} can be either a
     * {@link org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction KeyedBroadcastProcessFunction}
     * or a {@link org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction BroadcastProcessFunction}
     * depending on the current stream being a {@link KeyedStream} or not.
     *
     * @param broadcastStream The broadcast stream with the broadcast state to be connected with this stream.
     * @return The {@link BroadcastConnectedStream}.
     */
    @PublicEvolving
    public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
        return new BroadcastConnectedStream<>(
                environment,
                this,
                Preconditions.checkNotNull(broadcastStream),
                broadcastStream.getBroadcastStateDescriptor());
    }
  • DataStream.connect is overloaded: it accepts either a DataStream or a BroadcastStream. Connecting to a BroadcastStream returns a BroadcastConnectedStream that carries the broadcast state descriptors; connecting to a DataStream returns an ordinary ConnectedStreams.
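Because BroadcastStream does not extend DataStream, the choice between the two connect overloads is made by the Java compiler from the argument's static type. The stub classes below are hypothetical stand-ins that mimic only the two signatures, returning a label instead of a real stream.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stubs showing how connect(...) is picked by the argument's static type. */
public class ConnectOverloadDemo {

    static class DataStreamStub<T> {
        <R> String connect(DataStreamStub<R> other) {
            return "ConnectedStreams";
        }

        // The broadcast variant also carries the descriptors along, like BroadcastConnectedStream.
        <R> String connect(BroadcastStreamStub<R> other) {
            return "BroadcastConnectedStream" + other.descriptorNames;
        }
    }

    // Like Flink's BroadcastStream, this is NOT a subclass of DataStreamStub.
    static class BroadcastStreamStub<T> {
        final List<String> descriptorNames;

        BroadcastStreamStub(String... names) {
            this.descriptorNames = Arrays.asList(names);
        }
    }

    public static void main(String[] args) {
        DataStreamStub<String> words = new DataStreamStub<>();
        System.out.println(words.connect(new DataStreamStub<Integer>()));
        System.out.println(words.connect(new BroadcastStreamStub<Integer>("dynamicConfig")));
    }
}
```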

BroadcastConnectedStream.process

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java

@PublicEvolving
public class BroadcastConnectedStream<IN1, IN2> {

    private final StreamExecutionEnvironment environment;
    private final DataStream<IN1> inputStream1;
    private final BroadcastStream<IN2> inputStream2;
    private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;

    protected BroadcastConnectedStream(
            final StreamExecutionEnvironment env,
            final DataStream<IN1> input1,
            final BroadcastStream<IN2> input2,
            final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
        this.environment = requireNonNull(env);
        this.inputStream1 = requireNonNull(input1);
        this.inputStream2 = requireNonNull(input2);
        this.broadcastStateDescriptors = requireNonNull(broadcastStateDescriptors);
    }

    public StreamExecutionEnvironment getExecutionEnvironment() {
        return environment;
    }

    /**
     * Returns the non-broadcast {@link DataStream}.
     *
     * @return The stream which, by convention, is not broadcasted.
     */
    public DataStream<IN1> getFirstInput() {
        return inputStream1;
    }

    /**
     * Returns the {@link BroadcastStream}.
     *
     * @return The stream which, by convention, is the broadcast one.
     */
    public BroadcastStream<IN2> getSecondInput() {
        return inputStream2;
    }

    /**
     * Gets the type of the first input.
     *
     * @return The type of the first input
     */
    public TypeInformation<IN1> getType1() {
        return inputStream1.getType();
    }

    /**
     * Gets the type of the second input.
     *
     * @return The type of the second input
     */
    public TypeInformation<IN2> getType2() {
        return inputStream2.getType();
    }

    /**
     * Assumes as inputs a {@link BroadcastStream} and a {@link KeyedStream} and applies the given
     * {@link KeyedBroadcastProcessFunction} on them, thereby creating a transformed output stream.
     *
     * @param function The {@link KeyedBroadcastProcessFunction} that is called for each element in the stream.
     * @param <KS> The type of the keys in the keyed stream.
     * @param <OUT> The type of the output elements.
     * @return The transformed {@link DataStream}.
     */
    @PublicEvolving
    public <KS, OUT> SingleOutputStreamOperator<OUT> process(final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function) {

        TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
                function,
                KeyedBroadcastProcessFunction.class,
                1,
                2,
                3,
                TypeExtractor.NO_INDEX,
                getType1(),
                getType2(),
                Utils.getCallLocationName(),
                true);

        return process(function, outTypeInfo);
    }

    /**
     * Assumes as inputs a {@link BroadcastStream} and a {@link KeyedStream} and applies the given
     * {@link KeyedBroadcastProcessFunction} on them, thereby creating a transformed output stream.
     *
     * @param function The {@link KeyedBroadcastProcessFunction} that is called for each element in the stream.
     * @param outTypeInfo The type of the output elements.
     * @param <KS> The type of the keys in the keyed stream.
     * @param <OUT> The type of the output elements.
     * @return The transformed {@link DataStream}.
     */
    @PublicEvolving
    public <KS, OUT> SingleOutputStreamOperator<OUT> process(
            final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
            final TypeInformation<OUT> outTypeInfo) {

        Preconditions.checkNotNull(function);
        Preconditions.checkArgument(inputStream1 instanceof KeyedStream,
                "A KeyedBroadcastProcessFunction can only be used on a keyed stream.");

        TwoInputStreamOperator<IN1, IN2, OUT> operator =
                new CoBroadcastWithKeyedOperator<>(clean(function), broadcastStateDescriptors);
        return transform("Co-Process-Broadcast-Keyed", outTypeInfo, operator);
    }

    /**
     * Assumes as inputs a {@link BroadcastStream} and a non-keyed {@link DataStream} and applies the given
     * {@link BroadcastProcessFunction} on them, thereby creating a transformed output stream.
     *
     * @param function The {@link BroadcastProcessFunction} that is called for each element in the stream.
     * @param <OUT> The type of the output elements.
     * @return The transformed {@link DataStream}.
     */
    @PublicEvolving
    public <OUT> SingleOutputStreamOperator<OUT> process(final BroadcastProcessFunction<IN1, IN2, OUT> function) {

        TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
                function,
                BroadcastProcessFunction.class,
                0,
                1,
                2,
                TypeExtractor.NO_INDEX,
                getType1(),
                getType2(),
                Utils.getCallLocationName(),
                true);

        return process(function, outTypeInfo);
    }

    /**
     * Assumes as inputs a {@link BroadcastStream} and a non-keyed {@link DataStream} and applies the given
     * {@link BroadcastProcessFunction} on them, thereby creating a transformed output stream.
     *
     * @param function The {@link BroadcastProcessFunction} that is called for each element in the stream.
     * @param outTypeInfo The type of the output elements.
     * @param <OUT> The type of the output elements.
     * @return The transformed {@link DataStream}.
     */
    @PublicEvolving
    public <OUT> SingleOutputStreamOperator<OUT> process(
            final BroadcastProcessFunction<IN1, IN2, OUT> function,
            final TypeInformation<OUT> outTypeInfo) {

        Preconditions.checkNotNull(function);
        Preconditions.checkArgument(!(inputStream1 instanceof KeyedStream),
                "A BroadcastProcessFunction can only be used on a non-keyed stream.");

        TwoInputStreamOperator<IN1, IN2, OUT> operator =
                new CoBroadcastWithNonKeyedOperator<>(clean(function), broadcastStateDescriptors);
        return transform("Co-Process-Broadcast", outTypeInfo, operator);
    }

    @Internal
    private <OUT> SingleOutputStreamOperator<OUT> transform(
            final String functionName,
            final TypeInformation<OUT> outTypeInfo,
            final TwoInputStreamOperator<IN1, IN2, OUT> operator) {

        // read the output type of the input Transforms to coax out errors about MissingTypeInfo
        inputStream1.getType();
        inputStream2.getType();

        TwoInputTransformation<IN1, IN2, OUT> transform = new TwoInputTransformation<>(
                inputStream1.getTransformation(),
                inputStream2.getTransformation(),
                functionName,
                operator,
                outTypeInfo,
                environment.getParallelism());

        if (inputStream1 instanceof KeyedStream) {
            KeyedStream<IN1, ?> keyedInput1 = (KeyedStream<IN1, ?>) inputStream1;
            TypeInformation<?> keyType1 = keyedInput1.getKeyType();
            transform.setStateKeySelectors(keyedInput1.getKeySelector(), null);
            transform.setStateKeyType(keyType1);
        }

        @SuppressWarnings({ "unchecked", "rawtypes" })
        SingleOutputStreamOperator<OUT> returnStream = new SingleOutputStreamOperator(environment, transform);

        getExecutionEnvironment().addOperator(transform);

        return returnStream;
    }

    protected <F> F clean(F f) {
        return getExecutionEnvironment().clean(f);
    }
}
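  • process accepts two kinds of function: KeyedBroadcastProcessFunction and BroadcastProcessFunction. Both declare the abstract methods processElement and processBroadcastElement; KeyedBroadcastProcessFunction additionally defines an onTimer method, which does nothing by default and can be overridden by subclasses. The keyed variant may only be used when the first input is a KeyedStream and is executed by CoBroadcastWithKeyedOperator; the non-keyed variant may only be used when it is not and is executed by CoBroadcastWithNonKeyedOperator. Both preconditions are checked in process.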
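The checks in process() can be mirrored in plain Java. In the sketch below, BaseFn, BroadcastFn and KeyedBroadcastFn are hypothetical, heavily simplified stand-ins (no contexts, no timer service), and process returns the operator name used in the listing above instead of building a transformation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Flink-free model of how BroadcastConnectedStream.process dispatches. */
public class ProcessDispatchModel {

    // Shared shape: both function kinds declare these two callbacks.
    static abstract class BaseFn<IN1, IN2, OUT> {
        abstract void processElement(IN1 value, List<OUT> out);

        abstract void processBroadcastElement(IN2 value, List<OUT> out);
    }

    static abstract class BroadcastFn<IN1, IN2, OUT> extends BaseFn<IN1, IN2, OUT> {
    }

    static abstract class KeyedBroadcastFn<KS, IN1, IN2, OUT> extends BaseFn<IN1, IN2, OUT> {
        // Does nothing unless a subclass overrides it.
        void onTimer(long timestamp, List<OUT> out) {
        }
    }

    static <IN1, IN2, OUT> String process(boolean firstInputKeyed, BroadcastFn<IN1, IN2, OUT> fn) {
        if (firstInputKeyed) {
            throw new IllegalArgumentException("A BroadcastProcessFunction can only be used on a non-keyed stream.");
        }
        return "Co-Process-Broadcast"; // CoBroadcastWithNonKeyedOperator in Flink
    }

    static <KS, IN1, IN2, OUT> String process(boolean firstInputKeyed, KeyedBroadcastFn<KS, IN1, IN2, OUT> fn) {
        if (!firstInputKeyed) {
            throw new IllegalArgumentException("A KeyedBroadcastProcessFunction can only be used on a keyed stream.");
        }
        return "Co-Process-Broadcast-Keyed"; // CoBroadcastWithKeyedOperator in Flink
    }

    public static void main(String[] args) {
        BroadcastFn<String, String, String> plain = new BroadcastFn<String, String, String>() {
            @Override
            void processElement(String value, List<String> out) { out.add("element " + value); }

            @Override
            void processBroadcastElement(String value, List<String> out) { out.add("config " + value); }
        };
        KeyedBroadcastFn<String, String, String, String> keyed = new KeyedBroadcastFn<String, String, String, String>() {
            @Override
            void processElement(String value, List<String> out) { out.add("element " + value); }

            @Override
            void processBroadcastElement(String value, List<String> out) { out.add("config " + value); }
        };

        System.out.println(process(false, plain));
        System.out.println(process(true, keyed));

        List<String> timerOutput = new ArrayList<>();
        keyed.onTimer(0L, timerOutput); // default implementation emits nothing
        System.out.println("default onTimer emitted " + timerOutput.size() + " records");

        try {
            process(true, plain);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```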

Summary

  • Using broadcast state takes three steps: (1) create a MapStateDescriptor and pass it to DataStream.broadcast, which returns a BroadcastStream; (2) connect the stream that should receive the broadcast data to that BroadcastStream with DataStream.connect, which returns a BroadcastConnectedStream; (3) call BroadcastConnectedStream.process with a function that implements processElement and processBroadcastElement.
  • KeyedBroadcastProcessFunction requires a keyed first input and can additionally override onTimer; BroadcastProcessFunction is for a non-keyed first input and has no timer callback.
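  • Broadcast State is a map, and every parallel task holds its own copy. There is no cross-task communication: an update made in one task is not visible to the others, so every task must apply the same updates to keep the copies consistent. This is why processBroadcastElement has read-write access to the state while processElement only gets a read-only view. Downstream tasks may receive broadcast elements in different orders, so be careful when the update logic depends on arrival order. On checkpoints, every task checkpoints its own copy of the broadcast state, not just one of them. Finally, Broadcast State is kept in memory at runtime rather than in the RocksDB state backend, so memory should be provisioned accordingly.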
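Two of the caveats above can be illustrated without Flink. In this hypothetical sketch, lastWriterWins applies config updates in arrival order, so two tasks that receive the same updates in different orders end up with different state; newestVersionWins attaches an assumed version number to each update and keeps the highest, which yields the same result regardless of order. readOnlyView approximates the read-only access processElement gets. Note that Flink's ReadOnlyBroadcastState enforces this at the type level (it has no put method), whereas Collections.unmodifiableMap only rejects writes at runtime.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Flink-free illustration of two Broadcast State caveats. */
public class BroadcastCaveats {

    /** A config update; the version field is an assumption added for this sketch. */
    static final class Update {
        final String key;
        final long version;
        final String value;

        Update(String key, long version, String value) {
            this.key = key;
            this.version = version;
            this.value = value;
        }
    }

    // Order-dependent: whichever update arrives last wins.
    static Map<String, String> lastWriterWins(List<Update> arrivals) {
        Map<String, String> state = new HashMap<>();
        for (Update u : arrivals) {
            state.put(u.key, u.value);
        }
        return state;
    }

    // Order-independent, assuming versions are unique per key: the highest version wins.
    static Map<String, String> newestVersionWins(List<Update> arrivals) {
        Map<String, Update> newest = new HashMap<>();
        for (Update u : arrivals) {
            newest.merge(u.key, u, (old, cur) -> cur.version > old.version ? cur : old);
        }
        Map<String, String> state = new HashMap<>();
        newest.forEach((key, u) -> state.put(key, u.value));
        return state;
    }

    // Approximates the read-only access of processElement: writes throw at runtime.
    static Map<String, String> readOnlyView(Map<String, String> state) {
        return Collections.unmodifiableMap(state);
    }

    public static void main(String[] args) {
        Update v1 = new Update("demoConfigKey", 1, "value1");
        Update v2 = new Update("demoConfigKey", 2, "value2");

        // Two tasks receive the same broadcast updates in different orders.
        System.out.println(lastWriterWins(Arrays.asList(v1, v2)) + " vs " + lastWriterWins(Arrays.asList(v2, v1)));
        System.out.println(newestVersionWins(Arrays.asList(v1, v2)) + " vs " + newestVersionWins(Arrays.asList(v2, v1)));

        try {
            readOnlyView(lastWriterWins(Arrays.asList(v1))).put("demoConfigKey", "x");
        } catch (UnsupportedOperationException e) {
            System.out.println("write rejected by read-only view");
        }
    }
}
```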
