diff --git a/src/main/java/neatlogic/module/autoexec/api/job/AutoexecJobPhaseListApi.java b/src/main/java/neatlogic/module/autoexec/api/job/AutoexecJobPhaseListApi.java index a20079831bfb75bd0ede7bbda7c1e0a32bd367c7..a60a496cfa9ccc64298dfcdc6ff5fb5f8e0cf1f6 100644 --- a/src/main/java/neatlogic/module/autoexec/api/job/AutoexecJobPhaseListApi.java +++ b/src/main/java/neatlogic/module/autoexec/api/job/AutoexecJobPhaseListApi.java @@ -134,6 +134,7 @@ public class AutoexecJobPhaseListApi extends PrivateApiComponentBase { if (jobSourceTypeHandler != null) { result.put("extraInfo",jobSourceTypeHandler.getExtraRefreshJobInfo(jobVo)); } + result.put("waitingDetail",autoexecJobService.getAutoexecJobWaitingDetail(jobVo.getId())); return result; } diff --git a/src/main/java/neatlogic/module/autoexec/api/job/GetAutoexecJobWaitingDetailApi.java b/src/main/java/neatlogic/module/autoexec/api/job/GetAutoexecJobWaitingDetailApi.java new file mode 100644 index 0000000000000000000000000000000000000000..4eb3e095e4996bcc94cd184f41a4b6810ab6be49 --- /dev/null +++ b/src/main/java/neatlogic/module/autoexec/api/job/GetAutoexecJobWaitingDetailApi.java @@ -0,0 +1,66 @@ +/*Copyright (C) $today.year 深圳极向量科技有限公司 All Rights Reserved. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see .*/ + +package neatlogic.module.autoexec.api.job; + +import com.alibaba.fastjson.JSONObject; +import neatlogic.framework.auth.core.AuthAction; +import neatlogic.framework.autoexec.auth.AUTOEXEC_BASE; +import neatlogic.framework.common.constvalue.ApiParamType; +import neatlogic.framework.restful.annotation.Description; +import neatlogic.framework.restful.annotation.Input; +import neatlogic.framework.restful.annotation.OperationType; +import neatlogic.framework.restful.annotation.Param; +import neatlogic.framework.restful.constvalue.OperationTypeEnum; +import neatlogic.framework.restful.core.privateapi.PrivateApiComponentBase; +import neatlogic.module.autoexec.service.AutoexecJobService; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +@Service +@AuthAction(action = AUTOEXEC_BASE.class) +@OperationType(type = OperationTypeEnum.SEARCH) +public class GetAutoexecJobWaitingDetailApi extends PrivateApiComponentBase { + @Resource + AutoexecJobService autoexecJobService; + + @Override + public String getName() { + return "nmaaj.getautoexecjobqueuestatusapi.getname"; + } + + @Override + public String getConfig() { + return null; + } + + @Input({ + @Param(name = "jobId", type = ApiParamType.LONG, isRequired = true, desc = "term.autoexec.jobid"), + @Param(name = "groupSort", type = ApiParamType.LONG, desc = "term.autoexec.groupid"), + @Param(name = "phaseId", type = ApiParamType.LONG, desc = "term.process.phaseid"), + }) + @Description(desc = "nmaaj.getautoexecjobqueuestatusapi.getname") + @Override + public Object myDoService(JSONObject jsonObj) throws Exception { + Long jobId = jsonObj.getLong("jobId"); + return autoexecJobService.getAutoexecJobWaitingDetail(jobId); + } + + @Override + public String getToken() { + return "autoexec/job/waiting/detail/get"; + } +} diff --git a/src/main/java/neatlogic/module/autoexec/api/job/action/CreateAutoexecJobFromOperationApi.java b/src/main/java/neatlogic/module/autoexec/api/job/action/CreateAutoexecJobFromOperationApi.java index 4c7d279353e7a95cc29adf0714ab1d4317086ddd..db7264de1737bcfe9acf7d4fdd5d8769898c6bb5 100644 --- a/src/main/java/neatlogic/module/autoexec/api/job/action/CreateAutoexecJobFromOperationApi.java +++ b/src/main/java/neatlogic/module/autoexec/api/job/action/CreateAutoexecJobFromOperationApi.java @@ -106,7 +106,6 @@ public class CreateAutoexecJobFromOperationApi extends PrivateApiComponentBase { AutoexecJobVo jobVo = JSON.toJavaObject(jsonObj, AutoexecJobVo.class); jobVo.setRunTimeParamList(combopVo.getConfig().getRuntimeParamList() == null ? new ArrayList<>() : combopVo.getConfig().getRuntimeParamList()); jobVo.setOperationType(type); - jobVo.setIsFirstFire(1); jobVo.setAction(JobAction.FIRE.getValue()); jobVo.setInvokeId(jobVo.getOperationId()); jobVo.setRouteId(jobVo.getOperationId().toString()); diff --git a/src/main/java/neatlogic/module/autoexec/api/job/exec/FireAutoexecJobNextGroupApi.java b/src/main/java/neatlogic/module/autoexec/api/job/exec/FireAutoexecJobNextGroupApi.java index 5d70b04b1068a0b0c373a44dd2d81b83ffb16da8..dc8e29ab10822ee9c024df8787fd07fc54398bac 100644 --- a/src/main/java/neatlogic/module/autoexec/api/job/exec/FireAutoexecJobNextGroupApi.java +++ b/src/main/java/neatlogic/module/autoexec/api/job/exec/FireAutoexecJobNextGroupApi.java @@ -103,6 +103,7 @@ public class FireAutoexecJobNextGroupApi extends PrivateApiComponentBase { AutoexecJobGroupVo nextGroupVo = autoexecJobMapper.getJobGroupByJobIdAndSort(jobId, groupSort + 1); if (nextGroupVo != null) { jobVo.setExecuteJobGroupVo(nextGroupVo); + jobVo.setIsFirstFire(0); IAutoexecJobActionHandler fireAction = AutoexecJobActionHandlerFactory.getAction(JobAction.FIRE.getValue()); fireAction.doService(jobVo); } diff --git a/src/main/java/neatlogic/module/autoexec/api/job/exec/UpdateAutoexecJobPhaseStatusApi.java b/src/main/java/neatlogic/module/autoexec/api/job/exec/UpdateAutoexecJobPhaseStatusApi.java index 90cf617843dd7e1464792bcd9c21ac507affe886..3aca50b3d30d16b00e0400c7f2f8df66cac568dd 100644 --- a/src/main/java/neatlogic/module/autoexec/api/job/exec/UpdateAutoexecJobPhaseStatusApi.java +++ b/src/main/java/neatlogic/module/autoexec/api/job/exec/UpdateAutoexecJobPhaseStatusApi.java @@ -99,6 +99,7 @@ public class UpdateAutoexecJobPhaseStatusApi extends PrivateApiComponentBase { String phaseRunnerStatus = jsonObj.getString("status"); Integer phaseRunnerWarnCount = jsonObj.getInteger("warnCount"); JSONObject passThroughEnv = jsonObj.getJSONObject("passThroughEnv"); + Integer isFirstFire = 0; Long runnerId = 0L; if (MapUtils.isNotEmpty(passThroughEnv)) { if (!passThroughEnv.containsKey("runnerId")) { @@ -106,11 +107,15 @@ public class UpdateAutoexecJobPhaseStatusApi extends PrivateApiComponentBase { } else { runnerId = passThroughEnv.getLong("runnerId"); } + if (passThroughEnv.containsKey("isFirstFire")) { + isFirstFire = passThroughEnv.getInteger("isFirstFire"); + } } AutoexecJobVo jobVo = autoexecJobMapper.getJobLockByJobId(jobId); if (jobVo == null) { throw new AutoexecJobNotFoundException(jobId.toString()); } + jobVo.setIsFirstFire(isFirstFire); //更新执行用户上下文 autoexecJobActionService.initExecuteUserContext(jobVo); @@ -165,7 +170,7 @@ public class UpdateAutoexecJobPhaseStatusApi extends PrivateApiComponentBase { finalJobPhaseStatus = JobPhaseStatus.COMPLETED.getValue(); } else if (statusCountMap.get(JobPhaseStatus.WAIT_INPUT.getValue()) > 0) { finalJobPhaseStatus = JobPhaseStatus.WAIT_INPUT.getValue(); - } else if (statusCountMap.get(JobPhaseStatus.RUNNING.getValue()) > 0) { + } else if (statusCountMap.get(JobPhaseStatus.RUNNING.getValue()) > 0 || statusCountMap.get(JobPhaseStatus.COMPLETED.getValue())>0) { finalJobPhaseStatus = JobPhaseStatus.RUNNING.getValue(); } else if (statusCountMap.get(JobPhaseStatus.FAILED.getValue()) > 0) { finalJobPhaseStatus = JobPhaseStatus.FAILED.getValue(); @@ -173,6 +178,8 @@ public class UpdateAutoexecJobPhaseStatusApi extends PrivateApiComponentBase { finalJobPhaseStatus = JobPhaseStatus.ABORTED.getValue(); } else if (statusCountMap.get(JobPhaseStatus.PAUSED.getValue()) > 0) { finalJobPhaseStatus = JobPhaseStatus.PAUSED.getValue(); + } else if (statusCountMap.get(JobPhaseStatus.WAITING.getValue()) > 0) { + finalJobPhaseStatus = JobPhaseStatus.WAITING.getValue(); } else { finalJobPhaseStatus = JobPhaseStatus.PENDING.getValue(); } diff --git a/src/main/java/neatlogic/module/autoexec/api/job/runner/AutoexecJobProcessStatusUpdateApi.java b/src/main/java/neatlogic/module/autoexec/api/job/runner/AutoexecJobProcessStatusUpdateApi.java index 85acd5961fdaa657c48fe4ce1a1fb769af6fab7f..61a64429ed161c135e908f04fa9465bc33f758f6 100644 --- a/src/main/java/neatlogic/module/autoexec/api/job/runner/AutoexecJobProcessStatusUpdateApi.java +++ b/src/main/java/neatlogic/module/autoexec/api/job/runner/AutoexecJobProcessStatusUpdateApi.java @@ -124,14 +124,16 @@ public class AutoexecJobProcessStatusUpdateApi extends PrivateApiComponentBase { List> phaseAbortingCountMapList = autoexecJobMapper.getJobPhaseRunnerAbortingCountMapCountByJobId(jobId); HashMap phaseIdAbortingCountMap = new HashMap<>(); for (HashMap phaseAbortingCountMap : phaseAbortingCountMapList) { - phaseIdAbortingCountMap.put(phaseAbortingCountMap.get("job_phase_id"), Integer.valueOf(phaseAbortingCountMap.get("count"))); + phaseIdAbortingCountMap.put(String.valueOf(phaseAbortingCountMap.get("job_phase_id")), Integer.valueOf(String.valueOf(phaseAbortingCountMap.get("count")))); } for (Long phaseId : jobPhaseIdList) { if (phaseIdAbortingCountMap.get(phaseId.toString()) == 0) { jobPhaseIdAbortedList.add(phaseId); } } - autoexecJobMapper.updateJobPhaseRunnerStatusBatch(jobPhaseIdAbortedList, JobPhaseStatus.ABORTED.getValue(), runnerId); + if(CollectionUtils.isNotEmpty(jobPhaseIdAbortedList)) { + autoexecJobMapper.updateJobPhaseRunnerStatusBatch(jobPhaseIdAbortedList, JobPhaseStatus.ABORTED.getValue(), runnerId); + } } if (StringUtils.isNotBlank(status)) { //4 diff --git a/src/main/java/neatlogic/module/autoexec/autoconfig/handler/AutoexecJobCleaner.java b/src/main/java/neatlogic/module/autoexec/autoconfig/handler/AutoexecJobCleaner.java index b4cadcfc9587636093e572e595f24c381abed815..020d010efabd57df1e05aab2645218a55c535661 100644 --- a/src/main/java/neatlogic/module/autoexec/autoconfig/handler/AutoexecJobCleaner.java +++ b/src/main/java/neatlogic/module/autoexec/autoconfig/handler/AutoexecJobCleaner.java @@ -83,7 +83,7 @@ public class AutoexecJobCleaner extends AuditCleanerBase { paramJson.put("passThroughEnv", new JSONObject() {{ put("runnerId", runner.getRunnerMapId()); }}); - HttpRequestUtil.post(url).setConnectTimeout(5000).setReadTimeout(10000).setAuthType(AuthenticateType.BUILDIN).setPayload(paramJson.toJSONString()).sendRequest(); + HttpRequestUtil.post(url).setAuthType(AuthenticateType.BUILDIN).setPayload(paramJson.toJSONString()).sendRequest(); } } } diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobAbortHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobAbortHandler.java index 1d954cb5a23912772cda04d4245ce2ce6158eca3..0b27df5b40d8fb50967fd4e7d98420fbcb7bf825 100644 --- a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobAbortHandler.java +++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobAbortHandler.java @@ -15,6 +15,7 @@ along with this program. If not, see .*/ package neatlogic.module.autoexec.job.action.handler; +import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import neatlogic.framework.asynchronization.threadlocal.TenantContext; import neatlogic.framework.asynchronization.threadlocal.UserContext; @@ -64,37 +65,44 @@ public class AutoexecJobAbortHandler extends AutoexecJobActionHandlerBase { } @Override - public boolean isNeedExecuteAuthCheck(){ + public boolean isNeedExecuteAuthCheck() { return true; } @Override public JSONObject doMyService(AutoexecJobVo jobVo) { + List abortingPhaseIdList = new ArrayList<>(); //更新job状态 为中止中 jobVo.setStatus(JobPhaseStatus.ABORTING.getValue()); autoexecJobMapper.updateJobStatus(jobVo); //更新phase状态 为中止中 jobVo.setPhaseList(autoexecJobMapper.getJobPhaseListWithGroupByJobId(jobVo.getId())); for (AutoexecJobPhaseVo jobPhase : jobVo.getPhaseList()) { - if (Arrays.asList(JobPhaseStatus.RUNNING.getValue(),JobPhaseStatus.WAITING.getValue(),JobPhaseStatus.WAIT_INPUT.getValue()).contains(jobPhase.getStatus())) { + if (Objects.equals(JobPhaseStatus.ABORTING.getValue(), jobPhase.getStatus())) { + abortingPhaseIdList.add(jobPhase.getId()); + } else if (Arrays.asList(JobPhaseStatus.RUNNING.getValue(), JobPhaseStatus.WAITING.getValue(), JobPhaseStatus.WAIT_INPUT.getValue()).contains(jobPhase.getStatus())) { jobPhase.setStatus(JobStatus.ABORTING.getValue()); + abortingPhaseIdList.add(jobPhase.getId()); autoexecJobMapper.updateJobPhaseStatus(jobPhase); autoexecJobMapper.updateBatchJobPhaseRunnerStatus(jobPhase.getId(), JobPhaseStatus.ABORTING.getValue()); } } //更新node状态 为中止中 - List nodeVoList = autoexecJobMapper.getJobPhaseNodeListByJobIdAndNodeStatusList(jobVo.getId(), Collections.singletonList(JobNodeStatus.RUNNING.getValue())); + List nodeVoList = autoexecJobMapper.getJobPhaseNodeListByJobIdAndNodeStatusList(jobVo.getId(), Arrays.asList(JobPhaseStatus.WAITING.getValue(), JobNodeStatus.RUNNING.getValue())); for (AutoexecJobPhaseNodeVo nodeVo : nodeVoList) { nodeVo.setStatus(JobNodeStatus.ABORTING.getValue()); autoexecJobMapper.updateJobPhaseNodeStatus(nodeVo); } - List runnerVos = autoexecJobMapper.getJobPhaseRunnerByJobIdAndPhaseIdListAndStatus(jobVo.getId(), jobVo.getPhaseIdList(),JobNodeStatus.ABORTING.getValue()); + List runnerVos = autoexecJobMapper.getJobPhaseRunnerByJobIdAndPhaseIdListAndStatus(jobVo.getId(), jobVo.getPhaseIdList(), JobNodeStatus.ABORTING.getValue()); if (CollectionUtils.isEmpty(runnerVos)) { + if (CollectionUtils.isNotEmpty(abortingPhaseIdList)) { + autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(abortingPhaseIdList, JobPhaseStatus.ABORTED.getValue()); + } jobVo.setStatus(JobStatus.ABORTED.getValue()); autoexecJobMapper.updateJobStatus(jobVo); - }else { - runnerVos = runnerVos.stream().filter(o->StringUtils.isNotBlank(o.getUrl())).collect(collectingAndThen(toCollection(() -> new TreeSet<>( Comparator.comparing(RunnerMapVo::getUrl))), ArrayList::new)); + } else { + runnerVos = runnerVos.stream().filter(o -> StringUtils.isNotBlank(o.getUrl())).collect(collectingAndThen(toCollection(() -> new TreeSet<>(Comparator.comparing(RunnerMapVo::getUrl))), ArrayList::new)); autoexecJobService.checkRunnerHealth(runnerVos); JSONObject paramJson = new JSONObject(); paramJson.put("jobId", jobVo.getId()); @@ -111,15 +119,22 @@ public class AutoexecJobAbortHandler extends AutoexecJobActionHandlerBase { url = runner.getUrl() + "api/rest/job/abort"; RestVo restVo = new RestVo.Builder(url, AuthenticateType.BUILDIN.getValue()).setPayload(paramJson).build(); result = RestUtil.sendPostRequest(restVo); - JSONObject resultJson = JSONObject.parseObject(result); + JSONObject resultJson = JSON.parseObject(result); if (!resultJson.containsKey("Status") || !"OK".equals(resultJson.getString("Status"))) { throw new RunnerHttpRequestException(restVo.getUrl() + ":" + resultJson.getString("Message")); } + autoexecJobMapper.updateJobPhaseRunnerStatus(abortingPhaseIdList, runner.getRunnerMapId(), JobPhaseStatus.ABORTED.getValue()); } } catch (Exception ex) { logger.error(ex.getMessage(), ex); throw new RunnerConnectRefusedException(url + " " + result); } + if (autoexecJobMapper.getJobPhaseRunnerStatusCountByJobIdAndStatus(jobVo.getId(), JobPhaseStatus.ABORTING.getValue()) == 0) { + if (CollectionUtils.isNotEmpty(abortingPhaseIdList)) { + autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(abortingPhaseIdList, JobPhaseStatus.ABORTED.getValue()); + } + autoexecJobMapper.updateJobStatus(new AutoexecJobVo(jobVo.getId(), JobPhaseStatus.ABORTED.getValue())); + } } return null; } diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobConsoleLogAuditListHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobConsoleLogAuditListHandler.java index 5630f6db631aa7f09da12f3ffe3877f63bbe6b21..82f41c2032d6a81b9e824786728214d0b886d615 100644 --- a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobConsoleLogAuditListHandler.java +++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobConsoleLogAuditListHandler.java @@ -75,7 +75,7 @@ public class AutoexecJobConsoleLogAuditListHandler extends AutoexecJobActionHand JSONObject result = new JSONObject(); JSONObject paramObj = jobVo.getActionParam(); String url = paramObj.getString("runnerUrl") + "/api/rest/job/console/log/audit/list"; - HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setConnectTimeout(5000).setReadTimeout(5000).setPayload(paramObj.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest(); + HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(paramObj.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest(); if(StringUtils.isNotBlank(requestUtil.getError())){ throw new RunnerConnectRefusedException(url); } diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobFireHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobFireHandler.java index 9b525ca41229c1687bdd3f883e5e670f49c8803a..aabc892bc45ab35d8c3534d5df65660f1b44c0ff 100644 --- a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobFireHandler.java +++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobFireHandler.java @@ -56,6 +56,7 @@ public class AutoexecJobFireHandler extends AutoexecJobActionHandlerBase { @Override public JSONObject doMyService(AutoexecJobVo jobVo) { autoexecJobMapper.getJobLockByJobId(jobVo.getId()); + autoexecJobService.fireOrResetRefireWaiting(jobVo); autoexecJobService.executeGroup(jobVo); return new JSONObject(){{ put("jobId",jobVo.getId()); diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPauseHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPauseHandler.java index deb481670e7fdacb14d8d1c771d263ccf24ae8e0..1bbcc27ff5892cf2b6e3c889b4722ed3e1300d97 100644 --- a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPauseHandler.java +++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPauseHandler.java @@ -103,7 +103,7 @@ public class AutoexecJobPauseHandler extends AutoexecJobActionHandlerBase { //put("phaseSort", jobVo.getCurrentGroupSort()); }}); String url = runner.getUrl() + "api/rest/job/pause"; - HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).setConnectTimeout(5000).setReadTimeout(5000).sendRequest(); + HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest(); if (StringUtils.isNotBlank(requestUtil.getError())) { throw new RunnerHttpRequestException(url + ":" + requestUtil.getError()); } diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPhaseReFireHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPhaseReFireHandler.java index d991f84c9a2bfd4383b6b3c03568eaca5e0cc52d..8b401d7f2c84fa2adf45e02f4f9ccb07ce5306ee 100644 --- a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPhaseReFireHandler.java +++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPhaseReFireHandler.java @@ -22,6 +22,7 @@ import neatlogic.framework.autoexec.constvalue.JobPhaseStatus; import neatlogic.framework.autoexec.constvalue.JobStatus; import neatlogic.framework.autoexec.dao.mapper.AutoexecJobMapper; import neatlogic.framework.autoexec.dto.job.AutoexecJobPhaseNodeVo; +import neatlogic.framework.autoexec.dto.job.AutoexecJobPhaseRunnerVo; import neatlogic.framework.autoexec.dto.job.AutoexecJobPhaseVo; import neatlogic.framework.autoexec.dto.job.AutoexecJobVo; import neatlogic.framework.autoexec.exception.AutoexecJobPhaseRunnerNotFoundException; @@ -72,8 +73,13 @@ public class AutoexecJobPhaseReFireHandler extends AutoexecJobActionHandlerBase @Override public JSONObject doMyService(AutoexecJobVo jobVo) { AutoexecJobPhaseVo jobPhaseVo = jobVo.getExecuteJobPhaseList().get(0); - jobVo.setStatus(JobStatus.RUNNING.getValue()); - autoexecJobMapper.updateJobStatus(jobVo); + AutoexecJobPhaseVo phaseVo = autoexecJobMapper.getJobPhaseByJobIdAndPhaseStatus(jobVo.getId(), JobPhaseStatus.RUNNING.getValue()); + //存在进行中的阶段不修改作业状态 + if (phaseVo == null) { + jobVo.setStatus(JobStatus.WAITING.getValue()); + autoexecJobMapper.updateJobStatus(jobVo); + } + jobVo.setIsFirstFire(0); jobPhaseVo.setStatus(JobPhaseStatus.RUNNING.getValue()); autoexecJobMapper.updateJobPhaseStatus(jobPhaseVo); //如果是sqlfile类型的phase 需额外清除状态 @@ -84,6 +90,11 @@ public class AutoexecJobPhaseReFireHandler extends AutoexecJobActionHandlerBase resetPhase(jobVo); autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(jobVo.getExecuteJobPhaseList().stream().map(AutoexecJobPhaseVo::getId).collect(Collectors.toList()), JobPhaseStatus.PENDING.getValue()); autoexecJobService.refreshJobPhaseNodeList(jobVo.getId(), jobVo.getExecuteJobPhaseList()); + autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(Collections.singletonList(jobPhaseVo.getId()), JobPhaseStatus.WAITING.getValue()); + List jobPhaseRunnerVos = autoexecJobMapper.getJobPhaseRunnerByJobIdAndPhaseIdList(jobVo.getId(), Collections.singletonList(jobPhaseVo.getId())); + for (AutoexecJobPhaseRunnerVo jobPhaseRunnerVo : jobPhaseRunnerVos) { + autoexecJobMapper.updateJobPhaseRunnerStatus(Collections.singletonList(jobPhaseVo.getId()), jobPhaseRunnerVo.getRunnerMapId(), JobPhaseStatus.WAITING.getValue()); + } } if (Objects.equals(jobVo.getAction(), JobAction.REFIRE.getValue())) { List needResetNodeList = autoexecJobMapper.getJobPhaseNodeListByJobIdAndPhaseIdAndExceptStatus(jobPhaseVo.getId(), Arrays.asList(JobNodeStatus.IGNORED.getValue(), JobNodeStatus.SUCCEED.getValue(), JobNodeStatus.INVALID.getValue())); @@ -94,13 +105,13 @@ public class AutoexecJobPhaseReFireHandler extends AutoexecJobActionHandlerBase throw new AutoexecJobPhaseRunnerNotFoundException(jobPhaseVo.getJobId(), jobPhaseVo.getName(), jobPhaseVo.getId()); } autoexecJobService.updateJobNodeStatus(runnerMapVos, jobVo, JobNodeStatus.PENDING.getValue()); - autoexecJobMapper.updateJobPhaseNodeListStatusByPhaseIdAndExceptStatus(jobPhaseVo.getId(), Arrays.asList(JobNodeStatus.IGNORED.getValue(), JobNodeStatus.SUCCEED.getValue(), JobNodeStatus.INVALID.getValue()),JobNodeStatus.PENDING.getValue()); + autoexecJobMapper.updateJobPhaseNodeListStatusByPhaseIdAndExceptStatus(jobPhaseVo.getId(), Arrays.asList(JobNodeStatus.IGNORED.getValue(), JobNodeStatus.SUCCEED.getValue(), JobNodeStatus.INVALID.getValue()), JobNodeStatus.PENDING.getValue()); jobVo.setExecuteJobNodeVoList(null); } } Integer pendingCount = autoexecJobMapper.isHasPendingNode(jobPhaseVo.getId()); - if(pendingCount == null){ - autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(Collections.singletonList(jobPhaseVo.getId()),JobPhaseStatus.COMPLETED.getValue()); + if (pendingCount == null) { + autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(Collections.singletonList(jobPhaseVo.getId()), JobPhaseStatus.COMPLETED.getValue()); return null; } jobPhaseVo.setJobGroupVo(autoexecJobMapper.getJobGroupById(jobPhaseVo.getGroupId())); @@ -128,7 +139,7 @@ public class AutoexecJobPhaseReFireHandler extends AutoexecJobActionHandlerBase put("runnerId", runner.getRunnerMapId()); }}); - HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setConnectTimeout(5000).setReadTimeout(5000).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest(); + HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest(); if (StringUtils.isNotBlank(requestUtil.getError())) { throw new RunnerConnectRefusedException(url); } diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPhaseRoundInformHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPhaseRoundInformHandler.java index 8362c52abd5c23e77aa00ccbe46118c3c6696e18..92d1ddf67b7304fd88af1be7750c0f00181a85bf 100644 --- a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPhaseRoundInformHandler.java +++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPhaseRoundInformHandler.java @@ -80,7 +80,7 @@ public class AutoexecJobPhaseRoundInformHandler extends AutoexecJobActionHandler for (RunnerMapVo runnerVo : runnerVos) { String url = String.format("%s/api/rest/job/phase/socket/write", runnerVo.getUrl()); String result = HttpRequestUtil.post(url) - .setPayload(jsonObj.toJSONString()).setAuthType(AuthenticateType.BUILDIN).setConnectTimeout(5000).setReadTimeout(5000) + .setPayload(jsonObj.toJSONString()).setAuthType(AuthenticateType.BUILDIN) .sendRequest().getError(); if (StringUtils.isNotBlank(result)) { throw new RunnerHttpRequestException(url + ":" + result); diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobReFireHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobReFireHandler.java index 94d0469ef1b6ef6a707891a963d7e23bfe6c5ff0..13f0a579be6ace899c4c70b21a9056cb36c32083 100644 --- a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobReFireHandler.java +++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobReFireHandler.java @@ -17,9 +17,11 @@ package neatlogic.module.autoexec.job.action.handler; import com.alibaba.fastjson.JSONObject; import neatlogic.framework.autoexec.constvalue.JobAction; +import neatlogic.framework.autoexec.constvalue.JobNodeStatus; import neatlogic.framework.autoexec.constvalue.JobPhaseStatus; import neatlogic.framework.autoexec.constvalue.JobStatus; import neatlogic.framework.autoexec.dao.mapper.AutoexecJobMapper; +import neatlogic.framework.autoexec.dto.job.AutoexecJobPhaseRunnerVo; import neatlogic.framework.autoexec.dto.job.AutoexecJobPhaseVo; import neatlogic.framework.autoexec.dto.job.AutoexecJobVo; import neatlogic.framework.autoexec.exception.AutoexecJobActionInvalidException; @@ -40,6 +42,7 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -82,17 +85,17 @@ public class AutoexecJobReFireHandler extends AutoexecJobActionHandlerBase { jobVo.setExecuteJobGroupVo(autoexecJobMapper.getJobGroupByJobIdAndSort(jobVo.getId(), 0)); //重刷所有phase node autoexecJobService.refreshJobNodeList(jobVo.getId()); + autoexecJobService.fireOrResetRefireWaiting(jobVo); //更新没有删除的节点为"未开始"状态 //autoexecJobMapper.updateJobPhaseNodeStatusByJobIdAndIsDelete(jobVo.getId(), JobNodeStatus.PENDING.getValue(), 0); - jobVo.setIsFirstFire(1); } else if (Objects.equals(jobVo.getAction(), JobAction.REFIRE.getValue())) { /*寻找中止|暂停|失败的phase * 1、寻找pending|aborted|paused|failed phaseList * 2、没有满足1条件的,再寻找pending|aborted|paused|failed node 最小sort phaseList */ - List autoexecJobPhaseVos = autoexecJobMapper.getJobPhaseListByJobIdAndPhaseStatus(jobVo.getId(), Arrays.asList(JobPhaseStatus.PENDING.getValue(), JobPhaseStatus.ABORTED.getValue(), JobPhaseStatus.PAUSED.getValue(), JobPhaseStatus.FAILED.getValue())); + List autoexecJobPhaseVos = autoexecJobMapper.getJobPhaseListByJobIdAndPhaseStatus(jobVo.getId(), Arrays.asList(JobPhaseStatus.PENDING.getValue(), JobPhaseStatus.WAITING.getValue(),JobPhaseStatus.ABORTED.getValue(), JobPhaseStatus.PAUSED.getValue(), JobPhaseStatus.FAILED.getValue())); if (CollectionUtils.isEmpty(autoexecJobPhaseVos)) { - autoexecJobPhaseVos = autoexecJobMapper.getJobPhaseListByJobIdAndNodeStatusList(jobVo.getId(), Arrays.asList(JobPhaseStatus.PENDING.getValue(), JobPhaseStatus.ABORTED.getValue(), JobPhaseStatus.PAUSED.getValue(), JobPhaseStatus.FAILED.getValue())); + autoexecJobPhaseVos = autoexecJobMapper.getJobPhaseListByJobIdAndNodeStatusList(jobVo.getId(), Arrays.asList(JobNodeStatus.PENDING.getValue(),JobNodeStatus.WAITING.getValue(), JobNodeStatus.ABORTED.getValue(), JobNodeStatus.PAUSED.getValue(), JobNodeStatus.FAILED.getValue())); } //如果都成功了则无须重跑 if (CollectionUtils.isEmpty(autoexecJobPhaseVos)) { @@ -104,10 +107,17 @@ public class AutoexecJobReFireHandler extends AutoexecJobActionHandlerBase { autoexecJobMapper.updateJobStatus(jobVo); return null; } - jobVo.setStatus(JobStatus.PENDING.getValue()); + jobVo.setStatus(JobStatus.WAITING.getValue()); autoexecJobMapper.updateJobStatus(jobVo); autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(autoexecJobPhaseVos.stream().map(AutoexecJobPhaseVo::getId).collect(Collectors.toList()), JobPhaseStatus.PENDING.getValue()); jobVo.setExecuteJobGroupVo(autoexecJobPhaseVos.get(0).getJobGroupVo()); + //重置需要重跑的第一个phase的状态为waiting + AutoexecJobPhaseVo firstPhase = autoexecJobPhaseVos.get(0); + autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(Collections.singletonList(firstPhase.getId()), JobPhaseStatus.WAITING.getValue()); + List jobPhaseRunnerVos = autoexecJobMapper.getJobPhaseRunnerByJobIdAndPhaseIdList(jobVo.getId(), Collections.singletonList(firstPhase.getId())); + for (AutoexecJobPhaseRunnerVo jobPhaseRunnerVo : jobPhaseRunnerVos) { + autoexecJobMapper.updateJobPhaseRunnerStatus(Collections.singletonList(firstPhase.getId()), jobPhaseRunnerVo.getRunnerMapId(), JobPhaseStatus.WAITING.getValue()); + } autoexecJobService.getAutoexecJobDetail(jobVo); if (CollectionUtils.isNotEmpty(jobVo.getPhaseList())) { new AutoexecJobAuthActionManager.Builder().addReFireJob().build().setAutoexecJobAction(jobVo); @@ -141,7 +151,7 @@ public class AutoexecJobReFireHandler extends AutoexecJobActionHandlerBase { //put("phaseSort", jobVo.getCurrentGroupSort()); }}); - HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).setConnectTimeout(5000).setReadTimeout(5000).sendRequest(); + HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest(); if (StringUtils.isNotBlank(requestUtil.getError())) { throw new RunnerHttpRequestException(url + ":" + requestUtil.getError()); } diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeOperationListHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeOperationListHandler.java index 4060fe8cc57d28f095e0c0ab33d84762e6ff1870..b595cffb6b7bdfc57ab03e6b2c0c2f665f282bf9 100644 --- a/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeOperationListHandler.java +++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeOperationListHandler.java @@ -63,6 +63,7 @@ public class AutoexecJobNodeOperationListHandler extends AutoexecJobActionHandle paramJson.put("resourceId", nodeVo.getResourceId()); paramJson.put("phase", nodeVo.getJobPhaseName()); paramJson.put("phaseId", nodeVo.getJobPhaseId()); + paramJson.put("nodeId",nodeVo.getId()); paramJson.put("ip", nodeVo.getHost()); paramJson.put("port", nodeVo.getPort()); paramJson.put("runnerUrl", nodeVo.getRunnerUrl()); diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeReFireHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeReFireHandler.java index 444feacf5d9316378e414e36b7740fdf2100d36d..4e13f443e36dfe0df1a70f9ed454eac64fb3fd04 100644 --- a/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeReFireHandler.java +++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeReFireHandler.java @@ -19,8 +19,10 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import neatlogic.framework.autoexec.constvalue.ExecMode; import neatlogic.framework.autoexec.constvalue.JobAction; +import neatlogic.framework.autoexec.constvalue.JobNodeStatus; import neatlogic.framework.autoexec.constvalue.JobPhaseStatus; import neatlogic.framework.autoexec.dao.mapper.AutoexecJobMapper; +import neatlogic.framework.autoexec.dto.job.AutoexecJobGroupVo; import neatlogic.framework.autoexec.dto.job.AutoexecJobPhaseNodeVo; import neatlogic.framework.autoexec.dto.job.AutoexecJobPhaseVo; import neatlogic.framework.autoexec.dto.job.AutoexecJobVo; @@ -61,13 +63,13 @@ public class AutoexecJobNodeReFireHandler extends AutoexecJobActionHandlerBase { currentPhaseIdValid(jobVo); JSONObject jsonObj = jobVo.getActionParam(); List resourceIdList = JSONObject.parseArray(jsonObj.getJSONArray("resourceIdList").toJSONString(), Long.class); - if(CollectionUtils.isEmpty(resourceIdList)){ + if (CollectionUtils.isEmpty(resourceIdList)) { throw new ParamIrregularException("resourceIdList"); } List nodeVoList; if (Objects.equals(jobVo.getCurrentPhase().getExecMode(), ExecMode.SQL.getValue())) { JSONArray sqlIdArray = jobVo.getActionParam().getJSONArray("sqlIdList"); - if(CollectionUtils.isEmpty(sqlIdArray)){ + if (CollectionUtils.isEmpty(sqlIdArray)) { throw new ParamIrregularException("sqlIdList"); } IAutoexecJobSource jobSource = AutoexecJobSourceFactory.getEnumInstance(jobVo.getSource()); @@ -76,10 +78,10 @@ public class AutoexecJobNodeReFireHandler extends AutoexecJobActionHandlerBase { } nodeVoList = AutoexecJobSourceTypeHandlerFactory.getAction(jobSource.getType()).getJobNodeListBySqlIdList(sqlIdArray.toJavaList(Long.class)); jobVo.setJobPhaseNodeSqlList(nodeVoList); - }else { + } else { nodeVoList = autoexecJobMapper.getJobPhaseNodeListByJobPhaseIdAndResourceIdList(jobVo.getCurrentPhaseId(), resourceIdList); - //重置节点开始和结束时间,以防 失败节点直接"重跑"导致耗时异常 - autoexecJobMapper.updateJobPhaseNodeResetStartTimeAndEndTimeByNodeIdList(nodeVoList.stream().map(AutoexecJobPhaseNodeVo::getId).collect(Collectors.toList())); + List nodeIdList = nodeVoList.stream().map(AutoexecJobPhaseNodeVo::getId).collect(Collectors.toList()); + autoexecJobMapper.updateJobPhaseNodeListStatus(nodeIdList, JobNodeStatus.WAITING.getValue()); } jobVo.setExecuteJobNodeVoList(nodeVoList); //校验是否和当前phaseId一致 @@ -100,10 +102,12 @@ public class AutoexecJobNodeReFireHandler extends AutoexecJobActionHandlerBase { public JSONObject doMyService(AutoexecJobVo jobVo) { //重跑单个节点无需激活下个phase jobVo.setIsNoFireNext(1); - //跟新phase状态为running + jobVo.setIsFirstFire(0); AutoexecJobPhaseVo phaseVo = jobVo.getCurrentPhase(); - phaseVo.setStatus(JobPhaseStatus.RUNNING.getValue()); + phaseVo.setStatus(JobPhaseStatus.WAITING.getValue()); autoexecJobMapper.updateJobPhaseStatus(phaseVo); + AutoexecJobGroupVo jobGroupVo = autoexecJobMapper.getJobGroupById(phaseVo.getGroupId()); + jobVo.setExecuteJobGroupVo(jobGroupVo); jobVo.setExecuteJobPhaseList(Collections.singletonList(phaseVo)); autoexecJobService.executeNode(jobVo); return null; diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeSubmitWaitInputHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeSubmitWaitInputHandler.java index 8e554d217879697120308a2d50d975e2d88ab3ae..6967acb9fc07d323f04b7dc6e61aa3991c9c23a1 100644 --- a/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeSubmitWaitInputHandler.java +++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeSubmitWaitInputHandler.java @@ -92,7 +92,7 @@ public class AutoexecJobNodeSubmitWaitInputHandler extends AutoexecJobActionHand } String url = String.format("%s/api/rest/job/phase/node/submit/waitInput", nodeVo.getRunnerUrl()); String result = HttpRequestUtil.post(url) - .setPayload(paramObj.toJSONString()).setAuthType(AuthenticateType.BUILDIN).setConnectTimeout(5000).setReadTimeout(5000) + .setPayload(paramObj.toJSONString()).setAuthType(AuthenticateType.BUILDIN) .sendRequest().getError(); if (StringUtils.isNotBlank(result)) { throw new RunnerHttpRequestException(url + ":" + result); diff --git a/src/main/java/neatlogic/module/autoexec/job/source/action/AutoexecJobSourceTypeHandler.java b/src/main/java/neatlogic/module/autoexec/job/source/action/AutoexecJobSourceTypeHandler.java index d0d4d3a44abc8e340191ef43a69a3f03c81ddea7..1994ab46a32d7bb24396559890802e7cca15b8e5 100644 --- a/src/main/java/neatlogic/module/autoexec/job/source/action/AutoexecJobSourceTypeHandler.java +++ b/src/main/java/neatlogic/module/autoexec/job/source/action/AutoexecJobSourceTypeHandler.java @@ -292,7 +292,7 @@ public class AutoexecJobSourceTypeHandler extends AutoexecJobSourceTypeHandlerBa throw new RunnerGroupRunnerNotFoundException(networkVo.getGroupId()); } if (CollectionUtils.isEmpty(groupVo.getRunnerMapList())) { - throw new RunnerGroupRunnerNotFoundException(groupVo.getName(), networkVo.getName() + "(" + networkVo.getGroupId() + ") "); + throw new RunnerGroupRunnerNotFoundException(groupVo.getName(), networkVo.getNetworkIp()); } runnerMapVos = groupVo.getRunnerMapList(); break; diff --git a/src/main/java/neatlogic/module/autoexec/process/stephandler/component/CreateJobProcessComponent.java b/src/main/java/neatlogic/module/autoexec/process/stephandler/component/CreateJobProcessComponent.java index 13eeb3ade665ca4fb898adc0045b5071514ecff1..60e683d0c9a11112cf06b63cd40b2735b98147da 100644 --- a/src/main/java/neatlogic/module/autoexec/process/stephandler/component/CreateJobProcessComponent.java +++ b/src/main/java/neatlogic/module/autoexec/process/stephandler/component/CreateJobProcessComponent.java @@ -299,7 +299,6 @@ public class CreateJobProcessComponent extends ProcessStepHandlerBase { jobVo.setInvokeId(processTaskStepVo.getId()); jobVo.setRouteId(processTaskStepVo.getId().toString()); jobVo.setSource(AutoExecJobProcessSource.ITSM.getValue()); - jobVo.setIsFirstFire(1); jobVo.setAssignExecUser(SystemUser.SYSTEM.getUserUuid()); try { autoexecJobActionService.validateCreateJob(jobVo); diff --git a/src/main/java/neatlogic/module/autoexec/schedule/plugin/AutoexecScheduleJob.java b/src/main/java/neatlogic/module/autoexec/schedule/plugin/AutoexecScheduleJob.java index 43615975ca4d0fe6b26d39f19003c43570c4621d..1e3f3ba2767b159682da69e955161a42e0a374f8 100644 --- a/src/main/java/neatlogic/module/autoexec/schedule/plugin/AutoexecScheduleJob.java +++ b/src/main/java/neatlogic/module/autoexec/schedule/plugin/AutoexecScheduleJob.java @@ -142,7 +142,6 @@ public class AutoexecScheduleJob extends JobBase { jobVo.setInvokeId(autoexecScheduleVo.getId()); jobVo.setRouteId(autoexecScheduleVo.getId().toString()); jobVo.setOperationType(CombopOperationType.COMBOP.getValue()); - jobVo.setIsFirstFire(1); UserVo fcuVo = userMapper.getUserByUuid(autoexecScheduleVo.getFcu()); AuthenticationInfoVo authenticationInfoVo = authenticationInfoService.getAuthenticationInfo(autoexecScheduleVo.getFcu()); UserContext.init(fcuVo, authenticationInfoVo, SystemUser.SYSTEM.getTimezone()); diff --git a/src/main/java/neatlogic/module/autoexec/service/AutoexecJobActionServiceImpl.java b/src/main/java/neatlogic/module/autoexec/service/AutoexecJobActionServiceImpl.java index 5ceccc33f07e39634c023d985ff7790f7d4dedde..a390170e37a78424c4b1180ea8bee61934af275e 100644 --- a/src/main/java/neatlogic/module/autoexec/service/AutoexecJobActionServiceImpl.java +++ b/src/main/java/neatlogic/module/autoexec/service/AutoexecJobActionServiceImpl.java @@ -468,7 +468,6 @@ public class AutoexecJobActionServiceImpl implements AutoexecJobActionService, I jobVo.setExecuteJobGroupVo(autoexecJobMapper.getJobGroupByJobIdAndSort(jobVo.getId(), 0)); autoexecJobService.getAutoexecJobDetail(jobVo); IAutoexecJobActionHandler fireAction = AutoexecJobActionHandlerFactory.getAction(JobAction.FIRE.getValue()); - jobVo.setIsFirstFire(1); fireAction.doService(jobVo); } } @@ -498,7 +497,6 @@ public class AutoexecJobActionServiceImpl implements AutoexecJobActionService, I if (jobVo.getTriggerType() == null || (Objects.equals(JobTriggerType.AUTO.getValue(), jobVo.getTriggerType()) && jobVo.getPlanStartTime().getTime() <= System.currentTimeMillis())) { IAutoexecJobActionHandler fireAction = AutoexecJobActionHandlerFactory.getAction(JobAction.FIRE.getValue()); jobVo.setAction(JobAction.FIRE.getValue()); - jobVo.setIsFirstFire(1); fireAction.doService(jobVo); } else if (Objects.equals(JobTriggerType.AUTO.getValue(), jobVo.getTriggerType()) && jobVo.getPlanStartTime() != null) { IJob jobHandler = SchedulerManager.getHandler(AutoexecJobAutoFireJob.class.getName()); diff --git a/src/main/java/neatlogic/module/autoexec/service/AutoexecJobService.java b/src/main/java/neatlogic/module/autoexec/service/AutoexecJobService.java index cf5a615a8026c1930e2bdc10b3c799a38e22341d..7ba3a799dd07d0cfb9474792f89590707b8fa9fc 100644 --- a/src/main/java/neatlogic/module/autoexec/service/AutoexecJobService.java +++ b/src/main/java/neatlogic/module/autoexec/service/AutoexecJobService.java @@ -15,6 +15,7 @@ along with this program. If not, see .*/ package neatlogic.module.autoexec.service; +import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import neatlogic.framework.autoexec.constvalue.JobAction; import neatlogic.framework.autoexec.dto.AutoexecParamVo; @@ -279,4 +280,18 @@ public interface AutoexecJobService { */ void updateJobPhaseNode(AutoexecJobVo jobVo, List resourceVoList, String userName, Long protocolId); + + /** + * 作业第一次跑和重置后重跑作业更新排队等待 + * @param jobVo 作业 + */ + void fireOrResetRefireWaiting(AutoexecJobVo jobVo); + + /** + * 根据作业id获取作业等待详情 + * + * @param jobId 作业id + */ + JSONArray getAutoexecJobWaitingDetail(Long jobId); + } diff --git a/src/main/java/neatlogic/module/autoexec/service/AutoexecJobServiceImpl.java b/src/main/java/neatlogic/module/autoexec/service/AutoexecJobServiceImpl.java index e157aa916eec4fc5495eee2390ecb92c3d6dc4c7..bb11b626254b4882273523960c2a8af70aa7f264 100644 --- a/src/main/java/neatlogic/module/autoexec/service/AutoexecJobServiceImpl.java +++ b/src/main/java/neatlogic/module/autoexec/service/AutoexecJobServiceImpl.java @@ -52,12 +52,10 @@ import neatlogic.framework.dao.mapper.runner.RunnerMapper; import neatlogic.framework.deploy.crossover.IDeploySqlCrossoverMapper; import neatlogic.framework.dto.RestVo; import neatlogic.framework.dto.runner.RunnerMapVo; +import neatlogic.framework.exception.core.ApiRuntimeException; import neatlogic.framework.exception.runner.*; import neatlogic.framework.integration.authentication.enums.AuthenticateType; -import neatlogic.framework.util.$; -import neatlogic.framework.util.HttpRequestUtil; -import neatlogic.framework.util.RestUtil; -import neatlogic.framework.util.SnowflakeUtil; +import neatlogic.framework.util.*; import neatlogic.module.autoexec.dao.mapper.AutoexecCombopVersionMapper; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; @@ -970,7 +968,6 @@ public class AutoexecJobServiceImpl implements AutoexecJobService, IAutoexecJobC } - /** * 跟新作业阶段阶段 * @@ -1268,8 +1265,15 @@ public class AutoexecJobServiceImpl implements AutoexecJobService, IAutoexecJobC autoexecJobVo.setConfigStr(jobContent.getContent()); List statusList = new ArrayList<>(); String url = paramJson.getString("runnerUrl") + "/api/rest/job/phase/node/status/get"; - JSONObject statusJson = JSONObject.parseObject(AutoexecUtil.requestRunner(url, paramJson)); - AutoexecJobPhaseNodeVo nodeVo = new AutoexecJobPhaseNodeVo(statusJson); + JSONObject statusJson = null; + AutoexecJobPhaseNodeVo nodeVo = autoexecJobMapper.getJobPhaseNodeInfoByJobNodeId(paramJson.getLong("nodeId")); + try { + statusJson = JSON.parseObject(AutoexecUtil.requestRunner(url, paramJson)); + } catch (Exception ignored) { + //ignored + } + + if (isNeedOperationList) { IAutoexecJobSource jobSource = AutoexecJobSourceFactory.getEnumInstance(autoexecJobVo.getSource()); if (jobSource == null) { @@ -1324,6 +1328,10 @@ public class AutoexecJobServiceImpl implements AutoexecJobService, IAutoexecJobC } } + if (MapUtils.isNotEmpty(statusJson)) { + nodeVo.setStatus(statusJson.getString("status")); + nodeVo.setInteractStr(statusJson.getString("interact")); + } nodeVo.setOperationStatusVoList(statusList.stream().sorted(Comparator.comparing(AutoexecJobPhaseNodeOperationStatusVo::getSort)).collect(toList())); } return nodeVo; @@ -1351,6 +1359,8 @@ public class AutoexecJobServiceImpl implements AutoexecJobService, IAutoexecJobC finalStatus = JobNodeStatus.ABORTED.getValue(); } else if (statusList.contains(JobNodeStatus.PAUSED.getValue())) { finalStatus = JobNodeStatus.PAUSED.getValue(); + } else if (statusList.contains(JobNodeStatus.WAITING.getValue())) { + finalStatus = JobNodeStatus.WAITING.getValue(); } else { finalStatus = JobNodeStatus.PENDING.getValue(); } @@ -1439,7 +1449,7 @@ public class AutoexecJobServiceImpl implements AutoexecJobService, IAutoexecJobC paramJson.put("nodeStatus", nodeStatus); for (RunnerMapVo runner : runnerVos) { String url = runner.getUrl() + "api/rest/job/phase/node/status/update"; - HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).setConnectTimeout(5000).setReadTimeout(5000).sendRequest(); + HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest(); if (StringUtils.isNotBlank(requestUtil.getError())) { throw new RunnerHttpRequestException(url + ":" + requestUtil.getError()); } @@ -1465,13 +1475,9 @@ public class AutoexecJobServiceImpl implements AutoexecJobService, IAutoexecJobC throw new AutoexecJobRunnerNotFoundException(runner.getRunnerMapId().toString()); } url = runner.getUrl() + "api/rest/health/check"; - HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setConnectTimeout(5000).setReadTimeout(5000).setPayload(new JSONObject().toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest(); + HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(new JSONObject().toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest(); if (StringUtils.isNotBlank(requestUtil.getError())) { - throw new RunnerConnectRefusedException(url, requestUtil.getError()); - } - JSONObject resultJson = requestUtil.getResultJson(); - if (resultJson == null || !resultJson.containsKey("Status") || !"OK".equals(resultJson.getString("Status"))) { - throw new RunnerConnectRefusedException(runner.getHost() + ":" + runner.getPort(), url + ", response code:" + requestUtil.getResponseCode()); + throw new ApiRuntimeException((StringUtils.isNotBlank(requestUtil.getErrorMsg()) ? requestUtil.getErrorMsg() : requestUtil.getError())); } } @@ -1520,59 +1526,63 @@ public class AutoexecJobServiceImpl implements AutoexecJobService, IAutoexecJobC if (CollectionUtils.isEmpty(runnerVos)) { throw new RunnerNotMatchException(); } - jobVo.setStatus(JobStatus.RUNNING.getValue()); autoexecJobMapper.updateJobStatus(jobVo); - + Integer isFirstFire = Arrays.asList(JobAction.FIRE.getValue(), JobAction.RESET_REFIRE.getValue()).contains(jobVo.getAction()) ? 1 : 0; JSONObject paramJson = new JSONObject(); paramJson.put("jobId", jobVo.getId()); paramJson.put("tenant", TenantContext.get().getTenantUuid()); paramJson.put("isNoFireNext", jobVo.getIsNoFireNext()); - paramJson.put("isFirstFire", jobVo.getIsFirstFire()); + paramJson.put("isFirstFire", isFirstFire); + if (jobVo.getExecuteJobGroupVo() == null) { + throw new AutoexecJobGroupNotFoundException(jobVo.getId()); + } if (CollectionUtils.isNotEmpty(jobVo.getExecuteJobPhaseList())) { paramJson.put("jobPhaseNameList", jobVo.getExecuteJobPhaseList().stream().map(AutoexecJobPhaseVo::getName).collect(Collectors.toList())); } - if (jobVo.getExecuteJobGroupVo() != null) { - paramJson.put("jobGroupIdList", Collections.singletonList(jobVo.getExecuteJobGroupVo().getSort())); - } + + paramJson.put("jobGroupSortList", Collections.singletonList(jobVo.getExecuteJobGroupVo().getSort())); if (jobVo.getCurrentPhase() != null && Objects.equals(jobVo.getCurrentPhase().getExecMode(), ExecMode.SQL.getValue())) { paramJson.put("jobPhaseNodeSqlList", jobVo.getJobPhaseNodeSqlList()); } else { paramJson.put("jobPhaseResourceIdList", jobVo.getExecuteResourceIdList()); } - RestVo restVo = null; - String result = StringUtils.EMPTY; - String url = StringUtils.EMPTY; runnerVos = runnerVos.stream().filter(o -> StringUtils.isNotBlank(o.getUrl())).collect(collectingAndThen(toCollection(() -> new TreeSet<>(Comparator.comparing(RunnerMapVo::getUrl))), ArrayList::new)); checkRunnerHealth(runnerVos); - try { - Long execid = SnowflakeUtil.uniqueLong(); - for (RunnerMapVo runner : runnerVos) { - jobVo.getEnvironment().put("RUNNER_ID", runner.getRunnerMapId()); - url = runner.getUrl() + "api/rest/job/exec"; - JSONObject passThroughEnv = jobVo.getPassThroughEnv(); - passThroughEnv.put("runnerId", runner.getRunnerMapId()); - if (jobVo.getExecuteJobGroupVo() != null) { - passThroughEnv.put("groupSort", jobVo.getExecuteJobGroupVo().getSort()); - } - if (CollectionUtils.isNotEmpty(jobVo.getExecuteJobPhaseList())) { - passThroughEnv.put("phaseSort", jobVo.getExecuteJobPhaseList().get(0).getSort()); - } - passThroughEnv.put("isFirstFire", jobVo.getIsFirstFire()); - passThroughEnv.put("EXECUSER_TOKEN", userMapper.getUserTokenByUser(UserContext.get().getUserId())); - paramJson.put("passThroughEnv", passThroughEnv); - paramJson.put("environment", jobVo.getEnvironment()); - paramJson.put("execid", String.valueOf(execid)); - restVo = new RestVo.Builder(url, AuthenticateType.BUILDIN.getValue()).setPayload(paramJson).build(); - result = RestUtil.sendPostRequest(restVo); - JSONObject resultJson = JSONObject.parseObject(result); - if (!resultJson.containsKey("Status") || !"OK".equals(resultJson.getString("Status"))) { - throw new RunnerHttpRequestException(restVo.getUrl() + ":" + resultJson.getString("Message")); - } + Long execid = SnowflakeUtil.uniqueLong(); + for (RunnerMapVo runner : runnerVos) { + jobVo.getEnvironment().put("RUNNER_ID", runner.getRunnerMapId()); + String url = runner.getUrl() + "api/rest/job/exec"; + JSONObject passThroughEnv = jobVo.getPassThroughEnv(); + passThroughEnv.put("runnerId", runner.getRunnerMapId()); + passThroughEnv.put("groupSort", jobVo.getExecuteJobGroupVo().getSort()); + if (CollectionUtils.isNotEmpty(jobVo.getExecuteJobPhaseList())) { + passThroughEnv.put("phaseSort", jobVo.getExecuteJobPhaseList().get(0).getSort()); } - } catch (Exception ex) { - logger.error(ex.getMessage(), ex); - throw new RunnerConnectRefusedException(url + " " + result); + passThroughEnv.put("isFirstFire", isFirstFire); + passThroughEnv.put("EXECUSER_TOKEN", userMapper.getUserTokenByUser(UserContext.get().getUserId())); + paramJson.put("passThroughEnv", passThroughEnv); + paramJson.put("environment", jobVo.getEnvironment()); + paramJson.put("execid", String.valueOf(execid)); + HttpRequestUtil httpRequestUtil = HttpRequestUtil.post(url).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest(); + + if (StringUtils.isNotBlank(httpRequestUtil.getError())) { + throw new ApiRuntimeException((StringUtils.isNotBlank(httpRequestUtil.getErrorMsg()) ? httpRequestUtil.getErrorMsg() : httpRequestUtil.getError())); + } + } + + } + + @Override + public void fireOrResetRefireWaiting(AutoexecJobVo jobVo) { + jobVo.setStatus(JobStatus.WAITING.getValue()); + autoexecJobMapper.updateJobStatus(jobVo); + //找到第一个组的第一个phase状态更新成排队中 + AutoexecJobPhaseVo firstPhase = autoexecJobMapper.getJobFirstPhaseByGroupId(jobVo.getExecuteJobGroupVo().getId()); + autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(Collections.singletonList(firstPhase.getId()), JobPhaseStatus.WAITING.getValue()); + List jobPhaseRunnerVos = autoexecJobMapper.getJobPhaseRunnerByJobIdAndPhaseIdList(jobVo.getId(), Collections.singletonList(firstPhase.getId())); + for (AutoexecJobPhaseRunnerVo jobPhaseRunnerVo : jobPhaseRunnerVos) { + autoexecJobMapper.updateJobPhaseRunnerStatus(Collections.singletonList(firstPhase.getId()), jobPhaseRunnerVo.getRunnerMapId(), JobPhaseStatus.WAITING.getValue()); } } @@ -1609,4 +1619,51 @@ public class AutoexecJobServiceImpl implements AutoexecJobService, IAutoexecJobC refireAction.doService(job); } } + + /** + * 根据作业id获取作业等待详情 + * + * @param jobId 作业id + */ + @Override + public JSONArray getAutoexecJobWaitingDetail(Long jobId) { + JSONArray queueStatusArray = new JSONArray(); + //作业基本信息 + AutoexecJobVo jobVo = autoexecJobMapper.getJobInfo(jobId); + if (jobVo == null) { + throw new AutoexecJobNotFoundException(jobId.toString()); + } + List runnerVos = autoexecJobMapper.getJobPhaseRunnerMapByJobId(jobId); + + JSONObject params = new JSONObject(); + params.put("jobId", jobId); + checkRunnerHealth(runnerVos); + for (RunnerMapVo runner : runnerVos) { + String url = runner.getUrl() + "api/rest/job/waiting/detail/get"; + HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(params.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest(); + if (StringUtils.isNotBlank(requestUtil.getError())) { + throw new RunnerHttpRequestException(url + ":" + requestUtil.getError()); + } + JSONObject resultJson = requestUtil.getResultJson(); + JSONObject queueJson = resultJson.getJSONObject("Return"); + if (MapUtils.isNotEmpty(queueJson)) { + for (Map.Entry entry : queueJson.entrySet()) { + String key = entry.getKey(); + if (Objects.equals(key, "count")) { + continue; + } + JSONObject value = JSON.parseObject(entry.getValue().toString()); + JSONObject queueStatus = new JSONObject(); + queueStatus.put("sort", key + "/" + queueJson.getString("count")); + queueStatus.put("command", value.getString("command")); + queueStatus.put("fcd", TimeUtil.convertDateToString(new Date(value.getLong("fcd")), TimeUtil.YYYY_MM_DD_HH_MM_SS)); + queueStatus.put("runner", runner.getName() + ":" + runner.getPort()); + queueStatus.put("runnerId", runner.getId()); + queueStatus.put("groupSortList",value.getJSONArray("groupSortList")); + queueStatusArray.add(queueStatus); + } + } + } + return queueStatusArray; + } } diff --git a/src/main/java/neatlogic/module/autoexec/stephandler/component/AutoexecProcessComponent.java b/src/main/java/neatlogic/module/autoexec/stephandler/component/AutoexecProcessComponent.java index 8d4e3e833952edba4334843fabbecfefc578f441..999feb3996e906b4ed0636c068aefca3e13327a9 100644 --- a/src/main/java/neatlogic/module/autoexec/stephandler/component/AutoexecProcessComponent.java +++ b/src/main/java/neatlogic/module/autoexec/stephandler/component/AutoexecProcessComponent.java @@ -247,7 +247,6 @@ public class AutoexecProcessComponent extends ProcessStepHandlerBase { jobObj.put("operationType", jobVo.getOperationType()); jobObj.put("invokeId", jobVo.getInvokeId()); jobObj.put("routeId", jobVo.getRouteId()); - jobObj.put("isFirstFire", jobVo.getIsFirstFire()); jobObj.put("assignExecUser", jobVo.getAssignExecUser()); logger.error(jobObj.toJSONString()); JSONObject errorMessageObj = new JSONObject(); @@ -490,7 +489,6 @@ public class AutoexecProcessComponent extends ProcessStepHandlerBase { jobVo.setOperationType(CombopOperationType.COMBOP.getValue()); jobVo.setInvokeId(currentProcessTaskStepVo.getId()); jobVo.setRouteId(currentProcessTaskStepVo.getId().toString()); - jobVo.setIsFirstFire(1); jobVo.setAssignExecUser(SystemUser.SYSTEM.getUserUuid()); return jobVo; } @@ -538,7 +536,6 @@ public class AutoexecProcessComponent extends ProcessStepHandlerBase { jobVo.setOperationType(CombopOperationType.COMBOP.getValue()); jobVo.setInvokeId(currentProcessTaskStepVo.getId()); jobVo.setRouteId(currentProcessTaskStepVo.getId().toString()); - jobVo.setIsFirstFire(1); jobVo.setAssignExecUser(SystemUser.SYSTEM.getUserUuid()); JSONObject tbodyObj = tbodyList.getJSONObject(index); // 场景 @@ -1252,7 +1249,6 @@ public class AutoexecProcessComponent extends ProcessStepHandlerBase { jobVo.setOperationType(CombopOperationType.COMBOP.getValue()); jobVo.setInvokeId(currentProcessTaskStepVo.getId()); jobVo.setRouteId(currentProcessTaskStepVo.getId().toString()); - jobVo.setIsFirstFire(1); jobVo.setAssignExecUser(SystemUser.SYSTEM.getUserUuid()); return jobVo; } diff --git a/src/main/resources/neatlogic/resources/autoexec/changelog/2025-04-10/neatlogic_tenant.sql b/src/main/resources/neatlogic/resources/autoexec/changelog/2025-04-10/neatlogic_tenant.sql new file mode 100644 index 0000000000000000000000000000000000000000..5c96246d5c731a65ac68ed5953d52c544133cdbd --- /dev/null +++ b/src/main/resources/neatlogic/resources/autoexec/changelog/2025-04-10/neatlogic_tenant.sql @@ -0,0 +1,8 @@ +ALTER TABLE `autoexec_job` +MODIFY COLUMN `status` enum('running','pausing','paused','completed','pending','aborting','aborted','succeed','failed','waitInput','ready','revoked','saved','checked','waiting') CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '作业状态' AFTER `name`; + +ALTER TABLE `autoexec_job_phase_node` +MODIFY COLUMN `status` enum('succeed','pending','failed','ignored','running','aborted','aborting','waitInput','pausing','paused','invalid','waiting') CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '状态' AFTER `end_time`; + +ALTER TABLE `autoexec_job_phase_runner` +MODIFY COLUMN `status` enum('pending','completed','failed','paused','aborted','running','aborting','pausing','waitInput','ignored','waiting') CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT 'pending' COMMENT '状态' AFTER `runner_map_id`; \ No newline at end of file