Encapsulation of TcpClient by HttpClient in reactor-netty

Preface

This article looks at how HttpClient wraps TcpClient in reactor-netty.

Maven

        <dependency>
            <groupId>io.projectreactor.ipc</groupId>
            <artifactId>reactor-netty</artifactId>
            <version>0.7.3.RELEASE</version>
        </dependency>

Example

        HttpClient client = HttpClient.create();
        Mono<HttpClientResponse> mono = client.get("http://baidu.com");
        // NOTE: the runtime type is reactor.ipc.netty.http.client.MonoHttpClientResponse
        LOGGER.info("mono resp:{}",mono.getClass());
        mono.subscribe();

HttpClient.request

reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/http/client/HttpClient.java

    /**
     * Use the passed HTTP method to send to the given URL. When connection has been made,
     * the passed handler is invoked and can be used to tune the request and
     * write data to it.
     *
     * @param method the HTTP method to send
     * @param url the target remote URL
     * @param handler the {@link Function} to invoke on opened TCP connection
     * @return a {@link Mono} of the {@link HttpServerResponse} ready to consume for
     * response
     */
    public Mono<HttpClientResponse> request(HttpMethod method,
            String url,
            Function<? super HttpClientRequest, ? extends Publisher<Void>> handler) {

        if (method == null || url == null) {
            throw new IllegalArgumentException("Method && url cannot be both null");
        }

        return new MonoHttpClientResponse(this, url, method, handler(handler, options));
    }

Mono.subscribe

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/Mono.java

    /**
     * Subscribe to this {@link Mono} and request unbounded demand.
     * <p>
     * This version doesn't specify any consumption behavior for the events from the
     * chain, especially no error handling, so other variants should usually be preferred.
     *
     * <p>
     * <img width="500" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/unbounded1.png" alt="">
     * <p>
     *
     * @return a new {@link Disposable} that can be used to cancel the underlying {@link Subscription}
     */
    public final Disposable subscribe() {
        if(this instanceof MonoProcessor){
            MonoProcessor<T> s = (MonoProcessor<T>)this;
            s.connect();
            return s;
        }
        else{
            return subscribeWith(new LambdaMonoSubscriber<>(null, null, null, null));
        }
    }

Here subscribeWith is called with a newly created LambdaMonoSubscriber.

    /**
     * Subscribe the given {@link Subscriber} to this {@link Mono} and return said
     * {@link Subscriber} (eg. a {@link MonoProcessor}).
     *
     * @param subscriber the {@link Subscriber} to subscribe with
     * @param <E> the reified type of the {@link Subscriber} for chaining
     *
     * @return the passed {@link Subscriber} after subscribing it to this {@link Mono}
     */
    public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
        subscribe(subscriber);
        return subscriber;
    }
    public final void subscribe(Subscriber<? super T> actual) {
        onLastAssembly(this).subscribe(Operators.toCoreSubscriber(actual));
    }    

Here onLastAssembly(this).subscribe calls the subscribe method implemented by the subclass.

MonoHttpClientResponse.subscribe

reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/http/client/MonoHttpClientResponse.java

    public void subscribe(final CoreSubscriber<? super HttpClientResponse> subscriber) {
        ReconnectableBridge bridge = new ReconnectableBridge();
        bridge.activeURI = startURI;

        Mono.defer(() -> parent.client.newHandler(new HttpClientHandler(this, bridge),
                parent.options.getRemoteAddress(bridge.activeURI),
                HttpClientOptions.isSecure(bridge.activeURI),
                bridge))
            .retry(bridge)
            .cast(HttpClientResponse.class)
            .subscribe(subscriber);
    }

Mono.defer is used here. It defers the creation of the actual Publisher the Subscriber will be subscribed to, which means the publisher is created lazily, at subscription time. Since retry resubscribes, the lambda runs again on each retry and reads the current bridge.activeURI.

The subscriber here is Operators.toCoreSubscriber(LambdaMonoSubscriber).

You can see that parent.client.newHandler is called here. parent is the HttpClient, and its client field is the TcpClient.

retry uses the ReconnectableBridge as its predicate, and the handler passed to newHandler is an HttpClientHandler.
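The interplay of deferral and a retry predicate can be modeled without reactor at all. The following is only a plain-JDK sketch of the idea, not reactor code: DeferRetrySketch, runWithRetry and the example URLs are hypothetical names made up for illustration. The factory is called once per attempt, and the predicate decides whether a failed attempt is repeated, so any state the predicate changes is visible to the next attempt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical plain-JDK model of Mono.defer(...).retry(predicate); not reactor code.
final class DeferRetrySketch {

    // Calls the factory once per attempt. A failed attempt is repeated only
    // while the predicate accepts the error; otherwise the error is rethrown.
    static <T> T runWithRetry(Supplier<T> factory, Predicate<Throwable> retryIf) {
        while (true) {
            try {
                return factory.get();
            } catch (RuntimeException e) {
                if (!retryIf.test(e)) {
                    throw e;
                }
            }
        }
    }

    // Returns the target seen by each attempt, followed by the final result.
    static List<String> demo() {
        String[] target = {"http://old.example/"}; // stands in for bridge.activeURI
        List<String> seen = new ArrayList<>();
        String result = runWithRetry(() -> {
            String current = target[0];            // read at attempt time, not earlier
            seen.add(current);
            if (current.contains("old")) {
                throw new IllegalStateException("moved");
            }
            return "response from " + current;
        }, error -> {
            target[0] = "http://new.example/";     // the predicate updates the target
            return true;                           // and asks for another attempt
        });
        seen.add(result);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The second attempt sees the new target only because the factory reads it on every call; reading the target once, before the loop, would repeat the failing request.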

MonoHttpClientResponse#ReconnectableBridge

    static final class ReconnectableBridge
            implements Predicate<Throwable>, Consumer<Channel> {

        volatile URI      activeURI;
        volatile String[] redirectedFrom;

        ReconnectableBridge() {
        }

        void redirect(String to) {
            String[] redirectedFrom = this.redirectedFrom;
            URI from = activeURI;
            try {
                activeURI = new URI(to);
            }
            catch (URISyntaxException e) {
                throw Exceptions.propagate(e);
            }
            if (redirectedFrom == null) {
                this.redirectedFrom = new String[]{from.toString()};
            }
            else {
                String[] newRedirectedFrom = new String[redirectedFrom.length + 1];
                System.arraycopy(redirectedFrom,
                        0,
                        newRedirectedFrom,
                        0,
                        redirectedFrom.length);
                newRedirectedFrom[redirectedFrom.length] = from.toString();
                this.redirectedFrom = newRedirectedFrom;
            }
        }

        @Override
        public void accept(Channel channel) {
            String[] redirectedFrom = this.redirectedFrom;
            if (redirectedFrom != null) {
                channel.attr(HttpClientOperations.REDIRECT_ATTR_KEY)
                       .set(redirectedFrom);
            }
        }

        @Override
        public boolean test(Throwable throwable) {
            if (throwable instanceof RedirectClientException) {
                RedirectClientException re = (RedirectClientException) throwable;
                redirect(re.location);
                return true;
            }
            if (AbortedException.isConnectionReset(throwable)) {
                redirect(activeURI.toString());
                return true;
            }
            return false;
        }
    }

This mainly handles redirects (plus reconnecting after a connection reset) rather than retry in the usual sense, such as retrying a fixed number of times.
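To make the bookkeeping in redirect easier to follow, here is a reduced sketch of it. RedirectHistorySketch is a hypothetical stand-in, not the real class: it uses String instead of URI, drops the volatile fields and the Channel attribute, and replaces System.arraycopy with Arrays.copyOf. Each redirect makes the new location active and appends the previous one to the history array.

```java
import java.util.Arrays;

// Hypothetical stand-in for ReconnectableBridge, reduced to its redirect bookkeeping.
final class RedirectHistorySketch {

    String   activeUri;
    String[] redirectedFrom; // stays null until the first redirect

    RedirectHistorySketch(String startUri) {
        this.activeUri = startUri;
    }

    // Makes 'to' the active URI and appends the previous URI to the history.
    void redirect(String to) {
        String from = activeUri;
        activeUri = to;
        if (redirectedFrom == null) {
            redirectedFrom = new String[] {from};
        } else {
            // Copy into a new, longer array instead of mutating the old one.
            String[] grown = Arrays.copyOf(redirectedFrom, redirectedFrom.length + 1);
            grown[grown.length - 1] = from;
            redirectedFrom = grown;
        }
    }

    public static void main(String[] args) {
        RedirectHistorySketch bridge = new RedirectHistorySketch("http://a.example/");
        bridge.redirect("http://b.example/");
        bridge.redirect("http://c.example/");
        System.out.println(bridge.activeUri);
        System.out.println(Arrays.toString(bridge.redirectedFrom));
    }
}
```

Judging from the quoted source, the connection-reset branch calls redirect(activeURI.toString()), so in that case the current URI is appended to the history while the active URI stays the same.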

MonoHttpClientResponse#HttpClientHandler

reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/http/client/MonoHttpClientResponse.java

    static final class HttpClientHandler
            implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {

        final MonoHttpClientResponse parent;
        final ReconnectableBridge    bridge;

        HttpClientHandler(MonoHttpClientResponse parent, ReconnectableBridge bridge) {
            this.bridge = bridge;
            this.parent = parent;
        }

        @Override
        public Publisher<Void> apply(NettyInbound in, NettyOutbound out) {
            try {
                URI uri = bridge.activeURI;
                HttpClientOperations ch = (HttpClientOperations) in;
                String host = uri.getHost();
                int port = uri.getPort();
                if (port != -1 && port != 80 && port != 443) {
                    host = host + ':' + port;
                }
                ch.getNettyRequest()
                  .setUri(uri.getRawPath() + (uri.getQuery() == null ? "" :
                          "?" + uri.getRawQuery()))
                  .setMethod(parent.method)
                  .setProtocolVersion(HttpVersion.HTTP_1_1)
                  .headers()
                  .add(HttpHeaderNames.HOST, host)
                  .add(HttpHeaderNames.ACCEPT, ALL);

                if (parent.method == HttpMethod.GET
                        || parent.method == HttpMethod.HEAD
                        || parent.method == HttpMethod.DELETE) {
                    ch.chunkedTransfer(false);
                }

                if (parent.handler != null) {
                    return parent.handler.apply(ch);
                }
                else {
                    return ch.send();
                }
            }
            catch (Throwable t) {
                return Mono.error(t);
            }
        }

        @Override
        public String toString() {
            return "HttpClientHandler{" + "startURI=" + bridge.activeURI + ", method=" + parent.method + ", handler=" + parent.handler + '}';
        }

    }

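Traces of netty are visible in this handler: it fills in the netty request (URI, method, protocol version, Host and Accept headers). When no custom handler was supplied, it finishes by calling HttpClientOperations.send directly.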
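The Host header and request URI logic in apply can be tried out in isolation with java.net.URI. This is a sketch only: HostHeaderSketch, hostHeader and requestTarget are hypothetical names that do not exist in reactor-netty, and the query check is simplified to use getRawQuery alone.

```java
import java.net.URI;

// Hypothetical helper class, not part of reactor-netty.
final class HostHeaderSketch {

    // Mirrors the port check in HttpClientHandler.apply: the port is appended
    // only when it is explicit and is neither 80 nor 443.
    static String hostHeader(URI uri) {
        String host = uri.getHost();
        int port = uri.getPort(); // -1 when the URI has no explicit port
        if (port != -1 && port != 80 && port != 443) {
            host = host + ':' + port;
        }
        return host;
    }

    // Mirrors the argument passed to setUri: raw path plus optional raw query.
    static String requestTarget(URI uri) {
        String query = uri.getRawQuery();
        return uri.getRawPath() + (query == null ? "" : "?" + query);
    }

    public static void main(String[] args) {
        URI uri = URI.create("http://example.com:8080/search?q=a%20b");
        System.out.println(hostHeader(uri));    // example.com:8080
        System.out.println(requestTarget(uri)); // /search?q=a%20b
    }
}
```

Using the raw path and raw query keeps percent-encoding such as %20 intact, so the request line carries the URL exactly as the caller wrote it.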

reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/http/client/HttpClientOperations.java

    public Mono<Void> send() {
        if (markSentHeaderAndBody()) {
            HttpMessage request = newFullEmptyBodyMessage();
            return FutureMono.deferFuture(() -> channel().writeAndFlush(request));
        }
        else {
            return Mono.empty();
        }
    }

Finally, netty's channel().writeAndFlush(request) is called to send the request.

Summary

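HttpClient in reactor-netty is a wrapper around TcpClient, and TcpClient in turn is built on netty. An HTTP request therefore ends up as a netty writeAndFlush on the channel opened by TcpClient.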