A Look at Flink’s AscendingTimestampExtractor

Preface

This article takes a look at Flink’s AscendingTimestampExtractor.

AscendingTimestampExtractor

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/timestamps/AscendingTimestampExtractor.java

/**
 * A timestamp assigner and watermark generator for streams where timestamps are monotonously
 * ascending. In this case, the local watermarks for the streams are easy to generate, because
 * they strictly follow the timestamps.
 *
 * @param <T> The type of the elements that this function can extract timestamps from
 */
@PublicEvolving
public abstract class AscendingTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

    private static final long serialVersionUID = 1L;

    /** The current timestamp. */
    private long currentTimestamp = Long.MIN_VALUE;

    /** Handler that is called when timestamp monotony is violated. */
    private MonotonyViolationHandler violationHandler = new LoggingHandler();


    /**
     * Extracts the timestamp from the given element. The timestamp must be monotonically increasing.
     *
     * @param element The element that the timestamp is extracted from.
     * @return The new timestamp.
     */
    public abstract long extractAscendingTimestamp(T element);

    /**
     * Sets the handler for violations to the ascending timestamp order.
     *
     * @param handler The violation handler to use.
     * @return This extractor.
     */
    public AscendingTimestampExtractor<T> withViolationHandler(MonotonyViolationHandler handler) {
        this.violationHandler = requireNonNull(handler);
        return this;
    }

    // ------------------------------------------------------------------------

    @Override
    public final long extractTimestamp(T element, long elementPrevTimestamp) {
        final long newTimestamp = extractAscendingTimestamp(element);
        if (newTimestamp >= this.currentTimestamp) {
            this.currentTimestamp = newTimestamp;
            return newTimestamp;
        } else {
            violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
            return newTimestamp;
        }
    }

    @Override
    public final Watermark getCurrentWatermark() {
        return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
    }

    //......
}
  • The abstract class AscendingTimestampExtractor implements the extractTimestamp and getCurrentWatermark methods of the AssignerWithPeriodicWatermarks interface, and declares the abstract method extractAscendingTimestamp for subclasses to implement.
  • AscendingTimestampExtractor is intended for scenarios where element timestamps are monotonically increasing within each parallel task (timestamp monotony). extractTimestamp first calls the subclass’s extractAscendingTimestamp to obtain newTimestamp from the element. If newTimestamp is greater than or equal to currentTimestamp, currentTimestamp is updated; otherwise timestamp monotony is violated and MonotonyViolationHandler.handleViolation is called. In both cases newTimestamp is returned.
  • getCurrentWatermark returns a Watermark of currentTimestamp - 1 when currentTimestamp is not Long.MIN_VALUE, and a Watermark of Long.MIN_VALUE otherwise.
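
The bookkeeping described above can be sketched without any Flink dependency. What follows is a simplified standalone model, not Flink’s code: the class name AscendingTracker and the violations counter are assumptions made for illustration, standing in for the extractor and its violation handler.

```java
// Standalone model of the timestamp/watermark bookkeeping; not Flink code.
// AscendingTracker and the violations counter are hypothetical names.
final class AscendingTracker {
    // Highest timestamp seen so far; Long.MIN_VALUE means "no element yet".
    private long currentTimestamp = Long.MIN_VALUE;
    // Counts out-of-order elements (stand-in for a violation handler).
    private int violations = 0;

    /** Returns the element's timestamp; advances the tracked maximum only if order holds. */
    long extractTimestamp(long newTimestamp) {
        if (newTimestamp >= currentTimestamp) {
            currentTimestamp = newTimestamp;
        } else {
            violations++; // Flink calls violationHandler.handleViolation(...) at this point
        }
        return newTimestamp;
    }

    /** The watermark trails the highest timestamp by 1; the MIN_VALUE check avoids underflow. */
    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    int violations() {
        return violations;
    }

    public static void main(String[] args) {
        AscendingTracker tracker = new AscendingTracker();
        System.out.println(tracker.currentWatermark() == Long.MIN_VALUE); // true: nothing seen yet
        tracker.extractTimestamp(13L);
        tracker.extractTimestamp(14L);
        tracker.extractTimestamp(20L);
        System.out.println(tracker.currentWatermark());    // 19
        System.out.println(tracker.extractTimestamp(15L)); // 15: the late timestamp is still returned
        System.out.println(tracker.currentWatermark());    // 19: unchanged by the late element
        System.out.println(tracker.violations());          // 1
    }
}
```

Note that an out-of-order element keeps its own timestamp and does not move the watermark backwards; only the handler decides whether the violation is ignored, logged or fatal.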

MonotonyViolationHandler

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/timestamps/AscendingTimestampExtractor.java

    /**
     * Interface for handlers that handle violations of the monotonous ascending timestamps
     * property.
     */
    public interface MonotonyViolationHandler extends java.io.Serializable {

        /**
         * Called when the property of monotonously ascending timestamps is violated, i.e.,
         * when {@code elementTimestamp < lastTimestamp}.
         *
         * @param elementTimestamp The timestamp of the current element.
         * @param lastTimestamp The last timestamp.
         */
        void handleViolation(long elementTimestamp, long lastTimestamp);
    }

    /**
     * Handler that does nothing when timestamp monotony is violated.
     */
    public static final class IgnoringHandler implements MonotonyViolationHandler {
        private static final long serialVersionUID = 1L;

        @Override
        public void handleViolation(long elementTimestamp, long lastTimestamp) {}
    }

    /**
     * Handler that fails the program when timestamp monotony is violated.
     */
    public static final class FailingHandler implements MonotonyViolationHandler {
        private static final long serialVersionUID = 1L;

        @Override
        public void handleViolation(long elementTimestamp, long lastTimestamp) {
            throw new RuntimeException("Ascending timestamps condition violated. Element timestamp "
                    + elementTimestamp + " is smaller than last timestamp " + lastTimestamp);
        }
    }

    /**
     * Handler that only logs violations of timestamp monotony, on WARN log level.
     */
    public static final class LoggingHandler implements MonotonyViolationHandler {
        private static final long serialVersionUID = 1L;

        private static final Logger LOG = LoggerFactory.getLogger(AscendingTimestampExtractor.class);

        @Override
        public void handleViolation(long elementTimestamp, long lastTimestamp) {
            LOG.warn("Timestamp monotony violated: {} < {}", elementTimestamp, lastTimestamp);
        }
    }
  • MonotonyViolationHandler extends Serializable and defines the handleViolation method. It has three built-in implementations: IgnoringHandler, FailingHandler and LoggingHandler.
  • IgnoringHandler’s handleViolation does nothing; FailingHandler’s throws a RuntimeException; LoggingHandler’s logs the violation at WARN level.
  • AscendingTimestampExtractor uses LoggingHandler by default; a different handler can be set with the withViolationHandler method.
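
To compare the three strategies side by side, here is a small standalone sketch that mirrors them outside of Flink. It rests on some assumptions: the names ViolationHandlers, Handler, Ignoring, Failing, Logging and feed are hypothetical, and java.util.logging is used in place of the SLF4J logger that the Flink source uses.

```java
import java.util.logging.Logger;

// Standalone illustration of the three handler strategies; not Flink code.
// All names in this class are hypothetical stand-ins.
final class ViolationHandlers {

    /** Stand-in for MonotonyViolationHandler. */
    interface Handler extends java.io.Serializable {
        void handleViolation(long elementTimestamp, long lastTimestamp);
    }

    /** Does nothing on a violation. */
    static final class Ignoring implements Handler {
        private static final long serialVersionUID = 1L;

        @Override
        public void handleViolation(long elementTimestamp, long lastTimestamp) {}
    }

    /** Fails by throwing a RuntimeException on a violation. */
    static final class Failing implements Handler {
        private static final long serialVersionUID = 1L;

        @Override
        public void handleViolation(long elementTimestamp, long lastTimestamp) {
            throw new RuntimeException("Ascending timestamps condition violated. Element timestamp "
                    + elementTimestamp + " is smaller than last timestamp " + lastTimestamp);
        }
    }

    /** Logs a warning on a violation (java.util.logging here, SLF4J in Flink). */
    static final class Logging implements Handler {
        private static final long serialVersionUID = 1L;
        private static final Logger LOG = Logger.getLogger(ViolationHandlers.class.getName());

        @Override
        public void handleViolation(long elementTimestamp, long lastTimestamp) {
            LOG.warning("Timestamp monotony violated: " + elementTimestamp + " < " + lastTimestamp);
        }
    }

    /** Runs the ordering check over the timestamps; returns the number of violations reported. */
    static int feed(Handler handler, long... timestamps) {
        long current = Long.MIN_VALUE;
        int violations = 0;
        for (long ts : timestamps) {
            if (ts >= current) {
                current = ts;
            } else {
                violations++;
                handler.handleViolation(ts, current);
            }
        }
        return violations;
    }

    public static void main(String[] args) {
        System.out.println(feed(new Ignoring(), 1000L, 1000L, 999L)); // 1, silently
        System.out.println(feed(new Logging(), 1000L, 1000L, 999L));  // 1, plus a warning in the log
        try {
            feed(new Failing(), 1000L, 1000L, 999L);
        } catch (RuntimeException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

The choice between the three is a trade-off: ignoring or logging keeps the job running with possibly late elements, while failing makes broken ordering assumptions visible immediately.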

Example

    @Test
    public void testWithFailingHandler() {
        AscendingTimestampExtractor<Long> extractor =
                new LongExtractor().withViolationHandler(new FailingHandler());
        runValidTests(extractor);

        try {
            runInvalidTest(extractor);
            Assert.fail("should fail with an exception");
        } catch (Exception expected) {
            // expected: FailingHandler throws a RuntimeException on the out-of-order element
        }
    }

    private void runValidTests(AscendingTimestampExtractor<Long> extractor) {
        Assert.assertEquals(13L, extractor.extractTimestamp(13L, -1L));
        Assert.assertEquals(13L, extractor.extractTimestamp(13L, 0L));
        Assert.assertEquals(14L, extractor.extractTimestamp(14L, 0L));
        Assert.assertEquals(20L, extractor.extractTimestamp(20L, 0L));
        Assert.assertEquals(20L, extractor.extractTimestamp(20L, 0L));
        Assert.assertEquals(20L, extractor.extractTimestamp(20L, 0L));
        Assert.assertEquals(500L, extractor.extractTimestamp(500L, 0L));
        Assert.assertEquals(9223372036854775806L, extractor.extractTimestamp(9223372036854775806L, 99999L));
    }

    private void runInvalidTest(AscendingTimestampExtractor<Long> extractor) {
        Assert.assertEquals(1000L, extractor.extractTimestamp(1000L, 100L));
        Assert.assertEquals(1000L, extractor.extractTimestamp(1000L, 100L));
        Assert.assertEquals(999L, extractor.extractTimestamp(999L, 100L));
    }

    private static class LongExtractor extends AscendingTimestampExtractor<Long> {
        private static final long serialVersionUID = 1L;

        private LongExtractor() {
        }

        public long extractAscendingTimestamp(Long element) {
            return element;
        }
    }
  • Here withViolationHandler sets the violationHandler to FailingHandler. When the timestamp 999 is extracted, it is smaller than the previous timestamp 1000, so MonotonyViolationHandler.handleViolation is called and FailingHandler throws a RuntimeException.

Summary

  • Flink provides several predefined timestamp extractors / watermark emitters for convenience; AscendingTimestampExtractor is one of them.
  • The abstract class AscendingTimestampExtractor implements the extractTimestamp and getCurrentWatermark methods of the AssignerWithPeriodicWatermarks interface, and declares the abstract method extractAscendingTimestamp for subclasses to implement.
  • AscendingTimestampExtractor is intended for scenarios where element timestamps are monotonically increasing within each parallel task. When timestamp monotony is violated, the handleViolation method of MonotonyViolationHandler is called. MonotonyViolationHandler extends Serializable and defines the handleViolation method; it has three built-in implementations: IgnoringHandler, FailingHandler and LoggingHandler.
