FluxSink example and analysis

Preface

This article examines how FluxSink works internally, using a small example and the reactor-core 3.1.3 source.

FluxSink

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxSink.java

/**
 * Wrapper API around a downstream Subscriber for emitting any number of
 * next signals followed by zero or one onError/onComplete.
 * <p>
 * @param <T> the value type
 */
public interface FluxSink<T> {

    /**
     * @see Subscriber#onComplete()
     */
    void complete();

    /**
     * Return the current subscriber {@link Context}.
     * <p>
     *   {@link Context} can be enriched via {@link Flux#subscriberContext(Function)}
     *   operator or directly by a child subscriber overriding
     *   {@link CoreSubscriber#currentContext()}
     *
     * @return the current subscriber {@link Context}.
     */
    Context currentContext();

    /**
     * @see Subscriber#onError(Throwable)
     * @param e the exception to signal, not null
     */
    void error(Throwable e);

    /**
     * Try emitting, might throw an unchecked exception.
     * @see Subscriber#onNext(Object)
     * @param t the value to emit, not null
     */
    FluxSink<T> next(T t);

    /**
     * The current outstanding request amount.
     * @return the current outstanding request amount
     */
    long requestedFromDownstream();

    /**
     * Returns true if the downstream cancelled the sequence.
     * @return true if the downstream cancelled the sequence
     */
    boolean isCancelled();

    /**
     * Attaches a {@link LongConsumer} to this {@link FluxSink} that will be notified of
     * any request to this sink.
     * <p>
     * For push/pull sinks created using {@link Flux#create(java.util.function.Consumer)}
     * or {@link Flux#create(java.util.function.Consumer, FluxSink.OverflowStrategy)},
     * the consumer
     * is invoked for every request to enable a hybrid backpressure-enabled push/pull model.
     * When bridging with asynchronous listener-based APIs, the {@code onRequest} callback
     * may be used to request more data from source if required and to manage backpressure
     * by delivering data to sink only when requests are pending.
     * <p>
     * For push-only sinks created using {@link Flux#push(java.util.function.Consumer)}
     * or {@link Flux#push(java.util.function.Consumer, FluxSink.OverflowStrategy)},
     * the consumer is invoked with an initial request of {@code Long.MAX_VALUE} when this method
     * is invoked.
     *
     * @param consumer the consumer to invoke on each request
     * @return {@link FluxSink} with a consumer that is notified of requests
     */
    FluxSink<T> onRequest(LongConsumer consumer);

    /**
     * Associates a disposable resource with this FluxSink
     * that will be disposed in case the downstream cancels the sequence
     * via {@link org.reactivestreams.Subscription#cancel()}.
     * @param d the disposable callback to use
     * @return the {@link FluxSink} with resource to be disposed on cancel signal
     */
    FluxSink<T> onCancel(Disposable d);

    /**
     * Associates a disposable resource with this FluxSink
     * that will be disposed on the first terminate signal which may be
     * a cancel, complete or error signal.
     * @param d the disposable callback to use
     * @return the {@link FluxSink} with resource to be disposed on first terminate signal
     */
    FluxSink<T> onDispose(Disposable d);

    /**
     * Enumeration for backpressure handling.
     */
    enum OverflowStrategy {
        /**
         * Completely ignore downstream backpressure requests.
         * <p>
         * This may yield {@link IllegalStateException} when queues get full downstream.
         */
        IGNORE,
        /**
         * Signal an {@link IllegalStateException} when the downstream can't keep up
         */
        ERROR,
        /**
         * Drop the incoming signal if the downstream is not ready to receive it.
         */
        DROP,
        /**
         * Downstream will get only the latest signals from upstream.
         */
        LATEST,
        /**
         * Buffer all signals if the downstream can't keep up.
         * <p>
         * Warning! This does unbounded buffering and may lead to {@link OutOfMemoryError}.
         */
        BUFFER
    }
}

Note that OverflowStrategy.BUFFER uses an unbounded queue, so a producer that outpaces its consumer can exhaust the heap and cause an OutOfMemoryError (OOM).
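
To make the difference between the strategies concrete, here is a minimal JDK-only sketch of what happens to emitted values while the downstream has no outstanding demand. It is a toy model, not Reactor's implementation: the class OverflowModel and all of its members are hypothetical, and IGNORE is left out because its effect depends on the operators downstream.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model of overflow handling; not Reactor code.
final class OverflowModel {
    enum Strategy { ERROR, DROP, LATEST, BUFFER }

    private final Strategy strategy;
    private final Deque<Integer> pending = new ArrayDeque<>(); // values waiting for demand
    private final List<Integer> delivered = new ArrayList<>();
    private long requested;  // outstanding downstream demand
    private boolean failed;  // set once the ERROR strategy overflows

    OverflowModel(Strategy strategy) { this.strategy = strategy; }

    void request(long n) {
        requested += n;
        while (requested > 0 && !pending.isEmpty()) {
            requested--;
            delivered.add(pending.poll());
        }
    }

    void next(int value) {
        if (failed) return;
        if (requested > 0 && pending.isEmpty()) { // demand available: deliver directly
            requested--;
            delivered.add(value);
            return;
        }
        switch (strategy) {
            case ERROR:  failed = true; break;                        // signal overflow
            case DROP:   break;                                       // discard the value
            case LATEST: pending.clear(); pending.add(value); break;  // keep the newest only
            case BUFFER: pending.add(value); break;                   // grows without bound
        }
    }

    int pendingSize() { return pending.size(); }
    boolean hasFailed() { return failed; }
    List<Integer> delivered() { return delivered; }

    // Emits 0..count-1 with no demand, then requests a single element.
    static OverflowModel run(Strategy strategy, int count) {
        OverflowModel model = new OverflowModel(strategy);
        for (int i = 0; i < count; i++) model.next(i);
        model.request(1);
        return model;
    }
}
```

In this model, running 1000 emissions against BUFFER leaves 999 values pending after one is requested, whereas DROP and LATEST never hold more than one value. That difference is the reason BUFFER needs the extra attention.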

Example

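    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.FluxSink;

    // The class name is illustrative and SLF4J is assumed for LOGGER;
    // the original only shows the main method.
    public class FluxSinkDemo {

        private static final Logger LOGGER = LoggerFactory.getLogger(FluxSinkDemo.class);

        public static void main(String[] args) throws InterruptedException {
            final Flux<Integer> flux = Flux.<Integer> create(fluxSink -> {
                //NOTE sink:class reactor.core.publisher.FluxCreate$SerializedSink
                LOGGER.info("sink:{}",fluxSink.getClass());
                // Emit random integers forever
                while (true) {
                    LOGGER.info("sink next");
                    fluxSink.next(ThreadLocalRandom.current().nextInt());
                }
            }, FluxSink.OverflowStrategy.BUFFER);

            //NOTE flux:class reactor.core.publisher.FluxCreate,prefetch:-1
            LOGGER.info("flux:{},prefetch:{}",flux.getClass(),flux.getPrefetch());

            // Slow consumer: logs each value, then sleeps for 10 seconds
            flux.subscribe(e -> {
                LOGGER.info("subscribe:{}",e);
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            });

            TimeUnit.MINUTES.sleep(20);
        }
    }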

Here Flux.create returns a reactor.core.publisher.FluxCreate, and the sink passed to the lambda is a reactor.core.publisher.FluxCreate$SerializedSink.

Flux.subscribe

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/Flux.java

    /**
     * Subscribe {@link Consumer} to this {@link Flux} that will respectively consume all the
     * elements in the sequence, handle errors, react to completion, and request upon subscription.
     * It will let the provided {@link Subscription subscriptionConsumer}
     * request the adequate amount of data, or request unbounded demand
     * {@code Long.MAX_VALUE} if no such consumer is provided.
     * <p>
     * For a passive version that observe and forward incoming data see {@link #doOnNext(java.util.function.Consumer)},
     * {@link #doOnError(java.util.function.Consumer)}, {@link #doOnComplete(Runnable)}
     * and {@link #doOnSubscribe(Consumer)}.
     * <p>For a version that gives you more control over backpressure and the request, see
     * {@link #subscribe(Subscriber)} with a {@link BaseSubscriber}.
     * <p>
     * Keep in mind that since the sequence can be asynchronous, this will immediately
     * return control to the calling thread. This can give the impression the consumer is
     * not invoked when executing in a main thread or a unit test for instance.
     *
     * <p>
     * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/subscribecomplete.png" alt="">
     *
     * @param consumer the consumer to invoke on each value
     * @param errorConsumer the consumer to invoke on error signal
     * @param completeConsumer the consumer to invoke on complete signal
     * @param subscriptionConsumer the consumer to invoke on subscribe signal, to be used
     * for the initial {@link Subscription#request(long) request}, or null for max request
     *
     * @return a new {@link Disposable} that can be used to cancel the underlying {@link Subscription}
     */
    public final Disposable subscribe(
            @Nullable Consumer<? super T> consumer,
            @Nullable Consumer<? super Throwable> errorConsumer,
            @Nullable Runnable completeConsumer,
            @Nullable Consumer<? super Subscription> subscriptionConsumer) {
        return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer,
                completeConsumer,
                subscriptionConsumer));
    }

    @Override
    public final void subscribe(Subscriber<? super T> actual) {
        onLastAssembly(this).subscribe(Operators.toCoreSubscriber(actual));
    }

subscribe wraps the callbacks in a LambdaSubscriber, and the call eventually reaches FluxCreate.subscribe.

FluxCreate.subscribe

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxCreate.java

    public void subscribe(CoreSubscriber<? super T> actual) {
        BaseSink<T> sink = createSink(actual, backpressure);

        actual.onSubscribe(sink);
        try {
            source.accept(
                    createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :
                            sink);
        }
        catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            sink.error(Operators.onOperatorError(ex, actual.currentContext()));
        }
    }

    static <T> BaseSink<T> createSink(CoreSubscriber<? super T> t,
            OverflowStrategy backpressure) {
        switch (backpressure) {
            case IGNORE: {
                return new IgnoreSink<>(t);
            }
            case ERROR: {
                return new ErrorAsyncSink<>(t);
            }
            case DROP: {
                return new DropAsyncSink<>(t);
            }
            case LATEST: {
                return new LatestAsyncSink<>(t);
            }
            default: {
                return new BufferAsyncSink<>(t, Queues.SMALL_BUFFER_SIZE);
            }
        }
    }    

FluxCreate.subscribe first creates the sink (a BufferAsyncSink here, because the strategy is BUFFER) and passes it to LambdaSubscriber.onSubscribe.
It then calls source.accept, which runs the lambda given to Flux.create; that lambda receives the sink and starts emitting data.

LambdaSubscriber.onSubscribe

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/LambdaSubscriber.java

    public final void onSubscribe(Subscription s) {
        if (Operators.validate(subscription, s)) {
            this.subscription = s;
            if (subscriptionConsumer != null) {
                try {
                    subscriptionConsumer.accept(s);
                }
                catch (Throwable t) {
                    Exceptions.throwIfFatal(t);
                    s.cancel();
                    onError(t);
                }
            }
            else {
                s.request(Long.MAX_VALUE);
            }
        }
    }

Because the example passes no subscriptionConsumer, the else branch runs and calls BufferAsyncSink.request(Long.MAX_VALUE), which is actually implemented in BaseSink:

        public final void request(long n) {
            if (Operators.validate(n)) {
                Operators.addCap(REQUESTED, this, n);

                LongConsumer consumer = requestConsumer;
                if (n > 0 && consumer != null && !isCancelled()) {
                    consumer.accept(n);
                }
                onRequestedFromDownstream();
            }
        }
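
The Operators.addCap call above accumulates demand so that it saturates at Long.MAX_VALUE instead of overflowing, and Operators.produced in the drain loop later subtracts what was delivered. The sketch below illustrates that bookkeeping with a plain AtomicLong. DemandCounter is a hypothetical class, it assumes n is positive, and its return values are chosen for illustration rather than to mirror Reactor's.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the demand bookkeeping; not Reactor's Operators class.
final class DemandCounter {
    private final AtomicLong requested = new AtomicLong();

    // Adds n to the outstanding demand, saturating at Long.MAX_VALUE.
    long add(long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return current; // already unbounded
            }
            long sum = current + n;
            long next = sum < 0L ? Long.MAX_VALUE : sum; // overflow means "unbounded"
            if (requested.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    // Subtracts delivered elements, leaving unbounded demand untouched.
    long produced(long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return current;
            }
            long next = Math.max(0L, current - n);
            if (requested.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    long get() { return requested.get(); }
}
```

In the example, the single request(Long.MAX_VALUE) puts the counter into the unbounded state immediately, so the amount delivered never reduces the demand.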

The onRequestedFromDownstream call in request resolves to BufferAsyncSink's override:

        @Override
        void onRequestedFromDownstream() {
            drain();
        }

This override simply delegates to BufferAsyncSink.drain.

BufferAsyncSink.drain

        void drain() {
            if (WIP.getAndIncrement(this) != 0) {
                return;
            }

            int missed = 1;
            final Subscriber<? super T> a = actual;
            final Queue<T> q = queue;

            for (; ; ) {
                long r = requested;
                long e = 0L;

                while (e != r) {
                    if (isCancelled()) {
                        q.clear();
                        return;
                    }

                    boolean d = done;

                    T o = q.poll();

                    boolean empty = o == null;

                    if (d && empty) {
                        Throwable ex = error;
                        if (ex != null) {
                            super.error(ex);
                        }
                        else {
                            super.complete();
                        }
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(o);

                    e++;
                }

                if (e == r) {
                    if (isCancelled()) {
                        q.clear();
                        return;
                    }

                    boolean d = done;

                    boolean empty = q.isEmpty();

                    if (d && empty) {
                        Throwable ex = error;
                        if (ex != null) {
                            super.error(ex);
                        }
                        else {
                            super.complete();
                        }
                        return;
                    }
                }

                if (e != 0) {
                    Operators.produced(REQUESTED, this, e);
                }

                missed = WIP.addAndGet(this, -missed);
                if (missed == 0) {
                    break;
                }
            }
        }

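The queue is created when the BufferAsyncSink is constructed. Its size argument is Queues.SMALL_BUFFER_SIZE, which is defined as Math.max(16, Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256"))), that is, 256 unless the system property overrides it. Since BUFFER mode buffers without bound, this value acts as a sizing hint rather than a cap on the queue's length.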
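
The default can be evaluated in isolation. The sketch below copies the quoted expression into a hypothetical helper class (BufferSizeDefaults is not part of Reactor) to show the floor of 16 and the effect of the system property.

```java
// Hypothetical helper that re-evaluates the expression quoted above; not Reactor code.
final class BufferSizeDefaults {
    static int smallBufferSize() {
        return Math.max(16,
                Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));
    }

    public static void main(String[] args) {
        // Run with -Dreactor.bufferSize.small=8 to see the floor of 16 applied.
        System.out.println("small buffer size = " + smallBufferSize());
    }
}
```

Note that Reactor computes this value once, as a constant, so in a real application the property has to be set before the Queues class is loaded.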
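The a.onNext call here invokes the LambdaSubscriber's consumer synchronously. During this first drain, triggered from onSubscribe, the queue is still empty because source.accept has not run yet, so nothing is delivered.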
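
The core of the drain loop is easier to see with the cancellation and termination checks removed. The following is a simplified single-threaded model under that assumption. DrainModel and its members are hypothetical names; only the wip/missed pattern and the "emit while demand remains" loop are taken from the code above.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical single-threaded model of a queue-drain loop; not Reactor code.
final class DrainModel<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final AtomicInteger wip = new AtomicInteger(); // "work in progress" guard
    private final Consumer<T> downstream;
    private long requested;

    DrainModel(Consumer<T> downstream) { this.downstream = downstream; }

    void request(long n) { requested += n; drain(); }

    void next(T t) { queue.offer(t); drain(); }

    int queued() { return queue.size(); }

    private void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // a drain is already running; it will pick up the missed work
        }
        int missed = 1;
        for (;;) {
            long r = requested;
            long e = 0L;
            while (e != r) {
                T item = queue.poll();
                if (item == null) {
                    break;               // queue is empty
                }
                downstream.accept(item); // synchronous delivery
                e++;
            }
            requested -= e;              // account for what was delivered
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;                   // nothing arrived while draining
            }
        }
    }
}
```

With no demand, next only enqueues. Once request is called, items are delivered in order until either the demand or the queue runs out.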

FluxCreate.subscribe#source.accept

    source.accept(
            createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :
                    sink);

Flux.create uses CreateMode.PUSH_PULL, so the sink is wrapped in a SerializedSink and then passed to the lambda consumer that was supplied to Flux.create:

    fluxSink -> {
        //NOTE sink:class reactor.core.publisher.FluxCreate$SerializedSink
        LOGGER.info("sink:{}",fluxSink.getClass());
        while (true) {
            LOGGER.info("sink next");
            fluxSink.next(ThreadLocalRandom.current().nextInt());
        }
    }

From this point on, the lambda starts pushing data.

SerializedSink.next

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxCreate.java#SerializedSink.next

        public FluxSink<T> next(T t) {
            Objects.requireNonNull(t, "t is null in sink.next(t)");
            if (sink.isCancelled() || done) {
                Operators.onNextDropped(t, sink.currentContext());
                return this;
            }
            if (WIP.get(this) == 0 && WIP.compareAndSet(this, 0, 1)) {
                try {
                    sink.next(t);
                }
                catch (Throwable ex) {
                    Operators.onOperatorError(sink, ex, t, sink.currentContext());
                }
                if (WIP.decrementAndGet(this) == 0) {
                    return this;
                }
            }
            else {
                Queue<T> q = queue;
                synchronized (this) {
                    q.offer(t);
                }
                if (WIP.getAndIncrement(this) != 0) {
                    return this;
                }
            }
            drainLoop();
            return this;
        }

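On the uncontended path, this calls sink.next(t) on the wrapped BufferAsyncSink directly and returns. The value is queued, and drainLoop may run, only when another emission is already in progress or arrived in the meantime.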
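
The serialization idea can be sketched without Reactor. The model below keeps only the fast path, the queue, and the drain loop. It omits cancellation, errors, and the done flag, and uses a ConcurrentLinkedQueue instead of a synchronized offer. SerializedEmitter is a hypothetical name, not a Reactor class.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical model of serialized emission; not Reactor code.
final class SerializedEmitter<T> {
    private final Consumer<T> downstream;
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger();

    SerializedEmitter(Consumer<T> downstream) { this.downstream = downstream; }

    void next(T t) {
        if (wip.compareAndSet(0, 1)) {
            downstream.accept(t);            // fast path: nobody else is emitting
            if (wip.decrementAndGet() == 0) {
                return;                      // nothing was queued in the meantime
            }
        }
        else {
            queue.offer(t);                  // someone is emitting: hand the value over
            if (wip.getAndIncrement() != 0) {
                return;                      // the active emitter will drain it
            }
        }
        drainLoop();
    }

    private void drainLoop() {
        int missed = 1;
        for (;;) {
            T t;
            while ((t = queue.poll()) != null) {
                downstream.accept(t);
            }
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }
}
```

The guard also covers re-entrant calls: if the downstream calls next from inside its own callback, the new value is queued and delivered after the current callback returns, so the downstream is never entered twice at once.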

BufferAsyncSink.next

        public FluxSink<T> next(T t) {
            queue.offer(t);
            drain();
            return this;
        }

Here the value is put into the queue, and drain is called to take it out again and pass it synchronously to LambdaSubscriber.onNext.

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/LambdaSubscriber.java

    @Override
    public final void onNext(T x) {
        try {
            if (consumer != null) {
                consumer.accept(x);
            }
        }
        catch (Throwable t) {
            Exceptions.throwIfFatal(t);
            this.subscription.cancel();
            onError(t);
        }
    }

That is, the consumer passed to subscribe is called synchronously. In the example it logs the value and then sleeps, so the calling thread is blocked for the duration of the sleep.
Only after the consumer returns does fluxSink's next method return, and the loop in the lambda continues.

    fluxSink -> {
        //NOTE sink:class reactor.core.publisher.FluxCreate$SerializedSink
        LOGGER.info("sink:{}",fluxSink.getClass());
        while (true) {
            LOGGER.info("sink next");
            fluxSink.next(ThreadLocalRandom.current().nextInt());
        }
    }

Summary

Although the FluxSink lambda emits data through next in an infinite loop, this is not a problem when subscribe and the sink lambda run on the same thread (in this example everything runs on the main thread), because each next call is a synchronous, blocking call into the consumer.

On subscription, LambdaSubscriber.onSubscribe is called first and issues request(n) to signal demand (Long.MAX_VALUE here). Then source.accept runs the lambda given to Flux.create, which receives the sink and starts emitting data.

Each next call blocks until the consumer passed to subscribe has returned, and only then does the loop continue.

As for the OOM risk in BUFFER mode, it is worth thinking about the conditions under which it could actually occur.
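
One way to reason about it follows from the guard at the top of drain. A thread that calls next while another thread is already inside drain only enqueues its value and returns. If the draining thread is stuck in a slow consumer, the queue therefore keeps growing. The JDK-only sketch below models that situation with latches. BufferBacklogDemo and BufferingSink are hypothetical names, and demand is assumed to be unbounded, as after request(Long.MAX_VALUE).

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical model of an unbounded buffering sink; not Reactor code.
final class BufferBacklogDemo {

    static final class BufferingSink<T> {
        final Queue<T> queue = new ConcurrentLinkedQueue<>();
        final AtomicInteger wip = new AtomicInteger();
        final Consumer<T> downstream;

        BufferingSink(Consumer<T> downstream) { this.downstream = downstream; }

        void next(T t) {
            queue.offer(t);
            if (wip.getAndIncrement() != 0) {
                return; // another thread is draining: just leave the value queued
            }
            int missed = 1;
            for (;;) {
                T item;
                while ((item = queue.poll()) != null) {
                    downstream.accept(item);
                }
                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    return;
                }
            }
        }
    }

    // Returns the backlog that builds up while the consumer is stuck on the first item.
    static int backlogWhileConsumerBlocked(int extraItems) {
        CountDownLatch consumerEntered = new CountDownLatch(1);
        CountDownLatch releaseConsumer = new CountDownLatch(1);
        BufferingSink<Integer> sink = new BufferingSink<Integer>(v -> {
            if (v == 0) { // simulate a slow consumer on the first item only
                consumerEntered.countDown();
                try {
                    releaseConsumer.await();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        try {
            Thread drainer = new Thread(() -> sink.next(0)); // becomes the draining thread
            drainer.start();
            consumerEntered.await();

            for (int i = 1; i <= extraItems; i++) {
                sink.next(i); // returns immediately, the value stays in the queue
            }
            int backlog = sink.queue.size();

            releaseConsumer.countDown();
            drainer.join();
            return backlog;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("backlog = " + backlogWhileConsumerBlocked(1000)); // prints "backlog = 1000"
    }
}
```

In the single-threaded example from this article the backlog never builds up, because the producing thread is the same thread that drains and consumes. The model suggests that the risk appears once emission and consumption happen on different threads and the consumer is the slower side.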