A Look at Flink’s AsyncWaitOperator


Preface

This article walks through Flink’s AsyncWaitOperator, based on the Flink 1.7.0 sources.

AsyncWaitOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java

@Internal
public class AsyncWaitOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, OperatorActions {
    private static final long serialVersionUID = 1L;

    private static final String STATE_NAME = "_async_wait_operator_state_";

    /** Capacity of the stream element queue. */
    private final int capacity;

    /** Output mode for this operator. */
    private final AsyncDataStream.OutputMode outputMode;

    /** Timeout for the async collectors. */
    private final long timeout;

    protected transient Object checkpointingLock;

    /** {@link TypeSerializer} for inputs while making snapshots. */
    private transient StreamElementSerializer<IN> inStreamElementSerializer;

    /** Recovered input stream elements. */
    private transient ListState<StreamElement> recoveredStreamElements;

    /** Queue to store the currently in-flight stream elements into. */
    private transient StreamElementQueue queue;

    /** Pending stream element which could not yet added to the queue. */
    private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;

    private transient ExecutorService executor;

    /** Emitter for the completed stream element queue entries. */
    private transient Emitter<OUT> emitter;

    /** Thread running the emitter. */
    private transient Thread emitterThread;

    public AsyncWaitOperator(
            AsyncFunction<IN, OUT> asyncFunction,
            long timeout,
            int capacity,
            AsyncDataStream.OutputMode outputMode) {
        super(asyncFunction);
        chainingStrategy = ChainingStrategy.ALWAYS;

        Preconditions.checkArgument(capacity > 0, "The number of concurrent async operation should be greater than 0.");
        this.capacity = capacity;

        this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");

        this.timeout = timeout;
    }

    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);

        this.checkpointingLock = getContainingTask().getCheckpointLock();

        this.inStreamElementSerializer = new StreamElementSerializer<>(
            getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));

        // create the operators executor for the complete operations of the queue entries
        this.executor = Executors.newSingleThreadExecutor();

        switch (outputMode) {
            case ORDERED:
                queue = new OrderedStreamElementQueue(
                    capacity,
                    executor,
                    this);
                break;
            case UNORDERED:
                queue = new UnorderedStreamElementQueue(
                    capacity,
                    executor,
                    this);
                break;
            default:
                throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
        }
    }

    @Override
    public void open() throws Exception {
        super.open();

        // create the emitter
        this.emitter = new Emitter<>(checkpointingLock, output, queue, this);

        // start the emitter thread
        this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
        emitterThread.setDaemon(true);
        emitterThread.start();

        // process stream elements from state, since the Emit thread will start as soon as all
        // elements from previous state are in the StreamElementQueue, we have to make sure that the
        // order to open all operators in the operator chain proceeds from the tail operator to the
        // head operator.
        if (recoveredStreamElements != null) {
            for (StreamElement element : recoveredStreamElements.get()) {
                if (element.isRecord()) {
                    processElement(element.<IN>asRecord());
                }
                else if (element.isWatermark()) {
                    processWatermark(element.asWatermark());
                }
                else if (element.isLatencyMarker()) {
                    processLatencyMarker(element.asLatencyMarker());
                }
                else {
                    throw new IllegalStateException("Unknown record type " + element.getClass() +
                        " encountered while opening the operator.");
                }
            }
            recoveredStreamElements = null;
        }

    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);

        if (timeout > 0L) {
            // register a timeout for this AsyncStreamRecordBufferEntry
            long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();

            final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
                timeoutTimestamp,
                new ProcessingTimeCallback() {
                    @Override
                    public void onProcessingTime(long timestamp) throws Exception {
                        userFunction.timeout(element.getValue(), streamRecordBufferEntry);
                    }
                });

            // Cancel the timer once we've completed the stream record buffer entry. This will remove
            // the register trigger task
            streamRecordBufferEntry.onComplete(
                (StreamElementQueueEntry<Collection<OUT>> value) -> {
                    timerFuture.cancel(true);
                },
                executor);
        }

        addAsyncBufferEntry(streamRecordBufferEntry);

        userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        WatermarkQueueEntry watermarkBufferEntry = new WatermarkQueueEntry(mark);

        addAsyncBufferEntry(watermarkBufferEntry);
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);

        ListState<StreamElement> partitionableState =
            getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
        partitionableState.clear();

        Collection<StreamElementQueueEntry<?>> values = queue.values();

        try {
            for (StreamElementQueueEntry<?> value : values) {
                partitionableState.add(value.getStreamElement());
            }

            // add the pending stream element queue entry if the stream element queue is currently full
            if (pendingStreamElementQueueEntry != null) {
                partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
            }
        } catch (Exception e) {
            partitionableState.clear();

            throw new Exception("Could not add stream element queue entries to operator state " +
                "backend of operator " + getOperatorName() + '.', e);
        }
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        recoveredStreamElements = context
            .getOperatorStateStore()
            .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));

    }

    @Override
    public void close() throws Exception {
        try {
            assert(Thread.holdsLock(checkpointingLock));

            while (!queue.isEmpty()) {
                // wait for the emitter thread to output the remaining elements
                // for that he needs the checkpointing lock and thus we have to free it
                checkpointingLock.wait();
            }
        }
        finally {
            Exception exception = null;

            try {
                super.close();
            } catch (InterruptedException interrupted) {
                exception = interrupted;

                Thread.currentThread().interrupt();
            } catch (Exception e) {
                exception = e;
            }

            try {
                // terminate the emitter, the emitter thread and the executor
                stopResources(true);
            } catch (InterruptedException interrupted) {
                exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);

                Thread.currentThread().interrupt();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }

            if (exception != null) {
                LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception);
            }
        }
    }

    @Override
    public void dispose() throws Exception {
        Exception exception = null;

        try {
            super.dispose();
        } catch (InterruptedException interrupted) {
            exception = interrupted;

            Thread.currentThread().interrupt();
        } catch (Exception e) {
            exception = e;
        }

        try {
            stopResources(false);
        } catch (InterruptedException interrupted) {
            exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);

            Thread.currentThread().interrupt();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        if (exception != null) {
            throw exception;
        }
    }

    private void stopResources(boolean waitForShutdown) throws InterruptedException {
        emitter.stop();
        emitterThread.interrupt();

        executor.shutdown();

        if (waitForShutdown) {
            try {
                if (!executor.awaitTermination(365L, TimeUnit.DAYS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();

                Thread.currentThread().interrupt();
            }

            /*
             * FLINK-5638: If we have the checkpoint lock we might have to free it for a while so
             * that the emitter thread can complete/react to the interrupt signal.
             */
            if (Thread.holdsLock(checkpointingLock)) {
                while (emitterThread.isAlive()) {
                    checkpointingLock.wait(100L);
                }
            }

            emitterThread.join();
        } else {
            executor.shutdownNow();
        }
    }

    private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        assert(Thread.holdsLock(checkpointingLock));

        pendingStreamElementQueueEntry = streamElementQueueEntry;

        while (!queue.tryPut(streamElementQueueEntry)) {
            // we wait for the emitter to notify us if the queue has space left again
            checkpointingLock.wait();
        }

        pendingStreamElementQueueEntry = null;
    }

    @Override
    public void failOperator(Throwable throwable) {
        getContainingTask().getEnvironment().failExternally(throwable);
    }
}
  • AsyncWaitOperator extends AbstractUdfStreamOperator and overrides its setup, open, snapshotState, initializeState, close and dispose methods. It implements processElement and processWatermark from the OneInputStreamOperator interface (processLatencyMarker is called in open but is not overridden in this listing), as well as failOperator from OperatorActions.
  • The setup method creates the ExecutorService with Executors.newSingleThreadExecutor() and then, depending on outputMode, creates an OrderedStreamElementQueue or an UnorderedStreamElementQueue. The open method creates the Emitter, starts it on the daemon thread AsyncIO-Emitter-Thread, and replays recoveredStreamElements by calling processElement, processWatermark or processLatencyMarker according to the element type.
  • The processElement method wraps the record in a StreamRecordQueueEntry. If timeout is greater than 0, it registers a processing-time timer whose ProcessingTimeCallback calls userFunction.timeout, plus an onComplete callback that cancels the timer once the entry completes. It then adds the entry to the StreamElementQueue through addAsyncBufferEntry, which waits on the checkpointing lock while the queue is full, and finally calls userFunction.asyncInvoke.
  • The close and dispose methods both call stopResources to release the emitter, the emitter thread and the executor. They differ in the waitForShutdown argument: close passes true (after waiting for the queue to drain), while dispose passes false.
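The timeout handling in processElement can be illustrated with plain JDK classes. The following is only a minimal sketch under stated assumptions, not Flink code: TimeoutSketch and armTimeout are hypothetical names, a CompletableFuture stands in for the StreamRecordQueueEntry, and a ScheduledExecutorService stands in for the ProcessingTimeService. It also cancels the timer with cancel(false), whereas the listing above passes true.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the operator's timeout handling; not Flink code. */
class TimeoutSketch {

    /**
     * Arms a timer that completes the future with a fallback value if no result
     * has arrived within timeoutMillis. Returns the timer handle for inspection.
     */
    static ScheduledFuture<?> armTimeout(
            ScheduledExecutorService timerService,
            CompletableFuture<String> resultFuture,
            long timeoutMillis,
            String fallback) {

        // Counterpart of registerTimer(...): run the timeout callback later.
        ScheduledFuture<?> timer = timerService.schedule(
            () -> { resultFuture.complete(fallback); },
            timeoutMillis,
            TimeUnit.MILLISECONDS);

        // Counterpart of onComplete(...): once a result exists, the pending timer
        // is no longer needed. cancel(false) avoids interrupting a running timer task.
        resultFuture.whenComplete((value, error) -> timer.cancel(false));

        return timer;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            // Case 1: the result arrives before the timeout, so the timer is cancelled.
            CompletableFuture<String> fast = new CompletableFuture<>();
            ScheduledFuture<?> fastTimer = armTimeout(timerService, fast, 60_000L, "timed out");
            fast.complete("result");
            System.out.println(fast.get() + ", timer cancelled: " + fastTimer.isCancelled());

            // Case 2: nothing completes the future, so the timer supplies the fallback.
            CompletableFuture<String> slow = new CompletableFuture<>();
            armTimeout(timerService, slow, 50L, "timed out");
            System.out.println(slow.get(5, TimeUnit.SECONDS));
        } finally {
            timerService.shutdownNow();
        }
    }
}
```

Whichever side completes the future first wins. A later complete call on the same future has no effect, which is why a late result after a timeout does not overwrite the fallback in this sketch.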

Emitter

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/Emitter.java

@Internal
public class Emitter<OUT> implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(Emitter.class);

    /** Lock to hold before outputting. */
    private final Object checkpointLock;

    /** Output for the watermark elements. */
    private final Output<StreamRecord<OUT>> output;

    /** Queue to consume the async results from. */
    private final StreamElementQueue streamElementQueue;

    private final OperatorActions operatorActions;

    /** Output for stream records. */
    private final TimestampedCollector<OUT> timestampedCollector;

    private volatile boolean running;

    public Emitter(
            final Object checkpointLock,
            final Output<StreamRecord<OUT>> output,
            final StreamElementQueue streamElementQueue,
            final OperatorActions operatorActions) {

        this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock");
        this.output = Preconditions.checkNotNull(output, "output");
        this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "streamElementQueue");
        this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");

        this.timestampedCollector = new TimestampedCollector<>(this.output);
        this.running = true;
    }

    @Override
    public void run() {
        try {
            while (running) {
                LOG.debug("Wait for next completed async stream element result.");
                AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();

                output(streamElementEntry);
            }
        } catch (InterruptedException e) {
            if (running) {
                operatorActions.failOperator(e);
            } else {
                // Thread got interrupted which means that it should shut down
                LOG.debug("Emitter thread got interrupted, shutting down.");
            }
        } catch (Throwable t) {
            operatorActions.failOperator(new Exception("AsyncWaitOperator's emitter caught an " +
                "unexpected throwable.", t));
        }
    }

    private void output(AsyncResult asyncResult) throws InterruptedException {
        if (asyncResult.isWatermark()) {
            synchronized (checkpointLock) {
                AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();

                LOG.debug("Output async watermark.");
                output.emitWatermark(asyncWatermarkResult.getWatermark());

                // remove the peeked element from the async collector buffer so that it is no longer
                // checkpointed
                streamElementQueue.poll();

                // notify the main thread that there is again space left in the async collector
                // buffer
                checkpointLock.notifyAll();
            }
        } else {
            AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();

            if (streamRecordResult.hasTimestamp()) {
                timestampedCollector.setAbsoluteTimestamp(streamRecordResult.getTimestamp());
            } else {
                timestampedCollector.eraseTimestamp();
            }

            synchronized (checkpointLock) {
                LOG.debug("Output async stream element collection result.");

                try {
                    Collection<OUT> resultCollection = streamRecordResult.get();

                    if (resultCollection != null) {
                        for (OUT result : resultCollection) {
                            timestampedCollector.collect(result);
                        }
                    }
                } catch (Exception e) {
                    operatorActions.failOperator(
                        new Exception("An async function call terminated with an exception. " +
                            "Failing the AsyncWaitOperator.", e));
                }

                // remove the peeked element from the async collector buffer so that it is no longer
                // checkpointed
                streamElementQueue.poll();

                // notify the main thread that there is again space left in the async collector
                // buffer
                checkpointLock.notifyAll();
            }
        }
    }

    public void stop() {
        running = false;
    }
}
  • Emitter implements the Runnable interface. It takes completed elements out of the StreamElementQueue and emits them downstream: watermarks through output.emitWatermark, and records through a TimestampedCollector that wraps the output.
  • Emitter’s run method loops while running is true, calling streamElementQueue.peekBlockingly() to block until an AsyncResult is available and then passing it to the output method. An InterruptedException while still running, or any other Throwable, is reported through operatorActions.failOperator.
  • Emitter’s output method branches on whether asyncResult is a watermark. A watermark is emitted under the checkpointLock. Otherwise the timestamp is set (or erased) on the timestampedCollector and, under the checkpointLock, each result is emitted with timestampedCollector.collect; if obtaining the results throws an exception, it is passed to operatorActions.failOperator. In both branches the method then calls streamElementQueue.poll() to remove the head element and checkpointLock.notifyAll() to wake a task thread waiting for free space.
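The handshake between the task thread (addAsyncBufferEntry) and the emitter thread can be modelled with plain JDK primitives. This is a simplified sketch, not Flink code: EmitterSketch, add and runDemo are hypothetical names, a CompletableFuture stands in for a queue entry, and a single monitor is assumed to play both the role of the checkpoint lock and of the queue's internal conditions, which the real classes keep separate.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical, simplified model of the task-thread/emitter handshake; not Flink code. */
class EmitterSketch implements Runnable {

    private final Object lock = new Object();   // stands in for the checkpoint lock
    private final ArrayDeque<CompletableFuture<String>> queue = new ArrayDeque<>();
    private final List<String> emitted = new ArrayList<>();
    private final int capacity;
    private volatile boolean running = true;

    EmitterSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Task-thread side, modelled on addAsyncBufferEntry: block while the queue is full. */
    void add(CompletableFuture<String> entry) throws InterruptedException {
        synchronized (lock) {
            while (queue.size() >= capacity) {
                lock.wait();          // releases the lock so the emitter can drain
            }
            queue.addLast(entry);
            lock.notifyAll();         // wake an emitter that waits for a head element
        }
    }

    /** Emitter side: wait for the head result, output it under the lock, then remove it. */
    @Override
    public void run() {
        try {
            while (running) {
                CompletableFuture<String> head;
                synchronized (lock) {
                    while (queue.isEmpty()) {
                        lock.wait();
                    }
                    head = queue.peekFirst();   // peek only: the entry stays in the queue
                }
                String result = head.get();     // block outside the lock until completed
                synchronized (lock) {
                    emitted.add(result);        // "output" the result
                    queue.pollFirst();          // remove only after it has been emitted
                    lock.notifyAll();           // space is available again
                }
            }
        } catch (InterruptedException e) {
            // Interrupted during shutdown: leave the loop quietly.
        } catch (ExecutionException e) {
            throw new IllegalStateException("async call failed", e);
        }
    }

    /** Pushes the values through a queue of the given capacity and returns what was emitted. */
    static List<String> runDemo(int capacity, List<String> values) {
        EmitterSketch sketch = new EmitterSketch(capacity);
        Thread emitterThread = new Thread(sketch, "emitter-sketch");
        emitterThread.setDaemon(true);
        emitterThread.start();
        try {
            for (String value : values) {
                sketch.add(CompletableFuture.completedFuture(value));
            }
            synchronized (sketch.lock) {
                while (sketch.emitted.size() < values.size()) {
                    sketch.lock.wait();
                }
                return new ArrayList<>(sketch.emitted);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            sketch.running = false;
            emitterThread.interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo(2, Arrays.asList("a", "b", "c", "d", "e")));
    }
}
```

The point mirrored from the listing is the order of operations on the emitter side: the head element is only peeked first, and it is polled after the result has been emitted, so an element that has not yet been output is still in the queue when a snapshot reads it.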

StreamElementQueue

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java

@Internal
public interface StreamElementQueue {

    <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;

    <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;

    AsyncResult peekBlockingly() throws InterruptedException;

    AsyncResult poll() throws InterruptedException;

    Collection<StreamElementQueueEntry<?>> values() throws InterruptedException;

    boolean isEmpty();

    int size();
}
  • The StreamElementQueue interface defines the blocking stream element queue used by AsyncWaitOperator. It declares the put, tryPut, peekBlockingly, poll, values, isEmpty and size methods. It has two implementations, UnorderedStreamElementQueue and OrderedStreamElementQueue, and its element type is StreamElementQueueEntry.

UnorderedStreamElementQueue

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java

@Internal
public class UnorderedStreamElementQueue implements StreamElementQueue {

    private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);

    /** Capacity of this queue. */
    private final int capacity;

    /** Executor to run the onComplete callbacks. */
    private final Executor executor;

    /** OperatorActions to signal the owning operator a failure. */
    private final OperatorActions operatorActions;

    /** Queue of uncompleted stream element queue entries segmented by watermarks. */
    private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue;

    /** Queue of completed stream element queue entries. */
    private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue;

    /** First (chronologically oldest) uncompleted set of stream element queue entries. */
    private Set<StreamElementQueueEntry<?>> firstSet;

    // Last (chronologically youngest) uncompleted set of stream element queue entries. New
    // stream element queue entries are inserted into this set.
    private Set<StreamElementQueueEntry<?>> lastSet;
    private volatile int numberEntries;

    /** Locks and conditions for the blocking queue. */
    private final ReentrantLock lock;
    private final Condition notFull;
    private final Condition hasCompletedEntries;

    public UnorderedStreamElementQueue(
            int capacity,
            Executor executor,
            OperatorActions operatorActions) {

        Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
        this.capacity = capacity;

        this.executor = Preconditions.checkNotNull(executor, "executor");

        this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");

        this.uncompletedQueue = new ArrayDeque<>(capacity);
        this.completedQueue = new ArrayDeque<>(capacity);

        this.firstSet = new HashSet<>(capacity);
        this.lastSet = firstSet;

        this.numberEntries = 0;

        this.lock = new ReentrantLock();
        this.notFull = lock.newCondition();
        this.hasCompletedEntries = lock.newCondition();
    }

    @Override
    public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (numberEntries >= capacity) {
                notFull.await();
            }

            addEntry(streamElementQueueEntry);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            if (numberEntries < capacity) {
                addEntry(streamElementQueueEntry);

                LOG.debug("Put element into unordered stream element queue. New filling degree " +
                    "({}/{}).", numberEntries, capacity);

                return true;
            } else {
                LOG.debug("Failed to put element into unordered stream element queue because it " +
                    "was full ({}/{}).", numberEntries, capacity);

                return false;
            }
        } finally {
            lock.unlock();
        }
    }

    @Override
    public AsyncResult peekBlockingly() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (completedQueue.isEmpty()) {
                hasCompletedEntries.await();
            }

            LOG.debug("Peeked head element from unordered stream element queue with filling degree " +
                "({}/{}).", numberEntries, capacity);

            return completedQueue.peek();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public AsyncResult poll() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (completedQueue.isEmpty()) {
                hasCompletedEntries.await();
            }

            numberEntries--;
            notFull.signalAll();

            LOG.debug("Polled element from unordered stream element queue. New filling degree " +
                "({}/{}).", numberEntries, capacity);

            return completedQueue.poll();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[numberEntries];

            array = completedQueue.toArray(array);

            int counter = completedQueue.size();

            for (StreamElementQueueEntry<?> entry: firstSet) {
                array[counter] = entry;
                counter++;
            }

            for (Set<StreamElementQueueEntry<?>> asyncBufferEntries : uncompletedQueue) {

                for (StreamElementQueueEntry<?> streamElementQueueEntry : asyncBufferEntries) {
                    array[counter] = streamElementQueueEntry;
                    counter++;
                }
            }

            return Arrays.asList(array);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean isEmpty() {
        return numberEntries == 0;
    }

    @Override
    public int size() {
        return numberEntries;
    }

    public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            if (firstSet.remove(streamElementQueueEntry)) {
                completedQueue.offer(streamElementQueueEntry);

                while (firstSet.isEmpty() && firstSet != lastSet) {
                    firstSet = uncompletedQueue.poll();

                    Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();

                    while (it.hasNext()) {
                        StreamElementQueueEntry<?> bufferEntry = it.next();

                        if (bufferEntry.isDone()) {
                            completedQueue.offer(bufferEntry);
                            it.remove();
                        }
                    }
                }

                LOG.debug("Signal unordered stream element queue has completed entries.");
                hasCompletedEntries.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
        assert(lock.isHeldByCurrentThread());

        if (streamElementQueueEntry.isWatermark()) {
            lastSet = new HashSet<>(capacity);

            if (firstSet.isEmpty()) {
                firstSet.add(streamElementQueueEntry);
            } else {
                Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
                watermarkSet.add(streamElementQueueEntry);
                uncompletedQueue.offer(watermarkSet);
            }
            uncompletedQueue.offer(lastSet);
        } else {
            lastSet.add(streamElementQueueEntry);
        }

        streamElementQueueEntry.onComplete(
            (StreamElementQueueEntry<T> value) -> {
                try {
                    onCompleteHandler(value);
                } catch (InterruptedException e) {
                    // The accept executor thread got interrupted. This is probably cause by
                    // the shutdown of the executor.
                    LOG.debug("AsyncBufferEntry could not be properly completed because the " +
                        "executor thread has been interrupted.", e);
                } catch (Throwable t) {
                    operatorActions.failOperator(new Exception("Could not complete the " +
                        "stream element queue entry: " + value + '.', t));
                }
            },
            executor);

        numberEntries++;
    }
}
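  • UnorderedStreamElementQueue implements the StreamElementQueue interface and emits results without preserving input order. Internally it uses two ArrayDeques: uncompletedQueue, which holds sets of uncompleted entries segmented by watermarks, and completedQueue. The fields firstSet and lastSet reference the oldest and the youngest uncompleted set.
  • The peekBlockingly method waits on hasCompletedEntries.await() while completedQueue is empty and then returns completedQueue.peek(). Put and tryPut both call the addEntry method. For a record, addEntry adds the entry to lastSet; for a watermark, it starts a new lastSet and places the watermark either in firstSet (if that set is empty) or in a set of its own appended to uncompletedQueue. In both cases it registers a callback through the entry’s onComplete method that invokes onCompleteHandler.
  • The onCompleteHandler method moves a completed StreamElementQueueEntry from firstSet to completedQueue. Whenever firstSet becomes empty and is not lastSet, it advances firstSet to the next set polled from uncompletedQueue and moves the entries of that set that are already done. It then signals hasCompletedEntries. As a result, entries between two watermarks are emitted in completion order, but no entry overtakes a watermark.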
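The bookkeeping with firstSet, lastSet and uncompletedQueue is easier to follow in a single-threaded model. The sketch below is not Flink code: UnorderedSegmentsSketch and its Entry class are hypothetical, locks, conditions and the executor are left out, and it assumes a watermark entry counts as completed as soon as it is added.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Hypothetical single-threaded model of the watermark-segmented bookkeeping; not Flink code. */
class UnorderedSegmentsSketch {

    /** Minimal stand-in for a queue entry. */
    static final class Entry {
        final String name;
        final boolean watermark;
        boolean done;

        Entry(String name, boolean watermark) {
            this.name = name;
            this.watermark = watermark;
        }
    }

    private final ArrayDeque<Set<Entry>> uncompletedQueue = new ArrayDeque<>();
    private final ArrayDeque<Entry> completedQueue = new ArrayDeque<>();
    private Set<Entry> firstSet = new HashSet<>();
    private Set<Entry> lastSet = firstSet;

    /** Follows addEntry: records join lastSet, a watermark closes the current segment. */
    Entry add(String name, boolean watermark) {
        Entry entry = new Entry(name, watermark);
        if (watermark) {
            lastSet = new HashSet<>();
            if (firstSet.isEmpty()) {
                firstSet.add(entry);
            } else {
                Set<Entry> watermarkSet = new HashSet<>();
                watermarkSet.add(entry);
                uncompletedQueue.offer(watermarkSet);
            }
            uncompletedQueue.offer(lastSet);
            complete(entry); // assumption: a watermark needs no async work, so it is done at once
        } else {
            lastSet.add(entry);
        }
        return entry;
    }

    /** Follows onCompleteHandler: only entries of the oldest segment become emittable. */
    void complete(Entry entry) {
        entry.done = true;
        if (firstSet.remove(entry)) {
            completedQueue.offer(entry);
            while (firstSet.isEmpty() && firstSet != lastSet) {
                firstSet = uncompletedQueue.poll();
                Iterator<Entry> it = firstSet.iterator();
                while (it.hasNext()) {
                    Entry next = it.next();
                    if (next.done) {
                        completedQueue.offer(next);
                        it.remove();
                    }
                }
            }
        }
    }

    /** Completes c, b, a in that order around a watermark and returns the emission order. */
    static List<String> demo() {
        UnorderedSegmentsSketch queue = new UnorderedSegmentsSketch();
        Entry a = queue.add("a", false);
        Entry b = queue.add("b", false);
        queue.add("W", true);
        Entry c = queue.add("c", false);

        queue.complete(c); // finished first, but it sits behind the watermark
        queue.complete(b);
        queue.complete(a);

        List<String> order = new ArrayList<>();
        for (Entry entry : queue.completedQueue) {
            order.add(entry.name);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model b is emitted before a because it completed first within the same segment, while c, although it completed before both, is held back until the watermark W has been emitted.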

OrderedStreamElementQueue

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java

@Internal
public class OrderedStreamElementQueue implements StreamElementQueue {

    private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);

    /** Capacity of this queue. */
    private final int capacity;

    /** Executor to run the onCompletion callback. */
    private final Executor executor;

    /** Operator actions to signal a failure to the operator. */
    private final OperatorActions operatorActions;

    /** Lock and conditions for the blocking queue. */
    private final ReentrantLock lock;
    private final Condition notFull;
    private final Condition headIsCompleted;

    /** Queue for the inserted StreamElementQueueEntries. */
    private final ArrayDeque<StreamElementQueueEntry<?>> queue;

    public OrderedStreamElementQueue(
            int capacity,
            Executor executor,
            OperatorActions operatorActions) {

        Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
        this.capacity = capacity;

        this.executor = Preconditions.checkNotNull(executor, "executor");

        this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");

        this.lock = new ReentrantLock(false);
        this.headIsCompleted = lock.newCondition();
        this.notFull = lock.newCondition();

        this.queue = new ArrayDeque<>(capacity);
    }

    @Override
    public AsyncResult peekBlockingly() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }

            LOG.debug("Peeked head element from ordered stream element queue with filling degree " +
                "({}/{}).", queue.size(), capacity);

            return queue.peek();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public AsyncResult poll() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }

            notFull.signalAll();

            LOG.debug("Polled head element from ordered stream element queue. New filling degree " +
                "({}/{}).", queue.size() - 1, capacity);

            return queue.poll();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[queue.size()];

            array = queue.toArray(array);

            return Arrays.asList(array);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    @Override
    public int size() {
        return queue.size();
    }

    @Override
    public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (queue.size() >= capacity) {
                notFull.await();
            }

            addEntry(streamElementQueueEntry);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            if (queue.size() < capacity) {
                addEntry(streamElementQueueEntry);

                LOG.debug("Put element into ordered stream element queue. New filling degree " +
                    "({}/{}).", queue.size(), capacity);

                return true;
            } else {
                LOG.debug("Failed to put element into ordered stream element queue because it " +
                    "was full ({}/{}).", queue.size(), capacity);

                return false;
            }
        } finally {
            lock.unlock();
        }
    }

    private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
        assert(lock.isHeldByCurrentThread());

        queue.addLast(streamElementQueueEntry);

        streamElementQueueEntry.onComplete(
            (StreamElementQueueEntry<T> value) -> {
                try {
                    onCompleteHandler(value);
                } catch (InterruptedException e) {
                    // we got interrupted. This indicates a shutdown of the executor
                    LOG.debug("AsyncBufferEntry could not be properly completed because the " +
                        "executor thread has been interrupted.", e);
                } catch (Throwable t) {
                    operatorActions.failOperator(new Exception("Could not complete the " +
                        "stream element queue entry: " + value + '.', t));
                }
            },
            executor);
    }

    private void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            if (!queue.isEmpty() && queue.peek().isDone()) {
                LOG.debug("Signal ordered stream element queue has completed head element.");
                headIsCompleted.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }
}
  • OrderedStreamElementQueue implements the StreamElementQueue interface and emits results in the order in which the elements were added; internally it keeps its entries in an ArrayDeque.
  • The peekBlockingly method waits on headIsCompleted.await() for as long as the queue is empty or its head entry is not yet done, and then returns queue.peek(). The poll method waits in the same way, signals notFull, and removes the head. put blocks on notFull.await() while the queue is at capacity, whereas tryPut returns false immediately; both call addEntry, which executes queue.addLast(streamElementQueueEntry) and then registers a callback through the entry's onComplete method.
  • The onCompleteHandler method is that callback. It checks whether the entry at the head of the queue is done (not whether the entry that just completed is the head) and, if so, executes headIsCompleted.signalAll().
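
The interplay of the lock and the two conditions described above can be sketched in isolation. The class below is a hypothetical, simplified stand-in (OrderedQueueSketch and all of its members are invented for illustration, and it stores plain CompletableFuture<String> values rather than StreamElementQueueEntry objects); it is not the Flink implementation. It assumes a single consumer thread and registers the callback with whenComplete instead of whenCompleteAsync with an executor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified ordered queue; not the Flink class. */
class OrderedQueueSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition headIsCompleted = lock.newCondition();
    private final ArrayDeque<CompletableFuture<String>> queue = new ArrayDeque<>();
    private final int capacity;

    OrderedQueueSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Blocks while the queue is full, then appends the entry and registers the callback. */
    void put(CompletableFuture<String> entry) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.size() >= capacity) {
                notFull.await();
            }
            queue.addLast(entry);
            // Runs on normal and on exceptional completion alike.
            entry.whenComplete((value, error) -> onComplete());
        } finally {
            lock.unlock();
        }
    }

    /** Wakes the consumer only if the entry at the head of the queue is done. */
    private void onComplete() {
        lock.lock();
        try {
            if (!queue.isEmpty() && queue.peek().isDone()) {
                headIsCompleted.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    /** Blocks until the head entry is done, then removes it and returns its value. */
    String poll() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }
            notFull.signalAll();
            return queue.poll().join();
        } finally {
            lock.unlock();
        }
    }

    /** Completes three entries in reverse order and returns the order in which they are emitted. */
    static List<String> demo() {
        OrderedQueueSketch q = new OrderedQueueSketch(3);
        List<CompletableFuture<String>> entries = new ArrayList<>();
        List<String> emitted = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    emitted.add(q.poll());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            for (int i = 0; i < 3; i++) {
                entries.add(new CompletableFuture<>());
                q.put(entries.get(i));
            }
            consumer.start();
            entries.get(2).complete("c");
            entries.get(1).complete("b");
            entries.get(0).complete("a");
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a, b, c]
    }
}
```

Although the entries complete in the order c, b, a, the consumer is only woken once the head entry "a" is done, so the emission order matches the insertion order.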

AsyncResult

flink-streaming-java_2.11-1.7.0-sources.jar! /org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java

@Internal
public interface AsyncResult {

    boolean isWatermark();

    boolean isResultCollection();

    AsyncWatermarkResult asWatermark();

    <T> AsyncCollectionResult<T> asResultCollection();
}
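  • The AsyncResult interface defines the methods through which a consumer inspects an element taken from a StreamElementQueue. An async result is either a watermark or a collection of output records; isWatermark and isResultCollection report which one it is, and asWatermark and asResultCollection cast it to the matching view.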
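
As a rough illustration of how a consumer might branch on such a result, here is a minimal sketch. Every type in it (Result, WatermarkResult, CollectionResult, Entry and the two entry classes) is a hypothetical miniature invented for this example, with String records and a bare long timestamp; the runtime-class check in Entry is modelled on the StreamElementQueueEntry listing in this article, but none of this is Flink code.

```java
import java.util.Arrays;
import java.util.Collection;

/** Hypothetical mini-hierarchy mirroring the shape of AsyncResult; not Flink code. */
class AsyncResultSketch {
    interface Result {
        boolean isWatermark();
        boolean isResultCollection();
        WatermarkResult asWatermark();
        CollectionResult asResultCollection();
    }

    interface WatermarkResult extends Result {
        long timestamp();
    }

    interface CollectionResult extends Result {
        Collection<String> get();
    }

    /** Base entry: answers the type queries from the runtime class of the entry. */
    abstract static class Entry implements Result {
        @Override
        public final boolean isWatermark() {
            return WatermarkResult.class.isAssignableFrom(getClass());
        }

        @Override
        public final boolean isResultCollection() {
            return CollectionResult.class.isAssignableFrom(getClass());
        }

        @Override
        public final WatermarkResult asWatermark() {
            return (WatermarkResult) this;
        }

        @Override
        public final CollectionResult asResultCollection() {
            return (CollectionResult) this;
        }
    }

    static final class WatermarkEntry extends Entry implements WatermarkResult {
        private final long timestamp;

        WatermarkEntry(long timestamp) {
            this.timestamp = timestamp;
        }

        @Override
        public long timestamp() {
            return timestamp;
        }
    }

    static final class RecordEntry extends Entry implements CollectionResult {
        private final Collection<String> values;

        RecordEntry(Collection<String> values) {
            this.values = values;
        }

        @Override
        public Collection<String> get() {
            return values;
        }
    }

    /** Consumer-side dispatch: check the kind first, then cast to the matching view. */
    static String describe(Result result) {
        if (result.isWatermark()) {
            return "watermark@" + result.asWatermark().timestamp();
        } else if (result.isResultCollection()) {
            return "records" + result.asResultCollection().get();
        }
        throw new IllegalStateException("Unknown result type: " + result);
    }

    public static void main(String[] args) {
        System.out.println(describe(new WatermarkEntry(42L)));                  // watermark@42
        System.out.println(describe(new RecordEntry(Arrays.asList("a", "b")))); // records[a, b]
    }
}
```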

StreamElementQueueEntry

flink-streaming-java_2.11-1.7.0-sources.jar! /org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java

@Internal
public abstract class StreamElementQueueEntry<T> implements AsyncResult {

    private final StreamElement streamElement;

    public StreamElementQueueEntry(StreamElement streamElement) {
        this.streamElement = Preconditions.checkNotNull(streamElement);
    }

    public StreamElement getStreamElement() {
        return streamElement;
    }

    public boolean isDone() {
        return getFuture().isDone();
    }

    public void onComplete(
            final Consumer<StreamElementQueueEntry<T>> completeFunction,
            Executor executor) {
        final StreamElementQueueEntry<T> thisReference = this;

        getFuture().whenCompleteAsync(
            // call the complete function for normal completion as well as exceptional completion
            // see FLINK-6435
            (value, throwable) -> completeFunction.accept(thisReference),
            executor);
    }

    protected abstract CompletableFuture<T> getFuture();

    @Override
    public final boolean isWatermark() {
        return AsyncWatermarkResult.class.isAssignableFrom(getClass());
    }

    @Override
    public final boolean isResultCollection() {
        return AsyncCollectionResult.class.isAssignableFrom(getClass());
    }

    @Override
    public final AsyncWatermarkResult asWatermark() {
        return (AsyncWatermarkResult) this;
    }

    @Override
    public final <T> AsyncCollectionResult<T> asResultCollection() {
        return (AsyncCollectionResult<T>) this;
    }
}
  • StreamElementQueueEntry implements the AsyncResult interface. Its onComplete method registers a callback on the entry's future via whenCompleteAsync, so the callback runs on the given executor when the entry completes, whether normally or exceptionally. It also declares the abstract method getFuture for subclasses to implement. It has two subclasses, WatermarkQueueEntry and StreamRecordQueueEntry.
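
The comment in onComplete points out that the callback must fire for exceptional completion as well. A small sketch using only the JDK shows that whenCompleteAsync behaves this way; the class name OnCompleteSketch, the counter, and the simulated timeout exception are assumptions made for the example and do not appear in Flink.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo of callback registration with whenCompleteAsync; not Flink code. */
class OnCompleteSketch {

    /** Returns how many callbacks ran after one normal and one exceptional completion. */
    static int demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger callbacks = new AtomicInteger();
        try {
            CompletableFuture<String> succeeded = new CompletableFuture<>();
            CompletableFuture<String> failed = new CompletableFuture<>();

            // The same callback is registered on both futures and runs on the executor.
            CompletableFuture<String> afterSuccess =
                succeeded.whenCompleteAsync((value, error) -> callbacks.incrementAndGet(), executor);
            CompletableFuture<String> afterFailure =
                failed.whenCompleteAsync((value, error) -> callbacks.incrementAndGet(), executor);

            succeeded.complete("result");
            failed.completeExceptionally(new RuntimeException("simulated timeout"));

            // The dependent futures complete only after their callbacks have run.
            afterSuccess.join();
            afterFailure.exceptionally(error -> null).join();
        } finally {
            executor.shutdown();
        }
        return callbacks.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 2
    }
}
```

Had the callback been registered with thenAcceptAsync instead, the failed future would never have triggered it, and a queue waiting on that entry would not be woken.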

WatermarkQueueEntry

flink-streaming-java_2.11-1.7.0-sources.jar! /org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java

@Internal
public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> implements AsyncWatermarkResult {

    private final CompletableFuture<Watermark> future;

    public WatermarkQueueEntry(Watermark watermark) {
        super(watermark);

        this.future = CompletableFuture.completedFuture(watermark);
    }

    @Override
    public Watermark getWatermark() {
        return (Watermark) getStreamElement();
    }

    @Override
    protected CompletableFuture<Watermark> getFuture() {
        return future;
    }
}
  • WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> and implements the AsyncWatermarkResult interface. Its future is created with CompletableFuture.completedFuture(watermark), so a watermark entry is already done when it is constructed.

StreamRecordQueueEntry

flink-streaming-java_2.11-1.7.0-sources.jar! /org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java

@Internal
public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
    implements AsyncCollectionResult<OUT>, ResultFuture<OUT> {

    /** Timestamp information. */
    private final boolean hasTimestamp;
    private final long timestamp;

    /** Future containing the collection result. */
    private final CompletableFuture<Collection<OUT>> resultFuture;

    public StreamRecordQueueEntry(StreamRecord<?> streamRecord) {
        super(streamRecord);

        hasTimestamp = streamRecord.hasTimestamp();
        timestamp = streamRecord.getTimestamp();

        resultFuture = new CompletableFuture<>();
    }

    @Override
    public boolean hasTimestamp() {
        return hasTimestamp;
    }

    @Override
    public long getTimestamp() {
        return timestamp;
    }

    @Override
    public Collection<OUT> get() throws Exception {
        return resultFuture.get();
    }

    @Override
    protected CompletableFuture<Collection<OUT>> getFuture() {
        return resultFuture;
    }

    @Override
    public void complete(Collection<OUT> result) {
        resultFuture.complete(result);
    }

    @Override
    public void completeExceptionally(Throwable error) {
        resultFuture.completeExceptionally(error);
    }
}
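  • StreamRecordQueueEntry extends StreamElementQueueEntry<Collection<OUT>> and implements the AsyncCollectionResult and ResultFuture interfaces. Its resultFuture starts out incomplete and is completed when complete or completeExceptionally is called.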
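
The difference between the two entry types comes down to the state of the underlying CompletableFuture. The sketch below uses bare futures in place of the entry classes (the class name EntryFutureSketch and the sample values are made up for illustration) to show that a completedFuture is done at once, while a freshly created future is done only after the first call to complete, and that a later call to complete has no effect.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

/** Hypothetical demo with bare futures standing in for the two entry types; not Flink code. */
class EntryFutureSketch {

    static String demo() {
        // Watermark-style entry: the future is created already completed.
        CompletableFuture<Long> watermarkFuture = CompletableFuture.completedFuture(42L);

        // Record-style entry: the future is completed later by the asynchronous function.
        CompletableFuture<Collection<String>> recordFuture = new CompletableFuture<>();
        boolean doneBefore = recordFuture.isDone();

        // Only the first completion takes effect; complete returns false afterwards.
        boolean first = recordFuture.complete(Collections.singletonList("x"));
        boolean second = recordFuture.complete(Collections.singletonList("y"));

        return watermarkFuture.isDone() + "," + doneBefore + "," + first + "," + second
            + "," + recordFuture.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true,false,true,false,[x]
    }
}
```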

Summary

  • AsyncWaitOperator extends AbstractUdfStreamOperator (itself a subclass of AbstractStreamOperator) and overrides the setup, open, initializeState, close, and dispose methods. It implements the processElement, processWatermark, and processLatencyMarker methods defined by the OneInputStreamOperator interface, as well as the failOperator method defined by OperatorActions. The open method creates an Emitter and starts it on the AsyncIO-Emitter-Thread.
  • Emitter implements the Runnable interface and is mainly responsible for taking elements out of the StreamElementQueue and emitting them to the TimestampedCollector. Its run method repeatedly calls StreamElementQueue.peekBlockingly() to block until an AsyncResult is available, and then calls the output method to emit it.
  • The StreamElementQueue interface defines the blocking stream element queue used by AsyncWaitOperator. It declares the put, tryPut, peekBlockingly, poll, values, isEmpty, and size methods and has two implementations: UnorderedStreamElementQueue and OrderedStreamElementQueue. The queue element type is StreamElementQueueEntry, which implements the AsyncResult interface, defines the onComplete method for callback processing when the result is completed, and declares the abstract method getFuture for subclasses to implement. It has two subclasses, WatermarkQueueEntry and StreamRecordQueueEntry.

doc