From ed75ce633b9e416e5fee02591d50b203f0a7989e Mon Sep 17 00:00:00 2001 From: "1437892690@qq.com" <1437892690@qq.com> Date: Sun, 26 May 2024 15:31:50 +0800 Subject: [PATCH] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=20IT=E6=9C=8D=E5=8A=A1-?= =?UTF-8?q?=E6=B5=81=E7=A8=8B=E5=90=8C=E6=97=B6=E6=BF=80=E6=B4=BB=E5=A4=9A?= =?UTF-8?q?=E4=B8=AA=E6=AD=A5=E9=AA=A4=E6=97=B6=EF=BC=8C=E6=AD=A5=E9=AA=A4?= =?UTF-8?q?=E4=B9=8B=E9=97=B4=E6=8C=89=E6=AD=A3=E5=90=91=E8=BF=9E=E7=BA=BF?= =?UTF-8?q?=E5=85=88=E5=90=8E=E9=A1=BA=E5=BA=8F=E6=89=A7=E8=A1=8C=E6=BF=80?= =?UTF-8?q?=E6=B4=BB=E6=93=8D=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 关联 #[1165179927101440]IT服务-流程同时激活多个步骤时,步骤之间按正向连线先后顺序执行激活操作 http://192.168.0.96:8090/demo/rdm.html#/story-detail/939050947543040/939050947543042/1165179927101440 --- .../thread/NeatLogicThread.java | 15 ++++ .../threadpool/CachedThreadPool.java | 5 ++ .../transaction/core/AfterTransactionJob.java | 75 ++++++++++++++++++- 3 files changed, 94 insertions(+), 1 deletion(-) diff --git a/src/main/java/neatlogic/framework/asynchronization/thread/NeatLogicThread.java b/src/main/java/neatlogic/framework/asynchronization/thread/NeatLogicThread.java index 71a6e07df..12fc31465 100644 --- a/src/main/java/neatlogic/framework/asynchronization/thread/NeatLogicThread.java +++ b/src/main/java/neatlogic/framework/asynchronization/thread/NeatLogicThread.java @@ -25,6 +25,8 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CountDownLatch; + public abstract class NeatLogicThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(NeatLogicThread.class); protected UserContext userContext; @@ -34,6 +36,8 @@ public abstract class NeatLogicThread implements Runnable { private String threadName; private boolean isUnique = false; + private CountDownLatch countDownLatch; + /*public NeatLogicThread() { userContext = UserContext.get(); tenantContext = TenantContext.get(); @@ -84,6 +88,9 @@ public abstract class NeatLogicThread implements Runnable { } catch (Exception ex) { logger.error(ex.getMessage(), ex); } finally { + if (countDownLatch != null) { + countDownLatch.countDown(); + } // 清除所有threadlocal if (TenantContext.get() != null) { TenantContext.get().release(); @@ -118,4 +125,12 @@ public abstract class NeatLogicThread implements Runnable { public String getThreadName() { return threadName; } + + public CountDownLatch getCountDownLatch() { + return countDownLatch; + } + + public void setCountDownLatch(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } } diff --git a/src/main/java/neatlogic/framework/asynchronization/threadpool/CachedThreadPool.java b/src/main/java/neatlogic/framework/asynchronization/threadpool/CachedThreadPool.java index 4485a267e..9ce62b791 100644 --- a/src/main/java/neatlogic/framework/asynchronization/threadpool/CachedThreadPool.java +++ b/src/main/java/neatlogic/framework/asynchronization/threadpool/CachedThreadPool.java @@ -40,6 +40,11 @@ public class CachedThreadPool { 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueLen), new ThreadPoolExecutor.AbortPolicy()); + public static void execute(NeatLogicThread command, CountDownLatch countDownLatch) { + command.setCountDownLatch(countDownLatch); + execute(command); + } + public static void execute(NeatLogicThread command) { try { boolean isExists = false; diff --git a/src/main/java/neatlogic/framework/transaction/core/AfterTransactionJob.java b/src/main/java/neatlogic/framework/transaction/core/AfterTransactionJob.java index 9e734fa5c..fdd8de12d 100644 --- a/src/main/java/neatlogic/framework/transaction/core/AfterTransactionJob.java +++ b/src/main/java/neatlogic/framework/transaction/core/AfterTransactionJob.java @@ -17,20 +17,27 @@ package neatlogic.framework.transaction.core; import neatlogic.framework.asynchronization.thread.NeatLogicThread; import neatlogic.framework.asynchronization.threadpool.CachedThreadPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public class AfterTransactionJob { public enum EVENT { COMMITTED(), COMPLETED() } - + private final static Logger logger = LoggerFactory.getLogger(AfterTransactionJob.class); private final ThreadLocal> THREADLOCAL = new ThreadLocal<>(); private final ThreadLocal> T_THREADLOCAL = new ThreadLocal<>(); + private final ThreadLocal> LIST_THREADLOCAL = new ThreadLocal<>(); private final String threadName; public AfterTransactionJob(String _threadName) { @@ -120,6 +127,72 @@ public class AfterTransactionJob { } } + /** + * 事务提交后按顺序执行线程任务 + * + * @param t 线程任务 + */ + public void executeInOrder(NeatLogicThread t) { + executeInOrder(t, EVENT.COMMITTED); + } + + /** + * 事务提交或完成后按顺序执行线程任务 + * + * @param t 线程任务 + * @param event 事务时间,提交或完成 + */ + public void executeInOrder(NeatLogicThread t, EVENT event) { + if (!TransactionSynchronizationManager.isSynchronizationActive()) { + CachedThreadPool.execute(t); + } else { + List tList = LIST_THREADLOCAL.get(); + if (tList == null) { + tList = new ArrayList<>(); + LIST_THREADLOCAL.set(tList); + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCommit() { + if (event == EVENT.COMMITTED) { + List tList = LIST_THREADLOCAL.get(); + for (int i = 0; i < tList.size(); i++) { + NeatLogicThread t = tList.get(i); + CountDownLatch latch = new CountDownLatch(1); + CachedThreadPool.execute(t, latch); + if (i < tList.size() - 1) { + try { + boolean flag = latch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + } + } + } + + @Override + public void afterCompletion(int status) { + if (event == EVENT.COMPLETED) { + List tList = LIST_THREADLOCAL.get(); + for (int i = 0; i < tList.size(); i++) { + NeatLogicThread t = tList.get(i); + CountDownLatch latch = new CountDownLatch(1); + CachedThreadPool.execute(t, latch); + if (i < tList.size() - 1) { + try { + boolean flag = latch.await(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + } + } + } + }); + } + tList.add(t); + } + } /* * @Description: -- Gitee