diff --git a/src/main/java/neatlogic/module/autoexec/api/job/AutoexecJobPhaseListApi.java b/src/main/java/neatlogic/module/autoexec/api/job/AutoexecJobPhaseListApi.java
index a20079831bfb75bd0ede7bbda7c1e0a32bd367c7..a60a496cfa9ccc64298dfcdc6ff5fb5f8e0cf1f6 100644
--- a/src/main/java/neatlogic/module/autoexec/api/job/AutoexecJobPhaseListApi.java
+++ b/src/main/java/neatlogic/module/autoexec/api/job/AutoexecJobPhaseListApi.java
@@ -134,6 +134,7 @@ public class AutoexecJobPhaseListApi extends PrivateApiComponentBase {
if (jobSourceTypeHandler != null) {
result.put("extraInfo",jobSourceTypeHandler.getExtraRefreshJobInfo(jobVo));
}
+ result.put("waitingDetail",autoexecJobService.getAutoexecJobWaitingDetail(jobVo.getId()));
return result;
}
diff --git a/src/main/java/neatlogic/module/autoexec/api/job/GetAutoexecJobWaitingDetailApi.java b/src/main/java/neatlogic/module/autoexec/api/job/GetAutoexecJobWaitingDetailApi.java
new file mode 100644
index 0000000000000000000000000000000000000000..4eb3e095e4996bcc94cd184f41a4b6810ab6be49
--- /dev/null
+++ b/src/main/java/neatlogic/module/autoexec/api/job/GetAutoexecJobWaitingDetailApi.java
@@ -0,0 +1,66 @@
+/*Copyright (C) $today.year 深圳极向量科技有限公司 All Rights Reserved.
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .*/
+
+package neatlogic.module.autoexec.api.job;
+
+import com.alibaba.fastjson.JSONObject;
+import neatlogic.framework.auth.core.AuthAction;
+import neatlogic.framework.autoexec.auth.AUTOEXEC_BASE;
+import neatlogic.framework.common.constvalue.ApiParamType;
+import neatlogic.framework.restful.annotation.Description;
+import neatlogic.framework.restful.annotation.Input;
+import neatlogic.framework.restful.annotation.OperationType;
+import neatlogic.framework.restful.annotation.Param;
+import neatlogic.framework.restful.constvalue.OperationTypeEnum;
+import neatlogic.framework.restful.core.privateapi.PrivateApiComponentBase;
+import neatlogic.module.autoexec.service.AutoexecJobService;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+
+@Service
+@AuthAction(action = AUTOEXEC_BASE.class)
+@OperationType(type = OperationTypeEnum.SEARCH)
+public class GetAutoexecJobWaitingDetailApi extends PrivateApiComponentBase {
+ @Resource
+ AutoexecJobService autoexecJobService;
+
+ @Override
+ public String getName() {
+ return "nmaaj.getautoexecjobqueuestatusapi.getname";
+ }
+
+ @Override
+ public String getConfig() {
+ return null;
+ }
+
+ @Input({
+ @Param(name = "jobId", type = ApiParamType.LONG, isRequired = true, desc = "term.autoexec.jobid"),
+ @Param(name = "groupSort", type = ApiParamType.LONG, desc = "term.autoexec.groupid"),
+ @Param(name = "phaseId", type = ApiParamType.LONG, desc = "term.process.phaseid"),
+ })
+ @Description(desc = "nmaaj.getautoexecjobqueuestatusapi.getname")
+ @Override
+ public Object myDoService(JSONObject jsonObj) throws Exception {
+ Long jobId = jsonObj.getLong("jobId");
+ return autoexecJobService.getAutoexecJobWaitingDetail(jobId);
+ }
+
+ @Override
+ public String getToken() {
+ return "autoexec/job/waiting/detail/get";
+ }
+}
diff --git a/src/main/java/neatlogic/module/autoexec/api/job/action/CreateAutoexecJobFromOperationApi.java b/src/main/java/neatlogic/module/autoexec/api/job/action/CreateAutoexecJobFromOperationApi.java
index 4c7d279353e7a95cc29adf0714ab1d4317086ddd..db7264de1737bcfe9acf7d4fdd5d8769898c6bb5 100644
--- a/src/main/java/neatlogic/module/autoexec/api/job/action/CreateAutoexecJobFromOperationApi.java
+++ b/src/main/java/neatlogic/module/autoexec/api/job/action/CreateAutoexecJobFromOperationApi.java
@@ -106,7 +106,6 @@ public class CreateAutoexecJobFromOperationApi extends PrivateApiComponentBase {
AutoexecJobVo jobVo = JSON.toJavaObject(jsonObj, AutoexecJobVo.class);
jobVo.setRunTimeParamList(combopVo.getConfig().getRuntimeParamList() == null ? new ArrayList<>() : combopVo.getConfig().getRuntimeParamList());
jobVo.setOperationType(type);
- jobVo.setIsFirstFire(1);
jobVo.setAction(JobAction.FIRE.getValue());
jobVo.setInvokeId(jobVo.getOperationId());
jobVo.setRouteId(jobVo.getOperationId().toString());
diff --git a/src/main/java/neatlogic/module/autoexec/api/job/exec/FireAutoexecJobNextGroupApi.java b/src/main/java/neatlogic/module/autoexec/api/job/exec/FireAutoexecJobNextGroupApi.java
index 5d70b04b1068a0b0c373a44dd2d81b83ffb16da8..dc8e29ab10822ee9c024df8787fd07fc54398bac 100644
--- a/src/main/java/neatlogic/module/autoexec/api/job/exec/FireAutoexecJobNextGroupApi.java
+++ b/src/main/java/neatlogic/module/autoexec/api/job/exec/FireAutoexecJobNextGroupApi.java
@@ -103,6 +103,7 @@ public class FireAutoexecJobNextGroupApi extends PrivateApiComponentBase {
AutoexecJobGroupVo nextGroupVo = autoexecJobMapper.getJobGroupByJobIdAndSort(jobId, groupSort + 1);
if (nextGroupVo != null) {
jobVo.setExecuteJobGroupVo(nextGroupVo);
+ jobVo.setIsFirstFire(0);
IAutoexecJobActionHandler fireAction = AutoexecJobActionHandlerFactory.getAction(JobAction.FIRE.getValue());
fireAction.doService(jobVo);
}
diff --git a/src/main/java/neatlogic/module/autoexec/api/job/exec/UpdateAutoexecJobPhaseStatusApi.java b/src/main/java/neatlogic/module/autoexec/api/job/exec/UpdateAutoexecJobPhaseStatusApi.java
index 90cf617843dd7e1464792bcd9c21ac507affe886..3aca50b3d30d16b00e0400c7f2f8df66cac568dd 100644
--- a/src/main/java/neatlogic/module/autoexec/api/job/exec/UpdateAutoexecJobPhaseStatusApi.java
+++ b/src/main/java/neatlogic/module/autoexec/api/job/exec/UpdateAutoexecJobPhaseStatusApi.java
@@ -99,6 +99,7 @@ public class UpdateAutoexecJobPhaseStatusApi extends PrivateApiComponentBase {
String phaseRunnerStatus = jsonObj.getString("status");
Integer phaseRunnerWarnCount = jsonObj.getInteger("warnCount");
JSONObject passThroughEnv = jsonObj.getJSONObject("passThroughEnv");
+ Integer isFirstFire = 0;
Long runnerId = 0L;
if (MapUtils.isNotEmpty(passThroughEnv)) {
if (!passThroughEnv.containsKey("runnerId")) {
@@ -106,11 +107,15 @@ public class UpdateAutoexecJobPhaseStatusApi extends PrivateApiComponentBase {
} else {
runnerId = passThroughEnv.getLong("runnerId");
}
+ if (passThroughEnv.containsKey("isFirstFire")) {
+ isFirstFire = passThroughEnv.getInteger("isFirstFire");
+ }
}
AutoexecJobVo jobVo = autoexecJobMapper.getJobLockByJobId(jobId);
if (jobVo == null) {
throw new AutoexecJobNotFoundException(jobId.toString());
}
+ jobVo.setIsFirstFire(isFirstFire);
//更新执行用户上下文
autoexecJobActionService.initExecuteUserContext(jobVo);
@@ -165,7 +170,7 @@ public class UpdateAutoexecJobPhaseStatusApi extends PrivateApiComponentBase {
finalJobPhaseStatus = JobPhaseStatus.COMPLETED.getValue();
} else if (statusCountMap.get(JobPhaseStatus.WAIT_INPUT.getValue()) > 0) {
finalJobPhaseStatus = JobPhaseStatus.WAIT_INPUT.getValue();
- } else if (statusCountMap.get(JobPhaseStatus.RUNNING.getValue()) > 0) {
+ } else if (statusCountMap.get(JobPhaseStatus.RUNNING.getValue()) > 0 || statusCountMap.get(JobPhaseStatus.COMPLETED.getValue())>0) {
finalJobPhaseStatus = JobPhaseStatus.RUNNING.getValue();
} else if (statusCountMap.get(JobPhaseStatus.FAILED.getValue()) > 0) {
finalJobPhaseStatus = JobPhaseStatus.FAILED.getValue();
@@ -173,6 +178,8 @@ public class UpdateAutoexecJobPhaseStatusApi extends PrivateApiComponentBase {
finalJobPhaseStatus = JobPhaseStatus.ABORTED.getValue();
} else if (statusCountMap.get(JobPhaseStatus.PAUSED.getValue()) > 0) {
finalJobPhaseStatus = JobPhaseStatus.PAUSED.getValue();
+ } else if (statusCountMap.get(JobPhaseStatus.WAITING.getValue()) > 0) {
+ finalJobPhaseStatus = JobPhaseStatus.WAITING.getValue();
} else {
finalJobPhaseStatus = JobPhaseStatus.PENDING.getValue();
}
diff --git a/src/main/java/neatlogic/module/autoexec/api/job/runner/AutoexecJobProcessStatusUpdateApi.java b/src/main/java/neatlogic/module/autoexec/api/job/runner/AutoexecJobProcessStatusUpdateApi.java
index 85acd5961fdaa657c48fe4ce1a1fb769af6fab7f..61a64429ed161c135e908f04fa9465bc33f758f6 100644
--- a/src/main/java/neatlogic/module/autoexec/api/job/runner/AutoexecJobProcessStatusUpdateApi.java
+++ b/src/main/java/neatlogic/module/autoexec/api/job/runner/AutoexecJobProcessStatusUpdateApi.java
@@ -124,14 +124,16 @@ public class AutoexecJobProcessStatusUpdateApi extends PrivateApiComponentBase {
List> phaseAbortingCountMapList = autoexecJobMapper.getJobPhaseRunnerAbortingCountMapCountByJobId(jobId);
HashMap phaseIdAbortingCountMap = new HashMap<>();
for (HashMap phaseAbortingCountMap : phaseAbortingCountMapList) {
- phaseIdAbortingCountMap.put(phaseAbortingCountMap.get("job_phase_id"), Integer.valueOf(phaseAbortingCountMap.get("count")));
+ phaseIdAbortingCountMap.put(String.valueOf(phaseAbortingCountMap.get("job_phase_id")), Integer.valueOf(String.valueOf(phaseAbortingCountMap.get("count"))));
}
for (Long phaseId : jobPhaseIdList) {
if (phaseIdAbortingCountMap.get(phaseId.toString()) == 0) {
jobPhaseIdAbortedList.add(phaseId);
}
}
- autoexecJobMapper.updateJobPhaseRunnerStatusBatch(jobPhaseIdAbortedList, JobPhaseStatus.ABORTED.getValue(), runnerId);
+ if(CollectionUtils.isNotEmpty(jobPhaseIdAbortedList)) {
+ autoexecJobMapper.updateJobPhaseRunnerStatusBatch(jobPhaseIdAbortedList, JobPhaseStatus.ABORTED.getValue(), runnerId);
+ }
}
if (StringUtils.isNotBlank(status)) {
//4
diff --git a/src/main/java/neatlogic/module/autoexec/autoconfig/handler/AutoexecJobCleaner.java b/src/main/java/neatlogic/module/autoexec/autoconfig/handler/AutoexecJobCleaner.java
index b4cadcfc9587636093e572e595f24c381abed815..020d010efabd57df1e05aab2645218a55c535661 100644
--- a/src/main/java/neatlogic/module/autoexec/autoconfig/handler/AutoexecJobCleaner.java
+++ b/src/main/java/neatlogic/module/autoexec/autoconfig/handler/AutoexecJobCleaner.java
@@ -83,7 +83,7 @@ public class AutoexecJobCleaner extends AuditCleanerBase {
paramJson.put("passThroughEnv", new JSONObject() {{
put("runnerId", runner.getRunnerMapId());
}});
- HttpRequestUtil.post(url).setConnectTimeout(5000).setReadTimeout(10000).setAuthType(AuthenticateType.BUILDIN).setPayload(paramJson.toJSONString()).sendRequest();
+ HttpRequestUtil.post(url).setAuthType(AuthenticateType.BUILDIN).setPayload(paramJson.toJSONString()).sendRequest();
}
}
}
diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobAbortHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobAbortHandler.java
index 1d954cb5a23912772cda04d4245ce2ce6158eca3..0b27df5b40d8fb50967fd4e7d98420fbcb7bf825 100644
--- a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobAbortHandler.java
+++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobAbortHandler.java
@@ -15,6 +15,7 @@ along with this program. If not, see .*/
package neatlogic.module.autoexec.job.action.handler;
+import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import neatlogic.framework.asynchronization.threadlocal.TenantContext;
import neatlogic.framework.asynchronization.threadlocal.UserContext;
@@ -64,37 +65,44 @@ public class AutoexecJobAbortHandler extends AutoexecJobActionHandlerBase {
}
@Override
- public boolean isNeedExecuteAuthCheck(){
+ public boolean isNeedExecuteAuthCheck() {
return true;
}
@Override
public JSONObject doMyService(AutoexecJobVo jobVo) {
+ List abortingPhaseIdList = new ArrayList<>();
//更新job状态 为中止中
jobVo.setStatus(JobPhaseStatus.ABORTING.getValue());
autoexecJobMapper.updateJobStatus(jobVo);
//更新phase状态 为中止中
jobVo.setPhaseList(autoexecJobMapper.getJobPhaseListWithGroupByJobId(jobVo.getId()));
for (AutoexecJobPhaseVo jobPhase : jobVo.getPhaseList()) {
- if (Arrays.asList(JobPhaseStatus.RUNNING.getValue(),JobPhaseStatus.WAITING.getValue(),JobPhaseStatus.WAIT_INPUT.getValue()).contains(jobPhase.getStatus())) {
+ if (Objects.equals(JobPhaseStatus.ABORTING.getValue(), jobPhase.getStatus())) {
+ abortingPhaseIdList.add(jobPhase.getId());
+ } else if (Arrays.asList(JobPhaseStatus.RUNNING.getValue(), JobPhaseStatus.WAITING.getValue(), JobPhaseStatus.WAIT_INPUT.getValue()).contains(jobPhase.getStatus())) {
jobPhase.setStatus(JobStatus.ABORTING.getValue());
+ abortingPhaseIdList.add(jobPhase.getId());
autoexecJobMapper.updateJobPhaseStatus(jobPhase);
autoexecJobMapper.updateBatchJobPhaseRunnerStatus(jobPhase.getId(), JobPhaseStatus.ABORTING.getValue());
}
}
//更新node状态 为中止中
- List nodeVoList = autoexecJobMapper.getJobPhaseNodeListByJobIdAndNodeStatusList(jobVo.getId(), Collections.singletonList(JobNodeStatus.RUNNING.getValue()));
+ List nodeVoList = autoexecJobMapper.getJobPhaseNodeListByJobIdAndNodeStatusList(jobVo.getId(), Arrays.asList(JobPhaseStatus.WAITING.getValue(), JobNodeStatus.RUNNING.getValue()));
for (AutoexecJobPhaseNodeVo nodeVo : nodeVoList) {
nodeVo.setStatus(JobNodeStatus.ABORTING.getValue());
autoexecJobMapper.updateJobPhaseNodeStatus(nodeVo);
}
- List runnerVos = autoexecJobMapper.getJobPhaseRunnerByJobIdAndPhaseIdListAndStatus(jobVo.getId(), jobVo.getPhaseIdList(),JobNodeStatus.ABORTING.getValue());
+ List runnerVos = autoexecJobMapper.getJobPhaseRunnerByJobIdAndPhaseIdListAndStatus(jobVo.getId(), jobVo.getPhaseIdList(), JobNodeStatus.ABORTING.getValue());
if (CollectionUtils.isEmpty(runnerVos)) {
+ if (CollectionUtils.isNotEmpty(abortingPhaseIdList)) {
+ autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(abortingPhaseIdList, JobPhaseStatus.ABORTED.getValue());
+ }
jobVo.setStatus(JobStatus.ABORTED.getValue());
autoexecJobMapper.updateJobStatus(jobVo);
- }else {
- runnerVos = runnerVos.stream().filter(o->StringUtils.isNotBlank(o.getUrl())).collect(collectingAndThen(toCollection(() -> new TreeSet<>( Comparator.comparing(RunnerMapVo::getUrl))), ArrayList::new));
+ } else {
+ runnerVos = runnerVos.stream().filter(o -> StringUtils.isNotBlank(o.getUrl())).collect(collectingAndThen(toCollection(() -> new TreeSet<>(Comparator.comparing(RunnerMapVo::getUrl))), ArrayList::new));
autoexecJobService.checkRunnerHealth(runnerVos);
JSONObject paramJson = new JSONObject();
paramJson.put("jobId", jobVo.getId());
@@ -111,15 +119,22 @@ public class AutoexecJobAbortHandler extends AutoexecJobActionHandlerBase {
url = runner.getUrl() + "api/rest/job/abort";
RestVo restVo = new RestVo.Builder(url, AuthenticateType.BUILDIN.getValue()).setPayload(paramJson).build();
result = RestUtil.sendPostRequest(restVo);
- JSONObject resultJson = JSONObject.parseObject(result);
+ JSONObject resultJson = JSON.parseObject(result);
if (!resultJson.containsKey("Status") || !"OK".equals(resultJson.getString("Status"))) {
throw new RunnerHttpRequestException(restVo.getUrl() + ":" + resultJson.getString("Message"));
}
+ autoexecJobMapper.updateJobPhaseRunnerStatus(abortingPhaseIdList, runner.getRunnerMapId(), JobPhaseStatus.ABORTED.getValue());
}
} catch (Exception ex) {
logger.error(ex.getMessage(), ex);
throw new RunnerConnectRefusedException(url + " " + result);
}
+ if (autoexecJobMapper.getJobPhaseRunnerStatusCountByJobIdAndStatus(jobVo.getId(), JobPhaseStatus.ABORTING.getValue()) == 0) {
+ if (CollectionUtils.isNotEmpty(abortingPhaseIdList)) {
+ autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(abortingPhaseIdList, JobPhaseStatus.ABORTED.getValue());
+ }
+ autoexecJobMapper.updateJobStatus(new AutoexecJobVo(jobVo.getId(), JobPhaseStatus.ABORTED.getValue()));
+ }
}
return null;
}
diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobConsoleLogAuditListHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobConsoleLogAuditListHandler.java
index 5630f6db631aa7f09da12f3ffe3877f63bbe6b21..82f41c2032d6a81b9e824786728214d0b886d615 100644
--- a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobConsoleLogAuditListHandler.java
+++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobConsoleLogAuditListHandler.java
@@ -75,7 +75,7 @@ public class AutoexecJobConsoleLogAuditListHandler extends AutoexecJobActionHand
JSONObject result = new JSONObject();
JSONObject paramObj = jobVo.getActionParam();
String url = paramObj.getString("runnerUrl") + "/api/rest/job/console/log/audit/list";
- HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setConnectTimeout(5000).setReadTimeout(5000).setPayload(paramObj.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest();
+ HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(paramObj.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest();
if(StringUtils.isNotBlank(requestUtil.getError())){
throw new RunnerConnectRefusedException(url);
}
diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobFireHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobFireHandler.java
index 9b525ca41229c1687bdd3f883e5e670f49c8803a..aabc892bc45ab35d8c3534d5df65660f1b44c0ff 100644
--- a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobFireHandler.java
+++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobFireHandler.java
@@ -56,6 +56,7 @@ public class AutoexecJobFireHandler extends AutoexecJobActionHandlerBase {
@Override
public JSONObject doMyService(AutoexecJobVo jobVo) {
autoexecJobMapper.getJobLockByJobId(jobVo.getId());
+ autoexecJobService.fireOrResetRefireWaiting(jobVo);
autoexecJobService.executeGroup(jobVo);
return new JSONObject(){{
put("jobId",jobVo.getId());
diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPauseHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPauseHandler.java
index deb481670e7fdacb14d8d1c771d263ccf24ae8e0..1bbcc27ff5892cf2b6e3c889b4722ed3e1300d97 100644
--- a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPauseHandler.java
+++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPauseHandler.java
@@ -103,7 +103,7 @@ public class AutoexecJobPauseHandler extends AutoexecJobActionHandlerBase {
//put("phaseSort", jobVo.getCurrentGroupSort());
}});
String url = runner.getUrl() + "api/rest/job/pause";
- HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).setConnectTimeout(5000).setReadTimeout(5000).sendRequest();
+ HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest();
if (StringUtils.isNotBlank(requestUtil.getError())) {
throw new RunnerHttpRequestException(url + ":" + requestUtil.getError());
}
diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPhaseReFireHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPhaseReFireHandler.java
index d991f84c9a2bfd4383b6b3c03568eaca5e0cc52d..8b401d7f2c84fa2adf45e02f4f9ccb07ce5306ee 100644
--- a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPhaseReFireHandler.java
+++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPhaseReFireHandler.java
@@ -22,6 +22,7 @@ import neatlogic.framework.autoexec.constvalue.JobPhaseStatus;
import neatlogic.framework.autoexec.constvalue.JobStatus;
import neatlogic.framework.autoexec.dao.mapper.AutoexecJobMapper;
import neatlogic.framework.autoexec.dto.job.AutoexecJobPhaseNodeVo;
+import neatlogic.framework.autoexec.dto.job.AutoexecJobPhaseRunnerVo;
import neatlogic.framework.autoexec.dto.job.AutoexecJobPhaseVo;
import neatlogic.framework.autoexec.dto.job.AutoexecJobVo;
import neatlogic.framework.autoexec.exception.AutoexecJobPhaseRunnerNotFoundException;
@@ -72,8 +73,13 @@ public class AutoexecJobPhaseReFireHandler extends AutoexecJobActionHandlerBase
@Override
public JSONObject doMyService(AutoexecJobVo jobVo) {
AutoexecJobPhaseVo jobPhaseVo = jobVo.getExecuteJobPhaseList().get(0);
- jobVo.setStatus(JobStatus.RUNNING.getValue());
- autoexecJobMapper.updateJobStatus(jobVo);
+ AutoexecJobPhaseVo phaseVo = autoexecJobMapper.getJobPhaseByJobIdAndPhaseStatus(jobVo.getId(), JobPhaseStatus.RUNNING.getValue());
+ //存在进行中的阶段不修改作业状态
+ if (phaseVo == null) {
+ jobVo.setStatus(JobStatus.WAITING.getValue());
+ autoexecJobMapper.updateJobStatus(jobVo);
+ }
+ jobVo.setIsFirstFire(0);
jobPhaseVo.setStatus(JobPhaseStatus.RUNNING.getValue());
autoexecJobMapper.updateJobPhaseStatus(jobPhaseVo);
//如果是sqlfile类型的phase 需额外清除状态
@@ -84,6 +90,11 @@ public class AutoexecJobPhaseReFireHandler extends AutoexecJobActionHandlerBase
resetPhase(jobVo);
autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(jobVo.getExecuteJobPhaseList().stream().map(AutoexecJobPhaseVo::getId).collect(Collectors.toList()), JobPhaseStatus.PENDING.getValue());
autoexecJobService.refreshJobPhaseNodeList(jobVo.getId(), jobVo.getExecuteJobPhaseList());
+ autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(Collections.singletonList(jobPhaseVo.getId()), JobPhaseStatus.WAITING.getValue());
+ List jobPhaseRunnerVos = autoexecJobMapper.getJobPhaseRunnerByJobIdAndPhaseIdList(jobVo.getId(), Collections.singletonList(jobPhaseVo.getId()));
+ for (AutoexecJobPhaseRunnerVo jobPhaseRunnerVo : jobPhaseRunnerVos) {
+ autoexecJobMapper.updateJobPhaseRunnerStatus(Collections.singletonList(jobPhaseVo.getId()), jobPhaseRunnerVo.getRunnerMapId(), JobPhaseStatus.WAITING.getValue());
+ }
}
if (Objects.equals(jobVo.getAction(), JobAction.REFIRE.getValue())) {
List needResetNodeList = autoexecJobMapper.getJobPhaseNodeListByJobIdAndPhaseIdAndExceptStatus(jobPhaseVo.getId(), Arrays.asList(JobNodeStatus.IGNORED.getValue(), JobNodeStatus.SUCCEED.getValue(), JobNodeStatus.INVALID.getValue()));
@@ -94,13 +105,13 @@ public class AutoexecJobPhaseReFireHandler extends AutoexecJobActionHandlerBase
throw new AutoexecJobPhaseRunnerNotFoundException(jobPhaseVo.getJobId(), jobPhaseVo.getName(), jobPhaseVo.getId());
}
autoexecJobService.updateJobNodeStatus(runnerMapVos, jobVo, JobNodeStatus.PENDING.getValue());
- autoexecJobMapper.updateJobPhaseNodeListStatusByPhaseIdAndExceptStatus(jobPhaseVo.getId(), Arrays.asList(JobNodeStatus.IGNORED.getValue(), JobNodeStatus.SUCCEED.getValue(), JobNodeStatus.INVALID.getValue()),JobNodeStatus.PENDING.getValue());
+ autoexecJobMapper.updateJobPhaseNodeListStatusByPhaseIdAndExceptStatus(jobPhaseVo.getId(), Arrays.asList(JobNodeStatus.IGNORED.getValue(), JobNodeStatus.SUCCEED.getValue(), JobNodeStatus.INVALID.getValue()), JobNodeStatus.PENDING.getValue());
jobVo.setExecuteJobNodeVoList(null);
}
}
Integer pendingCount = autoexecJobMapper.isHasPendingNode(jobPhaseVo.getId());
- if(pendingCount == null){
- autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(Collections.singletonList(jobPhaseVo.getId()),JobPhaseStatus.COMPLETED.getValue());
+ if (pendingCount == null) {
+ autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(Collections.singletonList(jobPhaseVo.getId()), JobPhaseStatus.COMPLETED.getValue());
return null;
}
jobPhaseVo.setJobGroupVo(autoexecJobMapper.getJobGroupById(jobPhaseVo.getGroupId()));
@@ -128,7 +139,7 @@ public class AutoexecJobPhaseReFireHandler extends AutoexecJobActionHandlerBase
put("runnerId", runner.getRunnerMapId());
}});
- HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setConnectTimeout(5000).setReadTimeout(5000).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest();
+ HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest();
if (StringUtils.isNotBlank(requestUtil.getError())) {
throw new RunnerConnectRefusedException(url);
}
diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPhaseRoundInformHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPhaseRoundInformHandler.java
index 8362c52abd5c23e77aa00ccbe46118c3c6696e18..92d1ddf67b7304fd88af1be7750c0f00181a85bf 100644
--- a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPhaseRoundInformHandler.java
+++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobPhaseRoundInformHandler.java
@@ -80,7 +80,7 @@ public class AutoexecJobPhaseRoundInformHandler extends AutoexecJobActionHandler
for (RunnerMapVo runnerVo : runnerVos) {
String url = String.format("%s/api/rest/job/phase/socket/write", runnerVo.getUrl());
String result = HttpRequestUtil.post(url)
- .setPayload(jsonObj.toJSONString()).setAuthType(AuthenticateType.BUILDIN).setConnectTimeout(5000).setReadTimeout(5000)
+ .setPayload(jsonObj.toJSONString()).setAuthType(AuthenticateType.BUILDIN)
.sendRequest().getError();
if (StringUtils.isNotBlank(result)) {
throw new RunnerHttpRequestException(url + ":" + result);
diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobReFireHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobReFireHandler.java
index 94d0469ef1b6ef6a707891a963d7e23bfe6c5ff0..13f0a579be6ace899c4c70b21a9056cb36c32083 100644
--- a/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobReFireHandler.java
+++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/AutoexecJobReFireHandler.java
@@ -17,9 +17,11 @@ package neatlogic.module.autoexec.job.action.handler;
import com.alibaba.fastjson.JSONObject;
import neatlogic.framework.autoexec.constvalue.JobAction;
+import neatlogic.framework.autoexec.constvalue.JobNodeStatus;
import neatlogic.framework.autoexec.constvalue.JobPhaseStatus;
import neatlogic.framework.autoexec.constvalue.JobStatus;
import neatlogic.framework.autoexec.dao.mapper.AutoexecJobMapper;
+import neatlogic.framework.autoexec.dto.job.AutoexecJobPhaseRunnerVo;
import neatlogic.framework.autoexec.dto.job.AutoexecJobPhaseVo;
import neatlogic.framework.autoexec.dto.job.AutoexecJobVo;
import neatlogic.framework.autoexec.exception.AutoexecJobActionInvalidException;
@@ -40,6 +42,7 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@@ -82,17 +85,17 @@ public class AutoexecJobReFireHandler extends AutoexecJobActionHandlerBase {
jobVo.setExecuteJobGroupVo(autoexecJobMapper.getJobGroupByJobIdAndSort(jobVo.getId(), 0));
//重刷所有phase node
autoexecJobService.refreshJobNodeList(jobVo.getId());
+ autoexecJobService.fireOrResetRefireWaiting(jobVo);
//更新没有删除的节点为"未开始"状态
//autoexecJobMapper.updateJobPhaseNodeStatusByJobIdAndIsDelete(jobVo.getId(), JobNodeStatus.PENDING.getValue(), 0);
- jobVo.setIsFirstFire(1);
} else if (Objects.equals(jobVo.getAction(), JobAction.REFIRE.getValue())) {
/*寻找中止|暂停|失败的phase
* 1、寻找pending|aborted|paused|failed phaseList
* 2、没有满足1条件的,再寻找pending|aborted|paused|failed node 最小sort phaseList
*/
- List autoexecJobPhaseVos = autoexecJobMapper.getJobPhaseListByJobIdAndPhaseStatus(jobVo.getId(), Arrays.asList(JobPhaseStatus.PENDING.getValue(), JobPhaseStatus.ABORTED.getValue(), JobPhaseStatus.PAUSED.getValue(), JobPhaseStatus.FAILED.getValue()));
+ List autoexecJobPhaseVos = autoexecJobMapper.getJobPhaseListByJobIdAndPhaseStatus(jobVo.getId(), Arrays.asList(JobPhaseStatus.PENDING.getValue(), JobPhaseStatus.WAITING.getValue(),JobPhaseStatus.ABORTED.getValue(), JobPhaseStatus.PAUSED.getValue(), JobPhaseStatus.FAILED.getValue()));
if (CollectionUtils.isEmpty(autoexecJobPhaseVos)) {
- autoexecJobPhaseVos = autoexecJobMapper.getJobPhaseListByJobIdAndNodeStatusList(jobVo.getId(), Arrays.asList(JobPhaseStatus.PENDING.getValue(), JobPhaseStatus.ABORTED.getValue(), JobPhaseStatus.PAUSED.getValue(), JobPhaseStatus.FAILED.getValue()));
+ autoexecJobPhaseVos = autoexecJobMapper.getJobPhaseListByJobIdAndNodeStatusList(jobVo.getId(), Arrays.asList(JobNodeStatus.PENDING.getValue(),JobNodeStatus.WAITING.getValue(), JobNodeStatus.ABORTED.getValue(), JobNodeStatus.PAUSED.getValue(), JobNodeStatus.FAILED.getValue()));
}
//如果都成功了则无须重跑
if (CollectionUtils.isEmpty(autoexecJobPhaseVos)) {
@@ -104,10 +107,17 @@ public class AutoexecJobReFireHandler extends AutoexecJobActionHandlerBase {
autoexecJobMapper.updateJobStatus(jobVo);
return null;
}
- jobVo.setStatus(JobStatus.PENDING.getValue());
+ jobVo.setStatus(JobStatus.WAITING.getValue());
autoexecJobMapper.updateJobStatus(jobVo);
autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(autoexecJobPhaseVos.stream().map(AutoexecJobPhaseVo::getId).collect(Collectors.toList()), JobPhaseStatus.PENDING.getValue());
jobVo.setExecuteJobGroupVo(autoexecJobPhaseVos.get(0).getJobGroupVo());
+ //重置需要重跑的第一个phase的状态为waiting
+ AutoexecJobPhaseVo firstPhase = autoexecJobPhaseVos.get(0);
+ autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(Collections.singletonList(firstPhase.getId()), JobPhaseStatus.WAITING.getValue());
+ List jobPhaseRunnerVos = autoexecJobMapper.getJobPhaseRunnerByJobIdAndPhaseIdList(jobVo.getId(), Collections.singletonList(firstPhase.getId()));
+ for (AutoexecJobPhaseRunnerVo jobPhaseRunnerVo : jobPhaseRunnerVos) {
+ autoexecJobMapper.updateJobPhaseRunnerStatus(Collections.singletonList(firstPhase.getId()), jobPhaseRunnerVo.getRunnerMapId(), JobPhaseStatus.WAITING.getValue());
+ }
autoexecJobService.getAutoexecJobDetail(jobVo);
if (CollectionUtils.isNotEmpty(jobVo.getPhaseList())) {
new AutoexecJobAuthActionManager.Builder().addReFireJob().build().setAutoexecJobAction(jobVo);
@@ -141,7 +151,7 @@ public class AutoexecJobReFireHandler extends AutoexecJobActionHandlerBase {
//put("phaseSort", jobVo.getCurrentGroupSort());
}});
- HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).setConnectTimeout(5000).setReadTimeout(5000).sendRequest();
+ HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest();
if (StringUtils.isNotBlank(requestUtil.getError())) {
throw new RunnerHttpRequestException(url + ":" + requestUtil.getError());
}
diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeOperationListHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeOperationListHandler.java
index 4060fe8cc57d28f095e0c0ab33d84762e6ff1870..b595cffb6b7bdfc57ab03e6b2c0c2f665f282bf9 100644
--- a/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeOperationListHandler.java
+++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeOperationListHandler.java
@@ -63,6 +63,7 @@ public class AutoexecJobNodeOperationListHandler extends AutoexecJobActionHandle
paramJson.put("resourceId", nodeVo.getResourceId());
paramJson.put("phase", nodeVo.getJobPhaseName());
paramJson.put("phaseId", nodeVo.getJobPhaseId());
+ paramJson.put("nodeId",nodeVo.getId());
paramJson.put("ip", nodeVo.getHost());
paramJson.put("port", nodeVo.getPort());
paramJson.put("runnerUrl", nodeVo.getRunnerUrl());
diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeReFireHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeReFireHandler.java
index 444feacf5d9316378e414e36b7740fdf2100d36d..4e13f443e36dfe0df1a70f9ed454eac64fb3fd04 100644
--- a/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeReFireHandler.java
+++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeReFireHandler.java
@@ -19,8 +19,10 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import neatlogic.framework.autoexec.constvalue.ExecMode;
import neatlogic.framework.autoexec.constvalue.JobAction;
+import neatlogic.framework.autoexec.constvalue.JobNodeStatus;
import neatlogic.framework.autoexec.constvalue.JobPhaseStatus;
import neatlogic.framework.autoexec.dao.mapper.AutoexecJobMapper;
+import neatlogic.framework.autoexec.dto.job.AutoexecJobGroupVo;
import neatlogic.framework.autoexec.dto.job.AutoexecJobPhaseNodeVo;
import neatlogic.framework.autoexec.dto.job.AutoexecJobPhaseVo;
import neatlogic.framework.autoexec.dto.job.AutoexecJobVo;
@@ -61,13 +63,13 @@ public class AutoexecJobNodeReFireHandler extends AutoexecJobActionHandlerBase {
currentPhaseIdValid(jobVo);
JSONObject jsonObj = jobVo.getActionParam();
List resourceIdList = JSONObject.parseArray(jsonObj.getJSONArray("resourceIdList").toJSONString(), Long.class);
- if(CollectionUtils.isEmpty(resourceIdList)){
+ if (CollectionUtils.isEmpty(resourceIdList)) {
throw new ParamIrregularException("resourceIdList");
}
List nodeVoList;
if (Objects.equals(jobVo.getCurrentPhase().getExecMode(), ExecMode.SQL.getValue())) {
JSONArray sqlIdArray = jobVo.getActionParam().getJSONArray("sqlIdList");
- if(CollectionUtils.isEmpty(sqlIdArray)){
+ if (CollectionUtils.isEmpty(sqlIdArray)) {
throw new ParamIrregularException("sqlIdList");
}
IAutoexecJobSource jobSource = AutoexecJobSourceFactory.getEnumInstance(jobVo.getSource());
@@ -76,10 +78,10 @@ public class AutoexecJobNodeReFireHandler extends AutoexecJobActionHandlerBase {
}
nodeVoList = AutoexecJobSourceTypeHandlerFactory.getAction(jobSource.getType()).getJobNodeListBySqlIdList(sqlIdArray.toJavaList(Long.class));
jobVo.setJobPhaseNodeSqlList(nodeVoList);
- }else {
+ } else {
nodeVoList = autoexecJobMapper.getJobPhaseNodeListByJobPhaseIdAndResourceIdList(jobVo.getCurrentPhaseId(), resourceIdList);
- //重置节点开始和结束时间,以防 失败节点直接"重跑"导致耗时异常
- autoexecJobMapper.updateJobPhaseNodeResetStartTimeAndEndTimeByNodeIdList(nodeVoList.stream().map(AutoexecJobPhaseNodeVo::getId).collect(Collectors.toList()));
+ List nodeIdList = nodeVoList.stream().map(AutoexecJobPhaseNodeVo::getId).collect(Collectors.toList());
+ autoexecJobMapper.updateJobPhaseNodeListStatus(nodeIdList, JobNodeStatus.WAITING.getValue());
}
jobVo.setExecuteJobNodeVoList(nodeVoList);
//校验是否和当前phaseId一致
@@ -100,10 +102,12 @@ public class AutoexecJobNodeReFireHandler extends AutoexecJobActionHandlerBase {
public JSONObject doMyService(AutoexecJobVo jobVo) {
//重跑单个节点无需激活下个phase
jobVo.setIsNoFireNext(1);
- //跟新phase状态为running
+ jobVo.setIsFirstFire(0);
AutoexecJobPhaseVo phaseVo = jobVo.getCurrentPhase();
- phaseVo.setStatus(JobPhaseStatus.RUNNING.getValue());
+ phaseVo.setStatus(JobPhaseStatus.WAITING.getValue());
autoexecJobMapper.updateJobPhaseStatus(phaseVo);
+ AutoexecJobGroupVo jobGroupVo = autoexecJobMapper.getJobGroupById(phaseVo.getGroupId());
+ jobVo.setExecuteJobGroupVo(jobGroupVo);
jobVo.setExecuteJobPhaseList(Collections.singletonList(phaseVo));
autoexecJobService.executeNode(jobVo);
return null;
diff --git a/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeSubmitWaitInputHandler.java b/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeSubmitWaitInputHandler.java
index 8e554d217879697120308a2d50d975e2d88ab3ae..6967acb9fc07d323f04b7dc6e61aa3991c9c23a1 100644
--- a/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeSubmitWaitInputHandler.java
+++ b/src/main/java/neatlogic/module/autoexec/job/action/handler/node/AutoexecJobNodeSubmitWaitInputHandler.java
@@ -92,7 +92,7 @@ public class AutoexecJobNodeSubmitWaitInputHandler extends AutoexecJobActionHand
}
String url = String.format("%s/api/rest/job/phase/node/submit/waitInput", nodeVo.getRunnerUrl());
String result = HttpRequestUtil.post(url)
- .setPayload(paramObj.toJSONString()).setAuthType(AuthenticateType.BUILDIN).setConnectTimeout(5000).setReadTimeout(5000)
+ .setPayload(paramObj.toJSONString()).setAuthType(AuthenticateType.BUILDIN)
.sendRequest().getError();
if (StringUtils.isNotBlank(result)) {
throw new RunnerHttpRequestException(url + ":" + result);
diff --git a/src/main/java/neatlogic/module/autoexec/job/source/action/AutoexecJobSourceTypeHandler.java b/src/main/java/neatlogic/module/autoexec/job/source/action/AutoexecJobSourceTypeHandler.java
index d0d4d3a44abc8e340191ef43a69a3f03c81ddea7..1994ab46a32d7bb24396559890802e7cca15b8e5 100644
--- a/src/main/java/neatlogic/module/autoexec/job/source/action/AutoexecJobSourceTypeHandler.java
+++ b/src/main/java/neatlogic/module/autoexec/job/source/action/AutoexecJobSourceTypeHandler.java
@@ -292,7 +292,7 @@ public class AutoexecJobSourceTypeHandler extends AutoexecJobSourceTypeHandlerBa
throw new RunnerGroupRunnerNotFoundException(networkVo.getGroupId());
}
if (CollectionUtils.isEmpty(groupVo.getRunnerMapList())) {
- throw new RunnerGroupRunnerNotFoundException(groupVo.getName(), networkVo.getName() + "(" + networkVo.getGroupId() + ") ");
+ throw new RunnerGroupRunnerNotFoundException(groupVo.getName(), networkVo.getNetworkIp());
}
runnerMapVos = groupVo.getRunnerMapList();
break;
diff --git a/src/main/java/neatlogic/module/autoexec/process/stephandler/component/CreateJobProcessComponent.java b/src/main/java/neatlogic/module/autoexec/process/stephandler/component/CreateJobProcessComponent.java
index 13eeb3ade665ca4fb898adc0045b5071514ecff1..60e683d0c9a11112cf06b63cd40b2735b98147da 100644
--- a/src/main/java/neatlogic/module/autoexec/process/stephandler/component/CreateJobProcessComponent.java
+++ b/src/main/java/neatlogic/module/autoexec/process/stephandler/component/CreateJobProcessComponent.java
@@ -299,7 +299,6 @@ public class CreateJobProcessComponent extends ProcessStepHandlerBase {
jobVo.setInvokeId(processTaskStepVo.getId());
jobVo.setRouteId(processTaskStepVo.getId().toString());
jobVo.setSource(AutoExecJobProcessSource.ITSM.getValue());
- jobVo.setIsFirstFire(1);
jobVo.setAssignExecUser(SystemUser.SYSTEM.getUserUuid());
try {
autoexecJobActionService.validateCreateJob(jobVo);
diff --git a/src/main/java/neatlogic/module/autoexec/schedule/plugin/AutoexecScheduleJob.java b/src/main/java/neatlogic/module/autoexec/schedule/plugin/AutoexecScheduleJob.java
index 43615975ca4d0fe6b26d39f19003c43570c4621d..1e3f3ba2767b159682da69e955161a42e0a374f8 100644
--- a/src/main/java/neatlogic/module/autoexec/schedule/plugin/AutoexecScheduleJob.java
+++ b/src/main/java/neatlogic/module/autoexec/schedule/plugin/AutoexecScheduleJob.java
@@ -142,7 +142,6 @@ public class AutoexecScheduleJob extends JobBase {
jobVo.setInvokeId(autoexecScheduleVo.getId());
jobVo.setRouteId(autoexecScheduleVo.getId().toString());
jobVo.setOperationType(CombopOperationType.COMBOP.getValue());
- jobVo.setIsFirstFire(1);
UserVo fcuVo = userMapper.getUserByUuid(autoexecScheduleVo.getFcu());
AuthenticationInfoVo authenticationInfoVo = authenticationInfoService.getAuthenticationInfo(autoexecScheduleVo.getFcu());
UserContext.init(fcuVo, authenticationInfoVo, SystemUser.SYSTEM.getTimezone());
diff --git a/src/main/java/neatlogic/module/autoexec/service/AutoexecJobActionServiceImpl.java b/src/main/java/neatlogic/module/autoexec/service/AutoexecJobActionServiceImpl.java
index 5ceccc33f07e39634c023d985ff7790f7d4dedde..a390170e37a78424c4b1180ea8bee61934af275e 100644
--- a/src/main/java/neatlogic/module/autoexec/service/AutoexecJobActionServiceImpl.java
+++ b/src/main/java/neatlogic/module/autoexec/service/AutoexecJobActionServiceImpl.java
@@ -468,7 +468,6 @@ public class AutoexecJobActionServiceImpl implements AutoexecJobActionService, I
jobVo.setExecuteJobGroupVo(autoexecJobMapper.getJobGroupByJobIdAndSort(jobVo.getId(), 0));
autoexecJobService.getAutoexecJobDetail(jobVo);
IAutoexecJobActionHandler fireAction = AutoexecJobActionHandlerFactory.getAction(JobAction.FIRE.getValue());
- jobVo.setIsFirstFire(1);
fireAction.doService(jobVo);
}
}
@@ -498,7 +497,6 @@ public class AutoexecJobActionServiceImpl implements AutoexecJobActionService, I
if (jobVo.getTriggerType() == null || (Objects.equals(JobTriggerType.AUTO.getValue(), jobVo.getTriggerType()) && jobVo.getPlanStartTime().getTime() <= System.currentTimeMillis())) {
IAutoexecJobActionHandler fireAction = AutoexecJobActionHandlerFactory.getAction(JobAction.FIRE.getValue());
jobVo.setAction(JobAction.FIRE.getValue());
- jobVo.setIsFirstFire(1);
fireAction.doService(jobVo);
} else if (Objects.equals(JobTriggerType.AUTO.getValue(), jobVo.getTriggerType()) && jobVo.getPlanStartTime() != null) {
IJob jobHandler = SchedulerManager.getHandler(AutoexecJobAutoFireJob.class.getName());
diff --git a/src/main/java/neatlogic/module/autoexec/service/AutoexecJobService.java b/src/main/java/neatlogic/module/autoexec/service/AutoexecJobService.java
index cf5a615a8026c1930e2bdc10b3c799a38e22341d..7ba3a799dd07d0cfb9474792f89590707b8fa9fc 100644
--- a/src/main/java/neatlogic/module/autoexec/service/AutoexecJobService.java
+++ b/src/main/java/neatlogic/module/autoexec/service/AutoexecJobService.java
@@ -15,6 +15,7 @@ along with this program. If not, see .*/
package neatlogic.module.autoexec.service;
+import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import neatlogic.framework.autoexec.constvalue.JobAction;
import neatlogic.framework.autoexec.dto.AutoexecParamVo;
@@ -279,4 +280,18 @@ public interface AutoexecJobService {
*/
void updateJobPhaseNode(AutoexecJobVo jobVo, List resourceVoList, String userName, Long protocolId);
+
+ /**
+ * 作业第一次跑和重置后重跑作业更新排队等待
+ * @param jobVo 作业
+ */
+ void fireOrResetRefireWaiting(AutoexecJobVo jobVo);
+
+ /**
+ * 根据作业id获取作业等待详情
+ *
+ * @param jobId 作业id
+ */
+ JSONArray getAutoexecJobWaitingDetail(Long jobId);
+
}
diff --git a/src/main/java/neatlogic/module/autoexec/service/AutoexecJobServiceImpl.java b/src/main/java/neatlogic/module/autoexec/service/AutoexecJobServiceImpl.java
index e157aa916eec4fc5495eee2390ecb92c3d6dc4c7..bb11b626254b4882273523960c2a8af70aa7f264 100644
--- a/src/main/java/neatlogic/module/autoexec/service/AutoexecJobServiceImpl.java
+++ b/src/main/java/neatlogic/module/autoexec/service/AutoexecJobServiceImpl.java
@@ -52,12 +52,10 @@ import neatlogic.framework.dao.mapper.runner.RunnerMapper;
import neatlogic.framework.deploy.crossover.IDeploySqlCrossoverMapper;
import neatlogic.framework.dto.RestVo;
import neatlogic.framework.dto.runner.RunnerMapVo;
+import neatlogic.framework.exception.core.ApiRuntimeException;
import neatlogic.framework.exception.runner.*;
import neatlogic.framework.integration.authentication.enums.AuthenticateType;
-import neatlogic.framework.util.$;
-import neatlogic.framework.util.HttpRequestUtil;
-import neatlogic.framework.util.RestUtil;
-import neatlogic.framework.util.SnowflakeUtil;
+import neatlogic.framework.util.*;
import neatlogic.module.autoexec.dao.mapper.AutoexecCombopVersionMapper;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
@@ -970,7 +968,6 @@ public class AutoexecJobServiceImpl implements AutoexecJobService, IAutoexecJobC
}
-
/**
* 跟新作业阶段阶段
*
@@ -1268,8 +1265,15 @@ public class AutoexecJobServiceImpl implements AutoexecJobService, IAutoexecJobC
autoexecJobVo.setConfigStr(jobContent.getContent());
List statusList = new ArrayList<>();
String url = paramJson.getString("runnerUrl") + "/api/rest/job/phase/node/status/get";
- JSONObject statusJson = JSONObject.parseObject(AutoexecUtil.requestRunner(url, paramJson));
- AutoexecJobPhaseNodeVo nodeVo = new AutoexecJobPhaseNodeVo(statusJson);
+ JSONObject statusJson = null;
+ AutoexecJobPhaseNodeVo nodeVo = autoexecJobMapper.getJobPhaseNodeInfoByJobNodeId(paramJson.getLong("nodeId"));
+ try {
+ statusJson = JSON.parseObject(AutoexecUtil.requestRunner(url, paramJson));
+ } catch (Exception ignored) {
+ //ignored
+ }
+
+
if (isNeedOperationList) {
IAutoexecJobSource jobSource = AutoexecJobSourceFactory.getEnumInstance(autoexecJobVo.getSource());
if (jobSource == null) {
@@ -1324,6 +1328,10 @@ public class AutoexecJobServiceImpl implements AutoexecJobService, IAutoexecJobC
}
}
+ if (MapUtils.isNotEmpty(statusJson)) {
+ nodeVo.setStatus(statusJson.getString("status"));
+ nodeVo.setInteractStr(statusJson.getString("interact"));
+ }
nodeVo.setOperationStatusVoList(statusList.stream().sorted(Comparator.comparing(AutoexecJobPhaseNodeOperationStatusVo::getSort)).collect(toList()));
}
return nodeVo;
@@ -1351,6 +1359,8 @@ public class AutoexecJobServiceImpl implements AutoexecJobService, IAutoexecJobC
finalStatus = JobNodeStatus.ABORTED.getValue();
} else if (statusList.contains(JobNodeStatus.PAUSED.getValue())) {
finalStatus = JobNodeStatus.PAUSED.getValue();
+ } else if (statusList.contains(JobNodeStatus.WAITING.getValue())) {
+ finalStatus = JobNodeStatus.WAITING.getValue();
} else {
finalStatus = JobNodeStatus.PENDING.getValue();
}
@@ -1439,7 +1449,7 @@ public class AutoexecJobServiceImpl implements AutoexecJobService, IAutoexecJobC
paramJson.put("nodeStatus", nodeStatus);
for (RunnerMapVo runner : runnerVos) {
String url = runner.getUrl() + "api/rest/job/phase/node/status/update";
- HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).setConnectTimeout(5000).setReadTimeout(5000).sendRequest();
+ HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest();
if (StringUtils.isNotBlank(requestUtil.getError())) {
throw new RunnerHttpRequestException(url + ":" + requestUtil.getError());
}
@@ -1465,13 +1475,9 @@ public class AutoexecJobServiceImpl implements AutoexecJobService, IAutoexecJobC
throw new AutoexecJobRunnerNotFoundException(runner.getRunnerMapId().toString());
}
url = runner.getUrl() + "api/rest/health/check";
- HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setConnectTimeout(5000).setReadTimeout(5000).setPayload(new JSONObject().toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest();
+ HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(new JSONObject().toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest();
if (StringUtils.isNotBlank(requestUtil.getError())) {
- throw new RunnerConnectRefusedException(url, requestUtil.getError());
- }
- JSONObject resultJson = requestUtil.getResultJson();
- if (resultJson == null || !resultJson.containsKey("Status") || !"OK".equals(resultJson.getString("Status"))) {
- throw new RunnerConnectRefusedException(runner.getHost() + ":" + runner.getPort(), url + ", response code:" + requestUtil.getResponseCode());
+ throw new ApiRuntimeException((StringUtils.isNotBlank(requestUtil.getErrorMsg()) ? requestUtil.getErrorMsg() : requestUtil.getError()));
}
}
@@ -1520,59 +1526,63 @@ public class AutoexecJobServiceImpl implements AutoexecJobService, IAutoexecJobC
if (CollectionUtils.isEmpty(runnerVos)) {
throw new RunnerNotMatchException();
}
- jobVo.setStatus(JobStatus.RUNNING.getValue());
autoexecJobMapper.updateJobStatus(jobVo);
-
+ Integer isFirstFire = Arrays.asList(JobAction.FIRE.getValue(), JobAction.RESET_REFIRE.getValue()).contains(jobVo.getAction()) ? 1 : 0;
JSONObject paramJson = new JSONObject();
paramJson.put("jobId", jobVo.getId());
paramJson.put("tenant", TenantContext.get().getTenantUuid());
paramJson.put("isNoFireNext", jobVo.getIsNoFireNext());
- paramJson.put("isFirstFire", jobVo.getIsFirstFire());
+ paramJson.put("isFirstFire", isFirstFire);
+ if (jobVo.getExecuteJobGroupVo() == null) {
+ throw new AutoexecJobGroupNotFoundException(jobVo.getId());
+ }
if (CollectionUtils.isNotEmpty(jobVo.getExecuteJobPhaseList())) {
paramJson.put("jobPhaseNameList", jobVo.getExecuteJobPhaseList().stream().map(AutoexecJobPhaseVo::getName).collect(Collectors.toList()));
}
- if (jobVo.getExecuteJobGroupVo() != null) {
- paramJson.put("jobGroupIdList", Collections.singletonList(jobVo.getExecuteJobGroupVo().getSort()));
- }
+
+ paramJson.put("jobGroupSortList", Collections.singletonList(jobVo.getExecuteJobGroupVo().getSort()));
if (jobVo.getCurrentPhase() != null && Objects.equals(jobVo.getCurrentPhase().getExecMode(), ExecMode.SQL.getValue())) {
paramJson.put("jobPhaseNodeSqlList", jobVo.getJobPhaseNodeSqlList());
} else {
paramJson.put("jobPhaseResourceIdList", jobVo.getExecuteResourceIdList());
}
- RestVo restVo = null;
- String result = StringUtils.EMPTY;
- String url = StringUtils.EMPTY;
runnerVos = runnerVos.stream().filter(o -> StringUtils.isNotBlank(o.getUrl())).collect(collectingAndThen(toCollection(() -> new TreeSet<>(Comparator.comparing(RunnerMapVo::getUrl))), ArrayList::new));
checkRunnerHealth(runnerVos);
- try {
- Long execid = SnowflakeUtil.uniqueLong();
- for (RunnerMapVo runner : runnerVos) {
- jobVo.getEnvironment().put("RUNNER_ID", runner.getRunnerMapId());
- url = runner.getUrl() + "api/rest/job/exec";
- JSONObject passThroughEnv = jobVo.getPassThroughEnv();
- passThroughEnv.put("runnerId", runner.getRunnerMapId());
- if (jobVo.getExecuteJobGroupVo() != null) {
- passThroughEnv.put("groupSort", jobVo.getExecuteJobGroupVo().getSort());
- }
- if (CollectionUtils.isNotEmpty(jobVo.getExecuteJobPhaseList())) {
- passThroughEnv.put("phaseSort", jobVo.getExecuteJobPhaseList().get(0).getSort());
- }
- passThroughEnv.put("isFirstFire", jobVo.getIsFirstFire());
- passThroughEnv.put("EXECUSER_TOKEN", userMapper.getUserTokenByUser(UserContext.get().getUserId()));
- paramJson.put("passThroughEnv", passThroughEnv);
- paramJson.put("environment", jobVo.getEnvironment());
- paramJson.put("execid", String.valueOf(execid));
- restVo = new RestVo.Builder(url, AuthenticateType.BUILDIN.getValue()).setPayload(paramJson).build();
- result = RestUtil.sendPostRequest(restVo);
- JSONObject resultJson = JSONObject.parseObject(result);
- if (!resultJson.containsKey("Status") || !"OK".equals(resultJson.getString("Status"))) {
- throw new RunnerHttpRequestException(restVo.getUrl() + ":" + resultJson.getString("Message"));
- }
+ Long execid = SnowflakeUtil.uniqueLong();
+ for (RunnerMapVo runner : runnerVos) {
+ jobVo.getEnvironment().put("RUNNER_ID", runner.getRunnerMapId());
+ String url = runner.getUrl() + "api/rest/job/exec";
+ JSONObject passThroughEnv = jobVo.getPassThroughEnv();
+ passThroughEnv.put("runnerId", runner.getRunnerMapId());
+ passThroughEnv.put("groupSort", jobVo.getExecuteJobGroupVo().getSort());
+ if (CollectionUtils.isNotEmpty(jobVo.getExecuteJobPhaseList())) {
+ passThroughEnv.put("phaseSort", jobVo.getExecuteJobPhaseList().get(0).getSort());
}
- } catch (Exception ex) {
- logger.error(ex.getMessage(), ex);
- throw new RunnerConnectRefusedException(url + " " + result);
+ passThroughEnv.put("isFirstFire", isFirstFire);
+ passThroughEnv.put("EXECUSER_TOKEN", userMapper.getUserTokenByUser(UserContext.get().getUserId()));
+ paramJson.put("passThroughEnv", passThroughEnv);
+ paramJson.put("environment", jobVo.getEnvironment());
+ paramJson.put("execid", String.valueOf(execid));
+ HttpRequestUtil httpRequestUtil = HttpRequestUtil.post(url).setPayload(paramJson.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest();
+
+ if (StringUtils.isNotBlank(httpRequestUtil.getError())) {
+ throw new ApiRuntimeException((StringUtils.isNotBlank(httpRequestUtil.getErrorMsg()) ? httpRequestUtil.getErrorMsg() : httpRequestUtil.getError()));
+ }
+ }
+
+ }
+
+ @Override
+ public void fireOrResetRefireWaiting(AutoexecJobVo jobVo) {
+ jobVo.setStatus(JobStatus.WAITING.getValue());
+ autoexecJobMapper.updateJobStatus(jobVo);
+ //找到第一个组的第一个phase状态更新成排队中
+ AutoexecJobPhaseVo firstPhase = autoexecJobMapper.getJobFirstPhaseByGroupId(jobVo.getExecuteJobGroupVo().getId());
+ autoexecJobMapper.updateJobPhaseStatusByPhaseIdList(Collections.singletonList(firstPhase.getId()), JobPhaseStatus.WAITING.getValue());
+ List jobPhaseRunnerVos = autoexecJobMapper.getJobPhaseRunnerByJobIdAndPhaseIdList(jobVo.getId(), Collections.singletonList(firstPhase.getId()));
+ for (AutoexecJobPhaseRunnerVo jobPhaseRunnerVo : jobPhaseRunnerVos) {
+ autoexecJobMapper.updateJobPhaseRunnerStatus(Collections.singletonList(firstPhase.getId()), jobPhaseRunnerVo.getRunnerMapId(), JobPhaseStatus.WAITING.getValue());
}
}
@@ -1609,4 +1619,51 @@ public class AutoexecJobServiceImpl implements AutoexecJobService, IAutoexecJobC
refireAction.doService(job);
}
}
+
+ /**
+ * 根据作业id获取作业等待详情
+ *
+ * @param jobId 作业id
+ */
+ @Override
+ public JSONArray getAutoexecJobWaitingDetail(Long jobId) {
+ JSONArray queueStatusArray = new JSONArray();
+ //作业基本信息
+ AutoexecJobVo jobVo = autoexecJobMapper.getJobInfo(jobId);
+ if (jobVo == null) {
+ throw new AutoexecJobNotFoundException(jobId.toString());
+ }
+ List runnerVos = autoexecJobMapper.getJobPhaseRunnerMapByJobId(jobId);
+
+ JSONObject params = new JSONObject();
+ params.put("jobId", jobId);
+ checkRunnerHealth(runnerVos);
+ for (RunnerMapVo runner : runnerVos) {
+ String url = runner.getUrl() + "api/rest/job/waiting/detail/get";
+ HttpRequestUtil requestUtil = HttpRequestUtil.post(url).setPayload(params.toJSONString()).setAuthType(AuthenticateType.BUILDIN).sendRequest();
+ if (StringUtils.isNotBlank(requestUtil.getError())) {
+ throw new RunnerHttpRequestException(url + ":" + requestUtil.getError());
+ }
+ JSONObject resultJson = requestUtil.getResultJson();
+ JSONObject queueJson = resultJson.getJSONObject("Return");
+ if (MapUtils.isNotEmpty(queueJson)) {
+ for (Map.Entry entry : queueJson.entrySet()) {
+ String key = entry.getKey();
+ if (Objects.equals(key, "count")) {
+ continue;
+ }
+ JSONObject value = JSON.parseObject(entry.getValue().toString());
+ JSONObject queueStatus = new JSONObject();
+ queueStatus.put("sort", key + "/" + queueJson.getString("count"));
+ queueStatus.put("command", value.getString("command"));
+ queueStatus.put("fcd", TimeUtil.convertDateToString(new Date(value.getLong("fcd")), TimeUtil.YYYY_MM_DD_HH_MM_SS));
+ queueStatus.put("runner", runner.getName() + ":" + runner.getPort());
+ queueStatus.put("runnerId", runner.getId());
+ queueStatus.put("groupSortList",value.getJSONArray("groupSortList"));
+ queueStatusArray.add(queueStatus);
+ }
+ }
+ }
+ return queueStatusArray;
+ }
}
diff --git a/src/main/java/neatlogic/module/autoexec/stephandler/component/AutoexecProcessComponent.java b/src/main/java/neatlogic/module/autoexec/stephandler/component/AutoexecProcessComponent.java
index 8d4e3e833952edba4334843fabbecfefc578f441..999feb3996e906b4ed0636c068aefca3e13327a9 100644
--- a/src/main/java/neatlogic/module/autoexec/stephandler/component/AutoexecProcessComponent.java
+++ b/src/main/java/neatlogic/module/autoexec/stephandler/component/AutoexecProcessComponent.java
@@ -247,7 +247,6 @@ public class AutoexecProcessComponent extends ProcessStepHandlerBase {
jobObj.put("operationType", jobVo.getOperationType());
jobObj.put("invokeId", jobVo.getInvokeId());
jobObj.put("routeId", jobVo.getRouteId());
- jobObj.put("isFirstFire", jobVo.getIsFirstFire());
jobObj.put("assignExecUser", jobVo.getAssignExecUser());
logger.error(jobObj.toJSONString());
JSONObject errorMessageObj = new JSONObject();
@@ -490,7 +489,6 @@ public class AutoexecProcessComponent extends ProcessStepHandlerBase {
jobVo.setOperationType(CombopOperationType.COMBOP.getValue());
jobVo.setInvokeId(currentProcessTaskStepVo.getId());
jobVo.setRouteId(currentProcessTaskStepVo.getId().toString());
- jobVo.setIsFirstFire(1);
jobVo.setAssignExecUser(SystemUser.SYSTEM.getUserUuid());
return jobVo;
}
@@ -538,7 +536,6 @@ public class AutoexecProcessComponent extends ProcessStepHandlerBase {
jobVo.setOperationType(CombopOperationType.COMBOP.getValue());
jobVo.setInvokeId(currentProcessTaskStepVo.getId());
jobVo.setRouteId(currentProcessTaskStepVo.getId().toString());
- jobVo.setIsFirstFire(1);
jobVo.setAssignExecUser(SystemUser.SYSTEM.getUserUuid());
JSONObject tbodyObj = tbodyList.getJSONObject(index);
// 场景
@@ -1252,7 +1249,6 @@ public class AutoexecProcessComponent extends ProcessStepHandlerBase {
jobVo.setOperationType(CombopOperationType.COMBOP.getValue());
jobVo.setInvokeId(currentProcessTaskStepVo.getId());
jobVo.setRouteId(currentProcessTaskStepVo.getId().toString());
- jobVo.setIsFirstFire(1);
jobVo.setAssignExecUser(SystemUser.SYSTEM.getUserUuid());
return jobVo;
}
diff --git a/src/main/resources/neatlogic/resources/autoexec/changelog/2025-04-10/neatlogic_tenant.sql b/src/main/resources/neatlogic/resources/autoexec/changelog/2025-04-10/neatlogic_tenant.sql
new file mode 100644
index 0000000000000000000000000000000000000000..5c96246d5c731a65ac68ed5953d52c544133cdbd
--- /dev/null
+++ b/src/main/resources/neatlogic/resources/autoexec/changelog/2025-04-10/neatlogic_tenant.sql
@@ -0,0 +1,8 @@
+ALTER TABLE `autoexec_job`
+MODIFY COLUMN `status` enum('running','pausing','paused','completed','pending','aborting','aborted','succeed','failed','waitInput','ready','revoked','saved','checked','waiting') CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '作业状态' AFTER `name`;
+
+ALTER TABLE `autoexec_job_phase_node`
+MODIFY COLUMN `status` enum('succeed','pending','failed','ignored','running','aborted','aborting','waitInput','pausing','paused','invalid','waiting') CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '状态' AFTER `end_time`;
+
+ALTER TABLE `autoexec_job_phase_runner`
+MODIFY COLUMN `status` enum('pending','completed','failed','paused','aborted','running','aborting','pausing','waitInput','ignored','waiting') CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT 'pending' COMMENT '状态' AFTER `runner_map_id`;
\ No newline at end of file