A Look at Parallel Flux in Reactive Streams

  reactive-programming

Preface

This article looks at how to run a Reactive Streams Flux in parallel, using Reactor's parallel flux.

Purpose

For IO-heavy work, such as reading files or querying a database, it is usually recommended to run the operations in parallel on asynchronous threads to improve performance.

Example

    @Test
    public void testParallelRunOn(){
        Flux.range(1, 1000)
                .log()
                .parallel(8)
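                .runOn(Schedulers.parallel()) // run each of the 8 rails on a worker of the parallel scheduler
                .sequential() // merge the results from the rails back into a single Flux; the map below runs after the merge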
                .map(e -> {
                    LOGGER.info("map thread:{},e:{}",Thread.currentThread().getName(),e);
                    return e*10;
                })
                .subscribe(e -> {
                    LOGGER.info("subscribe thread:{},e:{}",Thread.currentThread().getName(),e);
                });
    }

Partial output

22:38:53.949 [main] INFO reactor.Flux.Range.1 - | onNext(13)
22:38:53.949 [parallel-2] INFO com.example.demo.ParallelTest - subscribe thread:parallel-2,e:120
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(14)
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - map thread:parallel-5,e:13
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - subscribe thread:parallel-5,e:130
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(15)
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - map thread:parallel-5,e:14
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - subscribe thread:parallel-5,e:140
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(16)
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - map thread:parallel-5,e:15
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - subscribe thread:parallel-5,e:150
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(17)
22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - map thread:parallel-8,e:16
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(18)
22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - subscribe thread:parallel-8,e:160
22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - map thread:parallel-8,e:17
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(19)
22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - subscribe thread:parallel-8,e:170
22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - map thread:parallel-8,e:18

Summary

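  • parallel(n) splits the flux into n rails (8 in the example); it sets the degree of parallelism but does not by itself move work onto other threads
  • runOn(scheduler) assigns each rail to a worker of the given Scheduler, which is what actually starts parallel execution
  • sequential() merges the results from the rails back into a single Flux; operators placed after it, such as the map above, run on the merged stream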
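
The three roles in the summary can be mimicked with plain JDK classes. The sketch below is only a simplified model and not how Reactor implements parallel flux: the class RailModel and its methods split and process are hypothetical names introduced for illustration, and the merge step simply concatenates rail by rail, whereas sequential() emits elements as the rails make them available.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical illustration class; not part of Reactor.
public class RailModel {

    /** Distributes items round-robin over `rails` sub-lists (the role of parallel(n)). */
    public static <T> List<List<T>> split(List<T> items, int rails) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < rails; i++) {
            out.add(new ArrayList<>());
        }
        for (int i = 0; i < items.size(); i++) {
            out.get(i % rails).add(items.get(i));
        }
        return out;
    }

    /**
     * Submits each rail as a task to a pool sized to the rail count (the role of runOn),
     * then concatenates the per-rail results into one list (a simplified sequential()).
     */
    public static <T, R> List<R> process(List<T> items, int rails, Function<T, R> fn) {
        ExecutorService pool = Executors.newFixedThreadPool(rails);
        try {
            List<Future<List<R>>> futures = new ArrayList<>();
            for (List<T> rail : split(items, rails)) {
                futures.add(pool.submit(() -> {
                    List<R> mapped = new ArrayList<>();
                    for (T item : rail) {
                        mapped.add(fn.apply(item));
                    }
                    return mapped;
                }));
            }
            // Merge in rail order: order is kept within a rail, not across the input.
            List<R> merged = new ArrayList<>();
            for (Future<List<R>> future : futures) {
                merged.addAll(future.get());
            }
            return merged;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(split(input, 3));
        System.out.println(process(input, 3, e -> e * 10));
    }
}
```

With 8 inputs and 3 rails, split yields [[1, 4, 7], [2, 5, 8], [3, 6]] and process yields [10, 40, 70, 20, 50, 80, 30, 60]. Order is preserved within a rail but not across the whole input, which is also why the log above does not show results in strictly ascending order.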
