Talk about schedulers of reactive streams.

  reactive-programming

Order

This article mainly studies the schedulers of reactive streams.

Background

By default, Mono and Flux both run on the main thread, sometimes blocking the main thread. You can set schedulers to run on other threads.

Original output

When publishOn and subscribeOn are not used, the output is as follows

11:26:10.668 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
11:26:11.097 [main] INFO com.example.demo.SchedulerTest - defer thread:[main]
11:26:11.116 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:26:11.116 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:26:11.116 [main] INFO com.example.demo.SchedulerTest - subscribe thread:[main],data :2
11:26:11.116 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:26:11.116 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:26:11.117 [main] INFO com.example.demo.SchedulerTest - subscribe thread:[main],data :4

publishOn(Configure threads for subscriber)

    @Test
    public void testPublisherThread(){
        Scheduler pubScheduler = Schedulers.newSingle("pub-thread");
        Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            return Flux.range(1,4);
        })
                .filter(e -> {
                    LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());
                    return e % 2 == 0;
                })
                .publishOn(pubScheduler)
                .subscribe(e -> {
                    LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
                });
    }

Output

11:31:23.691 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
11:31:23.871 [main] INFO com.example.demo.SchedulerTest - defer thread:[main]
11:31:23.880 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:31:23.881 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:31:23.881 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:31:23.881 [main] INFO com.example.demo.SchedulerTest - filter thread:[main]
11:31:23.881 [publisher-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-1],data :2
11:31:23.881 [publisher-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-1],data :4

It can be found that configuring publishOn has changed subscribe’s running thread.

subscribeOn(Configure threads for publisher)

    @Test
    public void testSubscriberThread() throws InterruptedException {
        Scheduler subScheduler = Schedulers.newSingle("sub-thread");
        Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            return Flux.range(1,4);
        })
                .filter(e -> {
                    LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());
                    return e % 2 == 0;
                })
                .subscribeOn(subScheduler)
                .subscribe(e -> {
                    LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
                });
        Thread.sleep(10*1000);
    }

The output is as follows:

11:31:58.294 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
11:31:58.528 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - defer thread:[subscriber-thread-1]
11:31:58.532 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:31:58.532 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:31:58.532 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[subscriber-thread-1],data :2
11:31:58.533 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:31:58.533 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:31:58.533 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[subscriber-thread-1],data :4

It can be found that subscribe is configured, and all are running on this thread, including defer, filter and Subscribe.

PublishOn and subscribeOn

    @Test
    public void testPublisherAndSubscriberThread() throws InterruptedException {
        Scheduler pubScheduler = Schedulers.newSingle("publisher-thread");
        Scheduler subScheduler = Schedulers.newSingle("subscriber-thread");
        Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            return Flux.range(1,4);
        })
                .filter(e -> {
                    LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());
                    return e % 2 == 0;
                })
                .publishOn(pubScheduler)
                .subscribeOn(subScheduler)
                .subscribe(e -> {
                    LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
                });
        Thread.sleep(10*1000);
    }

Output

11:33:00.964 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
11:33:01.125 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - defer thread:[subscriber-thread-1]
11:33:01.134 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:33:01.135 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:33:01.135 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:33:01.135 [subscriber-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[subscriber-thread-1]
11:33:01.135 [publisher-thread-2] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-2],data :2
11:33:01.135 [publisher-thread-2] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-2],data :4

If they are all configured, you can see that subscriber runs on the thread configured by publish, while defer, filter, etc. run on the thread configured by subscribe.

PublishOn and filter

    @Test
    public void testFilterThread(){
        Scheduler pubScheduler = Schedulers.newSingle("publisher-thread");
        Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            return Flux.range(1,4);
        })
                .publishOn(pubScheduler) //NOTE 注意这里放到了filter之前
                .filter(e -> {
                    LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());
                    return e % 2 == 0;
                })
                .subscribe(e -> {
                    LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
                });
    }

Output

