A look at retry in reactor-extra

Preface

This article looks at the retry support in reactor-extra.

Maven

        <dependency>
            <groupId>io.projectreactor.addons</groupId>
            <artifactId>reactor-extra</artifactId>
            <version>3.1.4.RELEASE</version>
        </dependency>

Example

        TcpClient client = TcpClient.create("localhost", 8888);
        client.newHandler((inbound,outbound) -> {
                return outbound.sendString(Mono.just("Hello World!")).then(inbound.receive()
                        .asString().next().log().then());
        }).doOnError(e -> e.printStackTrace())
                .subscribe();

If the server has not been started, the TcpClient above fails immediately with the following error:

io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8888
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:325)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
    ... 10 more

Reconnecting after a failed connection

Simple retry

        client.newHandler((inbound,outbound) -> {
                return outbound.sendString(Mono.just("Hello World!")).then(inbound.receive()
                        .asString().next().log().then());
        }).doOnError(e -> e.printStackTrace())
                .retry(3)
                .subscribe();

The retry operator takes the number of retries directly: retry(3) resubscribes up to three times after the initial attempt fails.
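To make the counting concrete, here is a minimal plain-Java sketch of the same idea: one initial attempt plus up to N retries. The class SimpleRetrySketch and its helpers are hypothetical stand-ins written for illustration; they are not Reactor's implementation and involve no reactive types.

```java
import java.net.ConnectException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class SimpleRetrySketch {

    // Hypothetical helper: runs the task once, then retries up to maxRetries times on failure.
    static <T> T callWithRetry(Callable<T> task, int maxRetries) throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // every attempt failed, so propagate the last error
    }

    // Counts how many times a task that always fails gets attempted.
    static int attemptsForAlwaysFailing(int maxRetries) {
        AtomicInteger attempts = new AtomicInteger();
        try {
            callWithRetry(() -> {
                attempts.incrementAndGet();
                throw new ConnectException("Connection refused");
            }, maxRetries);
        } catch (Exception expected) {
            // the final failure still reaches the caller once retries are used up
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        // One initial attempt plus three retries.
        System.out.println(attemptsForAlwaysFailing(3)); // prints 4
    }
}
```

With three retries, a server that never comes up is contacted four times in total before the error is finally propagated.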

Advanced retry

        client.newHandler((inbound,outbound) -> {
                return outbound.sendString(Mono.just("Hello World!")).then(inbound.receive()
                        .asString().next().log().then());
        }).doOnError(e -> e.printStackTrace())
                .retryWhen(Retry.allBut(RuntimeException.class)
                        .retryMax(5000)
                        .fixedBackoff(Duration.ofSeconds(5))
                        .doOnRetry(e -> e.exception().printStackTrace()))
                .subscribe();

With the Retry helper class from the reactor-extra project, you can easily specify more advanced retry policies, such as fixedBackoff or exponentialBackoff.

        client.newHandler((inbound,outbound) -> {
                return outbound.sendString(Mono.just("Hello World!")).then(inbound.receive()
                        .asString().next().log().then());
        }).doOnError(e -> e.printStackTrace())
                .retryWhen(Retry.allBut(RuntimeException.class)
                        .retryMax(5000)
                        .exponentialBackoffWithJitter(Duration.ofMillis(100), Duration.ofMillis(500))
                        .doOnRetry(e -> e.exception().printStackTrace()))
                .subscribe();

The first parameter is firstBackoff, the initial backoff delay, and the second is maxBackoff, the upper bound on the delay. If maxBackoff is null, it is treated as Duration.ofSeconds(Long.MAX_VALUE).
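How the two parameters interact can be sketched in plain Java. The class BackoffSketch below is hypothetical and deliberately simplified: it assumes the delay doubles on each retry and is capped at maxBackoff, and it leaves out the random jitter entirely, so it only approximates the shape of the schedule rather than reproducing the library's computation.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class BackoffSketch {

    // Hypothetical helper: starts at firstBackoff, doubles per retry (assumed factor 2),
    // and never exceeds maxBackoff. Jitter is intentionally omitted.
    static List<Duration> exponentialDelays(Duration firstBackoff, Duration maxBackoff, int retries) {
        List<Duration> delays = new ArrayList<>();
        Duration next = firstBackoff.compareTo(maxBackoff) > 0 ? maxBackoff : firstBackoff;
        for (int i = 0; i < retries; i++) {
            delays.add(next);
            Duration doubled = next.multipliedBy(2);
            next = doubled.compareTo(maxBackoff) > 0 ? maxBackoff : doubled;
        }
        return delays;
    }

    public static void main(String[] args) {
        // Same values as the example above: firstBackoff = 100 ms, maxBackoff = 500 ms.
        for (Duration d : exponentialDelays(Duration.ofMillis(100), Duration.ofMillis(500), 5)) {
            System.out.println(d.toMillis() + " ms");
        }
    }
}
```

Under these assumptions the first five delays are 100, 200, 400, 500 and 500 ms: the delay grows quickly at first and then stays at the cap.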

Retry

reactor-extra-3.1.4.RELEASE-sources.jar!/reactor/retry/Retry.java

    /**
     * Returns a retry function that retries errors resulting from all exceptions except
     * the specified non-retriable exceptions. More constraints may be added using
     * {@link #retryMax(int)} or {@link #timeout(Duration)}.
     *
     * @param nonRetriableExceptions exceptions that may not be retried
     * @return retry function that retries all exceptions except the specified non-retriable exceptions.
     */
    @SafeVarargs
    static <T> Retry<T> allBut(final Class<? extends Throwable>... nonRetriableExceptions) {
        Predicate<? super RetryContext<T>> predicate = context -> {
            Throwable exception = context.exception();
            if (exception == null)
                return true;
            for (Class<? extends Throwable> clazz : nonRetriableExceptions) {
                if (clazz.isInstance(exception))
                    return false;
            }
            return true;
        };
        return DefaultRetry.<T>create(predicate);
    }
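The effect of this predicate on the earlier example can be checked with a small standalone sketch. The class AllButSketch and its isRetriable method are hypothetical; they only mirror the isInstance loop from allBut so it can be run without Reactor.

```java
import java.net.ConnectException;

public class AllButSketch {

    // Mirrors the loop in allBut: an exception is retriable unless it is an
    // instance of one of the excluded classes.
    @SafeVarargs
    static boolean isRetriable(Throwable exception, Class<? extends Throwable>... nonRetriable) {
        if (exception == null) {
            return true;
        }
        for (Class<? extends Throwable> clazz : nonRetriable) {
            if (clazz.isInstance(exception)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // ConnectException is an IOException, not a RuntimeException, so it is retried.
        System.out.println(isRetriable(new ConnectException("Connection refused"), RuntimeException.class)); // true
        // IllegalStateException is a RuntimeException, so it is excluded from retries.
        System.out.println(isRetriable(new IllegalStateException("bug"), RuntimeException.class)); // false
    }
}
```

This is why Retry.allBut(RuntimeException.class) suits the reconnect case: connection failures are checked exceptions and get retried, while programming errors surfacing as runtime exceptions fail fast.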

allBut builds the predicate and passes it to DefaultRetry.create, which constructs the Retry instance.

reactor-extra-3.1.4.RELEASE-sources.jar!/reactor/retry/DefaultRetry.java

    public static <T> DefaultRetry<T> create(Predicate<? super RetryContext<T>> retryPredicate) {
        return new DefaultRetry<T>(retryPredicate,
                1,
                null,
                Backoff.zero(),
                Jitter.noJitter(),
                null,
                NOOP_ON_RETRY,
                (T) null);
    }

Note that maxIterations (the second constructor argument) defaults to 1 here. If retryMax is not specified, at most one retry is attempted, so the advanced retry strategy is largely wasted. This deserves extra attention.

Summary

The Retry helper class provided by reactor-extra is easy to use and worth trying out.

doc