Talk about Flink's BoundedOutOfOrdernessTimestampExtractor


Preface

This article mainly studies Flink's BoundedOutOfOrdernessTimestampExtractor.

BoundedOutOfOrdernessTimestampExtractor

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java

/**
 * This is a {@link AssignerWithPeriodicWatermarks} used to emit Watermarks that lag behind the element with
 * the maximum timestamp (in event time) seen so far by a fixed amount of time, <code>t_late</code>. This can
 * help reduce the number of elements that are ignored due to lateness when computing the final result for a
 * given window, in the case where we know that elements arrive no later than <code>t_late</code> units of time
 * after the watermark that signals that the system event-time has advanced past their (event-time) timestamp.
 * */
public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

    private static final long serialVersionUID = 1L;

    /** The current maximum timestamp seen so far. */
    private long currentMaxTimestamp;

    /** The timestamp of the last emitted watermark. */
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /**
     * The (fixed) interval between the maximum seen timestamp seen in the records
     * and that of the watermark to be emitted.
     */
    private final long maxOutOfOrderness;

    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }

    public long getMaxOutOfOrdernessInMillis() {
        return maxOutOfOrderness;
    }

    /**
     * Extracts the timestamp from the given element.
     *
     * @param element The element that the timestamp is extracted from.
     * @return The new timestamp.
     */
    public abstract long extractTimestamp(T element);

    @Override
    public final Watermark getCurrentWatermark() {
        // this guarantees that the watermark never goes backwards.
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
}
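The constructor initializes currentMaxTimestamp to Long.MIN_VALUE + maxOutOfOrderness rather than to Long.MIN_VALUE. The minimal plain-Java sketch below (no Flink dependency; the class and method names are hypothetical) shows why: subtracting maxOutOfOrderness from Long.MIN_VALUE overflows, so a watermark requested before any element has arrived would wrap around to a huge positive value, whereas Flink's initialization makes that first watermark exactly Long.MIN_VALUE.

```java
// Hypothetical demo class, not part of Flink: compares the first watermark
// produced by getCurrentWatermark() under two initializations.
public class WatermarkOverflowDemo {

    // First watermark if currentMaxTimestamp started at Long.MIN_VALUE.
    static long naiveFirstWatermark(long maxOutOfOrderness) {
        long currentMaxTimestamp = Long.MIN_VALUE;
        // Long.MIN_VALUE - x wraps around to Long.MAX_VALUE - (x - 1) for x > 0.
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    // First watermark with the initialization used in the Flink constructor.
    static long flinkFirstWatermark(long maxOutOfOrderness) {
        long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
        // The two offsets cancel, giving exactly Long.MIN_VALUE.
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        long maxOutOfOrderness = 60_000L; // an assumed 60-second bound
        System.out.println("naive: " + naiveFirstWatermark(maxOutOfOrderness));
        System.out.println("flink: " + flinkFirstWatermark(maxOutOfOrderness));
    }
}
```

With a 60-second bound the naive version yields Long.MAX_VALUE - 59999, a watermark far in the future that would make every later element look late.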
  • The BoundedOutOfOrdernessTimestampExtractor abstract class implements the extractTimestamp and getCurrentWatermark methods of the AssignerWithPeriodicWatermarks interface, and declares the abstract method extractTimestamp(T element) for subclasses to implement.
  • The constructor of BoundedOutOfOrdernessTimestampExtractor receives the maxOutOfOrderness parameter, which must not be negative. It is the fixed amount by which the emitted watermark lags behind the largest timestamp seen so far, i.e. how far out of order elements are allowed to arrive. An element that arrives after the watermark has already passed its timestamp (roughly, one that is more than maxOutOfOrderness behind the maximum timestamp seen) is treated as late and is ignored when computing window results.
  • The final extractTimestamp(T element, long previousElementTimestamp) method calls the subclass's extractTimestamp(T element) to extract the element's timestamp; if that timestamp is greater than currentMaxTimestamp, currentMaxTimestamp is updated. getCurrentWatermark first computes potentialWM = currentMaxTimestamp - maxOutOfOrderness. If potentialWM is greater than or equal to lastEmittedWatermark (equivalently, currentMaxTimestamp - lastEmittedWatermark >= maxOutOfOrderness, meaning lastEmittedWatermark has fallen too far behind), lastEmittedWatermark is advanced to potentialWM, so the watermark never goes backwards. Finally it returns new Watermark(lastEmittedWatermark).
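This bookkeeping can be traced without a running job. Below is a minimal plain-Java sketch that copies the logic of the two methods; the class name BoundedWatermarkTrace, its methods and the sample timestamps are hypothetical, and the watermark is polled after every element instead of on the timer that the Flink runtime uses for periodic watermarks.

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free copy of the state kept by BoundedOutOfOrdernessTimestampExtractor,
// for illustration only (not a Flink class).
public class BoundedWatermarkTrace {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedWatermarkTrace(long maxOutOfOrderness) {
        if (maxOutOfOrderness < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness cannot be negative");
        }
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Same overflow-safe initialization as the Flink constructor.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Mirrors extractTimestamp(T, long): remember the largest timestamp seen.
    long onElement(long timestamp) {
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

    // Mirrors getCurrentWatermark(): lag behind the maximum, never go backwards.
    long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    // Feeds the timestamps in order and records the watermark after each one.
    static List<Long> watermarksAfterEach(long maxOutOfOrderness, long... timestamps) {
        BoundedWatermarkTrace trace = new BoundedWatermarkTrace(maxOutOfOrderness);
        List<Long> watermarks = new ArrayList<>();
        for (long ts : timestamps) {
            trace.onElement(ts);
            watermarks.add(trace.currentWatermark());
        }
        return watermarks;
    }

    public static void main(String[] args) {
        long[] timestamps = {10_000L, 14_000L, 12_000L, 20_000L, 13_000L};
        List<Long> watermarks = watermarksAfterEach(5_000L, timestamps);
        for (int i = 0; i < timestamps.length; i++) {
            System.out.println("ts=" + timestamps[i] + " watermark=" + watermarks.get(i));
        }
        // prints watermarks 5000, 9000, 9000, 15000, 15000
    }
}
```

With a 5-second bound, the out-of-order element at 12000 is still ahead of the watermark (9000), but the element at 13000 arrives when the watermark is already 15000: it is more than 5 seconds behind the maximum (20000) and would be treated as late.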

Example

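    import java.util.Properties;

    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.api.java.tuple.Tuple5;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    // Members of the enclosing class. TaxiRide, TaxiRideSchema, GridCellMatcher,
    // RideCounter and GridToCoordinates are defined elsewhere and not shown here.
    // The constant values below are assumed example values.
    private static final String LOCAL_ZOOKEEPER_HOST = "localhost:2181";
    private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
    private static final String RIDE_SPEED_GROUP = "rideSpeedGroup";
    private static final int MAX_EVENT_DELAY = 60; // rides assumed at most 60 s out of order

    public static void main(String[] args) throws Exception {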

        final int popThreshold = 20; // threshold for popular places

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        // create a Kafka consumer
        FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
                "cleansedRides",
                new TaxiRideSchema(),
                kafkaProps);
        // assign a timestamp extractor to the consumer
        consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

        // create a TaxiRide data stream
        DataStream<TaxiRide> rides = env.addSource(consumer);

        // find popular places
        DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
                // match ride to grid cell and event type (start or end)
                .map(new GridCellMatcher())
                // partition by cell id and event type
                .keyBy(0, 1)
                // build sliding window
                .timeWindow(Time.minutes(15), Time.minutes(5))
                // count ride events in window
                .apply(new RideCounter())
                // filter by popularity threshold
                .filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
                // map grid cell to coordinates
                .map(new GridToCoordinates());

        popularPlaces.print();

        // execute the transformation pipeline
        env.execute("Popular Places from Kafka");
    }

    /**
     * Assigns timestamps to TaxiRide records.
     * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
     */
    public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

        public TaxiRideTSExtractor() {
            super(Time.seconds(MAX_EVENT_DELAY));
        }

        @Override
        public long extractTimestamp(TaxiRide ride) {
            if (ride.isStart) {
                return ride.startTime.getMillis();
            }
            else {
                return ride.endTime.getMillis();
            }
        }
    }
  • This example uses an AssignerWithPeriodicWatermarks and sets the watermark interval to 1000 ms through env.getConfig().setAutoWatermarkInterval(1000). The assigner is a TaxiRideTSExtractor, which extends the BoundedOutOfOrdernessTimestampExtractor abstract class and is registered on the Kafka consumer through assignTimestampsAndWatermarks. Because it is registered on the consumer rather than on the resulting DataStream, watermarks are generated per Kafka partition inside the source.
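To connect the watermark to the 15-minute window sliding every 5 minutes in this example, here is a dependency-free sketch. It assumes Flink's default behavior: sliding event-time windows aligned to the epoch (offset 0), and an event-time trigger that fires once the watermark reaches the window end minus 1 ms. The class SlidingWindowFiring and its methods are hypothetical, not Flink APIs, and MAX_EVENT_DELAY is assumed to be 60 seconds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Flink API) mimicking sliding-window assignment
// with offset 0 and the event-time firing condition.
public class SlidingWindowFiring {
    static final long MINUTE = 60_000L;

    // Start times of every window [start, start + size) sliding by `slide` that contains ts.
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide); // latest window start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // The window fires once the watermark reaches its last timestamp, end - 1.
    static boolean fires(long start, long size, long watermark) {
        return watermark >= start + size - 1;
    }

    // With watermark = maxTimestamp - maxOutOfOrderness, the smallest maximum
    // timestamp the extractor must have seen before the window can fire.
    static long maxTimestampNeededToFire(long start, long size, long maxOutOfOrderness) {
        return start + size - 1 + maxOutOfOrderness;
    }

    public static void main(String[] args) {
        long size = 15 * MINUTE;
        long slide = 5 * MINUTE;
        long delay = 60_000L; // assumed MAX_EVENT_DELAY of 60 s
        long ts = 7 * MINUTE; // an event at minute 7
        for (long start : windowStarts(ts, size, slide)) {
            System.out.println("window [" + start / MINUTE + ", " + (start + size) / MINUTE
                    + ") min fires once max timestamp >= "
                    + maxTimestampNeededToFire(start, size, delay) + " ms");
        }
    }
}
```

An event at minute 7 belongs to the three windows starting at minutes 5, 0 and -5. For the window [5, 20) minutes to fire, the extractor must have seen a timestamp of at least 1,259,999 ms (20:59.999), i.e. the window end plus the 60-second bound, minus 1 ms.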

Summary

  • Flink provides several pre-defined timestamp extractors/watermark emitters for convenient development, one of which is BoundedOutOfOrdernessTimestampExtractor.
  • The BoundedOutOfOrdernessTimestampExtractor abstract class implements extractTimestamp and getCurrentWatermark of the AssignerWithPeriodicWatermarks interface; subclasses only implement the abstract extractTimestamp(T element).
  • Its constructor takes maxOutOfOrderness; the emitted watermark is the maximum timestamp seen so far minus maxOutOfOrderness and never moves backwards, and elements arriving after the watermark has passed their timestamp are treated as late and ignored when computing window results.
