From 0439e1dba9b769f71d83a87e417ba8704bed1398 Mon Sep 17 00:00:00 2001
From: lvzk <897706680@qq.com>
Date: Wed, 9 Apr 2025 19:31:27 +0800
Subject: [PATCH 1/6] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?=
=?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?=
=?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../autoexecrunner/api/job/JobExecApi.java | 6 +-
.../asynchronization/NeatLogicThread.java | 190 ++++++++++++++++++
.../queue/NeatLogicUniqueBlockingQueue.java | 102 ++++++++++
.../threadlocal/UserContext.java | 24 ++-
.../threadpool/CachedThreadPool.java | 144 +++++++++++++
.../autoexec/ProcessWaitTask.java | 69 +++++++
.../autoexecrunner/common/config/Config.java | 13 ++
.../core/ExecProcessCommand.java | 1 +
.../autoexecrunner/dto/ThreadPoolVo.java | 124 ++++++++++++
.../autoexecrunner/dto/ThreadTaskVo.java | 84 ++++++++
.../autoexecrunner/dto/ThreadVo.java | 62 ++++++
.../startup/handler/AutoexecQueueThread.java | 157 +++++++++++++++
12 files changed, 962 insertions(+), 14 deletions(-)
create mode 100644 src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java
create mode 100644 src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java
create mode 100644 src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadpool/CachedThreadPool.java
create mode 100644 src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java
create mode 100644 src/main/java/com/neatlogic/autoexecrunner/dto/ThreadPoolVo.java
create mode 100644 src/main/java/com/neatlogic/autoexecrunner/dto/ThreadTaskVo.java
create mode 100644 src/main/java/com/neatlogic/autoexecrunner/dto/ThreadVo.java
create mode 100644 src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java
index 7622d2e..6ed4b7a 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java
@@ -17,10 +17,9 @@ package com.neatlogic.autoexecrunner.api.job;
import com.alibaba.fastjson.JSONObject;
import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext;
import com.neatlogic.autoexecrunner.constvalue.JobAction;
-import com.neatlogic.autoexecrunner.core.ExecProcessCommand;
import com.neatlogic.autoexecrunner.dto.CommandVo;
import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentBase;
-import com.neatlogic.autoexecrunner.threadpool.CommonThreadPool;
+import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread;
import com.neatlogic.autoexecrunner.util.FileUtil;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Component;
@@ -83,8 +82,7 @@ public class JobExecApi extends PrivateApiComponentBase {
}
commandList.add("--reuseconslog");
commandVo.setCommandList(commandList);
- ExecProcessCommand processCommand = new ExecProcessCommand(commandVo);
- CommonThreadPool.execute(processCommand);
+ AutoexecQueueThread.addUpdateTagent(commandVo);
return null;
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java
new file mode 100644
index 0000000..d37b747
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java
@@ -0,0 +1,190 @@
+/*Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved.
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .*/
+
+package com.neatlogic.autoexecrunner.asynchronization;
+
+
+import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext;
+import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext;
+import com.neatlogic.autoexecrunner.exception.core.ApiRuntimeException;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+
+public abstract class NeatLogicThread implements Runnable, Comparable {
+ private static final Logger logger = LoggerFactory.getLogger(NeatLogicThread.class);
+ protected UserContext userContext;
+ private final String tenantUuid;
+ private String threadName;
+ private boolean isUnique = false;
+ /* For generating thread ID */
+ private long id;
+ private int priority = 3;//默认优先级是3,数字越低优先级越高
+ private Semaphore lock;//用于hold住其他异步线程,控制两个异步线程的先后顺序
+ private CountDownLatch countDownLatch;//用于hold住主线程,这里只需要countdown,不需要等待
+ private boolean needAwaitAdvance = true;// 是否需要等待所有模块加载完成后再任务
+
+ @Override
+ public int compareTo(NeatLogicThread other) {
+ return Integer.compare(this.priority, other.priority); // 优先级高的先出队,priority越小代表优先级越高
+ }
+
+ public Semaphore getLock() {
+ return lock;
+ }
+
+ public void setLock(Semaphore lock) {
+ this.lock = lock;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public int getPriority() {
+ if (priority <= 1) {
+ priority = 1;
+ }
+ return priority;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+
+
+ /*public NeatLogicThread() {
+ userContext = UserContext.get();
+ tenantContext = TenantContext.get();
+ inputFromContext = InputFromContext.get();
+ }*/
+
+ /*public NeatLogicThread(UserContext _userContext, TenantContext _tenantContext) {
+ if (_userContext != null) {
+ userContext = _userContext.copy();
+ }
+ tenantUuid = _tenantContext.getTenantUuid();
+ activeModuleList = _tenantContext.getActiveModuleList();
+ inputFromContext = InputFromContext.get();
+ }*/
+
+
+ public NeatLogicThread(String _threadName) {
+ UserContext tmp = UserContext.get();
+ if (tmp != null) {
+ userContext = tmp.copy();
+ }
+ tenantUuid = TenantContext.get().getTenantUuid();
+ this.threadName = _threadName;
+ }
+
+ public NeatLogicThread(String _threadName, int priority) {
+ UserContext tmp = UserContext.get();
+ if (tmp != null) {
+ userContext = tmp.copy();
+ }
+ tenantUuid = TenantContext.get().getTenantUuid();
+ this.threadName = _threadName;
+ this.priority = priority;
+ }
+
+ public NeatLogicThread(String _threadName, boolean _isUnique) {
+ userContext = UserContext.get();
+ tenantUuid = TenantContext.get().getTenantUuid();
+ this.threadName = _threadName;
+ this.isUnique = _isUnique;
+ }
+
+ @Override
+ public final void run() {
+ TenantContext tenantContext = TenantContext.init(tenantUuid);
+ UserContext.init(userContext);
+ try {
+ String oldThreadName = Thread.currentThread().getName();
+ if (StringUtils.isNotBlank(threadName)) {
+ Thread.currentThread().setName(threadName);
+ }
+
+ if (this.lock != null) {
+ //System.out.println(this.getThreadName() + "尝试获取锁" + this.lock);
+ lock.acquire();
+ //System.out.println(this.getThreadName() + "成功获取锁" + this.lock);
+ }
+ execute();
+ Thread.currentThread().setName(oldThreadName);
+ } catch (ApiRuntimeException ex) {
+ logger.warn(ex.getMessage(), ex);
+ } catch (Exception ex) {
+ logger.error(ex.getMessage(), ex);
+ } finally {
+ if (this.lock != null) {
+ lock.release();
+ //System.out.println(this.getThreadName() + "成功释放锁" + this.lock);
+ }
+ if (countDownLatch != null) {
+ countDownLatch.countDown();
+ }
+ // 清除所有threadlocal
+ if (TenantContext.get() != null) {
+ TenantContext.get().release();
+ }
+ if (UserContext.get() != null) {
+ UserContext.get().release();
+ }
+ }
+ }
+
+ protected abstract void execute();
+
+ public void setUnique(boolean unique) {
+ isUnique = unique;
+ }
+
+ public boolean isUnique() {
+ return isUnique;
+ }
+
+ public void setThreadName(String threadName) {
+ this.threadName = threadName;
+ }
+
+ public String getThreadName() {
+ return threadName;
+ }
+
+ public CountDownLatch getCountDownLatch() {
+ return countDownLatch;
+ }
+
+ public void setCountDownLatch(CountDownLatch countDownLatch) {
+ this.countDownLatch = countDownLatch;
+ }
+
+ public boolean isNeedAwaitAdvance() {
+ return needAwaitAdvance;
+ }
+
+ public void setNeedAwaitAdvance(boolean needAwaitAdvance) {
+ this.needAwaitAdvance = needAwaitAdvance;
+ }
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java
new file mode 100644
index 0000000..ae9deaf
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package com.neatlogic.autoexecrunner.asynchronization.queue;
+
+import com.alibaba.fastjson.JSON;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class NeatLogicUniqueBlockingQueue {
+ private static final Logger logger = LoggerFactory.getLogger(NeatLogicUniqueBlockingQueue.class);
+ private final BlockingQueue> blockingQueue;
+ private final ConcurrentHashMap taskMap; // 用于去重
+
+ public NeatLogicUniqueBlockingQueue(int capacity) {
+ this.blockingQueue = new LinkedBlockingQueue<>(capacity);
+ this.taskMap = new ConcurrentHashMap<>();
+ }
+
+ public boolean offer(T t) {
+ Task task = new Task<>(t);
+ // 保证任务唯一性
+ if (taskMap.putIfAbsent(task.getUniqueKey(), Boolean.TRUE) == null) {
+ logger.debug("====TagentUpdateInfo-addQueue:" + JSON.toJSONString(task));
+ // 如果任务是新任务,放入队列
+ boolean added = blockingQueue.offer(task);
+ if (!added) {
+ // 如果队列已满,移除任务标记
+ taskMap.remove(task.getUniqueKey());
+ logger.error("Queue is full!");
+ }
+ return added;
+ } else {
+ if (t != null) {
+ logger.debug("NeatLogicUniqueBlockingQueue repeat: {}", JSON.toJSONString(t));
+ }
+ }
+ return false; // 已存在任务,直接返回 false
+ }
+
+ public T take() throws InterruptedException {
+ Task task = blockingQueue.take(); // 阻塞式获取任务
+ taskMap.remove(task.getUniqueKey()); // 移除已处理任务的唯一标记
+ return task.getT();
+ }
+
+ private static class Task {
+ private final T t;
+
+ public Task(T t) {
+ this.t = t;
+ }
+
+ public T getT() {
+ return t;
+ }
+
+ public String getUniqueKey() {
+ return String.valueOf(t.hashCode());
+ }
+ }
+
+ public int size(){
+ return blockingQueue.size();
+ }
+
+// public static void main(String[] args) throws InterruptedException {
+// NeatLogicUniqueBlockingQueue queue = new NeatLogicUniqueBlockingQueue<>(1);
+//
+// // 模拟任务插入
+// UserSessionVo a = new UserSessionVo();
+// a.setToken("1111");
+// System.out.println(queue.offer(a)); // 返回 true,任务插入成功
+// UserSessionVo b = new UserSessionVo();
+// b.setToken("222");
+// System.out.println(queue.offer(b)); // 返回 false,任务已存在
+//
+// // 模拟任务消费
+// UserSessionVo task = queue.take(); // 消费 "task1"
+// UserSessionVo task2 = queue.take(); // 消费 "task1"
+// UserSessionVo task3 = queue.take(); // 消费 "task1"
+// }
+}
+
diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/UserContext.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/UserContext.java
index 89bf0a2..f63f71a 100755
--- a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/UserContext.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/UserContext.java
@@ -27,19 +27,23 @@ public class UserContext implements Serializable {
private String timezone = "+8:00";
private String token;
private List roleUuidList = new ArrayList<>();
-
+
+ public UserContext copy() {
+ UserContext userContext = new UserContext();
+ userContext.setRequest(request);
+ userContext.setToken(token);
+ userContext.setTenant(tenant);
+ userContext.setUserName(userName);
+ userContext.setUserId(userId);
+ userContext.setUserUuid(userUuid);
+ userContext.setTimezone(timezone);
+ return userContext;
+ }
+
public static UserContext init(UserContext _userContext) {
UserContext context = new UserContext();
if (_userContext != null) {
- context.setUserId(_userContext.getUserId());
- context.setUserUuid(_userContext.getUserUuid());
- context.setUserName(_userContext.getUserName());
- context.setTenant(_userContext.getTenant());
- context.setTimezone(_userContext.getTimezone());
- context.setToken(_userContext.getToken());
- // context.setRequest(_userContext.getRequest());
- // context.setResponse(_userContext.getResponse());
- context.setRoleUuidList(_userContext.getRoleUuidList());
+ context = _userContext.copy();
}
instance.set(context);
return context;
diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadpool/CachedThreadPool.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadpool/CachedThreadPool.java
new file mode 100644
index 0000000..b1f50a2
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadpool/CachedThreadPool.java
@@ -0,0 +1,144 @@
+/*Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved.
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .*/
+
+package com.neatlogic.autoexecrunner.asynchronization.threadpool;
+
+
+import com.neatlogic.autoexecrunner.asynchronization.NeatLogicThread;
+import com.neatlogic.autoexecrunner.dto.ThreadPoolVo;
+import com.neatlogic.autoexecrunner.dto.ThreadTaskVo;
+import com.neatlogic.autoexecrunner.dto.ThreadVo;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+public class CachedThreadPool {
+ static int rank = 15;
+ static int cpu = Runtime.getRuntime().availableProcessors();
+ private static final Log logger = LogFactory.getLog(CachedThreadPool.class);
+ private static final Map threadTaskMap = new ConcurrentHashMap<>();
+ private static final Map threadMap = new ConcurrentHashMap<>();
+ private static final Set threadSet = new HashSet<>();
+ private static final PriorityBlockingQueue threadQueue = new PriorityBlockingQueue<>();
+
+ static class NeatLogicThreadFactory implements ThreadFactory {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r) {
+ @Override
+ public void run() {
+ try {
+ super.run();
+ } finally {
+ threadMap.remove(this.getId());
+ }
+ }
+ };
+ threadMap.put(thread.getId(), new ThreadVo(thread.getId(), thread.getName()));
+ return thread;
+ }
+ }
+
+ private static final ThreadPoolExecutor mainThreadPool = new ThreadPoolExecutor(0, cpu * rank,
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(), new NeatLogicThreadFactory(), new NeatLogicRejectHandler()) {
+ @Override
+ protected void beforeExecute(Thread t, Runnable r) {
+ super.beforeExecute(t, r);
+ if (r instanceof NeatLogicThread) {
+ NeatLogicThread nt = (NeatLogicThread) r;
+ nt.setId(t.getId());
+ ThreadTaskVo threadVo = new ThreadTaskVo();
+ threadVo.setId(t.getId());
+ threadVo.setName(nt.getThreadName());
+ threadVo.setPoolName("main");
+ threadVo.setStartTime(new Date());
+ threadVo.setPriority(nt.getPriority());
+ threadTaskMap.put(nt.getId(), threadVo);
+ threadSet.add(nt.getThreadName());
+ }
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+
+ // 任务完成后从 activeTasks 中移除
+ if (r instanceof NeatLogicThread) {
+ NeatLogicThread task = (NeatLogicThread) r;
+ threadTaskMap.remove(task.getId());
+ threadSet.remove(task.getThreadName());
+ }
+ //尝试从队列中拿出任务处理
+ Runnable task = threadQueue.poll();
+ if (task != null) {
+ mainThreadPool.execute(task);
+ }
+ }
+ };
+
+ public static void execute(NeatLogicThread command, CountDownLatch countDownLatch) {
+ command.setCountDownLatch(countDownLatch);
+ execute(command);
+ }
+
+ public static void execute(NeatLogicThread command, Semaphore lock) {
+ command.setLock(lock);
+ execute(command);
+ }
+
+ public static void execute(NeatLogicThread command) {
+ try {
+ boolean isExists = command.isUnique() && StringUtils.isNotBlank(command.getThreadName()) && threadSet.contains(command.getThreadName());
+ if (!isExists) {
+ mainThreadPool.execute(command);
+ } else {
+ logger.warn(command.getThreadName() + " is running");
+ }
+ } catch (RejectedExecutionException ex) {
+ logger.error(ex.getMessage(), ex);
+ }
+ }
+
+
+ static class NeatLogicRejectHandler implements RejectedExecutionHandler {
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ //进入等待队列
+ if (r instanceof NeatLogicThread) {
+ threadQueue.offer((NeatLogicThread) r);
+ } else {
+ logger.error("线程池已满,非NeatLogicThread子类线程将被抛弃");
+ }
+ }
+ }
+
+
+ public static ThreadPoolVo getStatus() {
+ ThreadPoolVo threadPoolVo = new ThreadPoolVo();
+ List threadTasks = new ArrayList<>(threadTaskMap.values());
+ List threads = new ArrayList<>(threadMap.values());
+ threadPoolVo.setThreadTaskList(threadTasks);
+ threadPoolVo.setThreadList(threads);
+ threadPoolVo.setMaxThreadCount(cpu * rank);
+ threadPoolVo.setMainPoolSize(mainThreadPool.getPoolSize());
+ threadPoolVo.setMainActiveCount(mainThreadPool.getActiveCount());
+ threadPoolVo.setMainQueueSize(threadQueue.size());
+ return threadPoolVo;
+ }
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java b/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java
new file mode 100644
index 0000000..8068c16
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java
@@ -0,0 +1,69 @@
+package com.neatlogic.autoexecrunner.autoexec;
+
+import com.alibaba.fastjson.JSONObject;
+import com.neatlogic.autoexecrunner.asynchronization.NeatLogicThread;
+import com.neatlogic.autoexecrunner.dto.CommandVo;
+import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+public class ProcessWaitTask extends NeatLogicThread {
+ private static final Logger logger = LoggerFactory.getLogger(ProcessWaitTask.class);
+ private final Process process;
+ private final CommandVo commandVo;
+ private final JSONObject payload;
+
+ public ProcessWaitTask(Process process, CommandVo commandVo, JSONObject payload) {
+ super("THREAD-AUTOEXEC-WAIT-" + commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY));
+ this.process = process;
+ this.commandVo = commandVo;
+ this.payload = payload;
+ }
+
+
+ @Override
+ protected void execute() {
+
+ try {
+ int exitCode = process.waitFor();
+ int exitStatus = process.exitValue();
+ commandVo.setExitValue(exitStatus);
+ logger.debug("进程[{}] 退出,状态码: {}", getPid(process), exitCode);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error(String.format("等待被中断: 入参:%s, errorMsg:%s", payload, e.getMessage()), e);
+ } finally {
+ // 确保关闭流
+ closeQuietly(process.getInputStream());
+ closeQuietly(process.getErrorStream());
+ closeQuietly(process.getOutputStream());
+ AutoexecQueueThread.removeProcess(process);
+ }
+ }
+
+ // 兼容不同JDK版本的PID获取
+ private long getPid(Process p) {
+ try {
+ if (p.getClass().getName().contains("UNIXProcess")) {
+ Field pidField = p.getClass().getDeclaredField("pid");
+ pidField.setAccessible(true);
+ return pidField.getLong(p);
+ }
+ } catch (Exception e) {
+ // 忽略异常
+ }
+ return -1; // 未知PID
+ }
+
+ private void closeQuietly(java.io.Closeable c) {
+ try {
+ if (c != null) c.close();
+ } catch (IOException ignore) {
+ }
+ }
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java
index 7a5eb5e..bac3c8c 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java
@@ -37,6 +37,8 @@ public class Config {
private static String DATA_HOME;//文件根目录
private static String DEPLOY_HOME;//发布目录
private static String GITLAB_PASSWORD;// gitlab private_token
+ private static Integer MAX_PROCESS_QUEUE_SIZE;//最大自动化作业队列数,多余的则丢弃
+ private static Integer MAX_PROCESS_EXECUTE_COUNT;//最大执行作业数,超过的则进入队列
//neatlogic
private static String NEATLOGIC_ROOT;
@@ -168,6 +170,13 @@ public class Config {
return AUTOEXEC_TOKEN;
}
+ public static Integer MAX_PROCESS_QUEUE_SIZE() {
+ return MAX_PROCESS_QUEUE_SIZE;
+ }
+ public static Integer MAX_PROCESS_EXECUTE_COUNT() {
+ return MAX_PROCESS_EXECUTE_COUNT;
+ }
+
@PostConstruct
public void init() {
try {
@@ -238,6 +247,10 @@ public class Config {
AUTOEXEC_TOKEN = prop.getProperty("autoexec.token", "499922b4317c251c2ce525f7b83e3d94");
UPDATE_RUNNER_STATUS_PERIOD = Integer.parseInt(prop.getProperty("update.runner.status.period", "1800000"));
+
+ MAX_PROCESS_QUEUE_SIZE = Integer.parseInt(prop.getProperty("max.process.queue.size", "2000"));
+
+ MAX_PROCESS_EXECUTE_COUNT = Integer.parseInt(prop.getProperty("max.process.execute.size", "10"));
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java b/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java
index 14acccf..f4b3350 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
* @author lvzk
* @since 2021/4/21 17:12
**/
+@Deprecated
public class ExecProcessCommand implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ExecProcessCommand.class);
private final ProcessBuilder builder;
diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadPoolVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadPoolVo.java
new file mode 100644
index 0000000..a12af67
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadPoolVo.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package com.neatlogic.autoexecrunner.dto;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ThreadPoolVo {
+ private int mainQueueSize;
+ private int mainPoolSize;
+ private int backupQueueSize;
+ private int backupPoolSize;
+ private int mainActiveCount;
+ private int backupActiveCount;
+ private int maxThreadCount;
+ private List threadTaskList = new ArrayList<>();
+ private List threadList = new ArrayList<>();
+
+ public int getMaxThreadCount() {
+ return maxThreadCount;
+ }
+
+ public void setMaxThreadCount(int maxThreadCount) {
+ this.maxThreadCount = maxThreadCount;
+ }
+
+
+ public int getMainQueueSize() {
+ return mainQueueSize;
+ }
+
+ public void setMainQueueSize(int mainQueueSize) {
+ this.mainQueueSize = mainQueueSize;
+ }
+
+ public int getMainPoolSize() {
+ return mainPoolSize;
+ }
+
+ public int getMainActiveCount() {
+ return mainActiveCount;
+ }
+
+ public void setMainActiveCount(int mainActiveCount) {
+ this.mainActiveCount = mainActiveCount;
+ }
+
+ public int getBackupActiveCount() {
+ return backupActiveCount;
+ }
+
+ public void setBackupActiveCount(int backupActiveCount) {
+ this.backupActiveCount = backupActiveCount;
+ }
+
+ public void setMainPoolSize(int mainPoolSize) {
+ this.mainPoolSize = mainPoolSize;
+ }
+
+ public int getBackupQueueSize() {
+ return backupQueueSize;
+ }
+
+ public void setBackupQueueSize(int backupQueueSize) {
+ this.backupQueueSize = backupQueueSize;
+ }
+
+ public int getBackupPoolSize() {
+ return backupPoolSize;
+ }
+
+ public void setBackupPoolSize(int backupPoolSize) {
+ this.backupPoolSize = backupPoolSize;
+ }
+
+ boolean isSorted = false;
+
+ public List getThreadTaskList() {
+ if (CollectionUtils.isNotEmpty(threadTaskList)) {
+ threadTaskList.sort((o1, o2) -> {
+ long s = o1.getStartTime().getTime();
+ long e = o2.getStartTime().getTime();
+ return Long.compare(s, e);
+ });
+ }
+ return threadTaskList;
+ }
+
+ public void setThreadTaskList(List threadTaskList) {
+ this.threadTaskList = threadTaskList;
+ }
+
+ public List getThreadList() {
+ if (CollectionUtils.isNotEmpty(threadList)) {
+ threadList.sort((o1, o2) -> {
+ long s = o1.getStartTime().getTime();
+ long e = o2.getStartTime().getTime();
+ return Long.compare(s, e);
+ });
+ }
+ return threadList;
+ }
+
+ public void setThreadList(List threadList) {
+ this.threadList = threadList;
+ }
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadTaskVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadTaskVo.java
new file mode 100644
index 0000000..0c95c6c
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadTaskVo.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package com.neatlogic.autoexecrunner.dto;
+
+import java.util.Date;
+
+public class ThreadTaskVo {
+ private Long id;
+ private String name;
+ private Date startTime;
+ private long timeCost;
+ private String poolName;
+ private String status;
+ private int priority;
+
+ public Long getId() {
+ return id;
+ }
+
+ public void setId(Long id) {
+ this.id = id;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+
+ public Date getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(Date startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getTimeCost() {
+ return System.currentTimeMillis() - this.startTime.getTime();
+ }
+
+
+ public String getPoolName() {
+ return poolName;
+ }
+
+ public void setPoolName(String poolName) {
+ this.poolName = poolName;
+ }
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadVo.java
new file mode 100644
index 0000000..3b4f2c0
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadVo.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package com.neatlogic.autoexecrunner.dto;
+
+import java.util.Date;
+
+public class ThreadVo {
+ private long id;
+ private String name;
+ private Date startTime;
+ private long timeCost;
+
+ public ThreadVo(long id, String name) {
+ this.id = id;
+ this.name = name;
+ this.startTime = new Date();
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public Date getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(Date startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getTimeCost() {
+ return System.currentTimeMillis() - this.startTime.getTime();
+ }
+
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
new file mode 100644
index 0000000..141674c
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
@@ -0,0 +1,157 @@
+package com.neatlogic.autoexecrunner.startup.handler;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.neatlogic.autoexecrunner.asynchronization.queue.NeatLogicUniqueBlockingQueue;
+import com.neatlogic.autoexecrunner.asynchronization.threadpool.CachedThreadPool;
+import com.neatlogic.autoexecrunner.autoexec.ProcessWaitTask;
+import com.neatlogic.autoexecrunner.common.config.Config;
+import com.neatlogic.autoexecrunner.dto.CommandVo;
+import com.neatlogic.autoexecrunner.startup.IStartUp;
+import com.neatlogic.autoexecrunner.util.FileUtil;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class AutoexecQueueThread implements IStartUp {
+ private static final BlockingQueue processQueue = new LinkedBlockingQueue<>(Config.MAX_PROCESS_QUEUE_SIZE() + 5);
+ private static final Logger logger = LoggerFactory.getLogger(AutoexecQueueThread.class);
+ private static final NeatLogicUniqueBlockingQueue blockingQueue = new NeatLogicUniqueBlockingQueue<>(50000);
+ private volatile boolean running = true;
+
+ @Override
+ public String getName() {
+ return "创建自动化作业线程";
+ }
+
+ @Override
+ public String getDescription() {
+ return null;
+ }
+
+ @Override
+ public void doService() {
+ // 启动监控线程
+ Thread watchdog = new Thread(() -> {
+ Thread workerThread = null;
+
+ while (running) {
+ if (workerThread == null || !workerThread.isAlive()) {
+ System.out.println("工作线程未运行,准备启动...");
+ logger.debug("工作线程未运行,准备启动...");
+ workerThread = new Thread(() -> {
+ Thread.currentThread().setName("");
+ while (running) {
+ CommandVo commandVo = null;
+ try {
+ // 你的业务逻辑
+ if (processQueue.size() <= Config.MAX_PROCESS_QUEUE_SIZE()) {
+ commandVo = blockingQueue.take();
+ logger.debug("作业{} 即将运行...", commandVo.getTenant() + "-" + commandVo.getJobId());
+ createSubProcessAndStart(commandVo);
+ } else {
+ logger.debug("作业进程最大数量:{}, 需等待运行中的进程结束后,才继续创建队列内的作业进程!", Config.MAX_PROCESS_QUEUE_SIZE());
+ }
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ System.out.printf("创建自动化作业子进程的线程被中断...入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage());
+ logger.error(String.format("创建自动化作业子进程的线程被中断...入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e);
+ break;
+ } catch (Exception e) {
+ System.out.printf("创建自动化作业子进程失败:入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage());
+ logger.error(String.format("创建自动化作业子进程失败:入参:%s ,errorMsg:%s", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e);
+ break; // 退出由外层 watchdog 负责重启
+ }
+ }
+ });
+
+ workerThread.setDaemon(true);
+ workerThread.start();
+ }
+
+ try {
+ Thread.sleep(2000); // 每2秒检查一次
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ });
+
+ watchdog.setDaemon(true);
+ watchdog.start();
+ }
+
+ private void createSubProcessAndStart(CommandVo commandVo) {
+ File NULL_FILE = new File("/dev/null");
+ ProcessBuilder builder = new ProcessBuilder(commandVo.getCommandList());
+ builder.redirectOutput(NULL_FILE);
+ File consoleLog = FileUtil.createFile(commandVo.getConsoleLogPath());
+ builder.redirectError(ProcessBuilder.Redirect.appendTo(consoleLog));
+ Map env = builder.environment();
+ JSONObject environment = commandVo.getEnvironment();
+ if (MapUtils.isNotEmpty(environment)) {
+ for (Map.Entry entry : environment.entrySet()) {
+ env.put(entry.getKey(), entry.getValue().toString());
+ }
+ }
+ env.put("tenant", commandVo.getTenant());
+ JSONObject payload = new JSONObject();
+ Process process = null;
+ try {
+ payload.put("jobId", commandVo.getJobId());
+ payload.put("status", 1);
+ payload.put("command", commandVo);
+ payload.put("passThroughEnv", commandVo.getPassThroughEnv().toJSONString());
+ process = builder.start();
+ addProcess(process);
+ CachedThreadPool.execute(new ProcessWaitTask(process, commandVo, payload));
+ } catch (IOException e) {
+ logger.error(String.format("进程启动失败: %s ,error: %s", payload, e.getMessage()), e);
+ System.err.println("进程启动失败: " + payload + ",error:" + e.getMessage());
+ } finally {
+ // 确保关闭流
+ if (process != null) {
+ closeQuietly(process.getInputStream());
+ closeQuietly(process.getErrorStream());
+ closeQuietly(process.getOutputStream());
+ }
+ }
+ }
+
+ private void closeQuietly(java.io.Closeable c) {
+ try {
+ if (c != null) c.close();
+ } catch (IOException ignore) {
+ }
+ }
+
+ public void stop() {
+ running = false;
+ }
+
+ public static void addUpdateTagent(CommandVo commandVo) {
+ blockingQueue.offer(commandVo);
+ }
+
+ public static void addProcess(Process process) {
+ boolean result = processQueue.offer(process);
+ if (!result) {
+ logger.error("processQueue offer failed!current queue size is :{}", processQueue.size());
+ }
+ }
+
+ public static void removeProcess(Process process) {
+ boolean result = processQueue.remove(process);
+ if (!result) {
+ logger.error("processQueue remove failed!current queue size is :{}", processQueue.size());
+ }
+ }
+}
--
Gitee
From 6e2f08b7b2a55c236e944eeb8632533df043d3a3 Mon Sep 17 00:00:00 2001
From: lvzk <897706680@qq.com>
Date: Thu, 10 Apr 2025 19:08:26 +0800
Subject: [PATCH 2/6] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?=
=?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?=
=?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../asynchronization/NeatLogicThread.java | 5 +
.../queue/NeatLogicUniqueBlockingQueue.java | 17 ++-
.../threadlocal/RequestContext.java | 134 ++++++++++++++++++
.../autoexec/ProcessWaitTask.java | 17 +--
.../autoexecrunner/common/config/Config.java | 4 +-
.../core/ExecProcessCommand.java | 1 -
.../filter/JsonWebTokenValidFilter.java | 4 +-
.../logback/RequestUrlConverter.java | 37 +++++
.../logback/TenantConverter.java | 38 +++++
.../startup/handler/AutoexecQueueThread.java | 59 ++++----
src/main/resources/logback-spring.xml | 47 ++++--
11 files changed, 315 insertions(+), 48 deletions(-)
create mode 100644 src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/RequestContext.java
create mode 100644 src/main/java/com/neatlogic/autoexecrunner/logback/RequestUrlConverter.java
create mode 100644 src/main/java/com/neatlogic/autoexecrunner/logback/TenantConverter.java
diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java
index d37b747..87e8f72 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java
@@ -16,6 +16,7 @@ along with this program. If not, see .*/
package com.neatlogic.autoexecrunner.asynchronization;
+import com.neatlogic.autoexecrunner.asynchronization.threadlocal.RequestContext;
import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext;
import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext;
import com.neatlogic.autoexecrunner.exception.core.ApiRuntimeException;
@@ -29,6 +30,7 @@ import java.util.concurrent.Semaphore;
public abstract class NeatLogicThread implements Runnable, Comparable {
private static final Logger logger = LoggerFactory.getLogger(NeatLogicThread.class);
protected UserContext userContext;
+ protected RequestContext requestContext;
private final String tenantUuid;
private String threadName;
private boolean isUnique = false;
@@ -95,6 +97,7 @@ public abstract class NeatLogicThread implements Runnable, Comparable {
public T take() throws InterruptedException {
Task task = blockingQueue.take(); // 阻塞式获取任务
taskMap.remove(task.getUniqueKey()); // 移除已处理任务的唯一标记
+ TenantContext tenantContext = TenantContext.get();
+ if (tenantContext != null) {
+ tenantContext.switchTenant(task.getTenantUuid());
+ } else {
+ TenantContext.init(task.getTenantUuid());
+ }
return task.getT();
}
private static class Task {
private final T t;
+ private final String tenantUuid;
public Task(T t) {
this.t = t;
+ this.tenantUuid = TenantContext.get().getTenantUuid();
}
public T getT() {
return t;
}
+ public String getTenantUuid() {
+ return tenantUuid;
+ }
+
public String getUniqueKey() {
- return String.valueOf(t.hashCode());
+ // 唯一标识任务的 key,可根据需求定义,例如 `tenantUuid-t.hashCode`
+ //System.out.println(tenantUuid + "-" + t.hashCode());
+ return tenantUuid + "-" + t.hashCode();
}
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/RequestContext.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/RequestContext.java
new file mode 100644
index 0000000..efcb667
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/RequestContext.java
@@ -0,0 +1,134 @@
+/*Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved.
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .*/
+
+package com.neatlogic.autoexecrunner.asynchronization.threadlocal;
+
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * 保存请求信息
+ */
+public class RequestContext implements Serializable {
+ private static final ThreadLocal instance = new ThreadLocal<>();
+ private static final long serialVersionUID = -5420998728515359626L;
+ private String url;
+ private HttpServletRequest request;
+ private HttpServletResponse response;
+ //接口访问速率
+ private Double apiRate;
+ //租户接口访问总速率
+ private Double tenantRate;
+ //语言
+ Locale locale;
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public HttpServletRequest getRequest() {
+ return request;
+ }
+
+ public void setRequest(HttpServletRequest request) {
+ this.request = request;
+ }
+
+ public HttpServletResponse getResponse() {
+ return response;
+ }
+
+ public void setResponse(HttpServletResponse response) {
+ this.response = response;
+ }
+
+ public Double getApiRate() {
+ return apiRate;
+ }
+
+ public void setApiRate(Double apiRate) {
+ this.apiRate = apiRate;
+ }
+
+
+ public Double getTenantRate() {
+ return tenantRate;
+ }
+
+ public void setTenantRate(Double tenantRate) {
+ this.tenantRate = tenantRate;
+ }
+
+ public Locale getLocale() {
+ return locale;
+ }
+
+ public void setLocale(Locale locale) {
+ this.locale = locale;
+ }
+
+ public static RequestContext init(RequestContext _requestContext) {
+ RequestContext context = new RequestContext();
+ if (_requestContext != null) {
+ context.setUrl(_requestContext.getUrl());
+ context.setLocale(_requestContext.getLocale());
+ }
+ instance.set(context);
+ return context;
+ }
+
+ public static RequestContext init(HttpServletRequest request, String url, HttpServletResponse response) {
+ RequestContext context = new RequestContext(request, url);
+ context.setResponse(response);
+ instance.set(context);
+ if (request.getCookies() != null && request.getCookies().length > 0) {
+ Optional languageCookie = Arrays.stream(request.getCookies()).filter(o -> Objects.equals(o.getName(), "neatlogic_language")).findFirst();
+ if (languageCookie.isPresent()) {
+ context.setLocale(new Locale(languageCookie.get().getValue()));
+ } else {
+ context.setLocale(Locale.getDefault());
+ }
+ }
+ return context;
+ }
+
+ private RequestContext() {
+
+ }
+
+ private RequestContext(HttpServletRequest request, String url) {
+ this.url = url;
+ this.request = request;
+ }
+
+ public static RequestContext get() {
+ return instance.get();
+ }
+
+ public void release() {
+ instance.remove();
+ }
+
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java b/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java
index 8068c16..c2b0f81 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java
@@ -4,8 +4,6 @@ import com.alibaba.fastjson.JSONObject;
import com.neatlogic.autoexecrunner.asynchronization.NeatLogicThread;
import com.neatlogic.autoexecrunner.dto.CommandVo;
import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread;
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -17,32 +15,35 @@ public class ProcessWaitTask extends NeatLogicThread {
private final Process process;
private final CommandVo commandVo;
private final JSONObject payload;
+ private final String jobName;
- public ProcessWaitTask(Process process, CommandVo commandVo, JSONObject payload) {
- super("THREAD-AUTOEXEC-WAIT-" + commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY));
+ public ProcessWaitTask(Process process, CommandVo commandVo, JSONObject payload, String jobName) {
+ super("THREAD-AUTOEXEC-WAIT-" + jobName);
this.process = process;
this.commandVo = commandVo;
this.payload = payload;
+ this.jobName = jobName;
}
@Override
protected void execute() {
-
+ Long pid = null;
try {
int exitCode = process.waitFor();
int exitStatus = process.exitValue();
commandVo.setExitValue(exitStatus);
- logger.debug("进程[{}] 退出,状态码: {}", getPid(process), exitCode);
+ pid = getPid(process);
+ logger.debug("process[{}] finished,exitCode: {}", pid, exitCode);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- logger.error(String.format("等待被中断: 入参:%s, errorMsg:%s", payload, e.getMessage()), e);
+ logger.error(String.format("thread interrupt: param:%s, errorMsg:%s", payload, e.getMessage()), e);
} finally {
// 确保关闭流
closeQuietly(process.getInputStream());
closeQuietly(process.getErrorStream());
closeQuietly(process.getOutputStream());
- AutoexecQueueThread.removeProcess(process);
+ AutoexecQueueThread.removeProcess(process, jobName, pid);
}
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java
index bac3c8c..dd660c9 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java
@@ -248,9 +248,9 @@ public class Config {
UPDATE_RUNNER_STATUS_PERIOD = Integer.parseInt(prop.getProperty("update.runner.status.period", "1800000"));
- MAX_PROCESS_QUEUE_SIZE = Integer.parseInt(prop.getProperty("max.process.queue.size", "2000"));
+ MAX_PROCESS_QUEUE_SIZE = Integer.parseInt(prop.getProperty("max.process.queue.size", "1000"));
- MAX_PROCESS_EXECUTE_COUNT = Integer.parseInt(prop.getProperty("max.process.execute.size", "10"));
+ MAX_PROCESS_EXECUTE_COUNT = Integer.parseInt(prop.getProperty("max.process.execute.count", "20"));
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java b/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java
index f4b3350..14acccf 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java
@@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit;
* @author lvzk
* @since 2021/4/21 17:12
**/
-@Deprecated
public class ExecProcessCommand implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ExecProcessCommand.class);
private final ProcessBuilder builder;
diff --git a/src/main/java/com/neatlogic/autoexecrunner/filter/JsonWebTokenValidFilter.java b/src/main/java/com/neatlogic/autoexecrunner/filter/JsonWebTokenValidFilter.java
index d942f06..917777b 100755
--- a/src/main/java/com/neatlogic/autoexecrunner/filter/JsonWebTokenValidFilter.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/filter/JsonWebTokenValidFilter.java
@@ -2,6 +2,7 @@ package com.neatlogic.autoexecrunner.filter;
import com.alibaba.fastjson.JSONObject;
+import com.neatlogic.autoexecrunner.asynchronization.threadlocal.RequestContext;
import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext;
import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext;
import com.neatlogic.autoexecrunner.common.config.Config;
@@ -41,7 +42,8 @@ public class JsonWebTokenValidFilter extends OncePerRequestFilter {
UserVo userVo = null;
JSONObject redirectObj = new JSONObject();
String authType = null;
-
+ //初始化request上下文
+ RequestContext.init(request, request.getRequestURI(), response);
//判断租户
try {
String tenant = request.getHeader("Tenant");
diff --git a/src/main/java/com/neatlogic/autoexecrunner/logback/RequestUrlConverter.java b/src/main/java/com/neatlogic/autoexecrunner/logback/RequestUrlConverter.java
new file mode 100644
index 0000000..cb3cfb1
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/logback/RequestUrlConverter.java
@@ -0,0 +1,37 @@
+/*Copyright (C) $today.year 深圳极向量科技有限公司 All Rights Reserved.
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .*/
+
+package com.neatlogic.autoexecrunner.logback;
+
+import ch.qos.logback.classic.pattern.ClassicConverter;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import com.neatlogic.autoexecrunner.asynchronization.threadlocal.RequestContext;
+
+public class RequestUrlConverter extends ClassicConverter {
+ /**
+ * The convert method is responsible for extracting data from the event and
+ * storing it for later use by the write method.
+ *
+ * @param event
+ */
+ @Override
+ public String convert(ILoggingEvent event) {
+ RequestContext requestContext = RequestContext.get();
+ if (requestContext != null) {
+ return requestContext.getUrl();
+ }
+ return "";
+ }
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/logback/TenantConverter.java b/src/main/java/com/neatlogic/autoexecrunner/logback/TenantConverter.java
new file mode 100644
index 0000000..b20ceb4
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/logback/TenantConverter.java
@@ -0,0 +1,38 @@
+/*Copyright (C) $today.year 深圳极向量科技有限公司 All Rights Reserved.
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .*/
+
+package com.neatlogic.autoexecrunner.logback;
+
+
+import ch.qos.logback.classic.pattern.ClassicConverter;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext;
+
+public class TenantConverter extends ClassicConverter {
+ /**
+ * The convert method is responsible for extracting data from the event and
+ * storing it for later use by the write method.
+ *
+ * @param event
+ */
+ @Override
+ public String convert(ILoggingEvent event) {
+ TenantContext tenantContext = TenantContext.get();
+ if (tenantContext != null) {
+ return tenantContext.getTenantUuid();
+ }
+ return "";
+ }
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
index 141674c..a98bfc2 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
@@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -44,29 +45,28 @@ public class AutoexecQueueThread implements IStartUp {
while (running) {
if (workerThread == null || !workerThread.isAlive()) {
- System.out.println("工作线程未运行,准备启动...");
- logger.debug("工作线程未运行,准备启动...");
+ System.out.println("autoexec job thread is down,ready to start...");
+ logger.debug("autoexec job thread is down,ready to start...");
workerThread = new Thread(() -> {
- Thread.currentThread().setName("");
+ Thread.currentThread().setName("AutoexecQueueThread");
+ System.out.println("autoexec job thread start succeed!");
while (running) {
CommandVo commandVo = null;
try {
// 你的业务逻辑
- if (processQueue.size() <= Config.MAX_PROCESS_QUEUE_SIZE()) {
+ if (processQueue.size() <= Config.MAX_PROCESS_EXECUTE_COUNT()) {
commandVo = blockingQueue.take();
- logger.debug("作业{} 即将运行...", commandVo.getTenant() + "-" + commandVo.getJobId());
+ logger.debug("current autoexec sub process count:{} <= {},autoexec job:{} will create...", processQueue.size(), Config.MAX_PROCESS_EXECUTE_COUNT(), (commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY)));
createSubProcessAndStart(commandVo);
} else {
- logger.debug("作业进程最大数量:{}, 需等待运行中的进程结束后,才继续创建队列内的作业进程!", Config.MAX_PROCESS_QUEUE_SIZE());
+ logger.debug("autoexec sub process limit count :{}, need to wait process finish,then keep on creating sub process!", Config.MAX_PROCESS_EXECUTE_COUNT());
}
Thread.sleep(2000);
} catch (InterruptedException e) {
- System.out.printf("创建自动化作业子进程的线程被中断...入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage());
- logger.error(String.format("创建自动化作业子进程的线程被中断...入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e);
+ logger.error(String.format("autoexec job thread is interrupted...params:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e);
break;
} catch (Exception e) {
- System.out.printf("创建自动化作业子进程失败:入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage());
- logger.error(String.format("创建自动化作业子进程失败:入参:%s ,errorMsg:%s", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e);
+ logger.error(String.format("create sub process failed:params:%s ,errorMsg:%s", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e);
break; // 退出由外层 watchdog 负责重启
}
}
@@ -104,33 +104,34 @@ public class AutoexecQueueThread implements IStartUp {
}
env.put("tenant", commandVo.getTenant());
JSONObject payload = new JSONObject();
- Process process = null;
+ Process process;
try {
payload.put("jobId", commandVo.getJobId());
payload.put("status", 1);
payload.put("command", commandVo);
payload.put("passThroughEnv", commandVo.getPassThroughEnv().toJSONString());
process = builder.start();
+ String jobName = (commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY));
addProcess(process);
- CachedThreadPool.execute(new ProcessWaitTask(process, commandVo, payload));
+ logger.debug("autoexec job sub process {} is running, pid {},added to processQueue,now processQueue size is {}", jobName, getPid(process), processQueue.size());
+ CachedThreadPool.execute(new ProcessWaitTask(process, commandVo, payload, jobName));
} catch (IOException e) {
- logger.error(String.format("进程启动失败: %s ,error: %s", payload, e.getMessage()), e);
- System.err.println("进程启动失败: " + payload + ",error:" + e.getMessage());
- } finally {
- // 确保关闭流
- if (process != null) {
- closeQuietly(process.getInputStream());
- closeQuietly(process.getErrorStream());
- closeQuietly(process.getOutputStream());
- }
+ logger.error(String.format("autoexec job sub process start failed: %s ,error: %s", payload, e.getMessage()), e);
}
}
- private void closeQuietly(java.io.Closeable c) {
+ // 兼容不同JDK版本的PID获取
+ private long getPid(Process p) {
try {
- if (c != null) c.close();
- } catch (IOException ignore) {
+ if (p.getClass().getName().contains("UNIXProcess")) {
+ Field pidField = p.getClass().getDeclaredField("pid");
+ pidField.setAccessible(true);
+ return pidField.getLong(p);
+ }
+ } catch (Exception e) {
+ // 忽略异常
}
+ return -1; // 未知PID
}
public void stop() {
@@ -148,10 +149,16 @@ public class AutoexecQueueThread implements IStartUp {
}
}
- public static void removeProcess(Process process) {
+ public static void removeProcess(Process process, String jobName, Long pid) {
boolean result = processQueue.remove(process);
if (!result) {
- logger.error("processQueue remove failed!current queue size is :{}", processQueue.size());
+ logger.error("autoexec process:{} (pid:{})finished. processQueue remove failed!current queue size is :{}", jobName, pid, blockingQueue.size());
+ } else {
+ logger.debug("autoexec process:{} (pid:{})finished. processQueue remove succeed!current queue size is :{}", jobName, pid, blockingQueue.size());
}
}
+
+ public static Integer getProcessQueueSize() {
+ return processQueue.size();
+ }
}
diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml
index e18ecc8..aefed5d 100755
--- a/src/main/resources/logback-spring.xml
+++ b/src/main/resources/logback-spring.xml
@@ -1,10 +1,12 @@
+
+
${LOG_HOME}/debug.log
- debug.log.%i
+ ${LOG_HOME}/debug.log.%i
1
5
@@ -18,14 +20,14 @@
100MB
- [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] - %msg%n
+ [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n
${LOG_HOME}/info.log
- info.log.%i
+ ${LOG_HOME}/info.log.%i
1
5
@@ -39,7 +41,7 @@
100MB
- [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] - %msg%n
+ [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n
@@ -47,7 +49,7 @@
class="ch.qos.logback.core.rolling.RollingFileAppender">
${LOG_HOME}/warn.log
- warn.log.%i
+ ${LOG_HOME}/warn.log.%i
1
5
@@ -61,14 +63,14 @@
100MB
- [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] - %msg%n
+ [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n
${LOG_HOME}/error.log
- error.log.%i
+ ${LOG_HOME}/error.log.%i
1
5
@@ -82,7 +84,27 @@
100MB
- [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line]- %msg%n
+ [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n
+
+
+
+
+ ${LOG_HOME}/process.log
+
+ ${LOG_HOME}/process.%i
+ 1
+ 5
+
+
+ DEBUG
+ ACCEPT
+ DENY
+
+
+ 100MB
+
+
+ [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n
@@ -110,7 +132,7 @@
- [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] - %msg%n
+ [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n
UTF-8
false
@@ -137,5 +159,12 @@
+
+
+
+
+
+
+
\ No newline at end of file
--
Gitee
From dc40d3a8746ccef8b642c8a27cf0a3439c33382a Mon Sep 17 00:00:00 2001
From: lvzk <897706680@qq.com>
Date: Fri, 11 Apr 2025 20:57:23 +0800
Subject: [PATCH 3/6] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?=
=?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?=
=?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../api/job/GetJobQueueStatusApi.java | 68 +++++++++++++++++++
.../autoexecrunner/api/job/JobExecApi.java | 2 +-
.../queue/NeatLogicUniqueBlockingQueue.java | 17 ++++-
.../autoexecrunner/dto/CommandVo.java | 14 ++--
.../startup/handler/AutoexecQueueThread.java | 8 ++-
5 files changed, 101 insertions(+), 8 deletions(-)
create mode 100755 src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java
diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java
new file mode 100755
index 0000000..9898c0a
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java
@@ -0,0 +1,68 @@
+/*Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved.
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .*/
+package com.neatlogic.autoexecrunner.api.job;
+
+import com.alibaba.fastjson.JSONObject;
+import com.neatlogic.autoexecrunner.asynchronization.queue.NeatLogicUniqueBlockingQueue;
+import com.neatlogic.autoexecrunner.constvalue.ApiParamType;
+import com.neatlogic.autoexecrunner.dto.CommandVo;
+import com.neatlogic.autoexecrunner.restful.annotation.Input;
+import com.neatlogic.autoexecrunner.restful.annotation.Param;
+import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentBase;
+import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+
+@Component
+public class GetJobQueueStatusApi extends PrivateApiComponentBase {
+ @Override
+ public String getName() {
+ return "获取作业排队状态";
+ }
+
+ @Input({
+ @Param(name = "jobId", type = ApiParamType.LONG, isRequired = true, desc = "作业id"),
+ @Param(name = "groupSort", type = ApiParamType.LONG, desc = "阶段组id"),
+ @Param(name = "phaseName", type = ApiParamType.STRING, desc = "阶段名"),
+ })
+ @Override
+ public Object myDoService(JSONObject jsonObj) throws Exception {
+ String jobId = jsonObj.getString("jobId");
+ Integer groupSort = jsonObj.getInteger("groupSort");
+ NeatLogicUniqueBlockingQueue blockingQueue = AutoexecQueueThread.getBlockingQueue();
+ List list = blockingQueue.getQueue();
+ JSONObject result = new JSONObject();
+ for (int i = 0; i < list.size(); i++) {
+ CommandVo commandVo = list.get(i);
+ JSONObject commandJson = new JSONObject();
+ commandJson.put("fcd",commandVo.getFcd().getTime());
+ commandJson.put("command",commandVo.getCommandList().stream().map(Object::toString).collect(Collectors.joining("','")));
+ if (Objects.equals(commandVo.getJobId(), jobId) && (groupSort == null || commandVo.getJobGroupIdList().contains(groupSort))) {
+ result.put(String.valueOf(i + 1), commandJson);
+ }
+ }
+ result.put("count", list.size());
+ return result;
+ }
+
+ @Override
+ public String getToken() {
+ return "/job/queue/status/get";
+ }
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java
index 6ed4b7a..ad2ee79 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java
@@ -82,7 +82,7 @@ public class JobExecApi extends PrivateApiComponentBase {
}
commandList.add("--reuseconslog");
commandVo.setCommandList(commandList);
- AutoexecQueueThread.addUpdateTagent(commandVo);
+ AutoexecQueueThread.addCommand(commandVo);
return null;
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java
index dea5fb3..e34029b 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java
@@ -19,12 +19,16 @@ package com.neatlogic.autoexecrunner.asynchronization.queue;
import com.alibaba.fastjson.JSON;
import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext;
+import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
public class NeatLogicUniqueBlockingQueue {
private static final Logger logger = LoggerFactory.getLogger(NeatLogicUniqueBlockingQueue.class);
@@ -69,7 +73,7 @@ public class NeatLogicUniqueBlockingQueue {
return task.getT();
}
- private static class Task {
+ public static class Task {
private final T t;
private final String tenantUuid;
@@ -93,10 +97,19 @@ public class NeatLogicUniqueBlockingQueue {
}
}
- public int size(){
+ public int size() {
return blockingQueue.size();
}
+ public List getQueue() {
+ List list = new ArrayList<>();
+ List> taskList = new ArrayList<>(blockingQueue);
+ if (CollectionUtils.isNotEmpty(taskList)) {
+ list = taskList.stream().map(Task::getT).collect(Collectors.toList());
+ }
+ return list;
+ }
+
// public static void main(String[] args) throws InterruptedException {
// NeatLogicUniqueBlockingQueue queue = new NeatLogicUniqueBlockingQueue<>(1);
//
diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java
index 34d6c91..d3e3cb7 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java
@@ -9,10 +9,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
/**
* @author lvzk
@@ -33,6 +30,7 @@ public class CommandVo {
private List jobGroupIdList;//需要执行的组
private JSONArray jobPhaseNodeSqlList;
private JSONObject environment;//设置环境变量
+ private Date fcd;
private String consoleLogPath;
@@ -219,4 +217,12 @@ public class CommandVo {
this.consoleLogPath = Config.AUTOEXEC_HOME() + File.separator + JobUtil.getJobPath(getJobId(), new StringBuilder()) + File.separator + "log" + File.separator + "console.txt";
return consoleLogPath;
}
+
+ public Date getFcd() {
+ return fcd;
+ }
+
+ public void setFcd(Date fcd) {
+ this.fcd = fcd;
+ }
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
index a98bfc2..1685a99 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
@@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.util.Date;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -138,7 +139,8 @@ public class AutoexecQueueThread implements IStartUp {
running = false;
}
- public static void addUpdateTagent(CommandVo commandVo) {
+ public static void addCommand(CommandVo commandVo) {
+ commandVo.setFcd(new Date());
blockingQueue.offer(commandVo);
}
@@ -161,4 +163,8 @@ public class AutoexecQueueThread implements IStartUp {
public static Integer getProcessQueueSize() {
return processQueue.size();
}
+
+ public static NeatLogicUniqueBlockingQueue getBlockingQueue(){
+ return blockingQueue;
+ }
}
--
Gitee
From 791eb29c2f77d86db67eebdcfe7902182af246f0 Mon Sep 17 00:00:00 2001
From: lvzk <897706680@qq.com>
Date: Tue, 15 Apr 2025 17:15:49 +0800
Subject: [PATCH 4/6] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?=
=?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?=
=?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
...usApi.java => GetJobWaitingDetailApi.java} | 13 +++-------
.../autoexecrunner/api/job/JobExecApi.java | 5 +++-
.../exception/job/JobQueueFullException.java | 14 ++++++++++
.../startup/handler/AutoexecQueueThread.java | 26 +++++++++++++------
4 files changed, 40 insertions(+), 18 deletions(-)
rename src/main/java/com/neatlogic/autoexecrunner/api/job/{GetJobQueueStatusApi.java => GetJobWaitingDetailApi.java} (80%)
create mode 100755 src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java
diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java
similarity index 80%
rename from src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java
rename to src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java
index 9898c0a..498fe1e 100755
--- a/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java
@@ -15,7 +15,6 @@ along with this program. If not, see .*/
package com.neatlogic.autoexecrunner.api.job;
import com.alibaba.fastjson.JSONObject;
-import com.neatlogic.autoexecrunner.asynchronization.queue.NeatLogicUniqueBlockingQueue;
import com.neatlogic.autoexecrunner.constvalue.ApiParamType;
import com.neatlogic.autoexecrunner.dto.CommandVo;
import com.neatlogic.autoexecrunner.restful.annotation.Input;
@@ -25,12 +24,11 @@ import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread;
import org.springframework.stereotype.Component;
import java.util.List;
-import java.util.Objects;
import java.util.stream.Collectors;
@Component
-public class GetJobQueueStatusApi extends PrivateApiComponentBase {
+public class GetJobWaitingDetailApi extends PrivateApiComponentBase {
@Override
public String getName() {
return "获取作业排队状态";
@@ -45,17 +43,14 @@ public class GetJobQueueStatusApi extends PrivateApiComponentBase {
public Object myDoService(JSONObject jsonObj) throws Exception {
String jobId = jsonObj.getString("jobId");
Integer groupSort = jsonObj.getInteger("groupSort");
- NeatLogicUniqueBlockingQueue blockingQueue = AutoexecQueueThread.getBlockingQueue();
- List list = blockingQueue.getQueue();
+ List list = AutoexecQueueThread.getBlockingQueueByJobIdAndGroupSort(jobId,groupSort);
JSONObject result = new JSONObject();
for (int i = 0; i < list.size(); i++) {
CommandVo commandVo = list.get(i);
JSONObject commandJson = new JSONObject();
commandJson.put("fcd",commandVo.getFcd().getTime());
commandJson.put("command",commandVo.getCommandList().stream().map(Object::toString).collect(Collectors.joining("','")));
- if (Objects.equals(commandVo.getJobId(), jobId) && (groupSort == null || commandVo.getJobGroupIdList().contains(groupSort))) {
- result.put(String.valueOf(i + 1), commandJson);
- }
+ result.put(String.valueOf(i + 1), commandJson);
}
result.put("count", list.size());
return result;
@@ -63,6 +58,6 @@ public class GetJobQueueStatusApi extends PrivateApiComponentBase {
@Override
public String getToken() {
- return "/job/queue/status/get";
+ return "/job/waiting/detail/get";
}
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java
index ad2ee79..c6596f5 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java
@@ -18,6 +18,7 @@ import com.alibaba.fastjson.JSONObject;
import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext;
import com.neatlogic.autoexecrunner.constvalue.JobAction;
import com.neatlogic.autoexecrunner.dto.CommandVo;
+import com.neatlogic.autoexecrunner.exception.job.JobQueueFullException;
import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentBase;
import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread;
import com.neatlogic.autoexecrunner.util.FileUtil;
@@ -82,7 +83,9 @@ public class JobExecApi extends PrivateApiComponentBase {
}
commandList.add("--reuseconslog");
commandVo.setCommandList(commandList);
- AutoexecQueueThread.addCommand(commandVo);
+ if(!AutoexecQueueThread.addCommand(commandVo)){
+ throw new JobQueueFullException();
+ }
return null;
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java b/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java
new file mode 100755
index 0000000..8b25d14
--- /dev/null
+++ b/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java
@@ -0,0 +1,14 @@
+package com.neatlogic.autoexecrunner.exception.job;
+
+
+import com.neatlogic.autoexecrunner.common.config.Config;
+import com.neatlogic.autoexecrunner.exception.core.ApiRuntimeException;
+import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread;
+
+public class JobQueueFullException extends ApiRuntimeException {
+
+ public JobQueueFullException() {
+ super("作业队列已满(当前队列数" + AutoexecQueueThread.getBlockingQueueSize() + ">= 配置队列最大数" + Config.MAX_PROCESS_QUEUE_SIZE() + "),无法执行");
+ }
+
+}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
index 1685a99..3b84ec8 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
@@ -17,15 +17,14 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
-import java.util.Date;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class AutoexecQueueThread implements IStartUp {
- private static final BlockingQueue processQueue = new LinkedBlockingQueue<>(Config.MAX_PROCESS_QUEUE_SIZE() + 5);
+ private static final BlockingQueue processQueue = new LinkedBlockingQueue<>(Config.MAX_PROCESS_EXECUTE_COUNT() + 5);
private static final Logger logger = LoggerFactory.getLogger(AutoexecQueueThread.class);
- private static final NeatLogicUniqueBlockingQueue blockingQueue = new NeatLogicUniqueBlockingQueue<>(50000);
+ private static final NeatLogicUniqueBlockingQueue blockingQueue = new NeatLogicUniqueBlockingQueue<>(Config.MAX_PROCESS_QUEUE_SIZE());
private volatile boolean running = true;
@Override
@@ -139,9 +138,9 @@ public class AutoexecQueueThread implements IStartUp {
running = false;
}
- public static void addCommand(CommandVo commandVo) {
+ public static boolean addCommand(CommandVo commandVo) {
commandVo.setFcd(new Date());
- blockingQueue.offer(commandVo);
+ return blockingQueue.offer(commandVo);
}
public static void addProcess(Process process) {
@@ -164,7 +163,18 @@ public class AutoexecQueueThread implements IStartUp {
return processQueue.size();
}
- public static NeatLogicUniqueBlockingQueue getBlockingQueue(){
- return blockingQueue;
+ public static Integer getBlockingQueueSize() {
+ return blockingQueue.size();
+ }
+
+ public static List getBlockingQueueByJobIdAndGroupSort(String jobId, Integer groupSort) {
+ List list = blockingQueue.getQueue();
+ List jobCommandList = new ArrayList<>();
+ for (CommandVo commandVo : list) {
+ if (Objects.equals(commandVo.getJobId(), jobId) && (groupSort == null || commandVo.getJobGroupIdList().contains(groupSort))) {
+ jobCommandList.add(commandVo);
+ }
+ }
+ return jobCommandList;
}
}
--
Gitee
From 86e147ec294eaf30014e0ad59442cb45e676e03b Mon Sep 17 00:00:00 2001
From: lvzk <897706680@qq.com>
Date: Wed, 16 Apr 2025 19:40:50 +0800
Subject: [PATCH 5/6] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?=
=?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?=
=?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../api/job/GetJobWaitingDetailApi.java | 13 +++++--
.../autoexecrunner/api/job/JobAbortApi.java | 10 +++--
.../autoexecrunner/api/job/JobExecApi.java | 12 ++++--
.../queue/NeatLogicUniqueBlockingQueue.java | 26 +++++++++++--
.../autoexecrunner/dto/CommandVo.java | 39 ++++++++++++++++---
.../startup/handler/AutoexecQueueThread.java | 24 +++++++-----
6 files changed, 97 insertions(+), 27 deletions(-)
diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java
index 498fe1e..ab7ee80 100755
--- a/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java
@@ -24,6 +24,7 @@ import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread;
import org.springframework.stereotype.Component;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
@@ -43,16 +44,22 @@ public class GetJobWaitingDetailApi extends PrivateApiComponentBase {
public Object myDoService(JSONObject jsonObj) throws Exception {
String jobId = jsonObj.getString("jobId");
Integer groupSort = jsonObj.getInteger("groupSort");
- List list = AutoexecQueueThread.getBlockingQueueByJobIdAndGroupSort(jobId,groupSort);
+ List list = AutoexecQueueThread.getBlockingQueueByJobIdAndGroupSort();
JSONObject result = new JSONObject();
for (int i = 0; i < list.size(); i++) {
CommandVo commandVo = list.get(i);
JSONObject commandJson = new JSONObject();
+ commandJson.put("groupSortList",commandVo.getJobGroupSortList());
+ commandJson.put("nodeSqlList",commandVo.getJobPhaseNodeSqlList());
+ commandJson.put("phaseNameList",commandVo.getJobPhaseNameList());
+ commandJson.put("resourceIdList",commandVo.getJobPhaseResourceIdList());
commandJson.put("fcd",commandVo.getFcd().getTime());
commandJson.put("command",commandVo.getCommandList().stream().map(Object::toString).collect(Collectors.joining("','")));
- result.put(String.valueOf(i + 1), commandJson);
+ if (Objects.equals(commandVo.getJobId(), jobId) && (groupSort == null || commandVo.getJobGroupSortList().contains(groupSort))) {
+ result.put(String.valueOf(i + 1), commandJson);
+ }
}
- result.put("count", list.size());
+ result.put("count", AutoexecQueueThread.getBlockingQueueSize());
return result;
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobAbortApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobAbortApi.java
index 8c46885..b31930f 100755
--- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobAbortApi.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobAbortApi.java
@@ -20,6 +20,7 @@ import com.neatlogic.autoexecrunner.constvalue.JobAction;
import com.neatlogic.autoexecrunner.core.ExecProcessCommand;
import com.neatlogic.autoexecrunner.dto.CommandVo;
import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentBase;
+import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread;
import com.neatlogic.autoexecrunner.threadpool.CommonThreadPool;
import org.springframework.stereotype.Component;
@@ -45,13 +46,16 @@ public class JobAbortApi extends PrivateApiComponentBase {
//set command
List commandList = Arrays.asList("autoexec", "--jobid", commandVo.getJobId(), "--execuser", UserContext.get().getUserUuid(), "--abort");
commandList = new ArrayList<>(commandList);
- if(commandVo.getPassThroughEnv() != null){
+ if (commandVo.getPassThroughEnv() != null) {
commandList.add("--passthroughenv");
commandList.add(commandVo.getPassThroughEnv().toString());
}
commandVo.setCommandList(commandList);
- ExecProcessCommand processCommand = new ExecProcessCommand(commandVo);
- CommonThreadPool.execute(processCommand);
+ //队列里不存在才执行abort命令
+ if (!AutoexecQueueThread.removeCommand(commandVo)) {
+ ExecProcessCommand processCommand = new ExecProcessCommand(commandVo);
+ CommonThreadPool.execute(processCommand);
+ }
return null;
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java
index c6596f5..d9c534f 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java
@@ -23,6 +23,8 @@ import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentB
import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread;
import com.neatlogic.autoexecrunner.util.FileUtil;
import org.apache.commons.collections4.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
@@ -36,6 +38,8 @@ import java.util.stream.Collectors;
**/
@Component
public class JobExecApi extends PrivateApiComponentBase {
+ private static final Logger logger = LoggerFactory.getLogger(JobExecApi.class);
+
@Override
public String getName() {
return "创建执行作业剧本进程";
@@ -65,9 +69,9 @@ public class JobExecApi extends PrivateApiComponentBase {
commandList.add("--passthroughenv");
commandList.add(commandVo.getPassThroughEnv().toString());
}
- if (CollectionUtils.isNotEmpty(commandVo.getJobGroupIdList())) {
+ if (CollectionUtils.isNotEmpty(commandVo.getJobGroupSortList())) {
commandList.add("--phasegroups");
- commandList.add(commandVo.getJobGroupIdList().stream().map(Object::toString).collect(Collectors.joining("','")));
+ commandList.add(commandVo.getJobGroupSortList().stream().map(Object::toString).collect(Collectors.joining("','")));
}
if (CollectionUtils.isNotEmpty(commandVo.getJobPhaseNameList())) {
commandList.add("--phases");
@@ -83,8 +87,10 @@ public class JobExecApi extends PrivateApiComponentBase {
}
commandList.add("--reuseconslog");
commandVo.setCommandList(commandList);
- if(!AutoexecQueueThread.addCommand(commandVo)){
+ if (AutoexecQueueThread.addCommand(commandVo) == -1) {
throw new JobQueueFullException();
+ } else if (AutoexecQueueThread.addCommand(commandVo) == 0) {
+ logger.debug("队列里已存在相同的执行命令:{}", String.join(",", commandList));
}
return null;
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java
index e34029b..1af28a5 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
public class NeatLogicUniqueBlockingQueue {
@@ -40,7 +41,11 @@ public class NeatLogicUniqueBlockingQueue {
this.taskMap = new ConcurrentHashMap<>();
}
- public boolean offer(T t) {
+ /**
+ * 添加队列成员
+ * 返回-1:队列已满,1:添加成功,0:重复添加
+ */
+ public int offer(T t) {
Task task = new Task<>(t);
// 保证任务唯一性
if (taskMap.putIfAbsent(task.getUniqueKey(), Boolean.TRUE) == null) {
@@ -51,14 +56,29 @@ public class NeatLogicUniqueBlockingQueue {
// 如果队列已满,移除任务标记
taskMap.remove(task.getUniqueKey());
logger.error("Queue is full!");
+ return -1;
}
- return added;
+ return 1;
} else {
if (t != null) {
logger.debug("NeatLogicUniqueBlockingQueue repeat: {}", JSON.toJSONString(t));
}
+ return 0;
}
- return false; // 已存在任务,直接返回 false
+ }
+
+ public boolean remove(Predicate> condition) {
+ for (Task task : blockingQueue) {
+ if (condition.test(task)) {
+ boolean removed = blockingQueue.remove(task);
+ if (removed) {
+ taskMap.remove(task.getUniqueKey());
+ logger.debug("Removed task: {}", JSON.toJSONString(task));
+ return true;
+ }
+ }
+ }
+ return false;
}
public T take() throws InterruptedException {
diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java
index d3e3cb7..db7a3ca 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java
@@ -27,7 +27,7 @@ public class CommandVo {
private JSONObject passThroughEnv;//web端传到runner贯穿autoexec 回调web端会携带该变量
private List jobPhaseNameList;//需要执行的phaseNameList
private List jobPhaseResourceIdList;//需要执行的resourceIdList
- private List jobGroupIdList;//需要执行的组
+ private List jobGroupSortList;//需要执行的组
private JSONArray jobPhaseNodeSqlList;
private JSONObject environment;//设置环境变量
private Date fcd;
@@ -71,9 +71,9 @@ public class CommandVo {
if (CollectionUtils.isNotEmpty(jobPhaseResourceIdArray)) {
this.jobPhaseResourceIdList = jobPhaseResourceIdArray.toJavaList(Long.class);
}
- JSONArray jobGroupIdArray = jsonObj.getJSONArray("jobGroupIdList");
+ JSONArray jobGroupIdArray = jsonObj.getJSONArray("jobGroupSortList");
if (CollectionUtils.isNotEmpty(jobGroupIdArray)) {
- this.jobGroupIdList = jobGroupIdArray.toJavaList(Integer.class);
+ this.jobGroupSortList = jobGroupIdArray.toJavaList(Integer.class);
}
JSONArray jobPhaseNodeSqlList = jsonObj.getJSONArray("jobPhaseNodeSqlList");
@@ -187,8 +187,8 @@ public class CommandVo {
return jobPhaseResourceIdList;
}
- public List getJobGroupIdList() {
- return jobGroupIdList;
+ public List getJobGroupSortList() {
+ return jobGroupSortList;
}
public JSONArray getJobPhaseNodeSqlList() {
@@ -225,4 +225,33 @@ public class CommandVo {
public void setFcd(Date fcd) {
this.fcd = fcd;
}
+
+ private String getFilteredCommandString() {
+ if (commandList == null) return "";
+ List filtered = new ArrayList<>();
+ Iterator iterator = commandList.iterator();
+ while (iterator.hasNext()) {
+ String item = iterator.next();
+ if ("--execid".equals(item)) {
+ // 跳过 "--execid" 和它后面的那个参数
+ if (iterator.hasNext()) iterator.next();
+ continue;
+ }
+ filtered.add(item);
+ }
+ return String.join(",", filtered);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof CommandVo)) return false;
+ CommandVo that = (CommandVo) o;
+ return Objects.equals(getFilteredCommandString(), that.getFilteredCommandString());
+ }
+
+ @Override
+ public int hashCode() {
+ return getFilteredCommandString().hashCode();
+ }
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
index 3b84ec8..7c34974 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
@@ -138,11 +138,22 @@ public class AutoexecQueueThread implements IStartUp {
running = false;
}
- public static boolean addCommand(CommandVo commandVo) {
+ /**
+ * 进入队列
+ * 返回-1:队列已满,1:添加成功,0:重复添加
+ */
+ public static int addCommand(CommandVo commandVo) {
commandVo.setFcd(new Date());
return blockingQueue.offer(commandVo);
}
+ /**
+ * 删除命令
+ */
+ public static boolean removeCommand(CommandVo commandVo) {
+ return blockingQueue.remove(task -> task.getT().getJobId().equals(commandVo.getJobId()));
+ }
+
public static void addProcess(Process process) {
boolean result = processQueue.offer(process);
if (!result) {
@@ -167,14 +178,7 @@ public class AutoexecQueueThread implements IStartUp {
return blockingQueue.size();
}
- public static List getBlockingQueueByJobIdAndGroupSort(String jobId, Integer groupSort) {
- List list = blockingQueue.getQueue();
- List jobCommandList = new ArrayList<>();
- for (CommandVo commandVo : list) {
- if (Objects.equals(commandVo.getJobId(), jobId) && (groupSort == null || commandVo.getJobGroupIdList().contains(groupSort))) {
- jobCommandList.add(commandVo);
- }
- }
- return jobCommandList;
+ public static List getBlockingQueueByJobIdAndGroupSort() {
+ return blockingQueue.getQueue();
}
}
--
Gitee
From 82830847eeb1eb3f6d30fac791124cfd2393308f Mon Sep 17 00:00:00 2001
From: lvzk <897706680@qq.com>
Date: Thu, 17 Apr 2025 11:48:05 +0800
Subject: [PATCH 6/6] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?=
=?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?=
=?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../autoexecrunner/common/config/Config.java | 16 ++++++++--------
.../exception/job/JobQueueFullException.java | 2 +-
.../startup/handler/AutoexecQueueThread.java | 10 +++++-----
src/main/resources/application.properties | 10 +++++++++-
4 files changed, 23 insertions(+), 15 deletions(-)
diff --git a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java
index dd660c9..77cfc7b 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java
@@ -37,8 +37,8 @@ public class Config {
private static String DATA_HOME;//文件根目录
private static String DEPLOY_HOME;//发布目录
private static String GITLAB_PASSWORD;// gitlab private_token
- private static Integer MAX_PROCESS_QUEUE_SIZE;//最大自动化作业队列数,多余的则丢弃
- private static Integer MAX_PROCESS_EXECUTE_COUNT;//最大执行作业数,超过的则进入队列
+ private static Integer SUBPROCESS_COMMAND_QUEUE_MAX_SIZE;//自动化作业命令等待队列的最大容量,超过此数量的新命令将被拒绝或丢弃
+ private static Integer SUBPROCESS_EXECUTION_MAX_CONCURRENT;//自动化作业最大并发子进程数,超过此数量的命令将进入等待队列
//neatlogic
private static String NEATLOGIC_ROOT;
@@ -170,11 +170,11 @@ public class Config {
return AUTOEXEC_TOKEN;
}
- public static Integer MAX_PROCESS_QUEUE_SIZE() {
- return MAX_PROCESS_QUEUE_SIZE;
+ public static Integer SUBPROCESS_COMMAND_QUEUE_MAX_SIZE() {
+ return SUBPROCESS_COMMAND_QUEUE_MAX_SIZE;
}
- public static Integer MAX_PROCESS_EXECUTE_COUNT() {
- return MAX_PROCESS_EXECUTE_COUNT;
+ public static Integer SUBPROCESS_EXECUTION_MAX_CONCURRENT() {
+ return SUBPROCESS_EXECUTION_MAX_CONCURRENT;
}
@PostConstruct
@@ -248,9 +248,9 @@ public class Config {
UPDATE_RUNNER_STATUS_PERIOD = Integer.parseInt(prop.getProperty("update.runner.status.period", "1800000"));
- MAX_PROCESS_QUEUE_SIZE = Integer.parseInt(prop.getProperty("max.process.queue.size", "1000"));
+ SUBPROCESS_COMMAND_QUEUE_MAX_SIZE = Integer.parseInt(prop.getProperty("subprocess.command.queue.max-size", "1000"));
- MAX_PROCESS_EXECUTE_COUNT = Integer.parseInt(prop.getProperty("max.process.execute.count", "20"));
+ SUBPROCESS_EXECUTION_MAX_CONCURRENT = Integer.parseInt(prop.getProperty("subprocess.execution.max-concurrent", "20"));
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java b/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java
index 8b25d14..a72dae6 100755
--- a/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java
@@ -8,7 +8,7 @@ import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread;
public class JobQueueFullException extends ApiRuntimeException {
public JobQueueFullException() {
- super("作业队列已满(当前队列数" + AutoexecQueueThread.getBlockingQueueSize() + ">= 配置队列最大数" + Config.MAX_PROCESS_QUEUE_SIZE() + "),无法执行");
+ super("作业队列已满(当前队列数" + AutoexecQueueThread.getBlockingQueueSize() + ">= 配置队列最大数" + Config.SUBPROCESS_COMMAND_QUEUE_MAX_SIZE() + "),无法执行");
}
}
diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
index 7c34974..9f1a4e0 100644
--- a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
+++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java
@@ -22,9 +22,9 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class AutoexecQueueThread implements IStartUp {
- private static final BlockingQueue processQueue = new LinkedBlockingQueue<>(Config.MAX_PROCESS_EXECUTE_COUNT() + 5);
+ private static final BlockingQueue processQueue = new LinkedBlockingQueue<>(Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT() + 5);
private static final Logger logger = LoggerFactory.getLogger(AutoexecQueueThread.class);
- private static final NeatLogicUniqueBlockingQueue blockingQueue = new NeatLogicUniqueBlockingQueue<>(Config.MAX_PROCESS_QUEUE_SIZE());
+ private static final NeatLogicUniqueBlockingQueue blockingQueue = new NeatLogicUniqueBlockingQueue<>(Config.SUBPROCESS_COMMAND_QUEUE_MAX_SIZE());
private volatile boolean running = true;
@Override
@@ -54,12 +54,12 @@ public class AutoexecQueueThread implements IStartUp {
CommandVo commandVo = null;
try {
// 你的业务逻辑
- if (processQueue.size() <= Config.MAX_PROCESS_EXECUTE_COUNT()) {
+ if (processQueue.size() <= Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT()) {
commandVo = blockingQueue.take();
- logger.debug("current autoexec sub process count:{} <= {},autoexec job:{} will create...", processQueue.size(), Config.MAX_PROCESS_EXECUTE_COUNT(), (commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY)));
+ logger.debug("current autoexec sub process count:{} <= {},autoexec job:{} will create...", processQueue.size(), Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT(), (commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY)));
createSubProcessAndStart(commandVo);
} else {
- logger.debug("autoexec sub process limit count :{}, need to wait process finish,then keep on creating sub process!", Config.MAX_PROCESS_EXECUTE_COUNT());
+ logger.debug("autoexec sub process limit count :{}, need to wait process finish,then keep on creating sub process!", Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT());
}
Thread.sleep(2000);
} catch (InterruptedException e) {
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 2606178..81f7bfe 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -14,7 +14,6 @@ spring.servlet.multipart.max-file-size=100MB
#NEATLOGIC WEB
neatlogic.root=http://127.0.0.1:8080/neatlogic
-#认证 连接时校验
#RUNNER
@@ -32,3 +31,12 @@ autoexec.home=/Users/cocokong/IdeaProjects/autoexec/data/job
deploy.home=/app/autoexec/data/verdata
data.home=${runner.home}/data
file.mimetype.text.plain=sql text c cc c++ cpp h pl py txt java el gitignore js css properties jsp yml json md vue sh config htm html xml classpath project pm less scss
+
+
+# Maximum number of concurrent subprocesses for autoexec jobs.
+# Commands exceeding this limit will be placed in the waiting queue.
+subprocess.execution.max-concurrent=20
+
+# Maximum capacity of the command waiting queue for autoexec jobs.
+# New commands exceeding this limit will be rejected or discarded.
+subprocess.command.queue.max-size=1000
--
Gitee