Talk about the ribbon’s retry

  springcloud

Order

This article mainly studies the ribbon’s retry

Configuration

HttpClientRibbonConfiguration

spring-cloud-netflix-ribbon-2.0.0.RELEASE-sources.jar! /org/springframework/cloud/netflix/ribbon/apache/HttpClientRibbonConfiguration.java

    @Bean
    @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
    @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
    public RetryableRibbonLoadBalancingHttpClient retryableRibbonLoadBalancingHttpClient(
            IClientConfig config, ServerIntrospector serverIntrospector,
            ILoadBalancer loadBalancer, RetryHandler retryHandler,
            LoadBalancedRetryFactory loadBalancedRetryFactory, CloseableHttpClient httpClient) {
        RetryableRibbonLoadBalancingHttpClient client = new RetryableRibbonLoadBalancingHttpClient(
            httpClient, config, serverIntrospector, loadBalancedRetryFactory);
        client.setLoadBalancer(loadBalancer);
        client.setRetryHandler(retryHandler);
        Monitors.registerObject("Client_" + this.name, client);
        return client;
    }

OkHttpRibbonConfiguration

spring-cloud-netflix-ribbon-2.0.0.RELEASE-sources.jar! /org/springframework/cloud/netflix/ribbon/okhttp/OkHttpRibbonConfiguration.java

    @Bean
    @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
    @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
    public RetryableOkHttpLoadBalancingClient retryableOkHttpLoadBalancingClient(
        IClientConfig config,
        ServerIntrospector serverIntrospector,
        ILoadBalancer loadBalancer,
        RetryHandler retryHandler,
        LoadBalancedRetryFactory loadBalancedRetryFactory,
        OkHttpClient delegate) {
        RetryableOkHttpLoadBalancingClient client = new RetryableOkHttpLoadBalancingClient(delegate, config,
                serverIntrospector, loadBalancedRetryFactory);
        client.setLoadBalancer(loadBalancer);
        client.setRetryHandler(retryHandler);
        Monitors.registerObject("Client_" + this.name, client);
        return client;
    }

RetryableRibbonLoadBalancingHttpClient

spring-cloud-netflix-ribbon-2.0.0.RELEASE-sources.jar! /org/springframework/cloud/netflix/ribbon/apache/RetryableRibbonLoadBalancingHttpClient.java

/**
 * An Apache HTTP client which leverages Spring Retry to retry failed requests.
 * @author Ryan Baxter
 * @author Gang Li
 */
public class RetryableRibbonLoadBalancingHttpClient extends RibbonLoadBalancingHttpClient {
    private LoadBalancedRetryFactory loadBalancedRetryFactory;

    public RetryableRibbonLoadBalancingHttpClient(CloseableHttpClient delegate,
                                                  IClientConfig config, ServerIntrospector serverIntrospector,
                                                  LoadBalancedRetryFactory loadBalancedRetryFactory) {
        super(delegate, config, serverIntrospector);
        this.loadBalancedRetryFactory = loadBalancedRetryFactory;
    }

    @Override
    public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
        final RequestConfig.Builder builder = RequestConfig.custom();
        IClientConfig config = configOverride != null ? configOverride : this.config;
        RibbonProperties ribbon = RibbonProperties.from(config);
        builder.setConnectTimeout(ribbon.connectTimeout(this.connectTimeout));
        builder.setSocketTimeout(ribbon.readTimeout(this.readTimeout));
        builder.setRedirectsEnabled(ribbon.isFollowRedirects(this.followRedirects));

        final RequestConfig requestConfig = builder.build();
        final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(this.getClientName(), this);

