diff --git a/src/main/java/neatlogic/framework/asynchronization/thread/NeatLogicThread.java b/src/main/java/neatlogic/framework/asynchronization/thread/NeatLogicThread.java index 71a6e07df63b758d96b5158e741d41a7ac7b1111..12fc314656ae946272ccb91a123cbf27901e1cdd 100644 --- a/src/main/java/neatlogic/framework/asynchronization/thread/NeatLogicThread.java +++ b/src/main/java/neatlogic/framework/asynchronization/thread/NeatLogicThread.java @@ -25,6 +25,8 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CountDownLatch; + public abstract class NeatLogicThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(NeatLogicThread.class); protected UserContext userContext; @@ -34,6 +36,8 @@ public abstract class NeatLogicThread implements Runnable { private String threadName; private boolean isUnique = false; + private CountDownLatch countDownLatch; + /*public NeatLogicThread() { userContext = UserContext.get(); tenantContext = TenantContext.get(); @@ -84,6 +88,9 @@ public abstract class NeatLogicThread implements Runnable { } catch (Exception ex) { logger.error(ex.getMessage(), ex); } finally { + if (countDownLatch != null) { + countDownLatch.countDown(); + } // 清除所有threadlocal if (TenantContext.get() != null) { TenantContext.get().release(); @@ -118,4 +125,12 @@ public abstract class NeatLogicThread implements Runnable { public String getThreadName() { return threadName; } + + public CountDownLatch getCountDownLatch() { + return countDownLatch; + } + + public void setCountDownLatch(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } } diff --git a/src/main/java/neatlogic/framework/asynchronization/threadpool/CachedThreadPool.java b/src/main/java/neatlogic/framework/asynchronization/threadpool/CachedThreadPool.java index 4485a267ea87bef98cb512f419299d934022d016..9ce62b791284b44ba6cf8ab10baf37ad1452fac9 100644 --- a/src/main/java/neatlogic/framework/asynchronization/threadpool/CachedThreadPool.java +++ b/src/main/java/neatlogic/framework/asynchronization/threadpool/CachedThreadPool.java @@ -40,6 +40,11 @@ public class CachedThreadPool { 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueLen), new ThreadPoolExecutor.AbortPolicy()); + public static void execute(NeatLogicThread command, CountDownLatch countDownLatch) { + command.setCountDownLatch(countDownLatch); + execute(command); + } + public static void execute(NeatLogicThread command) { try { boolean isExists = false; diff --git a/src/main/java/neatlogic/framework/transaction/core/AfterTransactionJob.java b/src/main/java/neatlogic/framework/transaction/core/AfterTransactionJob.java index 9e734fa5cd20330fabeb4b3ca2d38209737ccc03..fdd8de12d8cb53abb9766e467bb541bf5709c28f 100644 --- a/src/main/java/neatlogic/framework/transaction/core/AfterTransactionJob.java +++ b/src/main/java/neatlogic/framework/transaction/core/AfterTransactionJob.java @@ -17,20 +17,27 @@ package neatlogic.framework.transaction.core; import neatlogic.framework.asynchronization.thread.NeatLogicThread; import neatlogic.framework.asynchronization.threadpool.CachedThreadPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public class AfterTransactionJob { public enum EVENT { COMMITTED(), COMPLETED() } - + private final static Logger logger = LoggerFactory.getLogger(AfterTransactionJob.class); private final ThreadLocal> THREADLOCAL = new ThreadLocal<>(); private final ThreadLocal> T_THREADLOCAL = new ThreadLocal<>(); + private final ThreadLocal> LIST_THREADLOCAL = new ThreadLocal<>(); private final String threadName; public AfterTransactionJob(String _threadName) { @@ -120,6 +127,72 @@ public class AfterTransactionJob { } } + /** + * 事务提交后按顺序执行线程任务 + * + * @param t 线程任务 + */ + public void executeInOrder(NeatLogicThread t) { + executeInOrder(t, EVENT.COMMITTED); + } + + /** + * 事务提交或完成后按顺序执行线程任务 + * + * @param t 线程任务 + * @param event 事务时间,提交或完成 + */ + public void executeInOrder(NeatLogicThread t, EVENT event) { + if (!TransactionSynchronizationManager.isSynchronizationActive()) { + CachedThreadPool.execute(t); + } else { + List tList = LIST_THREADLOCAL.get(); + if (tList == null) { + tList = new ArrayList<>(); + LIST_THREADLOCAL.set(tList); + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCommit() { + if (event == EVENT.COMMITTED) { + List tList = LIST_THREADLOCAL.get(); + for (int i = 0; i < tList.size(); i++) { + NeatLogicThread t = tList.get(i); + CountDownLatch latch = new CountDownLatch(1); + CachedThreadPool.execute(t, latch); + if (i < tList.size() - 1) { + try { + boolean flag = latch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + } + } + } + + @Override + public void afterCompletion(int status) { + if (event == EVENT.COMPLETED) { + List tList = LIST_THREADLOCAL.get(); + for (int i = 0; i < tList.size(); i++) { + NeatLogicThread t = tList.get(i); + CountDownLatch latch = new CountDownLatch(1); + CachedThreadPool.execute(t, latch); + if (i < tList.size() - 1) { + try { + boolean flag = latch.await(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + } + } + } + }); + } + tList.add(t); + } + } /* * @Description: