A look at Mono and Flux in Reactive Streams


Preface

This article looks at Mono and Flux, the two abstract classes in Reactor that implement the Publisher interface of Reactive Streams.

Publisher

reactive-streams-1.0.1-sources.jar!/org/reactivestreams/Publisher.java

/**
 * A {@link Publisher} is a provider of a potentially unbounded number of sequenced elements, publishing them according to
 * the demand received from its {@link Subscriber}(s).
 * <p>
 * A {@link Publisher} can serve multiple {@link Subscriber}s subscribed {@link #subscribe(Subscriber)} dynamically
 * at various points in time.
 *
 * @param <T> the type of element signaled.
 */
public interface Publisher<T> {

    /**
     * Request {@link Publisher} to start streaming data.
     * <p>
     * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}.
     * <p>
     * Each {@link Subscription} will work for only a single {@link Subscriber}.
     * <p>
     * A {@link Subscriber} should only subscribe once to a single {@link Publisher}.
     * <p>
     * If the {@link Publisher} rejects the subscription attempt or otherwise fails it will
     * signal the error via {@link Subscriber#onError}.
     *
     * @param s the {@link Subscriber} that will consume signals from this {@link Publisher}
     */
    public void subscribe(Subscriber<? super T> s);
}
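
To make the demand-driven contract described in the Javadoc above more concrete, here is a minimal sketch of a publisher that only emits as many items as were requested. It is written against java.util.concurrent.Flow (JDK 9+), whose interfaces mirror those of org.reactivestreams, so that it runs without extra dependencies. The names RangePublisherSketch, RangePublisher and collect are hypothetical, and the sketch deliberately skips parts of the specification (for example re-entrant request calls and non-positive demand).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class RangePublisherSketch {

    /** Hypothetical publisher that emits the integers 1..count, honoring demand. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            // Each subscribe call gets its own Subscription with its own state.
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean done;

                @Override
                public void request(long n) {
                    // Emit at most n items, then wait for further demand.
                    for (long i = 0; i < n && !done && next <= count; i++) {
                        subscriber.onNext(next++);
                    }
                    // Signal completion once the range is exhausted.
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes, requests `demand` items once, and records the signals received. */
    static List<String> collect(Flow.Publisher<Integer> publisher, long demand) {
        List<String> signals = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(Integer item) { signals.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { signals.add("onError"); }
            @Override public void onComplete() { signals.add("onComplete"); }
        });
        return signals;
    }

    public static void main(String[] args) {
        // Demand of 2 against 3 available items: two onNext signals, no completion yet.
        System.out.println(collect(new RangePublisher(3), 2));
        // Demand larger than the range: all items, followed by onComplete.
        System.out.println(collect(new RangePublisher(3), 10));
    }
}
```

Subscribing twice yields two independent sequences, which is the "factory method" behavior the Javadoc describes.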

Mono

reactor-core-3.1.2.RELEASE-sources.jar!/reactor/core/publisher/Mono.java

public abstract class Mono<T> implements Publisher<T> {
    //...
    /**
     * Expose the specified {@link Publisher} with the {@link Mono} API, and ensure it will emit 0 or 1 item.
     * The source emitter will be cancelled on the first `onNext`.
     * <p>
     * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/from1.png" alt="">
     * <p>
     * @param source the {@link Publisher} source
     * @param <T> the source type
     *
     * @return the next item emitted as a {@link Mono}
     */
    public static <T> Mono<T> from(Publisher<? extends T> source) {
        if (source instanceof Mono) {
            @SuppressWarnings("unchecked")
            Mono<T> casted = (Mono<T>) source;
            return casted;
        }
        if (source instanceof Flux) {
            @SuppressWarnings("unchecked")
            Flux<T> casted = (Flux<T>) source;
            return casted.next();
        }
        return onAssembly(new MonoFromPublisher<>(source));
    }

    /**
     * Create a new {@link Mono} that emits the specified item, which is captured at
     * instantiation time.
     *
     * <p>
     * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/just.png" alt="">
     * <p>
     * @param data the only item to onNext
     * @param <T> the type of the produced item
     *
     * @return a {@link Mono}.
     */
    public static <T> Mono<T> just(T data) {
        return onAssembly(new MonoJust<>(data));
    }
    //...
}
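
The two branches of Mono.from shown above can be observed from the outside. The following is a small sketch, assuming reactor-core is on the classpath; the class name MonoFromSketch and its helper methods are hypothetical, and the identity check reflects the source quoted above rather than a documented guarantee.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoFromSketch {

    /** Mono.from on a Flux delegates to Flux.next(), so only the first element survives. */
    static Integer firstOf(Flux<Integer> source) {
        Mono<Integer> first = Mono.from(source);
        return first.block();
    }

    /** A Mono passed to Mono.from is cast and returned as-is in the code quoted above. */
    static boolean returnsSameInstance(Mono<String> source) {
        Mono<String> wrapped = Mono.from(source);
        return wrapped == source;
    }

    public static void main(String[] args) {
        System.out.println(firstOf(Flux.just(1, 2, 3)));              // prints 1
        System.out.println(returnsSameInstance(Mono.just("Hello")));
    }
}
```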

Flux

reactor-core-3.1.2.RELEASE-sources.jar!/reactor/core/publisher/Flux.java

public abstract class Flux<T> implements Publisher<T> {
    //......
    /**
     * Programmatically create a {@link Flux} with the capability of emitting multiple
     * elements in a synchronous or asynchronous manner through the {@link FluxSink} API.
     * <p>
     * This Flux factory is useful if one wants to adapt some other multi-valued async API
     * and not worry about cancellation and backpressure (which is handled by buffering
     * all signals if the downstream can't keep up).
     * <p>
     * For example:
     *
     * <pre><code>
     * Flux.&lt;String&gt;create(emitter -&gt; {
     *
     *     ActionListener al = e -&gt; {
     *         emitter.next(textField.getText());
     *     };
     *     // without cleanup support:
     *
     *     button.addActionListener(al);
     *
     *     // with cleanup support:
     *
     *     button.addActionListener(al);
     *     emitter.onDispose(() -> {
     *         button.removeListener(al);
     *     });
     * }, FluxSink.OverflowStrategy.LATEST);
     * </code></pre>
     *
     * @param <T> The type of values in the sequence
     * @param backpressure the backpressure mode, see {@link OverflowStrategy} for the
     * available backpressure modes
     * @param emitter Consume the {@link FluxSink} provided per-subscriber by Reactor to generate signals.
     * @return a {@link Flux}
     */
    public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {
        return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_PULL));
    }

    /**
     * Decorate the specified {@link Publisher} with the {@link Flux} API.
     * <p>
     * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/from.png" alt="">
     * <p>
     * @param source the source to decorate
     * @param <T> The type of values in both source and output sequences
     *
     * @return a new {@link Flux}
     */
    public static <T> Flux<T> from(Publisher<? extends T> source) {
        if (source instanceof Flux) {
            @SuppressWarnings("unchecked")
            Flux<T> casted = (Flux<T>) source;
            return casted;
        }

        if (source instanceof Fuseable.ScalarCallable) {
            try {
                @SuppressWarnings("unchecked") T t =
                        ((Fuseable.ScalarCallable<T>) source).call();
                if (t != null) {
                    return just(t);
                }
                return empty();
            }
            catch (Exception e) {
                return error(e);
            }
        }
        return wrap(source);
    }

    /**
     * Programmatically create a {@link Flux} by generating signals one-by-one via a
     * consumer callback and some state, with a final cleanup callback. The
     * {@code stateSupplier} may return {@literal null} but your cleanup {@code stateConsumer}
     * will need to handle the null case.
     * <p>
     * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/generate.png" alt="">
     * <p>
     *
     * @param <T> the value type emitted
     * @param <S> the per-subscriber custom state type
     * @param stateSupplier called for each incoming Subscriber to provide the initial state for the generator bifunction
     * @param generator Consume the {@link SynchronousSink} provided per-subscriber by Reactor
     * as well as the current state to generate a <strong>single</strong> signal on each pass
     * and return a (new) state.
     * @param stateConsumer called after the generator has terminated or the downstream cancelled, receiving the last
     * state to be handled (i.e., release resources or do other cleanup).
     *
     * @return a {@link Flux}
     */
    public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer) {
        return onAssembly(new FluxGenerate<>(stateSupplier, generator, stateConsumer));
    }
}
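
As a rough illustration of how the generate and create factories above differ in use, the sketch below produces a short sequence with each. It assumes reactor-core is on the classpath; FluxFactoriesSketch, squaresViaGenerate and wordsViaCreate are hypothetical names chosen for this example.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class FluxFactoriesSketch {

    /** generate: one signal per pass of the callback, with explicit per-subscriber state. */
    static List<Integer> squaresViaGenerate(int limit) {
        return Flux.<Integer, Integer>generate(
                () -> 1,                           // initial state for each subscriber
                (state, sink) -> {
                    sink.next(state * state);      // a single onNext on this pass
                    if (state >= limit) {
                        sink.complete();           // terminate after the last square
                    }
                    return state + 1;              // state handed to the next pass
                },
                state -> { /* cleanup hook; nothing to release in this sketch */ })
            .collectList()
            .block();
    }

    /** create: the callback may push any number of signals through the FluxSink. */
    static List<String> wordsViaCreate() {
        return Flux.<String>create(sink -> {
                sink.next("Hello");
                sink.next("World");
                sink.complete();
            }, FluxSink.OverflowStrategy.BUFFER)
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(squaresViaGenerate(3)); // prints [1, 4, 9]
        System.out.println(wordsViaCreate());      // prints [Hello, World]
    }
}
```

generate suits pull-style sources, because Reactor invokes the callback once per requested item, whereas create suits bridging callback-based APIs, where the overflow strategy decides what happens when the downstream cannot keep up.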

Example

Mono

    @Test
    public void testMonoBasic(){
        Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
        Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
        Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);
    }

A Mono emits at most one item and then completes, or signals an error. It corresponds to the Single and Maybe types of the RxJava library, or loosely to Java's Optional. An asynchronous task that only needs to signal completion, without producing a value, can therefore be modeled as Mono<Void>.

Calling single() on a Flux<T> returns a Mono<T>, while concatenating two Mono<T> instances with concatWith produces a Flux<T>.
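
The conversions between the two types, and the completion-only Mono<Void>, can be sketched as follows. This assumes reactor-core is on the classpath; MonoFluxConversionSketch and its helper methods are hypothetical names used only for illustration.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoFluxConversionSketch {

    /** Flux.single() narrows a Flux to a Mono; it errors unless exactly one item is emitted. */
    static String singleOf(Flux<String> source) {
        Mono<String> mono = source.single();
        return mono.block();
    }

    /** Mono.concatWith widens two Monos into a Flux that emits both values in order. */
    static List<String> concat(Mono<String> first, Mono<String> second) {
        Flux<String> flux = first.concatWith(second);
        return flux.collectList().block();
    }

    /** Mono<Void> carries no value: block() returns null once the task has completed. */
    static boolean completesWithoutValue(Runnable task) {
        Mono<Void> done = Mono.fromRunnable(task);
        return done.block() == null;
    }

    public static void main(String[] args) {
        System.out.println(singleOf(Flux.just("Hello")));                       // prints Hello
        System.out.println(concat(Mono.just("Hello"), Mono.just("World")));     // prints [Hello, World]
        System.out.println(completesWithoutValue(() -> { /* side effect */ })); // prints true
    }
}
```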

Flux

    @Test
    public void testBasic(){
        Flux.just("Hello", "World").subscribe(System.out::println);
        Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
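        // Completes immediately without emitting, so nothing is printed.
        Flux.empty().subscribe(System.out::println);
        Flux.range(1, 10).subscribe(System.out::println);
        // interval emits on a background timer thread; the test method returns
        // before the first 10-second tick, so no output appears unless the test waits.
        Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);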
    }

Flux is roughly equivalent to an RxJava Observable: it emits 0 to N items and then (optionally) completes or signals an error, so multiple items are processed as a stream.

Summary

Mono and Flux are abstract classes that implement the Publisher interface. Mono is comparable to Optional (0 or 1 item), while Flux is a stream of 0 to N items. Both are basic building blocks of reactive programming in Spring 5.
