Java-异步多线程专题-AfterCommitExecutor-事务提交后异步执行
AfterCommitExecutor.java
package cn.jiangjiesheng.edu.core.executor;
import java.util.concurrent.Executor;
/**
 * Executor whose tasks are meant to run asynchronously after the current transaction commits.
 *
 * <p>A transaction must be active for the task to be deferred and run asynchronously;
 * otherwise the task is executed immediately on the calling thread:
 * <pre>
 * if (!TransactionSynchronizationManager.isSynchronizationActive()) {
 *     LOGGER.info("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
 *     runnable.run();
 *     return;
 * }
 * </pre>
 *
 * <p>In data-backfill scenarios, if the task reads values from a ThreadLocal, those values
 * must be set before the task is submitted.
 *
 * <p>Exceptions raised on the asynchronous thread are not propagated to the submitter, so
 * catch them manually, log them, and rethrow.
 *
 * <p>For a method that needs an asynchronous update after the transaction commits, you can
 * annotate that method with {@code @Transactional(timeout = 2)} so that the asynchronous
 * path is triggered. If running after commit does not matter, {@code @Async} can be used
 * instead (it must be enabled with {@code @EnableAsync}).
 *
 * <p>With either approach, check whether ThreadLocal is involved, and whether it is a
 * TTL (TransmittableThreadLocal, reliable) or an ITL (InheritableThreadLocal).
 *
 * jiangjiesheng 210513
 */
public interface AfterCommitExecutor extends Executor {
}
--------------------------------------------------------------------------------
AfterCommitExecutorImpl.java
/*
* Copyright (c) 2018. Lorem ipsum dolor sit amet, consectetur adipiscing elit.
* Morbi non lorem porttitor neque feugiat blandit. Ut vitae ipsum eget quam lacinia accumsan.
* Etiam sed turpis ac ipsum condimentum fringilla. Maecenas magna.
* Proin dapibus sapien vel ante. Aliquam erat volutpat. Pellentesque sagittis ligula eget metus.
* Vestibulum commodo. Ut rhoncus gravida arcu.
*/
package cn.jiangjiesheng.edu.core.executor;
import com.alibaba.ttl.TtlRunnable;
import cn.jiangjiesheng.edu.core.AyStatic;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.NamedThreadLocal;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter implements AfterCommitExecutor {
//private static final Logger LOGGER = LoggerFactory.getLogger(AfterCommitExecutorImpl.class);
private static final ThreadLocal<List<Runnable>> RUNNABLES = new NamedThreadLocal<>("AfterCommitRunnable");
private ExecutorService threadPool;
@Value("${aftercommit.thread}")
private int threadCount;
@Override
public void execute(Runnable runnable) {
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
log.warn("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
runnable.run();
return;
}
List<Runnable> threadRunnables = RUNNABLES.get();
if (threadRunnables == null) {
threadRunnables = new ArrayList<>();
RUNNABLES.set(threadRunnables);
TransactionSynchronizationManager.registerSynchronization(this);
}
runnable = TtlRunnable.get(runnable);
threadRunnables.add(runnable);
}
@Override
public void afterCommit() {
List<Runnable> threadRunnables = RUNNABLES.get();
for (Runnable runnable : threadRunnables) {
try {
threadPool.execute(runnable);
} catch (RuntimeException e) {
log.error("异步任务执行runnable出现异常:" + runnable, e);
}
}
}
@Override
public void afterCompletion(int status) {
RUNNABLES.remove();
}
@PostConstruct
public void init() {
threadPool = Executors.newFixedThreadPool(threadCount);
}
@PreDestroy
public void shutDown() {
threadPool.shutdown();
log.info("异步任务线程池关闭");
}
}
--------------------------------------------------------------------------------
AfterCommitConfig.java
package cn.jiangjiesheng.edu.config;
import cn.jiangjiesheng.edu.core.executor.AfterCommitExecutor;
import cn.jiangjiesheng.edu.core.executor.AfterCommitExecutorImpl;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that publishes the after-commit executor as a bean.
 */
@Configuration
public class AfterCommitConfig {

    /**
     * Builds the {@link AfterCommitExecutorImpl} instance exposed under the
     * {@link AfterCommitExecutor} type.
     *
     * @return a new executor instance
     */
    @Bean
    public AfterCommitExecutor afterCommitExecutor() {
        AfterCommitExecutor executor = new AfterCommitExecutorImpl();
        return executor;
    }
}
--------------------------------------------------------------------------------
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.6.1</version>
</dependency>