diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java
new file mode 100755
index 0000000000000000000000000000000000000000..ab7ee80be9da7be92b4a5345ba52ec586f732a55
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java
@@ -0,0 +1,70 @@
+/*Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved.
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .*/
+package com.neatlogic.autoexecrunner.api.job;
+
+import com.alibaba.fastjson.JSONObject;
+import com.neatlogic.autoexecrunner.constvalue.ApiParamType;
+import com.neatlogic.autoexecrunner.dto.CommandVo;
+import com.neatlogic.autoexecrunner.restful.annotation.Input;
+import com.neatlogic.autoexecrunner.restful.annotation.Param;
+import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentBase;
+import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+
+@Component
+public class GetJobWaitingDetailApi extends PrivateApiComponentBase {
+ @Override
+ public String getName() {
+ return "获取作业排队状态";
+ }
+
+ @Input({
+ @Param(name = "jobId", type = ApiParamType.LONG, isRequired = true, desc = "作业id"),
+ @Param(name = "groupSort", type = ApiParamType.LONG, desc = "阶段组id"),
+ @Param(name = "phaseName", type = ApiParamType.STRING, desc = "阶段名"),
+ })
+ @Override
+ public Object myDoService(JSONObject jsonObj) throws Exception {
+ String jobId = jsonObj.getString("jobId");
+ Integer groupSort = jsonObj.getInteger("groupSort");
+ List list = AutoexecQueueThread.getBlockingQueueByJobIdAndGroupSort();
+ JSONObject result = new JSONObject();
+ for (int i = 0; i < list.size(); i++) {
+ CommandVo commandVo = list.get(i);
+ JSONObject commandJson = new JSONObject();
+ commandJson.put("groupSortList",commandVo.getJobGroupSortList());
+ commandJson.put("nodeSqlList",commandVo.getJobPhaseNodeSqlList());
+ commandJson.put("phaseNameList",commandVo.getJobPhaseNameList());
+ commandJson.put("resourceIdList",commandVo.getJobPhaseResourceIdList());
+ commandJson.put("fcd",commandVo.getFcd().getTime());
+ commandJson.put("command",commandVo.getCommandList().stream().map(Object::toString).collect(Collectors.joining("','")));
+ if (Objects.equals(commandVo.getJobId(), jobId) && (groupSort == null || commandVo.getJobGroupSortList().contains(groupSort))) {
+ result.put(String.valueOf(i + 1), commandJson);
+ }
+ }
+ result.put("count", AutoexecQueueThread.getBlockingQueueSize());
+ return result;
+ }
+
+ @Override
+ public String getToken() {
+ return "/job/waiting/detail/get";
+ }
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobAbortApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobAbortApi.java
index 8c46885014b774d07409aad4fc02a74658a1a93f..b31930f1308d987663035a75dcd7c4e449080693 100755
--- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobAbortApi.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobAbortApi.java
@@ -20,6 +20,7 @@ import com.neatlogic.autoexecrunner.constvalue.JobAction;
import com.neatlogic.autoexecrunner.core.ExecProcessCommand;
import com.neatlogic.autoexecrunner.dto.CommandVo;
import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentBase;
+import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread;
import com.neatlogic.autoexecrunner.threadpool.CommonThreadPool;
import org.springframework.stereotype.Component;
@@ -45,13 +46,16 @@ public class JobAbortApi extends PrivateApiComponentBase {
//set command
List commandList = Arrays.asList("autoexec", "--jobid", commandVo.getJobId(), "--execuser", UserContext.get().getUserUuid(), "--abort");
commandList = new ArrayList<>(commandList);
- if(commandVo.getPassThroughEnv() != null){
+ if (commandVo.getPassThroughEnv() != null) {
commandList.add("--passthroughenv");
commandList.add(commandVo.getPassThroughEnv().toString());
}
commandVo.setCommandList(commandList);
- ExecProcessCommand processCommand = new ExecProcessCommand(commandVo);
- CommonThreadPool.execute(processCommand);
+ //队列里不存在才执行abort命令
+ if (!AutoexecQueueThread.removeCommand(commandVo)) {
+ ExecProcessCommand processCommand = new ExecProcessCommand(commandVo);
+ CommonThreadPool.execute(processCommand);
+ }
return null;
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java
index 7622d2e0e5738018c0d57e2239efb00673e73903..d9c534fb5306e5e38ba2e34327fa3b06974e5ece 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java
@@ -17,12 +17,14 @@ package com.neatlogic.autoexecrunner.api.job;
import com.alibaba.fastjson.JSONObject;
import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext;
import com.neatlogic.autoexecrunner.constvalue.JobAction;
-import com.neatlogic.autoexecrunner.core.ExecProcessCommand;
import com.neatlogic.autoexecrunner.dto.CommandVo;
+import com.neatlogic.autoexecrunner.exception.job.JobQueueFullException;
import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentBase;
-import com.neatlogic.autoexecrunner.threadpool.CommonThreadPool;
+import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread;
import com.neatlogic.autoexecrunner.util.FileUtil;
import org.apache.commons.collections4.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
@@ -36,6 +38,8 @@ import java.util.stream.Collectors;
**/
@Component
public class JobExecApi extends PrivateApiComponentBase {
+ private static final Logger logger = LoggerFactory.getLogger(JobExecApi.class);
+
@Override
public String getName() {
return "创建执行作业剧本进程";
@@ -65,9 +69,9 @@ public class JobExecApi extends PrivateApiComponentBase {
commandList.add("--passthroughenv");
commandList.add(commandVo.getPassThroughEnv().toString());
}
- if (CollectionUtils.isNotEmpty(commandVo.getJobGroupIdList())) {
+ if (CollectionUtils.isNotEmpty(commandVo.getJobGroupSortList())) {
commandList.add("--phasegroups");
- commandList.add(commandVo.getJobGroupIdList().stream().map(Object::toString).collect(Collectors.joining("','")));
+ commandList.add(commandVo.getJobGroupSortList().stream().map(Object::toString).collect(Collectors.joining("','")));
}
if (CollectionUtils.isNotEmpty(commandVo.getJobPhaseNameList())) {
commandList.add("--phases");
@@ -83,8 +87,11 @@ public class JobExecApi extends PrivateApiComponentBase {
}
commandList.add("--reuseconslog");
commandVo.setCommandList(commandList);
- ExecProcessCommand processCommand = new ExecProcessCommand(commandVo);
- CommonThreadPool.execute(processCommand);
+ if (AutoexecQueueThread.addCommand(commandVo) == -1) {
+ throw new JobQueueFullException();
+ } else if (AutoexecQueueThread.addCommand(commandVo) == 0) {
+ logger.debug("队列里已存在相同的执行命令:{}", String.join(",", commandList));
+ }
return null;
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java
new file mode 100644
index 0000000000000000000000000000000000000000..87e8f72db534e02dbba347f2a150937903736211
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java
@@ -0,0 +1,195 @@
+/*Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved.
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .*/
+
+package com.neatlogic.autoexecrunner.asynchronization;
+
+
+import com.neatlogic.autoexecrunner.asynchronization.threadlocal.RequestContext;
+import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext;
+import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext;
+import com.neatlogic.autoexecrunner.exception.core.ApiRuntimeException;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+
+public abstract class NeatLogicThread implements Runnable, Comparable {
+ private static final Logger logger = LoggerFactory.getLogger(NeatLogicThread.class);
+ protected UserContext userContext;
+ protected RequestContext requestContext;
+ private final String tenantUuid;
+ private String threadName;
+ private boolean isUnique = false;
+ /* For generating thread ID */
+ private long id;
+ private int priority = 3;//默认优先级是3,数字越低优先级越高
+ private Semaphore lock;//用于hold住其他异步线程,控制两个异步线程的先后顺序
+ private CountDownLatch countDownLatch;//用于hold住主线程,这里只需要countdown,不需要等待
+ private boolean needAwaitAdvance = true;// 是否需要等待所有模块加载完成后再任务
+
+ @Override
+ public int compareTo(NeatLogicThread other) {
+ return Integer.compare(this.priority, other.priority); // 优先级高的先出队,priority越小代表优先级越高
+ }
+
+ public Semaphore getLock() {
+ return lock;
+ }
+
+ public void setLock(Semaphore lock) {
+ this.lock = lock;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public int getPriority() {
+ if (priority <= 1) {
+ priority = 1;
+ }
+ return priority;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+
+
+ /*public NeatLogicThread() {
+ userContext = UserContext.get();
+ tenantContext = TenantContext.get();
+ inputFromContext = InputFromContext.get();
+ }*/
+
+ /*public NeatLogicThread(UserContext _userContext, TenantContext _tenantContext) {
+ if (_userContext != null) {
+ userContext = _userContext.copy();
+ }
+ tenantUuid = _tenantContext.getTenantUuid();
+ activeModuleList = _tenantContext.getActiveModuleList();
+ inputFromContext = InputFromContext.get();
+ }*/
+
+
+ public NeatLogicThread(String _threadName) {
+ UserContext tmp = UserContext.get();
+ if (tmp != null) {
+ userContext = tmp.copy();
+ }
+ tenantUuid = TenantContext.get().getTenantUuid();
+ requestContext = RequestContext.get();
+ this.threadName = _threadName;
+ }
+
+ public NeatLogicThread(String _threadName, int priority) {
+ UserContext tmp = UserContext.get();
+ if (tmp != null) {
+ userContext = tmp.copy();
+ }
+ tenantUuid = TenantContext.get().getTenantUuid();
+ requestContext = RequestContext.get();
+ this.threadName = _threadName;
+ this.priority = priority;
+ }
+
+ public NeatLogicThread(String _threadName, boolean _isUnique) {
+ userContext = UserContext.get();
+ tenantUuid = TenantContext.get().getTenantUuid();
+ requestContext = RequestContext.get();
+ this.threadName = _threadName;
+ this.isUnique = _isUnique;
+ }
+
+ @Override
+ public final void run() {
+ TenantContext tenantContext = TenantContext.init(tenantUuid);
+ UserContext.init(userContext);
+ try {
+ String oldThreadName = Thread.currentThread().getName();
+ if (StringUtils.isNotBlank(threadName)) {
+ Thread.currentThread().setName(threadName);
+ }
+
+ if (this.lock != null) {
+ //System.out.println(this.getThreadName() + "尝试获取锁" + this.lock);
+ lock.acquire();
+ //System.out.println(this.getThreadName() + "成功获取锁" + this.lock);
+ }
+ execute();
+ Thread.currentThread().setName(oldThreadName);
+ } catch (ApiRuntimeException ex) {
+ logger.warn(ex.getMessage(), ex);
+ } catch (Exception ex) {
+ logger.error(ex.getMessage(), ex);
+ } finally {
+ if (this.lock != null) {
+ lock.release();
+ //System.out.println(this.getThreadName() + "成功释放锁" + this.lock);
+ }
+ if (countDownLatch != null) {
+ countDownLatch.countDown();
+ }
+ // 清除所有threadlocal
+ if (TenantContext.get() != null) {
+ TenantContext.get().release();
+ }
+ if (UserContext.get() != null) {
+ UserContext.get().release();
+ }
+ }
+ }
+
+ protected abstract void execute();
+
+ public void setUnique(boolean unique) {
+ isUnique = unique;
+ }
+
+ public boolean isUnique() {
+ return isUnique;
+ }
+
+ public void setThreadName(String threadName) {
+ this.threadName = threadName;
+ }
+
+ public String getThreadName() {
+ return threadName;
+ }
+
+ public CountDownLatch getCountDownLatch() {
+ return countDownLatch;
+ }
+
+ public void setCountDownLatch(CountDownLatch countDownLatch) {
+ this.countDownLatch = countDownLatch;
+ }
+
+ public boolean isNeedAwaitAdvance() {
+ return needAwaitAdvance;
+ }
+
+ public void setNeedAwaitAdvance(boolean needAwaitAdvance) {
+ this.needAwaitAdvance = needAwaitAdvance;
+ }
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java
new file mode 100644
index 0000000000000000000000000000000000000000..1af28a58d4593b3fc460cdaaf0ffa630d9f4c114
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package com.neatlogic.autoexecrunner.asynchronization.queue;
+
+import com.alibaba.fastjson.JSON;
+import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext;
+import org.apache.commons.collections4.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+public class NeatLogicUniqueBlockingQueue {
+ private static final Logger logger = LoggerFactory.getLogger(NeatLogicUniqueBlockingQueue.class);
+ private final BlockingQueue> blockingQueue;
+ private final ConcurrentHashMap taskMap; // 用于去重
+
+ public NeatLogicUniqueBlockingQueue(int capacity) {
+ this.blockingQueue = new LinkedBlockingQueue<>(capacity);
+ this.taskMap = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * 添加队列成员
+ * 返回-1:队列已满,1:添加成功,0:重复添加
+ */
+ public int offer(T t) {
+ Task task = new Task<>(t);
+ // 保证任务唯一性
+ if (taskMap.putIfAbsent(task.getUniqueKey(), Boolean.TRUE) == null) {
+ logger.debug("====TagentUpdateInfo-addQueue:" + JSON.toJSONString(task));
+ // 如果任务是新任务,放入队列
+ boolean added = blockingQueue.offer(task);
+ if (!added) {
+ // 如果队列已满,移除任务标记
+ taskMap.remove(task.getUniqueKey());
+ logger.error("Queue is full!");
+ return -1;
+ }
+ return 1;
+ } else {
+ if (t != null) {
+ logger.debug("NeatLogicUniqueBlockingQueue repeat: {}", JSON.toJSONString(t));
+ }
+ return 0;
+ }
+ }
+
+ public boolean remove(Predicate> condition) {
+ for (Task task : blockingQueue) {
+ if (condition.test(task)) {
+ boolean removed = blockingQueue.remove(task);
+ if (removed) {
+ taskMap.remove(task.getUniqueKey());
+ logger.debug("Removed task: {}", JSON.toJSONString(task));
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public T take() throws InterruptedException {
+ Task task = blockingQueue.take(); // 阻塞式获取任务
+ taskMap.remove(task.getUniqueKey()); // 移除已处理任务的唯一标记
+ TenantContext tenantContext = TenantContext.get();
+ if (tenantContext != null) {
+ tenantContext.switchTenant(task.getTenantUuid());
+ } else {
+ TenantContext.init(task.getTenantUuid());
+ }
+ return task.getT();
+ }
+
+ public static class Task {
+ private final T t;
+ private final String tenantUuid;
+
+ public Task(T t) {
+ this.t = t;
+ this.tenantUuid = TenantContext.get().getTenantUuid();
+ }
+
+ public T getT() {
+ return t;
+ }
+
+ public String getTenantUuid() {
+ return tenantUuid;
+ }
+
+ public String getUniqueKey() {
+ // 唯一标识任务的 key,可根据需求定义,例如 `tenantUuid-t.hashCode`
+ //System.out.println(tenantUuid + "-" + t.hashCode());
+ return tenantUuid + "-" + t.hashCode();
+ }
+ }
+
+ public int size() {
+ return blockingQueue.size();
+ }
+
+ public List getQueue() {
+ List list = new ArrayList<>();
+ List> taskList = new ArrayList<>(blockingQueue);
+ if (CollectionUtils.isNotEmpty(taskList)) {
+ list = taskList.stream().map(Task::getT).collect(Collectors.toList());
+ }
+ return list;
+ }
+
+// public static void main(String[] args) throws InterruptedException {
+// NeatLogicUniqueBlockingQueue queue = new NeatLogicUniqueBlockingQueue<>(1);
+//
+// // 模拟任务插入
+// UserSessionVo a = new UserSessionVo();
+// a.setToken("1111");
+// System.out.println(queue.offer(a)); // 返回 true,任务插入成功
+// UserSessionVo b = new UserSessionVo();
+// b.setToken("222");
+// System.out.println(queue.offer(b)); // 返回 false,任务已存在
+//
+// // 模拟任务消费
+// UserSessionVo task = queue.take(); // 消费 "task1"
+// UserSessionVo task2 = queue.take(); // 消费 "task1"
+// UserSessionVo task3 = queue.take(); // 消费 "task1"
+// }
+}
+
diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/RequestContext.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/RequestContext.java
new file mode 100644
index 0000000000000000000000000000000000000000..efcb667942309451e39c85225a7d0b38c6fac09c
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/RequestContext.java
@@ -0,0 +1,134 @@
+/*Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved.
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .*/
+
+package com.neatlogic.autoexecrunner.asynchronization.threadlocal;
+
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * 保存请求信息
+ */
+public class RequestContext implements Serializable {
+ private static final ThreadLocal instance = new ThreadLocal<>();
+ private static final long serialVersionUID = -5420998728515359626L;
+ private String url;
+ private HttpServletRequest request;
+ private HttpServletResponse response;
+ //接口访问速率
+ private Double apiRate;
+ //租户接口访问总速率
+ private Double tenantRate;
+ //语言
+ Locale locale;
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public HttpServletRequest getRequest() {
+ return request;
+ }
+
+ public void setRequest(HttpServletRequest request) {
+ this.request = request;
+ }
+
+ public HttpServletResponse getResponse() {
+ return response;
+ }
+
+ public void setResponse(HttpServletResponse response) {
+ this.response = response;
+ }
+
+ public Double getApiRate() {
+ return apiRate;
+ }
+
+ public void setApiRate(Double apiRate) {
+ this.apiRate = apiRate;
+ }
+
+
+ public Double getTenantRate() {
+ return tenantRate;
+ }
+
+ public void setTenantRate(Double tenantRate) {
+ this.tenantRate = tenantRate;
+ }
+
+ public Locale getLocale() {
+ return locale;
+ }
+
+ public void setLocale(Locale locale) {
+ this.locale = locale;
+ }
+
+ public static RequestContext init(RequestContext _requestContext) {
+ RequestContext context = new RequestContext();
+ if (_requestContext != null) {
+ context.setUrl(_requestContext.getUrl());
+ context.setLocale(_requestContext.getLocale());
+ }
+ instance.set(context);
+ return context;
+ }
+
+ public static RequestContext init(HttpServletRequest request, String url, HttpServletResponse response) {
+ RequestContext context = new RequestContext(request, url);
+ context.setResponse(response);
+ instance.set(context);
+ if (request.getCookies() != null && request.getCookies().length > 0) {
+ Optional languageCookie = Arrays.stream(request.getCookies()).filter(o -> Objects.equals(o.getName(), "neatlogic_language")).findFirst();
+ if (languageCookie.isPresent()) {
+ context.setLocale(new Locale(languageCookie.get().getValue()));
+ } else {
+ context.setLocale(Locale.getDefault());
+ }
+ }
+ return context;
+ }
+
+ private RequestContext() {
+
+ }
+
+ private RequestContext(HttpServletRequest request, String url) {
+ this.url = url;
+ this.request = request;
+ }
+
+ public static RequestContext get() {
+ return instance.get();
+ }
+
+ public void release() {
+ instance.remove();
+ }
+
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/UserContext.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/UserContext.java
index 89bf0a29c00d17d22f285d2f01c4788ca8d799c5..f63f71a118d9b8e15c1640717dd39c22387be2c3 100755
--- a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/UserContext.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/UserContext.java
@@ -27,19 +27,23 @@ public class UserContext implements Serializable {
private String timezone = "+8:00";
private String token;
private List roleUuidList = new ArrayList<>();
-
+
+ public UserContext copy() {
+ UserContext userContext = new UserContext();
+ userContext.setRequest(request);
+ userContext.setToken(token);
+ userContext.setTenant(tenant);
+ userContext.setUserName(userName);
+ userContext.setUserId(userId);
+ userContext.setUserUuid(userUuid);
+ userContext.setTimezone(timezone);
+ return userContext;
+ }
+
public static UserContext init(UserContext _userContext) {
UserContext context = new UserContext();
if (_userContext != null) {
- context.setUserId(_userContext.getUserId());
- context.setUserUuid(_userContext.getUserUuid());
- context.setUserName(_userContext.getUserName());
- context.setTenant(_userContext.getTenant());
- context.setTimezone(_userContext.getTimezone());
- context.setToken(_userContext.getToken());
- // context.setRequest(_userContext.getRequest());
- // context.setResponse(_userContext.getResponse());
- context.setRoleUuidList(_userContext.getRoleUuidList());
+ context = _userContext.copy();
}
instance.set(context);
return context;
diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadpool/CachedThreadPool.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadpool/CachedThreadPool.java
new file mode 100644
index 0000000000000000000000000000000000000000..b1f50a2141e583884980ad4f47c0acb9d30b76d8
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadpool/CachedThreadPool.java
@@ -0,0 +1,144 @@
+/*Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved.
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .*/
+
+package com.neatlogic.autoexecrunner.asynchronization.threadpool;
+
+
+import com.neatlogic.autoexecrunner.asynchronization.NeatLogicThread;
+import com.neatlogic.autoexecrunner.dto.ThreadPoolVo;
+import com.neatlogic.autoexecrunner.dto.ThreadTaskVo;
+import com.neatlogic.autoexecrunner.dto.ThreadVo;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+public class CachedThreadPool {
+ static int rank = 15;
+ static int cpu = Runtime.getRuntime().availableProcessors();
+ private static final Log logger = LogFactory.getLog(CachedThreadPool.class);
+ private static final Map threadTaskMap = new ConcurrentHashMap<>();
+ private static final Map threadMap = new ConcurrentHashMap<>();
+ private static final Set threadSet = new HashSet<>();
+ private static final PriorityBlockingQueue threadQueue = new PriorityBlockingQueue<>();
+
+ static class NeatLogicThreadFactory implements ThreadFactory {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r) {
+ @Override
+ public void run() {
+ try {
+ super.run();
+ } finally {
+ threadMap.remove(this.getId());
+ }
+ }
+ };
+ threadMap.put(thread.getId(), new ThreadVo(thread.getId(), thread.getName()));
+ return thread;
+ }
+ }
+
+ private static final ThreadPoolExecutor mainThreadPool = new ThreadPoolExecutor(0, cpu * rank,
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(), new NeatLogicThreadFactory(), new NeatLogicRejectHandler()) {
+ @Override
+ protected void beforeExecute(Thread t, Runnable r) {
+ super.beforeExecute(t, r);
+ if (r instanceof NeatLogicThread) {
+ NeatLogicThread nt = (NeatLogicThread) r;
+ nt.setId(t.getId());
+ ThreadTaskVo threadVo = new ThreadTaskVo();
+ threadVo.setId(t.getId());
+ threadVo.setName(nt.getThreadName());
+ threadVo.setPoolName("main");
+ threadVo.setStartTime(new Date());
+ threadVo.setPriority(nt.getPriority());
+ threadTaskMap.put(nt.getId(), threadVo);
+ threadSet.add(nt.getThreadName());
+ }
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+
+ // 任务完成后从 activeTasks 中移除
+ if (r instanceof NeatLogicThread) {
+ NeatLogicThread task = (NeatLogicThread) r;
+ threadTaskMap.remove(task.getId());
+ threadSet.remove(task.getThreadName());
+ }
+ //尝试从队列中拿出任务处理
+ Runnable task = threadQueue.poll();
+ if (task != null) {
+ mainThreadPool.execute(task);
+ }
+ }
+ };
+
+ public static void execute(NeatLogicThread command, CountDownLatch countDownLatch) {
+ command.setCountDownLatch(countDownLatch);
+ execute(command);
+ }
+
+ public static void execute(NeatLogicThread command, Semaphore lock) {
+ command.setLock(lock);
+ execute(command);
+ }
+
+ public static void execute(NeatLogicThread command) {
+ try {
+ boolean isExists = command.isUnique() && StringUtils.isNotBlank(command.getThreadName()) && threadSet.contains(command.getThreadName());
+ if (!isExists) {
+ mainThreadPool.execute(command);
+ } else {
+ logger.warn(command.getThreadName() + " is running");
+ }
+ } catch (RejectedExecutionException ex) {
+ logger.error(ex.getMessage(), ex);
+ }
+ }
+
+
+ static class NeatLogicRejectHandler implements RejectedExecutionHandler {
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ //进入等待队列
+ if (r instanceof NeatLogicThread) {
+ threadQueue.offer((NeatLogicThread) r);
+ } else {
+ logger.error("线程池已满,非NeatLogicThread子类线程将被抛弃");
+ }
+ }
+ }
+
+
+ public static ThreadPoolVo getStatus() {
+ ThreadPoolVo threadPoolVo = new ThreadPoolVo();
+ List threadTasks = new ArrayList<>(threadTaskMap.values());
+ List threads = new ArrayList<>(threadMap.values());
+ threadPoolVo.setThreadTaskList(threadTasks);
+ threadPoolVo.setThreadList(threads);
+ threadPoolVo.setMaxThreadCount(cpu * rank);
+ threadPoolVo.setMainPoolSize(mainThreadPool.getPoolSize());
+ threadPoolVo.setMainActiveCount(mainThreadPool.getActiveCount());
+ threadPoolVo.setMainQueueSize(threadQueue.size());
+ return threadPoolVo;
+ }
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java b/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java
new file mode 100644
index 0000000000000000000000000000000000000000..c2b0f81972f100730597d0c3e4a72d8f34e0f0ab
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java
@@ -0,0 +1,70 @@
+package com.neatlogic.autoexecrunner.autoexec;
+
+import com.alibaba.fastjson.JSONObject;
+import com.neatlogic.autoexecrunner.asynchronization.NeatLogicThread;
+import com.neatlogic.autoexecrunner.dto.CommandVo;
+import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+public class ProcessWaitTask extends NeatLogicThread {
+ private static final Logger logger = LoggerFactory.getLogger(ProcessWaitTask.class);
+ private final Process process;
+ private final CommandVo commandVo;
+ private final JSONObject payload;
+ private final String jobName;
+
+ public ProcessWaitTask(Process process, CommandVo commandVo, JSONObject payload, String jobName) {
+ super("THREAD-AUTOEXEC-WAIT-" + jobName);
+ this.process = process;
+ this.commandVo = commandVo;
+ this.payload = payload;
+ this.jobName = jobName;
+ }
+
+
+ @Override
+ protected void execute() {
+ Long pid = null;
+ try {
+ int exitCode = process.waitFor();
+ int exitStatus = process.exitValue();
+ commandVo.setExitValue(exitStatus);
+ pid = getPid(process);
+ logger.debug("process[{}] finished,exitCode: {}", pid, exitCode);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error(String.format("thread interrupt: param:%s, errorMsg:%s", payload, e.getMessage()), e);
+ } finally {
+ // 确保关闭流
+ closeQuietly(process.getInputStream());
+ closeQuietly(process.getErrorStream());
+ closeQuietly(process.getOutputStream());
+ AutoexecQueueThread.removeProcess(process, jobName, pid);
+ }
+ }
+
+ // 兼容不同JDK版本的PID获取
+ private long getPid(Process p) {
+ try {
+ if (p.getClass().getName().contains("UNIXProcess")) {
+ Field pidField = p.getClass().getDeclaredField("pid");
+ pidField.setAccessible(true);
+ return pidField.getLong(p);
+ }
+ } catch (Exception e) {
+ // 忽略异常
+ }
+ return -1; // 未知PID
+ }
+
+ private void closeQuietly(java.io.Closeable c) {
+ try {
+ if (c != null) c.close();
+ } catch (IOException ignore) {
+ }
+ }
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java
index 7a5eb5e723e3c27f375f30a27ccdca9d4548388f..77cfc7b7032855add7947f3040718718eea4b88c 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java
@@ -37,6 +37,8 @@ public class Config {
private static String DATA_HOME;//文件根目录
private static String DEPLOY_HOME;//发布目录
private static String GITLAB_PASSWORD;// gitlab private_token
+ private static Integer SUBPROCESS_COMMAND_QUEUE_MAX_SIZE;//自动化作业命令等待队列的最大容量,超过此数量的新命令将被拒绝或丢弃
+ private static Integer SUBPROCESS_EXECUTION_MAX_CONCURRENT;//自动化作业最大并发子进程数,超过此数量的命令将进入等待队列
//neatlogic
private static String NEATLOGIC_ROOT;
@@ -168,6 +170,13 @@ public class Config {
return AUTOEXEC_TOKEN;
}
+ public static Integer SUBPROCESS_COMMAND_QUEUE_MAX_SIZE() {
+ return SUBPROCESS_COMMAND_QUEUE_MAX_SIZE;
+ }
+ public static Integer SUBPROCESS_EXECUTION_MAX_CONCURRENT() {
+ return SUBPROCESS_EXECUTION_MAX_CONCURRENT;
+ }
+
@PostConstruct
public void init() {
try {
@@ -238,6 +247,10 @@ public class Config {
AUTOEXEC_TOKEN = prop.getProperty("autoexec.token", "499922b4317c251c2ce525f7b83e3d94");
UPDATE_RUNNER_STATUS_PERIOD = Integer.parseInt(prop.getProperty("update.runner.status.period", "1800000"));
+
+ SUBPROCESS_COMMAND_QUEUE_MAX_SIZE = Integer.parseInt(prop.getProperty("subprocess.command.queue.max-size", "1000"));
+
+ SUBPROCESS_EXECUTION_MAX_CONCURRENT = Integer.parseInt(prop.getProperty("subprocess.execution.max-concurrent", "20"));
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java
index 34d6c914d9f68b9f9b38a14cedaf62d04a6581a3..db7a3ca35ebfc343ba6e9d5e7a5b1802599a8d20 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java
@@ -9,10 +9,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
/**
* @author lvzk
@@ -30,9 +27,10 @@ public class CommandVo {
private JSONObject passThroughEnv;//web端传到runner贯穿autoexec 回调web端会携带该变量
private List jobPhaseNameList;//需要执行的phaseNameList
private List jobPhaseResourceIdList;//需要执行的resourceIdList
- private List jobGroupIdList;//需要执行的组
+ private List jobGroupSortList;//需要执行的组
private JSONArray jobPhaseNodeSqlList;
private JSONObject environment;//设置环境变量
+ private Date fcd;
private String consoleLogPath;
@@ -73,9 +71,9 @@ public class CommandVo {
if (CollectionUtils.isNotEmpty(jobPhaseResourceIdArray)) {
this.jobPhaseResourceIdList = jobPhaseResourceIdArray.toJavaList(Long.class);
}
- JSONArray jobGroupIdArray = jsonObj.getJSONArray("jobGroupIdList");
+ JSONArray jobGroupIdArray = jsonObj.getJSONArray("jobGroupSortList");
if (CollectionUtils.isNotEmpty(jobGroupIdArray)) {
- this.jobGroupIdList = jobGroupIdArray.toJavaList(Integer.class);
+ this.jobGroupSortList = jobGroupIdArray.toJavaList(Integer.class);
}
JSONArray jobPhaseNodeSqlList = jsonObj.getJSONArray("jobPhaseNodeSqlList");
@@ -189,8 +187,8 @@ public class CommandVo {
return jobPhaseResourceIdList;
}
- public List getJobGroupIdList() {
- return jobGroupIdList;
+ public List getJobGroupSortList() {
+ return jobGroupSortList;
}
public JSONArray getJobPhaseNodeSqlList() {
@@ -219,4 +217,41 @@ public class CommandVo {
this.consoleLogPath = Config.AUTOEXEC_HOME() + File.separator + JobUtil.getJobPath(getJobId(), new StringBuilder()) + File.separator + "log" + File.separator + "console.txt";
return consoleLogPath;
}
+
+ public Date getFcd() {
+ return fcd;
+ }
+
+ public void setFcd(Date fcd) {
+ this.fcd = fcd;
+ }
+
+ private String getFilteredCommandString() {
+ if (commandList == null) return "";
+ List filtered = new ArrayList<>();
+ Iterator iterator = commandList.iterator();
+ while (iterator.hasNext()) {
+ String item = iterator.next();
+ if ("--execid".equals(item)) {
+ // 跳过 "--execid" 和它后面的那个参数
+ if (iterator.hasNext()) iterator.next();
+ continue;
+ }
+ filtered.add(item);
+ }
+ return String.join(",", filtered);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof CommandVo)) return false;
+ CommandVo that = (CommandVo) o;
+ return Objects.equals(getFilteredCommandString(), that.getFilteredCommandString());
+ }
+
+ @Override
+ public int hashCode() {
+ return getFilteredCommandString().hashCode();
+ }
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadPoolVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadPoolVo.java
new file mode 100644
index 0000000000000000000000000000000000000000..a12af67a02198250b7d1e808f320d923974995e6
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadPoolVo.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package com.neatlogic.autoexecrunner.dto;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ThreadPoolVo {
+ private int mainQueueSize;
+ private int mainPoolSize;
+ private int backupQueueSize;
+ private int backupPoolSize;
+ private int mainActiveCount;
+ private int backupActiveCount;
+ private int maxThreadCount;
+ private List threadTaskList = new ArrayList<>();
+ private List threadList = new ArrayList<>();
+
+ public int getMaxThreadCount() {
+ return maxThreadCount;
+ }
+
+ public void setMaxThreadCount(int maxThreadCount) {
+ this.maxThreadCount = maxThreadCount;
+ }
+
+
+ public int getMainQueueSize() {
+ return mainQueueSize;
+ }
+
+ public void setMainQueueSize(int mainQueueSize) {
+ this.mainQueueSize = mainQueueSize;
+ }
+
+ public int getMainPoolSize() {
+ return mainPoolSize;
+ }
+
+ public int getMainActiveCount() {
+ return mainActiveCount;
+ }
+
+ public void setMainActiveCount(int mainActiveCount) {
+ this.mainActiveCount = mainActiveCount;
+ }
+
+ public int getBackupActiveCount() {
+ return backupActiveCount;
+ }
+
+ public void setBackupActiveCount(int backupActiveCount) {
+ this.backupActiveCount = backupActiveCount;
+ }
+
+ public void setMainPoolSize(int mainPoolSize) {
+ this.mainPoolSize = mainPoolSize;
+ }
+
+ public int getBackupQueueSize() {
+ return backupQueueSize;
+ }
+
+ public void setBackupQueueSize(int backupQueueSize) {
+ this.backupQueueSize = backupQueueSize;
+ }
+
+ public int getBackupPoolSize() {
+ return backupPoolSize;
+ }
+
+ public void setBackupPoolSize(int backupPoolSize) {
+ this.backupPoolSize = backupPoolSize;
+ }
+
+ boolean isSorted = false;
+
+ public List getThreadTaskList() {
+ if (CollectionUtils.isNotEmpty(threadTaskList)) {
+ threadTaskList.sort((o1, o2) -> {
+ long s = o1.getStartTime().getTime();
+ long e = o2.getStartTime().getTime();
+ return Long.compare(s, e);
+ });
+ }
+ return threadTaskList;
+ }
+
+ public void setThreadTaskList(List threadTaskList) {
+ this.threadTaskList = threadTaskList;
+ }
+
+ public List getThreadList() {
+ if (CollectionUtils.isNotEmpty(threadList)) {
+ threadList.sort((o1, o2) -> {
+ long s = o1.getStartTime().getTime();
+ long e = o2.getStartTime().getTime();
+ return Long.compare(s, e);
+ });
+ }
+ return threadList;
+ }
+
+ public void setThreadList(List threadList) {
+ this.threadList = threadList;
+ }
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadTaskVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadTaskVo.java
new file mode 100644
index 0000000000000000000000000000000000000000..0c95c6c8f5d40acfaae35f1006e4689c4e272d75
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadTaskVo.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package com.neatlogic.autoexecrunner.dto;
+
+import java.util.Date;
+
+public class ThreadTaskVo {
+ private Long id;
+ private String name;
+ private Date startTime;
+ private long timeCost;
+ private String poolName;
+ private String status;
+ private int priority;
+
+ public Long getId() {
+ return id;
+ }
+
+ public void setId(Long id) {
+ this.id = id;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+
+ public Date getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(Date startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getTimeCost() {
+ return System.currentTimeMillis() - this.startTime.getTime();
+ }
+
+
+ public String getPoolName() {
+ return poolName;
+ }
+
+ public void setPoolName(String poolName) {
+ this.poolName = poolName;
+ }
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadVo.java
new file mode 100644
index 0000000000000000000000000000000000000000..3b4f2c01c04e93f450f90fbccd4a10440264fba0
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadVo.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package com.neatlogic.autoexecrunner.dto;
+
+import java.util.Date;
+
+public class ThreadVo {
+ private long id;
+ private String name;
+ private Date startTime;
+ private long timeCost;
+
+ public ThreadVo(long id, String name) {
+ this.id = id;
+ this.name = name;
+ this.startTime = new Date();
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public Date getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(Date startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getTimeCost() {
+ return System.currentTimeMillis() - this.startTime.getTime();
+ }
+
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java b/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java
new file mode 100755
index 0000000000000000000000000000000000000000..a72dae60f6632e78189123b0a8cd195e40a9caa1
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java
@@ -0,0 +1,14 @@
+package com.neatlogic.autoexecrunner.exception.job;
+
+
+import com.neatlogic.autoexecrunner.common.config.Config;
+import com.neatlogic.autoexecrunner.exception.core.ApiRuntimeException;
+import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread;
+
+public class JobQueueFullException extends ApiRuntimeException {
+
+ public JobQueueFullException() {
+ super("作业队列已满(当前队列数" + AutoexecQueueThread.getBlockingQueueSize() + ">= 配置队列最大数" + Config.SUBPROCESS_COMMAND_QUEUE_MAX_SIZE() + "),无法执行");
+ }
+
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/filter/JsonWebTokenValidFilter.java b/src/main/java/com/neatlogic/autoexecrunner/filter/JsonWebTokenValidFilter.java
index d942f06293c7a51ed5a32614f7bad7a7a2623311..917777b17a574bbdae109ac61c13dc1de443d8d6 100755
--- a/src/main/java/com/neatlogic/autoexecrunner/filter/JsonWebTokenValidFilter.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/filter/JsonWebTokenValidFilter.java
@@ -2,6 +2,7 @@ package com.neatlogic.autoexecrunner.filter;
import com.alibaba.fastjson.JSONObject;
+import com.neatlogic.autoexecrunner.asynchronization.threadlocal.RequestContext;
import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext;
import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext;
import com.neatlogic.autoexecrunner.common.config.Config;
@@ -41,7 +42,8 @@ public class JsonWebTokenValidFilter extends OncePerRequestFilter {
UserVo userVo = null;
JSONObject redirectObj = new JSONObject();
String authType = null;
-
+ //初始化request上下文
+ RequestContext.init(request, request.getRequestURI(), response);
//判断租户
try {
String tenant = request.getHeader("Tenant");
diff --git a/src/main/java/com/neatlogic/autoexecrunner/logback/RequestUrlConverter.java b/src/main/java/com/neatlogic/autoexecrunner/logback/RequestUrlConverter.java
new file mode 100644
index 0000000000000000000000000000000000000000..cb3cfb157c0c2afa6e3923c139429ae41c6c263a
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/logback/RequestUrlConverter.java
@@ -0,0 +1,37 @@
+/*Copyright (C) $today.year 深圳极向量科技有限公司 All Rights Reserved.
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .*/
+
+package com.neatlogic.autoexecrunner.logback;
+
+import ch.qos.logback.classic.pattern.ClassicConverter;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import com.neatlogic.autoexecrunner.asynchronization.threadlocal.RequestContext;
+
+public class RequestUrlConverter extends ClassicConverter {
+ /**
+ * The convert method is responsible for extracting data from the event and
+ * storing it for later use by the write method.
+ *
+ * @param event
+ */
+ @Override
+ public String convert(ILoggingEvent event) {
+ RequestContext requestContext = RequestContext.get();
+ if (requestContext != null) {
+ return requestContext.getUrl();
+ }
+ return "";
+ }
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/logback/TenantConverter.java b/src/main/java/com/neatlogic/autoexecrunner/logback/TenantConverter.java
new file mode 100644
index 0000000000000000000000000000000000000000..b20ceb46cb7c79b699eb80f837943cb2f465a0c7
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/logback/TenantConverter.java
@@ -0,0 +1,38 @@
+/*Copyright (C) $today.year 深圳极向量科技有限公司 All Rights Reserved.
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .*/
+
+package com.neatlogic.autoexecrunner.logback;
+
+
+import ch.qos.logback.classic.pattern.ClassicConverter;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext;
+
+public class TenantConverter extends ClassicConverter {
+ /**
+ * The convert method is responsible for extracting data from the event and
+ * storing it for later use by the write method.
+ *
+ * @param event
+ */
+ @Override
+ public String convert(ILoggingEvent event) {
+ TenantContext tenantContext = TenantContext.get();
+ if (tenantContext != null) {
+ return tenantContext.getTenantUuid();
+ }
+ return "";
+ }
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
new file mode 100644
index 0000000000000000000000000000000000000000..9f1a4e0469c551e02c519a416c24b5b2838cf891
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
@@ -0,0 +1,184 @@
+package com.neatlogic.autoexecrunner.startup.handler;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.neatlogic.autoexecrunner.asynchronization.queue.NeatLogicUniqueBlockingQueue;
+import com.neatlogic.autoexecrunner.asynchronization.threadpool.CachedThreadPool;
+import com.neatlogic.autoexecrunner.autoexec.ProcessWaitTask;
+import com.neatlogic.autoexecrunner.common.config.Config;
+import com.neatlogic.autoexecrunner.dto.CommandVo;
+import com.neatlogic.autoexecrunner.startup.IStartUp;
+import com.neatlogic.autoexecrunner.util.FileUtil;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class AutoexecQueueThread implements IStartUp {
+ private static final BlockingQueue processQueue = new LinkedBlockingQueue<>(Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT() + 5);
+ private static final Logger logger = LoggerFactory.getLogger(AutoexecQueueThread.class);
+ private static final NeatLogicUniqueBlockingQueue blockingQueue = new NeatLogicUniqueBlockingQueue<>(Config.SUBPROCESS_COMMAND_QUEUE_MAX_SIZE());
+ private volatile boolean running = true;
+
+ @Override
+ public String getName() {
+ return "创建自动化作业线程";
+ }
+
+ @Override
+ public String getDescription() {
+ return null;
+ }
+
+ @Override
+ public void doService() {
+ // 启动监控线程
+ Thread watchdog = new Thread(() -> {
+ Thread workerThread = null;
+
+ while (running) {
+ if (workerThread == null || !workerThread.isAlive()) {
+ System.out.println("autoexec job thread is down,ready to start...");
+ logger.debug("autoexec job thread is down,ready to start...");
+ workerThread = new Thread(() -> {
+ Thread.currentThread().setName("AutoexecQueueThread");
+ System.out.println("autoexec job thread start succeed!");
+ while (running) {
+ CommandVo commandVo = null;
+ try {
+ // 你的业务逻辑
+ if (processQueue.size() <= Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT()) {
+ commandVo = blockingQueue.take();
+ logger.debug("current autoexec sub process count:{} <= {},autoexec job:{} will create...", processQueue.size(), Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT(), (commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY)));
+ createSubProcessAndStart(commandVo);
+ } else {
+ logger.debug("autoexec sub process limit count :{}, need to wait process finish,then keep on creating sub process!", Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT());
+ }
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ logger.error(String.format("autoexec job thread is interrupted...params:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e);
+ break;
+ } catch (Exception e) {
+ logger.error(String.format("create sub process failed:params:%s ,errorMsg:%s", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e);
+ break; // 退出由外层 watchdog 负责重启
+ }
+ }
+ });
+
+ workerThread.setDaemon(true);
+ workerThread.start();
+ }
+
+ try {
+ Thread.sleep(2000); // 每2秒检查一次
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ });
+
+ watchdog.setDaemon(true);
+ watchdog.start();
+ }
+
+ private void createSubProcessAndStart(CommandVo commandVo) {
+ File NULL_FILE = new File("/dev/null");
+ ProcessBuilder builder = new ProcessBuilder(commandVo.getCommandList());
+ builder.redirectOutput(NULL_FILE);
+ File consoleLog = FileUtil.createFile(commandVo.getConsoleLogPath());
+ builder.redirectError(ProcessBuilder.Redirect.appendTo(consoleLog));
+ Map env = builder.environment();
+ JSONObject environment = commandVo.getEnvironment();
+ if (MapUtils.isNotEmpty(environment)) {
+ for (Map.Entry entry : environment.entrySet()) {
+ env.put(entry.getKey(), entry.getValue().toString());
+ }
+ }
+ env.put("tenant", commandVo.getTenant());
+ JSONObject payload = new JSONObject();
+ Process process;
+ try {
+ payload.put("jobId", commandVo.getJobId());
+ payload.put("status", 1);
+ payload.put("command", commandVo);
+ payload.put("passThroughEnv", commandVo.getPassThroughEnv().toJSONString());
+ process = builder.start();
+ String jobName = (commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY));
+ addProcess(process);
+ logger.debug("autoexec job sub process {} is running, pid {},added to processQueue,now processQueue size is {}", jobName, getPid(process), processQueue.size());
+ CachedThreadPool.execute(new ProcessWaitTask(process, commandVo, payload, jobName));
+ } catch (IOException e) {
+ logger.error(String.format("autoexec job sub process start failed: %s ,error: %s", payload, e.getMessage()), e);
+ }
+ }
+
+ // 兼容不同JDK版本的PID获取
+ private long getPid(Process p) {
+ try {
+ if (p.getClass().getName().contains("UNIXProcess")) {
+ Field pidField = p.getClass().getDeclaredField("pid");
+ pidField.setAccessible(true);
+ return pidField.getLong(p);
+ }
+ } catch (Exception e) {
+ // 忽略异常
+ }
+ return -1; // 未知PID
+ }
+
+ public void stop() {
+ running = false;
+ }
+
+ /**
+ * 进入队列
+ * 返回-1:队列已满,1:添加成功,0:重复添加
+ */
+ public static int addCommand(CommandVo commandVo) {
+ commandVo.setFcd(new Date());
+ return blockingQueue.offer(commandVo);
+ }
+
+ /**
+ * 删除命令
+ */
+ public static boolean removeCommand(CommandVo commandVo) {
+ return blockingQueue.remove(task -> task.getT().getJobId().equals(commandVo.getJobId()));
+ }
+
+ public static void addProcess(Process process) {
+ boolean result = processQueue.offer(process);
+ if (!result) {
+ logger.error("processQueue offer failed!current queue size is :{}", processQueue.size());
+ }
+ }
+
+ public static void removeProcess(Process process, String jobName, Long pid) {
+ boolean result = processQueue.remove(process);
+ if (!result) {
+ logger.error("autoexec process:{} (pid:{})finished. processQueue remove failed!current queue size is :{}", jobName, pid, blockingQueue.size());
+ } else {
+ logger.debug("autoexec process:{} (pid:{})finished. processQueue remove succeed!current queue size is :{}", jobName, pid, blockingQueue.size());
+ }
+ }
+
+ public static Integer getProcessQueueSize() {
+ return processQueue.size();
+ }
+
+ public static Integer getBlockingQueueSize() {
+ return blockingQueue.size();
+ }
+
+ public static List getBlockingQueueByJobIdAndGroupSort() {
+ return blockingQueue.getQueue();
+ }
+}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 260617808c94a255c240d8248229cc8336242b4d..81f7bfe415f8b75f03af32351a9c8ccbb90f04d7 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -14,7 +14,6 @@ spring.servlet.multipart.max-file-size=100MB
#NEATLOGIC WEB
neatlogic.root=http://127.0.0.1:8080/neatlogic
-#认证 连接时校验
#RUNNER
@@ -32,3 +31,12 @@ autoexec.home=/Users/cocokong/IdeaProjects/autoexec/data/job
deploy.home=/app/autoexec/data/verdata
data.home=${runner.home}/data
file.mimetype.text.plain=sql text c cc c++ cpp h pl py txt java el gitignore js css properties jsp yml json md vue sh config htm html xml classpath project pm less scss
+
+
+# Maximum number of concurrent subprocesses for autoexec jobs.
+# Commands exceeding this limit will be placed in the waiting queue.
+subprocess.execution.max-concurrent=20
+
+# Maximum capacity of the command waiting queue for autoexec jobs.
+# New commands exceeding this limit will be rejected or discarded.
+subprocess.command.queue.max-size=1000
diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml
index e18ecc82a7d745f7c5f31b5d1a7942251b708e3e..aefed5dc77ad55f09f3b26830eb411c5067c0f15 100755
--- a/src/main/resources/logback-spring.xml
+++ b/src/main/resources/logback-spring.xml
@@ -1,10 +1,12 @@
+
+
${LOG_HOME}/debug.log
- debug.log.%i
+ ${LOG_HOME}/debug.log.%i
1
5
@@ -18,14 +20,14 @@
100MB
- [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] - %msg%n
+ [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n
${LOG_HOME}/info.log
- info.log.%i
+ ${LOG_HOME}/info.log.%i
1
5
@@ -39,7 +41,7 @@
100MB
- [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] - %msg%n
+ [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n
@@ -47,7 +49,7 @@
class="ch.qos.logback.core.rolling.RollingFileAppender">
${LOG_HOME}/warn.log
- warn.log.%i
+ ${LOG_HOME}/warn.log.%i
1
5
@@ -61,14 +63,14 @@
100MB
- [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] - %msg%n
+ [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n
${LOG_HOME}/error.log
- error.log.%i
+ ${LOG_HOME}/error.log.%i
1
5
@@ -82,7 +84,27 @@
100MB
- [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line]- %msg%n
+ [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n
+
+
+
+
+ ${LOG_HOME}/process.log
+
+ ${LOG_HOME}/process.%i
+ 1
+ 5
+
+
+ DEBUG
+ ACCEPT
+ DENY
+
+
+ 100MB
+
+
+ [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n
@@ -110,7 +132,7 @@
- [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] - %msg%n
+ [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n
UTF-8
false
@@ -137,5 +159,12 @@
+
+
+
+
+
+
+
\ No newline at end of file