Talk about the concurrency and prefetch parameters of FluxFlatMap

  reactor

Preface

This article looks at the concurrency and prefetch parameters of FluxFlatMap.

Example

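    import org.junit.Test;
    import reactor.core.publisher.Flux;

    public class FluxFlatMapTest {

        @Test
        public void testConcurrencyAndPrefetch() {
            int concurrency = 3; // max number of inner Flux subscribed at the same time
            int prefetch = 6;    // initial request size for each inner Flux
            Flux.range(1, 100)
                    .log() // logs the outer Flux signals (reactor.Flux.Range.1)
                    .flatMap(i -> Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).log(), // inner Flux (reactor.Flux.Array.N)
                            concurrency, prefetch)
                    .subscribe();
        }
    }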

Partial output

23:29:38.515 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
23:29:38.534 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
23:29:38.537 [main] INFO reactor.Flux.Range.1 - | request(3)
23:29:38.537 [main] INFO reactor.Flux.Range.1 - | onNext(1)
23:29:38.538 [main] INFO reactor.Flux.Array.2 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | request(6)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(1)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(2)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(3)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(4)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(5)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | request(5)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(6)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(7)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(8)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(9)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | onNext(10)
23:29:38.539 [main] INFO reactor.Flux.Array.2 - | request(5)
23:29:38.540 [main] INFO reactor.Flux.Array.2 - | onComplete()
23:29:38.540 [main] INFO reactor.Flux.Range.1 - | request(1)
23:29:38.540 [main] INFO reactor.Flux.Range.1 - | onNext(2)
23:29:38.540 [main] INFO reactor.Flux.Array.3 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:29:38.540 [main] INFO reactor.Flux.Array.3 - | request(6)
23:29:38.540 [main] INFO reactor.Flux.Array.3 - | onNext(1)
23:29:38.540 [main] INFO reactor.Flux.Array.3 - | onNext(2)

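Looking at the first request on each side, the outer Flux receives request(3) and each inner Flux receives request(6), i.e. concurrency and prefetch respectively. After that the requests change: the inner Flux is topped up with request(5) after every five elements, and the outer Flux receives request(1) once the first inner Flux completes.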
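To make the outer-side pattern concrete, here is a hypothetical toy model in plain Java (no Reactor dependency; OuterDemandModel and its methods are made up for illustration, not reactor-core code). It assumes only the behavior visible in the log: request concurrency elements up front, then request(1) each time an inner sequence completes. It checks that no more than concurrency inners are ever in flight.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model (not reactor-core code) of flatMap's outer-side demand:
 * request `concurrency` up front, then request(1) whenever an inner sequence completes.
 */
public class OuterDemandModel {
    final int concurrency;
    long requested;   // demand signalled to the outer source but not yet used
    int inFlight;     // inner sequences currently subscribed
    int maxInFlight;  // highest number of simultaneous inners observed
    final List<Long> requests = new ArrayList<>();

    OuterDemandModel(int concurrency) {
        this.concurrency = concurrency;
        request(concurrency); // first request equals concurrency, like request(3) in the log
    }

    void request(long n) {
        requests.add(n);
        requested += n;
    }

    /** The outer source emits one element and a new inner is subscribed for it. */
    void onOuterNext() {
        if (requested == 0) {
            throw new IllegalStateException("outer emitted without demand");
        }
        requested--;
        inFlight++;
        maxInFlight = Math.max(maxInFlight, inFlight);
    }

    /** One inner completes, freeing a slot, so one more outer element is requested. */
    void onInnerComplete() {
        inFlight--;
        request(1);
    }

    /** Drive the model with `total` outer elements whose inners finish one at a time. */
    static OuterDemandModel run(int concurrency, int total) {
        OuterDemandModel m = new OuterDemandModel(concurrency);
        int emitted = 0;
        while (emitted < total) {
            // emit as many outer elements as the current demand allows
            while (m.requested > 0 && emitted < total) {
                m.onOuterNext();
                emitted++;
            }
            m.onInnerComplete(); // some in-flight inner finishes (requested + inFlight == concurrency)
        }
        return m;
    }

    public static void main(String[] args) {
        OuterDemandModel m = run(3, 100);
        System.out.println("first request = " + m.requests.get(0)
                + ", max in-flight inners = " + m.maxInFlight);
    }
}
```

In the real log the inner Flux is synchronous, so each inner completes before the next outer element arrives; the model lets inners stay in flight to show the upper bound that concurrency enforces.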

Source code analysis

Flux

reactor-core-3.1.5.RELEASE-sources.jar!/reactor/core/publisher/Flux.java

    /**
     * Transform the elements emitted by this {@link Flux} asynchronously into Publishers,
     * then flatten these inner publishers into a single {@link Flux} through merging,
     * which allow them to interleave.
     * <p>
     * There are three dimensions to this operator that can be compared with
     * {@link #flatMapSequential(Function) flatMapSequential} and {@link #concatMap(Function) concatMap}:
     * <ul>
     *     <li><b>Generation of inners and subscription</b>: this operator is eagerly
     *     subscribing to its inners.</li>
     *     <li><b>Ordering of the flattened values</b>: this operator does not necessarily preserve
     *     original ordering, as inner element are flattened as they arrive.</li>
     *     <li><b>Interleaving</b>: this operator lets values from different inners interleave
     *     (similar to merging the inner sequences).</li>
     * </ul>
     * The concurrency argument allows to control how many {@link Publisher} can be
     * subscribed to and merged in parallel. The prefetch argument allows to give an
     * arbitrary prefetch size to the merged {@link Publisher}.
     *
     * <p>
     * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/flatmapc.png" alt="">
     *
     * @param mapper the {@link Function} to transform input sequence into N sequences {@link Publisher}
     * @param concurrency the maximum number of in-flight inner sequences
     * @param prefetch the maximum in-flight elements from each inner {@link Publisher} sequence
     * @param <V> the merged output sequence type
     *
     * @return a merged {@link Flux}
     */
    public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int
            concurrency, int prefetch) {
        return flatMap(mapper, false, concurrency, prefetch);
    }

    final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends
            V>> mapper, boolean delayError, int concurrency, int prefetch) {
        return onAssembly(new FluxFlatMap<>(
                this,
                mapper,
                delayError,
                concurrency,
                Queues.get(concurrency),
                prefetch,
                Queues.get(prefetch)
        ));
    }

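The call ends up constructing a FluxFlatMap, passing concurrency and prefetch along with queue suppliers sized from them: Queues.get(concurrency) for the main queue and Queues.get(prefetch) for the inner queues.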

FluxFlatMap

reactor-core-3.1.5.RELEASE-sources.jar!/reactor/core/publisher/FluxFlatMap.java

    FluxFlatMap(Flux<? extends T> source,
            Function<? super T, ? extends Publisher<? extends R>> mapper,
            boolean delayError,
            int maxConcurrency,
            Supplier<? extends Queue<R>> mainQueueSupplier,
            int prefetch,
            Supplier<? extends Queue<R>> innerQueueSupplier) {
        super(source);
        if (prefetch <= 0) {
            throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
        }
        if (maxConcurrency <= 0) {
            throw new IllegalArgumentException("maxConcurrency > 0 required but it was " + maxConcurrency);
        }
        this.mapper = Objects.requireNonNull(mapper, "mapper");
        this.delayError = delayError;
        this.prefetch = prefetch;
        this.maxConcurrency = maxConcurrency;
        this.mainQueueSupplier =
                Objects.requireNonNull(mainQueueSupplier, "mainQueueSupplier");
        this.innerQueueSupplier =
                Objects.requireNonNull(innerQueueSupplier, "innerQueueSupplier");
    }

    @Override
    public void subscribe(CoreSubscriber<? super R> actual) {

        if (trySubscribeScalarMap(source, actual, mapper, false)) {
            return;
        }

        source.subscribe(new FlatMapMain<>(actual,
                mapper,
                delayError,
                maxConcurrency,
                mainQueueSupplier,
                prefetch, innerQueueSupplier));
    }    

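On subscribe, unless the source can be handled by trySubscribeScalarMap, FluxFlatMap subscribes a FlatMapMain to the source and hands it maxConcurrency and prefetch.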

FlatMapMain

    static final class FlatMapMain<T, R> extends FlatMapTracker<FlatMapInner<R>>
            implements InnerOperator<T, R> {

        FlatMapMain(CoreSubscriber<? super R> actual,
                Function<? super T, ? extends Publisher<? extends R>> mapper,
                boolean delayError,
                int maxConcurrency,
                Supplier<? extends Queue<R>> mainQueueSupplier,
                int prefetch,
                Supplier<? extends Queue<R>> innerQueueSupplier) {
            this.actual = actual;
            this.mapper = mapper;
            this.delayError = delayError;
            this.maxConcurrency = maxConcurrency;
            this.mainQueueSupplier = mainQueueSupplier;
            this.prefetch = prefetch;
            this.innerQueueSupplier = innerQueueSupplier;
            this.limit = Operators.unboundedOrLimit(maxConcurrency);
        }

        @Override
        public void request(long n) {
            if (Operators.validate(n)) {
                Operators.addCap(REQUESTED, this, n);
                drain();
            }
        }

        @Override
        public void onSubscribe(Subscription s) {
            if (Operators.validate(this.s, s)) {
                this.s = s;

                actual.onSubscribe(this);
                s.request(Operators.unboundedOrPrefetch(maxConcurrency));
            }
        }

        @SuppressWarnings("unchecked")
        @Override
        public void onNext(T t) {
            if (done) {
                Operators.onNextDropped(t, actual.currentContext());
                return;
            }

            Publisher<? extends R> p;

            try {
                p = Objects.requireNonNull(mapper.apply(t),
                "The mapper returned a null Publisher");
            }
            catch (Throwable e) {
                onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
                return;
            }

            if (p instanceof Callable) {
                R v;
                try {
                    v = ((Callable<R>) p).call();
                }
                catch (Throwable e) {
                    if (!delayError || !Exceptions.addThrowable(ERROR, this, e)) {
                        onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
                    }
                    return;
                }
                tryEmitScalar(v);
            }
            else {
                FlatMapInner<R> inner = new FlatMapInner<>(this, prefetch);
                if (add(inner)) {

                    p.subscribe(inner);
                }
            }

        }

        //...
    }

FlatMapMain is the subscriber to the outer Flux. In onSubscribe it requests Operators.unboundedOrPrefetch(maxConcurrency) from the source, i.e. the first parameter, concurrency.
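A minimal sketch of what Operators.unboundedOrPrefetch does, written from my reading of reactor-core 3.1.x: Integer.MAX_VALUE is treated as "unbounded" and turned into Long.MAX_VALUE demand, any other value is requested as-is. The DemandHelpers class is hypothetical; the real helper lives in reactor.core.publisher.Operators, so check it against the version you use.

```java
/**
 * Hypothetical re-implementation (DemandHelpers is not a Reactor class) of the
 * unboundedOrPrefetch helper as read from reactor-core 3.1.x.
 */
public class DemandHelpers {
    /** Integer.MAX_VALUE means "unbounded", which maps to Long.MAX_VALUE demand. */
    static long unboundedOrPrefetch(int prefetch) {
        return prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : prefetch;
    }

    public static void main(String[] args) {
        System.out.println(unboundedOrPrefetch(3));                 // prints 3, i.e. request(3) on the outer Flux
        System.out.println(unboundedOrPrefetch(Integer.MAX_VALUE)); // prints 9223372036854775807 (Long.MAX_VALUE)
    }
}
```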

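In onNext, each element is mapped to an inner Publisher. If that Publisher is a Callable, its value is emitted directly through tryEmitScalar; otherwise a new FlatMapInner carrying the prefetch value is added to the tracker and subscribed to the inner Publisher.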

FlatMapInner

    static final class FlatMapInner<R>
            implements InnerConsumer<R>, Subscription {

        FlatMapInner(FlatMapMain<?, R> parent, int prefetch) {
            this.parent = parent;
            this.prefetch = prefetch;
//            this.limit = prefetch >> 2;
            this.limit = Operators.unboundedOrLimit(prefetch);
        }

        @Override
        public void onSubscribe(Subscription s) {
            if (Operators.setOnce(S, this, s)) {
                if (s instanceof Fuseable.QueueSubscription) {
                    @SuppressWarnings("unchecked") Fuseable.QueueSubscription<R> f =
                            (Fuseable.QueueSubscription<R>) s;
                    int m = f.requestFusion(Fuseable.ANY | Fuseable.THREAD_BARRIER);
                    if (m == Fuseable.SYNC) {
                        sourceMode = Fuseable.SYNC;
                        queue = f;
                        done = true;
                        parent.drain();
                        return;
                    }
                    if (m == Fuseable.ASYNC) {
                        sourceMode = Fuseable.ASYNC;
                        queue = f;
                    }
                    // NONE is just fall-through as the queue will be created on demand
                }
                s.request(Operators.unboundedOrPrefetch(prefetch));
            }
        }        

        @Override
        public void request(long n) {
            long p = produced + n;
            if (p >= limit) {
                produced = 0L;
                s.request(p);
            }
            else {
                produced = p;
            }
        }        
    }

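In onSubscribe, FlatMapInner requests Operators.unboundedOrPrefetch(prefetch) from the inner Flux, i.e. the prefetch value (unless fusion negotiates SYNC mode, in which case it returns early and the parent drains the fused queue instead).

The commented-out line would have used prefetch >> 2 (a quarter of prefetch) as the limit, but it is not active. The effective line sets limit to Operators.unboundedOrLimit(prefetch), which for a bounded prefetch is prefetch - (prefetch >> 2), roughly three quarters of prefetch. In request(n), consumed elements accumulate in produced, and once they reach limit they are requested from the inner Flux in a single batch. With prefetch = 6 the limit is 6 - 1 = 5, which matches the request(5) calls in the log.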
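The replenishment bookkeeping can be reproduced with a small toy model. This is a hypothetical sketch (InnerPrefetchModel is not a Reactor class): it copies the request(long n) logic quoted above and assumes limit = prefetch - (prefetch >> 2), as computed by Operators.unboundedOrLimit in reactor-core 3.1.x. It also assumes each element delivered downstream reports request(1) back to the inner, which is what the per-element pattern in the log suggests.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model of FlatMapInner's prefetch/replenish bookkeeping. */
public class InnerPrefetchModel {
    /** Assumed unboundedOrLimit formula: about 75% of a bounded prefetch. */
    static int limitFor(int prefetch) {
        return prefetch == Integer.MAX_VALUE ? Integer.MAX_VALUE : prefetch - (prefetch >> 2);
    }

    final int limit;
    long produced;                                  // consumed but not yet re-requested
    final List<Long> upstreamRequests = new ArrayList<>();

    InnerPrefetchModel(int prefetch) {
        this.limit = limitFor(prefetch);
        upstreamRequests.add((long) prefetch);      // initial s.request(prefetch) in onSubscribe
    }

    /** Same logic as the request(long n) method quoted above. */
    void request(long n) {
        long p = produced + n;
        if (p >= limit) {
            produced = 0L;
            upstreamRequests.add(p);                // s.request(p): replenish in one batch
        } else {
            produced = p;
        }
    }

    /** Consume `count` elements one at a time, reporting each back via request(1). */
    static List<Long> consume(int prefetch, int count) {
        InnerPrefetchModel m = new InnerPrefetchModel(prefetch);
        for (int i = 0; i < count; i++) {
            m.request(1);
        }
        return m.upstreamRequests;
    }

    public static void main(String[] args) {
        // prefetch = 6, ten inner elements, as in the example test
        System.out.println(consume(6, 10)); // prints [6, 5, 5]
    }
}
```

The resulting sequence request(6), request(5), request(5) is the same one the inner Flux logs above. Batching at three quarters of prefetch rather than after every element keeps the number of request calls low while still refilling before the prefetched elements run out.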

Summary

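The two flatMap parameters act on different sides: concurrency on the outer Flux and prefetch on each inner Flux. Each value is used as the size of the first request on its side. After that, flatMap adjusts the requests internally: the inner Flux is replenished in batches of limit, and the outer Flux is asked for more elements as inner sequences complete.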
