Preface
This article looks at Distinct Aggregation in the Flink Table API.
Example
// Distinct can be applied to GroupBy Aggregation, GroupBy Window Aggregation and Over Window Aggregation.
Table orders = tableEnv.scan("Orders");
// Distinct aggregation on group by
Table groupByDistinctResult = orders
    .groupBy("a")
    .select("a, b.sum.distinct as d");
// Distinct aggregation on time window group by
Table groupByWindowDistinctResult = orders
    .window(Tumble.over("5.minutes").on("rowtime").as("w")).groupBy("a, w")
    .select("a, b.sum.distinct as d");
// Distinct aggregation on over window
Table result = orders
    .window(Over
        .partitionBy("a")
        .orderBy("rowtime")
        .preceding("UNBOUNDED_RANGE")
        .as("w"))
    .select("a, b.avg.distinct over w, b.max over w, b.min over w");
// User-defined aggregate functions can also be used with the distinct modifier.
// Register the user-defined aggregate function (MyUdagg is defined elsewhere)
tableEnv.registerFunction("myUdagg", new MyUdagg());
// Use distinct aggregation with the user-defined aggregate function
Table udaggDistinctResult = orders
    .groupBy("users")
    .select("users, myUdagg.distinct(points) as myDistinctResult");
- Distinct aggregation can be applied to both built-in and user-defined aggregate functions, and it works in GroupBy Aggregation, GroupBy Window Aggregation and Over Window Aggregation.
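To make the semantics concrete, here is a minimal plain-Java sketch of what `b.sum.distinct` grouped by `a` computes compared with a plain `b.sum`: within each group, a value of `b` contributes to the sum only the first time it appears. This is an illustration only and does not use Flink; the `DistinctSumSketch` and `Order` classes and the sample rows are hypothetical, and nothing here reflects how Flink implements the operation internally.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Plain-Java illustration only; none of these types come from Flink.
final class DistinctSumSketch {

    // Hypothetical row type with the two columns used in the example: a and b.
    static final class Order {
        final String a;
        final long b;

        Order(String a, long b) {
            this.a = a;
            this.b = b;
        }
    }

    // Counterpart of groupBy("a").select("a, b.sum as d"): every row counts.
    static Map<String, Long> sum(List<Order> orders) {
        Map<String, Long> sums = new TreeMap<>();
        for (Order o : orders) {
            sums.merge(o.a, o.b, Long::sum);
        }
        return sums;
    }

    // Counterpart of groupBy("a").select("a, b.sum.distinct as d"):
    // a value of b is added only the first time it is seen in its group.
    static Map<String, Long> sumDistinct(List<Order> orders) {
        Map<String, Set<Long>> seen = new HashMap<>();
        Map<String, Long> sums = new TreeMap<>();
        for (Order o : orders) {
            // Set.add returns true only if the value was not in the set yet.
            if (seen.computeIfAbsent(o.a, k -> new HashSet<>()).add(o.b)) {
                sums.merge(o.a, o.b, Long::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
                new Order("x", 10), new Order("x", 10), new Order("x", 5),
                new Order("y", 7), new Order("y", 7));
        System.out.println(sum(orders));         // {x=25, y=14}
        System.out.println(sumDistinct(orders)); // {x=15, y=7}
    }
}
```

The same idea carries over to other aggregates such as avg or a user-defined function: the distinct modifier filters duplicate inputs per group before they reach the aggregate.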
AggregateFunction
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/functions/AggregateFunction.scala
/**
* Base class for User-Defined Aggregates.
*
* The behavior of an [[AggregateFunction]] can be defined by implementing a series of custom
* methods. An [[AggregateFunction]] needs at least three methods:
* - createAccumulator,
* - accumulate, and
* - getValue.
*
* There are a few other methods that can be optional to have:
* - retract,
* - merge, and
* - resetAccumulator
*
* All these methods must be declared publicly, not static and named exactly as the names
* mentioned above. The methods createAccumulator and getValue are defined in the
* [[AggregateFunction]] functions, while other methods are explained below.
*
*
* {{{
* Processes the input values and update the provided accumulator instance. The method
* accumulate can be overloaded with different custom types and arguments. An AggregateFunction
* requires at least one accumulate() method.
*
* @param accumulator the accumulator which contains the current aggregated results
* @param [user defined inputs] the input value (usually obtained from a new arrived data).
*
* def accumulate(accumulator: ACC, [user defined inputs]): Unit
* }}}
*
*
* {{{
* Retracts the input values from the accumulator instance. The current design assumes the
* inputs are the values that have been previously accumulated. The method retract can be
* overloaded with different custom types and arguments. This function must be implemented for
* datastream bounded over aggregate.
*
* @param accumulator the accumulator which contains the current aggregated results
* @param [user defined inputs] the input value (usually obtained from a new arrived data).
*
* def retract(accumulator: ACC, [user defined inputs]): Unit
* }}}
*
*
* {{{
* Merges a group of accumulator instances into one accumulator instance. This function must be
* implemented for datastream session window grouping aggregate and dataset grouping aggregate.
*
* @param accumulator the accumulator which will keep the merged aggregate results. It should
* be noted that the accumulator may contain the previous aggregated
* results. Therefore user should not replace or clean this instance in the
* custom merge method.
* @param its an [[java.lang.Iterable]] pointed to a group of accumulators that will be
* merged.
*
* def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit
* }}}
*
*
* {{{
* Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for
* dataset grouping aggregate.
*
* @param accumulator the accumulator which needs to be reset
*
* def resetAccumulator(accumulator: ACC): Unit
* }}}
*
*
* @tparam T the type of the aggregation result
* @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the
* aggregated values which are needed to compute an aggregation result.
* AggregateFunction represents its state using accumulator, thereby the state of the
* AggregateFunction must be put into the accumulator.
*/
abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
  /**
    * Creates and init the Accumulator for this [[AggregateFunction]].
    *
    * @return the accumulator with the initial value
    */
  def createAccumulator(): ACC
  /**
    * Called every time when an aggregation result should be materialized.
    * The returned value could be either an early and incomplete result
    * (periodically emitted as data arrive) or the final result of the
    * aggregation.
    *
    * @param accumulator the accumulator which contains the current
    *                    aggregated results
    * @return the aggregation result
    */
  def getValue(accumulator: ACC): T
  /**
    * Returns true if this AggregateFunction can only be applied in an OVER window.
    *
    * @return true if the AggregateFunction requires an OVER window, false otherwise.
    */
  def requiresOver: Boolean = false
  /**
    * Returns the TypeInformation of the AggregateFunction's result.
    *
    * @return The TypeInformation of the AggregateFunction's result or null if the result type
    *         should be automatically inferred.
    */
  def getResultType: TypeInformation[T] = null
  /**
    * Returns the TypeInformation of the AggregateFunction's accumulator.
    *
    * @return The TypeInformation of the AggregateFunction's accumulator or null if the
    *         accumulator type should be automatically inferred.
    */
  def getAccumulatorType: TypeInformation[ACC] = null
}
- AggregateFunction extends UserDefinedFunction and has two type parameters: T for the result value and ACC for the accumulator. It declares the createAccumulator, getValue, requiresOver, getResultType and getAccumulatorType methods. Only createAccumulator and getValue are abstract, so subclasses must implement both; the others have default implementations.
- An AggregateFunction also needs an accumulate method. It is not declared in the base class, so subclasses must define and implement it themselves. It takes the accumulator (ACC) followed by the user-defined input values and returns Unit (void in Java). There are three further optional methods, retract, merge and resetAccumulator, which subclasses define and implement when the situation requires them.
- For a datastream bounded over aggregate, the retract method must be implemented; it takes the accumulator (ACC) followed by the user-defined input values and returns Unit. For a datastream session window grouping aggregate and a dataset grouping aggregate, the merge method must be implemented; it takes two parameters, ACC and java.lang.Iterable[ACC], and returns Unit. For a dataset grouping aggregate, the resetAccumulator method must also be implemented; it takes a single ACC parameter and returns Unit.
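The method contract described above can be sketched in plain Java with a weighted average. This is an illustration under stated assumptions, not a Flink implementation: `WeightedAvgSketch` deliberately does not extend Flink's AggregateFunction (so it runs without any dependency), and the class, its `Acc` accumulator and the sample values are hypothetical. Only the method names and parameter shapes follow the contract quoted from the source.

```java
import java.util.Collections;

// Plain-Java illustration only: this class does NOT extend Flink's
// AggregateFunction, it just mirrors the method names and shapes.
final class WeightedAvgSketch {

    // Hypothetical accumulator (the ACC type): all aggregation state lives here.
    static final class Acc {
        long sum = 0;
        long count = 0;
    }

    // createAccumulator: returns a fresh accumulator with its initial value.
    public Acc createAccumulator() {
        return new Acc();
    }

    // accumulate: the accumulator first, then the user-defined inputs.
    public void accumulate(Acc acc, long value, long weight) {
        acc.sum += value * weight;
        acc.count += weight;
    }

    // retract: undoes an earlier accumulate call made with the same inputs.
    public void retract(Acc acc, long value, long weight) {
        acc.sum -= value * weight;
        acc.count -= weight;
    }

    // merge: folds the other accumulators into acc; acc may already hold
    // results, so it is neither replaced nor cleared.
    public void merge(Acc acc, Iterable<Acc> others) {
        for (Acc other : others) {
            acc.sum += other.sum;
            acc.count += other.count;
        }
    }

    // resetAccumulator: puts the accumulator back into its initial state.
    public void resetAccumulator(Acc acc) {
        acc.sum = 0;
        acc.count = 0;
    }

    // getValue: materializes the current result; null while nothing is accumulated.
    public Long getValue(Acc acc) {
        if (acc.count == 0) {
            return null;
        }
        return acc.sum / acc.count;
    }

    public static void main(String[] args) {
        WeightedAvgSketch f = new WeightedAvgSketch();
        Acc acc = f.createAccumulator();
        f.accumulate(acc, 10, 1);
        f.accumulate(acc, 20, 3);
        System.out.println(f.getValue(acc)); // (10*1 + 20*3) / 4 = 17 (integer division)

        f.retract(acc, 20, 3);
        System.out.println(f.getValue(acc)); // 10

        Acc other = f.createAccumulator();
        f.accumulate(other, 30, 1);
        f.merge(acc, Collections.singletonList(other));
        System.out.println(f.getValue(acc)); // (10 + 30) / 2 = 20

        f.resetAccumulator(acc);
        System.out.println(f.getValue(acc)); // null
    }
}
```

Keeping every piece of state inside the accumulator, rather than in fields of the function object, matches the note in the source comment that the state of an AggregateFunction must be put into the accumulator.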
Summary
- Distinct aggregation in the Table API can be applied to both built-in and user-defined aggregate functions, and it works in GroupBy Aggregation, GroupBy Window Aggregation and Over Window Aggregation.
- AggregateFunction extends UserDefinedFunction and has two type parameters: T for the result value and ACC for the accumulator. It declares the createAccumulator, getValue, requiresOver, getResultType and getAccumulatorType methods; subclasses must implement createAccumulator and getValue.
- An AggregateFunction also needs an accumulate method, which is not declared in the base class and must be defined and implemented by the subclass. It takes the accumulator (ACC) followed by the user-defined input values and returns Unit (void in Java). The three optional methods, retract, merge and resetAccumulator, are defined and implemented by subclasses when needed: retract (ACC plus the user-defined inputs, returns Unit) is required for a datastream bounded over aggregate; merge (ACC and java.lang.Iterable[ACC], returns Unit) is required for a datastream session window grouping aggregate and a dataset grouping aggregate; resetAccumulator (a single ACC parameter, returns Unit) is required for a dataset grouping aggregate.