Talk about the reduce operation of Flink's KeyedStream


Preface

This article takes a look at the reduce operation of Flink's KeyedStream.

Example

    @Test
    public void testWordCount() throws Exception {
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data
        // WORDS and Tokenizer are not shown here; they are assumed to be a String[] of text lines and a
        // FlatMapFunction emitting one (word, 1) pair per token, as in Flink's WordCount example
        DataStream<String> text = env.fromElements(WORDS);

        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(0)
                        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                            @Override
                            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
                                // value1: previous result for this word, value2: the newly arrived (word, 1)
                                System.out.println("value1:" + value1.f1 + ";value2:" + value2.f1);
                                return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                            }
                        });

        // emit result to stdout
        counts.print();

        // execute program
        env.execute("Streaming WordCount");
    }
  • Here reduce is applied to a KeyedStream with a custom ReduceFunction; the reduce method accumulates the count of each word.
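
Because reduce on a KeyedStream is a rolling aggregation, this job does not print one final count per word: every incoming (word,1) pair produces an updated running total for its key, and the println inside reduce only fires from the second occurrence of a word onward (the first element of a key is stored without calling reduce, as the StreamGroupedReduce code further down shows). The plain-Java sketch below simulates that behaviour for a single parallel instance without Flink. The tokenizing rule (lower-case, split on non-word characters) is an assumption modelled on the Tokenizer of Flink's WordCount example, and the class and method names are hypothetical. In the real job, print() also prefixes each line with the subtask index when the parallelism is greater than 1, and records of different words may interleave differently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingWordCountSketch {

    // Simulates flatMap(new Tokenizer()).keyBy(0).reduce(...) in one parallel instance.
    // Returns one "(word,count)" entry per token; reduceLog collects what the example's println would print.
    public static List<String> run(String[] lines, List<String> reduceLog) {
        Map<String, Integer> lastReduced = new HashMap<>(); // per-word result of the previous reduce
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            // Assumed Tokenizer behaviour: lower-case the line and split it on non-word characters
            for (String word : line.toLowerCase().split("\\W+")) {
                if (word.isEmpty()) {
                    continue;
                }
                Integer previous = lastReduced.get(word);
                int count;
                if (previous == null) {
                    count = 1; // first (word,1) for this key: stored and emitted, reduce is not called
                } else {
                    reduceLog.add("value1:" + previous + ";value2:" + 1);
                    count = previous + 1; // value1.f1 + value2.f1
                }
                lastReduced.put(word, count);
                emitted.add("(" + word + "," + count + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> reduceLog = new ArrayList<>();
        System.out.println(run(new String[] {"To be, or not to be"}, reduceLog));
        System.out.println(reduceLog);
    }
}
```

For "To be, or not to be" this yields (to,1), (be,1), (or,1), (not,1), (to,2), (be,2), with reduce called only twice.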

KeyedStream.reduce

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

@Public
public class KeyedStream<T, KEY> extends DataStream<T> {
    //......

    /**
     * Applies a reduce transformation on the grouped data stream grouped on by
     * the given key position. The {@link ReduceFunction} will receive input
     * values based on the key value. Only input values with the same key will
     * go to the same reducer.
     *
     * @param reducer
     *            The {@link ReduceFunction} that will be called for every
     *            element of the input values with the same key.
     * @return The transformed DataStream.
     */
    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer) {
        return transform("Keyed Reduce", getType(), new StreamGroupedReduce<T>(
                clean(reducer), getType().createSerializer(getExecutionConfig())));
    }

    @Override
    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(String operatorName,
            TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

        SingleOutputStreamOperator<R> returnStream = super.transform(operatorName, outTypeInfo, operator);

        // inject the key selector and key type
        OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
        transform.setStateKeySelector(keySelector);
        transform.setStateKeyType(keyType);

        return returnStream;
    }

    //......
}
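  • KeyedStream's reduce method calls transform with a StreamGroupedReduce as the OneInputStreamOperator. KeyedStream overrides transform so that, after super.transform has built the OneInputTransformation, it injects the key selector and key type into it.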
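
The override is what makes the reduce keyed: DataStream.transform builds the transformation, and KeyedStream.transform decorates it with the key selector that the runtime later uses to scope operator state. A minimal plain-Java model of that pattern follows; PlainStream, Keyed and Transformation are hypothetical stand-ins for DataStream, KeyedStream and OneInputTransformation, and the key type is omitted.

```java
import java.util.function.Function;

public class KeyedTransformSketch {

    // Hypothetical stand-in for OneInputTransformation: operator name plus an optional state key selector
    public static class Transformation<T> {
        public final String operatorName;
        public Function<T, ?> stateKeySelector; // stays null for a non-keyed stream

        public Transformation(String operatorName) {
            this.operatorName = operatorName;
        }
    }

    // Hypothetical stand-in for DataStream: transform() only records the operator
    public static class PlainStream<T> {
        public Transformation<T> transform(String operatorName) {
            return new Transformation<>(operatorName);
        }
    }

    // Hypothetical stand-in for KeyedStream: reduce() goes through transform(), and the
    // overridden transform() injects the key selector into what super.transform() returned
    public static class Keyed<T, K> extends PlainStream<T> {
        private final Function<T, K> keySelector;

        public Keyed(Function<T, K> keySelector) {
            this.keySelector = keySelector;
        }

        public Transformation<T> reduce() {
            return transform("Keyed Reduce");
        }

        @Override
        public Transformation<T> transform(String operatorName) {
            Transformation<T> transformation = super.transform(operatorName);
            transformation.stateKeySelector = keySelector;
            return transformation;
        }
    }

    public static void main(String[] args) {
        // keyBy(0) on (word, count) pairs amounts to a selector that extracts field 0
        Keyed<String[], String> keyed = new Keyed<>(pair -> pair[0]);
        Transformation<String[]> t = keyed.reduce();
        System.out.println(t.operatorName + " keyed by " + t.stateKeySelector.apply(new String[] {"hello", "1"}));
    }
}
```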

ReduceFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/ReduceFunction.java

@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {

    /**
     * The core method of ReduceFunction, combining two values into one value of the same type.
     * The reduce function is consecutively applied to all values of a group until only a single value remains.
     *
     * @param value1 The first value to combine.
     * @param value2 The second value to combine.
     * @return The combined value of both input values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    T reduce(T value1, T value2) throws Exception;
}
  • ReduceFunction defines the reduce method, which combines two values of the same type into one value of that type. In the keyed streaming reduce, the first parameter is the result of the previous reduce and the second is the current element.
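
Since ReduceFunction is a @FunctionalInterface, the anonymous class in the example could also be written as a lambda. The sketch below mirrors the interface's shape in plain Java (the Reducer interface and the reduceAll helper are hypothetical, not Flink API) to show what "consecutively applied ... until only a single value remains" means. A non-commutative reducer such as string concatenation makes it visible that value1 is the accumulated result and value2 the next element.

```java
import java.util.Arrays;
import java.util.List;

public class ReduceSemanticsSketch {

    // Hypothetical local copy of the ReduceFunction shape: two values of T in, one value of T out
    @FunctionalInterface
    public interface Reducer<T> {
        T reduce(T value1, T value2) throws Exception;
    }

    // Applies the reducer consecutively until a single value remains:
    // value1 is always the result so far, value2 the next element
    public static <T> T reduceAll(List<T> values, Reducer<T> reducer) {
        if (values.isEmpty()) {
            throw new IllegalArgumentException("nothing to reduce");
        }
        T result = values.get(0);
        try {
            for (int i = 1; i < values.size(); i++) {
                result = reducer.reduce(result, values.get(i));
            }
        } catch (Exception e) {
            // an exception thrown by reduce fails the whole operation
            throw new IllegalStateException("reduce failed", e);
        }
        return result;
    }

    public static void main(String[] args) {
        // lambda form, possible because the interface has a single abstract method
        Reducer<String> join = (value1, value2) -> value1 + "|" + value2;
        System.out.println(reduceAll(Arrays.asList("a", "b", "c"), join)); // a|b|c
    }
}
```

In a streaming keyed reduce a group never "ends", so instead of only the last value, every intermediate result is emitted.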

Task.run

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......
    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {

        // ----------------------------
        //  Initial State transition
        // ----------------------------
        //......

        // all resource acquisitions and registrations from here on
        // need to be undone in the end
        Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
        AbstractInvokable invokable = null;

        try {

            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
        }
        catch (Throwable t) {
            //......
        }
        finally {
            //......
        }    
    }
}
  • Task.run calls invokable.invoke(). Here the invokable is a OneInputStreamTask, which extends StreamTask, so the invoke() that actually runs is the one defined in StreamTask.
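
Two details of Task.run are easy to miss: the switch from DEPLOYING to RUNNING is conditional (if it fails, the task was canceled or failed in the meantime and the invokable is never invoked), and the whole method runs on the task's own dedicated thread. A plain-Java sketch of that lifecycle, assuming the transition is a compare-and-set on the current state; Invokable is a hypothetical stand-in for AbstractInvokable, the enum is a reduced subset of ExecutionState, and failure handling is heavily simplified.

```java
import java.util.concurrent.atomic.AtomicReference;

public class TaskLifecycleSketch {

    // Subset of Flink's ExecutionState values relevant here
    public enum ExecutionState { DEPLOYING, RUNNING, FINISHED, CANCELED, FAILED }

    // Hypothetical stand-in for AbstractInvokable
    public interface Invokable {
        void invoke() throws Exception;
    }

    private final AtomicReference<ExecutionState> state = new AtomicReference<>(ExecutionState.DEPLOYING);

    // Succeeds only if the state is still the expected one, i.e. nobody canceled or failed the task meanwhile
    private boolean transitionState(ExecutionState expected, ExecutionState next) {
        return state.compareAndSet(expected, next);
    }

    public void cancel() {
        state.set(ExecutionState.CANCELED);
    }

    public ExecutionState run(Invokable invokable) {
        // switch to RUNNING; if that fails, we have been canceled in the meantime
        if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            return state.get();
        }
        try {
            invokable.invoke(); // for a streaming task this ends up in StreamTask.invoke()
            transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED);
        } catch (Exception e) {
            transitionState(ExecutionState.RUNNING, ExecutionState.FAILED);
        }
        return state.get();
    }

    public static void main(String[] args) throws InterruptedException {
        TaskLifecycleSketch task = new TaskLifecycleSketch();
        // each Task is run by one dedicated thread
        Thread executingThread = new Thread(() -> System.out.println(task.run(() -> System.out.println("invoking"))));
        executingThread.start();
        executingThread.join();
    }
}
```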

StreamTask.invoke

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

    //......

    protected abstract void run() throws Exception;

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            // -------- Initialize ---------
            LOG.debug("Initializing {}.", getName());

            asyncOperationsThreadPool = Executors.newCachedThreadPool();

            CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();

            synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
                getExecutionConfig().isFailTaskOnCheckpointError(),
                getEnvironment());

            asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);

            stateBackend = createStateBackend();
            checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

            // if the clock is not already set, then assign a default TimeServiceProvider
            if (timerService == null) {
                ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
                    "Time Trigger for " + getName(), getUserCodeClassLoader());

                timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
            }

            operatorChain = new OperatorChain<>(this, streamRecordWriters);
            headOperator = operatorChain.getHeadOperator();

            // task specific initialization
            init();

            // save the work of reloading state, etc, if the task is already canceled
            if (canceled) {
                throw new CancelTaskException();
            }

            // -------- Invoke --------
            LOG.debug("Invoking {}", getName());

            // we need to make sure that any triggers scheduled in open() cannot be
            // executed before all operators are opened
            synchronized (lock) {

                // both the following operations are protected by the lock
                // so that we avoid race conditions in the case that initializeState()
                // registers a timer, that fires before the open() is called.

                initializeState();
                openAllOperators();
            }

            // final check to exit early before starting to run
            if (canceled) {
                throw new CancelTaskException();
            }

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            //......
        }
        finally {
            //......
        }
    }
}
  • StreamTask.invoke calls run(), an abstract method implemented by subclasses; here it is OneInputStreamTask.run.
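
StreamTask.invoke is a template method: it is final, fixes the order of the lifecycle phases (init(), then initializeState() and openAllOperators() under the lock, then run()), and leaves init() and run() to subclasses such as OneInputStreamTask. A stripped-down sketch of that structure; MiniStreamTask, OneInputTask and the trace list are hypothetical, and state backends, checkpointing and cleanup are left out.

```java
import java.util.ArrayList;
import java.util.List;

public class TemplateTaskSketch {

    // Hypothetical reduction of StreamTask: invoke() is final and fixes the order of the phases,
    // subclasses only provide init() and run() (the real methods also declare "throws Exception")
    public abstract static class MiniStreamTask {
        protected final Object lock = new Object();
        protected final List<String> trace = new ArrayList<>();
        protected volatile boolean canceled;

        protected abstract void init();

        protected abstract void run();

        public final void invoke() {
            init(); // task-specific initialization
            checkNotCanceled();
            // state initialization and operator opening happen under the lock, so a timer
            // registered in initializeState() cannot fire before open() has been called
            synchronized (lock) {
                trace.add("initializeState");
                trace.add("openAllOperators");
            }
            checkNotCanceled();
            run(); // the subclass's main loop
            trace.add("finished");
        }

        private void checkNotCanceled() {
            if (canceled) {
                // the real code throws CancelTaskException here
                throw new IllegalStateException("task canceled");
            }
        }

        public List<String> getTrace() {
            return trace;
        }
    }

    // Hypothetical subclass playing the role of OneInputStreamTask
    public static class OneInputTask extends MiniStreamTask {
        @Override
        protected void init() {
            trace.add("init: create input processor");
        }

        @Override
        protected void run() {
            trace.add("run: process input");
        }
    }

    public static void main(String[] args) {
        OneInputTask task = new OneInputTask();
        task.invoke();
        System.out.println(task.getTrace());
    }
}
```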

OneInputStreamTask.run

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java

@Internal
public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {

    private StreamInputProcessor<IN> inputProcessor;

    private volatile boolean running = true;

    private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge();

    /**
     * Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
     *
     * @param env The task environment for this task.
     */
    public OneInputStreamTask(Environment env) {
        super(env);
    }

    /**
     * Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
     *
     * <p>This constructor accepts a special {@link ProcessingTimeService}. By default (and if
     * null is passes for the time provider) a {@link SystemProcessingTimeService DefaultTimerService}
     * will be used.
     *
     * @param env The task environment for this task.
     * @param timeProvider Optionally, a specific time provider to use.
     */
    @VisibleForTesting
    public OneInputStreamTask(
            Environment env,
            @Nullable ProcessingTimeService timeProvider) {
        super(env, timeProvider);
    }

    @Override
    public void init() throws Exception {
        StreamConfig configuration = getConfiguration();

        TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
        int numberOfInputs = configuration.getNumberOfInputs();

        if (numberOfInputs > 0) {
            InputGate[] inputGates = getEnvironment().getAllInputGates();

            inputProcessor = new StreamInputProcessor<>(
                    inputGates,
                    inSerializer,
                    this,
                    configuration.getCheckpointMode(),
                    getCheckpointLock(),
                    getEnvironment().getIOManager(),
                    getEnvironment().getTaskManagerInfo().getConfiguration(),
                    getStreamStatusMaintainer(),
                    this.headOperator,
                    getEnvironment().getMetricGroup().getIOMetricGroup(),
                    inputWatermarkGauge);
        }
        headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
        // wrap watermark gauge since registered metrics must be unique
        getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
    }

    @Override
    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }

    @Override
    protected void cleanup() throws Exception {
        if (inputProcessor != null) {
            inputProcessor.cleanup();
        }
    }

    @Override
    protected void cancelTask() {
        running = false;
    }
}
  • OneInputStreamTask.run keeps calling inputProcessor.processInput() as long as the task is running and processInput returns true; inputProcessor here is a StreamInputProcessor.
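
The loop ends in one of two ways: processInput() returns false once the input is finished, or cancelTask() clears the volatile running flag from another thread, which the loop checks before asking for the next record. A plain-Java sketch of that loop; InputProcessor is a hypothetical queue-backed stand-in for StreamInputProcessor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class InputLoopSketch {

    // Hypothetical stand-in for StreamInputProcessor: processInput() handles one record
    // and returns false once the input is exhausted
    public static class InputProcessor {
        private final Deque<String> input;
        private final List<String> processed = new ArrayList<>();

        public InputProcessor(List<String> records) {
            this.input = new ArrayDeque<>(records);
        }

        public boolean processInput() {
            String next = input.poll();
            if (next == null) {
                return false; // end of input
            }
            processed.add(next);
            return true;
        }

        public List<String> getProcessed() {
            return processed;
        }
    }

    private final InputProcessor inputProcessor;
    // volatile: written by the canceling thread, read by the task thread
    private volatile boolean running = true;

    public InputLoopSketch(InputProcessor inputProcessor) {
        this.inputProcessor = inputProcessor;
    }

    public void run() {
        // cache the reference in a local variable, as OneInputStreamTask.run does
        final InputProcessor inputProcessor = this.inputProcessor;
        while (running && inputProcessor.processInput()) {
            // all the work happens in processInput()
        }
    }

    public void cancelTask() {
        running = false;
    }

    public static void main(String[] args) {
        InputProcessor processor = new InputProcessor(Arrays.asList("a", "b", "c"));
        new InputLoopSketch(processor).run();
        System.out.println(processor.getProcessed()); // [a, b, c]
    }
}
```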

StreamInputProcessor.processInput

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java

@Internal
public class StreamInputProcessor<IN> {

    //......

    public boolean processInput() throws Exception {
        if (isFinished) {
            return false;
        }
        if (numRecordsIn == null) {
            try {
                numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                numRecordsIn = new SimpleCounter();
            }
        }

        while (true) {
            if (currentRecordDeserializer != null) {
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();

                    if (recordOrMark.isWatermark()) {
                        // handle watermark
                        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                        continue;
                    } else if (recordOrMark.isStreamStatus()) {
                        // handle stream status
                        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                        continue;
                    } else if (recordOrMark.isLatencyMarker()) {
                        // handle latency marker
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        // now we can do the actual processing
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }

            //......
        }
    }

    //......
}
  • StreamInputProcessor.processInput reads the next record in a while (true) loop and handles it according to its StreamElement type: watermarks, stream statuses and latency markers are handled and the loop continues, while for a regular record it calls streamOperator.setKeyContextElement1 and then streamOperator.processElement under the checkpoint lock and returns true. Here streamOperator is StreamGroupedReduce.
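
Stripped of buffer and deserializer handling, the per-element logic is a type dispatch, and for a data record the order of the two calls matters: setKeyContextElement1 first extracts the key of the record and makes it the current key, so that when processElement then reads keyed state, it reads the state of that key. A plain-Java sketch of that dispatch; StreamElement, Watermark and StreamRecord here are hypothetical simplified stand-ins (stream status and latency markers are omitted), and the key extraction is logged instead of switching a real state backend.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ElementDispatchSketch<T, K> {

    // Hypothetical, simplified stand-ins for StreamElement and two of its subtypes
    public abstract static class StreamElement {
    }

    public static final class Watermark extends StreamElement {
        public final long timestamp;

        public Watermark(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    public static final class StreamRecord<V> extends StreamElement {
        public final V value;

        public StreamRecord(V value) {
            this.value = value;
        }
    }

    private final Object lock = new Object(); // plays the role of the checkpoint lock
    private final Function<T, K> keySelector;
    private final List<String> log = new ArrayList<>();

    public ElementDispatchSketch(Function<T, K> keySelector) {
        this.keySelector = keySelector;
    }

    @SuppressWarnings("unchecked")
    public void process(StreamElement element) {
        if (element instanceof Watermark) {
            // the real code forwards this to the StatusWatermarkValve and continues the loop
            log.add("watermark " + ((Watermark) element).timestamp);
        } else {
            StreamRecord<T> record = (StreamRecord<T>) element;
            synchronized (lock) {
                // setKeyContextElement1: derive the key so keyed state is scoped to it
                K key = keySelector.apply(record.value);
                // processElement: the operator now works on the state of this key
                log.add("processElement key=" + key);
            }
        }
    }

    public List<String> getLog() {
        return log;
    }

    public static void main(String[] args) {
        // records are "word,count" strings keyed by the word, like keyBy(0) on (word, count) pairs
        ElementDispatchSketch<String, String> dispatcher = new ElementDispatchSketch<>(s -> s.split(",")[0]);
        dispatcher.process(new StreamRecord<>("hello,1"));
        dispatcher.process(new Watermark(5L));
        dispatcher.process(new StreamRecord<>("world,1"));
        System.out.println(dispatcher.getLog());
    }
}
```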

StreamGroupedReduce.processElement

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java

/**
 * A {@link StreamOperator} for executing a {@link ReduceFunction} on a
 * {@link org.apache.flink.streaming.api.datastream.KeyedStream}.
 */

@Internal
public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
        implements OneInputStreamOperator<IN, IN> {

    private static final long serialVersionUID = 1L;

    private static final String STATE_NAME = "_op_state";

    private transient ValueState<IN> values;

    private TypeSerializer<IN> serializer;

    public StreamGroupedReduce(ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
        super(reducer);
        this.serializer = serializer;
    }

    @Override
    public void open() throws Exception {
        super.open();
        ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer);
        values = getPartitionedState(stateId);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        IN value = element.getValue();
        IN currentValue = values.value();

        if (currentValue != null) {
            IN reduced = userFunction.reduce(currentValue, value);
            values.update(reduced);
            output.collect(element.replace(reduced));
        } else {
            values.update(value);
            output.collect(element.replace(value));
        }
    }
}
  • StreamGroupedReduce keeps the result of the reduce in a ValueState. In processElement it calls reduce on userFunction, the user-defined ReduceFunction: the first argument is the value held in ValueState, i.e. the result of the previous reduce for that key, and the second is the value of the current element. The result is written back to ValueState and emitted downstream. If the state is still empty (the first element for a key), the element is stored and emitted as-is without calling reduce.
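
The property that makes this work per word is that values is keyed state: values.value() and values.update() implicitly operate on the slot of the current key, which StreamInputProcessor set via setKeyContextElement1 just before calling processElement. A plain-Java sketch of processElement with a HashMap standing in for the keyed ValueState; GroupedReduceSketch, setCurrentKey and the output list are hypothetical, whereas the real operator uses a state backend and emits through its Output, reusing the incoming StreamRecord via element.replace.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class GroupedReduceSketch<K, V> {

    // Hypothetical keyed ValueState: one slot per key, resolved through the current key
    private final Map<K, V> stateByKey = new HashMap<>();
    private K currentKey;

    private final BinaryOperator<V> userFunction;
    private final List<V> output = new ArrayList<>();

    public GroupedReduceSketch(BinaryOperator<V> userFunction) {
        this.userFunction = userFunction;
    }

    // Called before each element, like setKeyContextElement1 in StreamInputProcessor
    public void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Same branching as StreamGroupedReduce.processElement
    public void processElement(V value) {
        V currentValue = stateByKey.get(currentKey); // values.value()
        if (currentValue != null) {
            V reduced = userFunction.apply(currentValue, value);
            stateByKey.put(currentKey, reduced); // values.update(reduced)
            output.add(reduced);
        } else {
            // first element for this key: stored and emitted as-is, the reduce function is not called
            stateByKey.put(currentKey, value);
            output.add(value);
        }
    }

    public List<V> getOutput() {
        return output;
    }

    public static void main(String[] args) {
        GroupedReduceSketch<String, Integer> op = new GroupedReduceSketch<>(Integer::sum);
        String[] keys = {"a", "b", "a"};
        int[] values = {1, 10, 2};
        for (int i = 0; i < keys.length; i++) {
            op.setCurrentKey(keys[i]);
            op.processElement(values[i]);
        }
        System.out.println(op.getOutput()); // [1, 10, 3]
    }
}
```

Key "b" does not disturb the running value of key "a": the third element is combined with 1, not with 10.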

Summary

  • KeyedStream's reduce method calls transform with a StreamGroupedReduce as the OneInputStreamOperator, and the overridden transform injects the key selector and key type. reduce takes a ReduceFunction, whose reduce method combines two values of the same type into one value of that type.
  • Task.run calls invokable.invoke(); here the invokable is a OneInputStreamTask, which extends StreamTask, so the invoke() that actually runs is StreamTask's. StreamTask.invoke calls the abstract run() method, implemented here by OneInputStreamTask.run, which repeatedly calls inputProcessor.processInput() on a StreamInputProcessor. processInput reads the next record in a while (true) loop and handles it according to its StreamElement type; for a regular record it calls streamOperator.processElement, where streamOperator is StreamGroupedReduce.
  • StreamGroupedReduce.processElement calls the reduce method of userFunction with the value held in ValueState (the result of the previous reduce for the same key) as the first argument and the value of the current element as the second, then writes the result back to ValueState and emits it. The first element for a key is stored and emitted without calling reduce.
