Talk about Flink’s EventTime


Preface

This article takes a look at Flink’s EventTime.

SourceFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java

    /**
     * Interface that source functions use to emit elements, and possibly watermarks.
     *
     * @param <T> The type of the elements produced by the source.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface SourceContext<T> {

        /**
         * Emits one element from the source, without attaching a timestamp. In most cases,
         * this is the default way of emitting elements.
         *
         * <p>The timestamp that the element will get assigned depends on the time characteristic of
         * the streaming program:
         * <ul>
         *     <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
         *     <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
         *         current time as the timestamp.</li>
         *     <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
         *         It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
         *         operation (like time windows).</li>
         * </ul>
         *
         * @param element The element to emit
         */
        void collect(T element);

        /**
         * Emits one element from the source, and attaches the given timestamp. This method
         * is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
         * sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
         * on the stream.
         *
         * <p>On certain time characteristics, this timestamp may be ignored or overwritten.
         * This allows programs to switch between the different time characteristics and behaviors
         * without changing the code of the source functions.
         * <ul>
         *     <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
         *         because processing time never works with element timestamps.</li>
         *     <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
         *         system's current time, to realize proper ingestion time semantics.</li>
         *     <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
         * </ul>
         *
         * @param element The element to emit
         * @param timestamp The timestamp in milliseconds since the Epoch
         */
        @PublicEvolving
        void collectWithTimestamp(T element, long timestamp);

        /**
         * Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
         * elements with a timestamp {@code t' <= t} will occur any more. If further such
         * elements will be emitted, those elements are considered <i>late</i>.
         *
         * <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
         * On {@link TimeCharacteristic#ProcessingTime},Watermarks will be ignored. On
         * {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
         * automatic ingestion time watermarks.
         *
         * @param mark The Watermark to emit
         */
        @PublicEvolving
        void emitWatermark(Watermark mark);

        /**
         * Marks the source to be temporarily idle. This tells the system that this source will
         * temporarily stop emitting records and watermarks for an indefinite amount of time. This
         * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
         * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
         * watermarks without the need to wait for watermarks from this source while it is idle.
         *
         * <p>Source functions should make a best effort to call this method as soon as they
         * acknowledge themselves to be idle. The system will consider the source to resume activity
         * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
         * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
         */
        @PublicEvolving
        void markAsTemporarilyIdle();

        /**
         * Returns the checkpoint lock. Please refer to the class-level comment in
         * {@link SourceFunction} for details about how to write a consistent checkpointed
         * source.
         *
         * @return The object to use as the lock
         */
        Object getCheckpointLock();

        /**
         * This method is called by the system to shut down the context.
         */
        void close();
    }
  • SourceFunction defines the SourceContext interface, which declares the collectWithTimestamp and emitWatermark methods. The former emits an element with an event timestamp attached; the latter emits a watermark.
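The Javadoc above says that the timestamp passed to collectWithTimestamp is ignored, overwritten, or used depending on the time characteristic. The following plain-Java sketch restates that contract as a lookup; it has no Flink dependency, and TimeMode and TimestampContract are hypothetical names chosen for illustration, not Flink classes or Flink's internal logic.

```java
import java.util.OptionalLong;

// Hypothetical names; this mirrors the Javadoc contract, not Flink's internal code.
enum TimeMode { PROCESSING_TIME, INGESTION_TIME, EVENT_TIME }

final class TimestampContract {

    /**
     * Timestamp an element ends up with after collectWithTimestamp(element, supplied),
     * as described in the SourceContext Javadoc.
     */
    static OptionalLong effectiveTimestamp(TimeMode mode, long supplied, long wallClockMillis) {
        switch (mode) {
            case PROCESSING_TIME:
                return OptionalLong.empty();              // supplied timestamp is ignored
            case INGESTION_TIME:
                return OptionalLong.of(wallClockMillis);  // overwritten with the current time
            case EVENT_TIME:
                return OptionalLong.of(supplied);         // used as given
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }
}
```

This is why a source can call collectWithTimestamp unconditionally: switching the job to another time characteristic changes the outcome without any change to the source code.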

Example

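import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
// TaxiRide and TaxiFare come from the example project (imports omitted here)

// Replays a fixed array of test data: Long entries become watermarks, everything else becomes an element.
public abstract class TestSource implements SourceFunction {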
    private volatile boolean running = true;
    protected Object[] testStream;

    @Override
    public void run(SourceContext ctx) throws Exception {
        for (int i = 0; (i < testStream.length) && running; i++) {
            if (testStream[i] instanceof TaxiRide) {
                TaxiRide ride = (TaxiRide) testStream[i];
                ctx.collectWithTimestamp(ride, ride.getEventTime());
            } else if (testStream[i] instanceof TaxiFare) {
                TaxiFare fare = (TaxiFare) testStream[i];
                ctx.collectWithTimestamp(fare, fare.getEventTime());
            } else if (testStream[i] instanceof String) {
                String s = (String) testStream[i];
                ctx.collectWithTimestamp(s, 0);
            } else if (testStream[i] instanceof Long) {
                Long ts = (Long) testStream[i];
                ctx.emitWatermark(new Watermark(ts));
            } else {
                throw new RuntimeException(testStream[i].toString());
            }
        }
        // test sources are finite, so they have a Long.MAX_VALUE watermark when they finish
    }

    @Override
    public void cancel() {
        running = false;
    }
}
  • This example shows how a source assigns timestamps (collectWithTimestamp) and emits watermarks (emitWatermark).
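To make the watermark contract concrete ("no elements with a timestamp t' <= t will occur any more", otherwise the element is late), here is a minimal plain-Java sketch that replays a mixed stream shaped like the testStream array above. It does not use Flink; LatenessReplay and Event are hypothetical stand-ins for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink dependency); all names here are hypothetical.
final class LatenessReplay {

    /** Stand-in for a record that carries its own event time. */
    static final class Event {
        final String id;
        final long eventTime;

        Event(String id, long eventTime) {
            this.id = id;
            this.eventTime = eventTime;
        }
    }

    /**
     * Replays a mixed stream the way TestSource does: an Event is an element,
     * a Long is a watermark. Returns the ids of late elements, i.e. those whose
     * timestamp is <= the most recent watermark.
     */
    static List<String> lateIds(Object[] stream) {
        long watermark = Long.MIN_VALUE; // no watermark seen yet
        List<String> late = new ArrayList<>();
        for (Object o : stream) {
            if (o instanceof Long) {
                watermark = Math.max(watermark, (Long) o); // watermarks never move backwards
            } else if (o instanceof Event) {
                Event e = (Event) o;
                if (e.eventTime <= watermark) {
                    late.add(e.id);
                }
            } else {
                throw new IllegalArgumentException(String.valueOf(o));
            }
        }
        return late;
    }

    public static void main(String[] args) {
        Object[] stream = {
            new Event("a", 1000L), 1500L, new Event("b", 2000L), new Event("c", 1200L)
        };
        System.out.println(lateIds(stream)); // prints [c]
    }
}
```

Element "c" arrives after the watermark 1500 with timestamp 1200, so it is the only late element; "b" has a timestamp above the watermark and is on time.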

DataStream.assignTimestampsAndWatermarks

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    /**
     * Assigns timestamps to the elements in the data stream and periodically creates
     * watermarks to signal event time progress.
     *
     * <p>This method creates watermarks periodically (for example every second), based
     * on the watermarks indicated by the given watermark generator. Even when no new elements
     * in the stream arrive, the given watermark generator will be periodically checked for
     * new watermarks. The interval in which watermarks are generated is defined in
     * {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
     *
     * <p>Use this method for the common cases, where some characteristic over all elements
     * should generate the watermarks, or where watermarks are simply trailing behind the
     * wall clock time by a certain amount.
     *
     * <p>For the second case and when the watermarks are required to lag behind the maximum
     * timestamp seen so far in the elements of the stream by a fixed amount of time, and this
     * amount is known in advance, use the
     * {@link BoundedOutOfOrdernessTimestampExtractor}.
     *
     * <p>For cases where watermarks should be created in an irregular fashion, for example
     * based on certain markers that some element carry, use the
     * {@link AssignerWithPunctuatedWatermarks}.
     *
     * @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
     *                                      watermark generator.
     * @return The stream after the transformation, with assigned timestamps and watermarks.
     *
     * @see AssignerWithPeriodicWatermarks
     * @see AssignerWithPunctuatedWatermarks
     * @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)
     */
    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
            AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

        // match parallelism to input, otherwise dop=1 sources could lead to some strange
        // behaviour: the watermark will creep along very slowly because the elements
        // from the source go to each extraction operator round robin.
        final int inputParallelism = getTransformation().getParallelism();
        final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

        TimestampsAndPeriodicWatermarksOperator<T> operator =
                new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

        return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
                .setParallelism(inputParallelism);
    }

    /**
     * Assigns timestamps to the elements in the data stream and creates watermarks to
     * signal event time progress based on the elements themselves.
     *
     * <p>This method creates watermarks based purely on stream elements. For each element
     * that is handled via {@link AssignerWithPunctuatedWatermarks#extractTimestamp(Object, long)},
     * the {@link AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark(Object, long)}
     * method is called, and a new watermark is emitted, if the returned watermark value is
     * non-negative and greater than the previous watermark.
     *
     * <p>This method is useful when the data stream embeds watermark elements, or certain elements
     * carry a marker that can be used to determine the current event time watermark.
     * This operation gives the programmer full control over the watermark generation. Users
     * should be aware that too aggressive watermark generation (i.e., generating hundreds of
     * watermarks every second) can cost some performance.
     *
     * <p>For cases where watermarks should be created in a regular fashion, for example
     * every x milliseconds, use the {@link AssignerWithPeriodicWatermarks}.
     *
     * @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
     *                                      watermark generator.
     * @return The stream after the transformation, with assigned timestamps and watermarks.
     *
     * @see AssignerWithPunctuatedWatermarks
     * @see AssignerWithPeriodicWatermarks
     * @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)
     */
    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
            AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner) {

        // match parallelism to input, otherwise dop=1 sources could lead to some strange
        // behaviour: the watermark will creep along very slowly because the elements
        // from the source go to each extraction operator round robin.
        final int inputParallelism = getTransformation().getParallelism();
        final AssignerWithPunctuatedWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

        TimestampsAndPunctuatedWatermarksOperator<T> operator =
                new TimestampsAndPunctuatedWatermarksOperator<>(cleanedAssigner);

        return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
                .setParallelism(inputParallelism);
    }
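  • DataStream defines the assignTimestampsAndWatermarks method, which takes a timestampAndWatermarkAssigner (of type AssignerWithPeriodicWatermarks or AssignerWithPunctuatedWatermarks) that tells Flink how to extract the event time and generate watermarks. Both overloads wrap the assigner in an operator and give it the same parallelism as the input.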

AssignerWithPeriodicWatermarks

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java

public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {

    /**
     * Returns the current watermark. This method is periodically called by the
     * system to retrieve the current watermark. The method may return {@code null} to
     * indicate that no new Watermark is available.
     *
     * <p>The returned watermark will be emitted only if it is non-null and its timestamp
     * is larger than that of the previously emitted watermark (to preserve the contract of
     * ascending watermarks). If the current watermark is still
     * identical to the previous one, no progress in event time has happened since
     * the previous call to this method. If a null value is returned, or the timestamp
     * of the returned watermark is smaller than that of the last emitted one, then no
     * new watermark will be generated.
     *
     * <p>The interval in which this method is called and Watermarks are generated
     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     *
     * @see org.apache.flink.streaming.api.watermark.Watermark
     * @see ExecutionConfig#getAutoWatermarkInterval()
     *
     * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
     */
    @Nullable
    Watermark getCurrentWatermark();
}
  • AssignerWithPeriodicWatermarks extends the TimestampAssigner interface (which defines the extractTimestamp method) and adds the getCurrentWatermark method. Flink calls it periodically to obtain the current watermark; it may return null if no new watermark is available.
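The emission rule stated in the Javadoc (a returned watermark is emitted only if it is non-null and larger than the previously emitted one) can be sketched in plain Java as follows. This is an illustration of the contract under stated assumptions, not the code of Flink's operator; PeriodicEmitter, onTimer, and the Supplier standing in for getCurrentWatermark() are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java sketch of the Javadoc contract; not Flink's operator code.
// PeriodicEmitter and its members are hypothetical names.
final class PeriodicEmitter {
    // Stands in for getCurrentWatermark(); may return null.
    private final Supplier<Long> currentWatermark;
    private long lastEmitted = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    PeriodicEmitter(Supplier<Long> currentWatermark) {
        this.currentWatermark = currentWatermark;
    }

    /** Called once per auto-watermark interval. */
    void onTimer() {
        Long candidate = currentWatermark.get();
        // Drop null, equal, and smaller candidates so emitted watermarks stay strictly ascending.
        if (candidate != null && candidate > lastEmitted) {
            lastEmitted = candidate;
            emitted.add(candidate);
        }
    }

    /** Watermarks that were actually sent downstream, in order. */
    List<Long> emitted() {
        return emitted;
    }
}
```

If successive timer ticks yield the candidates 100, null, 100, 90, and 250, only 100 and 250 are emitted: the null, the repeat, and the regression are all dropped.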

AssignerWithPeriodicWatermarks example

    public static void main(String[] args) throws Exception {

        final int popThreshold = 20; // threshold for popular places

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        // create a Kafka consumer
        FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
                "cleansedRides",
                new TaxiRideSchema(),
                kafkaProps);
        // assign a timestamp extractor to the consumer
        consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

        // create a TaxiRide data stream
        DataStream<TaxiRide> rides = env.addSource(consumer);

        // find popular places
        DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
                // match ride to grid cell and event type (start or end)
                .map(new GridCellMatcher())
                // partition by cell id and event type
                .keyBy(0, 1)
                // build sliding window
                .timeWindow(Time.minutes(15), Time.minutes(5))
                // count ride events in window
                .apply(new RideCounter())
                // filter by popularity threshold
                .filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
                // map grid cell to coordinates
                .map(new GridToCoordinates());

        popularPlaces.print();

        // execute the transformation pipeline
        env.execute("Popular Places from Kafka");
    }

    /**
     * Assigns timestamps to TaxiRide records.
     * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
     */
    public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

        public TaxiRideTSExtractor() {
            super(Time.seconds(MAX_EVENT_DELAY));
        }

        @Override
        public long extractTimestamp(TaxiRide ride) {
            if (ride.isStart) {
                return ride.startTime.getMillis();
            }
            else {
                return ride.endTime.getMillis();
            }
        }
    }
  • Here the timestamp and watermark assigner is set on the Kafka consumer through FlinkKafkaConsumer011’s own assignTimestampsAndWatermarks method rather than on the DataStream. TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor, which implements the AssignerWithPeriodicWatermarks interface. The interval at which watermarks are generated is set by env.getConfig().setAutoWatermarkInterval(1000), i.e. every 1000 ms.
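The comment on TaxiRideTSExtractor says that watermarks trail the maximum timestamp by a fixed interval. A minimal plain-Java sketch of that idea is shown below; it is not the source of BoundedOutOfOrdernessTimestampExtractor, BoundedLagTracker is a hypothetical name, and the 60-second delay is an assumed value since MAX_EVENT_DELAY is not shown in the example.

```java
// Plain-Java sketch (no Flink dependency); BoundedLagTracker is a hypothetical name.
final class BoundedLagTracker {
    private final long maxDelayMillis;
    private long maxTimestamp;
    private boolean seenAny = false;

    BoundedLagTracker(long maxDelayMillis) {
        if (maxDelayMillis < 0) {
            throw new IllegalArgumentException("delay must be >= 0");
        }
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Records one element's event time; elements may arrive out of order. */
    void observe(long eventTimeMillis) {
        maxTimestamp = seenAny ? Math.max(maxTimestamp, eventTimeMillis) : eventTimeMillis;
        seenAny = true;
    }

    /** Watermark candidate: highest timestamp seen so far minus the allowed delay; null before any element. */
    Long currentWatermark() {
        if (!seenAny) {
            return null;
        }
        return maxTimestamp - maxDelayMillis;
    }

    public static void main(String[] args) {
        BoundedLagTracker tracker = new BoundedLagTracker(60_000L); // assumed delay of 60 s
        for (long ts : new long[] {100_000L, 160_000L, 130_000L}) {
            tracker.observe(ts);
        }
        System.out.println(tracker.currentWatermark()); // prints 100000
    }
}
```

The out-of-order element at 130000 does not pull the watermark back, because only the maximum timestamp matters. Any element that arrives more than the configured delay behind the maximum would be late.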

AssignerWithPunctuatedWatermarks

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java

public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {

    /**
     * Asks this implementation if it wants to emit a watermark. This method is called right after
     * the {@link #extractTimestamp(Object, long)} method.
     *
     * <p>The returned watermark will be emitted only if it is non-null and its timestamp
     * is larger than that of the previously emitted watermark (to preserve the contract of
     * ascending watermarks). If a null value is returned, or the timestamp of the returned
     * watermark is smaller than that of the last emitted one, then no new watermark will
     * be generated.
     *
     * <p>For an example how to use this method, see the documentation of
     * {@link AssignerWithPunctuatedWatermarks this class}.
     *
     * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
     */
    @Nullable
    Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}
  • The AssignerWithPunctuatedWatermarks interface extends the TimestampAssigner interface (which defines the extractTimestamp method) and adds the checkAndGetNextWatermark method. It is called right after extractTimestamp for each element, and the timestamp that was just extracted is passed in as the extractedTimestamp parameter.
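The per-element call order (extractTimestamp first, then checkAndGetNextWatermark with the fresh timestamp) can be sketched in plain Java for the case where only some elements carry a marker that should trigger a watermark. Nothing here is Flink code; PunctuatedReplay, Reading, and the isMarker flag are hypothetical, and the two static methods only mimic the shape of the interface methods.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink dependency); every name below is hypothetical.
final class PunctuatedReplay {

    /** Stand-in for a record; isMarker flags the elements that should trigger a watermark. */
    static final class Reading {
        final long timestamp;
        final boolean isMarker;

        Reading(long timestamp, boolean isMarker) {
            this.timestamp = timestamp;
            this.isMarker = isMarker;
        }
    }

    // Same shape as the two assigner methods: extract first, then decide per element.
    static long extractTimestamp(Reading r) {
        return r.timestamp;
    }

    static Long checkAndGetNextWatermark(Reading r, long extractedTimestamp) {
        // Returning null means "no watermark for this element".
        return r.isMarker ? Long.valueOf(extractedTimestamp) : null;
    }

    /** Returns the watermarks that would be emitted for the given elements, in order. */
    static List<Long> replay(List<Reading> elements) {
        List<Long> emitted = new ArrayList<>();
        long last = Long.MIN_VALUE;
        for (Reading r : elements) {
            long ts = extractTimestamp(r);                    // step 1
            Long candidate = checkAndGetNextWatermark(r, ts); // step 2, receives the fresh timestamp
            if (candidate != null && candidate > last) {      // only advancing watermarks go out
                last = candidate;
                emitted.add(candidate);
            }
        }
        return emitted;
    }
}
```

For elements with timestamps 10, 20 (marker), 15 (marker), 30, and 40 (marker), the emitted watermarks are 20 and 40: non-marker elements yield null, and the marker at 15 is dropped because it would move the watermark backwards.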

AssignerWithPunctuatedWatermarks example

public static void main(String[] args) throws Exception {

        // read parameters
        ParameterTool params = ParameterTool.fromArgs(args);
        String input = params.getRequired("input");

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // connect to the data file
        DataStream<String> carData = env.readTextFile(input);

        // map to events
        DataStream<ConnectedCarEvent> events = carData
                .map((String line) -> ConnectedCarEvent.fromString(line))
                .assignTimestampsAndWatermarks(new ConnectedCarAssigner());

        // sort events
        events.keyBy((ConnectedCarEvent event) -> event.carId)
                .process(new SortFunction())
                .print();

        env.execute("Sort Connected Car Events");
    }

public class ConnectedCarAssigner implements AssignerWithPunctuatedWatermarks<ConnectedCarEvent> {
    @Override
    public long extractTimestamp(ConnectedCarEvent event, long previousElementTimestamp) {
        return event.timestamp;
    }

    @Override
    public Watermark checkAndGetNextWatermark(ConnectedCarEvent event, long extractedTimestamp) {
        // simply emit a watermark with every event
        return new Watermark(extractedTimestamp - 30000);
    }
}
  • Here DataStream’s assignTimestampsAndWatermarks method is used, and the assigner passed in implements the AssignerWithPunctuatedWatermarks interface. It proposes a watermark 30 seconds (30000 ms) behind the timestamp of every event.

Summary

  • If you use EventTime, you need to tell Flink where to get the event time of each element. This usually goes together with watermark generation. There are two ways to do it: inside the data stream source, or through a timestamp assigner / watermark generator (in Flink, timestamp assigners also define how watermarks are emitted; timestamps are milliseconds since 1970-01-01T00:00:00Z).
  • If it is done in the source, the collectWithTimestamp and emitWatermark methods of the SourceContext interface defined in SourceFunction are used: the former assigns the event timestamp and the latter emits the watermark.
  • If it is done outside the source, a timestampAndWatermarkAssigner is set through DataStream’s assignTimestampsAndWatermarks method. There are two types: AssignerWithPeriodicWatermarks (defines the getCurrentWatermark method, which returns the current watermark; the periodic interval is set by env.getConfig().setAutoWatermarkInterval(1000)) and AssignerWithPunctuatedWatermarks (defines the checkAndGetNextWatermark method, which is called after extractTimestamp with the just-extracted timestamp passed in as the extractedTimestamp parameter).
