On the backpressure of reactive streams

Introduction

This article looks at backpressure in reactive streams.

The difference between reactive streams and traditional streams

    @Test
    public void testShowReactiveStreams() throws InterruptedException {
        Flux.interval(Duration.ofMillis(1000))
                .take(500)
                .subscribe(e -> LOGGER.info("get {}",e));

        Thread.sleep(5*60*1000);
    }

Sample output:

18:52:34.118 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
18:52:35.157 [parallel-2] INFO com.example.demo.FluxTest - get 0
18:52:36.156 [parallel-2] INFO com.example.demo.FluxTest - get 1
18:52:37.156 [parallel-2] INFO com.example.demo.FluxTest - get 2
18:52:38.159 [parallel-2] INFO com.example.demo.FluxTest - get 3
18:52:39.157 [parallel-2] INFO com.example.demo.FluxTest - get 4
18:52:40.155 [parallel-2] INFO com.example.demo.FluxTest - get 5
18:52:41.154 [parallel-2] INFO com.example.demo.FluxTest - get 6
18:52:42.158 [parallel-2] INFO com.example.demo.FluxTest - get 7
18:52:43.157 [parallel-2] INFO com.example.demo.FluxTest - get 8
18:52:44.156 [parallel-2] INFO com.example.demo.FluxTest - get 9
18:52:45.154 [parallel-2] INFO com.example.demo.FluxTest - get 10

Traditional list processing is synchronous and batch-oriented. For example, a batch of 500 semi-finished products must all be processed in stage A before any of them moves on to stage B. With reactive streams, each piece can be pushed to stage B as soon as stage A has finished with it, instead of waiting for the whole batch to clear stage A. A production line is a typical real-world example.
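To make the difference in ordering concrete, here is a minimal sketch using only the JDK. It is not Reactor code: the class name PipelineOrderSketch and the two stage labels A and B are made up for illustration, and both variants run synchronously on one thread. It only records the order in which the stages touch each item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class PipelineOrderSketch {

    /** Batch style: stage A finishes every item before stage B sees any of them. */
    static List<String> batch(List<Integer> items) {
        List<String> events = new ArrayList<>();
        List<Integer> afterA = new ArrayList<>();
        for (int item : items) {
            events.add("A" + item);
            afterA.add(item);
        }
        for (int item : afterA) {
            events.add("B" + item);
        }
        return events;
    }

    /** Push style: each item is handed to stage B as soon as stage A is done with it. */
    static List<String> pushed(List<Integer> items) {
        List<String> events = new ArrayList<>();
        Consumer<Integer> stageB = item -> events.add("B" + item);
        Consumer<Integer> stageA = item -> {
            events.add("A" + item);
            stageB.accept(item);
        };
        items.forEach(stageA);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(batch(List.of(1, 2, 3)));   // [A1, A2, A3, B1, B2, B3]
        System.out.println(pushed(List.of(1, 2, 3)));  // [A1, B1, A2, B2, A3, B3]
    }
}
```

In the pushed variant the first item has already left stage B before the second item enters stage A, which is the production-line behaviour described above.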

Backpressure

On such a production line, the pace of each stage has to be coordinated. In the film "Starting Line", the male lead goes to work in a factory where the line keeps pushing goods toward him; he cannot keep up, the goods fall to the floor, and the line finally has to be shut down by hand.
In an application, if the publisher is too fast and the subscriber is too slow, data accumulates, and without proper control this easily leads to running out of memory. Backpressure exists specifically to solve this problem.
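The accumulation can be illustrated with a small tick-based simulation. This is only a sketch under stated assumptions: the class BacklogSketch, its method names, and the rates (5 items produced and 1 consumed per tick) are hypothetical, and no real publisher or subscriber is involved. It compares the largest backlog when the publisher emits freely with the largest backlog when it emits only what the subscriber has requested.

```java
final class BacklogSketch {

    /** Largest backlog when the publisher emits freely, ignoring the subscriber's pace. */
    static int maxBacklogWithoutBackpressure(int ticks, int producedPerTick, int consumedPerTick) {
        int backlog = 0;
        int max = 0;
        for (int t = 0; t < ticks; t++) {
            backlog += producedPerTick;
            max = Math.max(max, backlog);
            backlog -= Math.min(backlog, consumedPerTick);
        }
        return max;
    }

    /** Largest backlog when the publisher emits only what the subscriber has requested. */
    static int maxBacklogWithBackpressure(int ticks, int producedPerTick, int consumedPerTick) {
        int backlog = 0;
        int max = 0;
        int demand = consumedPerTick;                 // initial request(n)
        for (int t = 0; t < ticks; t++) {
            int emitted = Math.min(producedPerTick, demand);
            demand -= emitted;
            backlog += emitted;
            max = Math.max(max, backlog);
            int consumed = Math.min(backlog, consumedPerTick);
            backlog -= consumed;
            demand += consumed;                       // request as many as were just consumed
        }
        return max;
    }

    public static void main(String[] args) {
        System.out.println(maxBacklogWithoutBackpressure(100, 5, 1)); // 401
        System.out.println(maxBacklogWithBackpressure(100, 5, 1));    // 1
    }
}
```

Without demand signalling the backlog grows by four items every tick; with it, the backlog never exceeds what the subscriber asked for.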

Backpressure in the pull model

    @Test
    public void testPullBackpressure(){
        Flux.just(1, 2, 3, 4)
                .log()
                .subscribe(new Subscriber<Integer>() {
                    private Subscription s;
                    int onNextAmount;

                    @Override
                    public void onSubscribe(Subscription s) {
                        this.s = s;
                        s.request(2);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(integer);
                        onNextAmount++;
                        if (onNextAmount % 2 == 0) {
                            s.request(2);
                        }
                    }

                    @Override
                    public void onError(Throwable t) {}

                    @Override
                    public void onComplete() {}
                });

        try {
            Thread.sleep(10*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
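The same request pattern can be tried without Reactor. The sketch below is a stand-in, not the code above: it assumes Java 9 or later and uses the JDK's java.util.concurrent.Flow interfaces with SubmissionPublisher instead of Flux, and the names PullBackpressureSketch and BatchSubscriber are made up for illustration. It records every request(2) call and every onNext signal so the pull rhythm becomes visible.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class PullBackpressureSketch {

    /** Subscriber that pulls two items at a time and records what happens. */
    static final class BatchSubscriber implements Flow.Subscriber<Integer> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int received;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            request();                      // nothing is delivered before this call
        }

        @Override
        public void onNext(Integer item) {
            events.add("onNext(" + item + ")");
            received++;
            if (received % 2 == 0) {
                request();                  // ask for the next two only after handling two
            }
        }

        @Override
        public void onError(Throwable t) {
            events.add("onError");
            done.countDown();
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            done.countDown();
        }

        private void request() {
            events.add("request(2)");
            subscription.request(2);
        }
    }

    static List<String> run() {
        BatchSubscriber subscriber = new BatchSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 4; i++) {
                publisher.submit(i);        // buffered until the subscriber requests it
            }
        }                                   // close() signals onComplete after buffered items
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The recorded events alternate between one request(2) and two onNext signals, ending with onComplete: the subscriber, not the publisher, decides how many items are in flight.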

Backpressure in the push model

In the push model, the pace of the data flow is controlled with time-based operators such as timeout(), delayElements(), sample(), buffer(), skip(), and take().

delayElements

    @Test
    public void testPushBackpressure() throws InterruptedException {
        Flux.range(1, 1000)
                .delayElements(Duration.ofMillis(200))
                .subscribe(e -> {
                    LOGGER.info("subscribe:{}",e);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });
        Thread.sleep(100*1000);
    }

Sample output:

19:37:00.870 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
19:37:01.117 [parallel-1] INFO com.example.demo.FluxTest - subscribe:1
19:37:03.326 [parallel-2] INFO com.example.demo.FluxTest - subscribe:2
19:37:05.535 [parallel-3] INFO com.example.demo.FluxTest - subscribe:3
19:37:07.743 [parallel-4] INFO com.example.demo.FluxTest - subscribe:4
19:37:09.953 [parallel-5] INFO com.example.demo.FluxTest - subscribe:5
19:37:12.156 [parallel-6] INFO com.example.demo.FluxTest - subscribe:6
19:37:14.363 [parallel-7] INFO com.example.demo.FluxTest - subscribe:7
19:37:16.568 [parallel-8] INFO com.example.demo.FluxTest - subscribe:8
19:37:18.775 [parallel-1] INFO com.example.demo.FluxTest - subscribe:9

In this delayElements example no data is lost, but each element costs the production delay plus the consumption delay: the log lines are about 2.2 s apart, that is, 200 ms + 2000 ms.
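The spacing in the log can be reproduced with simple arithmetic. The sketch below is a timing model only, with a hypothetical class name DelayElementsTimingSketch; it assumes, as the log suggests, that the delay for the next element starts only after the subscriber has finished with the previous one. It does not call Reactor.

```java
import java.util.ArrayList;
import java.util.List;

final class DelayElementsTimingSketch {

    /**
     * Expected log time (ms after subscription) of each element, assuming the next
     * element's delay starts only once the subscriber has finished the previous one.
     */
    static List<Long> expectedLogTimes(int count, long delayMs, long consumeMs) {
        List<Long> times = new ArrayList<>();
        long now = 0;
        for (int i = 0; i < count; i++) {
            now += delayMs;     // production delay added per element
            times.add(now);     // the subscriber logs the element here
            now += consumeMs;   // the subscriber's work before the next delay starts
        }
        return times;
    }

    public static void main(String[] args) {
        // 200 ms delay and a 2000 ms sleep, as in the test above
        System.out.println(expectedLogTimes(4, 200, 2000)); // [200, 2400, 4600, 6800]
    }
}
```

Consecutive values differ by 2200 ms, which matches the roughly 2.2 s gap between the subscribe lines in the output.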

sample

    @Test
    public void testSampleBackpressure() throws InterruptedException {
        Flux.range(1, 1000)
                .log()
                .delayElements(Duration.ofMillis(200))
                .sample(Duration.ofMillis(1000))
                .subscribe(e -> {
                    LOGGER.info("subscribe:{}",e);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });
        Thread.sleep(100*1000);
    }

Sample output:

19:48:40.516 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
19:48:40.544 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
19:48:40.546 [main] INFO reactor.Flux.Range.1 - | onNext(1)
19:48:40.770 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(2)
19:48:40.974 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(3)
19:48:41.175 [parallel-4] INFO reactor.Flux.Range.1 - | onNext(4)
19:48:41.378 [parallel-5] INFO reactor.Flux.Range.1 - | onNext(5)
19:48:41.543 [parallel-1] INFO com.example.demo.FluxTest - subscribe:4
19:48:41.583 [parallel-6] INFO reactor.Flux.Range.1 - | onNext(6)
19:48:41.785 [parallel-7] INFO reactor.Flux.Range.1 - | onNext(7)
19:48:41.989 [parallel-8] INFO reactor.Flux.Range.1 - | onNext(8)
19:48:43.547 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(9)
19:48:43.548 [parallel-1] INFO com.example.demo.FluxTest - subscribe:8
19:48:43.751 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(10)
19:48:43.952 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(11)

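sample() forwards only the most recent element of each 1000 ms period. The subscriber receives 4 and then 8, so the elements in between are discarded rather than queued up for the slow subscriber.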
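The idea of keeping only the latest element per period can be modelled in a few lines. This is a simplified sketch, not Reactor's implementation: the class SampleSketch is hypothetical, element k is assumed to arrive exactly k * intervalMs after subscription, the subscriber is assumed to be instantaneous, and an element still waiting in an unfinished period is not emitted.

```java
import java.util.ArrayList;
import java.util.List;

final class SampleSketch {

    /** Keeps the last element seen in each period; earlier ones in that period are dropped. */
    static List<Integer> sample(int count, long intervalMs, long periodMs) {
        List<Integer> out = new ArrayList<>();
        Integer latest = null;
        long periodEnd = periodMs;
        for (int k = 1; k <= count; k++) {
            long arrival = k * intervalMs;
            // Close every period that ended at or before this arrival.
            while (arrival >= periodEnd) {
                if (latest != null) {
                    out.add(latest);
                    latest = null;
                }
                periodEnd += periodMs;
            }
            latest = k;     // overwrite: only the newest element survives the period
        }
        return out;
    }

    public static void main(String[] args) {
        // 10 elements, one every 200 ms, sampled every 1000 ms
        System.out.println(sample(10, 200, 1000)); // [4, 9]
    }
}
```

The first sampled value, 4, matches the log. The later values differ because this model ignores the subscriber's 2000 ms sleep, which in the real run also holds up the upstream.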

buffer

    @Test
    public void testBufferBackpressure() throws InterruptedException {
        Flux.range(1, 1000)
                .log()
                .delayElements(Duration.ofMillis(200))
                .buffer(Duration.ofMillis(800))
                .subscribe(e -> {
                    LOGGER.info("subscribe:{}",e);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });
        Thread.sleep(100*1000);
    }

Sample output:

19:55:06.680 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
19:55:06.712 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
19:55:06.714 [main] INFO reactor.Flux.Range.1 - | onNext(1)
19:55:06.940 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(2)
19:55:07.141 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(3)
19:55:07.343 [parallel-4] INFO reactor.Flux.Range.1 - | onNext(4)
19:55:07.509 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[1, 2, 3]
19:55:07.545 [parallel-5] INFO reactor.Flux.Range.1 - | onNext(5)
19:55:07.748 [parallel-6] INFO reactor.Flux.Range.1 - | onNext(6)
19:55:07.951 [parallel-7] INFO reactor.Flux.Range.1 - | onNext(7)
19:55:08.156 [parallel-8] INFO reactor.Flux.Range.1 - | onNext(8)
19:55:09.512 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[4, 5, 6, 7]
19:55:11.515 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(9)
19:55:11.516 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[8]
19:55:11.719 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(10)
19:55:11.923 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(11)
19:55:12.127 [parallel-4] INFO reactor.Flux.Range.1 - | onNext(12)
19:55:12.330 [parallel-5] INFO reactor.Flux.Range.1 - | onNext(13)
19:55:12.533 [parallel-6] INFO reactor.Flux.Range.1 - | onNext(14)
19:55:12.735 [parallel-7] INFO reactor.Flux.Range.1 - | onNext(15)
19:55:12.941 [parallel-8] INFO reactor.Flux.Range.1 - | onNext(16)
19:55:13.516 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[9, 10, 11, 12, 13, 14, 15]
19:55:15.517 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(17)
19:55:15.517 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[16]
19:55:15.721 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(18)
19:55:15.925 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(19)
19:55:16.127 [parallel-4] INFO reactor.Flux.Range.1 - | onNext(20)
19:55:16.331 [parallel-5] INFO reactor.Flux.Range.1 - | onNext(21)
19:55:16.537 [parallel-6] INFO reactor.Flux.Range.1 - | onNext(22)
19:55:16.738 [parallel-7] INFO reactor.Flux.Range.1 - | onNext(23)
19:55:16.942 [parallel-8] INFO reactor.Flux.Range.1 - | onNext(24)
19:55:17.519 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[17, 18, 19, 20, 21, 22, 23]
19:55:19.522 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(25)
19:55:19.522 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[24]

The elements produced within each 800 ms window are collected into one batch, and the batch is pushed to the subscriber as a single list.
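Time-based batching can be sketched with the same kind of idealised model. The class BufferSketch is hypothetical and is not Reactor code: element k is assumed to arrive exactly k * intervalMs after subscription, the subscriber is assumed to be instantaneous, and the last partial batch is flushed when the source ends.

```java
import java.util.ArrayList;
import java.util.List;

final class BufferSketch {

    /** Groups elements into batches, one batch per time window that saw at least one element. */
    static List<List<Integer>> buffer(int count, long intervalMs, long windowMs) {
        List<List<Integer>> out = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long windowEnd = windowMs;
        for (int k = 1; k <= count; k++) {
            long arrival = k * intervalMs;
            // Emit the batch of every window that ended at or before this arrival.
            while (arrival >= windowEnd) {
                if (!current.isEmpty()) {
                    out.add(current);
                    current = new ArrayList<>();
                }
                windowEnd += windowMs;
            }
            current.add(k);
        }
        if (!current.isEmpty()) {
            out.add(current);   // flush the last partial batch when the source completes
        }
        return out;
    }

    public static void main(String[] args) {
        // 8 elements, one every 200 ms, batched in 800 ms windows
        System.out.println(buffer(8, 200, 800)); // [[1, 2, 3], [4, 5, 6, 7], [8]]
    }
}
```

Nothing is dropped here: every element ends up in exactly one batch, so the subscriber pays its per-call cost once per window instead of once per element.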

skip

    @Test
    public void testSkip() throws InterruptedException {
        Flux.range(1, 1000)
                .log()
                .delayElements(Duration.ofMillis(200))
                .skip(Duration.ofMillis(800))
                .subscribe(e -> {
                    LOGGER.info("subscribe:{}",e);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });
        Thread.sleep(100*1000);
    }

Sample output:

20:02:07.558 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:02:07.606 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
20:02:07.608 [main] INFO reactor.Flux.Range.1 - | onNext(1)
20:02:07.815 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(2)
20:02:08.016 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(3)
20:02:08.218 [parallel-4] INFO reactor.Flux.Range.1 - | onNext(4)
20:02:08.421 [parallel-5] INFO com.example.demo.FluxTest - subscribe:4
20:02:10.425 [parallel-5] INFO reactor.Flux.Range.1 - | onNext(5)
20:02:10.631 [parallel-6] INFO com.example.demo.FluxTest - subscribe:5
20:02:12.635 [parallel-6] INFO reactor.Flux.Range.1 - | onNext(6)
20:02:12.840 [parallel-7] INFO com.example.demo.FluxTest - subscribe:6
20:02:14.843 [parallel-7] INFO reactor.Flux.Range.1 - | onNext(7)
20:02:15.049 [parallel-8] INFO com.example.demo.FluxTest - subscribe:7

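skip(Duration) discards the elements produced during the initial period, here the first 800 ms, and passes everything after that on to the subscriber. In the output, elements 1 to 3 never reach the subscriber.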
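As a rough model of skipping by time, the sketch below filters elements by their arrival time. The class SkipDurationSketch is hypothetical and is not Reactor code; it assumes element k arrives exactly k * intervalMs after subscription, and it assumes that an element arriving exactly at the boundary is let through.

```java
import java.util.ArrayList;
import java.util.List;

final class SkipDurationSketch {

    /** Drops every element that arrives before skipMs has elapsed. */
    static List<Integer> skip(int count, long intervalMs, long skipMs) {
        List<Integer> out = new ArrayList<>();
        for (int k = 1; k <= count; k++) {
            long arrival = k * intervalMs;
            if (arrival >= skipMs) {    // boundary choice is an assumption of this model
                out.add(k);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // 6 elements, one every 200 ms, skipping the first 800 ms
        System.out.println(skip(6, 200, 800)); // [4, 5, 6]
    }
}
```

The first surviving element is 4, as in the log above.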

take

    @Test
    public void testTakeBackpressure() throws InterruptedException {
        Flux.range(1, 1000)
                .log()
                .delayElements(Duration.ofMillis(200))
                .take(Duration.ofMillis(4000))
                .subscribe(e -> {
                    LOGGER.info("subscribe:{}",e);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });
        Thread.sleep(100*1000);
    }

Sample output:

20:05:08.366 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:05:08.419 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
20:05:08.422 [main] INFO reactor.Flux.Range.1 - | onNext(1)
20:05:08.629 [parallel-2] INFO com.example.demo.FluxTest - subscribe:1
20:05:10.633 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(2)
20:05:10.835 [parallel-3] INFO com.example.demo.FluxTest - subscribe:2
20:05:12.418 [parallel-1] INFO reactor.Flux.Range.1 - | cancel()

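take() pushes only the first few elements, or only the elements produced within an initial period of time, to the subscriber. Here the 4000 ms window closes after two elements, and the upstream is then cancelled.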
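Why only two elements fit into 4000 ms can be checked with a small timing model. The class TakeDurationSketch is hypothetical and is not Reactor code; it assumes, as the log suggests, that each element needs the 200 ms delay and that the subscriber's 2000 ms sleep holds up the next element.

```java
import java.util.ArrayList;
import java.util.List;

final class TakeDurationSketch {

    /** Elements that reach the subscriber before takeMs has elapsed. */
    static List<Integer> take(int count, long delayMs, long consumeMs, long takeMs) {
        List<Integer> out = new ArrayList<>();
        long now = 0;
        for (int k = 1; k <= count; k++) {
            now += delayMs;         // element k reaches the subscriber
            if (now >= takeMs) {
                break;              // window closed: the upstream would be cancelled
            }
            out.add(k);
            now += consumeMs;       // the slow subscriber holds up the next element
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(take(1000, 200, 2000, 4000));     // [1, 2]
        System.out.println(take(1000, 200, 0, 4000).size()); // 19
    }
}
```

With the 2000 ms sleep the model yields elements 1 and 2, as in the log; with an instantaneous subscriber the same window would let 19 elements through.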

Summary

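Reactive streams are well suited to data processing with multiple stages: each element can move on to the next stage without waiting for the whole batch, which can save a lot of time. Backpressure additionally keeps a fast publisher from overwhelming a slow subscriber, which makes reactive streams well worth using.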
