A look at Flink’s ProcessFunction

Preface

This article takes a look at Flink’s ProcessFunction.

Example

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;


// the source data stream
DataStream<Tuple2<String, String>> stream = ...;

// apply the process function onto a keyed stream
DataStream<Tuple2<String, Long>> result = stream
    .keyBy(0)
    .process(new CountWithTimeoutFunction());

/**
 * The data type stored in the state
 */
public class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts
 */
public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {

    /** The state that is maintained by this process function */
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

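        // set the state's timestamp to the record's assigned event-time timestamp;
        // ctx.timestamp() returns null if no event-time timestamps were assigned,
        // so this unboxing assumes the stream carries them
        current.lastModified = ctx.timestamp();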

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}
  • This example shows how to use keyed state and timers in a ProcessFunction. The function passed to the process method is CountWithTimeoutFunction.
  • The open method of CountWithTimeoutFunction creates a ValueState of type CountWithTimestamp. processElement updates this ValueState to record, per key, the number of elements seen and the time of the last modification. It then registers an event-time timer that fires 60 seconds after that time.
  • onTimer responds to the timer. It emits the key and its count only if the key has not been updated within those 60 seconds, which means the firing timer must be the one registered by the key's most recent element.
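You can trace how the 60-second timers and the `lastModified` check work together without a Flink cluster. The following is a minimal, Flink-free sketch built on a few assumptions. The class `CountWithTimeoutSimulation`, its `advanceWatermark` method, and the `TreeSet` of pending timers are hypothetical stand-ins for Flink's keyed state and timer service. Event time advances only when a watermark is passed in explicitly. As with Flink's event-time timers, a timer fires once the watermark reaches its timestamp, and duplicate timers for the same key and timestamp collapse into one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/**
 * Flink-free simulation of CountWithTimeoutFunction (hypothetical, for illustration only).
 * Event time advances only through advanceWatermark(); a timer fires once the
 * watermark reaches its timestamp.
 */
public class CountWithTimeoutSimulation {
    static final long TIMEOUT = 60_000L;

    // Per-key state, playing the role of CountWithTimestamp held in ValueState
    static final class State {
        long count;
        long lastModified;
    }

    // A pending event-time timer; ordering by (timestamp, key) also makes
    // identical timers collapse into a single TreeSet entry
    static final class Timer implements Comparable<Timer> {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override
        public int compareTo(Timer other) {
            int byTime = Long.compare(timestamp, other.timestamp);
            return byTime != 0 ? byTime : key.compareTo(other.key);
        }
    }

    private final Map<String, State> state = new HashMap<>();
    private final TreeSet<Timer> timers = new TreeSet<>();
    final List<String> emitted = new ArrayList<>();

    // Mirrors processElement: bump the count, record the time, register a timer
    void processElement(String key, long timestamp) {
        State current = state.computeIfAbsent(key, k -> new State());
        current.count++;
        current.lastModified = timestamp;
        timers.add(new Timer(current.lastModified + TIMEOUT, key));
    }

    // Fires, in timestamp order, every timer whose timestamp is <= the watermark
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first().timestamp <= watermark) {
            Timer timer = timers.pollFirst();
            onTimer(timer.timestamp, timer.key);
        }
    }

    // Mirrors onTimer: only the timer registered by the latest element emits
    private void onTimer(long timestamp, String key) {
        State result = state.get(key);
        if (timestamp == result.lastModified + TIMEOUT) {
            emitted.add(key + "=" + result.count);
        }
    }

    public static void main(String[] args) {
        CountWithTimeoutSimulation sim = new CountWithTimeoutSimulation();
        sim.processElement("a", 0L);       // timer at 60_000
        sim.processElement("a", 30_000L);  // timer at 90_000; the 60_000 one is now stale
        sim.processElement("b", 10_000L);  // timer at 70_000
        sim.advanceWatermark(70_000L);     // fires a@60_000 (ignored) and b@70_000
        sim.advanceWatermark(90_000L);     // fires a@90_000
        System.out.println(sim.emitted);   // [b=1, a=2]
    }
}
```

Key `a` registers timers for 60 000 and 90 000. By the time the first one fires, `a`'s lastModified has moved to 30 000, so the check fails and nothing is emitted. Only the timer that belongs to the latest element produces output. This is the job of the `timestamp == result.lastModified + 60000` comparison in onTimer.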

ProcessFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/ProcessFunction.java

@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

    public abstract class Context {

        public abstract Long timestamp();

        public abstract TimerService timerService();

        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }

    public abstract class OnTimerContext extends Context {

        public abstract TimeDomain timeDomain();
    }

}
  • ProcessFunction extends AbstractRichFunction, so it can obtain keyed state through the RuntimeContext when it is applied to a keyed stream. It declares the abstract method processElement and the abstract inner classes Context and OnTimerContext.
  • Context declares three abstract methods: timestamp, timerService and output. OnTimerContext extends Context and adds the timeDomain method.
  • ProcessFunction also defines the onTimer method, which is called when a timer registered through the TimerService fires. Its default implementation does nothing.
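Here is a Flink-free sketch of how a ProcessFunction-like callback uses these pieces. `processElement` writes regular results to the output list and sends some records to a side output through `output`. `onTimer` receives an on-timer context whose `timeDomain` reports which kind of timer fired. Every type in this sketch is a hypothetical stand-in written for illustration and is not Flink's implementation. That includes `ContextSketch`, the simplified `Ctx`/`OnTimerCtx`, the local `OutputTag` and `TimeDomain`, the `late` tag, and the cutoff-based routing rule.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ContextSketch {
    // Hypothetical stand-ins for Flink's TimeDomain and OutputTag
    enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

    static final class OutputTag<X> {
        final String id;
        OutputTag(String id) { this.id = id; }
    }

    // Simplified Context: timestamp() plus output() for side outputs
    static class Ctx {
        final Long timestamp;
        final Map<String, List<Object>> sideOutputs;

        Ctx(Long timestamp, Map<String, List<Object>> sideOutputs) {
            this.timestamp = timestamp;
            this.sideOutputs = sideOutputs;
        }

        Long timestamp() { return timestamp; }

        <X> void output(OutputTag<X> tag, X value) {
            sideOutputs.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
        }
    }

    // Simplified OnTimerContext: extends Ctx and adds timeDomain()
    static class OnTimerCtx extends Ctx {
        final TimeDomain domain;

        OnTimerCtx(Long timestamp, TimeDomain domain, Map<String, List<Object>> sideOutputs) {
            super(timestamp, sideOutputs);
            this.domain = domain;
        }

        TimeDomain timeDomain() { return domain; }
    }

    static final OutputTag<String> LATE = new OutputTag<>("late");
    static final long CUTOFF = 1_000L; // hypothetical threshold for "late" records

    final List<String> out = new ArrayList<>();
    final Map<String, List<Object>> sideOutputs = new HashMap<>();

    // Like processElement: regular results go to out, records older than CUTOFF
    // go to the "late" side output; a null timestamp counts as not late
    void processElement(String value, Ctx ctx) {
        if (ctx.timestamp() != null && ctx.timestamp() < CUTOFF) {
            ctx.output(LATE, value);
        } else {
            out.add(value);
        }
    }

    // Like onTimer: the context tells which time domain the timer belongs to
    void onTimer(long timestamp, OnTimerCtx ctx) {
        out.add("timer@" + timestamp + " " + ctx.timeDomain());
    }

    public static void main(String[] args) {
        ContextSketch fn = new ContextSketch();
        fn.processElement("x", new Ctx(500L, fn.sideOutputs));
        fn.processElement("y", new Ctx(2_000L, fn.sideOutputs));
        fn.onTimer(3_000L, new OnTimerCtx(3_000L, TimeDomain.EVENT_TIME, fn.sideOutputs));
        System.out.println(fn.out);         // [y, timer@3000 EVENT_TIME]
        System.out.println(fn.sideOutputs); // {late=[x]}
    }
}
```

In Flink itself, records emitted with `output` are read back by calling getSideOutput with the same tag on the resulting stream. In this sketch they simply collect in a map keyed by the tag's id.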

Summary

  • ProcessFunction is a low-level stream processing operation. You can think of it as a FlatMapFunction that also has access to keyed state and timers, so reach for it when you need either of those.
  • ProcessFunction extends AbstractRichFunction, so keyed state can be obtained through the RuntimeContext. It declares the abstract method processElement and the abstract inner classes Context and OnTimerContext.
  • Context declares three abstract methods: timestamp, timerService and output. OnTimerContext extends Context and adds the timeDomain method. ProcessFunction also defines the onTimer method, which is called when a timer registered through the TimerService fires.
