Talk about processors of reactive streams.

  reactive-programming

Preface

This article mainly looks at the Processor implementations of reactive streams.

Processor classification

A Processor is both a Publisher and a Subscriber. Project Reactor provides several Processor implementations, which can be roughly classified as follows:

  • direct(DirectProcessor and UnicastProcessor)
  • synchronous(EmitterProcessor and ReplayProcessor)
  • asynchronous(TopicProcessor and WorkQueueProcessor)

direct

DirectProcessor

It does not support backpressure. If the publisher publishes n elements and a subscriber has requested fewer than n, an IllegalStateException (Reactor's Exceptions$OverflowException) is signalled.

    @Test
    public void testDirectProcessor(){
        DirectProcessor<Integer> directProcessor = DirectProcessor.create();
        Flux<Integer> flux = directProcessor
                .filter(e -> e % 2 == 0)
                .map(e -> e +1);
        flux.subscribe(new Subscriber<Integer>() {
            private Subscription s;
            @Override
            public void onSubscribe(Subscription s) {
                this.s = s;
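                // no request is made here, so there is zero demand and the very first onNext overflows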
//                s.request(2);
            }

            @Override
            public void onNext(Integer integer) {
                LOGGER.info("subscribe:{}",integer);
            }

            @Override
            public void onError(Throwable t) {
                LOGGER.error(t.getMessage(),t);
            }

            @Override
            public void onComplete() {

            }
        });

        IntStream.range(1,20)
                .forEach(e -> {
                    directProcessor.onNext(e);
                });

        directProcessor.onComplete();
        directProcessor.blockLast();
    }

The output is as follows

16:00:11.201 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
16:00:11.216 [main] ERROR com.example.demo.ProcessorTest - Can't deliver value due to lack of requests
reactor.core.Exceptions$OverflowException: Can't deliver value due to lack of requests
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
    at reactor.core.publisher.DirectProcessor$DirectInner.onNext(DirectProcessor.java:304)
    at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:106)
    at com.example.demo.ProcessorTest.lambda$testDirectProcessor$5(ProcessorTest.java:82)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:557)
    at com.example.demo.ProcessorTest.testDirectProcessor(ProcessorTest.java:81)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

UnicastProcessor

  • Backpressure is supported, but there can be at most one subscriber. The queue is unbounded by default: if the subscriber has not yet requested data by the time it is published, the processor buffers it.
  • If a bounded queue is configured instead, the processor refuses to push data once the buffer is full and the subscriber has not requested enough. For this scenario the processor also has a built-in callback hook that is triggered when an element is rejected (see the sketch after the output below).
    @Test
    public void testUnicastProcessor() throws InterruptedException {
        UnicastProcessor<Integer> unicastProcessor = UnicastProcessor.create(Queues.<Integer>get(8).get());
        Flux<Integer> flux = unicastProcessor
                .map(e -> e)
                .doOnError(e -> {
                    LOGGER.error(e.getMessage(),e);
                });

        IntStream.rangeClosed(1,12)
                .forEach(e -> {
                    LOGGER.info("emit:{}",e);
                    unicastProcessor.onNext(e);
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });
        LOGGER.info("begin to sleep 7 seconds");
        TimeUnit.SECONDS.sleep(7);
        //UnicastProcessor allows only a single Subscriber
        flux.subscribe(e -> {
            LOGGER.info("flux subscriber:{}",e);
        });

        unicastProcessor.onComplete();
        TimeUnit.SECONDS.sleep(10);
//        unicastProcessor.blockLast(); // blockLast is also a subscriber
    }

Sample output

16:31:04.970 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
16:31:04.977 [main] INFO com.example.demo.ProcessorTest - emit:1
16:31:05.990 [main] INFO com.example.demo.ProcessorTest - emit:2
16:31:06.991 [main] INFO com.example.demo.ProcessorTest - emit:3
16:31:07.994 [main] INFO com.example.demo.ProcessorTest - emit:4
16:31:08.998 [main] INFO com.example.demo.ProcessorTest - emit:5
16:31:10.002 [main] INFO com.example.demo.ProcessorTest - emit:6
16:31:11.007 [main] INFO com.example.demo.ProcessorTest - emit:7
16:31:12.010 [main] INFO com.example.demo.ProcessorTest - emit:8
16:31:13.014 [main] INFO com.example.demo.ProcessorTest - emit:9
16:31:14.029 [main] INFO com.example.demo.ProcessorTest - emit:10
16:31:14.030 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 10
16:31:15.034 [main] INFO com.example.demo.ProcessorTest - emit:11
16:31:15.034 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 11
16:31:16.038 [main] INFO com.example.demo.ProcessorTest - emit:12
16:31:16.038 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 12
16:31:17.043 [main] INFO com.example.demo.ProcessorTest - begin to sleep 7 seconds
16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:1
16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:2
16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:3
16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:4
16:31:24.053 [main] INFO com.example.demo.ProcessorTest - flux subscriber:5
16:31:24.054 [main] INFO com.example.demo.ProcessorTest - flux subscriber:6
16:31:24.054 [main] INFO com.example.demo.ProcessorTest - flux subscriber:7
16:31:24.054 [main] INFO com.example.demo.ProcessorTest - flux subscriber:8
16:31:24.058 [main] ERROR com.example.demo.ProcessorTest - The receiver is overrun by more signals than expected (bounded queue...)
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:202)
    at reactor.core.publisher.UnicastProcessor.onNext(UnicastProcessor.java:330)
    at com.example.demo.ProcessorTest.lambda$testUnicastProcessor$8(ProcessorTest.java:108)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:557)
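
Element 9 overflows the bounded queue of 8 (the processor terminates with an OverflowException) and the subsequent elements are dropped. To observe the rejection directly via the callback mentioned above, a minimal sketch, assuming the create(Queue, Consumer, Disposable) overload of UnicastProcessor, would look like this:

    UnicastProcessor<Integer> processor = UnicastProcessor.create(
            Queues.<Integer>get(8).get(),                    // bounded buffer
            dropped -> LOGGER.warn("rejected:{}", dropped),  // callback invoked with an element that could not be buffered
            () -> LOGGER.info("terminated"));                // onTerminate callback (a Disposable)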

synchronous

EmitterProcessor

  • It supports multiple subscribers and applies backpressure to each of them. It can also subscribe to an upstream Publisher and replay its data synchronously.
  • It has a bufferSize parameter that buffers data published while there is no subscriber yet; once the buffer is full, onNext blocks until the data is consumed. The first subscriber receives the buffered data, while later subscribers only see the data published after they subscribed.
  • When all subscribers cancel their subscriptions, the processor clears its buffer and stops accepting new subscriptions.
    @Test
    public void testEmitterProcessor() throws InterruptedException {
        int bufferSize = 3; // sizes smaller than 8 are bumped up to 8
        FluxProcessor<Integer, Integer> processor = EmitterProcessor.create(bufferSize);
        Flux<Integer> flux1 = processor.map(e -> e);
        Flux<Integer> flux2 = processor.map(e -> e*10);

        IntStream.rangeClosed(1,8).forEach(e -> {
            LOGGER.info("emit:{}",e);
            processor.onNext(e); // if the amount of unconsumed published data exceeds bufferSize, onNext blocks here
        });

        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
        });

        IntStream.rangeClosed(9,10).forEach(e -> {
            LOGGER.info("emit:{}",e);
            processor.onNext(e);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });

        // this subscription is added later, so it only consumes data published afterwards
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });

        processor.onNext(11);
        processor.onNext(12);

        processor.onComplete();
        processor.blockLast();
    }

Sample output

17:27:01.008 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
17:27:01.044 [main] INFO com.example.demo.ProcessorTest - emit:1
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:2
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:3
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:4
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:5
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:6
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:7
17:27:01.084 [main] INFO com.example.demo.ProcessorTest - emit:8
17:27:01.086 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:1
17:27:01.086 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:2
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:3
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:4
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:5
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:6
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:7
17:27:01.087 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:8
17:27:01.088 [main] INFO com.example.demo.ProcessorTest - emit:9
17:27:01.088 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:9
17:27:02.091 [main] INFO com.example.demo.ProcessorTest - emit:10
17:27:02.092 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:10
17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:11
17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:110
17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:12
17:27:03.096 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:120

ReplayProcessor

It caches the data pushed through its sink (or the data of a Publisher it subscribes to) and replays it to subsequent subscribers. There are four configurations, sketched in the snippet after this list:

  • cacheLast

Caches only the last element

  • create(int)

Caches the last n elements

  • createTimeout(Duration)

Timestamps each element and caches only those younger than the specified TTL

  • createSizeAndTimeout(int,Duration)

Timestamps each element and caches at most n elements younger than the specified TTL
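
A compact sketch of the four factory variants (assuming java.time.Duration; note the size-plus-time variant is spelled createSizeAndTimeout in Reactor 3):

    ReplayProcessor<Integer> last  = ReplayProcessor.cacheLast();                                    // only the latest element
    ReplayProcessor<Integer> lastN = ReplayProcessor.create(3);                                      // the last 3 elements
    ReplayProcessor<Integer> byAge = ReplayProcessor.createTimeout(Duration.ofSeconds(5));           // elements younger than 5s
    ReplayProcessor<Integer> both  = ReplayProcessor.createSizeAndTimeout(3, Duration.ofSeconds(5)); // at most 3 elements, each younger than 5s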

Example

    @Test
    public void testReplayProcessor() throws InterruptedException {
        ReplayProcessor<Integer> replayProcessor = ReplayProcessor.create(3);
        Flux<Integer> flux1 = replayProcessor
                .map(e -> e);
        Flux<Integer> flux2 = replayProcessor
                .map(e -> e);

        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
        });


        IntStream.rangeClosed(1,5)
                .forEach(e -> {
                    replayProcessor.onNext(e);
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });

        LOGGER.info("finish publish data");
        TimeUnit.SECONDS.sleep(3);

        LOGGER.info("begin to subscribe flux2");
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });

        replayProcessor.onComplete();
        replayProcessor.blockLast();
    }

The output is as follows

15:13:39.415 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:13:39.438 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:1
15:13:40.445 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:2
15:13:41.449 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:3
15:13:42.454 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:4
15:13:43.459 [main] INFO com.example.demo.ProcessorTest - flux1 subscriber:5
15:13:44.463 [main] INFO com.example.demo.ProcessorTest - finish publish data
15:13:47.466 [main] INFO com.example.demo.ProcessorTest - begin to subscribe flux2
15:13:47.467 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:3
15:13:47.467 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:4
15:13:47.468 [main] INFO com.example.demo.ProcessorTest - flux2 subscriber:5

asynchronous

TopicProcessor

  • TopicProcessor is an asynchronous processor. When share is set to true it supports concurrent publishing from multiple producers. If the upstream publisher is a concurrent stream, or the onNext, onComplete and onError methods of the TopicProcessor are called concurrently, share must be enabled; with share off, the processor follows the reactive streams specification and concurrent calls are not allowed.
  • TopicProcessor also supports fanning messages out to multiple subscribers, binding one thread to each subscriber. The maximum number of subscribers it can support is limited by its executor.
  • TopicProcessor pushes data through a RingBuffer, and each subscriber thread tracks its own consumption position in the RingBuffer.
  • TopicProcessor also supports the autoCancel option, which defaults to true: when all subscribers cancel their subscriptions, the upstream publisher is automatically cancelled as well.
    @Test
    public void testTopicProcessor() throws InterruptedException {
        TopicProcessor<Integer> topicProcessor = TopicProcessor.<Integer>builder()
                .share(true)
//                .executor(Executors.newSingleThreadExecutor())
                .build();
        Flux<Integer> flux1 = topicProcessor
                .map(e -> e);
        Flux<Integer> flux2 = topicProcessor
                .map(e -> e);
        Flux<Integer> flux3 = topicProcessor
                .map(e -> e);

        AtomicInteger count = new AtomicInteger(0);
        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
            count.incrementAndGet();
        });
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });
        flux3.subscribe(e -> {
            LOGGER.info("flux3 subscriber:{}",e);
        });

        IntStream.rangeClosed(1,100)
                .parallel()
                .forEach(e -> {
//                    LOGGER.info("emit:{}",e);
                    topicProcessor.onNext(e);
                });

        topicProcessor.onComplete();
        topicProcessor.blockLast();

        TimeUnit.SECONDS.sleep(10);
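        // with share(true) the printed count is always 100; without share, concurrent onNext calls can lose elements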
        System.out.println(count.get());
    }

Two things are worth noting:

  • share

What share sets under the hood is EventLoopProcessor's multiproducers property.
reactor-core-3.1.2.RELEASE-sources.jar! /reactor/core/publisher/EventLoopProcessor.java

EventLoopProcessor(
            int bufferSize,
            @Nullable ThreadFactory threadFactory,
            @Nullable ExecutorService executor,
            ExecutorService requestExecutor,
            boolean autoCancel,
            boolean multiproducers,
            Supplier<Slot<IN>> factory,
            WaitStrategy strategy) {

        if (!Queues.isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2 : " + bufferSize);
        }

        if (bufferSize < 1){
            throw new IllegalArgumentException("bufferSize must be strictly positive, " +
                    "was: "+bufferSize);
        }

        this.autoCancel = autoCancel;

        contextClassLoader = new EventLoopContext(multiproducers);

        this.name = defaultName(threadFactory, getClass());

        this.requestTaskExecutor = Objects.requireNonNull(requestExecutor, "requestTaskExecutor");

        if (executor == null) {
            this.executor = Executors.newCachedThreadPool(threadFactory);
        }
        else {
            this.executor = executor;
        }

        if (multiproducers) {
            this.ringBuffer = RingBuffer.createMultiProducer(factory,
                    bufferSize,
                    strategy,
                    this);
        }
        else {
            this.ringBuffer = RingBuffer.createSingleProducer(factory,
                    bufferSize,
                    strategy,
                    this);
        }
    }

If share is true, the RingBuffer is created with createMultiProducer.
Concretely, if the processor's onNext method is called from multiple threads and share is not enabled, there is a concurrency problem and data is lost: with share(true) commented out in the code above, the final count is not necessarily 100, whereas with share enabled the final count is guaranteed to be 100.

  • executor

If executor(Executors.newSingleThreadExecutor()) is set (the commented-out line in the builder above), the subscribers of flux1, flux2 and flux3 are executed sequentially on that single thread rather than concurrently.

WorkQueueProcessor

  • WorkQueueProcessor is also an asynchronous processor. When share is set to true it supports concurrent publishing from multiple producers.
  • WorkQueueProcessor also uses a RingBuffer to push data.
  • WorkQueueProcessor does not create a dedicated thread for every subscriber, so it scales somewhat better than TopicProcessor. The maximum number of subscribers it can support is still limited by its executor. It is best not to attach too many subscribers, however, as this increases lock contention inside the processor; prefer a ThreadPoolExecutor or a ForkJoinPool, whose capacity the processor can detect so that it can throw an exception when there are too many subscribers.
  • WorkQueueProcessor does not follow the reactive streams specification and therefore consumes fewer resources than TopicProcessor. As a trade-off, the requests of all subscribers are accumulated together and the processor replays each element to only one subscriber at a time. Compared with TopicProcessor's fan-out broadcast mode, this is closer to round-robin delivery, though fair round-robin is not guaranteed.
    @Test
    public void testWorkQueueProcessor(){
        WorkQueueProcessor<Integer> workQueueProcessor = WorkQueueProcessor.create();
        Flux<Integer> flux1 = workQueueProcessor
                .map(e -> e);
        Flux<Integer> flux2 = workQueueProcessor
                .map(e -> e);
        Flux<Integer> flux3 = workQueueProcessor
                .map(e -> e);

        flux1.subscribe(e -> {
            LOGGER.info("flux1 subscriber:{}",e);
        });
        flux2.subscribe(e -> {
            LOGGER.info("flux2 subscriber:{}",e);
        });
        flux3.subscribe(e -> {
            LOGGER.info("flux3 subscriber:{}",e);
        });
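        // each published element is delivered to only one of the three subscribers (round-robin-like), not broadcast to all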

        IntStream.range(1,20)
                .forEach(e -> {
                    workQueueProcessor.onNext(e);
                });

        workQueueProcessor.onComplete();
        workQueueProcessor.blockLast();
    }

Sample output

21:56:58.203 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
21:56:58.214 [main] DEBUG reactor.core.publisher.UnsafeSupport - Starting UnsafeSupport init in Java 1.8
21:56:58.215 [main] DEBUG reactor.core.publisher.UnsafeSupport - Unsafe is available
21:56:58.228 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:1
21:56:58.228 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:3
21:56:58.228 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:2
21:56:58.229 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:4
21:56:58.229 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:5
21:56:58.229 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:6
21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:7
21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:8
21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:9
21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:10
21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:11
21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:12
21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:13
21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:14
21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:15
21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:17
21:56:58.230 [WorkQueueProcessor-1] INFO com.example.demo.ProcessorTest - flux1 subscriber:16
21:56:58.230 [WorkQueueProcessor-3] INFO com.example.demo.ProcessorTest - flux3 subscriber:19
21:56:58.230 [WorkQueueProcessor-2] INFO com.example.demo.ProcessorTest - flux2 subscriber:18

It can be seen that the subscribers of the WorkQueueProcessor behave like Kafka consumers in the same consumer group: the messages they consume add up to the total published by the publisher, unlike the broadcast delivery of TopicProcessor.
