Talk about the iterate operation of Flink DataStream


Preface

This article looks at the iterate operation of Flink's DataStream.

Example

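import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;

// initialStream: an existing DataStream<Long> whose parallelism matches the loop's operators
IterativeStream<Long> iteration = initialStream.iterate();
// Loop body; decrementing each value is only an illustrative placeholder
DataStream<Long> iterationBody = iteration.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) throws Exception {
        return value - 1;
    }
});
// Positive values are fed back to the head of the iteration
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
// Values that are no longer positive leave the loop
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});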
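  • This example shows the basic usage of IterativeStream: iterate() creates the IterativeStream, and its closeWith method closes the loop by registering feedback as the stream whose elements are sent back to the iteration head. Elements that fail the feedback filter are emitted downstream through output.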
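To make the data flow concrete, here is a single-threaded, plain-Java model of the loop (no Flink dependency). It assumes a loop body that decrements each value; the class IterationLoopModel and its run method are hypothetical names for illustration. Real Flink runs the iteration head, the body and the feedback edge as parallel tasks connected by a channel, not as one in-memory queue, so treat this only as a sketch of which elements go where.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongUnaryOperator;

// Hypothetical single-threaded model of an iterate()/closeWith() loop; not Flink code.
class IterationLoopModel {

    /** Pushes every input through the body until it fails the feedback condition; returns the emitted values. */
    static List<Long> run(List<Long> inputs, LongUnaryOperator body) {
        Deque<Long> head = new ArrayDeque<>(inputs); // plays the role of the iteration head
        List<Long> output = new ArrayList<>();
        while (!head.isEmpty()) {
            long value = body.applyAsLong(head.poll()); // iterationBody = iteration.map(...)
            if (value > 0) {
                head.add(value);   // feedback filter: value > 0 goes back to the head
            } else {
                output.add(value); // output filter: value <= 0 leaves the loop
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Assumed loop body: decrement by one
        System.out.println(run(List.of(3L, 1L, 5L), v -> v - 1)); // [0, 0, 0]
    }
}
```

Unlike this model, which stops as soon as its queue is empty, a Flink iteration head keeps waiting for feedback unless a positive maxWaitTime is configured.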

DataStream.iterate

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {
    //......

    @PublicEvolving
    public IterativeStream<T> iterate() {
        return new IterativeStream<>(this, 0);
    }

    @PublicEvolving
    public IterativeStream<T> iterate(long maxWaitTimeMillis) {
        return new IterativeStream<>(this, maxWaitTimeMillis);
    }

    //......
}
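  • DataStream provides two iterate methods, both of which create and return an IterativeStream. The parameterless iterate() passes a maxWaitTimeMillis of 0, which means the iteration head waits for feedback indefinitely and the iteration never terminates on its own.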

IterativeStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/IterativeStream.java

@PublicEvolving
public class IterativeStream<T> extends SingleOutputStreamOperator<T> {

    // We store these so that we can create a co-iteration if we need to
    private DataStream<T> originalInput;
    private long maxWaitTime;

    protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
        super(dataStream.getExecutionEnvironment(),
                new FeedbackTransformation<>(dataStream.getTransformation(), maxWaitTime));
        this.originalInput = dataStream;
        this.maxWaitTime = maxWaitTime;
        setBufferTimeout(dataStream.environment.getBufferTimeout());
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    public DataStream<T> closeWith(DataStream<T> feedbackStream) {

        Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();

        if (!predecessors.contains(this.transformation)) {
            throw new UnsupportedOperationException(
                    "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
        }

        ((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation());

        return feedbackStream;
    }

    public <F> ConnectedIterativeStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
        return withFeedbackType(TypeInformation.of(feedbackTypeClass));
    }

    public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeHint<F> feedbackTypeHint) {
        return withFeedbackType(TypeInformation.of(feedbackTypeHint));
    }

    public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
        return new ConnectedIterativeStreams<>(originalInput, feedbackType, maxWaitTime);
    }

    @Public
    public static class ConnectedIterativeStreams<I, F> extends ConnectedStreams<I, F> {

        private CoFeedbackTransformation<F> coFeedbackTransformation;

        public ConnectedIterativeStreams(DataStream<I> input,
                TypeInformation<F> feedbackType,
                long waitTime) {
            super(input.getExecutionEnvironment(),
                    input,
                    new DataStream<>(input.getExecutionEnvironment(),
                            new CoFeedbackTransformation<>(input.getParallelism(),
                                    feedbackType,
                                    waitTime)));
            this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecondInput().getTransformation();
        }

        public DataStream<F> closeWith(DataStream<F> feedbackStream) {

            Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();

            if (!predecessors.contains(this.coFeedbackTransformation)) {
                throw new UnsupportedOperationException(
                        "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
            }

            coFeedbackTransformation.addFeedbackEdge(feedbackStream.getTransformation());

            return feedbackStream;
        }

        private UnsupportedOperationException groupingException =
                new UnsupportedOperationException("Cannot change the input partitioning of an" +
                        "iteration head directly. Apply the partitioning on the input and" +
                        "feedback streams instead.");

        @Override
        public ConnectedStreams<I, F> keyBy(int[] keyPositions1, int[] keyPositions2) {
            throw groupingException;
        }

        @Override
        public ConnectedStreams<I, F> keyBy(String field1, String field2) {
            throw groupingException;
        }

        @Override
        public ConnectedStreams<I, F> keyBy(String[] fields1, String[] fields2) {
            throw groupingException;
        }

        @Override
        public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {
            throw groupingException;
        }

        @Override
        public <KEY> ConnectedStreams<I, F> keyBy(KeySelector<I, KEY> keySelector1, KeySelector<F, KEY> keySelector2, TypeInformation<KEY> keyType) {
            throw groupingException;
        }
    }
}
  • IterativeStream extends SingleOutputStreamOperator. Its constructor takes two parameters: the input DataStream (stored as originalInput) and maxWaitTime. It creates a FeedbackTransformation from dataStream.getTransformation() and maxWaitTime, and sets the buffer timeout of the new transformation from dataStream.environment.getBufferTimeout().
  • IterativeStream provides two main methods. closeWith closes the iteration: it declares which part of the iteration's output is fed back to the iteration head. This can be thought of as a loop or recursion, where the filter acts as the loop condition and elements that pass it re-enter the head of the IterativeStream and go through the subsequent operators again. closeWith first checks that the feedback stream actually derives from this iteration (its transitive predecessors must include the iteration's transformation) and throws an UnsupportedOperationException otherwise; it then registers the feedback stream's transformation as a feedback edge. withFeedbackType, available in Class, TypeHint and TypeInformation overloads, creates a ConnectedIterativeStreams.
  • ConnectedIterativeStreams extends ConnectedStreams and allows the feedback type to differ from the type of originalInput: the original input is its first input, and a CoFeedbackTransformation is its second. It defines its own closeWith method, and it overrides every keyBy method of ConnectedStreams to throw an UnsupportedOperationException, because the partitioning of an iteration head cannot be changed directly; partitioning has to be applied to the input and feedback streams instead.
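The provenance check in closeWith can be modelled in plain Java: a feedback stream is accepted only if the iteration's own transformation appears among its transitive predecessors. TransformationNode and FeedbackCheckModel below are hypothetical stand-ins for StreamTransformation and IterativeStream, written only to illustrate the check; the model leaves out the parallelism check that FeedbackTransformation.addFeedbackEdge also performs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical model of the provenance check in IterativeStream.closeWith; not Flink code.
class FeedbackCheckModel {

    /** Accepts the feedback node only if the iteration head is among its transitive predecessors. */
    static void closeWith(TransformationNode iterationHead, List<TransformationNode> feedbackEdges,
                          TransformationNode feedback) {
        if (!feedback.transitivePredecessors().contains(iterationHead)) {
            throw new UnsupportedOperationException(
                    "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
        }
        feedbackEdges.add(feedback); // plays the role of addFeedbackEdge (parallelism check omitted)
    }

    public static void main(String[] args) {
        TransformationNode source = new TransformationNode("source");
        TransformationNode head = new TransformationNode("feedback", source); // what iterate() creates
        TransformationNode body = new TransformationNode("map", head);
        TransformationNode positive = new TransformationNode("filter value > 0", body);
        TransformationNode unrelated = new TransformationNode("filter", source); // bypasses the head

        List<TransformationNode> edges = new ArrayList<>();
        closeWith(head, edges, positive); // accepted
        try {
            closeWith(head, edges, unrelated);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(edges.size()); // 1
    }
}

// Hypothetical stand-in for StreamTransformation: a named node with zero or more inputs.
class TransformationNode {
    final String name;
    final List<TransformationNode> inputs;

    TransformationNode(String name, TransformationNode... inputs) {
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }

    /** This node plus everything reachable through its inputs, in the spirit of getTransitivePredecessors(). */
    Collection<TransformationNode> transitivePredecessors() {
        List<TransformationNode> result = new ArrayList<>();
        result.add(this);
        for (TransformationNode input : inputs) {
            result.addAll(input.transitivePredecessors());
        }
        return result;
    }
}
```

Because the check walks the feedback stream's upstream graph, a stream built from the original input alone is rejected even though its element type matches.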

FeedbackTransformation

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java

@Internal
public class FeedbackTransformation<T> extends StreamTransformation<T> {

    private final StreamTransformation<T> input;

    private final List<StreamTransformation<T>> feedbackEdges;

    private final Long waitTime;

    public FeedbackTransformation(StreamTransformation<T> input, Long waitTime) {
        super("Feedback", input.getOutputType(), input.getParallelism());
        this.input = input;
        this.waitTime = waitTime;
        this.feedbackEdges = Lists.newArrayList();
    }

    public StreamTransformation<T> getInput() {
        return input;
    }

    public void addFeedbackEdge(StreamTransformation<T> transform) {

        if (transform.getParallelism() != this.getParallelism()) {
            throw new UnsupportedOperationException(
                    "Parallelism of the feedback stream must match the parallelism of the original" +
                            " stream. Parallelism of original stream: " + this.getParallelism() +
                            "; parallelism of feedback stream: " + transform.getParallelism() +
                            ". Parallelism can be modified using DataStream#setParallelism() method");
        }

        feedbackEdges.add(transform);
    }

    public List<StreamTransformation<T>> getFeedbackEdges() {
        return feedbackEdges;
    }

    public Long getWaitTime() {
        return waitTime;
    }

    @Override
    public final void setChainingStrategy(ChainingStrategy strategy) {
        throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation.");
    }

    @Override
    public Collection<StreamTransformation<?>> getTransitivePredecessors() {
        List<StreamTransformation<?>> result = Lists.newArrayList();
        result.add(this);
        result.addAll(input.getTransitivePredecessors());
        return result;
    }
}
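  • FeedbackTransformation extends StreamTransformation and holds the input transformation, a list of feedbackEdges, and waitTime. Its getTransitivePredecessors returns the transformation itself plus the predecessors of its input, which is what the check in IterativeStream.closeWith relies on.
  • The addFeedbackEdge method adds a feedback edge; IterativeStream.closeWith calls it with the feedback stream's transformation. It throws an UnsupportedOperationException if the feedback transformation's parallelism differs from that of the FeedbackTransformation, which inherits the parallelism of the original input; as the error message suggests, the parallelism can be aligned with setParallelism().
  • waitTime specifies how long the iteration head waits for feedback elements: if no new feedback element arrives within waitTime, the iteration shuts down and accepts no further feedback elements.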
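The effect of waitTime can be sketched with a java.util.concurrent.BlockingQueue standing in for the feedback channel. IterationHeadModel is a hypothetical, simplified model of the iteration head, written from our reading of the behaviour described above rather than from Flink's own code: with a positive wait time it polls with that timeout and stops once nothing arrives in time; with 0 it blocks on take() and never stops on its own, which, as we understand it, is why the parameterless iterate() does not terminate by itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of an iteration head draining its feedback channel; not Flink code.
class IterationHeadModel {

    /**
     * Consumes feedback records until none arrives within waitTimeMillis.
     * With waitTimeMillis <= 0 it blocks on take() and never returns normally;
     * an interrupt surfaces as InterruptedException.
     */
    static List<Long> drain(BlockingQueue<Long> feedbackChannel, long waitTimeMillis) throws InterruptedException {
        List<Long> received = new ArrayList<>();
        boolean shouldWait = waitTimeMillis > 0;
        while (true) {
            Long next = shouldWait
                    ? feedbackChannel.poll(waitTimeMillis, TimeUnit.MILLISECONDS) // null after the timeout
                    : feedbackChannel.take();                                    // blocks until a record arrives
            if (next == null) {
                return received; // waitTime elapsed with no feedback: the head shuts down
            }
            received.add(next);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Long> channel = new LinkedBlockingQueue<>(List.of(2L, 1L));
        // Both records are consumed; the third poll times out after 100 ms and the head stops.
        System.out.println(drain(channel, 100)); // [2, 1]
    }
}
```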

Summary

  • DataStream provides two iterate methods, both of which create and return an IterativeStream. The parameterless iterate() passes a maxWaitTimeMillis of 0.
  • The IterativeStream constructor takes two parameters, originalInput and maxWaitTime. It creates a FeedbackTransformation from dataStream.getTransformation() and maxWaitTime, and sets the transformation's bufferTimeout from dataStream.environment.getBufferTimeout(). FeedbackTransformation extends StreamTransformation and holds attributes such as feedbackEdges and waitTime; waitTime specifies how long the iteration head waits for feedback elements, and once it elapses without new feedback, the iteration shuts down and accepts no further feedback elements.
  • IterativeStream extends SingleOutputStreamOperator and mainly provides two methods. closeWith closes the iteration by defining which part of the iteration is fed back to the iteration head. withFeedbackType creates a ConnectedIterativeStreams, which extends ConnectedStreams and allows the feedback type to differ from the type of originalInput. ConnectedIterativeStreams defines its own closeWith method and overrides the keyBy methods of ConnectedStreams to throw an UnsupportedOperationException.