        RetryCallback<RibbonApacheHttpResponse, Exception> retryCallback = context -> {
            //on retries the policy will choose the server and set it in the context
            //extract the server and update the request being made
            RibbonApacheHttpRequest newRequest = request;
            if (context instanceof LoadBalancedRetryContext) {
                ServiceInstance service = ((LoadBalancedRetryContext) context).getServiceInstance();
                validateServiceInstance(service);
                if (service != null) {
                    //Reconstruct the request URI using the host and port set in the retry context
                    newRequest = newRequest.withNewUri(UriComponentsBuilder.newInstance().host(service.getHost())
                            .scheme(service.getUri().getScheme()).userInfo(newRequest.getURI().getUserInfo())
                            .port(service.getPort()).path(newRequest.getURI().getPath())
                            .query(newRequest.getURI().getQuery()).fragment(newRequest.getURI().getFragment())
                            .build().encode().toUri());
                }
            }
            newRequest = getSecureRequest(newRequest, configOverride);
            HttpUriRequest httpUriRequest = newRequest.toRequest(requestConfig);
            final HttpResponse httpResponse = RetryableRibbonLoadBalancingHttpClient.this.delegate.execute(httpUriRequest);
            if (retryPolicy.retryableStatusCode(httpResponse.getStatusLine().getStatusCode())) {
                throw new HttpClientStatusCodeException(RetryableRibbonLoadBalancingHttpClient.this.clientName,
                        httpResponse, HttpClientUtils.createEntity(httpResponse), httpUriRequest.getURI());
            }
            return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
        };
        LoadBalancedRecoveryCallback<RibbonApacheHttpResponse, HttpResponse> recoveryCallback = new LoadBalancedRecoveryCallback<RibbonApacheHttpResponse, HttpResponse>() {
            @Override
            protected RibbonApacheHttpResponse createResponse(HttpResponse response, URI uri) {
                return new RibbonApacheHttpResponse(response, uri);
            }
         };
        return this.executeWithRetry(request, retryPolicy, retryCallback, recoveryCallback);
    }
    
    @Override
    public boolean isClientRetryable(ContextAwareRequest request) {
        return request!= null && isRequestRetryable(request);
    }

    private boolean isRequestRetryable(ContextAwareRequest request) {
        if (request.getContext() == null || request.getContext().getRetryable() == null) {
            return true;
        }
        return request.getContext().getRetryable();
    }

    private RibbonApacheHttpResponse executeWithRetry(RibbonApacheHttpRequest request, LoadBalancedRetryPolicy retryPolicy,
                                                      RetryCallback<RibbonApacheHttpResponse, Exception> callback,
                                                      RecoveryCallback<RibbonApacheHttpResponse> recoveryCallback) throws Exception {
        RetryTemplate retryTemplate = new RetryTemplate();
        boolean retryable = isRequestRetryable(request);
        retryTemplate.setRetryPolicy(retryPolicy == null || !retryable ? new NeverRetryPolicy()
                : new RetryPolicy(request, retryPolicy, this, this.getClientName()));
        BackOffPolicy backOffPolicy = loadBalancedRetryFactory.createBackOffPolicy(this.getClientName());
        retryTemplate.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);
        RetryListener[] retryListeners = this.loadBalancedRetryFactory.createRetryListeners(this.getClientName());
        if (retryListeners != null && retryListeners.length != 0) {
            retryTemplate.setListeners(retryListeners);
        }
        return retryTemplate.execute(callback, recoveryCallback);
    }

    @Override
    public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonApacheHttpRequest request, IClientConfig requestConfig) {
        return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, null);
    }

    static class RetryPolicy extends InterceptorRetryPolicy {
        public RetryPolicy(HttpRequest request, LoadBalancedRetryPolicy policy,
                ServiceInstanceChooser serviceInstanceChooser, String serviceName) {
            super(request, policy, serviceInstanceChooser, serviceName);
        }
    }
}
  • The focus here is on executeWithRetry, which is implemented by setting the RetryPolicy and BackOffPolicy of the RetryTemplate.

RetryableOkHttpLoadBalancingClient

spring-cloud-netflix-ribbon-2.0.0.RELEASE-sources.jar! /org/springframework/cloud/netflix/ribbon/okhttp/RetryableOkHttpLoadBalancingClient.java

/**
 * An OK HTTP client which leverages Spring Retry to retry failed request.
 * @author Ryan Baxter
 * @author Gang Li
 */
public class RetryableOkHttpLoadBalancingClient extends OkHttpLoadBalancingClient {

    private LoadBalancedRetryFactory loadBalancedRetryFactory;

    public RetryableOkHttpLoadBalancingClient(OkHttpClient delegate, IClientConfig config, ServerIntrospector serverIntrospector,
                                              LoadBalancedRetryFactory loadBalancedRetryPolicyFactory) {
        super(delegate, config, serverIntrospector);
        this.loadBalancedRetryFactory = loadBalancedRetryPolicyFactory;
    }

    @Override
    public boolean isClientRetryable(ContextAwareRequest request) {
        return request!= null && isRequestRetryable(request);
    }
    
    private boolean isRequestRetryable(ContextAwareRequest request) {
        if (request.getContext() == null || request.getContext().getRetryable() == null) {
            return true;
        }
        return request.getContext().getRetryable();
    }
    
