Passing Context Variables Across Asynchronous Threads in Reactor


Preface

This article looks at how context variables are passed across asynchronous threads in Reactor.

The problem with ThreadLocal

In the traditional synchronous request/response model, ThreadLocal is a convenient way to pass context variables such as the currently logged-in user, because it avoids adding the same parameter to every method. However, a business method may be annotated with @Async or may hand work to another thread pool, and in that case the ThreadLocal value set on the original thread is no longer visible.
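The loss can be reproduced with the JDK alone. The following is a minimal sketch, not code from any framework: the class name ThreadLocalLossDemo and the CURRENT_USER holder are hypothetical names chosen for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalLossDemo {
    // Hypothetical holder for the current user; the name is illustrative only.
    static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    /** Sets the value on the calling thread, then reads it from a pool thread. */
    static String readFromPoolThread(String user) {
        CURRENT_USER.set(user);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // The pool thread has its own, still empty, slot for CURRENT_USER.
            Callable<String> read = CURRENT_USER::get;
            return pool.submit(read).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            CURRENT_USER.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(readFromPoolThread("alice")); // prints "null"
    }
}
```

The value set by the caller never reaches the worker, because each thread owns a separate copy of a ThreadLocal.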

The usual solution is propagation: copy the variable across at the point where work is handed from the synchronous thread to the asynchronous one.
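Propagation at the hand-off point can be sketched in plain Java by wrapping the task before it is submitted. This is an illustrative sketch under assumed names (ContextPropagationDemo, CURRENT_USER and withCurrentUser are all hypothetical); it only shows the capture-then-restore pattern.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextPropagationDemo {
    // Hypothetical holder for the current user; the name is illustrative only.
    static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    /** Captures the caller's value now and restores it around the task later. */
    static <T> Callable<T> withCurrentUser(Callable<T> task) {
        String captured = CURRENT_USER.get();   // runs on the submitting thread
        return () -> {
            CURRENT_USER.set(captured);         // runs on the worker thread
            try {
                return task.call();
            } finally {
                CURRENT_USER.remove();          // do not leave the value on a pooled thread
            }
        };
    }

    /** Sets the value on the calling thread and reads it through a wrapped task. */
    static String readFromPoolThread(String user) {
        CURRENT_USER.set(user);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> read = CURRENT_USER::get;
            return pool.submit(withCurrentUser(read)).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            CURRENT_USER.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(readFromPoolThread("alice")); // prints "alice"
    }
}
```

The capture happens when the wrapper is created, on the submitting thread, and the restore happens when the wrapper runs, on the worker thread.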

TaskDecorator

Spring, for example, provides TaskDecorator. By implementing this interface you decide which variables are propagated and how. For example:

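import java.util.Map;

import org.slf4j.MDC;
import org.springframework.core.task.TaskDecorator;

class MdcTaskDecorator implements TaskDecorator {

  @Override
  public Runnable decorate(Runnable runnable) {
    // Runs on the calling (web) thread: take a snapshot of its MDC data.
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
      try {
        // Runs on the @Async thread: restore the web thread's MDC data.
        // getCopyOfContextMap() may return null when the MDC is empty.
        if (contextMap != null) {
          MDC.setContextMap(contextMap);
        }
        runnable.run();
      } finally {
        // Always clear, so the pooled thread does not keep stale values.
        MDC.clear();
      }
    };
  }
}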

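Note that the MDC is cleared in the finally block. Pooled threads are reused, so without this step the values would leak into the next task that runs on the same thread.

Then register the TaskDecorator on the executor: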
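Why the cleanup matters can be shown with a single-thread pool and a plain ThreadLocal standing in for the MDC. This is a sketch under assumptions: StaleThreadLocalDemo, REQUEST_ID and secondTaskSees are hypothetical names, and the behavior shown is that of ThreadLocal on a reused thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class StaleThreadLocalDemo {
    // Hypothetical per-request value; stands in for MDC data.
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    /** Runs two tasks on the same pooled thread and returns what the second one reads. */
    static String secondTaskSees(boolean clearInFinally) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable first = () -> {
                REQUEST_ID.set("req-1");
                try {
                    // ... the actual work of the first task would go here ...
                } finally {
                    if (clearInFinally) {
                        REQUEST_ID.remove();
                    }
                }
            };
            pool.submit(first).get();           // wait until the first task is done

            Callable<String> second = REQUEST_ID::get;
            return pool.submit(second).get();   // same thread, next task
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTaskSees(false)); // prints "req-1" (stale value)
        System.out.println(secondTaskSees(true));  // prints "null"
    }
}
```

Without the cleanup, the second task inherits a value that belongs to an unrelated earlier task.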

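import java.util.concurrent.Executor;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {

  @Override
  public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // Every task submitted to this executor is wrapped by the decorator.
    executor.setTaskDecorator(new MdcTaskDecorator());
    executor.initialize();
    return executor;
  }

}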

For a complete example, see "Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads".

Reactor Context

Spring 5 introduced WebFlux, which is built on Reactor. So how does Reactor propagate context variables? It provides a Context object that takes the place of ThreadLocal.

Its characteristics are as follows:

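  • Map-like key/value operations, such as put(Object key, Object value), putAll(Context) and hasKey(Object key)
  • Immutable: put does not modify the existing Context, it returns a new instance that contains the entry
  • Provides getOrDefault and getOrEmpty for keys that may be missing
  • A Context is bound to each Subscriber in the operator chain
  • Written through subscriberContext(Context) and read through Mono.subscriberContext() (Reactor 3.4 deprecated both in favor of contextWrite and Mono.deferContextual)
  • Context propagates from bottom to top: it travels from the subscriber up the chain, so a write is visible only to the operators above it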
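The immutability described above can be pictured with a small stand-in class. This is a toy written only for illustration: ToyContext is a hypothetical name, and the code is not Reactor's implementation. It only mirrors the idea that put returns a new object and leaves the original untouched.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for an immutable key/value context; not Reactor's implementation. */
final class ToyContext {
    private final Map<String, Object> entries;

    private ToyContext(Map<String, Object> entries) {
        this.entries = Collections.unmodifiableMap(entries);
    }

    static ToyContext empty() {
        return new ToyContext(new HashMap<>());
    }

    /** Returns a new context that contains the entry; this instance is unchanged. */
    ToyContext put(String key, Object value) {
        Map<String, Object> copy = new HashMap<>(entries);
        copy.put(key, value);
        return new ToyContext(copy);
    }

    boolean hasKey(String key) {
        return entries.containsKey(key);
    }

    Object getOrDefault(String key, Object fallback) {
        return entries.containsKey(key) ? entries.get(key) : fallback;
    }

    public static void main(String[] args) {
        ToyContext before = ToyContext.empty();
        ToyContext after = before.put("message", "World");
        System.out.println(before.hasKey("message"));                   // prints "false"
        System.out.println(after.getOrDefault("message", "Stranger"));  // prints "World"
    }
}
```

A caller that ignores the return value of put therefore loses the write, which is the trap shown in the immutability example further down.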

Example

Setting and reading

    @Test
    public void testSubscriberContext(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello World")
                .verifyComplete();
    }

Here the bottommost subscriberContext sets message to World, and the flatMap above it reads the value through Mono.subscriberContext().

From bottom to top

    @Test
    public void testContextSequence(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                // NOTE: this subscriberContext is set too high in the chain
                .subscriberContext(ctx -> ctx.put(key, "World"))
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.getOrDefault(key, "Stranger")));

        StepVerifier.create(r)
                .expectNext("Hello Stranger")
                .verifyComplete();
    }

Because subscriberContext is applied above flatMap in this example, its write is not visible to Mono.subscriberContext() inside flatMap, and the default value Stranger is used.

Immutability

    @Test
    public void testContextImmutable(){
        String key = "message";

        Mono<String> r = Mono.subscriberContext()
                .map( ctx -> ctx.put(key, "Hello"))
                // put returned a new Context that was never attached to the subscriber, so the write above is lost
                .flatMap( ctx -> Mono.subscriberContext())
                .map( ctx -> ctx.getOrDefault(key,"Default"));

        StepVerifier.create(r)
                .expectNext("Default")
                .verifyComplete();
    }

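put never modifies a Context in place. It returns a new instance, which in this example is only passed downstream as a data item. Mono.subscriberContext() inside flatMap reads the Context that is actually attached to the subscriber, and that one does not contain the key.

Multiple consecutive subscriberContext calls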

    @Test
    public void testReadOrder(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello Reactor")
                .verifyComplete();
    }

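Context travels upward from the subscriber, so World is written first and is then replaced by Reactor on the way up. An operator therefore reads the value from the nearest subscriberContext below it.

subscriberContext between flatMap calls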
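The read order can be modeled without Reactor by walking a chain from its last step upward. The sketch below is a toy model built on assumptions: BottomUpContextDemo, Step and contextVisibleAt are hypothetical names, and the model only captures the rule that a step sees the writes located below it.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BottomUpContextDemo {
    /** One step of a chain: either a context write or a step that only reads. */
    static final class Step {
        final String key;    // null for a read-only step
        final String value;

        private Step(String key, String value) {
            this.key = key;
            this.value = value;
        }

        static Step read() {
            return new Step(null, null);
        }

        static Step write(String key, String value) {
            return new Step(key, value);
        }
    }

    /**
     * Returns the context seen by the step at readerIndex. The chain is listed
     * top to bottom, as in source code, and is evaluated from the last step upward.
     */
    static Map<String, String> contextVisibleAt(int readerIndex, List<Step> chain) {
        Map<String, String> ctx = new HashMap<>();   // the subscriber starts out empty
        for (int i = chain.size() - 1; i > readerIndex; i--) {
            Step step = chain.get(i);
            if (step.key != null) {
                ctx = new HashMap<>(ctx);            // each write yields a new map
                ctx.put(step.key, step.value);
            }
        }
        return ctx;
    }

    public static void main(String[] args) {
        // Reader on top, then two writes below it.
        List<Step> twoWrites = Arrays.asList(
                Step.read(),
                Step.write("message", "Reactor"),
                Step.write("message", "World"));
        System.out.println(contextVisibleAt(0, twoWrites).get("message")); // prints "Reactor"

        // The write sits above the reader, so the reader never sees it.
        List<Step> writeTooHigh = Arrays.asList(
                Step.write("message", "World"),
                Step.read());
        System.out.println(
                contextVisibleAt(1, writeTooHigh).getOrDefault("message", "Stranger")); // prints "Stranger"
    }
}
```

The same walk explains both results: writes below a reader are applied bottom first, so the one nearest the reader wins, while a write above the reader is never reached.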

    @Test
    public void testContextBetweenFlatMap(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello Reactor World")
                .verifyComplete();
    }

Each flatMap reads the value written by the subscriberContext nearest below it: the first reads Reactor, the second reads World.

subscriberContext inside flatMap

    @Test
    public void testContextInFlatMap(){
        String key = "message";
        Mono<String> r =
                Mono.just("Hello")
                        .flatMap( s -> Mono.subscriberContext()
                                .map( ctx -> s + " " + ctx.get(key))
                        )
                        .flatMap( s -> Mono.subscriberContext()
                                .map( ctx -> s + " " + ctx.get(key))
                                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                        )
                        .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello World Reactor")
                .verifyComplete();
    }

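Here the first flatMap cannot read the context written inside the second flatMap, so it sees World. The write to Reactor is visible only within the inner sequence of the second flatMap.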

Summary

By providing Context, Reactor offers functionality similar to ThreadLocal for asynchronous operator chains. Its bottom-to-top propagation and its immutability differ from ThreadLocal, so the examples above are worth studying carefully.
