Execution thread of CompletableFuture


The thread pool used by default

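When no executor is passed, the common pool (ForkJoinPool.commonPool()) is used by default. Parallel streams run on the same pool, with the calling thread taking part in the work, so it is easy to observe: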

    IntStream.range(0, 15).parallel().forEach(i -> {
        System.out.println(Thread.currentThread());
    });

Output

Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-2,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-2,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-2,5,main]
  • commonPool

From the Javadoc of ForkJoinPool.commonPool(): this pool is statically constructed; its run state is unaffected by attempts to shutdown() or shutdownNow(). However, this pool and any ongoing processing are automatically terminated upon program System.exit(int). Any program that relies on asynchronous task processing to complete before program termination should invoke commonPool().awaitQuiescence before exit.
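The advice about awaitQuiescence can be sketched as follows. This is a minimal illustration, not part of the original tests: the class name CommonPoolQuiescenceDemo, the method runAndAwait, the 200 ms task, and the 5-second timeout are all assumptions made for the example. The task is handed directly to ForkJoinPool.commonPool() so that the sketch behaves the same on any machine (CompletableFuture itself falls back to one new thread per task when the common pool does not support a parallelism level of at least two).

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CommonPoolQuiescenceDemo {

    // Runs a short task on the common pool, then blocks until the pool is idle
    // or the (assumed) 5-second timeout elapses. Returns true if the task finished.
    static boolean runAndAwait() {
        AtomicBoolean finished = new AtomicBoolean(false);

        ForkJoinPool.commonPool().execute(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200); // stand-in for real work
                finished.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Common pool workers are daemon threads, so without this wait the JVM
        // could exit while the task is still running.
        boolean quiescent = ForkJoinPool.commonPool().awaitQuiescence(5, TimeUnit.SECONDS);
        return quiescent && finished.get();
    }

    public static void main(String[] args) {
        System.out.println("task finished before exit: " + runAndAwait());
    }
}
```

When a handle to the CompletableFuture is available, calling join() on it is usually the simpler way to wait; awaitQuiescence is mainly useful when the tasks were fired without keeping a reference.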

ThenRun test case 1

Without an executor

    @Test
    public void testRunOnCommonPool() throws InterruptedException {

        CompletionStage<Void> futurePrice = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("test1:1 - runAsync(runnable), job thread: " + Thread.currentThread());
            //Thread[ForkJoinPool.commonPool-worker-1,5,main]
        });

        System.out.println("test1:flag1");

        futurePrice.thenRun(() -> {
            System.out.println("test1:2 - thenRun(runnable)), action thread: " + Thread.currentThread());
            //Thread[ForkJoinPool.commonPool-worker-1,5,main]
        });

        System.out.println("test1:flag2");

        futurePrice.thenRunAsync(() -> {
            System.out.println("test1:3 - thenRunAsync(runnable), action thread: " + Thread.currentThread());
            //Thread[ForkJoinPool.commonPool-worker-1,5,main]
        });

        TimeUnit.SECONDS.sleep(100);

    }

Output

test1:flag1
test1:flag2
test1:1 - runAsync(runnable), job thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
test1:2 - thenRun(runnable)), action thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
test1:3 - thenRunAsync(runnable), action thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]

With an executor

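    // `executor` is a field of the test class that is not shown here; the thread names
    // in the output suggest something like Executors.newFixedThreadPool(...) (an assumption).
    @Test
    public void testRunOnExecutors() throws InterruptedException {
        CompletionStage<Void> futurePrice = CompletableFuture.runAsync(() -> {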
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("test2:1 - runAsync(runnable, executor), job thread: " + Thread.currentThread());
            //Thread[pool-1-thread-1,5,main]
        }, executor);

        System.out.println("test2:flag1");

        futurePrice.thenRunAsync(() -> {
            System.out.println("test2:2 - thenRunAsync(runnable), action thread: " + Thread.currentThread());
            //Thread[ForkJoinPool.commonPool-worker-1,5,main] (no executor given, so the default pool is used)
        });

        System.out.println("test2:flag2");

        futurePrice.thenRun(() -> {
            System.out.println("test2:3 - thenRun(runnable), action thread: " + Thread.currentThread());
            //Thread[pool-1-thread-1,5,main] (the thread that completed the previous stage)
        });

        futurePrice.thenRunAsync(() -> {
            System.out.println("test2:4 - thenRunAsync(runnable, executor), action thread: " + Thread.currentThread());
            //Thread[pool-1-thread-2,5,main] (a thread of the executor passed in)
        }, executor);

        TimeUnit.SECONDS.sleep(100);
    }

Output

test2:flag1
test2:flag2
test2:1 - runAsync(runnable, executor), job thread: Thread[pool-1-thread-1,5,main]
test2:3 - thenRun(runnable), action thread: Thread[pool-1-thread-1,5,main]
test2:4 - thenRunAsync(runnable, executor), action thread: Thread[pool-1-thread-2,5,main]
test2:2 - thenRunAsync(runnable), action thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
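The difference between thenRunAsync(action) and thenRunAsync(action, executor) seen in this output can be reproduced in isolation. The following is a minimal sketch, not the original test: the class ThenRunAsyncExecutorDemo, the method observeThreads, and the thread name "custom-worker" are hypothetical names introduced only to make the executor's threads easy to recognise.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class ThenRunAsyncExecutorDemo {

    // Returns { thread name seen by thenRunAsync(action),
    //           thread name seen by thenRunAsync(action, executor) }.
    static String[] observeThreads() {
        // Hypothetical executor: every thread it creates is named "custom-worker".
        ExecutorService executor = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "custom-worker");
            t.setDaemon(true);
            return t;
        });
        try {
            // The first stage runs on the custom executor.
            CompletableFuture<Void> first = CompletableFuture.runAsync(() -> { }, executor);

            AtomicReference<String> defaultAsync = new AtomicReference<>();
            AtomicReference<String> explicitAsync = new AtomicReference<>();

            // No executor argument: the default async pool is used,
            // regardless of where the first stage ran.
            CompletableFuture<Void> a = first.thenRunAsync(
                    () -> defaultAsync.set(Thread.currentThread().getName()));

            // Explicit executor argument: the action runs on that executor.
            CompletableFuture<Void> b = first.thenRunAsync(
                    () -> explicitAsync.set(Thread.currentThread().getName()), executor);

            CompletableFuture.allOf(a, b).join();
            return new String[] { defaultAsync.get(), explicitAsync.get() };
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = observeThreads();
        System.out.println("thenRunAsync(action):           " + names[0]);
        System.out.println("thenRunAsync(action, executor): " + names[1]);
    }
}
```

The second name is always "custom-worker". The first is normally a ForkJoinPool.commonPool worker; on machines where the common pool has a parallelism below two, CompletableFuture creates a new thread per task instead, so the exact name can differ.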

ThenRun test case 2

Without sleep

    @Test
    public void testThenRun() throws InterruptedException {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("f1 thread:"+Thread.currentThread().getName());
            return "zero";
        }, executor);
        f1.thenRun(new Runnable() {
            @Override
            public void run() {
                System.out.println("then run thread:"+Thread.currentThread().getName());
                System.out.println("finished");
            }
        });
        TimeUnit.SECONDS.sleep(10);
    }
  • Output with executor

f1 thread:pool-1-thread-1
then run thread:main
finished
  • Output without executor

f1 thread:ForkJoinPool.commonPool-worker-1
then run thread:main
finished

With sleep

        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("f1 thread:"+Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "zero";
        }, executor);
        f1.thenRun(new Runnable() {
            @Override
            public void run() {
                System.out.println("then run thread:"+Thread.currentThread().getName());
                System.out.println("finished");
            }
        });
        TimeUnit.SECONDS.sleep(10);
  • Output with executor

f1 thread:pool-1-thread-1
then run thread:pool-1-thread-1
finished
  • Output without executor

f1 thread:ForkJoinPool.commonPool-worker-1
then run thread:ForkJoinPool.commonPool-worker-1
finished

Summary

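thenRun() without the Async suffix does not bind the action to any particular thread. If the future has already completed when thenRun() is called, the action runs immediately in the calling thread (main in the test without sleep). Otherwise it runs on the thread that completes the future, which is a commonPool worker or an executor thread depending on where the previous stage ran. thenRunAsync() without an executor argument always goes to the default pool (commonPool), even when the previous stage ran on a custom executor.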
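The tests above depend on timing, so the two thenRun() cases can also be shown deterministically. The sketch below is an illustration under assumptions: the class ThenRunThreadDemo, its two methods, and the thread name "completer" are hypothetical and do not appear in the original tests. It completes the future by hand instead of using supplyAsync, so there is no race between completion and registration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class ThenRunThreadDemo {

    // thenRun on an already completed future: the action runs in the calling thread.
    static String threadWhenAlreadyComplete() {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<String> done = CompletableFuture.completedFuture("zero");
        done.thenRun(() -> seen.set(Thread.currentThread().getName()));
        return seen.get();
    }

    // thenRun on a pending future: the action runs in the thread that completes it,
    // here a hand-made thread named "completer".
    static String threadWhenStillPending() {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<String> pending = new CompletableFuture<>();
        pending.thenRun(() -> seen.set(Thread.currentThread().getName()));

        Thread completer = new Thread(() -> pending.complete("zero"), "completer");
        completer.start();
        try {
            completer.join(); // the action has run by the time complete() returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("already complete: " + threadWhenAlreadyComplete());
        System.out.println("still pending:    " + threadWhenStillPending());
    }
}
```

If the action must never run on the caller or on the completing thread, thenRunAsync(action, executor) with an explicit executor is the variant that makes the thread choice predictable.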
