[case17] Talk about WebClient’s LoadBalance Support

Preface

This article looks at WebClient’s LoadBalance support.

Example

Configuration

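import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerExchangeFilterFunction;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;

@Configuration
public class WebClientConfig {

    // Filter that resolves the service ID in the request host to a concrete instance
    @Autowired
    private LoadBalancerExchangeFilterFunction lbFunction;

    @Bean
    public WebClient webClient() {
        return WebClient.builder()
                .filter(lbFunction)
                .build();
    }
}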

Calling department-service

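import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

@Component
public class DepartmentService {

    @Autowired
    WebClient webClient;

    public Flux<Department> getDepartmentsByOrgId(Long orgId) {
        return webClient
                .get()
                // The host part is a service ID, not a real hostname; the load-balancer filter resolves it
                .uri("http://department-service/organization/{orgId}", orgId)
                .retrieve()
                .bodyToFlux(Department.class);
    }
}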

Controller

    @Autowired
    DepartmentService departmentService;

    @GetMapping("/departments")
    public Flux<Department> getDepartmentsById(Long orgId){
        return departmentService.getDepartmentsByOrgId(orgId);
    }

/flux/departments?orgId=1

[
  {
    "id": 1,
    "name": "department 1",
    "employees": []
  }
]

Error cases

Connection refused

2018-04-29 13:09:15 ERROR [organization-service,,,] Failed to handle request [GET http://localhost:8092/flux/departments?orgId=1]
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /172.16.205.106:8091
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_151]
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_151]
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:325) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.23.Final.jar:4.1.23.Final]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]
Caused by: java.net.ConnectException: Connection refused
    ... 10 common frames omitted

instance can not be null

2018-04-29 13:12:08 ERROR [organization-service,,,] Failed to handle request [GET http://localhost:8092/flux/departments?orgId=1]
java.lang.IllegalArgumentException: instance can not be null
    at org.springframework.util.Assert.notNull(Assert.java:193) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient.reconstructURI(RibbonLoadBalancerClient.java:53) ~[spring-cloud-netflix-ribbon-2.0.0.RC1.jar:2.0.0.RC1]
    at org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerExchangeFilterFunction.filter(LoadBalancerExchangeFilterFunction.java:34) ~[spring-cloud-commons-2.0.0.RC1.jar:2.0.0.RC1]
    at org.springframework.web.reactive.function.client.ExchangeFilterFunction.lambda$andThen$1(ExchangeFilterFunction.java:56) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.reactive.function.client.ExchangeFilterFunction.lambda$apply$2(ExchangeFilterFunction.java:67) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.exchange(DefaultWebClient.java:320) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.retrieve(DefaultWebClient.java:367) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at com.example.service.DepartmentService.getDepartmentsByOrgId(DepartmentService.java:24) ~[classes/:na]
    at com.example.controller.FluxDemoController.getDepartmentsById(FluxDemoController.java:23) ~[classes/:na]
    at sun.reflect.GeneratedMethodAccessor82.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151]
    at org.springframework.web.reactive.result.method.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:243) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:138) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118) [reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1073) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:241) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:323) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]
    at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1630) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:69) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1444) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1318) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:63) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoDefaultIfEmpty.subscribe(MonoDefaultIfEmpty.java:37) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:271) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:803) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1630) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:69) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1444) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1318) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:63) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:418) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:210) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1073) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:241) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:323) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1630) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:312) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:167) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoSubscriberContext.subscribe(MonoSubscriberContext.java:47) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:167) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:70) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]
    at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:381) ~[reactor-netty-0.7.6.RELEASE.jar:0.7.6.RELEASE]
    at reactor.ipc.netty.http.server.HttpServerOperations.onHandlerStart(HttpServerOperations.java:397) ~[reactor-netty-0.7.6.RELEASE.jar:0.7.6.RELEASE]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.23.Final.jar:4.1.23.Final]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]

Source code analysis

LoadBalancerExchangeFilterFunction

spring-cloud-commons-2.0.0.RC1-sources.jar!/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerExchangeFilterFunction.java

public class LoadBalancerExchangeFilterFunction implements ExchangeFilterFunction {

    private final LoadBalancerClient loadBalancerClient;

    public LoadBalancerExchangeFilterFunction(LoadBalancerClient loadBalancerClient) {
        this.loadBalancerClient = loadBalancerClient;
    }

    @Override
    public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
        URI originalUrl = request.url();
        String serviceId = originalUrl.getHost();
        Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUrl);
        //TODO: reactive lb client

        ServiceInstance instance = this.loadBalancerClient.choose(serviceId);
        URI uri = this.loadBalancerClient.reconstructURI(instance, originalUrl);
        ClientRequest newRequest = ClientRequest.method(request.method(), uri)
                .headers(headers -> headers.addAll(request.headers()))
                .cookies(cookies -> cookies.addAll(request.cookies()))
                .attributes(attributes -> attributes.putAll(request.attributes()))
                .body(request.body())
                .build();
        return next.exchange(newRequest);
    }

}

For WebClient, LoadBalancerExchangeFilterFunction is one of the filters in the filter chain. Its filter method takes the host of the original request as the service ID, asks loadBalancerClient to choose an available instance of that service, replaces the host in the original URI with the address of that instance, builds a new request that carries over the headers, cookies, attributes and body, and passes it to the next exchange function in the chain. Note that the result of choose is not checked for null before it is handed to reconstructURI.
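As a rough illustration of the choose-then-rewrite step, here is a plain-Java sketch that uses no Spring classes. REGISTRY, choose and reconstruct are hypothetical stand-ins for the registry lookup and for reconstructURI, and the instance address is borrowed from the Connection refused log above purely as sample data. Unlike the RC1 code shown above, this sketch fails with an explicit message when no instance is found.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

public class UriRewriteSketch {

    // Hypothetical registry: service ID -> "host:port" of one known instance.
    static final Map<String, String> REGISTRY = new HashMap<>();

    // Stand-in for choose(serviceId): returns null when no instance is known.
    static String choose(String serviceId) {
        return REGISTRY.get(serviceId);
    }

    // Stand-in for reconstructURI: swaps the service ID for the instance's host and port.
    static URI reconstruct(String instance, URI original) {
        if (instance == null) {
            throw new IllegalStateException("No instance available for " + original.getHost());
        }
        String[] hostAndPort = instance.split(":");
        try {
            return new URI(original.getScheme(), original.getUserInfo(), hostAndPort[0],
                    Integer.parseInt(hostAndPort[1]), original.getPath(),
                    original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        REGISTRY.put("department-service", "172.16.205.106:8091");
        URI original = URI.create("http://department-service/organization/1");
        // prints http://172.16.205.106:8091/organization/1
        System.out.println(reconstruct(choose(original.getHost()), original));
    }
}
```

Only the host and port change; the scheme, path and query of the original request are preserved.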

DefaultWebClientBuilder

spring-webflux-5.0.5.RELEASE-sources.jar!/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java

class DefaultWebClientBuilder implements WebClient.Builder {

    @Nullable
    private List<ExchangeFilterFunction> filters;

    @Override
    public WebClient.Builder filter(ExchangeFilterFunction filter) {
        Assert.notNull(filter, "ExchangeFilterFunction must not be null");
        initFilters().add(filter);
        return this;
    }

    @Override
    public WebClient build() {
        ExchangeFunction exchange = initExchangeFunction();
        ExchangeFunction filteredExchange = (this.filters != null ? this.filters.stream()
                .reduce(ExchangeFilterFunction::andThen)
                .map(filter -> filter.apply(exchange))
                .orElse(exchange) : exchange);
        return new DefaultWebClient(filteredExchange, initUriBuilderFactory(),
                unmodifiableCopy(this.defaultHeaders), unmodifiableCopy(this.defaultCookies),
                new DefaultWebClientBuilder(this));
    }

    // ... remaining members omitted
}

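It can be seen that every filter passed to the builder’s filter method is added to filters. When build is called, the filters are reduced into a single ExchangeFilterFunction with ExchangeFilterFunction::andThen, that composed filter is applied to the base ExchangeFunction through apply, and the resulting ExchangeFunction is passed to the constructor of DefaultWebClient. If no filter was registered, the base ExchangeFunction is used as-is.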

ExchangeFilterFunction

spring-webflux-5.0.5.RELEASE-sources.jar!/org/springframework/web/reactive/function/client/ExchangeFilterFunction.java

    /**
     * Apply this filter to the given request and exchange function.
     * <p>The given {@linkplain ExchangeFunction exchange function} represents the next entity
     * in the chain, and can be {@linkplain ExchangeFunction#exchange(ClientRequest) invoked}
     * in order to proceed to the exchange, or not invoked to block the chain.
     * @param request the request
     * @param next the next exchange function in the chain
     * @return the filtered response
     */
    Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next);

    /**
     * Return a composed filter function that first applies this filter, and then applies the
     * {@code after} filter.
     * @param after the filter to apply after this filter is applied
     * @return a composed filter that first applies this function and then applies the
     * {@code after} function
     */
    default ExchangeFilterFunction andThen(ExchangeFilterFunction after) {
        Assert.notNull(after, "'after' must not be null");
        return (request, next) -> {
            ExchangeFunction nextExchange = exchangeRequest -> after.filter(exchangeRequest, next);
            return filter(request, nextExchange);
        };
    }

    /**
     * Apply this filter to the given exchange function, resulting in a filtered exchange function.
     * @param exchange the exchange function to filter
     * @return the filtered exchange function
     */
    default ExchangeFunction apply(ExchangeFunction exchange) {
        Assert.notNull(exchange, "'exchange' must not be null");
        return request -> this.filter(request, exchange);
    }

ExchangeFilterFunction builds the filter chain with andThen (the lambda it returns is itself an implementation of the functional method filter), and the chain is finally converted to an ExchangeFunction through apply.
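The composition done by build together with andThen and apply can be mimicked in plain Java. The sketch below is a simplification under stated assumptions: Exchange and Filter are hypothetical stand-ins for ExchangeFunction and ExchangeFilterFunction that work on strings instead of ClientRequest and Mono<ClientResponse>, and tag is a made-up filter that only marks the request so the order of execution becomes visible.

```java
import java.util.Arrays;
import java.util.List;

public class FilterChainSketch {

    // Simplified stand-in for ExchangeFunction (synchronous, string-based).
    interface Exchange {
        String exchange(String request);
    }

    // Simplified stand-in for ExchangeFilterFunction.
    interface Filter {
        String filter(String request, Exchange next);

        // Runs this filter first; its "next" is the after filter wrapped around the real next.
        default Filter andThen(Filter after) {
            return (request, next) -> filter(request, r -> after.filter(r, next));
        }

        // Turns the filter plus a terminal exchange into a single exchange.
        default Exchange apply(Exchange exchange) {
            return request -> filter(request, exchange);
        }
    }

    // Hypothetical filter that appends its name to the request and proceeds.
    static Filter tag(String name) {
        return (request, next) -> next.exchange(request + ">" + name);
    }

    // Same shape as the reduce/map/orElse expression in build().
    static Exchange build(List<Filter> filters, Exchange exchange) {
        return filters.stream()
                .reduce(Filter::andThen)
                .map(f -> f.apply(exchange))
                .orElse(exchange);
    }

    public static void main(String[] args) {
        Exchange terminal = request -> "sent[" + request + "]";
        Exchange chain = build(Arrays.asList(tag("a"), tag("b")), terminal);
        // prints sent[req>a>b]
        System.out.println(chain.exchange("req"));
    }
}
```

Filters run in the order in which they were registered, and a filter that never calls next stops the chain before the terminal exchange is reached.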

Summary

WebClient’s LoadBalance support is more concise and clear than RestTemplate’s. It uses the filter pattern directly: it obtains a service instance through loadBalancerClient, replaces the URI and passes the request to the next filter. If loadBalancerClient cannot obtain an instance, the RibbonLoadBalancerClient.reconstructURI method fails with java.lang.IllegalArgumentException: instance can not be null. In addition, the information held by the registry may lag behind the actual state of the instances, so a Connection refused exception is also possible.