    private OkHttpRibbonResponse executeWithRetry(OkHttpRibbonRequest request, LoadBalancedRetryPolicy retryPolicy,
                                                  RetryCallback<OkHttpRibbonResponse, Exception> callback,
                                                  RecoveryCallback<OkHttpRibbonResponse> recoveryCallback) throws Exception {
        RetryTemplate retryTemplate = new RetryTemplate();
        BackOffPolicy backOffPolicy = loadBalancedRetryFactory.createBackOffPolicy(this.getClientName());
        retryTemplate.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);
        RetryListener[] retryListeners = this.loadBalancedRetryFactory.createRetryListeners(this.getClientName());
        if (retryListeners != null && retryListeners.length != 0) {
            retryTemplate.setListeners(retryListeners);
        }
        boolean retryable = isRequestRetryable(request);
        retryTemplate.setRetryPolicy(retryPolicy == null || !retryable ? new NeverRetryPolicy()
                : new RetryPolicy(request, retryPolicy, this, this.getClientName()));
        return retryTemplate.execute(callback, recoveryCallback);
    }

    @Override
    public OkHttpRibbonResponse execute(final OkHttpRibbonRequest ribbonRequest,
                                        final IClientConfig configOverride) throws Exception {
        final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(this.getClientName(), this);
        RetryCallback<OkHttpRibbonResponse, Exception> retryCallback  = new RetryCallback<OkHttpRibbonResponse, Exception>() {
            @Override
            public OkHttpRibbonResponse doWithRetry(RetryContext context) throws Exception {
                //on retries the policy will choose the server and set it in the context
                //extract the server and update the request being made
                OkHttpRibbonRequest newRequest = ribbonRequest;
                if(context instanceof LoadBalancedRetryContext) {
                    ServiceInstance service = ((LoadBalancedRetryContext)context).getServiceInstance();
                    validateServiceInstance(service);
                    //Reconstruct the request URI using the host and port set in the retry context
                    newRequest = newRequest.withNewUri(new URI(service.getUri().getScheme(),
                            newRequest.getURI().getUserInfo(), service.getHost(), service.getPort(),
                            newRequest.getURI().getPath(), newRequest.getURI().getQuery(),
                            newRequest.getURI().getFragment()));
                }
                if (isSecure(configOverride)) {
                    final URI secureUri = UriComponentsBuilder.fromUri(newRequest.getUri())
                            .scheme("https").build().toUri();
                    newRequest = newRequest.withNewUri(secureUri);
                }
                OkHttpClient httpClient = getOkHttpClient(configOverride, secure);

                final Request request = newRequest.toRequest();
                Response response = httpClient.newCall(request).execute();
                if(retryPolicy.retryableStatusCode(response.code())) {
                    ResponseBody responseBody = response.peekBody(Integer.MAX_VALUE);
                    response.close();
                    throw new OkHttpStatusCodeException(RetryableOkHttpLoadBalancingClient.this.clientName,
                            response, responseBody, newRequest.getURI());
                }
                return new OkHttpRibbonResponse(response, newRequest.getUri());
            }
        };
        return this.executeWithRetry(ribbonRequest, retryPolicy, retryCallback, new LoadBalancedRecoveryCallback<OkHttpRibbonResponse, Response>(){

            @Override
            protected OkHttpRibbonResponse createResponse(Response response, URI uri) {
                return new OkHttpRibbonResponse(response, uri);
            }
        });
    }

    

    @Override
    public RequestSpecificRetryHandler getRequestSpecificRetryHandler(OkHttpRibbonRequest request, IClientConfig requestConfig) {
        return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, null);
    }

    static class RetryPolicy extends InterceptorRetryPolicy {
        public RetryPolicy(HttpRequest request, LoadBalancedRetryPolicy policy, ServiceInstanceChooser serviceInstanceChooser, String serviceName) {
            super(request, policy, serviceInstanceChooser, serviceName);
        }
    }
}
  • ExecuteWithRetry here also implements the RetryPolicy by setting the retry policy and BackOffPolicy of the RetryTemplate.

RetryTemplate

spring-retry-1.2.2.RELEASE-sources.jar! /org/springframework/retry/support/RetryTemplate.java

public class RetryTemplate implements RetryOperations {

    /**
     * Retry context attribute name that indicates the context should be considered global
     * state (never closed). TODO: convert this to a flag in the RetryState.
     */
    private static final String GLOBAL_STATE = "state.global";

    protected final Log logger = LogFactory.getLog(getClass());

    private volatile BackOffPolicy backOffPolicy = new NoBackOffPolicy();

    private volatile RetryPolicy retryPolicy = new SimpleRetryPolicy(3);

    private volatile RetryListener[] listeners = new RetryListener[0];

    private RetryContextCache retryContextCache = new MapRetryContextCache();

    private boolean throwLastExceptionOnExhausted;

    //......

