A look at the LoadBalancerClientFilter in Spring Cloud Gateway


Preface

This article examines the LoadBalancerClientFilter in Spring Cloud Gateway.

GatewayLoadBalancerClientAutoConfiguration

spring-cloud-gateway-core-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/gateway/config/GatewayLoadBalancerClientAutoConfiguration.java

@Configuration
@ConditionalOnClass({LoadBalancerClient.class, RibbonAutoConfiguration.class, DispatcherHandler.class})
@AutoConfigureAfter(RibbonAutoConfiguration.class)
public class GatewayLoadBalancerClientAutoConfiguration {

    // GlobalFilter beans

    @Bean
    @ConditionalOnBean(LoadBalancerClient.class)
    public LoadBalancerClientFilter loadBalancerClientFilter(LoadBalancerClient client) {
        return new LoadBalancerClientFilter(client);
    }
}

If Ribbon is on the classpath and a LoadBalancerClient bean exists, the LoadBalancerClientFilter is registered.

LoadBalancerClientFilter

spring-cloud-gateway-core-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/gateway/filter/LoadBalancerClientFilter.java

public class LoadBalancerClientFilter implements GlobalFilter, Ordered {

    private static final Log log = LogFactory.getLog(LoadBalancerClientFilter.class);
    public static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10100;

    private final LoadBalancerClient loadBalancer;

    public LoadBalancerClientFilter(LoadBalancerClient loadBalancer) {
        this.loadBalancer = loadBalancer;
    }

    @Override
    public int getOrder() {
        return LOAD_BALANCER_CLIENT_FILTER_ORDER;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
        if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
            return chain.filter(exchange);
        }
        //preserve the original url
        addOriginalRequestUrl(exchange, url);

        log.trace("LoadBalancerClientFilter url before: " + url);

        final ServiceInstance instance = loadBalancer.choose(url.getHost());

        if (instance == null) {
            throw new NotFoundException("Unable to find instance for " + url.getHost());
        }

        URI uri = exchange.getRequest().getURI();

        // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
        // if the loadbalancer doesn't provide one.
        String overrideScheme = null;
        if (schemePrefix != null) {
            overrideScheme = url.getScheme();
        }

        URI requestUrl = loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);

        log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
        exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
        return chain.filter(exchange);
    }
    //......
}

  • The filter first reads the request URL and the scheme prefix; if neither the URL scheme nor the prefix is lb, the exchange is passed directly to the next filter.
  • It then selects a service instance through loadBalancer.choose(url.getHost()) and throws a NotFoundException if none is found.
  • Finally, it reconstructs the requestUrl for the chosen instance and stores it in the GATEWAY_REQUEST_URL_ATTR attribute.
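
The three steps above can be sketched in plain Java without any Spring dependency. This is only an illustration under stated assumptions: LbFilterSketch, Instance, shouldLoadBalance and reconstruct are hypothetical names, the host names and ports are made up, and the fallback to http when no override scheme is given is a simplification rather than what the real LoadBalancerClient.reconstructURI does.

```java
import java.net.URI;
import java.net.URISyntaxException;

public class LbFilterSketch {

    /** Hypothetical stand-in for a ServiceInstance: only host and port. */
    public static final class Instance {
        public final String host;
        public final int port;

        public Instance(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Mirrors the guard at the top of filter(): act only on lb routes. */
    public static boolean shouldLoadBalance(URI url, String schemePrefix) {
        return url != null
                && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix));
    }

    /**
     * Rebuilds the incoming request URI so that it points at the chosen instance.
     * Assumption: fall back to "http" when no override scheme is supplied.
     */
    public static URI reconstruct(Instance instance, String overrideScheme, URI original) {
        String scheme = overrideScheme != null ? overrideScheme : "http";
        try {
            return new URI(scheme, null, instance.host, instance.port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        URI routed = URI.create("lb://user-service/users/42?full=true");
        URI incoming = URI.create("http://gateway.local:8080/users/42?full=true");

        if (shouldLoadBalance(routed, null)) {
            // In the real filter the instance comes from loadBalancer.choose(...).
            Instance chosen = new Instance("10.0.0.5", 9000);
            System.out.println(reconstruct(chosen, null, incoming));
            // prints http://10.0.0.5:9000/users/42?full=true
        }
    }
}
```

Note that the path and query are taken from the incoming request URI, while the host and port come from the chosen instance; this matches the way the filter passes exchange.getRequest().getURI() to reconstructURI.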

GATEWAY_SCHEME_PREFIX_ATTR

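spring-cloud-gateway-core-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/gateway/filter/RouteToRequestUrlFilter.java

GATEWAY_SCHEME_PREFIX_ATTR is set in RouteToRequestUrlFilter when the route URI carries an additional scheme prefix (the lb:<scheme> form mentioned in the LoadBalancerClientFilter comments).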

public class RouteToRequestUrlFilter implements GlobalFilter, Ordered {

    private static final Log log = LogFactory.getLog(RouteToRequestUrlFilter.class);

    public static final int ROUTE_TO_URL_FILTER_ORDER = 10000;
    private static final String SCHEME_REGEX = "[a-zA-Z]([a-zA-Z]|\\d|\\+|\\.|-)*:.*";
    static final Pattern schemePattern = Pattern.compile(SCHEME_REGEX);

    @Override
    public int getOrder() {
        return ROUTE_TO_URL_FILTER_ORDER;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
        if (route == null) {
            return chain.filter(exchange);
        }
        log.trace("RouteToRequestUrlFilter start");
        URI uri = exchange.getRequest().getURI();
        boolean encoded = containsEncodedParts(uri);
        URI routeUri = route.getUri();

        if (hasAnotherScheme(routeUri)) {
            // this is a special url, save scheme to special attribute
            // replace routeUri with schemeSpecificPart
            exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
            routeUri = URI.create(routeUri.getSchemeSpecificPart());
        }

        URI requestUrl = UriComponentsBuilder.fromUri(uri)
                .uri(routeUri)
                .build(encoded)
                .toUri();
        exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
        return chain.filter(exchange);
    }

    /* for testing */ static boolean hasAnotherScheme(URI uri) {
        return schemePattern.matcher(uri.getSchemeSpecificPart()).matches() && uri.getHost() == null
                && uri.getRawPath() == null;
    }
}
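
To see which route URIs trigger the scheme-prefix branch, the hasAnotherScheme check can be reproduced with java.net.URI alone. The sketch below copies the regular expression from the listing above; the class name SchemePrefixSketch and the service name user-service are made up for illustration.

```java
import java.net.URI;
import java.util.regex.Pattern;

public class SchemePrefixSketch {

    // Same regular expression as SCHEME_REGEX in RouteToRequestUrlFilter.
    static final Pattern SCHEME_PATTERN =
            Pattern.compile("[a-zA-Z]([a-zA-Z]|\\d|\\+|\\.|-)*:.*");

    /** Same check as hasAnotherScheme in the listing above. */
    public static boolean hasAnotherScheme(URI uri) {
        return SCHEME_PATTERN.matcher(uri.getSchemeSpecificPart()).matches()
                && uri.getHost() == null
                && uri.getRawPath() == null;
    }

    public static void main(String[] args) {
        URI plain = URI.create("lb://user-service");
        URI prefixed = URI.create("lb:ws://user-service");

        System.out.println(hasAnotherScheme(plain));    // false
        System.out.println(hasAnotherScheme(prefixed)); // true

        // For the prefixed form, the filter stores "lb" as the prefix and
        // continues with the scheme-specific part as the route URI.
        URI inner = URI.create(prefixed.getSchemeSpecificPart());
        System.out.println(prefixed.getScheme() + " -> " + inner);
        // prints lb -> ws://user-service
    }
}
```

The prefixed form is an opaque URI as far as java.net.URI is concerned, so its host and raw path are both null, which is exactly what the check relies on. A plain lb://user-service URI is hierarchical and keeps lb as its ordinary scheme, so no prefix attribute is written for it.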

GATEWAY_REQUEST_URL_ATTR

spring-cloud-gateway-core-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/gateway/filter/NettyRoutingFilter.java

public class NettyRoutingFilter implements GlobalFilter, Ordered {

    private final HttpClient httpClient;
    private final ObjectProvider<List<HttpHeadersFilter>> headersFilters;

    public NettyRoutingFilter(HttpClient httpClient,
            ObjectProvider<List<HttpHeadersFilter>> headersFilters) {
        this.httpClient = httpClient;
        this.headersFilters = headersFilters;
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

        String scheme = requestUrl.getScheme();
        if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) {
            return chain.filter(exchange);
        }
        setAlreadyRouted(exchange);

        ServerHttpRequest request = exchange.getRequest();

        final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString());
        final String url = requestUrl.toString();

        HttpHeaders filtered = filterRequest(this.headersFilters.getIfAvailable(),
                exchange);

        final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        filtered.forEach(httpHeaders::set);

        String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING);
        boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);

        boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);

        return this.httpClient.request(method, url, req -> {
            final HttpClientRequest proxyRequest = req.options(NettyPipeline.SendOptions::flushOnEach)
                    .headers(httpHeaders)
                    .chunkedTransfer(chunkedTransfer)
                    .failOnServerError(false)
                    .failOnClientError(false);

            if (preserveHost) {
                String host = request.getHeaders().getFirst(HttpHeaders.HOST);
                proxyRequest.header(HttpHeaders.HOST, host);
            }

            return proxyRequest.sendHeaders() //I shouldn't need this
                    .send(request.getBody().map(dataBuffer ->
                            ((NettyDataBuffer)dataBuffer).getNativeBuffer()));
        }).doOnNext(res -> {
            ServerHttpResponse response = exchange.getResponse();
            // put headers and status so filters can modify the response
            HttpHeaders headers = new HttpHeaders();

            res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

            exchange.getAttributes().put("original_response_content_type", headers.getContentType());

            HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
                    this.headersFilters.getIfAvailable(), headers, exchange, Type.RESPONSE);
            
            response.getHeaders().putAll(filteredResponseHeaders);
            HttpStatus status = HttpStatus.resolve(res.status().code());
            if (status != null) {
                response.setStatusCode(status);
            } else if (response instanceof AbstractServerHttpResponse) {
                // https://jira.spring.io/browse/SPR-16748
                ((AbstractServerHttpResponse) response).setStatusCodeValue(res.status().code());
            } else {
                throw new IllegalStateException("Unable to set status code on response: " +res.status().code()+", "+response.getClass());
            }

            // Defer committing the response until all route filters have run
            // Put client response as ServerWebExchange attribute and write response later NettyWriteResponseFilter
            exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
        }).then(chain.filter(exchange));
    }
}

NettyRoutingFilter reads the requestUrl from the GATEWAY_REQUEST_URL_ATTR attribute and, if its scheme is http or https and the exchange has not already been routed, sends the proxied request to that URL.
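
The guard at the top of NettyRoutingFilter.filter explains why the lb URL has to be resolved beforehand: a URL that still carries the lb scheme would simply be passed along without being proxied. A minimal sketch of that guard follows, with RoutingGuardSketch and isRoutable as hypothetical names and the alreadyRouted flag standing in for isAlreadyRouted(exchange).

```java
import java.net.URI;

public class RoutingGuardSketch {

    /**
     * True when the Netty-based routing filter would forward the request,
     * i.e. the exchange is not yet routed and the scheme is http or https.
     */
    public static boolean isRoutable(URI requestUrl, boolean alreadyRouted) {
        String scheme = requestUrl.getScheme();
        return !alreadyRouted && ("http".equals(scheme) || "https".equals(scheme));
    }

    public static void main(String[] args) {
        // Resolved by the load balancer: forwarded.
        System.out.println(isRoutable(URI.create("http://10.0.0.5:9000/users/42"), false)); // true
        // Still an lb URL: skipped by this filter.
        System.out.println(isRoutable(URI.create("lb://user-service/users/42"), false));    // false
        // Already routed by an earlier filter: skipped.
        System.out.println(isRoutable(URI.create("https://example.org/"), true));           // false
    }
}
```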

Summary

LoadBalancerClientFilter acts on routes whose URL uses the lb scheme, either directly or as a scheme prefix. It uses the LoadBalancerClient to choose a service instance, constructs the target requestUrl, and stores it in the GATEWAY_REQUEST_URL_ATTR attribute for NettyRoutingFilter to use.
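
The hand-off between the three filters depends on their order values, since Spring's Ordered contract runs lower values first. The sketch below sorts the three filters by the order constants shown in the listings (10000, 10100, and Ordered.LOWEST_PRECEDENCE, which equals Integer.MAX_VALUE). FilterOrderSketch and NamedFilter are hypothetical names; the real gateway sorts actual filter beans and may include other filters in between.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class FilterOrderSketch {

    /** Hypothetical pair of filter name and order value. */
    public static final class NamedFilter {
        public final String name;
        public final int order;

        public NamedFilter(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    /** Returns the filter names sorted by ascending order value. */
    public static List<String> executionOrder(List<NamedFilter> filters) {
        List<NamedFilter> sorted = new ArrayList<>(filters);
        sorted.sort(Comparator.comparingInt(f -> f.order));
        List<String> names = new ArrayList<>();
        for (NamedFilter f : sorted) {
            names.add(f.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<NamedFilter> filters = new ArrayList<>();
        // Ordered.LOWEST_PRECEDENCE is Integer.MAX_VALUE.
        filters.add(new NamedFilter("NettyRoutingFilter", Integer.MAX_VALUE));
        filters.add(new NamedFilter("LoadBalancerClientFilter", 10100));
        filters.add(new NamedFilter("RouteToRequestUrlFilter", 10000));

        System.out.println(executionOrder(filters));
        // prints [RouteToRequestUrlFilter, LoadBalancerClientFilter, NettyRoutingFilter]
    }
}
```

So RouteToRequestUrlFilter writes the lb URL first, LoadBalancerClientFilter replaces it with a concrete instance URL, and NettyRoutingFilter runs last and issues the request.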
