A Look at Flink’s Async I/O

Preface

This article takes a look at Flink’s Async I/O.

Example

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, port, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

        // issue the asynchronous request, receive a future for result
        final Future<String> result = client.query(key);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly.
                    return null;
                }
            }
        }).thenAccept( (String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
  • This example shows the basic usage of Flink’s Async I/O. First, implement the AsyncFunction interface: write the asynchronous request logic and hand the result or the exception to resultFuture. Second, apply the AsyncFunction to a DataStream as a transformation by calling the unorderedWait or orderedWait method of AsyncDataStream. Both methods take two parameters that control the async operation: timeout sets how long an async request may take before it is treated as timed out, and capacity sets the maximum number of async requests that may be in progress at the same time.
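
The example above returns null when the client future fails. As a rough sketch of how a failure could be forwarded to the result future instead, the plain-Java model below uses CompletableFuture.whenComplete. SimpleResultFuture is a hypothetical stand-in for Flink's ResultFuture, reduced to the complete and completeExceptionally calls, and the client is assumed to hand back a CompletableFuture (the DatabaseClient in the example returns a plain Future).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for Flink's ResultFuture, reduced to the two calls used here. */
interface SimpleResultFuture<T> {
    void complete(Collection<T> result);
    void completeExceptionally(Throwable error);
}

class CallbackForwarding {

    /** Forwards the outcome of a client future to the result future without blocking. */
    static void forward(String key, CompletableFuture<String> clientFuture,
                        SimpleResultFuture<String> resultFuture) {
        clientFuture.whenComplete((value, error) -> {
            if (error != null) {
                // Propagate the failure instead of emitting a null value.
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singleton(key + "=" + value));
            }
        });
    }

    /** Runs forward() against the given future and returns what the result future received. */
    static List<String> run(CompletableFuture<String> clientFuture) {
        List<String> log = new ArrayList<>();
        forward("k", clientFuture, new SimpleResultFuture<String>() {
            @Override
            public void complete(Collection<String> result) {
                log.add("ok:" + result);
            }

            @Override
            public void completeExceptionally(Throwable error) {
                log.add("error:" + error.getMessage());
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(CompletableFuture.completedFuture("v")));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("db down"));
        System.out.println(run(failed));
    }
}
```

In this model every request ends in exactly one of the two calls, so a failed lookup surfaces as an error rather than as a record with a null value.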

AsyncFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/AsyncFunction.java

/**
 * A function to trigger Async I/O operation.
 *
 * <p>For each #asyncInvoke, an async io operation can be triggered, and once it has been done,
 * the result can be collected by calling {@link ResultFuture#complete}. For each async
 * operation, its context is stored in the operator immediately after invoking
 * #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full.
 *
 * <p>{@link ResultFuture} can be passed into callbacks or futures to collect the result data.
 * An error can also be propagate to the async IO operator by
 * {@link ResultFuture#completeExceptionally(Throwable)}.
 *
 * <p>Callback example usage:
 *
 * <pre>{@code
 * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
 *
 *   public void asyncInvoke(String row, ResultFuture<String> result) throws Exception {
 *     HBaseCallback cb = new HBaseCallback(result);
 *     Get get = new Get(Bytes.toBytes(row));
 *     hbase.asyncGet(get, cb);
 *   }
 * }
 * }</pre>
 *
 * <p>Future example usage:
 *
 * <pre>{@code
 * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
 *
 *   public void asyncInvoke(String row, final ResultFuture<String> result) throws Exception {
 *     Get get = new Get(Bytes.toBytes(row));
 *     ListenableFuture<Result> future = hbase.asyncGet(get);
 *     Futures.addCallback(future, new FutureCallback<Result>() {
 *       public void onSuccess(Result result) {
 *         List<String> ret = process(result);
 *         result.complete(ret);
 *       }
 *       public void onFailure(Throwable thrown) {
 *         result.completeExceptionally(thrown);
 *       }
 *     });
 *   }
 * }
 * }</pre>
 *
 * @param <IN> The type of the input elements.
 * @param <OUT> The type of the returned elements.
 */
@PublicEvolving
public interface AsyncFunction<IN, OUT> extends Function, Serializable {

    /**
     * Trigger async operation for each stream input.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     * @exception Exception in case of a user code error. An exception will make the task fail and
     * trigger fail-over process.
     */
    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    /**
     * {@link AsyncFunction#asyncInvoke} timeout occurred.
     * By default, the result future is exceptionally completed with a timeout exception.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     */
    default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        resultFuture.completeExceptionally(
            new TimeoutException("Async function call has timed out."));
    }

}
  • The AsyncFunction interface extends Function and defines the asyncInvoke method plus a default timeout method. asyncInvoke runs the asynchronous logic and then passes the result to the ResultFuture through ResultFuture.complete; if an exception occurs, it passes the exception through ResultFuture.completeExceptionally(Throwable). The default timeout method completes the ResultFuture exceptionally with a TimeoutException.
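
Because timeout is a default method, an implementation can override it. The sketch below models this in plain Java and compares the default behaviour with an override that emits a fallback record. MiniResultFuture and MiniAsyncFunction are hypothetical stand-ins that copy only the method shapes shown in the listing above; the "=UNKNOWN" fallback value is an assumption made for illustration.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for Flink's ResultFuture. */
interface MiniResultFuture<T> {
    void complete(Collection<T> result);
    void completeExceptionally(Throwable error);
}

/** Hypothetical stand-in for AsyncFunction with the same default timeout behaviour. */
interface MiniAsyncFunction<IN, OUT> {
    void asyncInvoke(IN input, MiniResultFuture<OUT> resultFuture) throws Exception;

    default void timeout(IN input, MiniResultFuture<OUT> resultFuture) throws Exception {
        resultFuture.completeExceptionally(
            new TimeoutException("Async function call has timed out."));
    }
}

class TimeoutSketch {

    /** Keeps the default: a timeout fails the result future. */
    static final MiniAsyncFunction<String, String> FAILING = (input, resultFuture) -> { };

    /** Overrides timeout to emit a fallback record instead of failing. */
    static final MiniAsyncFunction<String, String> WITH_FALLBACK =
        new MiniAsyncFunction<String, String>() {
            @Override
            public void asyncInvoke(String input, MiniResultFuture<String> resultFuture) { }

            @Override
            public void timeout(String input, MiniResultFuture<String> resultFuture) {
                resultFuture.complete(Collections.singleton(input + "=UNKNOWN"));
            }
        };

    /** Simulates the operator calling timeout() and reports what the result future received. */
    static String onTimeout(MiniAsyncFunction<String, String> function, String input) {
        StringBuilder outcome = new StringBuilder();
        MiniResultFuture<String> recorder = new MiniResultFuture<String>() {
            @Override
            public void complete(Collection<String> result) {
                outcome.append("ok:").append(result);
            }

            @Override
            public void completeExceptionally(Throwable error) {
                outcome.append("error:").append(error.getMessage());
            }
        };
        try {
            function.timeout(input, recorder);
        } catch (Exception e) {
            outcome.append("thrown:").append(e);
        }
        return outcome.toString();
    }

    public static void main(String[] args) {
        System.out.println(onTimeout(FAILING, "k"));
        System.out.println(onTimeout(WITH_FALLBACK, "k"));
    }
}
```

Whether a fallback record is preferable to a failure depends on the job; the default completes exceptionally, and the Javadoc above says that an exception makes the task fail and triggers fail-over.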

RichAsyncFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java

@PublicEvolving
public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {

    private static final long serialVersionUID = 3858030061138121840L;

    @Override
    public void setRuntimeContext(RuntimeContext runtimeContext) {
        Preconditions.checkNotNull(runtimeContext);

        if (runtimeContext instanceof IterationRuntimeContext) {
            super.setRuntimeContext(
                new RichAsyncFunctionIterationRuntimeContext(
                    (IterationRuntimeContext) runtimeContext));
        } else {
            super.setRuntimeContext(new RichAsyncFunctionRuntimeContext(runtimeContext));
        }
    }

    @Override
    public abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    //......
}
  • RichAsyncFunction extends AbstractRichFunction and declares that it implements the AsyncFunction interface. It leaves asyncInvoke abstract for subclasses to implement. It overrides setRuntimeContext so that the given context is wrapped in a RichAsyncFunctionIterationRuntimeContext when it is an IterationRuntimeContext, and in a RichAsyncFunctionRuntimeContext otherwise.

RichAsyncFunctionRuntimeContext

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java

    /**
     * A wrapper class for async function's {@link RuntimeContext}. The async function runtime
     * context only supports basic operations which are thread safe. Consequently, state access,
     * accumulators, broadcast variables and the distributed cache are disabled.
     */
    private static class RichAsyncFunctionRuntimeContext implements RuntimeContext {
        private final RuntimeContext runtimeContext;

        RichAsyncFunctionRuntimeContext(RuntimeContext context) {
            runtimeContext = Preconditions.checkNotNull(context);
        }

        @Override
        public String getTaskName() {
            return runtimeContext.getTaskName();
        }

        @Override
        public MetricGroup getMetricGroup() {
            return runtimeContext.getMetricGroup();
        }

        @Override
        public int getNumberOfParallelSubtasks() {
            return runtimeContext.getNumberOfParallelSubtasks();
        }

        @Override
        public int getMaxNumberOfParallelSubtasks() {
            return runtimeContext.getMaxNumberOfParallelSubtasks();
        }

        @Override
        public int getIndexOfThisSubtask() {
            return runtimeContext.getIndexOfThisSubtask();
        }

        @Override
        public int getAttemptNumber() {
            return runtimeContext.getAttemptNumber();
        }

        @Override
        public String getTaskNameWithSubtasks() {
            return runtimeContext.getTaskNameWithSubtasks();
        }

        @Override
        public ExecutionConfig getExecutionConfig() {
            return runtimeContext.getExecutionConfig();
        }

        @Override
        public ClassLoader getUserCodeClassLoader() {
            return runtimeContext.getUserCodeClassLoader();
        }

        // -----------------------------------------------------------------------------------
        // Unsupported operations
        // -----------------------------------------------------------------------------------

        @Override
        public DistributedCache getDistributedCache() {
            throw new UnsupportedOperationException("Distributed cache is not supported in rich async functions.");
        }

        @Override
        public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
            throw new UnsupportedOperationException("State is not supported in rich async functions.");
        }

        @Override
        public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
            throw new UnsupportedOperationException("State is not supported in rich async functions.");
        }

        @Override
        public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
            throw new UnsupportedOperationException("State is not supported in rich async functions.");
        }

        @Override
        public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
            throw new UnsupportedOperationException("State is not supported in rich async functions.");
        }

        @Override
        public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
            throw new UnsupportedOperationException("State is not supported in rich async functions.");
        }

        @Override
        public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
            throw new UnsupportedOperationException("State is not supported in rich async functions.");
        }

        @Override
        public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
            throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
        }

        @Override
        public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
            throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
        }

        @Override
        public Map<String, Accumulator<?, ?>> getAllAccumulators() {
            throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
        }

        @Override
        public IntCounter getIntCounter(String name) {
            throw new UnsupportedOperationException("Int counters are not supported in rich async functions.");
        }

        @Override
        public LongCounter getLongCounter(String name) {
            throw new UnsupportedOperationException("Long counters are not supported in rich async functions.");
        }

        @Override
        public DoubleCounter getDoubleCounter(String name) {
            throw new UnsupportedOperationException("Long counters are not supported in rich async functions.");
        }

        @Override
        public Histogram getHistogram(String name) {
            throw new UnsupportedOperationException("Histograms are not supported in rich async functions.");
        }

        @Override
        public boolean hasBroadcastVariable(String name) {
            throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
        }

        @Override
        public <RT> List<RT> getBroadcastVariable(String name) {
            throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
        }

        @Override
        public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
            throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
        }
    }
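  • RichAsyncFunctionRuntimeContext implements the RuntimeContext interface. It delegates the basic, thread-safe methods (task name, metric group, parallelism, subtask index, attempt number, execution config, user code class loader) to the wrapped RuntimeContext. Every other method (state, accumulators, counters, histograms, broadcast variables, distributed cache) is overridden to throw UnsupportedOperationException.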
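
To make the effect of this wrapper concrete, here is a heavily reduced model of the same delegate-or-reject pattern. MiniContext is a hypothetical two-method interface standing in for the much larger RuntimeContext, and the task name is invented for the demo; only the exception message is copied from the listing above.

```java
import java.util.Objects;

/** Hypothetical two-method context, standing in for the much larger RuntimeContext. */
interface MiniContext {
    String getTaskName();
    Object getState(String name);
}

/** Wrapper that forwards the thread-safe call and rejects state access. */
class RestrictedContext implements MiniContext {
    private final MiniContext delegate;

    RestrictedContext(MiniContext delegate) {
        this.delegate = Objects.requireNonNull(delegate);
    }

    @Override
    public String getTaskName() {
        return delegate.getTaskName();
    }

    @Override
    public Object getState(String name) {
        throw new UnsupportedOperationException("State is not supported in rich async functions.");
    }
}

class RestrictedContextDemo {

    /** Shows what user code observes when it goes through the wrapper. */
    static String probe() {
        MiniContext full = new MiniContext() {
            @Override
            public String getTaskName() {
                return "lookup (1/4)";
            }

            @Override
            public Object getState(String name) {
                return "state:" + name;
            }
        };
        MiniContext restricted = new RestrictedContext(full);
        try {
            restricted.getState("cache");
            return restricted.getTaskName() + " / state allowed";
        } catch (UnsupportedOperationException e) {
            return restricted.getTaskName() + " / " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(probe());
    }
}
```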

RichAsyncFunctionIterationRuntimeContext

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java

    private static class RichAsyncFunctionIterationRuntimeContext extends RichAsyncFunctionRuntimeContext implements IterationRuntimeContext {

        private final IterationRuntimeContext iterationRuntimeContext;

        RichAsyncFunctionIterationRuntimeContext(IterationRuntimeContext iterationRuntimeContext) {
            super(iterationRuntimeContext);

            this.iterationRuntimeContext = Preconditions.checkNotNull(iterationRuntimeContext);
        }

        @Override
        public int getSuperstepNumber() {
            return iterationRuntimeContext.getSuperstepNumber();
        }

        // -----------------------------------------------------------------------------------
        // Unsupported operations
        // -----------------------------------------------------------------------------------

        @Override
        public <T extends Aggregator<?>> T getIterationAggregator(String name) {
            throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions.");
        }

        @Override
        public <T extends Value> T getPreviousIterationAggregate(String name) {
            throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions.");
        }
    }
  • RichAsyncFunctionIterationRuntimeContext extends RichAsyncFunctionRuntimeContext and implements the IterationRuntimeContext interface. It delegates getSuperstepNumber to the wrapped IterationRuntimeContext and overrides getIterationAggregator and getPreviousIterationAggregate to throw UnsupportedOperationException.

AsyncDataStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/AsyncDataStream.java

@PublicEvolving
public class AsyncDataStream {

    /**
     * Output mode for asynchronous operations.
     */
    public enum OutputMode { ORDERED, UNORDERED }

    private static final int DEFAULT_QUEUE_CAPACITY = 100;

    private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            int bufSize,
            OutputMode mode) {

        TypeInformation<OUT> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
            func,
            AsyncFunction.class,
            0,
            1,
            new int[]{1, 0},
            in.getType(),
            Utils.getCallLocationName(),
            true);

        // create transform
        AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(
            in.getExecutionEnvironment().clean(func),
            timeout,
            bufSize,
            mode);

        return in.transform("async wait operator", outTypeInfo, operator);
    }

    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit,
            int capacity) {
        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED);
    }

    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit) {
        return addOperator(
            in,
            func,
            timeUnit.toMillis(timeout),
            DEFAULT_QUEUE_CAPACITY,
            OutputMode.UNORDERED);
    }

    public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit,
            int capacity) {
        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
    }

    public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit) {
        return addOperator(
            in,
            func,
            timeUnit.toMillis(timeout),
            DEFAULT_QUEUE_CAPACITY,
            OutputMode.ORDERED);
    }
}
  • AsyncDataStream provides two methods, unorderedWait and orderedWait, for applying an AsyncFunction to a DataStream.
  • Both unorderedWait and orderedWait come in two overloads, with and without the capacity parameter. Without it, DEFAULT_QUEUE_CAPACITY (100) is used. All four overloads end up calling the private addOperator method, which creates an AsyncWaitOperator. Both methods also take timeout parameters that specify how long to wait for the async operation to complete.
  • AsyncDataStream defines two OutputMode values. UNORDERED emits each result as soon as its async operation completes; this mode has the lowest latency and overhead when TimeCharacteristic.ProcessingTime is used. ORDERED emits results in the order of the input elements; to guarantee that order the operator has to buffer results, which adds some latency and overhead.
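
The difference between the two output modes can be modelled in a few lines of plain Java. This is only a sketch of the idea, not the AsyncWaitOperator's actual queue implementation: it ignores watermarks, timeouts and checkpointing, and the class and method names are made up for illustration. The completionOrder array gives the order in which the simulated requests finish.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class OutputModeSketch {

    /** UNORDERED-like: each result is emitted as soon as its request completes. */
    static List<String> unordered(List<String> inputs, int[] completionOrder) {
        List<String> emitted = new ArrayList<>();
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int i = 0; i < inputs.size(); i++) {
            CompletableFuture<String> future = new CompletableFuture<>();
            future.thenAccept(emitted::add);
            pending.add(future);
        }
        for (int index : completionOrder) {
            pending.get(index).complete(inputs.get(index));
        }
        return emitted;
    }

    /** ORDERED-like: a finished result waits until everything before it has been emitted. */
    static List<String> ordered(List<String> inputs, int[] completionOrder) {
        List<String> emitted = new ArrayList<>();
        List<CompletableFuture<String>> pending = new ArrayList<>();
        Deque<CompletableFuture<String>> queue = new ArrayDeque<>();
        for (int i = 0; i < inputs.size(); i++) {
            CompletableFuture<String> future = new CompletableFuture<>();
            pending.add(future);
            queue.add(future);
        }
        for (int index : completionOrder) {
            pending.get(index).complete(inputs.get(index));
            // Only the head of the queue may be emitted; later results stay buffered.
            while (!queue.isEmpty() && queue.peek().isDone()) {
                emitted.add(queue.poll().join());
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("a", "b", "c");
        int[] completionOrder = {2, 0, 1}; // "c" finishes first, then "a", then "b"
        System.out.println(unordered(inputs, completionOrder)); // [c, a, b]
        System.out.println(ordered(inputs, completionOrder));   // [a, b, c]
    }
}
```

In the ordered variant, "c" is ready first but stays buffered until "a" and "b" have been emitted, which is where the extra latency comes from.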

Summary

  • Flink provides an Asynchronous I/O API for accessing external data, which helps improve streaming throughput. The basic usage is to define a function that implements the AsyncFunction interface and then apply it to a DataStream as a transformation with the unorderedWait or orderedWait method of AsyncDataStream.
  • The AsyncFunction interface extends Function and defines the asyncInvoke method plus a default timeout method. asyncInvoke runs the asynchronous logic and then passes the result to the ResultFuture through ResultFuture.complete, or the exception through ResultFuture.completeExceptionally(Throwable). RichAsyncFunction extends AbstractRichFunction and declares that it implements AsyncFunction; it leaves asyncInvoke abstract for subclasses and overrides setRuntimeContext to wrap the context in a RichAsyncFunctionRuntimeContext or a RichAsyncFunctionIterationRuntimeContext.
  • unorderedWait and orderedWait of AsyncDataStream take two parameters that control the async operation: timeout sets how long an async request may take before it is treated as timed out, and capacity sets the maximum number of async requests that may be in progress at the same time. AsyncDataStream defines two OutputMode values. UNORDERED emits each result as soon as its async operation completes and has the lowest latency and overhead when TimeCharacteristic.ProcessingTime is used. ORDERED emits results in the order of the input elements, so the operator has to buffer results, which adds some latency and overhead.
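
As a rough illustration of the capacity parameter, the sketch below caps the number of in-flight requests with a Semaphore. It is a simplified model with invented names, not how AsyncWaitOperator is implemented: trySubmit returns false at the point where a real operator would hold back further input until a slot frees up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

class CapacitySketch {
    private final Semaphore permits;

    CapacitySketch(int capacity) {
        this.permits = new Semaphore(capacity);
    }

    /** Accepts the request if a slot is free; the slot is released when the request completes. */
    boolean trySubmit(CompletableFuture<?> request) {
        if (!permits.tryAcquire()) {
            return false; // capacity exhausted
        }
        request.whenComplete((value, error) -> permits.release());
        return true;
    }

    /** Submits three requests against a capacity of two, then frees one slot and retries. */
    static boolean[] demo() {
        CapacitySketch sketch = new CapacitySketch(2);
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        CompletableFuture<String> third = new CompletableFuture<>();

        boolean firstAccepted = sketch.trySubmit(first);
        boolean secondAccepted = sketch.trySubmit(second);
        boolean thirdRejected = sketch.trySubmit(third);  // both slots are taken
        first.complete("done");                           // frees one slot
        boolean thirdRetried = sketch.trySubmit(third);

        return new boolean[] {firstAccepted, secondAccepted, thirdRejected, thirdRetried};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```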