    /**
     * Execute the callback once if the policy dictates that we can, otherwise execute the
     * recovery callback.
     * @param recoveryCallback the {@link RecoveryCallback}
     * @param retryCallback the {@link RetryCallback}
     * @param state the {@link RetryState}
     * @param <T> the type of the return value
     * @param <E> the exception type to throw
     * @see RetryOperations#execute(RetryCallback, RecoveryCallback, RetryState)
     * @throws ExhaustedRetryException if the retry has been exhausted.
     * @throws E an exception if the retry operation fails
     * @return T the retried value
     */
    protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
            RecoveryCallback<T> recoveryCallback, RetryState state)
            throws E, ExhaustedRetryException {

        RetryPolicy retryPolicy = this.retryPolicy;
        BackOffPolicy backOffPolicy = this.backOffPolicy;

        // Allow the retry policy to initialise itself...
        RetryContext context = open(retryPolicy, state);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("RetryContext retrieved: " + context);
        }

        // Make sure the context is available globally for clients who need
        // it...
        RetrySynchronizationManager.register(context);

        Throwable lastException = null;

        boolean exhausted = false;
        try {

            // Give clients a chance to enhance the context...
            boolean running = doOpenInterceptors(retryCallback, context);

            if (!running) {
                throw new TerminatedRetryException(
                        "Retry terminated abnormally by interceptor before first attempt");
            }

            // Get or Start the backoff context...
            BackOffContext backOffContext = null;
            Object resource = context.getAttribute("backOffContext");

            if (resource instanceof BackOffContext) {
                backOffContext = (BackOffContext) resource;
            }

            if (backOffContext == null) {
                backOffContext = backOffPolicy.start(context);
                if (backOffContext != null) {
                    context.setAttribute("backOffContext", backOffContext);
                }
            }

            /*
             * We allow the whole loop to be skipped if the policy or context already
             * forbid the first try. This is used in the case of external retry to allow a
             * recovery in handleRetryExhausted without the callback processing (which
             * would throw an exception).
             */
            while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {

                try {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Retry: count=" + context.getRetryCount());
                    }
                    // Reset the last exception, so if we are successful
                    // the close interceptors will not think we failed...
                    lastException = null;
                    return retryCallback.doWithRetry(context);
                }
                catch (Throwable e) {

                    lastException = e;

                    try {
                        registerThrowable(retryPolicy, state, context, e);
                    }
                    catch (Exception ex) {
                        throw new TerminatedRetryException("Could not register throwable",
                                ex);
                    }
                    finally {
                        doOnErrorInterceptors(retryCallback, context, e);
                    }

                    if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                        try {
                            backOffPolicy.backOff(backOffContext);
                        }
                        catch (BackOffInterruptedException ex) {
                            lastException = e;
                            // back off was prevented by another thread - fail the retry
                            if (this.logger.isDebugEnabled()) {
                                this.logger
                                        .debug("Abort retry because interrupted: count="
                                                + context.getRetryCount());
                            }
                            throw ex;
                        }
                    }

                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug(
                                "Checking for rethrow: count=" + context.getRetryCount());
                    }

                    if (shouldRethrow(retryPolicy, context, state)) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Rethrow in retry for policy: count="
                                    + context.getRetryCount());
                        }
                        throw RetryTemplate.<E>wrapIfNecessary(e);
                    }

                }

                /*
                 * A stateful attempt that can retry may rethrow the exception before now,
                 * but if we get this far in a stateful retry there's a reason for it,
                 * like a circuit breaker or a rollback classifier.
                 */
                if (state != null && context.hasAttribute(GLOBAL_STATE)) {
                    break;
                }
            }

            if (state == null && this.logger.isDebugEnabled()) {
                this.logger.debug(
                        "Retry failed last attempt: count=" + context.getRetryCount());
            }

            exhausted = true;
            return handleRetryExhausted(recoveryCallback, context, state);

        }
        catch (Throwable e) {
            throw RetryTemplate.<E>wrapIfNecessary(e);
        }
        finally {
            close(retryPolicy, context, state, lastException == null || exhausted);
            doCloseInterceptors(retryCallback, context, lastException);
            RetrySynchronizationManager.clear();
        }

    }

    //......
}
  • The logic of the entire retry is to control the number of retries through while. there is no return in the loop, and an ExhaustedRetryException will be thrown outside the loop.
  • The backOff method of BackOffPolicy performs retry intervals through sleeper.sleep
  • BackOffPolicy has a subinterface SleepingBackOffPolicy and an abstract class StatelessBackOffPolicy that implements the interface.
  • The implementation classes of SleepingBackOffPolicy include FixedBackOffPolicy, UniformRandomBackOffPolicy, ExponentialBackOffPolicy, ExponentialRandomBackOffPolicy
  • StatelessBackOffPolicy has three subclasses: FixedBackOffPolicy, NoBackOffPolicy, UniformRandomBackOffPolicy

Summary

There are two implementations for the retry of the ribbon, one is the RETRY BLERIBBON LoadBalanchingHTTPClient, and the other is the RETRY BLEOKHTTTPLoadBalanchingClient. They use the RetryTemplate of the spring-retry component to implement it. The template is mainly controlled by RetryPolicy and BackOffPolicy to retry. The logic of retry is to use while to control the number of retries, and then use BackOffPolicy to control the retry interval.

doc