How to Perform Asynchronous Operation after Database Transaction Commits Successfully

  async, spring, transaction

Problem

Business scenario

There are often some edge operations on business requirements, such as main process operation A: users sign up for courses for warehousing, and edge operation B: sending emails or short message notifications.

Business requirements

  • After operation A fails to operate the database and the transaction rolls back, operation B cannot be executed.

  • After operation a is executed successfully, operation b must also be executed successfully.

How to implement

  • The normal execution of A, followed by execution of B, is an operation that can meet the requirement 1, and usually requires design compensation for the requirement 2.

  • General edge operations are usually set to asynchronous to improve performance. For example, MQ is sent. The business system is responsible for successful message sending after successful transaction, and then the receiving system is responsible for ensuring successful notification completion.

The content of this article

How to perform asynchronous operations after a spring transaction is committed? These asynchronous operations must be performed after the transaction is successfully committed, and rollback is not performed.

essential

  • How to operate after a spring transaction is committed

  • How to Asynchronous Operation

Implementation plan

Use Transactional SynchronizationManager to operate after a transaction is committed

public void insert(TechBook techBook){
        bookMapper.insert(techBook);
       // send after tx commit but is async
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                System.out.println("send email after transaction commit...");
            }
        }
       );
        ThreadLocalRandom random = ThreadLocalRandom.current();
        if(random.nextInt() % 2 ==0){
            throw new RuntimeException("test email transaction");
        }
        System.out.println("service end");
    }

The method can realize the operation after the transaction is committed

Asynchronous operation

Use mq or thread pool for asynchronization, such as thread pool:

private final ExecutorService executorService = Executors.newFixedThreadPool(5);
    public void insert(TechBook techBook){
        bookMapper.insert(techBook);
 
//        send after tx commit but is async
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("send email after transaction commit...");
                        try {
                            Thread.sleep(10*1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("complete send email after transaction commit...");
                    }
                });
            }
        }
        );
 
//        async work but tx not work, execute even when tx is rollback
//        asyncService.executeAfterTxComplete();
 
        ThreadLocalRandom random = ThreadLocalRandom.current();
        if(random.nextInt() % 2 ==0){
            throw new RuntimeException("test email transaction");
        }
        System.out.println("service end");
    }

Package the above two steps

For the second step, if there are many such methods, they are too repetitive to be written. Therefore, they are abstracted as follows:
This place has been renovated.azagorneanuThe code for:

public interface AfterCommitExecutor extends Executor {
}
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
 
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
@Component
public class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter implements AfterCommitExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(AfterCommitExecutorImpl.class);
    private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();
    private ExecutorService threadPool = Executors.newFixedThreadPool(5);
 
    @Override
    public void execute(Runnable runnable) {
        LOGGER.info("Submitting new runnable {} to run after commit", runnable);
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            LOGGER.info("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
            runnable.run();
            return;
        }
        List<Runnable> threadRunnables = RUNNABLES.get();
        if (threadRunnables == null) {
            threadRunnables = new ArrayList<Runnable>();
            RUNNABLES.set(threadRunnables);
            TransactionSynchronizationManager.registerSynchronization(this);
        }
        threadRunnables.add(runnable);
    }
 
    @Override
    public void afterCommit() {
        List<Runnable> threadRunnables = RUNNABLES.get();
        LOGGER.info("Transaction successfully committed, executing {} runnables", threadRunnables.size());
        for (int i = 0; i < threadRunnables.size(); i++) {
            Runnable runnable = threadRunnables.get(i);
            LOGGER.info("Executing runnable {}", runnable);
            try {
                threadPool.execute(runnable);
            } catch (RuntimeException e) {
                LOGGER.error("Failed to execute runnable " + runnable, e);
            }
        }
    }
 
    @Override
    public void afterCompletion(int status) {
        LOGGER.info("Transaction completed with status {}", status == STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
        RUNNABLES.remove();
    }
 
}
public void insert(TechBook techBook){
        bookMapper.insert(techBook);
 
//        send after tx commit but is async
//        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
//            @Override
//            public void afterCommit() {
//                executorService.submit(new Runnable() {
//                    @Override
//                    public void run() {
//                        System.out.println("send email after transaction commit...");
//                        try {
//                            Thread.sleep(10*1000);
//                        } catch (InterruptedException e) {
//                            e.printStackTrace();
//                        }
//                        System.out.println("complete send email after transaction commit...");
//                    }
//                });
//            }
//        }
//        );
 
        //send after tx commit and is async
        afterCommitExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("send email after transactioin commit");
            }
        });
 
//        async work but tx not work, execute even when tx is rollback
//        asyncService.executeAfterTxComplete();
 
        ThreadLocalRandom random = ThreadLocalRandom.current();
        if(random.nextInt() % 2 ==0){
            throw new RuntimeException("test email transaction");
        }
        System.out.println("service end");
    }

Async on Spring

Spring provides @Async annotation by default to facilitate application to use thread pool for asynchronization. The thread pool can be used by the whole app, and only one @Async annotation is needed on the method, thus omitting the repeated submit operation. Some Points to Pay Attention to About async:

1. Configuration of async

<context:component-scan base-package="com.yami" />
   <!--配置@Async注解使用的线程池,这里的id随便命名,最后在task:annotation-driven executor= 指定上就可以-->
    <task:executor id="myExecutor" pool-size="5"/>
    <task:annotation-driven executor="myExecutor" />

This must be configured in the root context, and the web context cannot scan annotations outside the controller layer, otherwise it will be overwritten.

<context:component-scan base-package="com.yami.web.controller"/>
<mvc:annotation-driven/>

2. async Call Problem

The call of async method cannot be called from within the same kind of method, otherwise the interception will not take effect. This is spring’s default interception problem. An annotation method with async in another class must be called in another class to achieve asynchronous effect.

3. Affairs

If the async method starts a transaction, it should pay attention to transaction propagation and transaction overhead. Moreover, the use of Transactional Synchronization Manager. RegisterSynchronization as above in the async method does not work and is worth noting.