13:19:01.606 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
13:19:01.754 [main] INFO com.example.demo.SchedulerTest - defer thread:[main]
13:19:01.766 [publisher-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[publisher-thread-1]
13:19:01.766 [publisher-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[publisher-thread-1]
13:19:01.766 [publisher-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-1],data :2
13:19:01.766 [publisher-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[publisher-thread-1]
13:19:01.766 [publisher-thread-1] INFO com.example.demo.SchedulerTest - filter thread:[publisher-thread-1]
13:19:01.767 [publisher-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[publisher-thread-1],data :4

When publishing on is placed before filter, you can find that the filter thread has also become publisher thread.
Filter or maps after publishOn will use publishOn configured threads; Previously, the main thread or the thread configured by subscribeOn was used.

SubscribeOn and filter

SubscribeOn is placed before filter, which is no different from that after, because when publishOn is not configured, subscribon acts on all, including filter.

window scheduler

You can also set a thread pool for the window method.

    @Test
    public void testWindowScheduler() throws InterruptedException {
        Scheduler windowScheduler = Schedulers.newSingle("window-thread");
        Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            return Flux.range(1,4);
        })
                .delayElements(Duration.ofMillis(200)) //默认会创建parallel线程,作用于subscribe线程
                .windowTimeout(1, Duration.ofMillis(100), windowScheduler)
                .onErrorReturn(Flux.<Integer>just(-1))
                .flatMap(e -> {
                    return e.map(item -> item*10);
                })
                .subscribe(e -> {
                    LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
                });
        Thread.sleep(10*1000);
    }

Output

14:15:28.523 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
14:15:28.701 [main] INFO com.example.demo.SchedulerTest - defer thread:[main]
14:15:28.961 [parallel-1] INFO com.example.demo.SchedulerTest - subscribe thread:[parallel-1],data :10
14:15:29.167 [window-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[window-thread-1],data :20
14:15:29.370 [window-thread-1] INFO com.example.demo.SchedulerTest - subscribe thread:[window-thread-1],data :30
14:15:29.573 [parallel-4] INFO com.example.demo.SchedulerTest - subscribe thread:[parallel-4],data :40

Note that the delayElements method creates a parallel thread for subscriber by default.
Methods such as timeout (), skip () also create threads by default

scheduleGroup

Previously, publishOn and subscribeOn used Schedulers.newSingle, or group of multiple threads, such as

Scheduler parallelGroup = Schedulers.newParallel("parallel-group", 8);

You can also use elastic type, which is more suitable for IO type operations.

    /**
     * {@link Scheduler} that dynamically creates ExecutorService-based Workers and caches
     * the thread pools, reusing them once the Workers have been shut down.
     * <p>
     * The maximum number of created thread pools is unbounded.
     * <p>
     * The default time-to-live for unused thread pools is 60 seconds, use the appropriate
     * factory to push a different value.
     * <p>
     * This scheduler is not restartable.
     *
     * @param name Thread prefix
     *
     * @return a new {@link Scheduler} that hosts a fixed pool of single-threaded
     * ExecutorService-based workers and is suited for parallel work
     */
    public static Scheduler newElastic(String name) {
        return newElastic(name, ElasticScheduler.DEFAULT_TTL_SECONDS);
    }

Example

    @Test
    public void testElasticGroup() throws InterruptedException {
        Scheduler elastic = Schedulers.newElastic("elastic-group");
        Flux.defer(() -> {
            LOGGER.info("defer thread:[{}]",Thread.currentThread().getName());
            return Flux.range(1,4);
        })
                .filter(e -> {
                    LOGGER.info("filter thread:[{}]",Thread.currentThread().getName());
                    return e % 2 == 0;
                })
                .publishOn(elastic)
                .map(e -> {
                    LOGGER.info("map thread:[{}]",Thread.currentThread().getName());
                    return e * 10;
                })
                .subscribeOn(elastic)
                .subscribe(e -> {
                    LOGGER.info("subscribe thread:[{}],data :{}",Thread.currentThread().getName(),e);
                });
        Thread.sleep(10*1000);
    }

Output

13:58:37.356 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
13:58:37.514 [elastic-group-2] INFO com.example.demo.SchedulerTest - defer thread:[elastic-group-2]
13:58:37.520 [elastic-group-2] INFO com.example.demo.SchedulerTest - filter thread:[elastic-group-2]
13:58:37.520 [elastic-group-2] INFO com.example.demo.SchedulerTest - filter thread:[elastic-group-2]
13:58:37.520 [elastic-group-2] INFO com.example.demo.SchedulerTest - filter thread:[elastic-group-2]
13:58:37.520 [elastic-group-2] INFO com.example.demo.SchedulerTest - filter thread:[elastic-group-2]
13:58:37.520 [elastic-group-3] INFO com.example.demo.SchedulerTest - map thread:[elastic-group-3]
13:58:37.520 [elastic-group-3] INFO com.example.demo.SchedulerTest - subscribe thread:[elastic-group-3],data :20
13:58:37.521 [elastic-group-3] INFO com.example.demo.SchedulerTest - map thread:[elastic-group-3]
13:58:37.521 [elastic-group-3] INFO com.example.demo.SchedulerTest - subscribe thread:[elastic-group-3],data :40

Summary

  • Name

The name of this publishOn and subscribeOn method is a bit obscure, more straightforward equivalent to subscriberThreadPools and publisherThreadPools.

  • Location of publishOn and operations

Filter or maps after publishOn will use publishOn configured threads; Previously, the main thread or the thread configured by subscribeOn was used.

  • subscribeOn

If you do not configure publishOn but only subscribeOn, all will be affected.

  • Method built-in thread

DelayElements(),timeout(),skip () built-in uses additional threads

doc