rsocket-java

Preface

This article takes a look at rsocket-java, focusing on the RSocket interface and its interaction models.

RSocket

rsocket-core-0.12.1-sources.jar!/io/rsocket/RSocket.java

public interface RSocket extends Availability, Closeable {

  /**
   * Fire and Forget interaction model of {@code RSocket}.
   *
   * @param payload Request payload.
   * @return {@code Publisher} that completes when the passed {@code payload} is successfully
   *     handled, otherwise errors.
   */
  Mono<Void> fireAndForget(Payload payload);

  /**
   * Request-Response interaction model of {@code RSocket}.
   *
   * @param payload Request payload.
   * @return {@code Publisher} containing at most a single {@code Payload} representing the
   *     response.
   */
  Mono<Payload> requestResponse(Payload payload);

  /**
   * Request-Stream interaction model of {@code RSocket}.
   *
   * @param payload Request payload.
   * @return {@code Publisher} containing the stream of {@code Payload}s representing the response.
   */
  Flux<Payload> requestStream(Payload payload);

  /**
   * Request-Channel interaction model of {@code RSocket}.
   *
   * @param payloads Stream of request payloads.
   * @return Stream of response payloads.
   */
  Flux<Payload> requestChannel(Publisher<Payload> payloads);

  /**
   * Metadata-Push interaction model of {@code RSocket}.
   *
   * @param payload Request payloads.
   * @return {@code Publisher} that completes when the passed {@code payload} is successfully
   *     handled, otherwise errors.
   */
  Mono<Void> metadataPush(Payload payload);

  @Override
  default double availability() {
    return isDisposed() ? 0.0 : 1.0;
  }
}
  • The RSocket interface extends Availability (which defines the double availability() method) and Closeable (which defines the Mono<Void> onClose() method).
  • RSocket defines four interaction models, corresponding to the fireAndForget, requestResponse, requestStream, and requestChannel methods.
  • An RSocket frame carries metadata and a data payload. The metadata is optional and can be used to describe the data payload, so RSocket also defines the metadataPush method for pushing metadata on its own.
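The four interaction models differ mainly in how many payloads travel in each direction. The following is a minimal sketch of that difference using only the JDK. The InteractionModelSketch class and its Model enum are hypothetical names introduced for illustration and are not part of rsocket-java; the cardinalities are read off the method signatures and Javadoc shown above (for example, requestResponse returns "at most a single Payload").

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of rsocket-java.
final class InteractionModelSketch {

    /** One entry per interaction-model method of the RSocket interface. */
    enum Model {
        FIRE_AND_FORGET("fireAndForget", "1", "0"),        // Mono<Void>: no response payload
        REQUEST_RESPONSE("requestResponse", "1", "0..1"),  // Mono<Payload>: at most one payload
        REQUEST_STREAM("requestStream", "1", "many"),      // Flux<Payload>: a stream of payloads
        REQUEST_CHANNEL("requestChannel", "many", "many"); // Publisher in, Flux out

        final String method;
        final String requestPayloads;
        final String responsePayloads;

        Model(String method, String requestPayloads, String responsePayloads) {
            this.method = method;
            this.requestPayloads = requestPayloads;
            this.responsePayloads = responsePayloads;
        }

        /** Renders "method: request cardinality -> response cardinality". */
        String describe() {
            return method + ": " + requestPayloads + " -> " + responsePayloads;
        }
    }

    /** Returns one description line per interaction model, in declaration order. */
    static List<String> describeAll() {
        List<String> lines = new ArrayList<>();
        for (Model model : Model.values()) {
            lines.add(model.describe());
        }
        return lines;
    }

    public static void main(String[] args) {
        describeAll().forEach(System.out::println);
    }
}
```

Reading the table this way makes the return types easier to remember: Mono<Void> for "no response", Mono<Payload> for "at most one", and Flux<Payload> for "many".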

Interaction Model

fireAndForget

    @Test
    public void testFireAndForget() throws InterruptedException {
        //SERVER
        RSocketFactory.receive()
                .acceptor(
                        (setupPayload, reactiveSocket) ->
                                Mono.just(
                                        new AbstractRSocket() {
                                            @Override
                                            public Mono<Void> fireAndForget(Payload payload) {
                                                System.out.printf("fire-forget: %s%n", payload.getDataUtf8());
                                                return Mono.empty();
                                            }
                                        }))
                .transport(TcpServerTransport.create("localhost", 8080))
                .start()
                .subscribe();

        //CLIENT
        RSocket socket =
                RSocketFactory.connect()
                        .transport(TcpClientTransport.create("localhost", 8080))
                        .start()
                        .block();

        socket
                .fireAndForget(DefaultPayload.create("Hello"))
                .block();

        socket.dispose();

        TimeUnit.SECONDS.sleep(5);
    }

Similar to UDP in that the sender does not wait for an acknowledgement, which makes it a good fit for metrics reporting, access-log reporting, and similar one-way traffic.

requestResponse

    @Test
    public void testRequestResponse(){
        //SERVER
        RSocketFactory.receive()
                .acceptor(
                        (setupPayload, reactiveSocket) ->
                                Mono.just(
                                        new AbstractRSocket() {
                                            @Override
                                            public Mono<Payload> requestResponse(Payload p) {
                                                return Mono.just(p);
                                            }
                                        }))
                .transport(TcpServerTransport.create("localhost", 8080))
                .start()
                .subscribe();

        //CLIENT
        RSocket socket =
                RSocketFactory.connect()
                        .transport(TcpClientTransport.create("localhost", 8080))
                        .start()
                        .block();

        socket
                .requestResponse(DefaultPayload.create("Hello"))
                .map(Payload::getDataUtf8)
                .onErrorReturn("error")
                .doOnNext(System.out::println)
                .block();

        socket.dispose();
    }

Similar to HTTP, with the advantage of being asynchronous and multiplexed: several requests can be in flight on one connection at the same time.
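To make "asynchronous and multiplexed" concrete, here is a minimal JDK-only sketch of the idea: every request gets a stream ID, the call returns a future immediately, and responses are matched back to their request by ID, so they may arrive in any order. MultiplexingSketch and its methods are hypothetical names, not rsocket-java classes, and the use of odd stream IDs for client-initiated streams is an assumption based on my reading of the RSocket protocol.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for one shared connection; not an rsocket-java class.
final class MultiplexingSketch {
    // Requests still waiting for a response, keyed by stream ID.
    private final Map<Integer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    // Assumption: client-initiated streams use odd IDs (1, 3, 5, ...).
    private final AtomicInteger nextStreamId = new AtomicInteger(1);

    /** Registers a request and returns immediately with a future for its response. */
    CompletableFuture<String> request(String data) {
        int streamId = nextStreamId.getAndAdd(2);
        CompletableFuture<String> response = new CompletableFuture<>();
        pending.put(streamId, response);
        // A real client would now write (streamId, data) to the shared connection.
        return response;
    }

    /** Routes an incoming response to the request that owns the stream ID. */
    void onResponse(int streamId, String data) {
        CompletableFuture<String> response = pending.remove(streamId);
        if (response != null) {
            response.complete(data);
        }
    }

    /** Number of requests currently awaiting a response. */
    int inFlight() {
        return pending.size();
    }

    public static void main(String[] args) {
        MultiplexingSketch connection = new MultiplexingSketch();
        CompletableFuture<String> first = connection.request("a");  // stream 1
        CompletableFuture<String> second = connection.request("b"); // stream 3
        System.out.println("in flight: " + connection.inFlight());
        connection.onResponse(3, "B"); // the second response arrives first
        connection.onResponse(1, "A");
        System.out.println(first.join() + " " + second.join());
    }
}
```

The point of the sketch is that a slow response on one stream does not block the others, which is the property the comparison with HTTP refers to.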

requestStream

    @Test
    public void testRequestStream(){
        //SERVER
        RSocketFactory.receive()
                .acceptor(new SocketAcceptor() {
                    @Override
                    public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
                        return Mono.just(
                                new AbstractRSocket() {
                                    @Override
                                    public Flux<Payload> requestStream(Payload payload) {
                                        return Flux.interval(Duration.ofMillis(100))
                                                .map(aLong -> DefaultPayload.create("Interval: " + aLong));
                                    }
                                });
                    }
                })
                .transport(TcpServerTransport.create("localhost", 8080))
                .start()
                .subscribe();

        //CLIENT
        RSocket socket =
                RSocketFactory.connect()
                        .transport(TcpClientTransport.create("localhost", 8080))
                        .start()
                        .block();

        socket
                .requestStream(DefaultPayload.create("Hello"))
                .map(Payload::getDataUtf8)
                .doOnNext(System.out::println)
                .take(10)
                .doFinally(signalType -> socket.dispose())
                .then()
                .block();
    }

Similar to request-response, except that it returns a Flux (a stream of payloads) instead of a Mono (at most one payload).
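Because the server-side stream in the test never completes, it is the subscriber that decides how much to consume. The sketch below illustrates that with the JDK's java.util.concurrent.Flow interfaces (Java 9+) instead of Reactor: an unbounded publisher emits only what has been requested, and the subscriber cancels after a fixed number of items, in the spirit of take(10). StreamDemandSketch, Counter, and takeFirst are hypothetical names, and the publisher is deliberately simplified (synchronous, single subscriber, no error handling), so it is not a full Reactive Streams implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical illustration class; not part of rsocket-java or Reactor.
final class StreamDemandSketch {

    /** Unbounded source of "Interval: n" strings that emits only what was requested. */
    static final class Counter implements Flow.Publisher<String> {
        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long next = 0;
                private boolean cancelled = false;

                @Override
                public void request(long n) {
                    // Emit synchronously until the demand is met or the stream is cancelled.
                    for (long i = 0; i < n && !cancelled; i++) {
                        subscriber.onNext("Interval: " + next++);
                    }
                }

                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        }
    }

    /** Collects the first {@code limit} items and then cancels the subscription. */
    static List<String> takeFirst(Flow.Publisher<String> source, int limit) {
        List<String> received = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(limit); // signal demand for exactly `limit` items
            }

            @Override
            public void onNext(String item) {
                received.add(item);
                if (received.size() == limit) {
                    subscription.cancel(); // stop the otherwise endless stream
                }
            }

            @Override
            public void onError(Throwable t) { }

            @Override
            public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        takeFirst(new Counter(), 10).forEach(System.out::println);
    }
}
```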

requestChannel

    @Test
    public void testRequestChannel(){
        //SERVER
        RSocketFactory.receive()
                .acceptor(new SocketAcceptor(){
                    @Override
                    public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
                        return Mono.just(
                                new AbstractRSocket() {
                                    @Override
                                    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
                                        return Flux.from(payloads)
                                                .map(Payload::getDataUtf8)
                                                .map(s -> "Echo: " + s)
                                                .map(DefaultPayload::create);
                                    }
                                });
                    }
                })
                .transport(TcpServerTransport.create("localhost", 8080))
                .start()
                .subscribe();

        //CLIENT
        RSocket socket =
                RSocketFactory.connect()
                        .transport(TcpClientTransport.create("localhost", 8080))
                        .start()
                        .block();

        socket
                .requestChannel(
                        Flux.interval(Duration.ofMillis(1000)).map(i -> DefaultPayload.create("Hello")))
                .map(Payload::getDataUtf8)
                .doOnNext(System.out::println)
                .take(10)
                .doFinally(signalType -> socket.dispose())
                .then()
                .block();
    }

Similar to WebSocket: both sides can send a stream of payloads over the same channel.

metadataPush

    @Test
    public void testMetadataPush() throws InterruptedException {
        //SERVER
        RSocketFactory.receive()
                .acceptor(
                        (setupPayload, reactiveSocket) ->
                                Mono.just(
                                        new AbstractRSocket() {

                                            @Override
                                            public Mono<Void> metadataPush(Payload payload) {
                                                // A metadata push is about the metadata, so print that part of the payload.
                                                System.out.printf("metadataPush: %s%n", payload.getMetadataUtf8());
                                                return Mono.empty();
                                            }
                                        }))
                .transport(TcpServerTransport.create("localhost", 8080))
                .start()
                .subscribe();

        //CLIENT
        RSocket socket =
                RSocketFactory.connect()
                        .transport(TcpClientTransport.create("localhost", 8080))
                        .start()
                        .block();

        socket
                .metadataPush(DefaultPayload.create("hello","version=1.0.0+"))
                .block();

        socket.dispose();

        TimeUnit.SECONDS.sleep(5);
    }
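  • RSocket also defines the metadataPush method. Like fireAndForget, it returns a Mono<Void> and receives no response payload; the difference is that it is meant for pushing metadata rather than data. In the RSocket protocol, METADATA_PUSH is a connection-level frame (stream ID 0) with no response frame, so completion of the returned Mono indicates that the push was handed off for sending, not that the peer has finished processing it.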
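At the frame level, "metadata is optional" shows up as a single flag bit. The sketch below packs a frame type and the metadata flag into the 16-bit type-and-flags field of a frame header. The constants (a 6-bit frame type followed by 10 flag bits, the metadata flag at 0x100, and the listed type codes) reflect my reading of the RSocket protocol specification and should be checked against it; FrameHeaderSketch and its methods are hypothetical names and not the rsocket-java implementation.

```java
// Hypothetical illustration class; constants are assumptions taken from the RSocket protocol spec.
final class FrameHeaderSketch {
    // Frame type codes (6 bits).
    static final int REQUEST_RESPONSE = 0x04;
    static final int REQUEST_FNF = 0x05;
    static final int REQUEST_STREAM = 0x06;
    static final int REQUEST_CHANNEL = 0x07;
    static final int METADATA_PUSH = 0x0C;

    // The frame type occupies the upper 6 bits of the 16-bit field, flags the lower 10.
    static final int FRAME_TYPE_SHIFT = 10;
    // Flag bit signalling that the frame carries metadata.
    static final int FLAG_METADATA = 0x100;

    /** Packs a frame type and the metadata flag into the 16-bit type-and-flags field. */
    static int typeAndFlags(int frameType, boolean hasMetadata) {
        int flags = hasMetadata ? FLAG_METADATA : 0;
        return (frameType << FRAME_TYPE_SHIFT) | flags;
    }

    /** Extracts the frame type from a packed type-and-flags field. */
    static int frameType(int typeAndFlags) {
        return typeAndFlags >>> FRAME_TYPE_SHIFT;
    }

    /** Tells whether the metadata flag is set in a packed type-and-flags field. */
    static boolean hasMetadata(int typeAndFlags) {
        return (typeAndFlags & FLAG_METADATA) != 0;
    }

    public static void main(String[] args) {
        // A fire-and-forget request without metadata versus a metadata push.
        System.out.printf("0x%04X%n", typeAndFlags(REQUEST_FNF, false));
        System.out.printf("0x%04X%n", typeAndFlags(METADATA_PUSH, true));
    }
}
```

A metadata push always has the flag set, since metadata is its only content, while the four request types set it only when the Payload actually carries metadata.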

Summary

  • RSocket is a bi-directional, multiplexed, message-based binary protocol.
  • RSocket has four interaction models: request-response, fire-and-forget, request-stream, and channel.
  • An RSocket frame carries metadata and a data payload, where the metadata is optional and can be used to describe the data payload. Besides setting metadata on the Payload passed to the four interaction-model methods, the metadataPush method defined by RSocket can be used to push metadata on its own.
