Reactive Streams and the observer pattern


Preface

This article looks at Reactive Streams and the observer pattern in Java.

reactive streams

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. It can be seen as an extension of the observer design pattern.

Java's Iterator is based on the pull model: the consumer calls next() to pull the next element. Reactive Streams is based on the push model: a subscriber registers by calling the publisher's subscribe method, and the publisher then calls the subscriber's onNext to notify it of each new message.
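The contrast between the two models can be sketched in a few lines. This is only an illustration under some assumptions: it uses the JDK 9+ java.util.concurrent.Flow interfaces and SubmissionPublisher instead of the org.reactivestreams types (the Flow interfaces mirror them one to one), and the class and method names PullVsPush, pull and push are made up for this example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class; names are illustrative only.
class PullVsPush {

    // Pull: the consumer drives the loop and asks for each element with next().
    static List<Integer> pull(List<Integer> source) {
        List<Integer> received = new ArrayList<>();
        Iterator<Integer> it = source.iterator();
        while (it.hasNext()) {
            received.add(it.next());
        }
        return received;
    }

    // Push: the consumer subscribes once; the publisher then calls onNext per element.
    static List<Integer> push(List<Integer> source) {
        List<Integer> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) {
                    s.request(Long.MAX_VALUE); // effectively unbounded demand
                }
                @Override public void onNext(Integer item) { received.add(item); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            source.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items are delivered
        try {
            done.await(); // delivery happens on another thread, so wait for completion
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(pull(List.of(1, 2, 3)));
        System.out.println(push(List.of(1, 2, 3)));
    }
}
```

Both methods end up with the same elements; the difference is who drives the flow. In pull the loop belongs to the consumer, in push the consumer only reacts to callbacks.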

reactive streams java api

Reactive Streams defines four Java interfaces:

Processor<T,R>

A Processor is both a Subscriber and a Publisher. It represents a processing stage that follows the contracts of both.

Publisher<T>

Publisher is the provider of data and publishes the data to subscribers.

Subscriber<T>

A Subscriber is the receiver of data. After Publisher.subscribe(Subscriber) is called, Subscriber.onSubscribe(Subscription) will be called.

Subscription

A Subscription represents one subscription lifecycle between a subscriber and a publisher. Once cancel is called to end the subscription, the publisher stops pushing messages to that subscriber.
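How Publisher, Subscriber and Subscription cooperate can be sketched with a small hand-written publisher. The following is a minimal, single-threaded illustration and not a spec-compliant implementation; it assumes the JDK 9+ java.util.concurrent.Flow interfaces in place of org.reactivestreams, and the names SubscriptionDemo, RangePublisher and takeThree are hypothetical. Processor is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo; not thread-safe and not fully spec-compliant.
class SubscriptionDemo {

    // Publishes the integers start, start + 1, ... (count values in total).
    static class RangePublisher implements Flow.Publisher<Integer> {
        private final int start;
        private final int count;

        RangePublisher(int start, int count) {
            this.start = start;
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            // subscribe() leads to onSubscribe() with a fresh Subscription.
            subscriber.onSubscribe(new RangeSubscription(subscriber));
        }

        private final class RangeSubscription implements Flow.Subscription {
            private final Flow.Subscriber<? super Integer> subscriber;
            private int next = start;
            private long requested;
            private boolean emitting;
            private boolean done;

            RangeSubscription(Flow.Subscriber<? super Integer> subscriber) {
                this.subscriber = subscriber;
            }

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("n must be positive"));
                    return;
                }
                requested = (requested + n < 0) ? Long.MAX_VALUE : requested + n;
                if (emitting) return; // re-entrant call from onNext; the outer loop continues
                emitting = true;
                while (!done && requested > 0 && next < start + count) {
                    requested--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (!done && next == start + count) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true; // no further onNext after this point
            }
        }
    }

    // Requests one item at a time and cancels after the third.
    static List<Integer> takeThree() {
        List<Integer> received = new ArrayList<>();
        new RangePublisher(1, 10).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);
            }
            @Override public void onNext(Integer item) {
                received.add(item);
                if (received.size() == 3) subscription.cancel();
                else subscription.request(1);
            }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(takeThree()); // [1, 2, 3]
    }
}
```

Although the publisher could emit ten values, the subscriber only ever sees three: each onNext is preceded by a request, and cancel ends the subscription early.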

observer pattern

The observer pattern can be implemented with either a push model or a pull model.

  • Pull model

The publisher notifies the subscriber that there is new data, and the subscriber then goes back to the publisher to fetch it.

  • Push model

The publisher notifies the subscriber that there is a message and delivers the new message along with the notification.
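The two variants of the observer pattern differ only in what the notification carries. A minimal sketch in plain Java follows; all names here (ObserverModels, NewsSubject, PullObserver, PushObserver) are hypothetical and chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo of the two observer variants.
class ObserverModels {

    // Pull model: the notification carries no data; the observer calls back into the subject.
    interface PullObserver {
        void changed(NewsSubject subject);
    }

    // Push model: the notification itself carries the new data.
    interface PushObserver {
        void changed(String news);
    }

    static class NewsSubject {
        private final List<PullObserver> pullObservers = new ArrayList<>();
        private final List<PushObserver> pushObservers = new ArrayList<>();
        private String latest;

        void addPullObserver(PullObserver o) { pullObservers.add(o); }
        void addPushObserver(PushObserver o) { pushObservers.add(o); }
        String getLatest() { return latest; }

        void publish(String news) {
            latest = news;
            pullObservers.forEach(o -> o.changed(this)); // "something changed, come and get it"
            pushObservers.forEach(o -> o.changed(news)); // "here is the new message"
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        NewsSubject subject = new NewsSubject();
        subject.addPullObserver(s -> log.add("pulled: " + s.getLatest()));
        subject.addPushObserver(news -> log.add("pushed: " + news));
        subject.publish("hello");
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```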

Reactor example

maven

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.1.2.RELEASE</version>
        </dependency>

Reactor 3 is a Java implementation of Reactive Streams. It builds on the Reactive Streams Java API and is the foundation of reactive programming in Spring 5.

Flux example

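    // Assumes JUnit 4; with JUnit 5 import org.junit.jupiter.api.Test instead.
    import org.junit.Test;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    import reactor.core.publisher.Flux;

    public class BackpressureTest {

        @Test
        public void testBackpressure() {
            Flux.just(1, 2, 3, 4)
                    .log() // logs the onSubscribe, request, onNext and onComplete signals
                    .subscribe(new Subscriber<Integer>() {
                        private Subscription s;
                        private int onNextAmount;

                        @Override
                        public void onSubscribe(Subscription s) {
                            this.s = s;
                            s.request(2); // initial demand: two items
                        }

                        @Override
                        public void onNext(Integer integer) {
                            System.out.println(integer);
                            onNextAmount++;
                            // After every second item, ask for the next batch of two.
                            if (onNextAmount % 2 == 0) {
                                s.request(2);
                            }
                        }

                        @Override
                        public void onError(Throwable t) {}

                        @Override
                        public void onComplete() {}
                    });
            // Flux.just emits synchronously on the calling thread here,
            // so the test does not need to sleep and wait for the signals.
        }
    }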

Summary

As the code above shows, Reactive Streams is really a hybrid of the push and pull models. Why is the pull part needed?

rabbitmq vs kafka

RabbitMQ is push-based. If consumers cannot keep up, messages pile up in in-memory queues (and are written to disk if necessary).

Kafka is mainly pull-based. Producers push messages to the broker, and consumers pull messages from the broker at their own pace. Because messages are persisted, a mismatch between production and consumption rates is much less of a concern.

backpressure

Backpressure exists to deal with a mismatch between production rate and consumption rate. Based on its own capacity, a subscriber calls request(n) to tell the publisher how many items it is ready to receive, and the publisher then pushes at most n items through onNext. The request call is how feedback flows from the subscriber to the publisher. To implement backpressure, a publisher needs a queue to buffer data that subscribers have not yet had time to consume. Buffering in turn raises the question of whether the capacity is bounded or unbounded and, if it is bounded, what to do when the buffer is full.
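The choice of overflow strategy for a bounded buffer can be illustrated with a small sketch. It is not how Reactor or any other library implements buffering; BoundedBufferDemo, BoundedBuffer, OverflowStrategy and the three strategy names are assumptions made for this example, and the code is single-threaded for simplicity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of a publisher-side buffer; not thread-safe.
class BoundedBufferDemo {

    enum OverflowStrategy { DROP_LATEST, DROP_OLDEST, ERROR }

    static class BoundedBuffer<T> {
        private final Deque<T> queue = new ArrayDeque<>();
        private final int capacity;
        private final OverflowStrategy strategy;

        BoundedBuffer(int capacity, OverflowStrategy strategy) {
            if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
            this.capacity = capacity;
            this.strategy = strategy;
        }

        // Called by the producing side whenever a new item is available.
        void offer(T item) {
            if (queue.size() < capacity) {
                queue.addLast(item);
                return;
            }
            switch (strategy) {
                case DROP_LATEST:
                    break; // discard the incoming item
                case DROP_OLDEST:
                    queue.removeFirst(); // make room by discarding the oldest item
                    queue.addLast(item);
                    break;
                case ERROR:
                    throw new IllegalStateException("buffer overflow");
            }
        }

        // Called when the subscriber signals request(n): hand over at most n items.
        List<T> drain(long n) {
            List<T> out = new ArrayList<>();
            while (n-- > 0 && !queue.isEmpty()) {
                out.add(queue.removeFirst());
            }
            return out;
        }
    }

    // The producer emits 5 items into a buffer of 3, then the subscriber requests 2.
    static List<Integer> demo(OverflowStrategy strategy) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(3, strategy);
        for (int i = 1; i <= 5; i++) {
            buffer.offer(i);
        }
        return buffer.drain(2);
    }

    public static void main(String[] args) {
        System.out.println(demo(OverflowStrategy.DROP_LATEST)); // [1, 2]
        System.out.println(demo(OverflowStrategy.DROP_OLDEST)); // [3, 4]
    }
}
```

With DROP_LATEST the subscriber sees the earliest items and loses the newest ones; with DROP_OLDEST it is the other way around; ERROR makes the overflow visible instead of silently losing data. An unbounded buffer avoids the choice but trades it for unbounded memory use.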

doc