diff --git a/src/main/java/neatlogic/framework/process/stephandler/core/ProcessStepHandlerBase.java b/src/main/java/neatlogic/framework/process/stephandler/core/ProcessStepHandlerBase.java index 445a8dcc4ce9a9751f0b9db415bf7b9acb48e9e5..b578f34b7b021b6843f61b5073d1dd9d50033c13 100644 --- a/src/main/java/neatlogic/framework/process/stephandler/core/ProcessStepHandlerBase.java +++ b/src/main/java/neatlogic/framework/process/stephandler/core/ProcessStepHandlerBase.java @@ -20,7 +20,6 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONPath; import neatlogic.framework.asynchronization.threadlocal.UserContext; -import neatlogic.framework.asynchronization.threadpool.TransactionSynchronizationPool; import neatlogic.framework.common.constvalue.GroupSearch; import neatlogic.framework.common.constvalue.SystemUser; import neatlogic.framework.crossover.CrossoverServiceFactory; @@ -59,6 +58,7 @@ import neatlogic.framework.process.processtaskserialnumberpolicy.core.ProcessTas import neatlogic.framework.process.workerpolicy.core.IWorkerPolicyHandler; import neatlogic.framework.process.workerpolicy.core.WorkerPolicyHandlerFactory; import neatlogic.framework.service.AuthenticationInfoService; +import neatlogic.framework.transaction.core.AfterTransactionJob; import neatlogic.framework.worktime.dao.mapper.WorktimeMapper; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.ListUtils; @@ -191,8 +191,6 @@ public abstract class ProcessStepHandlerBase implements IProcessStepHandler { @Override public final int active(ProcessTaskStepVo currentProcessTaskStepVo) { - long beforeTime = System.currentTimeMillis(); - logger.warn("开始 active 步骤:" + currentProcessTaskStepVo.getName() + ", time:" + beforeTime); IProcessStepHandlerCrossoverUtil processStepHandlerCrossoverUtil = CrossoverServiceFactory.getApi(IProcessStepHandlerCrossoverUtil.class); try { IProcessTaskCrossoverMapper processTaskCrossoverMapper = CrossoverServiceFactory.getApi(IProcessTaskCrossoverMapper.class); @@ -302,9 +300,6 @@ public abstract class ProcessStepHandlerBase implements IProcessStepHandler { } }); } - logger.warn("结束激活步骤: " + currentProcessTaskStepVo.getName() + ", time: " + (System.currentTimeMillis() - beforeTime)); - // deleteProcessTaskStepInOperationByProcessTaskStepId(currentProcessTaskStepVo.getId(), - // ProcessTaskOperationType.ACTIVE); } return 1; } @@ -798,8 +793,6 @@ public abstract class ProcessStepHandlerBase implements IProcessStepHandler { @Override public final int complete(ProcessTaskStepVo currentProcessTaskStepVo) { - long beforeTime = System.currentTimeMillis(); - logger.warn("开始 complete 步骤: " + currentProcessTaskStepVo.getName() + ", time: " + beforeTime); IProcessStepHandlerCrossoverUtil processStepHandlerCrossoverUtil = CrossoverServiceFactory.getApi(IProcessStepHandlerCrossoverUtil.class); IProcessTaskCrossoverMapper processTaskCrossoverMapper = CrossoverServiceFactory.getApi(IProcessTaskCrossoverMapper.class); /* 锁定当前流程 **/ @@ -912,6 +905,7 @@ public abstract class ProcessStepHandlerBase implements IProcessStepHandler { } //将不流转的步骤的正向输入连线的isHit设置为-1 identifyPostInvalidStepRelIsHit(currentProcessTaskStepVo.getProcessTaskId(), currentProcessTaskStepVo.getId(), nextStepIdSet); + List processTaskStepThreadList = new ArrayList<>(); List nextStepIdList = new ArrayList<>(nextStepIdSet); List nextStepList = processTaskCrossoverMapper.getProcessTaskStepListByIdList(nextStepIdList); for (ProcessTaskStepVo nextStep : nextStepList) { @@ -925,14 +919,25 @@ public abstract class ProcessStepHandlerBase implements IProcessStepHandler { nextStep.setFromProcessTaskStepId(currentProcessTaskStepVo.getId()); nextStep.setStartProcessTaskStepId(currentProcessTaskStepVo.getStartProcessTaskStepId()); nextStep.setParallelActivateStepIdList(nextStepIdList); - doNext(ProcessTaskOperationType.STEP_ACTIVE, new ProcessStepThread(nextStep) { + ProcessTaskStepThread thread = new ProcessTaskStepThread(ProcessTaskOperationType.STEP_ACTIVE, nextStep, nextStepHandler.getMode()) { @Override - public void myExecute() { - nextStepHandler.active(nextStep); + protected void myExecute(ProcessTaskStepVo processTaskStepVo) { + IProcessStepHandler processStepHandler = ProcessStepHandlerFactory.getHandler(processTaskStepVo.getHandler()); + if (processStepHandler != null) { + processStepHandler.active(processTaskStepVo); + } } - }); + }; + processTaskStepThreadList.add(thread); +// doNext(ProcessTaskOperationType.STEP_ACTIVE, new ProcessStepThread(nextStep) { +// @Override +// public void myExecute() { +// nextStepHandler.active(nextStep); +// } +// }); } } + doNext(processTaskStepThreadList); /* 触发通知 **/ processStepHandlerCrossoverUtil.notify(currentProcessTaskStepVo, notifyTriggerType); @@ -978,7 +983,6 @@ public abstract class ProcessStepHandlerBase implements IProcessStepHandler { /* 计算SLA **/ processStepHandlerCrossoverUtil.calculateSla(new ProcessTaskVo(currentProcessTaskStepVo.getProcessTaskId())); } - logger.warn("结束 complete 步骤: " + currentProcessTaskStepVo.getName() + ", time: " + (System.currentTimeMillis() - beforeTime)); } } @@ -2015,8 +2019,6 @@ public abstract class ProcessStepHandlerBase implements IProcessStepHandler { @Override public final int startProcess(ProcessTaskStepVo currentProcessTaskStepVo) { - long beforeTime = System.currentTimeMillis(); - logger.warn("开始 startProcess 步骤: " + currentProcessTaskStepVo.getName() + ", time: " + beforeTime); IProcessStepHandlerCrossoverUtil processStepHandlerCrossoverUtil = CrossoverServiceFactory.getApi(IProcessStepHandlerCrossoverUtil.class); IProcessTaskCrossoverMapper processTaskCrossoverMapper = CrossoverServiceFactory.getApi(IProcessTaskCrossoverMapper.class); // 锁定当前流程 @@ -2064,6 +2066,7 @@ public abstract class ProcessStepHandlerBase implements IProcessStepHandler { updateProcessTaskStepStatus(currentProcessTaskStepVo); /* 流转到下一步 **/ + List processTaskStepThreadList = new ArrayList<>(); Set nextStepIdSet = getNext(currentProcessTaskStepVo); List nextStepList = processTaskCrossoverMapper.getProcessTaskStepListByIdList(new ArrayList<>(nextStepIdSet)); for (ProcessTaskStepVo nextStep : nextStepList) { @@ -2076,14 +2079,25 @@ public abstract class ProcessStepHandlerBase implements IProcessStepHandler { processTaskCrossoverMapper.updateProcessTaskStepRelIsHit(processTaskStepRelVo); nextStep.setFromProcessTaskStepId(currentProcessTaskStepVo.getId()); nextStep.setStartProcessTaskStepId(currentProcessTaskStepVo.getStartProcessTaskStepId()); - doNext(ProcessTaskOperationType.STEP_ACTIVE, new ProcessStepThread(nextStep) { + ProcessTaskStepThread thread = new ProcessTaskStepThread(ProcessTaskOperationType.STEP_ACTIVE, nextStep, nextStepHandler.getMode()) { @Override - public void myExecute() { - nextStepHandler.active(nextStep); + protected void myExecute(ProcessTaskStepVo processTaskStepVo) { + IProcessStepHandler processStepHandler = ProcessStepHandlerFactory.getHandler(processTaskStepVo.getHandler()); + if (processStepHandler != null) { + processStepHandler.active(processTaskStepVo); + } } - }); + }; + processTaskStepThreadList.add(thread); +// doNext(ProcessTaskOperationType.STEP_ACTIVE, new ProcessStepThread(nextStep) { +// @Override +// public void myExecute() { +// nextStepHandler.active(nextStep); +// } +// }); } } + doNext(processTaskStepThreadList); /* 写入时间审计 **/ processStepHandlerCrossoverUtil.timeAudit(currentProcessTaskStepVo, ProcessTaskOperationType.STEP_ACTIVE); processStepHandlerCrossoverUtil.timeAudit(currentProcessTaskStepVo, ProcessTaskOperationType.STEP_START); @@ -2140,7 +2154,6 @@ public abstract class ProcessStepHandlerBase implements IProcessStepHandler { } else { processStepHandlerCrossoverUtil.audit(currentProcessTaskStepVo, ProcessTaskAuditType.STARTPROCESS); } - logger.warn("结束startProcess 步骤: " + currentProcessTaskStepVo.getName() + ", time: " + (System.currentTimeMillis() - beforeTime)); } return 0; } @@ -2423,24 +2436,38 @@ public abstract class ProcessStepHandlerBase implements IProcessStepHandler { // } // } //// } + List processTaskStepThreadList = new ArrayList<>(); IProcessTaskCrossoverMapper processTaskCrossoverMapper = CrossoverServiceFactory.getApi(IProcessTaskCrossoverMapper.class); List toStepIdList = processTaskCrossoverMapper.getToProcessTaskStepIdListByFromIdAndType(currentProcessTaskStepVo.getId(), ProcessFlowDirection.FORWARD.getValue()); - for (Long toStepId : toStepIdList) { - ProcessTaskStepVo toStepVo = processTaskCrossoverMapper.getProcessTaskStepBaseInfoById(toStepId); - if (toStepVo != null && !Objects.equals(toStepVo.getIsActive(), 0)) { - IProcessStepHandler handler = ProcessStepHandlerFactory.getHandler(toStepVo.getHandler()); - if (handler != null) { - toStepVo.setStartProcessTaskStepId(currentProcessTaskStepVo.getStartProcessTaskStepId()); - toStepVo.setFromProcessTaskStepId(currentProcessTaskStepVo.getId()); - doNext(ProcessTaskOperationType.STEP_HANG, new ProcessStepThread(toStepVo) { - @Override - protected void myExecute() { - handler.hang(toStepVo); - } - }); + if (CollectionUtils.isNotEmpty(toStepIdList)) { + List toStepList = processTaskCrossoverMapper.getProcessTaskStepListByIdList(toStepIdList); + for (ProcessTaskStepVo toStepVo : toStepList) { + if (!Objects.equals(toStepVo.getIsActive(), 0)) { + IProcessStepHandler handler = ProcessStepHandlerFactory.getHandler(toStepVo.getHandler()); + if (handler != null) { + toStepVo.setStartProcessTaskStepId(currentProcessTaskStepVo.getStartProcessTaskStepId()); + toStepVo.setFromProcessTaskStepId(currentProcessTaskStepVo.getId()); + ProcessTaskStepThread thread = new ProcessTaskStepThread(ProcessTaskOperationType.STEP_HANG, toStepVo, handler.getMode()) { + @Override + protected void myExecute(ProcessTaskStepVo processTaskStepVo) { + IProcessStepHandler processStepHandler = ProcessStepHandlerFactory.getHandler(processTaskStepVo.getHandler()); + if (processStepHandler != null) { + processStepHandler.hang(processTaskStepVo); + } + } + }; + processTaskStepThreadList.add(thread); +// doNext(ProcessTaskOperationType.STEP_HANG, new ProcessStepThread(toStepVo) { +// @Override +// protected void myExecute() { +// handler.hang(toStepVo); +// } +// }); + } } } } + doNext(processTaskStepThreadList); } // private void resetConvergeInfo(ProcessTaskStepVo nextStepVo) { @@ -2600,6 +2627,33 @@ public abstract class ProcessStepHandlerBase implements IProcessStepHandler { return nextStepIdSet; } + protected synchronized static void doNext(List processTaskStepThreadList) { + if (processTaskStepThreadList.size() > 1) { + IProcessTaskCrossoverMapper processTaskCrossoverMapper = CrossoverServiceFactory.getApi(IProcessTaskCrossoverMapper.class); + List processTaskStepRelList = processTaskCrossoverMapper.getProcessTaskStepRelByProcessTaskId(processTaskStepThreadList.get(0).getProcessTaskId()); + List stepList = processTaskCrossoverMapper.getProcessTaskStepByProcessTaskIdAndType(processTaskStepThreadList.get(0).getProcessTaskId(), ProcessStepType.END.getValue()); + ProcessTaskStepThreadComparator comparator = new ProcessTaskStepThreadComparator(processTaskStepRelList, stepList.get(0).getId()); + processTaskStepThreadList.sort(comparator); + } + AfterTransactionJob afterTransactionJob = new AfterTransactionJob<>("PROCESSTASK-STEP-HANDLER"); + for (ProcessTaskStepThread processTaskStepThread : processTaskStepThreadList) { + String operationTypeValue = ""; + if (processTaskStepThread.getOperationType() != null) { + operationTypeValue = processTaskStepThread.getOperationType().getValue(); + } + ProcessTaskStepInOperationVo processTaskStepInOperationVo = new ProcessTaskStepInOperationVo( + processTaskStepThread.getProcessTaskId(), + processTaskStepThread.getProcessTaskStepId(), + operationTypeValue + ); + /** 后台异步操作步骤前,在`processtask_step_in_operation`表中插入一条数据,标识该步骤正在后台处理中,异步处理完删除 **/ + IProcessTaskCrossoverMapper processTaskCrossoverMapper = CrossoverServiceFactory.getApi(IProcessTaskCrossoverMapper.class); + processTaskCrossoverMapper.insertProcessTaskStepInOperation(processTaskStepInOperationVo); + processTaskStepThread.setInOperationId(processTaskStepInOperationVo.getId()); + afterTransactionJob.executeInOrder(processTaskStepThread); + } + } + protected synchronized static void doNext(ProcessTaskOperationType operationType, ProcessStepThread thread) { String operationTypeValue = ""; if (operationType != null) { @@ -2622,7 +2676,9 @@ public abstract class ProcessStepHandlerBase implements IProcessStepHandler { processTaskCrossoverMapper.deleteProcessTaskStepInOperationById(processTaskStepInOperationVo.getId()); return 1; }); - TransactionSynchronizationPool.execute(thread); +// TransactionSynchronizationPool.execute(thread); + AfterTransactionJob afterTransactionJob = new AfterTransactionJob<>("PROCESSTASK-STEP-HANDLER"); + afterTransactionJob.execute(thread); } /** diff --git a/src/main/java/neatlogic/framework/process/stephandler/core/ProcessTaskStepThread.java b/src/main/java/neatlogic/framework/process/stephandler/core/ProcessTaskStepThread.java new file mode 100644 index 0000000000000000000000000000000000000000..df2606956a177c1dd9fda8b6971876b4a1955f2a --- /dev/null +++ b/src/main/java/neatlogic/framework/process/stephandler/core/ProcessTaskStepThread.java @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package neatlogic.framework.process.stephandler.core; + +import neatlogic.framework.asynchronization.thread.NeatLogicThread; +import neatlogic.framework.crossover.CrossoverServiceFactory; +import neatlogic.framework.process.constvalue.ProcessStepMode; +import neatlogic.framework.process.constvalue.ProcessTaskOperationType; +import neatlogic.framework.process.crossover.IProcessTaskCrossoverMapper; +import neatlogic.framework.process.dto.ProcessTaskStepVo; + +public abstract class ProcessTaskStepThread extends NeatLogicThread { + + private final ProcessTaskStepVo processTaskStepVo; + + private final Long processTaskId; + + private final Long processTaskStepId; + + private final ProcessStepMode mode; + + private final ProcessTaskOperationType operationType; + + private Long inOperationId; + + public ProcessTaskStepThread(ProcessTaskOperationType operationType, ProcessTaskStepVo processTaskStepVo, ProcessStepMode mode) { + super("PROCESSTASK-STEP-" + (operationType != null ? operationType.getValue() : "空") + (processTaskStepVo != null ? "-" + processTaskStepVo.getName() + "_" +processTaskStepVo.getId() : "")); + this.processTaskStepVo = processTaskStepVo; + this.processTaskId = processTaskStepVo.getProcessTaskId(); + this.processTaskStepId = processTaskStepVo.getId(); + this.mode = mode; + this.operationType = operationType; + } + @Override + protected void execute() { + try { + myExecute(processTaskStepVo); + } finally { + if (inOperationId != null) { + IProcessTaskCrossoverMapper processTaskCrossoverMapper = CrossoverServiceFactory.getApi(IProcessTaskCrossoverMapper.class); + processTaskCrossoverMapper.deleteProcessTaskStepInOperationById(inOperationId); + } + } + } + + protected abstract void myExecute(ProcessTaskStepVo processTaskStepVo); + + public ProcessTaskStepVo getProcessTaskStepVo() { + return processTaskStepVo; + } + + public Long getProcessTaskId() { + return processTaskId; + } + + public void setInOperationId(Long inOperationId) { + this.inOperationId = inOperationId; + } + + public Long getProcessTaskStepId() { + return processTaskStepId; + } + + public ProcessStepMode getMode() { + return mode; + } + + public ProcessTaskOperationType getOperationType() { + return operationType; + } +} diff --git a/src/main/java/neatlogic/framework/process/stephandler/core/ProcessTaskStepThreadComparator.java b/src/main/java/neatlogic/framework/process/stephandler/core/ProcessTaskStepThreadComparator.java new file mode 100644 index 0000000000000000000000000000000000000000..60356cf5275e3dcfcbdfbc5771b376a8700098eb --- /dev/null +++ b/src/main/java/neatlogic/framework/process/stephandler/core/ProcessTaskStepThreadComparator.java @@ -0,0 +1,139 @@ +/* + * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package neatlogic.framework.process.stephandler.core; + +import neatlogic.framework.process.constvalue.ProcessFlowDirection; +import neatlogic.framework.process.constvalue.ProcessStepMode; +import neatlogic.framework.process.constvalue.ProcessTaskOperationType; +import neatlogic.framework.process.dto.ProcessTaskStepRelVo; + +import java.util.*; + +public class ProcessTaskStepThreadComparator implements Comparator { + + private final List processTaskStepRelList; + + private final Long endProcessTaskStepId; + + public ProcessTaskStepThreadComparator(List processTaskStepRelList, Long endProcessTaskStepId) { + this.processTaskStepRelList = processTaskStepRelList; + this.endProcessTaskStepId = endProcessTaskStepId; + } + + /** + * e1是新添加元素,e2是已存在的元素 + * @param e1 the first object to be compared. + * @param e2 the second object to be compared. + * @return 返回-1时,e1排在e2前面,否则e2排在e1前面 + */ + @Override + public int compare(ProcessTaskStepThread e1, ProcessTaskStepThread e2) { + if (e1.getOperationType() != e2.getOperationType()) { + if (e1.getOperationType() != ProcessTaskOperationType.STEP_ACTIVE) { + return 1; + } + if (e2.getOperationType() != ProcessTaskOperationType.STEP_ACTIVE) { + return -1; + } + } + if (Objects.equals(e1.getProcessTaskStepId(), endProcessTaskStepId)) { + return 1; + } + if (Objects.equals(e2.getProcessTaskStepId(), endProcessTaskStepId)) { + return -1; + } + // 如果e1是e2的后继步骤,则e2先排在e1前面 + if (checkIsSubsequentStep(e2.getProcessTaskStepId(), e1.getProcessTaskStepId())) { + return 1; + } + // 如果e2是e1的后继步骤,则e1先排在e2前面 + if (checkIsSubsequentStep(e1.getProcessTaskStepId(), e2.getProcessTaskStepId())) { + return -1; + } + if (e1.getMode() == ProcessStepMode.MT && e2.getMode() == ProcessStepMode.AT) { + return -1; + } else if (e1.getMode() == ProcessStepMode.AT && e2.getMode() == ProcessStepMode.MT) { + return 1; + } + return 0; + } + + /** + * 判断目标步骤是不是当前步骤地后继步骤 + * @param currentProcessTaskStepId 当前步骤 + * @param targetProcessTaskStepId 目标步骤 + * @return + */ + private boolean checkIsSubsequentStep(Long currentProcessTaskStepId, Long targetProcessTaskStepId) { + List> routeList = new ArrayList<>(); + List routeStepList = new ArrayList<>(); + routeList.add(routeStepList); + getAllRouteList(currentProcessTaskStepId, routeList, routeStepList, endProcessTaskStepId, processTaskStepRelList); + // 如果最后一个步骤不是结束节点的路由全部删掉,因为这是回环路由 + Iterator> routeStepIt = routeList.iterator(); + List subsequentStepIdList = new ArrayList<>(); + while (routeStepIt.hasNext()) { + List rsList = routeStepIt.next(); + if (!rsList.get(rsList.size() - 1).equals(endProcessTaskStepId)) { + routeStepIt.remove(); + } else { + for (Long id : rsList) { + if (!subsequentStepIdList.contains(id)) { + subsequentStepIdList.add(id); + } + } + } + } +// System.out.println(currentProcessTaskStep.getName() + " subsequentStepIdList = " + JSON.toJSONString(subsequentStepIdList)); + return subsequentStepIdList.contains(targetProcessTaskStepId); + } + + /** + * + * @param processTaskStepId 当前步骤ID + * @param routeList 收集所有后置路经列表 + * @param routeStepList 遍历过的步骤列表 + * @param endProcessTaskStepId 结束步骤ID + * @param allProcessTaskStepRelList 所有连线列表 + */ + private void getAllRouteList(Long processTaskStepId, List> routeList, List routeStepList, Long endProcessTaskStepId, List allProcessTaskStepRelList) { + if (!routeStepList.contains(processTaskStepId)) { + routeStepList.add(processTaskStepId); + if (!processTaskStepId.equals(endProcessTaskStepId)) { + List toProcessTaskStepIdList = new ArrayList<>(); + for (ProcessTaskStepRelVo processTaskStepRelVo : allProcessTaskStepRelList) { + if (Objects.equals(processTaskStepRelVo.getFromProcessTaskStepId(), processTaskStepId) && Objects.equals(processTaskStepRelVo.getType(), ProcessFlowDirection.FORWARD.getValue())) { + toProcessTaskStepIdList.add(processTaskStepRelVo.getToProcessTaskStepId()); + } + } + List tmpRouteStepList = new ArrayList<>(routeStepList); + for (int i = 0; i < toProcessTaskStepIdList.size(); i++) { + Long toProcessTaskStepId = toProcessTaskStepIdList.get(i); + if (i == 0) { + getAllRouteList(toProcessTaskStepId, routeList, routeStepList, endProcessTaskStepId, allProcessTaskStepRelList); + } else { + List newRouteStepList = new ArrayList<>(tmpRouteStepList); + routeList.add(newRouteStepList); + getAllRouteList(toProcessTaskStepId, routeList, newRouteStepList, endProcessTaskStepId, allProcessTaskStepRelList); + } + } + } + } + } + +}