From d87fbccf7166d5bb4600faaa47fba58cf7632e01 Mon Sep 17 00:00:00 2001 From: "1437892690@qq.com" <1437892690@qq.com> Date: Fri, 2 Aug 2024 21:38:30 +0800 Subject: [PATCH 01/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=20IT=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1-=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8E=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=9A=84=E5=BC=82=E6=AD=A5=E4=B8=8A=E6=8A=A5=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 关联 #[1212437796192256]IT服务-增加基于队列的异步上报工单方式 http://192.168.0.96:8090/demo/rdm.html#/story-detail/939050947543040/939050947543042/1212437796192256 --- .../changelog/2024-08-02/neatlogic_tenant.sql | 14 ++++++++++++++ .../process/changelog/2024-08-02/version.json | 10 ++++++++++ .../resources/process/sqlscript/ddl.sql | 18 ++++++++++++++++++ 3 files changed, 42 insertions(+) create mode 100644 src/main/resources/neatlogic/resources/process/changelog/2024-08-02/neatlogic_tenant.sql create mode 100644 src/main/resources/neatlogic/resources/process/changelog/2024-08-02/version.json diff --git a/src/main/resources/neatlogic/resources/process/changelog/2024-08-02/neatlogic_tenant.sql b/src/main/resources/neatlogic/resources/process/changelog/2024-08-02/neatlogic_tenant.sql new file mode 100644 index 000000000..35c77a149 --- /dev/null +++ b/src/main/resources/neatlogic/resources/process/changelog/2024-08-02/neatlogic_tenant.sql @@ -0,0 +1,14 @@ +CREATE TABLE IF NOT EXISTS `processtask_async_create` ( + `id` bigint NOT NULL COMMENT '主键ID', + `processtask_id` bigint DEFAULT NULL COMMENT '工单ID', + `title` varchar(255) COLLATE utf8mb4_general_ci NOT NULL COMMENT '标题', + `status` enum('doing','done','failed','aborted','redo') CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '状态', + `config` longtext COLLATE utf8mb4_general_ci NOT NULL COMMENT '配置信息', + `error` longtext COLLATE utf8mb4_general_ci COMMENT '异常信息', + `try_count` int NOT NULL DEFAULT '0' COMMENT '尝试次数', + `server_id` int NOT NULL COMMENT '服务器ID', + `fcu` char(32) COLLATE utf8mb4_general_ci NOT NULL COMMENT '创建人', + `fcd` timestamp(3) NOT NULL COMMENT '创建时间', + `lcd` timestamp(3) NULL DEFAULT NULL COMMENT '修改时间', + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT = '异步创建工单表'; \ No newline at end of file diff --git a/src/main/resources/neatlogic/resources/process/changelog/2024-08-02/version.json b/src/main/resources/neatlogic/resources/process/changelog/2024-08-02/version.json new file mode 100644 index 000000000..6608c91f6 --- /dev/null +++ b/src/main/resources/neatlogic/resources/process/changelog/2024-08-02/version.json @@ -0,0 +1,10 @@ +{ + "content":[ + { + "type":"新增功能", + "detail":[ + {"msg":"1.通过changelog刷新channel表和channel_authority表数据"} + ] + } + ] +} diff --git a/src/main/resources/neatlogic/resources/process/sqlscript/ddl.sql b/src/main/resources/neatlogic/resources/process/sqlscript/ddl.sql index c397a8cc5..b035849f6 100644 --- a/src/main/resources/neatlogic/resources/process/sqlscript/ddl.sql +++ b/src/main/resources/neatlogic/resources/process/sqlscript/ddl.sql @@ -653,6 +653,24 @@ CREATE TABLE IF NOT EXISTS `processtask_auto_score` ( PRIMARY KEY (`processtask_id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '工单自动评分表'; +-- ---------------------------- +-- Table structure for processtask_async_create +-- ---------------------------- +CREATE TABLE IF NOT EXISTS `processtask_async_create` ( + `id` bigint NOT NULL COMMENT '主键ID', + `processtask_id` bigint DEFAULT NULL COMMENT '工单ID', + `title` varchar(255) COLLATE utf8mb4_general_ci NOT NULL COMMENT '标题', + `status` enum('doing','done','failed','aborted','redo') CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '状态', + `config` longtext COLLATE utf8mb4_general_ci NOT NULL COMMENT '配置信息', + `error` longtext COLLATE utf8mb4_general_ci COMMENT '异常信息', + `try_count` int NOT NULL DEFAULT '0' COMMENT '尝试次数', + `server_id` int NOT NULL COMMENT '服务器ID', + `fcu` char(32) COLLATE utf8mb4_general_ci NOT NULL COMMENT '创建人', + `fcd` timestamp(3) NOT NULL COMMENT '创建时间', + `lcd` timestamp(3) NULL DEFAULT NULL COMMENT '修改时间', + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT = '异步创建工单表'; + -- ---------------------------- -- Table structure for processtask_config -- ---------------------------- -- Gitee From 01586c15a0346eb854fa0d0dc78ccb093bc2e518 Mon Sep 17 00:00:00 2001 From: "1437892690@qq.com" <1437892690@qq.com> Date: Fri, 2 Aug 2024 21:40:29 +0800 Subject: [PATCH 02/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=20IT=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1-=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8E=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=9A=84=E5=BC=82=E6=AD=A5=E4=B8=8A=E6=8A=A5=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 关联 #[1212437796192256]IT服务-增加基于队列的异步上报工单方式 http://192.168.0.96:8090/demo/rdm.html#/story-detail/939050947543040/939050947543042/1212437796192256 --- .../ProcessTaskAsyncCreateMapper.java | 36 ++++++++ .../ProcessTaskAsyncCreateMapper.xml | 90 +++++++++++++++++++ .../ProcessTaskAsyncCreateService.java | 36 ++++++++ 3 files changed, 162 insertions(+) create mode 100644 src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.java create mode 100644 src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.xml create mode 100644 src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateService.java diff --git a/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.java b/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.java new file mode 100644 index 000000000..3919238af --- /dev/null +++ b/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.java @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package neatlogic.module.process.dao.mapper.processtask; + +import neatlogic.framework.process.crossover.IProcessTaskAsyncCreateCrossoverMapper; +import neatlogic.framework.process.dto.ProcessTaskAsyncCreateVo; + +import java.util.List; + +public interface ProcessTaskAsyncCreateMapper extends IProcessTaskAsyncCreateCrossoverMapper { + + ProcessTaskAsyncCreateVo getProcessTaskAsyncCreateById(Long id); + + List getProcessTaskAsyncCreateList(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo); + + int getProcessTaskAsyncCreateCount(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo); + + int insertProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo); + + int updateProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo); +} diff --git a/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.xml b/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.xml new file mode 100644 index 000000000..473b83c74 --- /dev/null +++ b/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.xml @@ -0,0 +1,90 @@ + + + + + + + + + + + + + + INSERT INTO `processtask_async_create` ( + `id`, + `processtask_id`, + `title`, + `status`, + `config`, + `try_count`, + `server_id`, + `fcu`, + `fcd` + ) + VALUES + ( + #{id}, + #{processTaskId}, + #{title}, + #{status}, + #{configStr}, + 0, + #{serverId}, + #{fcu}, + NOW(3) + ) + + + + UPDATE `processtask_async_create` + SET + `processtask_id` = #{processTaskId}, + `status` = #{status}, + `error` = #{error}, + `try_count` = `try_count` + 1, + `lcd` = NOW(3) + WHERE `id` = #{id} + + \ No newline at end of file diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateService.java b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateService.java new file mode 100644 index 000000000..71931b65d --- /dev/null +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateService.java @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package neatlogic.module.process.service; + +import neatlogic.framework.process.dto.ProcessTaskAsyncCreateVo; + +public interface ProcessTaskAsyncCreateService { + /** + * 添加新的工单信息到阻塞队列 + * @param processTaskAsyncCreateVo + * @throws InterruptedException + */ + void addNewProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException; + + /** + * 添加需要重新执行的工单信息到阻塞队列 + * @param processTaskAsyncCreateVo + * @throws InterruptedException + */ + void addRedoProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException; +} -- Gitee From 1453641a2a552dc49fdd8a3fd9400c9ed6dd0de5 Mon Sep 17 00:00:00 2001 From: "1437892690@qq.com" <1437892690@qq.com> Date: Fri, 2 Aug 2024 21:46:39 +0800 Subject: [PATCH 03/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=20IT=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1-=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8E=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=9A=84=E5=BC=82=E6=AD=A5=E4=B8=8A=E6=8A=A5=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 关联 #[1212437796192256]IT服务-增加基于队列的异步上报工单方式 http://192.168.0.96:8090/demo/rdm.html#/story-detail/939050947543040/939050947543042/1212437796192256 --- .../api/processtask/ProcessTaskCreateApi.java | 17 +- .../ProcessTaskAsyncCreateServiceImpl.java | 158 ++++++++++++++++++ .../ProcessTaskCreatePublicServiceImpl.java | 76 ++++----- 3 files changed, 212 insertions(+), 39 deletions(-) create mode 100644 src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java diff --git a/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskCreateApi.java b/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskCreateApi.java index 1a5673e55..07e173fe0 100644 --- a/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskCreateApi.java +++ b/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskCreateApi.java @@ -4,13 +4,16 @@ import com.alibaba.fastjson.JSONObject; import neatlogic.framework.auth.core.AuthAction; import neatlogic.framework.common.constvalue.ApiParamType; import neatlogic.framework.process.auth.PROCESS_BASE; +import neatlogic.framework.process.dto.ProcessTaskAsyncCreateVo; import neatlogic.framework.restful.annotation.*; import neatlogic.framework.restful.constvalue.OperationTypeEnum; import neatlogic.framework.restful.core.privateapi.PrivateApiComponentBase; +import neatlogic.module.process.service.ProcessTaskAsyncCreateService; import neatlogic.module.process.service.ProcessTaskCreatePublicService; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.util.Objects; @AuthAction(action = PROCESS_BASE.class) @Service @@ -21,6 +24,9 @@ public class ProcessTaskCreateApi extends PrivateApiComponentBase { @Resource private ProcessTaskCreatePublicService processTaskCreatePublicService; + @Resource + private ProcessTaskAsyncCreateService processTaskAsyncCreateService; + @Override public String getToken() { return "processtask/create"; @@ -63,6 +69,15 @@ public class ProcessTaskCreateApi extends PrivateApiComponentBase { @Description(desc = "nmpap.processtaskcreateapi.getname") @Override public Object myDoService(JSONObject jsonObj) throws Exception { - return processTaskCreatePublicService.createProcessTask(jsonObj); + Integer isAsync = jsonObj.getInteger("isAsync"); + if (Objects.equals(isAsync, 1)) { + Long newProcessTaskId = jsonObj.getLong("newProcessTaskId"); + processTaskAsyncCreateService.addNewProcessTaskAsyncCreate(new ProcessTaskAsyncCreateVo(jsonObj)); + JSONObject resultObj = new JSONObject(); + resultObj.put("processTaskId", newProcessTaskId); + return resultObj; + } else { + return processTaskCreatePublicService.createProcessTask(jsonObj); + } } } diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java new file mode 100644 index 000000000..8a6d65ec9 --- /dev/null +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java @@ -0,0 +1,158 @@ +/* + * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package neatlogic.module.process.service; + +import com.alibaba.fastjson.JSONObject; +import neatlogic.framework.asynchronization.thread.NeatLogicThread; +import neatlogic.framework.asynchronization.threadlocal.TenantContext; +import neatlogic.framework.asynchronization.threadlocal.UserContext; +import neatlogic.framework.common.config.Config; +import neatlogic.framework.crossover.CrossoverServiceFactory; +import neatlogic.framework.dao.mapper.TenantMapper; +import neatlogic.framework.dto.TenantVo; +import neatlogic.framework.process.crossover.IProcessTaskAsyncCreateCrossoverService; +import neatlogic.framework.process.crossover.IProcessTaskCreatePublicCrossoverService; +import neatlogic.framework.process.dto.ProcessTaskAsyncCreateVo; +import neatlogic.module.process.dao.mapper.processtask.ProcessTaskAsyncCreateMapper; +import neatlogic.module.process.dao.mapper.processtask.ProcessTaskMapper; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; + +@Service +public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreateService, IProcessTaskAsyncCreateCrossoverService { + private static final Logger logger = LoggerFactory.getLogger(ProcessTaskAsyncCreateServiceImpl.class); + + private final static BlockingQueue blockingQueue = new LinkedBlockingQueue<>(); + + @Resource + private ProcessTaskAsyncCreateMapper processTaskAsyncCreateMapper; + + @Resource + private ProcessTaskMapper processTaskMapper; + + @Resource + private TenantMapper tenantMapper; + + @PostConstruct + public void init() throws InterruptedException { + // 启动服务器时加载数据库中`processtask_async_create`表status为doing,server_id为Config.SCHEDULE_SERVER_ID的数据到blockingQueue中 + TenantContext.get().setUseDefaultDatasource(true); + List tenantList = tenantMapper.getAllActiveTenant(); + for (TenantVo tenantVo : tenantList) { + TenantContext.get().switchTenant(tenantVo.getUuid()); + ProcessTaskAsyncCreateVo searchVo = new ProcessTaskAsyncCreateVo(); + searchVo.setStatus("doing"); + searchVo.setServerId(Config.SCHEDULE_SERVER_ID); + int rowNum = processTaskAsyncCreateMapper.getProcessTaskAsyncCreateCount(searchVo); + if (rowNum > 0) { + searchVo.setRowNum(rowNum); + searchVo.setPageSize(100); + Integer pageCount = searchVo.getPageCount(); + List doneList = new ArrayList<>(); + for (int currentPage = 1; currentPage <= pageCount; currentPage++) { + searchVo.setCurrentPage(currentPage); + List list = processTaskAsyncCreateMapper.getProcessTaskAsyncCreateList(searchVo); + List processTaskIdList = list.stream().map(ProcessTaskAsyncCreateVo::getProcessTaskId).filter(Objects::nonNull).collect(Collectors.toList()); + if (CollectionUtils.isNotEmpty(processTaskIdList)) { + processTaskIdList = processTaskMapper.checkProcessTaskIdListIsExists(processTaskIdList); + } + for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : list) { + if (processTaskIdList.contains(processTaskAsyncCreateVo.getProcessTaskId())) { + processTaskAsyncCreateVo.setStatus("done"); + doneList.add(processTaskAsyncCreateVo); + } else { + processTaskAsyncCreateVo.setTenantUuid(tenantVo.getUuid()); + blockingQueue.put(processTaskAsyncCreateVo); + } + } + } + for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : doneList) { + processTaskAsyncCreateMapper.updateProcessTaskAsyncCreate(processTaskAsyncCreateVo); + } + } + } + TenantContext.get().setUseDefaultDatasource(true); + + Thread t = new Thread(new NeatLogicThread("ASYNC-CREATE-PROCESSTASK-MANAGER") { + @Override + protected void execute() { + IProcessTaskCreatePublicCrossoverService processTaskCreatePublicCrossoverService = CrossoverServiceFactory.getApi(IProcessTaskCreatePublicCrossoverService.class); + while (!Thread.currentThread().isInterrupted()) { + ProcessTaskAsyncCreateVo processTaskAsyncCreateVo = null; + try { + processTaskAsyncCreateVo = blockingQueue.take(); + TenantContext.get().switchTenant(processTaskAsyncCreateVo.getTenantUuid()); + JSONObject resultObj = processTaskCreatePublicCrossoverService.createProcessTask(processTaskAsyncCreateVo.getConfig()); + Long processTaskId = resultObj.getLong("processTaskId"); + processTaskAsyncCreateVo.setProcessTaskId(processTaskId); + processTaskAsyncCreateVo.setStatus("done"); + } catch (InterruptedException e) { + if (processTaskAsyncCreateVo != null) { + processTaskAsyncCreateVo.setStatus("failed"); + processTaskAsyncCreateVo.setError(ExceptionUtils.getStackTrace(e)); + } + Thread.currentThread().interrupt(); + break; + } catch (Exception e) { + if (processTaskAsyncCreateVo != null) { + processTaskAsyncCreateVo.setStatus("failed"); + processTaskAsyncCreateVo.setError(ExceptionUtils.getStackTrace(e)); + } + logger.error(e.getMessage(), e); + } finally { + if (processTaskAsyncCreateVo != null) { + processTaskAsyncCreateMapper.updateProcessTaskAsyncCreate(processTaskAsyncCreateVo); + } + } + } + } + }); + t.setDaemon(true); + t.start(); + } + + @Override + public void addNewProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException { + processTaskAsyncCreateVo.setTenantUuid(TenantContext.get().getTenantUuid()); + processTaskAsyncCreateVo.setTitle(processTaskAsyncCreateVo.getConfig().getString("title")); + processTaskAsyncCreateVo.setProcessTaskId(processTaskAsyncCreateVo.getConfig().getLong("newProcessTaskId")); + processTaskAsyncCreateVo.setStatus("doing"); + processTaskAsyncCreateVo.setFcu(UserContext.get().getUserUuid()); + processTaskAsyncCreateVo.setServerId(Config.SCHEDULE_SERVER_ID); + processTaskAsyncCreateMapper.insertProcessTaskAsyncCreate(processTaskAsyncCreateVo); + blockingQueue.put(processTaskAsyncCreateVo); + } + + @Override + public void addRedoProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException { + processTaskAsyncCreateVo.setTenantUuid(TenantContext.get().getTenantUuid()); + blockingQueue.put(processTaskAsyncCreateVo); + } +} diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicServiceImpl.java b/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicServiceImpl.java index 45b33a88f..1d0d57e4d 100644 --- a/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicServiceImpl.java +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicServiceImpl.java @@ -2,9 +2,7 @@ package neatlogic.module.process.service; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; -import neatlogic.framework.asynchronization.thread.NeatLogicThread; import neatlogic.framework.asynchronization.threadlocal.UserContext; -import neatlogic.framework.asynchronization.threadpool.CachedThreadPool; import neatlogic.framework.common.constvalue.SystemUser; import neatlogic.framework.dao.mapper.UserMapper; import neatlogic.framework.dao.mapper.region.RegionMapper; @@ -34,7 +32,6 @@ import neatlogic.framework.process.exception.processtask.ProcessTaskNextStepIlle import neatlogic.framework.process.exception.processtask.ProcessTaskNextStepOverOneException; import neatlogic.framework.service.AuthenticationInfoService; import neatlogic.framework.service.RegionService; -import neatlogic.framework.util.SnowflakeUtil; import neatlogic.module.process.dao.mapper.catalog.ChannelMapper; import neatlogic.module.process.dao.mapper.catalog.PriorityMapper; import neatlogic.module.process.dao.mapper.process.ProcessMapper; @@ -47,7 +44,10 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import javax.annotation.Resource; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; @Service public class ProcessTaskCreatePublicServiceImpl implements ProcessTaskCreatePublicService, IProcessTaskCreatePublicCrossoverService { @@ -285,39 +285,39 @@ public class ProcessTaskCreatePublicServiceImpl implements ProcessTaskCreatePubl } Long processTaskId = null; - Integer isAsync = paramObj.getInteger("isAsync"); - if (Objects.equals(isAsync, 1)) { - Long newProcessTaskId = SnowflakeUtil.uniqueLong(); - NeatLogicThread neatLogicThread = new NeatLogicThread("PUBLIC_CREATE_PROCESSTASK_" + newProcessTaskId, true) { - @Override - protected void execute() { - try { - //暂存 - //TODO isNeedValid 参数是否需要??? - paramObj.put("isNeedValid", 1); - JSONObject saveResultObj = processTaskService.saveProcessTaskDraft(paramObj, newProcessTaskId); - - //查询可执行下一 步骤 - Long processTaskId = saveResultObj.getLong("processTaskId"); - List nextStepIdList = processTaskMapper.getToProcessTaskStepIdListByFromIdAndType(saveResultObj.getLong("processTaskStepId"), ProcessFlowDirection.FORWARD.getValue()); - if (nextStepIdList.isEmpty()) { - throw new ProcessTaskNextStepIllegalException(processTaskId); - } - if (nextStepIdList.size() != 1) { - throw new ProcessTaskNextStepOverOneException(processTaskId); - } - saveResultObj.put("nextStepId", nextStepIdList.get(0)); - - //流转 - processTaskService.startProcessProcessTask(saveResultObj); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - } - }; - CachedThreadPool.execute(neatLogicThread); - processTaskId = newProcessTaskId; - } else { +// Integer isAsync = paramObj.getInteger("isAsync"); +// if (Objects.equals(isAsync, 1)) { +// Long newProcessTaskId = SnowflakeUtil.uniqueLong(); +// NeatLogicThread neatLogicThread = new NeatLogicThread("PUBLIC_CREATE_PROCESSTASK_" + newProcessTaskId, true) { +// @Override +// protected void execute() { +// try { +// //暂存 +// //TODO isNeedValid 参数是否需要??? +// paramObj.put("isNeedValid", 1); +// JSONObject saveResultObj = processTaskService.saveProcessTaskDraft(paramObj, newProcessTaskId); +// +// //查询可执行下一 步骤 +// Long processTaskId = saveResultObj.getLong("processTaskId"); +// List nextStepIdList = processTaskMapper.getToProcessTaskStepIdListByFromIdAndType(saveResultObj.getLong("processTaskStepId"), ProcessFlowDirection.FORWARD.getValue()); +// if (nextStepIdList.isEmpty()) { +// throw new ProcessTaskNextStepIllegalException(processTaskId); +// } +// if (nextStepIdList.size() != 1) { +// throw new ProcessTaskNextStepOverOneException(processTaskId); +// } +// saveResultObj.put("nextStepId", nextStepIdList.get(0)); +// +// //流转 +// processTaskService.startProcessProcessTask(saveResultObj); +// } catch (Exception e) { +// logger.error(e.getMessage(), e); +// } +// } +// }; +// CachedThreadPool.execute(neatLogicThread); +// processTaskId = newProcessTaskId; +// } else { //暂存 //TODO isNeedValid 参数是否需要??? paramObj.put("isNeedValid", 1); @@ -337,7 +337,7 @@ public class ProcessTaskCreatePublicServiceImpl implements ProcessTaskCreatePubl //流转 processTaskService.startProcessProcessTask(saveResultObj); - } +// } result.put("processTaskId", processTaskId); return result; -- Gitee From 361d8b0b2268c18bdbe0efd1d43c94c9de20f901 Mon Sep 17 00:00:00 2001 From: "1437892690@qq.com" <1437892690@qq.com> Date: Fri, 2 Aug 2024 21:50:42 +0800 Subject: [PATCH 04/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=20IT=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1-=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8E=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=9A=84=E5=BC=82=E6=AD=A5=E4=B8=8A=E6=8A=A5=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 关联 #[1212437796192256]IT服务-增加基于队列的异步上报工单方式 http://192.168.0.96:8090/demo/rdm.html#/story-detail/939050947543040/939050947543042/1212437796192256 --- .../RedoAsyncCreateProcessTaskApi.java | 121 ++++++++++++++++++ .../test/TestAsyncCreateProcessTaskApi.java | 97 ++++++++++++++ 2 files changed, 218 insertions(+) create mode 100644 src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java create mode 100644 src/main/java/neatlogic/module/process/api/processtask/test/TestAsyncCreateProcessTaskApi.java diff --git a/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java b/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java new file mode 100644 index 000000000..50e003895 --- /dev/null +++ b/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java @@ -0,0 +1,121 @@ +/* + * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package neatlogic.module.process.api.processtask; + +import com.alibaba.fastjson.JSONObject; +import neatlogic.framework.auth.core.AuthAction; +import neatlogic.framework.common.constvalue.ApiParamType; +import neatlogic.framework.process.auth.PROCESSTASK_MODIFY; +import neatlogic.framework.process.dto.ProcessTaskAsyncCreateVo; +import neatlogic.framework.restful.annotation.*; +import neatlogic.framework.restful.constvalue.OperationTypeEnum; +import neatlogic.framework.restful.core.privateapi.PrivateApiComponentBase; +import neatlogic.module.process.dao.mapper.processtask.ProcessTaskAsyncCreateMapper; +import neatlogic.module.process.dao.mapper.processtask.ProcessTaskMapper; +import neatlogic.module.process.service.ProcessTaskAsyncCreateService; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +@Service +@AuthAction(action = PROCESSTASK_MODIFY.class) +@OperationType(type = OperationTypeEnum.CREATE) +public class RedoAsyncCreateProcessTaskApi extends PrivateApiComponentBase { + + @Resource + private ProcessTaskAsyncCreateMapper processTaskAsyncCreateMapper; + + @Resource + private ProcessTaskMapper processTaskMapper; + + @Resource + private ProcessTaskAsyncCreateService processTaskAsyncCreateService; + + @Override + public String getName() { + return "nmpap.redoasynccreateprocesstaskapi.getname"; + } + + @Input({ + @Param(name = "id", type = ApiParamType.LONG, desc = "id"), + @Param(name = "serverId", type = ApiParamType.INTEGER, desc = "term.framework.serverid"), + }) + @Output({ + @Param(name = "processTaskIdList", type = ApiParamType.LONG, desc = "term.itsm.processtaskidlist") + }) + @Description(desc = "nmpap.redoasynccreateprocesstaskapi.getname") + @Override + public Object myDoService(JSONObject paramObj) throws Exception { + List processTaskIdArray = new ArrayList<>(); + Long id = paramObj.getLong("id"); + Integer serverId = paramObj.getInteger("serverId"); + if (id != null) { + ProcessTaskAsyncCreateVo processTaskAsyncCreateVo = processTaskAsyncCreateMapper.getProcessTaskAsyncCreateById(id); + if (processTaskAsyncCreateVo != null) { + processTaskIdArray.add(processTaskAsyncCreateVo.getProcessTaskId()); + processTaskAsyncCreateService.addRedoProcessTaskAsyncCreate(processTaskAsyncCreateVo); + } + + } else if (serverId != null) { + ProcessTaskAsyncCreateVo searchVo = new ProcessTaskAsyncCreateVo(); + searchVo.setStatus("redo"); + searchVo.setServerId(serverId); + int rowNum = processTaskAsyncCreateMapper.getProcessTaskAsyncCreateCount(searchVo); + if (rowNum > 0) { + searchVo.setRowNum(rowNum); + searchVo.setPageSize(100); + Integer pageCount = searchVo.getPageCount(); + List doneList = new ArrayList<>(); + for (int currentPage = 1; currentPage <= pageCount; currentPage++) { + searchVo.setCurrentPage(currentPage); + List list = processTaskAsyncCreateMapper.getProcessTaskAsyncCreateList(searchVo); + List processTaskIdList = list.stream().map(ProcessTaskAsyncCreateVo::getProcessTaskId).filter(Objects::nonNull).collect(Collectors.toList()); + if (CollectionUtils.isNotEmpty(processTaskIdList)) { + processTaskIdList = processTaskMapper.checkProcessTaskIdListIsExists(processTaskIdList); + } + for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : list) { + if (processTaskIdList.contains(processTaskAsyncCreateVo.getProcessTaskId())) { + processTaskAsyncCreateVo.setStatus("done"); + doneList.add(processTaskAsyncCreateVo); + } else { + processTaskIdList.add(processTaskAsyncCreateVo.getProcessTaskId()); + processTaskAsyncCreateService.addRedoProcessTaskAsyncCreate(processTaskAsyncCreateVo); + } + + } + } + for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : doneList) { + processTaskAsyncCreateMapper.updateProcessTaskAsyncCreate(processTaskAsyncCreateVo); + } + } + } + JSONObject resultObj = new JSONObject(); + resultObj.put("processTaskIdList", processTaskIdArray); + return resultObj; + } + + @Override + public String getToken() { + return "processtask/asynccreate/redo"; + } +} diff --git a/src/main/java/neatlogic/module/process/api/processtask/test/TestAsyncCreateProcessTaskApi.java b/src/main/java/neatlogic/module/process/api/processtask/test/TestAsyncCreateProcessTaskApi.java new file mode 100644 index 000000000..e32fe95c0 --- /dev/null +++ b/src/main/java/neatlogic/module/process/api/processtask/test/TestAsyncCreateProcessTaskApi.java @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package neatlogic.module.process.api.processtask.test; + +import com.alibaba.fastjson.JSONObject; +import neatlogic.framework.auth.core.AuthAction; +import neatlogic.framework.common.constvalue.ApiParamType; +import neatlogic.framework.process.auth.PROCESSTASK_MODIFY; +import neatlogic.framework.process.dto.ProcessTaskAsyncCreateVo; +import neatlogic.framework.restful.annotation.*; +import neatlogic.framework.restful.constvalue.OperationTypeEnum; +import neatlogic.framework.restful.core.privateapi.PrivateApiComponentBase; +import neatlogic.framework.util.SnowflakeUtil; +import neatlogic.module.process.service.ProcessTaskAsyncCreateService; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; + +@Service +@AuthAction(action = PROCESSTASK_MODIFY.class) +@OperationType(type = OperationTypeEnum.CREATE) +public class TestAsyncCreateProcessTaskApi extends PrivateApiComponentBase { + + @Resource + private ProcessTaskAsyncCreateService processTaskAsyncCreateService; + + @Override + public String getName() { + return "nmpapt.testasynccreateprocesstaskapi.getname"; + } + + @Input({ + @Param(name = "channel", type = ApiParamType.STRING, isRequired = true, desc = "term.itsm.channel", help = "支持channelUuid和channelName入参"), + @Param(name = "title", type = ApiParamType.STRING, isRequired = true, maxLength = 80, desc = "common.title"), + @Param(name = "owner", type = ApiParamType.STRING, isRequired = true, desc = "term.itsm.owner", help = "上报人uuid和上报人id入参"), + @Param(name = "reporter", type = ApiParamType.STRING, desc = "term.itsm.reporter"), + @Param(name = "priority", type = ApiParamType.STRING, isRequired = true, desc = "common.priority"), + @Param(name = "formAttributeDataList", type = ApiParamType.JSONARRAY, desc = "term.itsm.formattributedatalist"), + @Param(name = "hidecomponentList", type = ApiParamType.JSONARRAY, desc = "term.itsm.hidecomponentlist"), + @Param(name = "readcomponentList", type = ApiParamType.JSONARRAY, desc = "term.itsm.readcomponentlist"), + @Param(name = "content", type = ApiParamType.STRING, desc = "common.content"), + @Param(name = "filePathPrefix", type = ApiParamType.STRING, defaultValue = "file:", desc = "common.filepathprefix"), + @Param(name = "filePathList", type = ApiParamType.JSONARRAY, desc = "common.filepathlist"), + @Param(name = "fileIdList", type = ApiParamType.JSONARRAY, desc = "common.fileidlist"), + @Param(name = "handlerStepInfo", type = ApiParamType.JSONOBJECT, desc = "term.itsm.handlerstepinfo"), + @Param(name = "source", type = ApiParamType.STRING, desc = "common.source"), +// @Param(name = "newProcessTaskId", type = ApiParamType.LONG, desc = "指定工单id,则会使用该id作为工单id"), + @Param(name = "region", type = ApiParamType.STRING, desc = "common.region", help = "全路径or地域id"), + @Param(name = "count", type = ApiParamType.INTEGER, isRequired = true, desc = "common.count") + }) + @Output({ + @Param(name = "processTaskIdList", type = ApiParamType.LONG, desc = "term.itsm.processtaskidlist") + }) + @Description(desc = "nmpapt.testasynccreateprocesstaskapi.getname") + @Override + public Object myDoService(JSONObject paramObj) throws Exception { + System.out.println("start = "); + Integer count = paramObj.getInteger("count"); + String title = paramObj.getString("title"); + String jsonString = paramObj.toJSONString(); + List processTaskIdList = new ArrayList<>(); + for (int i = 0; i < count; i++) { + JSONObject config = JSONObject.parseObject(jsonString); + Long newProcessTaskId = SnowflakeUtil.uniqueLong(); + config.put("newProcessTaskId", newProcessTaskId); + config.put("title", title + "-" + i); + processTaskAsyncCreateService.addNewProcessTaskAsyncCreate(new ProcessTaskAsyncCreateVo(config)); + processTaskIdList.add(newProcessTaskId); + } + System.out.println("end = "); + JSONObject resultObj = new JSONObject(); + resultObj.put("processTaskIdList", processTaskIdList); + return resultObj; + } + + @Override + public String getToken() { + return "processtask/asynccreate/test"; + } +} -- Gitee From f2225b6cc1648414917a2ef71467c7fd816b62e3 Mon Sep 17 00:00:00 2001 From: "1437892690@qq.com" <1437892690@qq.com> Date: Sat, 3 Aug 2024 01:37:38 +0800 Subject: [PATCH 05/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=20IT=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1-=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8E=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=9A=84=E5=BC=82=E6=AD=A5=E4=B8=8A=E6=8A=A5=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 关联 #[1212437796192256]IT服务-增加基于队列的异步上报工单方式 http://192.168.0.96:8090/demo/rdm.html#/story-detail/939050947543040/939050947543042/1212437796192256 --- .../RedoAsyncCreateProcessTaskApi.java | 15 +++++---- .../ProcessTaskAsyncCreateService.java | 4 +-- .../ProcessTaskAsyncCreateServiceImpl.java | 32 +++++++++++++------ .../changelog/2024-08-02/neatlogic_tenant.sql | 5 +-- .../resources/process/sqlscript/ddl.sql | 5 +-- 5 files changed, 40 insertions(+), 21 deletions(-) diff --git a/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java b/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java index 50e003895..6a48833d7 100644 --- a/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java +++ b/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java @@ -66,16 +66,20 @@ public class RedoAsyncCreateProcessTaskApi extends PrivateApiComponentBase { @Description(desc = "nmpap.redoasynccreateprocesstaskapi.getname") @Override public Object myDoService(JSONObject paramObj) throws Exception { + JSONObject resultObj = new JSONObject(); List processTaskIdArray = new ArrayList<>(); Long id = paramObj.getLong("id"); Integer serverId = paramObj.getInteger("serverId"); if (id != null) { ProcessTaskAsyncCreateVo processTaskAsyncCreateVo = processTaskAsyncCreateMapper.getProcessTaskAsyncCreateById(id); - if (processTaskAsyncCreateVo != null) { - processTaskIdArray.add(processTaskAsyncCreateVo.getProcessTaskId()); + if (processTaskAsyncCreateVo != null && Objects.equals(processTaskAsyncCreateVo.getStatus(), "redo")) { + Long processTaskId = processTaskAsyncCreateVo.getProcessTaskId(); + if (processTaskMapper.getProcessTaskById(processTaskId) == null) { + return resultObj; + } + processTaskIdArray.add(processTaskId); processTaskAsyncCreateService.addRedoProcessTaskAsyncCreate(processTaskAsyncCreateVo); } - } else if (serverId != null) { ProcessTaskAsyncCreateVo searchVo = new ProcessTaskAsyncCreateVo(); searchVo.setStatus("redo"); @@ -89,7 +93,7 @@ public class RedoAsyncCreateProcessTaskApi extends PrivateApiComponentBase { for (int currentPage = 1; currentPage <= pageCount; currentPage++) { searchVo.setCurrentPage(currentPage); List list = processTaskAsyncCreateMapper.getProcessTaskAsyncCreateList(searchVo); - List processTaskIdList = list.stream().map(ProcessTaskAsyncCreateVo::getProcessTaskId).filter(Objects::nonNull).collect(Collectors.toList()); + List processTaskIdList = list.stream().map(ProcessTaskAsyncCreateVo::getProcessTaskId).collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(processTaskIdList)) { processTaskIdList = processTaskMapper.checkProcessTaskIdListIsExists(processTaskIdList); } @@ -98,7 +102,7 @@ public class RedoAsyncCreateProcessTaskApi extends PrivateApiComponentBase { processTaskAsyncCreateVo.setStatus("done"); doneList.add(processTaskAsyncCreateVo); } else { - processTaskIdList.add(processTaskAsyncCreateVo.getProcessTaskId()); + processTaskIdArray.add(processTaskAsyncCreateVo.getProcessTaskId()); processTaskAsyncCreateService.addRedoProcessTaskAsyncCreate(processTaskAsyncCreateVo); } @@ -109,7 +113,6 @@ public class RedoAsyncCreateProcessTaskApi extends PrivateApiComponentBase { } } } - JSONObject resultObj = new JSONObject(); resultObj.put("processTaskIdList", processTaskIdArray); return resultObj; } diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateService.java b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateService.java index 71931b65d..54bada489 100644 --- a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateService.java +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateService.java @@ -25,12 +25,12 @@ public interface ProcessTaskAsyncCreateService { * @param processTaskAsyncCreateVo * @throws InterruptedException */ - void addNewProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException; + Long addNewProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException; /** * 添加需要重新执行的工单信息到阻塞队列 * @param processTaskAsyncCreateVo * @throws InterruptedException */ - void addRedoProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException; + Long addRedoProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException; } diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java index 8a6d65ec9..e28f81a14 100644 --- a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java @@ -28,9 +28,11 @@ import neatlogic.framework.dto.TenantVo; import neatlogic.framework.process.crossover.IProcessTaskAsyncCreateCrossoverService; import neatlogic.framework.process.crossover.IProcessTaskCreatePublicCrossoverService; import neatlogic.framework.process.dto.ProcessTaskAsyncCreateVo; +import neatlogic.framework.util.SnowflakeUtil; import neatlogic.module.process.dao.mapper.processtask.ProcessTaskAsyncCreateMapper; import neatlogic.module.process.dao.mapper.processtask.ProcessTaskMapper; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +42,6 @@ import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; -import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; @@ -79,7 +80,7 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate for (int currentPage = 1; currentPage <= pageCount; currentPage++) { searchVo.setCurrentPage(currentPage); List list = processTaskAsyncCreateMapper.getProcessTaskAsyncCreateList(searchVo); - List processTaskIdList = list.stream().map(ProcessTaskAsyncCreateVo::getProcessTaskId).filter(Objects::nonNull).collect(Collectors.toList()); + List processTaskIdList = list.stream().map(ProcessTaskAsyncCreateVo::getProcessTaskId).collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(processTaskIdList)) { processTaskIdList = processTaskMapper.checkProcessTaskIdListIsExists(processTaskIdList); } @@ -109,9 +110,7 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate try { processTaskAsyncCreateVo = blockingQueue.take(); TenantContext.get().switchTenant(processTaskAsyncCreateVo.getTenantUuid()); - JSONObject resultObj = processTaskCreatePublicCrossoverService.createProcessTask(processTaskAsyncCreateVo.getConfig()); - Long processTaskId = resultObj.getLong("processTaskId"); - processTaskAsyncCreateVo.setProcessTaskId(processTaskId); + processTaskCreatePublicCrossoverService.createProcessTask(processTaskAsyncCreateVo.getConfig()); processTaskAsyncCreateVo.setStatus("done"); } catch (InterruptedException e) { if (processTaskAsyncCreateVo != null) { @@ -139,20 +138,35 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate } @Override - public void addNewProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException { + public Long addNewProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException { + JSONObject config = processTaskAsyncCreateVo.getConfig(); + if (MapUtils.isEmpty(config)) { + return null; + } + Long processTaskId = config.getLong("newProcessTaskId"); + if (processTaskId != null) { + if (processTaskMapper.getProcessTaskById(processTaskId) != null) { + return null; + } + } else { + processTaskId = SnowflakeUtil.uniqueLong(); + config.put("newProcessTaskId", processTaskId); + } + processTaskAsyncCreateVo.setProcessTaskId(processTaskId); processTaskAsyncCreateVo.setTenantUuid(TenantContext.get().getTenantUuid()); - processTaskAsyncCreateVo.setTitle(processTaskAsyncCreateVo.getConfig().getString("title")); - processTaskAsyncCreateVo.setProcessTaskId(processTaskAsyncCreateVo.getConfig().getLong("newProcessTaskId")); + processTaskAsyncCreateVo.setTitle(config.getString("title")); processTaskAsyncCreateVo.setStatus("doing"); processTaskAsyncCreateVo.setFcu(UserContext.get().getUserUuid()); processTaskAsyncCreateVo.setServerId(Config.SCHEDULE_SERVER_ID); processTaskAsyncCreateMapper.insertProcessTaskAsyncCreate(processTaskAsyncCreateVo); blockingQueue.put(processTaskAsyncCreateVo); + return processTaskId; } @Override - public void addRedoProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException { + public Long addRedoProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException { processTaskAsyncCreateVo.setTenantUuid(TenantContext.get().getTenantUuid()); blockingQueue.put(processTaskAsyncCreateVo); + return processTaskAsyncCreateVo.getProcessTaskId(); } } diff --git a/src/main/resources/neatlogic/resources/process/changelog/2024-08-02/neatlogic_tenant.sql b/src/main/resources/neatlogic/resources/process/changelog/2024-08-02/neatlogic_tenant.sql index 35c77a149..4b054c10e 100644 --- a/src/main/resources/neatlogic/resources/process/changelog/2024-08-02/neatlogic_tenant.sql +++ b/src/main/resources/neatlogic/resources/process/changelog/2024-08-02/neatlogic_tenant.sql @@ -1,6 +1,6 @@ CREATE TABLE IF NOT EXISTS `processtask_async_create` ( `id` bigint NOT NULL COMMENT '主键ID', - `processtask_id` bigint DEFAULT NULL COMMENT '工单ID', + `processtask_id` bigint NOT NULL COMMENT '工单ID', `title` varchar(255) COLLATE utf8mb4_general_ci NOT NULL COMMENT '标题', `status` enum('doing','done','failed','aborted','redo') CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '状态', `config` longtext COLLATE utf8mb4_general_ci NOT NULL COMMENT '配置信息', @@ -10,5 +10,6 @@ CREATE TABLE IF NOT EXISTS `processtask_async_create` ( `fcu` char(32) COLLATE utf8mb4_general_ci NOT NULL COMMENT '创建人', `fcd` timestamp(3) NOT NULL COMMENT '创建时间', `lcd` timestamp(3) NULL DEFAULT NULL COMMENT '修改时间', - PRIMARY KEY (`id`) + PRIMARY KEY (`id`), + UNIQUE KEY `idx_processtask_id` (`processtask_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT = '异步创建工单表'; \ No newline at end of file diff --git a/src/main/resources/neatlogic/resources/process/sqlscript/ddl.sql b/src/main/resources/neatlogic/resources/process/sqlscript/ddl.sql index b035849f6..a5ca21798 100644 --- a/src/main/resources/neatlogic/resources/process/sqlscript/ddl.sql +++ b/src/main/resources/neatlogic/resources/process/sqlscript/ddl.sql @@ -658,7 +658,7 @@ CREATE TABLE IF NOT EXISTS `processtask_auto_score` ( -- ---------------------------- CREATE TABLE IF NOT EXISTS `processtask_async_create` ( `id` bigint NOT NULL COMMENT '主键ID', - `processtask_id` bigint DEFAULT NULL COMMENT '工单ID', + `processtask_id` bigint NOT NULL COMMENT '工单ID', `title` varchar(255) COLLATE utf8mb4_general_ci NOT NULL COMMENT '标题', `status` enum('doing','done','failed','aborted','redo') CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '状态', `config` longtext COLLATE utf8mb4_general_ci NOT NULL COMMENT '配置信息', @@ -668,7 +668,8 @@ CREATE TABLE IF NOT EXISTS `processtask_async_create` ( `fcu` char(32) COLLATE utf8mb4_general_ci NOT NULL COMMENT '创建人', `fcd` timestamp(3) NOT NULL COMMENT '创建时间', `lcd` timestamp(3) NULL DEFAULT NULL COMMENT '修改时间', - PRIMARY KEY (`id`) + PRIMARY KEY (`id`), + UNIQUE KEY `idx_processtask_id` (`processtask_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT = '异步创建工单表'; -- ---------------------------- -- Gitee From ebdbfbb0c18ea65d21ce245b42cf71d718aa0dec Mon Sep 17 00:00:00 2001 From: "1437892690@qq.com" <1437892690@qq.com> Date: Sat, 3 Aug 2024 01:38:36 +0800 Subject: [PATCH 06/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=20IT=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1-=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8E=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=9A=84=E5=BC=82=E6=AD=A5=E4=B8=8A=E6=8A=A5=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 关联 #[1212437796192256]IT服务-增加基于队列的异步上报工单方式 http://192.168.0.96:8090/demo/rdm.html#/story-detail/939050947543040/939050947543042/1212437796192256 --- .../processtask/test/TestAsyncCreateProcessTaskApi.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/java/neatlogic/module/process/api/processtask/test/TestAsyncCreateProcessTaskApi.java b/src/main/java/neatlogic/module/process/api/processtask/test/TestAsyncCreateProcessTaskApi.java index e32fe95c0..999c53d18 100644 --- a/src/main/java/neatlogic/module/process/api/processtask/test/TestAsyncCreateProcessTaskApi.java +++ b/src/main/java/neatlogic/module/process/api/processtask/test/TestAsyncCreateProcessTaskApi.java @@ -25,7 +25,6 @@ import neatlogic.framework.process.dto.ProcessTaskAsyncCreateVo; import neatlogic.framework.restful.annotation.*; import neatlogic.framework.restful.constvalue.OperationTypeEnum; import neatlogic.framework.restful.core.privateapi.PrivateApiComponentBase; -import neatlogic.framework.util.SnowflakeUtil; import neatlogic.module.process.service.ProcessTaskAsyncCreateService; import org.springframework.stereotype.Service; @@ -78,11 +77,11 @@ public class TestAsyncCreateProcessTaskApi extends PrivateApiComponentBase { List processTaskIdList = new ArrayList<>(); for (int i = 0; i < count; i++) { JSONObject config = JSONObject.parseObject(jsonString); - Long newProcessTaskId = SnowflakeUtil.uniqueLong(); - config.put("newProcessTaskId", newProcessTaskId); +// Long processTaskId = SnowflakeUtil.uniqueLong(); +// config.put("newProcessTaskId", processTaskId); config.put("title", title + "-" + i); - processTaskAsyncCreateService.addNewProcessTaskAsyncCreate(new ProcessTaskAsyncCreateVo(config)); - processTaskIdList.add(newProcessTaskId); + Long processTaskId = processTaskAsyncCreateService.addNewProcessTaskAsyncCreate(new ProcessTaskAsyncCreateVo(config)); + processTaskIdList.add(processTaskId); } System.out.println("end = "); JSONObject resultObj = new JSONObject(); -- Gitee From 2a254fdf346429bbe80c4a1dab22c3c5c2e440b5 Mon Sep 17 00:00:00 2001 From: "1437892690@qq.com" <1437892690@qq.com> Date: Sat, 3 Aug 2024 01:50:50 +0800 Subject: [PATCH 07/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=20IT=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1-=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8E=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=9A=84=E5=BC=82=E6=AD=A5=E4=B8=8A=E6=8A=A5=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 关联 #[1212437796192256]IT服务-增加基于队列的异步上报工单方式 http://192.168.0.96:8090/demo/rdm.html#/story-detail/939050947543040/939050947543042/1212437796192256 --- .../module/process/api/processtask/ProcessTaskCreateApi.java | 5 ++--- .../process/service/ProcessTaskAsyncCreateServiceImpl.java | 5 +++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskCreateApi.java b/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskCreateApi.java index 07e173fe0..ba2f1732a 100644 --- a/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskCreateApi.java +++ b/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskCreateApi.java @@ -71,10 +71,9 @@ public class ProcessTaskCreateApi extends PrivateApiComponentBase { public Object myDoService(JSONObject jsonObj) throws Exception { Integer isAsync = jsonObj.getInteger("isAsync"); if (Objects.equals(isAsync, 1)) { - Long newProcessTaskId = jsonObj.getLong("newProcessTaskId"); - processTaskAsyncCreateService.addNewProcessTaskAsyncCreate(new ProcessTaskAsyncCreateVo(jsonObj)); + Long processTaskId = processTaskAsyncCreateService.addNewProcessTaskAsyncCreate(new ProcessTaskAsyncCreateVo(jsonObj)); JSONObject resultObj = new JSONObject(); - resultObj.put("processTaskId", newProcessTaskId); + resultObj.put("processTaskId", processTaskId); return resultObj; } else { return processTaskCreatePublicService.createProcessTask(jsonObj); diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java index e28f81a14..994cf1834 100644 --- a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java @@ -146,9 +146,10 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate Long processTaskId = config.getLong("newProcessTaskId"); if (processTaskId != null) { if (processTaskMapper.getProcessTaskById(processTaskId) != null) { - return null; + processTaskId = null; } - } else { + } + if (processTaskId == null) { processTaskId = SnowflakeUtil.uniqueLong(); config.put("newProcessTaskId", processTaskId); } -- Gitee From b2eeb35fe43cb7534745613581f0f6f07b0770b2 Mon Sep 17 00:00:00 2001 From: "1437892690@qq.com" <1437892690@qq.com> Date: Sat, 3 Aug 2024 15:38:34 +0800 Subject: [PATCH 08/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=20IT=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1-=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8E=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=9A=84=E5=BC=82=E6=AD=A5=E4=B8=8A=E6=8A=A5=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 关联 #[1212437796192256]IT服务-增加基于队列的异步上报工单方式 http://192.168.0.96:8090/demo/rdm.html#/story-detail/939050947543040/939050947543042/1212437796192256 --- .../RedoAsyncCreateProcessTaskApi.java | 16 +++++++++++----- .../test/TestAsyncCreateProcessTaskApi.java | 10 ++++++++-- .../processtask/ProcessTaskAsyncCreateMapper.xml | 1 + .../ProcessTaskAsyncCreateServiceImpl.java | 16 +++++++++++----- 4 files changed, 31 insertions(+), 12 deletions(-) diff --git a/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java b/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java index 6a48833d7..ca457c3b1 100644 --- a/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java +++ b/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java @@ -33,6 +33,7 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -81,6 +82,8 @@ public class RedoAsyncCreateProcessTaskApi extends PrivateApiComponentBase { processTaskAsyncCreateService.addRedoProcessTaskAsyncCreate(processTaskAsyncCreateVo); } } else if (serverId != null) { + List doneList = new ArrayList<>(); + List redoList = new ArrayList<>(); ProcessTaskAsyncCreateVo searchVo = new ProcessTaskAsyncCreateVo(); searchVo.setStatus("redo"); searchVo.setServerId(serverId); @@ -89,7 +92,6 @@ public class RedoAsyncCreateProcessTaskApi extends PrivateApiComponentBase { searchVo.setRowNum(rowNum); searchVo.setPageSize(100); Integer pageCount = searchVo.getPageCount(); - List doneList = new ArrayList<>(); for (int currentPage = 1; currentPage <= pageCount; currentPage++) { searchVo.setCurrentPage(currentPage); List list = processTaskAsyncCreateMapper.getProcessTaskAsyncCreateList(searchVo); @@ -103,14 +105,18 @@ public class RedoAsyncCreateProcessTaskApi extends PrivateApiComponentBase { doneList.add(processTaskAsyncCreateVo); } else { processTaskIdArray.add(processTaskAsyncCreateVo.getProcessTaskId()); - processTaskAsyncCreateService.addRedoProcessTaskAsyncCreate(processTaskAsyncCreateVo); + redoList.add(processTaskAsyncCreateVo); } } } - for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : doneList) { - processTaskAsyncCreateMapper.updateProcessTaskAsyncCreate(processTaskAsyncCreateVo); - } + } + for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : doneList) { + processTaskAsyncCreateMapper.updateProcessTaskAsyncCreate(processTaskAsyncCreateVo); + } + redoList.sort(Comparator.comparing(ProcessTaskAsyncCreateVo::getId)); + for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : redoList) { + processTaskAsyncCreateService.addRedoProcessTaskAsyncCreate(processTaskAsyncCreateVo); } } resultObj.put("processTaskIdList", processTaskIdArray); diff --git a/src/main/java/neatlogic/module/process/api/processtask/test/TestAsyncCreateProcessTaskApi.java b/src/main/java/neatlogic/module/process/api/processtask/test/TestAsyncCreateProcessTaskApi.java index 999c53d18..558edd43e 100644 --- a/src/main/java/neatlogic/module/process/api/processtask/test/TestAsyncCreateProcessTaskApi.java +++ b/src/main/java/neatlogic/module/process/api/processtask/test/TestAsyncCreateProcessTaskApi.java @@ -29,7 +29,9 @@ import neatlogic.module.process.service.ProcessTaskAsyncCreateService; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Date; import java.util.List; @Service @@ -70,7 +72,9 @@ public class TestAsyncCreateProcessTaskApi extends PrivateApiComponentBase { @Description(desc = "nmpapt.testasynccreateprocesstaskapi.getname") @Override public Object myDoService(JSONObject paramObj) throws Exception { - System.out.println("start = "); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + Date startDate = new Date(); + System.out.println("start = " + dateFormat.format(startDate)); Integer count = paramObj.getInteger("count"); String title = paramObj.getString("title"); String jsonString = paramObj.toJSONString(); @@ -83,7 +87,9 @@ public class TestAsyncCreateProcessTaskApi extends PrivateApiComponentBase { Long processTaskId = processTaskAsyncCreateService.addNewProcessTaskAsyncCreate(new ProcessTaskAsyncCreateVo(config)); processTaskIdList.add(processTaskId); } - System.out.println("end = "); + Date endDate = new Date(); + System.out.println("end = " + dateFormat.format(endDate)); + System.out.println("cost = " + (endDate.getTime() - startDate.getTime())); JSONObject resultObj = new JSONObject(); resultObj.put("processTaskIdList", processTaskIdList); return resultObj; diff --git a/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.xml b/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.xml index 473b83c74..fdfdafa5d 100644 --- a/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.xml +++ b/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.xml @@ -40,6 +40,7 @@ FROM `processtask_async_create` WHERE `status` = #{status} AND `server_id` = #{serverId} + ORDER BY `id` DESC LIMIT #{startNum}, #{pageSize} diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java index 994cf1834..ddd1c05ec 100644 --- a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java @@ -41,6 +41,7 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -68,6 +69,8 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate List tenantList = tenantMapper.getAllActiveTenant(); for (TenantVo tenantVo : tenantList) { TenantContext.get().switchTenant(tenantVo.getUuid()); + List doneList = new ArrayList<>(); + List doingList = new ArrayList<>(); ProcessTaskAsyncCreateVo searchVo = new ProcessTaskAsyncCreateVo(); searchVo.setStatus("doing"); searchVo.setServerId(Config.SCHEDULE_SERVER_ID); @@ -76,7 +79,6 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate searchVo.setRowNum(rowNum); searchVo.setPageSize(100); Integer pageCount = searchVo.getPageCount(); - List doneList = new ArrayList<>(); for (int currentPage = 1; currentPage <= pageCount; currentPage++) { searchVo.setCurrentPage(currentPage); List list = processTaskAsyncCreateMapper.getProcessTaskAsyncCreateList(searchVo); @@ -90,13 +92,17 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate doneList.add(processTaskAsyncCreateVo); } else { processTaskAsyncCreateVo.setTenantUuid(tenantVo.getUuid()); - blockingQueue.put(processTaskAsyncCreateVo); + doingList.add(processTaskAsyncCreateVo); } } } - for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : doneList) { - processTaskAsyncCreateMapper.updateProcessTaskAsyncCreate(processTaskAsyncCreateVo); - } + } + for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : doneList) { + processTaskAsyncCreateMapper.updateProcessTaskAsyncCreate(processTaskAsyncCreateVo); + } + doingList.sort(Comparator.comparing(ProcessTaskAsyncCreateVo::getId)); + for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : doingList) { + blockingQueue.put(processTaskAsyncCreateVo); } } TenantContext.get().setUseDefaultDatasource(true); -- Gitee From 5c6dee208ff93b621aef3e1a65986d1ddcad1915 Mon Sep 17 00:00:00 2001 From: "1437892690@qq.com" <1437892690@qq.com> Date: Sat, 3 Aug 2024 15:53:36 +0800 Subject: [PATCH 09/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=20IT=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1-=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8E=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=9A=84=E5=BC=82=E6=AD=A5=E4=B8=8A=E6=8A=A5=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 关联 #[1212437796192256]IT服务-增加基于队列的异步上报工单方式 http://192.168.0.96:8090/demo/rdm.html#/story-detail/939050947543040/939050947543042/1212437796192256 --- .../process/service/ProcessTaskAsyncCreateServiceImpl.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java index ddd1c05ec..f5d639f2b 100644 --- a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java @@ -145,6 +145,9 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate @Override public Long addNewProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException { + if (processTaskAsyncCreateVo == null) { + return null; + } JSONObject config = processTaskAsyncCreateVo.getConfig(); if (MapUtils.isEmpty(config)) { return null; @@ -172,6 +175,9 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate @Override public Long addRedoProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException { + if (processTaskAsyncCreateVo == null) { + return null; + } processTaskAsyncCreateVo.setTenantUuid(TenantContext.get().getTenantUuid()); blockingQueue.put(processTaskAsyncCreateVo); return processTaskAsyncCreateVo.getProcessTaskId(); -- Gitee From 945c51793a2aa51b8b719600322a1fde0fce28bb Mon Sep 17 00:00:00 2001 From: "1437892690@qq.com" <1437892690@qq.com> Date: Sat, 3 Aug 2024 18:06:06 +0800 Subject: [PATCH 10/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=20IT=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1-=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8E=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=9A=84=E5=BC=82=E6=AD=A5=E4=B8=8A=E6=8A=A5=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 关联 #[1212437796192256]IT服务-增加基于队列的异步上报工单方式 http://192.168.0.96:8090/demo/rdm.html#/story-detail/939050947543040/939050947543042/1212437796192256 --- .../ProcessTaskAsyncCreateService.java | 6 ++--- .../ProcessTaskAsyncCreateServiceImpl.java | 22 ++++++++++++++----- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateService.java b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateService.java index 54bada489..926f5d664 100644 --- a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateService.java +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateService.java @@ -23,14 +23,12 @@ public interface ProcessTaskAsyncCreateService { /** * 添加新的工单信息到阻塞队列 * @param processTaskAsyncCreateVo - * @throws InterruptedException */ - Long addNewProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException; + Long addNewProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo); /** * 添加需要重新执行的工单信息到阻塞队列 * @param processTaskAsyncCreateVo - * @throws InterruptedException */ - Long addRedoProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException; + Long addRedoProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo); } diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java index f5d639f2b..23c4bf52a 100644 --- a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java @@ -63,7 +63,7 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate private TenantMapper tenantMapper; @PostConstruct - public void init() throws InterruptedException { + public void init() { // 启动服务器时加载数据库中`processtask_async_create`表status为doing,server_id为Config.SCHEDULE_SERVER_ID的数据到blockingQueue中 TenantContext.get().setUseDefaultDatasource(true); List tenantList = tenantMapper.getAllActiveTenant(); @@ -102,7 +102,10 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate } doingList.sort(Comparator.comparing(ProcessTaskAsyncCreateVo::getId)); for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : doingList) { - blockingQueue.put(processTaskAsyncCreateVo); + boolean offer = blockingQueue.offer(processTaskAsyncCreateVo); + if (!offer && logger.isDebugEnabled()) { + logger.debug("异步创建工单数据加入队列失败, processTaskAsyncCreateVo: " + JSONObject.toJSONString(processTaskAsyncCreateVo)); + } } } TenantContext.get().setUseDefaultDatasource(true); @@ -144,7 +147,7 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate } @Override - public Long addNewProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException { + public Long addNewProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) { if (processTaskAsyncCreateVo == null) { return null; } @@ -169,17 +172,24 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate processTaskAsyncCreateVo.setFcu(UserContext.get().getUserUuid()); processTaskAsyncCreateVo.setServerId(Config.SCHEDULE_SERVER_ID); processTaskAsyncCreateMapper.insertProcessTaskAsyncCreate(processTaskAsyncCreateVo); - blockingQueue.put(processTaskAsyncCreateVo); + boolean offer = blockingQueue.offer(processTaskAsyncCreateVo); + if (!offer && logger.isDebugEnabled()) { + logger.debug("异步创建工单数据加入队列失败, processTaskAsyncCreateVo: " + JSONObject.toJSONString(processTaskAsyncCreateVo)); + } return processTaskId; } @Override - public Long addRedoProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) throws InterruptedException { + public Long addRedoProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) { if (processTaskAsyncCreateVo == null) { return null; } processTaskAsyncCreateVo.setTenantUuid(TenantContext.get().getTenantUuid()); - blockingQueue.put(processTaskAsyncCreateVo); + + boolean offer = blockingQueue.offer(processTaskAsyncCreateVo); + if (!offer && logger.isDebugEnabled()) { + logger.debug("异步创建工单数据加入队列失败, processTaskAsyncCreateVo: " + JSONObject.toJSONString(processTaskAsyncCreateVo)); + } return processTaskAsyncCreateVo.getProcessTaskId(); } } -- Gitee From 473bdf18499c6cb0b2e266c80beeec7e71bf143b Mon Sep 17 00:00:00 2001 From: "1437892690@qq.com" <1437892690@qq.com> Date: Tue, 6 Aug 2024 10:00:28 +0800 Subject: [PATCH 11/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=20IT=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1-=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8E=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=9A=84=E5=BC=82=E6=AD=A5=E4=B8=8A=E6=8A=A5=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 关联 #[1212437796192256]IT服务-增加基于队列的异步上报工单方式 http://192.168.0.96:8090/demo/rdm.html#/story-detail/939050947543040/939050947543042/1212437796192256 --- .../api/processtask/ProcessTaskCreateApi.java | 7 +- .../ProcessTaskImportFromExcelApi.java | 50 +++++---- .../RedoAsyncCreateProcessTaskApi.java | 23 ++-- .../test/TestAsyncCreateProcessTaskApi.java | 10 +- .../ProcessTaskAsyncCreateMapper.java | 4 + .../ProcessTaskAsyncCreateMapper.xml | 16 ++- .../ProcessTaskAsyncCreateService.java | 10 +- .../ProcessTaskAsyncCreateServiceImpl.java | 105 +++++++++++------- .../ProcessTaskCreatePublicService.java | 5 +- .../ProcessTaskCreatePublicServiceImpl.java | 36 ++++-- 10 files changed, 162 insertions(+), 104 deletions(-) diff --git a/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskCreateApi.java b/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskCreateApi.java index ba2f1732a..75ed19448 100644 --- a/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskCreateApi.java +++ b/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskCreateApi.java @@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSONObject; import neatlogic.framework.auth.core.AuthAction; import neatlogic.framework.common.constvalue.ApiParamType; import neatlogic.framework.process.auth.PROCESS_BASE; -import neatlogic.framework.process.dto.ProcessTaskAsyncCreateVo; +import neatlogic.framework.process.dto.ProcessTaskCreateVo; import neatlogic.framework.restful.annotation.*; import neatlogic.framework.restful.constvalue.OperationTypeEnum; import neatlogic.framework.restful.core.privateapi.PrivateApiComponentBase; @@ -69,14 +69,15 @@ public class ProcessTaskCreateApi extends PrivateApiComponentBase { @Description(desc = "nmpap.processtaskcreateapi.getname") @Override public Object myDoService(JSONObject jsonObj) throws Exception { + ProcessTaskCreateVo processTaskCreateVo = jsonObj.toJavaObject(ProcessTaskCreateVo.class); Integer isAsync = jsonObj.getInteger("isAsync"); if (Objects.equals(isAsync, 1)) { - Long processTaskId = processTaskAsyncCreateService.addNewProcessTaskAsyncCreate(new ProcessTaskAsyncCreateVo(jsonObj)); + Long processTaskId = processTaskAsyncCreateService.addNewProcessTaskAsyncCreate(processTaskCreateVo); JSONObject resultObj = new JSONObject(); resultObj.put("processTaskId", processTaskId); return resultObj; } else { - return processTaskCreatePublicService.createProcessTask(jsonObj); + return processTaskCreatePublicService.createProcessTask(processTaskCreateVo); } } } diff --git a/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskImportFromExcelApi.java b/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskImportFromExcelApi.java index f40acc125..a6a47be8e 100644 --- a/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskImportFromExcelApi.java +++ b/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskImportFromExcelApi.java @@ -1,5 +1,7 @@ package neatlogic.module.process.api.processtask; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; import neatlogic.framework.auth.core.AuthAction; import neatlogic.framework.common.constvalue.ApiParamType; import neatlogic.framework.exception.file.*; @@ -11,9 +13,6 @@ import neatlogic.framework.form.exception.FormActiveVersionNotFoundExcepiton; import neatlogic.framework.form.exception.FormNotFoundException; import neatlogic.framework.process.auth.BATCH_REPORT_PROCESS_TASK; import neatlogic.framework.process.constvalue.ProcessTaskSource; -import neatlogic.module.process.dao.mapper.catalog.ChannelMapper; -import neatlogic.module.process.dao.mapper.catalog.PriorityMapper; -import neatlogic.module.process.dao.mapper.processtask.ProcessTaskMapper; import neatlogic.framework.process.dto.*; import neatlogic.framework.process.exception.channel.ChannelNotFoundException; import neatlogic.framework.process.exception.process.ProcessNotFoundException; @@ -21,10 +20,11 @@ import neatlogic.framework.restful.annotation.*; import neatlogic.framework.restful.constvalue.OperationTypeEnum; import neatlogic.framework.restful.core.privateapi.PrivateBinaryStreamApiComponentBase; import neatlogic.framework.util.ExcelUtil; +import neatlogic.module.process.dao.mapper.catalog.ChannelMapper; +import neatlogic.module.process.dao.mapper.catalog.PriorityMapper; import neatlogic.module.process.dao.mapper.process.ProcessMapper; +import neatlogic.module.process.dao.mapper.processtask.ProcessTaskMapper; import neatlogic.module.process.service.ProcessTaskCreatePublicService; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; @@ -192,12 +192,13 @@ public class ProcessTaskImportFromExcelApi extends PrivateBinaryStreamApiCompone int successCount = 0; /** 上报工单 */ for (Map map : contentList) { - JSONObject task = parseTask(channelUuid, formAttributeVoList, map, isNeedPriority); - task.put("source", ProcessTaskSource.IMPORT.getValue()); + ProcessTaskCreateVo task = parseTask(channelUuid, formAttributeVoList, map, isNeedPriority); +// task.put("source", ProcessTaskSource.IMPORT.getValue()); + task.setSource(ProcessTaskSource.IMPORT.getValue()); ProcessTaskImportAuditVo auditVo = new ProcessTaskImportAuditVo(); auditVo.setChannelUuid(channelUuid); - auditVo.setTitle(task.getString("title")); - auditVo.setOwner(task.getString("owner")); + auditVo.setTitle(task.getTitle()); + auditVo.setOwner(task.getOwner()); try { JSONObject resultObj = processTaskCreatePublicService.createProcessTask(task); auditVo.setProcessTaskId(resultObj.getLong("processTaskId")); @@ -247,25 +248,31 @@ public class ProcessTaskImportFromExcelApi extends PrivateBinaryStreamApiCompone * @param isNeedPriority 是否需要优先级 * @return */ - private JSONObject parseTask(String channelUuid, List formAttributeList, Map map, int isNeedPriority) { - JSONObject task = new JSONObject(); + private ProcessTaskCreateVo parseTask(String channelUuid, List formAttributeList, Map map, int isNeedPriority) { +// JSONObject task = new JSONObject(); + ProcessTaskCreateVo processTaskCreateVo = new ProcessTaskCreateVo(); JSONArray formAttributeDataList = new JSONArray(); - task.put("channel", channelUuid); +// task.put("channel", channelUuid); + processTaskCreateVo.setChannel(channelUuid); for (Map.Entry entry : map.entrySet()) { String key = entry.getKey().replace("(必填)", ""); if ("标题".equals(key)) { - task.put("title", entry.getValue()); +// task.put("title", entry.getValue()); + processTaskCreateVo.setTitle(entry.getValue()); } else if ("请求人".equals(key)) { if (StringUtils.isNotBlank(entry.getValue())) { - task.put("owner", entry.getValue()); +// task.put("owner", entry.getValue()); + processTaskCreateVo.setOwner(entry.getValue()); } } else if ("优先级".equals(key)) { PriorityVo priority = null; if (isNeedPriority == 1 && StringUtils.isNotBlank(entry.getValue()) && (priority = priorityMapper.getPriorityByName(entry.getValue())) != null) { - task.put("priority", priority.getUuid()); +// task.put("priority", priority.getUuid()); + processTaskCreateVo.setPriority(priority.getUuid()); } } else if ("描述".equals(key)) { - task.put("content", entry.getValue()); +// task.put("content", entry.getValue()); + processTaskCreateVo.setContent(entry.getValue()); } else { if (CollectionUtils.isNotEmpty(formAttributeList) && formAttributeList.stream().anyMatch(o -> Objects.equals(o.getLabel(), key))) { JSONObject formdata = new JSONObject(); @@ -284,10 +291,13 @@ public class ProcessTaskImportFromExcelApi extends PrivateBinaryStreamApiCompone } } } - task.put("formAttributeDataList", formAttributeDataList); - task.put("hidecomponentList", new JSONArray()); - task.put("readcomponentList", new JSONArray()); - return task; +// task.put("formAttributeDataList", formAttributeDataList); +// task.put("hidecomponentList", new JSONArray()); +// task.put("readcomponentList", new JSONArray()); + processTaskCreateVo.setFormAttributeDataList(formAttributeDataList); + processTaskCreateVo.setHidecomponentList(new JSONArray()); + processTaskCreateVo.setReadcomponentList(new JSONArray()); + return processTaskCreateVo; } private Map getTaskDataFromFirstSheet(MultipartFile file) throws Exception { diff --git a/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java b/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java index ca457c3b1..7f7a72883 100644 --- a/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java +++ b/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java @@ -33,7 +33,6 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.ArrayList; -import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -79,11 +78,11 @@ public class RedoAsyncCreateProcessTaskApi extends PrivateApiComponentBase { return resultObj; } processTaskIdArray.add(processTaskId); - processTaskAsyncCreateService.addRedoProcessTaskAsyncCreate(processTaskAsyncCreateVo); + processTaskAsyncCreateService.addRedoProcessTaskAsyncCreate(processTaskAsyncCreateVo.getId()); } } else if (serverId != null) { - List doneList = new ArrayList<>(); - List redoList = new ArrayList<>(); + List doneIdList = new ArrayList<>(); + List redoIdList = new ArrayList<>(); ProcessTaskAsyncCreateVo searchVo = new ProcessTaskAsyncCreateVo(); searchVo.setStatus("redo"); searchVo.setServerId(serverId); @@ -101,22 +100,22 @@ public class RedoAsyncCreateProcessTaskApi extends PrivateApiComponentBase { } for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : list) { if (processTaskIdList.contains(processTaskAsyncCreateVo.getProcessTaskId())) { - processTaskAsyncCreateVo.setStatus("done"); - doneList.add(processTaskAsyncCreateVo); +// processTaskAsyncCreateVo.setStatus("done"); + doneIdList.add(processTaskAsyncCreateVo.getId()); } else { processTaskIdArray.add(processTaskAsyncCreateVo.getProcessTaskId()); - redoList.add(processTaskAsyncCreateVo); + redoIdList.add(processTaskAsyncCreateVo.getId()); } } } } - for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : doneList) { - processTaskAsyncCreateMapper.updateProcessTaskAsyncCreate(processTaskAsyncCreateVo); + if (CollectionUtils.isNotEmpty(doneIdList)) { + processTaskAsyncCreateMapper.deleteProcessTaskAsyncCreateByIdList(doneIdList); } - redoList.sort(Comparator.comparing(ProcessTaskAsyncCreateVo::getId)); - for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : redoList) { - processTaskAsyncCreateService.addRedoProcessTaskAsyncCreate(processTaskAsyncCreateVo); + redoIdList.sort(Long::compareTo); + for (Long redoId : redoIdList) { + processTaskAsyncCreateService.addRedoProcessTaskAsyncCreate(redoId); } } resultObj.put("processTaskIdList", processTaskIdArray); diff --git a/src/main/java/neatlogic/module/process/api/processtask/test/TestAsyncCreateProcessTaskApi.java b/src/main/java/neatlogic/module/process/api/processtask/test/TestAsyncCreateProcessTaskApi.java index 558edd43e..5aa05c3e3 100644 --- a/src/main/java/neatlogic/module/process/api/processtask/test/TestAsyncCreateProcessTaskApi.java +++ b/src/main/java/neatlogic/module/process/api/processtask/test/TestAsyncCreateProcessTaskApi.java @@ -21,7 +21,7 @@ import com.alibaba.fastjson.JSONObject; import neatlogic.framework.auth.core.AuthAction; import neatlogic.framework.common.constvalue.ApiParamType; import neatlogic.framework.process.auth.PROCESSTASK_MODIFY; -import neatlogic.framework.process.dto.ProcessTaskAsyncCreateVo; +import neatlogic.framework.process.dto.ProcessTaskCreateVo; import neatlogic.framework.restful.annotation.*; import neatlogic.framework.restful.constvalue.OperationTypeEnum; import neatlogic.framework.restful.core.privateapi.PrivateApiComponentBase; @@ -80,11 +80,13 @@ public class TestAsyncCreateProcessTaskApi extends PrivateApiComponentBase { String jsonString = paramObj.toJSONString(); List processTaskIdList = new ArrayList<>(); for (int i = 0; i < count; i++) { - JSONObject config = JSONObject.parseObject(jsonString); + ProcessTaskCreateVo processTaskCreateVo = paramObj.toJavaObject(ProcessTaskCreateVo.class); +// JSONObject config = JSONObject.parseObject(jsonString); // Long processTaskId = SnowflakeUtil.uniqueLong(); // config.put("newProcessTaskId", processTaskId); - config.put("title", title + "-" + i); - Long processTaskId = processTaskAsyncCreateService.addNewProcessTaskAsyncCreate(new ProcessTaskAsyncCreateVo(config)); +// config.put("title", title + "-" + i); + processTaskCreateVo.setTitle(title + "-" + i); + Long processTaskId = processTaskAsyncCreateService.addNewProcessTaskAsyncCreate(processTaskCreateVo); processTaskIdList.add(processTaskId); } Date endDate = new Date(); diff --git a/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.java b/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.java index 3919238af..f263d9f3e 100644 --- a/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.java +++ b/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.java @@ -33,4 +33,8 @@ public interface ProcessTaskAsyncCreateMapper extends IProcessTaskAsyncCreateCro int insertProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo); int updateProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo); + + int deleteProcessTaskAsyncCreateById(Long id); + + int deleteProcessTaskAsyncCreateByIdList(List idList); } diff --git a/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.xml b/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.xml index fdfdafa5d..e217b5982 100644 --- a/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.xml +++ b/src/main/java/neatlogic/module/process/dao/mapper/processtask/ProcessTaskAsyncCreateMapper.xml @@ -35,8 +35,7 @@ `id`, `processtask_id` AS processTaskId, `title`, - `status`, - `config` AS configStr + `status` FROM `processtask_async_create` WHERE `status` = #{status} AND `server_id` = #{serverId} @@ -81,11 +80,22 @@ UPDATE `processtask_async_create` SET - `processtask_id` = #{processTaskId}, `status` = #{status}, `error` = #{error}, `try_count` = `try_count` + 1, `lcd` = NOW(3) WHERE `id` = #{id} + + + DELETE FROM `processtask_async_create` WHERE `id` = #{value} + + + + DELETE FROM `processtask_async_create` + WHERE `id` IN + + #{id} + + \ No newline at end of file diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateService.java b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateService.java index 926f5d664..8df7859d9 100644 --- a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateService.java +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateService.java @@ -17,18 +17,18 @@ package neatlogic.module.process.service; -import neatlogic.framework.process.dto.ProcessTaskAsyncCreateVo; +import neatlogic.framework.process.dto.ProcessTaskCreateVo; public interface ProcessTaskAsyncCreateService { /** * 添加新的工单信息到阻塞队列 - * @param processTaskAsyncCreateVo + * @param processTaskCreateVo */ - Long addNewProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo); + Long addNewProcessTaskAsyncCreate(ProcessTaskCreateVo processTaskCreateVo); /** * 添加需要重新执行的工单信息到阻塞队列 - * @param processTaskAsyncCreateVo + * @param id */ - Long addRedoProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo); + void addRedoProcessTaskAsyncCreate(Long id); } diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java index 23c4bf52a..1cb3fcd12 100644 --- a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java @@ -28,11 +28,11 @@ import neatlogic.framework.dto.TenantVo; import neatlogic.framework.process.crossover.IProcessTaskAsyncCreateCrossoverService; import neatlogic.framework.process.crossover.IProcessTaskCreatePublicCrossoverService; import neatlogic.framework.process.dto.ProcessTaskAsyncCreateVo; +import neatlogic.framework.process.dto.ProcessTaskCreateVo; import neatlogic.framework.util.SnowflakeUtil; import neatlogic.module.process.dao.mapper.processtask.ProcessTaskAsyncCreateMapper; import neatlogic.module.process.dao.mapper.processtask.ProcessTaskMapper; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +41,6 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.ArrayList; -import java.util.Comparator; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -51,7 +50,7 @@ import java.util.stream.Collectors; public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreateService, IProcessTaskAsyncCreateCrossoverService { private static final Logger logger = LoggerFactory.getLogger(ProcessTaskAsyncCreateServiceImpl.class); - private final static BlockingQueue blockingQueue = new LinkedBlockingQueue<>(); + private final static BlockingQueue blockingQueue = new LinkedBlockingQueue<>(); @Resource private ProcessTaskAsyncCreateMapper processTaskAsyncCreateMapper; @@ -69,8 +68,8 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate List tenantList = tenantMapper.getAllActiveTenant(); for (TenantVo tenantVo : tenantList) { TenantContext.get().switchTenant(tenantVo.getUuid()); - List doneList = new ArrayList<>(); - List doingList = new ArrayList<>(); + List doneIdList = new ArrayList<>(); + List doingIdList = new ArrayList<>(); ProcessTaskAsyncCreateVo searchVo = new ProcessTaskAsyncCreateVo(); searchVo.setStatus("doing"); searchVo.setServerId(Config.SCHEDULE_SERVER_ID); @@ -89,22 +88,21 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : list) { if (processTaskIdList.contains(processTaskAsyncCreateVo.getProcessTaskId())) { processTaskAsyncCreateVo.setStatus("done"); - doneList.add(processTaskAsyncCreateVo); + doneIdList.add(processTaskAsyncCreateVo.getId()); } else { - processTaskAsyncCreateVo.setTenantUuid(tenantVo.getUuid()); - doingList.add(processTaskAsyncCreateVo); + doingIdList.add(processTaskAsyncCreateVo.getId()); } } } } - for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : doneList) { - processTaskAsyncCreateMapper.updateProcessTaskAsyncCreate(processTaskAsyncCreateVo); + if (CollectionUtils.isNotEmpty(doneIdList)) { + processTaskAsyncCreateMapper.deleteProcessTaskAsyncCreateByIdList(doneIdList); } - doingList.sort(Comparator.comparing(ProcessTaskAsyncCreateVo::getId)); - for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : doingList) { - boolean offer = blockingQueue.offer(processTaskAsyncCreateVo); + doingIdList.sort(Long::compareTo); + for (Long id : doingIdList) { + boolean offer = blockingQueue.offer(new Task(id)); if (!offer && logger.isDebugEnabled()) { - logger.debug("异步创建工单数据加入队列失败, processTaskAsyncCreateVo: " + JSONObject.toJSONString(processTaskAsyncCreateVo)); + logger.debug("异步创建工单数据加入队列失败, id: " + id); } } } @@ -115,29 +113,36 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate protected void execute() { IProcessTaskCreatePublicCrossoverService processTaskCreatePublicCrossoverService = CrossoverServiceFactory.getApi(IProcessTaskCreatePublicCrossoverService.class); while (!Thread.currentThread().isInterrupted()) { - ProcessTaskAsyncCreateVo processTaskAsyncCreateVo = null; + Task task = null; try { - processTaskAsyncCreateVo = blockingQueue.take(); - TenantContext.get().switchTenant(processTaskAsyncCreateVo.getTenantUuid()); - processTaskCreatePublicCrossoverService.createProcessTask(processTaskAsyncCreateVo.getConfig()); - processTaskAsyncCreateVo.setStatus("done"); + task = blockingQueue.take(); + TenantContext.get().switchTenant(task.getTenantUuid()); + ProcessTaskAsyncCreateVo processTaskAsyncCreate = processTaskAsyncCreateMapper.getProcessTaskAsyncCreateById(task.getId()); + if (processTaskAsyncCreate != null) { + processTaskCreatePublicCrossoverService.createProcessTask(processTaskAsyncCreate.getConfig()); + } +// processTaskAsyncCreateVo.setStatus("done"); + processTaskAsyncCreateMapper.deleteProcessTaskAsyncCreateById(task.getId()); + int i = 10 / 0; } catch (InterruptedException e) { - if (processTaskAsyncCreateVo != null) { + if (task != null) { + ProcessTaskAsyncCreateVo processTaskAsyncCreateVo = new ProcessTaskAsyncCreateVo(); + processTaskAsyncCreateVo.setId(task.getId()); processTaskAsyncCreateVo.setStatus("failed"); processTaskAsyncCreateVo.setError(ExceptionUtils.getStackTrace(e)); + processTaskAsyncCreateMapper.updateProcessTaskAsyncCreate(processTaskAsyncCreateVo); } Thread.currentThread().interrupt(); break; } catch (Exception e) { - if (processTaskAsyncCreateVo != null) { + if (task != null) { + ProcessTaskAsyncCreateVo processTaskAsyncCreateVo = new ProcessTaskAsyncCreateVo(); + processTaskAsyncCreateVo.setId(task.getId()); processTaskAsyncCreateVo.setStatus("failed"); processTaskAsyncCreateVo.setError(ExceptionUtils.getStackTrace(e)); - } - logger.error(e.getMessage(), e); - } finally { - if (processTaskAsyncCreateVo != null) { processTaskAsyncCreateMapper.updateProcessTaskAsyncCreate(processTaskAsyncCreateVo); } + logger.error(e.getMessage(), e); } } } @@ -146,16 +151,31 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate t.start(); } - @Override - public Long addNewProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) { - if (processTaskAsyncCreateVo == null) { - return null; + private static class Task { + + private final Long id; + private final String tenantUuid; + + public Task(Long id) { + this.id = id; + this.tenantUuid = TenantContext.get().getTenantUuid(); } - JSONObject config = processTaskAsyncCreateVo.getConfig(); - if (MapUtils.isEmpty(config)) { + + public Long getId() { + return id; + } + + public String getTenantUuid() { + return tenantUuid; + } + } + + @Override + public Long addNewProcessTaskAsyncCreate(ProcessTaskCreateVo processTaskCreateVo) { + if (processTaskCreateVo == null) { return null; } - Long processTaskId = config.getLong("newProcessTaskId"); + Long processTaskId = processTaskCreateVo.getNewProcessTaskId(); if (processTaskId != null) { if (processTaskMapper.getProcessTaskById(processTaskId) != null) { processTaskId = null; @@ -163,16 +183,17 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate } if (processTaskId == null) { processTaskId = SnowflakeUtil.uniqueLong(); - config.put("newProcessTaskId", processTaskId); + processTaskCreateVo.setNewProcessTaskId(processTaskId); } + ProcessTaskAsyncCreateVo processTaskAsyncCreateVo = new ProcessTaskAsyncCreateVo(); processTaskAsyncCreateVo.setProcessTaskId(processTaskId); - processTaskAsyncCreateVo.setTenantUuid(TenantContext.get().getTenantUuid()); - processTaskAsyncCreateVo.setTitle(config.getString("title")); + processTaskAsyncCreateVo.setTitle(processTaskCreateVo.getTitle()); processTaskAsyncCreateVo.setStatus("doing"); processTaskAsyncCreateVo.setFcu(UserContext.get().getUserUuid()); processTaskAsyncCreateVo.setServerId(Config.SCHEDULE_SERVER_ID); + processTaskAsyncCreateVo.setConfig(processTaskCreateVo); processTaskAsyncCreateMapper.insertProcessTaskAsyncCreate(processTaskAsyncCreateVo); - boolean offer = blockingQueue.offer(processTaskAsyncCreateVo); + boolean offer = blockingQueue.offer(new Task(processTaskAsyncCreateVo.getId())); if (!offer && logger.isDebugEnabled()) { logger.debug("异步创建工单数据加入队列失败, processTaskAsyncCreateVo: " + JSONObject.toJSONString(processTaskAsyncCreateVo)); } @@ -180,16 +201,14 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate } @Override - public Long addRedoProcessTaskAsyncCreate(ProcessTaskAsyncCreateVo processTaskAsyncCreateVo) { - if (processTaskAsyncCreateVo == null) { - return null; + public void addRedoProcessTaskAsyncCreate(Long id) { + if (id == null) { + return; } - processTaskAsyncCreateVo.setTenantUuid(TenantContext.get().getTenantUuid()); - boolean offer = blockingQueue.offer(processTaskAsyncCreateVo); + boolean offer = blockingQueue.offer(new Task(id)); if (!offer && logger.isDebugEnabled()) { - logger.debug("异步创建工单数据加入队列失败, processTaskAsyncCreateVo: " + JSONObject.toJSONString(processTaskAsyncCreateVo)); + logger.debug("异步创建工单数据加入队列失败, id: " + id); } - return processTaskAsyncCreateVo.getProcessTaskId(); } } diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicService.java b/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicService.java index 1d336f0f9..d9d9a4850 100644 --- a/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicService.java +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicService.java @@ -1,14 +1,15 @@ package neatlogic.module.process.service; import com.alibaba.fastjson.JSONObject; +import neatlogic.framework.process.dto.ProcessTaskCreateVo; public interface ProcessTaskCreatePublicService { /** * 创建工单 * - * @param paramObj 创建工单所需参数 + * @param processTaskCreateVo 创建工单所需参数 * @return * @throws Exception */ - JSONObject createProcessTask(JSONObject paramObj) throws Exception; + JSONObject createProcessTask(ProcessTaskCreateVo processTaskCreateVo) throws Exception; } diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicServiceImpl.java b/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicServiceImpl.java index 1d0d57e4d..6ff34e05f 100644 --- a/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicServiceImpl.java +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicServiceImpl.java @@ -25,6 +25,7 @@ import neatlogic.framework.process.crossover.IProcessTaskCreatePublicCrossoverSe import neatlogic.framework.process.dto.ChannelVo; import neatlogic.framework.process.dto.PriorityVo; import neatlogic.framework.process.dto.ProcessFormVo; +import neatlogic.framework.process.dto.ProcessTaskCreateVo; import neatlogic.framework.process.exception.channel.ChannelNotFoundException; import neatlogic.framework.process.exception.priority.PriorityNotFoundException; import neatlogic.framework.process.exception.process.ProcessNotFoundException; @@ -89,15 +90,25 @@ public class ProcessTaskCreatePublicServiceImpl implements ProcessTaskCreatePubl /** * 创建工单 * - * @param paramObj + * @param processTaskCreateVo * @return * @throws Exception */ @Override - public JSONObject createProcessTask(JSONObject paramObj) throws Exception { + public JSONObject createProcessTask(ProcessTaskCreateVo processTaskCreateVo) throws Exception { JSONObject result = new JSONObject(); + JSONObject paramObj = new JSONObject(); + paramObj.put("title", processTaskCreateVo.getTitle()); + paramObj.put("owner", processTaskCreateVo.getOwner()); + paramObj.put("reporter", processTaskCreateVo.getReporter()); + paramObj.put("hidecomponentList", processTaskCreateVo.getHidecomponentList()); + paramObj.put("readcomponentList", processTaskCreateVo.getReadcomponentList()); + paramObj.put("content", processTaskCreateVo.getContent()); + paramObj.put("fileIdList", processTaskCreateVo.getFileIdList());// + paramObj.put("handlerStepInfo", processTaskCreateVo.getHandlerStepInfo()); + paramObj.put("source", processTaskCreateVo.getSource()); //上报人,支持上报人uuid和上报人id入参 - String owner = paramObj.getString("owner"); + String owner = processTaskCreateVo.getOwner(); UserVo userVo = userMapper.getUserByUuid(owner); if (userVo == null) { userVo = userMapper.getUserByUserId(owner); @@ -107,7 +118,7 @@ public class ProcessTaskCreatePublicServiceImpl implements ProcessTaskCreatePubl paramObj.put("owner", userVo.getUuid()); } //地域 - String region = paramObj.getString("region"); + String region = processTaskCreateVo.getRegion(); Long regionId = null; if(StringUtils.isNotBlank(region)){ RegionVo regionVo = regionMapper.getRegionByUpwardNamePath(region); @@ -130,7 +141,7 @@ public class ProcessTaskCreatePublicServiceImpl implements ProcessTaskCreatePubl paramObj.put("regionId", regionId); //处理channel,支持channelUuid和channelName入参 - String channel = paramObj.getString("channel"); + String channel = processTaskCreateVo.getChannel(); ChannelVo channelVo = channelMapper.getChannelByUuid(channel); if (channelVo == null) { channelVo = channelMapper.getChannelByName(channel); @@ -140,7 +151,7 @@ public class ProcessTaskCreatePublicServiceImpl implements ProcessTaskCreatePubl } paramObj.put("channelUuid", channelVo.getUuid()); //优先级 - String priority = paramObj.getString("priority"); + String priority = processTaskCreateVo.getPriority(); if (StringUtils.isNotBlank(priority)) { PriorityVo priorityVo = priorityMapper.getPriorityByUuid(priority); if (priorityVo == null) { @@ -152,9 +163,9 @@ public class ProcessTaskCreatePublicServiceImpl implements ProcessTaskCreatePubl paramObj.put("priorityUuid", priorityVo.getUuid()); } // 附件传递文件路径 - JSONArray filePathList = paramObj.getJSONArray("filePathList"); - if( filePathList != null && filePathList.size() > 0 ){ - String filePathPrefix = paramObj.getString("filePathPrefix"); + JSONArray filePathList = processTaskCreateVo.getFilePathList(); + if( filePathList != null && !filePathList.isEmpty()){ + String filePathPrefix = processTaskCreateVo.getFilePathPrefix(); JSONArray fileIdList = new JSONArray(); // MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap(); for (Object filePath: filePathList ) { @@ -176,7 +187,7 @@ public class ProcessTaskCreatePublicServiceImpl implements ProcessTaskCreatePubl throw new ProcessNotFoundException(processUuid); } //如果表单属性数据列表,使用的唯一标识是label时,需要转换成attributeUuid - JSONArray formAttributeDataList = paramObj.getJSONArray("formAttributeDataList"); + JSONArray formAttributeDataList = processTaskCreateVo.getFormAttributeDataList(); if (CollectionUtils.isNotEmpty(formAttributeDataList)) { int count = 0; for (int i = 0; i < formAttributeDataList.size(); i++) { @@ -262,9 +273,10 @@ public class ProcessTaskCreatePublicServiceImpl implements ProcessTaskCreatePubl } } } + paramObj.put("formAttributeDataList", formAttributeDataList); //代报人,支持代报人uuid和代报人id入参 - String reporter = paramObj.getString("reporter"); + String reporter = processTaskCreateVo.getReporter(); if (StringUtils.isNotBlank(reporter)) { UserVo reporterUserVo = userMapper.getUserByUuid(reporter); if (reporterUserVo == null) { @@ -321,7 +333,7 @@ public class ProcessTaskCreatePublicServiceImpl implements ProcessTaskCreatePubl //暂存 //TODO isNeedValid 参数是否需要??? paramObj.put("isNeedValid", 1); - Long newProcessTaskId = paramObj.getLong("newProcessTaskId"); + Long newProcessTaskId = processTaskCreateVo.getNewProcessTaskId(); JSONObject saveResultObj = processTaskService.saveProcessTaskDraft(paramObj, newProcessTaskId); //查询可执行下一 步骤 -- Gitee From e3e69b027e04c571622a5eecfa0f38308b3cb29e Mon Sep 17 00:00:00 2001 From: "1437892690@qq.com" <1437892690@qq.com> Date: Thu, 8 Aug 2024 10:01:15 +0800 Subject: [PATCH 12/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=20=E8=A1=A8=E5=8D=95?= =?UTF-8?q?=E4=B8=8B=E6=8B=89=E6=A1=86=E5=BC=95=E7=94=A8=E8=87=AA=E5=AE=9A?= =?UTF-8?q?=E4=B9=89=E7=9F=A9=E9=98=B5=E5=92=8C=E8=A7=86=E5=9B=BE=E7=9F=A9?= =?UTF-8?q?=E9=98=B5=E6=97=B6=EF=BC=8C=E5=AD=97=E6=AE=B5=E9=87=8D=E5=A4=8D?= =?UTF-8?q?=E5=80=BC=E5=A4=AA=E5=A4=9A=E4=BC=9A=E5=BC=95=E5=8F=91=E5=BE=AA?= =?UTF-8?q?=E7=8E=AF=E6=9F=A5=E8=AF=A2=E6=95=B0=E6=8D=AE=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 关联 #[1213024554156032]表单下拉框引用自定义矩阵和视图矩阵时,字段重复值太多会引发循环查询数据库 http://192.168.0.96:8090/demo/rdm.html#/story-detail/939050947543040/939050947543042/1213024554156032 --- .../process/api/processtask/RedoAsyncCreateProcessTaskApi.java | 1 - .../process/service/ProcessTaskAsyncCreateServiceImpl.java | 3 --- 2 files changed, 4 deletions(-) diff --git a/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java b/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java index 7f7a72883..4517e6c63 100644 --- a/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java +++ b/src/main/java/neatlogic/module/process/api/processtask/RedoAsyncCreateProcessTaskApi.java @@ -100,7 +100,6 @@ public class RedoAsyncCreateProcessTaskApi extends PrivateApiComponentBase { } for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : list) { if (processTaskIdList.contains(processTaskAsyncCreateVo.getProcessTaskId())) { -// processTaskAsyncCreateVo.setStatus("done"); doneIdList.add(processTaskAsyncCreateVo.getId()); } else { processTaskIdArray.add(processTaskAsyncCreateVo.getProcessTaskId()); diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java index 1cb3fcd12..019b1367d 100644 --- a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java @@ -87,7 +87,6 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate } for (ProcessTaskAsyncCreateVo processTaskAsyncCreateVo : list) { if (processTaskIdList.contains(processTaskAsyncCreateVo.getProcessTaskId())) { - processTaskAsyncCreateVo.setStatus("done"); doneIdList.add(processTaskAsyncCreateVo.getId()); } else { doingIdList.add(processTaskAsyncCreateVo.getId()); @@ -121,9 +120,7 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate if (processTaskAsyncCreate != null) { processTaskCreatePublicCrossoverService.createProcessTask(processTaskAsyncCreate.getConfig()); } -// processTaskAsyncCreateVo.setStatus("done"); processTaskAsyncCreateMapper.deleteProcessTaskAsyncCreateById(task.getId()); - int i = 10 / 0; } catch (InterruptedException e) { if (task != null) { ProcessTaskAsyncCreateVo processTaskAsyncCreateVo = new ProcessTaskAsyncCreateVo(); -- Gitee From 46d1235896187a477946dc699e9cbea6bf3d8b4f Mon Sep 17 00:00:00 2001 From: "1437892690@qq.com" <1437892690@qq.com> Date: Thu, 8 Aug 2024 10:17:37 +0800 Subject: [PATCH 13/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=20IT=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1-=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8E=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=9A=84=E5=BC=82=E6=AD=A5=E4=B8=8A=E6=8A=A5=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 关联 #[1212437796192256]IT服务-增加基于队列的异步上报工单方式 http://192.168.0.96:8090/demo/rdm.html#/story-detail/939050947543040/939050947543042/1212437796192256 --- .../api/processtask/ProcessTaskImportFromExcelApi.java | 4 ++-- .../process/service/ProcessTaskCreatePublicService.java | 2 +- .../process/service/ProcessTaskCreatePublicServiceImpl.java | 5 ++--- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskImportFromExcelApi.java b/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskImportFromExcelApi.java index a6a47be8e..4461c4fba 100644 --- a/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskImportFromExcelApi.java +++ b/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskImportFromExcelApi.java @@ -200,8 +200,8 @@ public class ProcessTaskImportFromExcelApi extends PrivateBinaryStreamApiCompone auditVo.setTitle(task.getTitle()); auditVo.setOwner(task.getOwner()); try { - JSONObject resultObj = processTaskCreatePublicService.createProcessTask(task); - auditVo.setProcessTaskId(resultObj.getLong("processTaskId")); + Long processTaskId = processTaskCreatePublicService.createProcessTask(task); + auditVo.setProcessTaskId(processTaskId); auditVo.setStatus(1); successCount++; } catch (Exception e) { diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicService.java b/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicService.java index d9d9a4850..9ec30f6d4 100644 --- a/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicService.java +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicService.java @@ -11,5 +11,5 @@ public interface ProcessTaskCreatePublicService { * @return * @throws Exception */ - JSONObject createProcessTask(ProcessTaskCreateVo processTaskCreateVo) throws Exception; + Long createProcessTask(ProcessTaskCreateVo processTaskCreateVo) throws Exception; } diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicServiceImpl.java b/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicServiceImpl.java index 6ff34e05f..4b94975a0 100644 --- a/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicServiceImpl.java +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskCreatePublicServiceImpl.java @@ -95,7 +95,7 @@ public class ProcessTaskCreatePublicServiceImpl implements ProcessTaskCreatePubl * @throws Exception */ @Override - public JSONObject createProcessTask(ProcessTaskCreateVo processTaskCreateVo) throws Exception { + public Long createProcessTask(ProcessTaskCreateVo processTaskCreateVo) throws Exception { JSONObject result = new JSONObject(); JSONObject paramObj = new JSONObject(); paramObj.put("title", processTaskCreateVo.getTitle()); @@ -351,7 +351,6 @@ public class ProcessTaskCreatePublicServiceImpl implements ProcessTaskCreatePubl processTaskService.startProcessProcessTask(saveResultObj); // } - result.put("processTaskId", processTaskId); - return result; + return processTaskId; } } -- Gitee From 6eb988ffd4fc73911914edb0521ff73176d508e2 Mon Sep 17 00:00:00 2001 From: "1437892690@qq.com" <1437892690@qq.com> Date: Thu, 8 Aug 2024 10:39:58 +0800 Subject: [PATCH 14/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=20IT=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1-=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8E=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=9A=84=E5=BC=82=E6=AD=A5=E4=B8=8A=E6=8A=A5=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 关联 #[1212437796192256]IT服务-增加基于队列的异步上报工单方式 http://192.168.0.96:8090/demo/rdm.html#/story-detail/939050947543040/939050947543042/1212437796192256 --- .../process/api/processtask/ProcessTaskCreateApi.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskCreateApi.java b/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskCreateApi.java index 75ed19448..c26d12c4d 100644 --- a/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskCreateApi.java +++ b/src/main/java/neatlogic/module/process/api/processtask/ProcessTaskCreateApi.java @@ -69,15 +69,16 @@ public class ProcessTaskCreateApi extends PrivateApiComponentBase { @Description(desc = "nmpap.processtaskcreateapi.getname") @Override public Object myDoService(JSONObject jsonObj) throws Exception { + JSONObject resultObj = new JSONObject(); ProcessTaskCreateVo processTaskCreateVo = jsonObj.toJavaObject(ProcessTaskCreateVo.class); Integer isAsync = jsonObj.getInteger("isAsync"); if (Objects.equals(isAsync, 1)) { Long processTaskId = processTaskAsyncCreateService.addNewProcessTaskAsyncCreate(processTaskCreateVo); - JSONObject resultObj = new JSONObject(); resultObj.put("processTaskId", processTaskId); - return resultObj; } else { - return processTaskCreatePublicService.createProcessTask(processTaskCreateVo); + Long processTaskId = processTaskCreatePublicService.createProcessTask(processTaskCreateVo); + resultObj.put("processTaskId", processTaskId); } + return resultObj; } } -- Gitee From 53fef2a5892b009e9c7e56d8cb091ef8d3fd4e91 Mon Sep 17 00:00:00 2001 From: "1437892690@qq.com" <1437892690@qq.com> Date: Thu, 8 Aug 2024 11:35:52 +0800 Subject: [PATCH 15/15] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=20IT=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1-=E5=A2=9E=E5=8A=A0=E5=9F=BA=E4=BA=8E=E9=98=9F?= =?UTF-8?q?=E5=88=97=E7=9A=84=E5=BC=82=E6=AD=A5=E4=B8=8A=E6=8A=A5=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 关联 #[1212437796192256]IT服务-增加基于队列的异步上报工单方式 http://192.168.0.96:8090/demo/rdm.html#/story-detail/939050947543040/939050947543042/1212437796192256 --- .../ProcessTaskAsyncCreateServiceImpl.java | 46 ++++++------------- 1 file changed, 13 insertions(+), 33 deletions(-) diff --git a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java index 019b1367d..8a348767e 100644 --- a/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java +++ b/src/main/java/neatlogic/module/process/service/ProcessTaskAsyncCreateServiceImpl.java @@ -18,6 +18,7 @@ package neatlogic.module.process.service; import com.alibaba.fastjson.JSONObject; +import neatlogic.framework.asynchronization.queue.NeatLogicBlockingQueue; import neatlogic.framework.asynchronization.thread.NeatLogicThread; import neatlogic.framework.asynchronization.threadlocal.TenantContext; import neatlogic.framework.asynchronization.threadlocal.UserContext; @@ -42,7 +43,6 @@ import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; @@ -50,7 +50,7 @@ import java.util.stream.Collectors; public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreateService, IProcessTaskAsyncCreateCrossoverService { private static final Logger logger = LoggerFactory.getLogger(ProcessTaskAsyncCreateServiceImpl.class); - private final static BlockingQueue blockingQueue = new LinkedBlockingQueue<>(); + private final static NeatLogicBlockingQueue blockingQueue = new NeatLogicBlockingQueue<>(new LinkedBlockingQueue<>()); @Resource private ProcessTaskAsyncCreateMapper processTaskAsyncCreateMapper; @@ -99,7 +99,7 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate } doingIdList.sort(Long::compareTo); for (Long id : doingIdList) { - boolean offer = blockingQueue.offer(new Task(id)); + boolean offer = blockingQueue.offer(id); if (!offer && logger.isDebugEnabled()) { logger.debug("异步创建工单数据加入队列失败, id: " + id); } @@ -112,19 +112,18 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate protected void execute() { IProcessTaskCreatePublicCrossoverService processTaskCreatePublicCrossoverService = CrossoverServiceFactory.getApi(IProcessTaskCreatePublicCrossoverService.class); while (!Thread.currentThread().isInterrupted()) { - Task task = null; + Long id = null; try { - task = blockingQueue.take(); - TenantContext.get().switchTenant(task.getTenantUuid()); - ProcessTaskAsyncCreateVo processTaskAsyncCreate = processTaskAsyncCreateMapper.getProcessTaskAsyncCreateById(task.getId()); + id = blockingQueue.take(); + ProcessTaskAsyncCreateVo processTaskAsyncCreate = processTaskAsyncCreateMapper.getProcessTaskAsyncCreateById(id); if (processTaskAsyncCreate != null) { processTaskCreatePublicCrossoverService.createProcessTask(processTaskAsyncCreate.getConfig()); } - processTaskAsyncCreateMapper.deleteProcessTaskAsyncCreateById(task.getId()); + processTaskAsyncCreateMapper.deleteProcessTaskAsyncCreateById(id); } catch (InterruptedException e) { - if (task != null) { + if (id != null) { ProcessTaskAsyncCreateVo processTaskAsyncCreateVo = new ProcessTaskAsyncCreateVo(); - processTaskAsyncCreateVo.setId(task.getId()); + processTaskAsyncCreateVo.setId(id); processTaskAsyncCreateVo.setStatus("failed"); processTaskAsyncCreateVo.setError(ExceptionUtils.getStackTrace(e)); processTaskAsyncCreateMapper.updateProcessTaskAsyncCreate(processTaskAsyncCreateVo); @@ -132,9 +131,9 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate Thread.currentThread().interrupt(); break; } catch (Exception e) { - if (task != null) { + if (id != null) { ProcessTaskAsyncCreateVo processTaskAsyncCreateVo = new ProcessTaskAsyncCreateVo(); - processTaskAsyncCreateVo.setId(task.getId()); + processTaskAsyncCreateVo.setId(id); processTaskAsyncCreateVo.setStatus("failed"); processTaskAsyncCreateVo.setError(ExceptionUtils.getStackTrace(e)); processTaskAsyncCreateMapper.updateProcessTaskAsyncCreate(processTaskAsyncCreateVo); @@ -148,25 +147,6 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate t.start(); } - private static class Task { - - private final Long id; - private final String tenantUuid; - - public Task(Long id) { - this.id = id; - this.tenantUuid = TenantContext.get().getTenantUuid(); - } - - public Long getId() { - return id; - } - - public String getTenantUuid() { - return tenantUuid; - } - } - @Override public Long addNewProcessTaskAsyncCreate(ProcessTaskCreateVo processTaskCreateVo) { if (processTaskCreateVo == null) { @@ -190,7 +170,7 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate processTaskAsyncCreateVo.setServerId(Config.SCHEDULE_SERVER_ID); processTaskAsyncCreateVo.setConfig(processTaskCreateVo); processTaskAsyncCreateMapper.insertProcessTaskAsyncCreate(processTaskAsyncCreateVo); - boolean offer = blockingQueue.offer(new Task(processTaskAsyncCreateVo.getId())); + boolean offer = blockingQueue.offer(processTaskAsyncCreateVo.getId()); if (!offer && logger.isDebugEnabled()) { logger.debug("异步创建工单数据加入队列失败, processTaskAsyncCreateVo: " + JSONObject.toJSONString(processTaskAsyncCreateVo)); } @@ -203,7 +183,7 @@ public class ProcessTaskAsyncCreateServiceImpl implements ProcessTaskAsyncCreate return; } - boolean offer = blockingQueue.offer(new Task(id)); + boolean offer = blockingQueue.offer(id); if (!offer && logger.isDebugEnabled()) { logger.debug("异步创建工单数据加入队列失败, id: " + id); } -- Gitee