[case52] A look at the aggregation operations of Flink's KeyedStream


Preface

This article looks at the aggregation operations of Flink's KeyedStream.

Example

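    // Assumes WordCount is a Flink POJO (public no-arg constructor, fields including
    // `word` and `frequency`) and LOGGER is a logger defined in the test class.
    @Test
    public void testMax() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        WordCount[] data = new WordCount[]{
                new WordCount(1, "Hello", 1),
                new WordCount(1, "World", 3),
                new WordCount(2, "Hello", 1)};
        env.fromElements(data)
                .keyBy("word")       // partition the stream by the POJO field "word"
                .max("frequency")    // rolling max of the "frequency" field per key
                .addSink(new SinkFunction<WordCount>() {
                    @Override
                    public void invoke(WordCount value, Context context) throws Exception {
                        LOGGER.info("value:{}", value);
                    }
                });
        env.execute("testMax");
    }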
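  • Here the stream is first keyed by the word field, and then KeyedStream's max method computes the maximum of the frequency field for each key. The aggregation is rolling: every incoming element produces an updated result for its key. Note that max only replaces the frequency field; the other fields keep the values of the first element seen for that key.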
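To make the rolling behavior concrete, here is a plain-Java model of what keyBy("word").max("frequency") emits for the test data above. It is a sketch under simplifying assumptions, not Flink code: a HashMap stands in for Flink's keyed state, everything runs in one thread, WordCount and rollingMax are hypothetical names, and the first int field is assumed to be called id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingMaxSketch {

    // Hypothetical stand-in for the article's WordCount POJO; the name "id"
    // for the first int field is an assumption.
    public static final class WordCount {
        public final int id;
        public final String word;
        public final int frequency;

        public WordCount(int id, String word, int frequency) {
            this.id = id;
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WordCount(" + id + "," + word + "," + frequency + ")";
        }
    }

    // Simplified, single-threaded model of keyBy("word").max("frequency"):
    // one stored aggregate per key and one output per input element.
    public static List<WordCount> rollingMax(List<WordCount> input) {
        Map<String, WordCount> state = new HashMap<>();
        List<WordCount> emitted = new ArrayList<>();
        for (WordCount value : input) {
            WordCount current = state.get(value.word);
            WordCount result;
            if (current == null) {
                // First element for this key is stored and emitted unchanged.
                result = value;
            } else if (current.frequency > value.frequency) {
                // Stored aggregate is strictly larger: keep it as is.
                result = current;
            } else {
                // Otherwise keep the stored record's other fields and take the
                // new frequency (Flink mutates the record; this sketch copies it).
                result = new WordCount(current.id, current.word, value.frequency);
            }
            state.put(value.word, result);
            emitted.add(result);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<WordCount> data = Arrays.asList(
                new WordCount(1, "Hello", 1),
                new WordCount(1, "World", 3),
                new WordCount(2, "Hello", 1));
        // Prints WordCount(1,Hello,1), WordCount(1,World,3), WordCount(1,Hello,1)
        for (WordCount wc : rollingMax(data)) {
            System.out.println(wc);
        }
    }
}
```

In this model the third output still carries id 1 although the arriving element had id 2, because max only updates the aggregated field. In a real Flink job with parallelism greater than one, outputs for different keys may be interleaved differently in the log.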

KeyedStream.aggregate

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

    public SingleOutputStreamOperator<T> sum(int positionToSum) {
        return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> sum(String field) {
        return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> max(int positionToMax) {
        return aggregate(new ComparableAggregator<>(positionToMax, getType(), AggregationFunction.AggregationType.MAX,
                getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> max(String field) {
        return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAX,
                false, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> min(int positionToMin) {
        return aggregate(new ComparableAggregator<>(positionToMin, getType(), AggregationFunction.AggregationType.MIN,
                getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> min(String field) {
        return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MIN,
                false, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) {
        return this.maxBy(positionToMaxBy, true);
    }

    public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
        return this.maxBy(positionToMaxBy, true);
    }

    public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
        return aggregate(new ComparableAggregator<>(positionToMaxBy, getType(), AggregationFunction.AggregationType.MAXBY, first,
                getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> maxBy(String field, boolean first) {
        return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAXBY,
                first, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> minBy(int positionToMinBy) {
        return this.minBy(positionToMinBy, true);
    }

    public SingleOutputStreamOperator<T> minBy(String positionToMinBy) {
        return this.minBy(positionToMinBy, true);
    }

    public SingleOutputStreamOperator<T> minBy(int positionToMinBy, boolean first) {
        return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationFunction.AggregationType.MINBY, first,
                getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> minBy(String field, boolean first) {
        return aggregate(new ComparableAggregator(field, getType(), AggregationFunction.AggregationType.MINBY,
                first, getExecutionConfig()));
    }

    protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
        StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
                clean(aggregate), getType().createSerializer(getExecutionConfig()));
        return transform("Keyed Aggregation", getType(), operator);
    }
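  • KeyedStream's aggregate method is protected. sum, max, min, maxBy and minBy all delegate to it: sum passes a SumAggregator, while max, min, maxBy and minBy pass a ComparableAggregator whose AggregationType is MAX, MIN, MAXBY or MINBY respectively. aggregate wraps the function in a StreamGroupedReduce operator and adds it to the stream as a transformation named "Keyed Aggregation".
  • Each of sum, max, min, maxBy and minBy has an overload taking an int (a field position, e.g. in a tuple) and one taking a String (a field expression such as a POJO field name).
  • Compared with sum, max and min, maxBy and minBy additionally accept a boolean first parameter, which decides whether the first or the last element is returned when several elements share the same extreme value; the overloads without it pass first = true.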

ComparableAggregator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java

@Internal
public class ComparableAggregator<T> extends AggregationFunction<T> {

    private static final long serialVersionUID = 1L;

    private Comparator comparator;
    private boolean byAggregate;
    private boolean first;
    private final FieldAccessor<T, Object> fieldAccessor;

    private ComparableAggregator(AggregationType aggregationType, FieldAccessor<T, Object> fieldAccessor, boolean first) {
        this.comparator = Comparator.getForAggregation(aggregationType);
        this.byAggregate = (aggregationType == AggregationType.MAXBY) || (aggregationType == AggregationType.MINBY);
        this.first = first;
        this.fieldAccessor = fieldAccessor;
    }

    public ComparableAggregator(int positionToAggregate,
            TypeInformation<T> typeInfo,
            AggregationType aggregationType,
            ExecutionConfig config) {
        this(positionToAggregate, typeInfo, aggregationType, false, config);
    }

    public ComparableAggregator(int positionToAggregate,
            TypeInformation<T> typeInfo,
            AggregationType aggregationType,
            boolean first,
            ExecutionConfig config) {
        this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, positionToAggregate, config), first);
    }

    public ComparableAggregator(String field,
            TypeInformation<T> typeInfo,
            AggregationType aggregationType,
            boolean first,
            ExecutionConfig config) {
        this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, field, config), first);
    }

    @SuppressWarnings("unchecked")
    @Override
    public T reduce(T value1, T value2) throws Exception {
        Comparable<Object> o1 = (Comparable<Object>) fieldAccessor.get(value1);
        Object o2 = fieldAccessor.get(value2);

        int c = comparator.isExtremal(o1, o2);

        if (byAggregate) {
            // if they are the same we choose based on whether we want to first or last
            // element with the min/max.
            if (c == 0) {
                return first ? value1 : value2;
            }

            return c == 1 ? value1 : value2;

        } else {
            if (c == 0) {
                value1 = fieldAccessor.set(value1, o2);
            }
            return value1;
        }
    }
}
  • ComparableAggregator extends AggregationFunction, which implements the ReduceFunction interface. In reduce, value1 is the aggregate accumulated so far for the key and value2 is the newly arrived element. The aggregated field of both is read through the FieldAccessor and compared with the Comparator, and the result c is then handled according to byAggregate, which is true only for MAXBY and MINBY.
  • For maxBy/minBy (byAggregate): if c is 0 (equal values), value1 is returned when first is true and value2 otherwise; if c is 1, value1 is strictly more extreme and is returned; otherwise value2 is returned. The whole winning element is emitted.
  • For max/min (not byAggregate): c is 0 whenever value1's field is not strictly more extreme than value2's (for max, whenever value1's field is less than or equal to value2's). In that case value2's field value is written into value1 via fieldAccessor.set (for POJO fields this uses reflection). value1 is returned in every case, so only the aggregated field changes and all other fields keep value1's values.
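The two branches can be compared side by side with a plain-Java model of reduce that covers only the MAX and MAXBY cases on an int field. The class, record type and method names are hypothetical, and unlike Flink the max branch returns a modified copy instead of mutating value1 through the FieldAccessor.

```java
public class ReduceSemanticsSketch {

    // Hypothetical record type standing in for the article's WordCount POJO.
    public static final class WordCount {
        public final int id;
        public final String word;
        public final int frequency;

        public WordCount(int id, String word, int frequency) {
            this.id = id;
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WordCount(" + id + "," + word + "," + frequency + ")";
        }
    }

    // Mirrors the branch structure of ComparableAggregator.reduce for the
    // "frequency" field, MAX and MAXBY only. value1 = stored aggregate,
    // value2 = new element.
    public static WordCount reduce(WordCount value1, WordCount value2,
                                   boolean byAggregate, boolean first) {
        int cmp = Integer.compare(value1.frequency, value2.frequency);
        if (byAggregate) {
            // maxBy: like MaxByComparator, c is 1, 0 or -1.
            int c = cmp > 0 ? 1 : (cmp == 0 ? 0 : -1);
            if (c == 0) {
                return first ? value1 : value2;
            }
            return c == 1 ? value1 : value2;
        } else {
            // max: like MaxComparator, c is 1 only when value1 is strictly larger.
            int c = cmp > 0 ? 1 : 0;
            if (c == 0) {
                // Keep value1's other fields, take value2's frequency.
                return new WordCount(value1.id, value1.word, value2.frequency);
            }
            return value1;
        }
    }

    public static void main(String[] args) {
        WordCount a = new WordCount(1, "Hello", 2);
        WordCount b = new WordCount(2, "Hello", 5);
        System.out.println(reduce(a, b, false, true)); // max   -> WordCount(1,Hello,5)
        System.out.println(reduce(a, b, true, true));  // maxBy -> WordCount(2,Hello,5)
    }
}
```

The model shows why max can produce a record that never appeared in the input (id 1 with frequency 5), whereas maxBy always returns one of the original elements, with first only mattering on ties.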

AggregationFunction

@Internal
public abstract class AggregationFunction<T> implements ReduceFunction<T> {
    private static final long serialVersionUID = 1L;

    /**
     * Aggregation types that can be used on a windowed stream or keyed stream.
     */
    public enum AggregationType {
        SUM, MIN, MAX, MINBY, MAXBY,
    }
}
  • AggregationFunction is an abstract class that implements ReduceFunction; apart from that it only declares the AggregationType enum with five values: SUM, MIN, MAX, MINBY and MAXBY.

Comparator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/Comparator.java

@Internal
public abstract class Comparator implements Serializable {

    private static final long serialVersionUID = 1L;

    public abstract <R> int isExtremal(Comparable<R> o1, R o2);

    public static Comparator getForAggregation(AggregationType type) {
        switch (type) {
        case MAX:
            return new MaxComparator();
        case MIN:
            return new MinComparator();
        case MINBY:
            return new MinByComparator();
        case MAXBY:
            return new MaxByComparator();
        default:
            throw new IllegalArgumentException("Unsupported aggregation type.");
        }
    }

    private static class MaxComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            return o1.compareTo(o2) > 0 ? 1 : 0;
        }

    }

    private static class MaxByComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            int c = o1.compareTo(o2);
            if (c > 0) {
                return 1;
            }
            if (c == 0) {
                return 0;
            } else {
                return -1;
            }
        }

    }

    private static class MinByComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            int c = o1.compareTo(o2);
            if (c < 0) {
                return 1;
            }
            if (c == 0) {
                return 0;
            } else {
                return -1;
            }
        }

    }

    private static class MinComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            return o1.compareTo(o2) < 0 ? 1 : 0;
        }

    }
}
  • Comparator implements Serializable, declares the abstract method isExtremal, and provides the factory method getForAggregation, which returns a comparator for the given AggregationType; SUM is not handled and results in an IllegalArgumentException.
  • Comparator defines four private subclasses, MaxComparator, MinComparator, MinByComparator and MaxByComparator, each implementing isExtremal.
  • MaxComparator is based on Comparable.compareTo but returns only two values: 1 when o1 is greater than o2 and 0 otherwise. MaxByComparator also uses compareTo but returns three values: 1 when o1 is greater, 0 when they are equal and -1 when o1 is smaller. MinComparator and MinByComparator follow the same pattern with the comparison reversed.
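The four return conventions can be tabulated with a small model. Kind and isExtremal here are hypothetical names for an enum-based re-implementation; Flink's own classes are private nested subclasses of Comparator, so this is a sketch of their logic rather than something callable from user code.

```java
public class ComparatorSketch {

    // Hypothetical enum modeling the four private Comparator subclasses.
    public enum Kind { MAX, MIN, MAXBY, MINBY }

    // Same decision rules as the isExtremal implementations quoted above.
    public static <R> int isExtremal(Kind kind, Comparable<R> o1, R o2) {
        int c = o1.compareTo(o2);
        switch (kind) {
            case MAX:
                return c > 0 ? 1 : 0;
            case MIN:
                return c < 0 ? 1 : 0;
            case MAXBY:
                return c > 0 ? 1 : (c == 0 ? 0 : -1);
            case MINBY:
                return c < 0 ? 1 : (c == 0 ? 0 : -1);
            default:
                throw new IllegalArgumentException("Unsupported aggregation type.");
        }
    }

    public static void main(String[] args) {
        // Pairs (o1, o2): greater, equal, smaller.
        int[][] pairs = {{2, 1}, {1, 1}, {1, 2}};
        for (Kind kind : Kind.values()) {
            StringBuilder sb = new StringBuilder(kind + ":");
            for (int[] p : pairs) {
                sb.append(' ').append(isExtremal(kind, Integer.valueOf(p[0]), Integer.valueOf(p[1])));
            }
            System.out.println(sb);
        }
    }
}
```

Running main prints `MAX: 1 0 0`, `MIN: 0 0 1`, `MAXBY: 1 0 -1` and `MINBY: -1 0 1`. MAX and MIN collapse "equal" and "less extreme" into 0, which is enough for max/min because reduce only needs to know whether to overwrite the field; MAXBY and MINBY keep the equal case separate so that the first flag can decide ties.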

Summary

  • The aggregation operations of KeyedStream are sum, max, min, maxBy and minBy. All of them call the protected aggregate method; sum passes a SumAggregator, while the others pass a ComparableAggregator with AggregationType MAX, MIN, MAXBY or MINBY.
  • ComparableAggregator extends AggregationFunction, which implements ReduceFunction. Its reduce method compares the two elements with a Comparator and then branches on byAggregate. For maxBy/minBy, equal values return the first element encountered if first is true and the last one otherwise, and unequal values return the more extreme element. For max/min, whenever value1 is not strictly more extreme, value2's field value is written into value1 (via reflection for POJO fields), and value1 is returned.
  • The four Comparator subclasses MaxComparator, MinComparator, MinByComparator and MaxByComparator all implement isExtremal. MaxComparator only returns 1 (greater) or 0 (less than or equal), while MaxByComparator returns 1, 0 or -1 (greater, equal, less). This finer result is what ComparableAggregator.reduce needs for maxBy and minBy, which additionally take the first parameter to choose an element when values are equal. For max and min, reduce always returns value1, updating its field first when the comparison result is 0.
