From 0439e1dba9b769f71d83a87e417ba8704bed1398 Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Wed, 9 Apr 2025 19:31:27 +0800 Subject: [PATCH 1/6] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?= =?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?= =?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../autoexecrunner/api/job/JobExecApi.java | 6 +- .../asynchronization/NeatLogicThread.java | 190 ++++++++++++++++++ .../queue/NeatLogicUniqueBlockingQueue.java | 102 ++++++++++ .../threadlocal/UserContext.java | 24 ++- .../threadpool/CachedThreadPool.java | 144 +++++++++++++ .../autoexec/ProcessWaitTask.java | 69 +++++++ .../autoexecrunner/common/config/Config.java | 13 ++ .../core/ExecProcessCommand.java | 1 + .../autoexecrunner/dto/ThreadPoolVo.java | 124 ++++++++++++ .../autoexecrunner/dto/ThreadTaskVo.java | 84 ++++++++ .../autoexecrunner/dto/ThreadVo.java | 62 ++++++ .../startup/handler/AutoexecQueueThread.java | 157 +++++++++++++++ 12 files changed, 962 insertions(+), 14 deletions(-) create mode 100644 src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadpool/CachedThreadPool.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/dto/ThreadPoolVo.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/dto/ThreadTaskVo.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/dto/ThreadVo.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java index 7622d2e..6ed4b7a 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java @@ -17,10 +17,9 @@ package com.neatlogic.autoexecrunner.api.job; import com.alibaba.fastjson.JSONObject; import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext; import com.neatlogic.autoexecrunner.constvalue.JobAction; -import com.neatlogic.autoexecrunner.core.ExecProcessCommand; import com.neatlogic.autoexecrunner.dto.CommandVo; import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentBase; -import com.neatlogic.autoexecrunner.threadpool.CommonThreadPool; +import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; import com.neatlogic.autoexecrunner.util.FileUtil; import org.apache.commons.collections4.CollectionUtils; import org.springframework.stereotype.Component; @@ -83,8 +82,7 @@ public class JobExecApi extends PrivateApiComponentBase { } commandList.add("--reuseconslog"); commandVo.setCommandList(commandList); - ExecProcessCommand processCommand = new ExecProcessCommand(commandVo); - CommonThreadPool.execute(processCommand); + AutoexecQueueThread.addUpdateTagent(commandVo); return null; } diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java new file mode 100644 index 0000000..d37b747 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java @@ -0,0 +1,190 @@ +/*Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see .*/ + +package com.neatlogic.autoexecrunner.asynchronization; + + +import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext; +import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext; +import com.neatlogic.autoexecrunner.exception.core.ApiRuntimeException; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; + +public abstract class NeatLogicThread implements Runnable, Comparable { + private static final Logger logger = LoggerFactory.getLogger(NeatLogicThread.class); + protected UserContext userContext; + private final String tenantUuid; + private String threadName; + private boolean isUnique = false; + /* For generating thread ID */ + private long id; + private int priority = 3;//默认优先级是3,数字越低优先级越高 + private Semaphore lock;//用于hold住其他异步线程,控制两个异步线程的先后顺序 + private CountDownLatch countDownLatch;//用于hold住主线程,这里只需要countdown,不需要等待 + private boolean needAwaitAdvance = true;// 是否需要等待所有模块加载完成后再任务 + + @Override + public int compareTo(NeatLogicThread other) { + return Integer.compare(this.priority, other.priority); // 优先级高的先出队,priority越小代表优先级越高 + } + + public Semaphore getLock() { + return lock; + } + + public void setLock(Semaphore lock) { + this.lock = lock; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public int getPriority() { + if (priority <= 1) { + priority = 1; + } + return priority; + } + + public void setPriority(int priority) { + this.priority = priority; + } + + + + /*public NeatLogicThread() { + userContext = UserContext.get(); + tenantContext = TenantContext.get(); + inputFromContext = InputFromContext.get(); + }*/ + + /*public NeatLogicThread(UserContext _userContext, TenantContext _tenantContext) { + if (_userContext != null) { + userContext = _userContext.copy(); + } + tenantUuid = _tenantContext.getTenantUuid(); + activeModuleList = _tenantContext.getActiveModuleList(); + inputFromContext = InputFromContext.get(); + }*/ + + + public NeatLogicThread(String _threadName) { + UserContext tmp = UserContext.get(); + if (tmp != null) { + userContext = tmp.copy(); + } + tenantUuid = TenantContext.get().getTenantUuid(); + this.threadName = _threadName; + } + + public NeatLogicThread(String _threadName, int priority) { + UserContext tmp = UserContext.get(); + if (tmp != null) { + userContext = tmp.copy(); + } + tenantUuid = TenantContext.get().getTenantUuid(); + this.threadName = _threadName; + this.priority = priority; + } + + public NeatLogicThread(String _threadName, boolean _isUnique) { + userContext = UserContext.get(); + tenantUuid = TenantContext.get().getTenantUuid(); + this.threadName = _threadName; + this.isUnique = _isUnique; + } + + @Override + public final void run() { + TenantContext tenantContext = TenantContext.init(tenantUuid); + UserContext.init(userContext); + try { + String oldThreadName = Thread.currentThread().getName(); + if (StringUtils.isNotBlank(threadName)) { + Thread.currentThread().setName(threadName); + } + + if (this.lock != null) { + //System.out.println(this.getThreadName() + "尝试获取锁" + this.lock); + lock.acquire(); + //System.out.println(this.getThreadName() + "成功获取锁" + this.lock); + } + execute(); + Thread.currentThread().setName(oldThreadName); + } catch (ApiRuntimeException ex) { + logger.warn(ex.getMessage(), ex); + } catch (Exception ex) { + logger.error(ex.getMessage(), ex); + } finally { + if (this.lock != null) { + lock.release(); + //System.out.println(this.getThreadName() + "成功释放锁" + this.lock); + } + if (countDownLatch != null) { + countDownLatch.countDown(); + } + // 清除所有threadlocal + if (TenantContext.get() != null) { + TenantContext.get().release(); + } + if (UserContext.get() != null) { + UserContext.get().release(); + } + } + } + + protected abstract void execute(); + + public void setUnique(boolean unique) { + isUnique = unique; + } + + public boolean isUnique() { + return isUnique; + } + + public void setThreadName(String threadName) { + this.threadName = threadName; + } + + public String getThreadName() { + return threadName; + } + + public CountDownLatch getCountDownLatch() { + return countDownLatch; + } + + public void setCountDownLatch(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + public boolean isNeedAwaitAdvance() { + return needAwaitAdvance; + } + + public void setNeedAwaitAdvance(boolean needAwaitAdvance) { + this.needAwaitAdvance = needAwaitAdvance; + } +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java new file mode 100644 index 0000000..ae9deaf --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.neatlogic.autoexecrunner.asynchronization.queue; + +import com.alibaba.fastjson.JSON; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +public class NeatLogicUniqueBlockingQueue { + private static final Logger logger = LoggerFactory.getLogger(NeatLogicUniqueBlockingQueue.class); + private final BlockingQueue> blockingQueue; + private final ConcurrentHashMap taskMap; // 用于去重 + + public NeatLogicUniqueBlockingQueue(int capacity) { + this.blockingQueue = new LinkedBlockingQueue<>(capacity); + this.taskMap = new ConcurrentHashMap<>(); + } + + public boolean offer(T t) { + Task task = new Task<>(t); + // 保证任务唯一性 + if (taskMap.putIfAbsent(task.getUniqueKey(), Boolean.TRUE) == null) { + logger.debug("====TagentUpdateInfo-addQueue:" + JSON.toJSONString(task)); + // 如果任务是新任务,放入队列 + boolean added = blockingQueue.offer(task); + if (!added) { + // 如果队列已满,移除任务标记 + taskMap.remove(task.getUniqueKey()); + logger.error("Queue is full!"); + } + return added; + } else { + if (t != null) { + logger.debug("NeatLogicUniqueBlockingQueue repeat: {}", JSON.toJSONString(t)); + } + } + return false; // 已存在任务,直接返回 false + } + + public T take() throws InterruptedException { + Task task = blockingQueue.take(); // 阻塞式获取任务 + taskMap.remove(task.getUniqueKey()); // 移除已处理任务的唯一标记 + return task.getT(); + } + + private static class Task { + private final T t; + + public Task(T t) { + this.t = t; + } + + public T getT() { + return t; + } + + public String getUniqueKey() { + return String.valueOf(t.hashCode()); + } + } + + public int size(){ + return blockingQueue.size(); + } + +// public static void main(String[] args) throws InterruptedException { +// NeatLogicUniqueBlockingQueue queue = new NeatLogicUniqueBlockingQueue<>(1); +// +// // 模拟任务插入 +// UserSessionVo a = new UserSessionVo(); +// a.setToken("1111"); +// System.out.println(queue.offer(a)); // 返回 true,任务插入成功 +// UserSessionVo b = new UserSessionVo(); +// b.setToken("222"); +// System.out.println(queue.offer(b)); // 返回 false,任务已存在 +// +// // 模拟任务消费 +// UserSessionVo task = queue.take(); // 消费 "task1" +// UserSessionVo task2 = queue.take(); // 消费 "task1" +// UserSessionVo task3 = queue.take(); // 消费 "task1" +// } +} + diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/UserContext.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/UserContext.java index 89bf0a2..f63f71a 100755 --- a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/UserContext.java +++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/UserContext.java @@ -27,19 +27,23 @@ public class UserContext implements Serializable { private String timezone = "+8:00"; private String token; private List roleUuidList = new ArrayList<>(); - + + public UserContext copy() { + UserContext userContext = new UserContext(); + userContext.setRequest(request); + userContext.setToken(token); + userContext.setTenant(tenant); + userContext.setUserName(userName); + userContext.setUserId(userId); + userContext.setUserUuid(userUuid); + userContext.setTimezone(timezone); + return userContext; + } + public static UserContext init(UserContext _userContext) { UserContext context = new UserContext(); if (_userContext != null) { - context.setUserId(_userContext.getUserId()); - context.setUserUuid(_userContext.getUserUuid()); - context.setUserName(_userContext.getUserName()); - context.setTenant(_userContext.getTenant()); - context.setTimezone(_userContext.getTimezone()); - context.setToken(_userContext.getToken()); - // context.setRequest(_userContext.getRequest()); - // context.setResponse(_userContext.getResponse()); - context.setRoleUuidList(_userContext.getRoleUuidList()); + context = _userContext.copy(); } instance.set(context); return context; diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadpool/CachedThreadPool.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadpool/CachedThreadPool.java new file mode 100644 index 0000000..b1f50a2 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadpool/CachedThreadPool.java @@ -0,0 +1,144 @@ +/*Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see .*/ + +package com.neatlogic.autoexecrunner.asynchronization.threadpool; + + +import com.neatlogic.autoexecrunner.asynchronization.NeatLogicThread; +import com.neatlogic.autoexecrunner.dto.ThreadPoolVo; +import com.neatlogic.autoexecrunner.dto.ThreadTaskVo; +import com.neatlogic.autoexecrunner.dto.ThreadVo; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.*; +import java.util.concurrent.*; + +public class CachedThreadPool { + static int rank = 15; + static int cpu = Runtime.getRuntime().availableProcessors(); + private static final Log logger = LogFactory.getLog(CachedThreadPool.class); + private static final Map threadTaskMap = new ConcurrentHashMap<>(); + private static final Map threadMap = new ConcurrentHashMap<>(); + private static final Set threadSet = new HashSet<>(); + private static final PriorityBlockingQueue threadQueue = new PriorityBlockingQueue<>(); + + static class NeatLogicThreadFactory implements ThreadFactory { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r) { + @Override + public void run() { + try { + super.run(); + } finally { + threadMap.remove(this.getId()); + } + } + }; + threadMap.put(thread.getId(), new ThreadVo(thread.getId(), thread.getName())); + return thread; + } + } + + private static final ThreadPoolExecutor mainThreadPool = new ThreadPoolExecutor(0, cpu * rank, + 60L, TimeUnit.SECONDS, + new SynchronousQueue<>(), new NeatLogicThreadFactory(), new NeatLogicRejectHandler()) { + @Override + protected void beforeExecute(Thread t, Runnable r) { + super.beforeExecute(t, r); + if (r instanceof NeatLogicThread) { + NeatLogicThread nt = (NeatLogicThread) r; + nt.setId(t.getId()); + ThreadTaskVo threadVo = new ThreadTaskVo(); + threadVo.setId(t.getId()); + threadVo.setName(nt.getThreadName()); + threadVo.setPoolName("main"); + threadVo.setStartTime(new Date()); + threadVo.setPriority(nt.getPriority()); + threadTaskMap.put(nt.getId(), threadVo); + threadSet.add(nt.getThreadName()); + } + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + + // 任务完成后从 activeTasks 中移除 + if (r instanceof NeatLogicThread) { + NeatLogicThread task = (NeatLogicThread) r; + threadTaskMap.remove(task.getId()); + threadSet.remove(task.getThreadName()); + } + //尝试从队列中拿出任务处理 + Runnable task = threadQueue.poll(); + if (task != null) { + mainThreadPool.execute(task); + } + } + }; + + public static void execute(NeatLogicThread command, CountDownLatch countDownLatch) { + command.setCountDownLatch(countDownLatch); + execute(command); + } + + public static void execute(NeatLogicThread command, Semaphore lock) { + command.setLock(lock); + execute(command); + } + + public static void execute(NeatLogicThread command) { + try { + boolean isExists = command.isUnique() && StringUtils.isNotBlank(command.getThreadName()) && threadSet.contains(command.getThreadName()); + if (!isExists) { + mainThreadPool.execute(command); + } else { + logger.warn(command.getThreadName() + " is running"); + } + } catch (RejectedExecutionException ex) { + logger.error(ex.getMessage(), ex); + } + } + + + static class NeatLogicRejectHandler implements RejectedExecutionHandler { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + //进入等待队列 + if (r instanceof NeatLogicThread) { + threadQueue.offer((NeatLogicThread) r); + } else { + logger.error("线程池已满,非NeatLogicThread子类线程将被抛弃"); + } + } + } + + + public static ThreadPoolVo getStatus() { + ThreadPoolVo threadPoolVo = new ThreadPoolVo(); + List threadTasks = new ArrayList<>(threadTaskMap.values()); + List threads = new ArrayList<>(threadMap.values()); + threadPoolVo.setThreadTaskList(threadTasks); + threadPoolVo.setThreadList(threads); + threadPoolVo.setMaxThreadCount(cpu * rank); + threadPoolVo.setMainPoolSize(mainThreadPool.getPoolSize()); + threadPoolVo.setMainActiveCount(mainThreadPool.getActiveCount()); + threadPoolVo.setMainQueueSize(threadQueue.size()); + return threadPoolVo; + } +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java b/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java new file mode 100644 index 0000000..8068c16 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java @@ -0,0 +1,69 @@ +package com.neatlogic.autoexecrunner.autoexec; + +import com.alibaba.fastjson.JSONObject; +import com.neatlogic.autoexecrunner.asynchronization.NeatLogicThread; +import com.neatlogic.autoexecrunner.dto.CommandVo; +import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Field; + +public class ProcessWaitTask extends NeatLogicThread { + private static final Logger logger = LoggerFactory.getLogger(ProcessWaitTask.class); + private final Process process; + private final CommandVo commandVo; + private final JSONObject payload; + + public ProcessWaitTask(Process process, CommandVo commandVo, JSONObject payload) { + super("THREAD-AUTOEXEC-WAIT-" + commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY)); + this.process = process; + this.commandVo = commandVo; + this.payload = payload; + } + + + @Override + protected void execute() { + + try { + int exitCode = process.waitFor(); + int exitStatus = process.exitValue(); + commandVo.setExitValue(exitStatus); + logger.debug("进程[{}] 退出,状态码: {}", getPid(process), exitCode); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error(String.format("等待被中断: 入参:%s, errorMsg:%s", payload, e.getMessage()), e); + } finally { + // 确保关闭流 + closeQuietly(process.getInputStream()); + closeQuietly(process.getErrorStream()); + closeQuietly(process.getOutputStream()); + AutoexecQueueThread.removeProcess(process); + } + } + + // 兼容不同JDK版本的PID获取 + private long getPid(Process p) { + try { + if (p.getClass().getName().contains("UNIXProcess")) { + Field pidField = p.getClass().getDeclaredField("pid"); + pidField.setAccessible(true); + return pidField.getLong(p); + } + } catch (Exception e) { + // 忽略异常 + } + return -1; // 未知PID + } + + private void closeQuietly(java.io.Closeable c) { + try { + if (c != null) c.close(); + } catch (IOException ignore) { + } + } +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java index 7a5eb5e..bac3c8c 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java +++ b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java @@ -37,6 +37,8 @@ public class Config { private static String DATA_HOME;//文件根目录 private static String DEPLOY_HOME;//发布目录 private static String GITLAB_PASSWORD;// gitlab private_token + private static Integer MAX_PROCESS_QUEUE_SIZE;//最大自动化作业队列数,多余的则丢弃 + private static Integer MAX_PROCESS_EXECUTE_COUNT;//最大执行作业数,超过的则进入队列 //neatlogic private static String NEATLOGIC_ROOT; @@ -168,6 +170,13 @@ public class Config { return AUTOEXEC_TOKEN; } + public static Integer MAX_PROCESS_QUEUE_SIZE() { + return MAX_PROCESS_QUEUE_SIZE; + } + public static Integer MAX_PROCESS_EXECUTE_COUNT() { + return MAX_PROCESS_EXECUTE_COUNT; + } + @PostConstruct public void init() { try { @@ -238,6 +247,10 @@ public class Config { AUTOEXEC_TOKEN = prop.getProperty("autoexec.token", "499922b4317c251c2ce525f7b83e3d94"); UPDATE_RUNNER_STATUS_PERIOD = Integer.parseInt(prop.getProperty("update.runner.status.period", "1800000")); + + MAX_PROCESS_QUEUE_SIZE = Integer.parseInt(prop.getProperty("max.process.queue.size", "2000")); + + MAX_PROCESS_EXECUTE_COUNT = Integer.parseInt(prop.getProperty("max.process.execute.size", "10")); } catch (IOException e) { logger.error(e.getMessage(), e); } diff --git a/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java b/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java index 14acccf..f4b3350 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java +++ b/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; * @author lvzk * @since 2021/4/21 17:12 **/ +@Deprecated public class ExecProcessCommand implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ExecProcessCommand.class); private final ProcessBuilder builder; diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadPoolVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadPoolVo.java new file mode 100644 index 0000000..a12af67 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadPoolVo.java @@ -0,0 +1,124 @@ +/* + * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.neatlogic.autoexecrunner.dto; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.ArrayList; +import java.util.List; + +public class ThreadPoolVo { + private int mainQueueSize; + private int mainPoolSize; + private int backupQueueSize; + private int backupPoolSize; + private int mainActiveCount; + private int backupActiveCount; + private int maxThreadCount; + private List threadTaskList = new ArrayList<>(); + private List threadList = new ArrayList<>(); + + public int getMaxThreadCount() { + return maxThreadCount; + } + + public void setMaxThreadCount(int maxThreadCount) { + this.maxThreadCount = maxThreadCount; + } + + + public int getMainQueueSize() { + return mainQueueSize; + } + + public void setMainQueueSize(int mainQueueSize) { + this.mainQueueSize = mainQueueSize; + } + + public int getMainPoolSize() { + return mainPoolSize; + } + + public int getMainActiveCount() { + return mainActiveCount; + } + + public void setMainActiveCount(int mainActiveCount) { + this.mainActiveCount = mainActiveCount; + } + + public int getBackupActiveCount() { + return backupActiveCount; + } + + public void setBackupActiveCount(int backupActiveCount) { + this.backupActiveCount = backupActiveCount; + } + + public void setMainPoolSize(int mainPoolSize) { + this.mainPoolSize = mainPoolSize; + } + + public int getBackupQueueSize() { + return backupQueueSize; + } + + public void setBackupQueueSize(int backupQueueSize) { + this.backupQueueSize = backupQueueSize; + } + + public int getBackupPoolSize() { + return backupPoolSize; + } + + public void setBackupPoolSize(int backupPoolSize) { + this.backupPoolSize = backupPoolSize; + } + + boolean isSorted = false; + + public List getThreadTaskList() { + if (CollectionUtils.isNotEmpty(threadTaskList)) { + threadTaskList.sort((o1, o2) -> { + long s = o1.getStartTime().getTime(); + long e = o2.getStartTime().getTime(); + return Long.compare(s, e); + }); + } + return threadTaskList; + } + + public void setThreadTaskList(List threadTaskList) { + this.threadTaskList = threadTaskList; + } + + public List getThreadList() { + if (CollectionUtils.isNotEmpty(threadList)) { + threadList.sort((o1, o2) -> { + long s = o1.getStartTime().getTime(); + long e = o2.getStartTime().getTime(); + return Long.compare(s, e); + }); + } + return threadList; + } + + public void setThreadList(List threadList) { + this.threadList = threadList; + } +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadTaskVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadTaskVo.java new file mode 100644 index 0000000..0c95c6c --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadTaskVo.java @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.neatlogic.autoexecrunner.dto; + +import java.util.Date; + +public class ThreadTaskVo { + private Long id; + private String name; + private Date startTime; + private long timeCost; + private String poolName; + private String status; + private int priority; + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public int getPriority() { + return priority; + } + + public void setPriority(int priority) { + this.priority = priority; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + + public Date getStartTime() { + return startTime; + } + + public void setStartTime(Date startTime) { + this.startTime = startTime; + } + + public long getTimeCost() { + return System.currentTimeMillis() - this.startTime.getTime(); + } + + + public String getPoolName() { + return poolName; + } + + public void setPoolName(String poolName) { + this.poolName = poolName; + } +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadVo.java new file mode 100644 index 0000000..3b4f2c0 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/dto/ThreadVo.java @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.neatlogic.autoexecrunner.dto; + +import java.util.Date; + +public class ThreadVo { + private long id; + private String name; + private Date startTime; + private long timeCost; + + public ThreadVo(long id, String name) { + this.id = id; + this.name = name; + this.startTime = new Date(); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public Date getStartTime() { + return startTime; + } + + public void setStartTime(Date startTime) { + this.startTime = startTime; + } + + public long getTimeCost() { + return System.currentTimeMillis() - this.startTime.getTime(); + } + +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java new file mode 100644 index 0000000..141674c --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java @@ -0,0 +1,157 @@ +package com.neatlogic.autoexecrunner.startup.handler; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.neatlogic.autoexecrunner.asynchronization.queue.NeatLogicUniqueBlockingQueue; +import com.neatlogic.autoexecrunner.asynchronization.threadpool.CachedThreadPool; +import com.neatlogic.autoexecrunner.autoexec.ProcessWaitTask; +import com.neatlogic.autoexecrunner.common.config.Config; +import com.neatlogic.autoexecrunner.dto.CommandVo; +import com.neatlogic.autoexecrunner.startup.IStartUp; +import com.neatlogic.autoexecrunner.util.FileUtil; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class AutoexecQueueThread implements IStartUp { + private static final BlockingQueue processQueue = new LinkedBlockingQueue<>(Config.MAX_PROCESS_QUEUE_SIZE() + 5); + private static final Logger logger = LoggerFactory.getLogger(AutoexecQueueThread.class); + private static final NeatLogicUniqueBlockingQueue blockingQueue = new NeatLogicUniqueBlockingQueue<>(50000); + private volatile boolean running = true; + + @Override + public String getName() { + return "创建自动化作业线程"; + } + + @Override + public String getDescription() { + return null; + } + + @Override + public void doService() { + // 启动监控线程 + Thread watchdog = new Thread(() -> { + Thread workerThread = null; + + while (running) { + if (workerThread == null || !workerThread.isAlive()) { + System.out.println("工作线程未运行,准备启动..."); + logger.debug("工作线程未运行,准备启动..."); + workerThread = new Thread(() -> { + Thread.currentThread().setName(""); + while (running) { + CommandVo commandVo = null; + try { + // 你的业务逻辑 + if (processQueue.size() <= Config.MAX_PROCESS_QUEUE_SIZE()) { + commandVo = blockingQueue.take(); + logger.debug("作业{} 即将运行...", commandVo.getTenant() + "-" + commandVo.getJobId()); + createSubProcessAndStart(commandVo); + } else { + logger.debug("作业进程最大数量:{}, 需等待运行中的进程结束后,才继续创建队列内的作业进程!", Config.MAX_PROCESS_QUEUE_SIZE()); + } + Thread.sleep(2000); + } catch (InterruptedException e) { + System.out.printf("创建自动化作业子进程的线程被中断...入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()); + logger.error(String.format("创建自动化作业子进程的线程被中断...入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e); + break; + } catch (Exception e) { + System.out.printf("创建自动化作业子进程失败:入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()); + logger.error(String.format("创建自动化作业子进程失败:入参:%s ,errorMsg:%s", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e); + break; // 退出由外层 watchdog 负责重启 + } + } + }); + + workerThread.setDaemon(true); + workerThread.start(); + } + + try { + Thread.sleep(2000); // 每2秒检查一次 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + }); + + watchdog.setDaemon(true); + watchdog.start(); + } + + private void createSubProcessAndStart(CommandVo commandVo) { + File NULL_FILE = new File("/dev/null"); + ProcessBuilder builder = new ProcessBuilder(commandVo.getCommandList()); + builder.redirectOutput(NULL_FILE); + File consoleLog = FileUtil.createFile(commandVo.getConsoleLogPath()); + builder.redirectError(ProcessBuilder.Redirect.appendTo(consoleLog)); + Map env = builder.environment(); + JSONObject environment = commandVo.getEnvironment(); + if (MapUtils.isNotEmpty(environment)) { + for (Map.Entry entry : environment.entrySet()) { + env.put(entry.getKey(), entry.getValue().toString()); + } + } + env.put("tenant", commandVo.getTenant()); + JSONObject payload = new JSONObject(); + Process process = null; + try { + payload.put("jobId", commandVo.getJobId()); + payload.put("status", 1); + payload.put("command", commandVo); + payload.put("passThroughEnv", commandVo.getPassThroughEnv().toJSONString()); + process = builder.start(); + addProcess(process); + CachedThreadPool.execute(new ProcessWaitTask(process, commandVo, payload)); + } catch (IOException e) { + logger.error(String.format("进程启动失败: %s ,error: %s", payload, e.getMessage()), e); + System.err.println("进程启动失败: " + payload + ",error:" + e.getMessage()); + } finally { + // 确保关闭流 + if (process != null) { + closeQuietly(process.getInputStream()); + closeQuietly(process.getErrorStream()); + closeQuietly(process.getOutputStream()); + } + } + } + + private void closeQuietly(java.io.Closeable c) { + try { + if (c != null) c.close(); + } catch (IOException ignore) { + } + } + + public void stop() { + running = false; + } + + public static void addUpdateTagent(CommandVo commandVo) { + blockingQueue.offer(commandVo); + } + + public static void addProcess(Process process) { + boolean result = processQueue.offer(process); + if (!result) { + logger.error("processQueue offer failed!current queue size is :{}", processQueue.size()); + } + } + + public static void removeProcess(Process process) { + boolean result = processQueue.remove(process); + if (!result) { + logger.error("processQueue remove failed!current queue size is :{}", processQueue.size()); + } + } +} -- Gitee From 6e2f08b7b2a55c236e944eeb8632533df043d3a3 Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Thu, 10 Apr 2025 19:08:26 +0800 Subject: [PATCH 2/6] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?= =?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?= =?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../asynchronization/NeatLogicThread.java | 5 + .../queue/NeatLogicUniqueBlockingQueue.java | 17 ++- .../threadlocal/RequestContext.java | 134 ++++++++++++++++++ .../autoexec/ProcessWaitTask.java | 17 +-- .../autoexecrunner/common/config/Config.java | 4 +- .../core/ExecProcessCommand.java | 1 - .../filter/JsonWebTokenValidFilter.java | 4 +- .../logback/RequestUrlConverter.java | 37 +++++ .../logback/TenantConverter.java | 38 +++++ .../startup/handler/AutoexecQueueThread.java | 59 ++++---- src/main/resources/logback-spring.xml | 47 ++++-- 11 files changed, 315 insertions(+), 48 deletions(-) create mode 100644 src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/RequestContext.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/logback/RequestUrlConverter.java create mode 100644 src/main/java/com/neatlogic/autoexecrunner/logback/TenantConverter.java diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java index d37b747..87e8f72 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java +++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/NeatLogicThread.java @@ -16,6 +16,7 @@ along with this program. If not, see .*/ package com.neatlogic.autoexecrunner.asynchronization; +import com.neatlogic.autoexecrunner.asynchronization.threadlocal.RequestContext; import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext; import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext; import com.neatlogic.autoexecrunner.exception.core.ApiRuntimeException; @@ -29,6 +30,7 @@ import java.util.concurrent.Semaphore; public abstract class NeatLogicThread implements Runnable, Comparable { private static final Logger logger = LoggerFactory.getLogger(NeatLogicThread.class); protected UserContext userContext; + protected RequestContext requestContext; private final String tenantUuid; private String threadName; private boolean isUnique = false; @@ -95,6 +97,7 @@ public abstract class NeatLogicThread implements Runnable, Comparable { public T take() throws InterruptedException { Task task = blockingQueue.take(); // 阻塞式获取任务 taskMap.remove(task.getUniqueKey()); // 移除已处理任务的唯一标记 + TenantContext tenantContext = TenantContext.get(); + if (tenantContext != null) { + tenantContext.switchTenant(task.getTenantUuid()); + } else { + TenantContext.init(task.getTenantUuid()); + } return task.getT(); } private static class Task { private final T t; + private final String tenantUuid; public Task(T t) { this.t = t; + this.tenantUuid = TenantContext.get().getTenantUuid(); } public T getT() { return t; } + public String getTenantUuid() { + return tenantUuid; + } + public String getUniqueKey() { - return String.valueOf(t.hashCode()); + // 唯一标识任务的 key,可根据需求定义,例如 `tenantUuid-t.hashCode` + //System.out.println(tenantUuid + "-" + t.hashCode()); + return tenantUuid + "-" + t.hashCode(); } } diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/RequestContext.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/RequestContext.java new file mode 100644 index 0000000..efcb667 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/threadlocal/RequestContext.java @@ -0,0 +1,134 @@ +/*Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see .*/ + +package com.neatlogic.autoexecrunner.asynchronization.threadlocal; + +import javax.servlet.http.Cookie; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Locale; +import java.util.Objects; +import java.util.Optional; + +/** + * 保存请求信息 + */ +public class RequestContext implements Serializable { + private static final ThreadLocal instance = new ThreadLocal<>(); + private static final long serialVersionUID = -5420998728515359626L; + private String url; + private HttpServletRequest request; + private HttpServletResponse response; + //接口访问速率 + private Double apiRate; + //租户接口访问总速率 + private Double tenantRate; + //语言 + Locale locale; + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public HttpServletRequest getRequest() { + return request; + } + + public void setRequest(HttpServletRequest request) { + this.request = request; + } + + public HttpServletResponse getResponse() { + return response; + } + + public void setResponse(HttpServletResponse response) { + this.response = response; + } + + public Double getApiRate() { + return apiRate; + } + + public void setApiRate(Double apiRate) { + this.apiRate = apiRate; + } + + + public Double getTenantRate() { + return tenantRate; + } + + public void setTenantRate(Double tenantRate) { + this.tenantRate = tenantRate; + } + + public Locale getLocale() { + return locale; + } + + public void setLocale(Locale locale) { + this.locale = locale; + } + + public static RequestContext init(RequestContext _requestContext) { + RequestContext context = new RequestContext(); + if (_requestContext != null) { + context.setUrl(_requestContext.getUrl()); + context.setLocale(_requestContext.getLocale()); + } + instance.set(context); + return context; + } + + public static RequestContext init(HttpServletRequest request, String url, HttpServletResponse response) { + RequestContext context = new RequestContext(request, url); + context.setResponse(response); + instance.set(context); + if (request.getCookies() != null && request.getCookies().length > 0) { + Optional languageCookie = Arrays.stream(request.getCookies()).filter(o -> Objects.equals(o.getName(), "neatlogic_language")).findFirst(); + if (languageCookie.isPresent()) { + context.setLocale(new Locale(languageCookie.get().getValue())); + } else { + context.setLocale(Locale.getDefault()); + } + } + return context; + } + + private RequestContext() { + + } + + private RequestContext(HttpServletRequest request, String url) { + this.url = url; + this.request = request; + } + + public static RequestContext get() { + return instance.get(); + } + + public void release() { + instance.remove(); + } + +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java b/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java index 8068c16..c2b0f81 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java +++ b/src/main/java/com/neatlogic/autoexecrunner/autoexec/ProcessWaitTask.java @@ -4,8 +4,6 @@ import com.alibaba.fastjson.JSONObject; import com.neatlogic.autoexecrunner.asynchronization.NeatLogicThread; import com.neatlogic.autoexecrunner.dto.CommandVo; import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; -import org.apache.commons.collections4.MapUtils; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,32 +15,35 @@ public class ProcessWaitTask extends NeatLogicThread { private final Process process; private final CommandVo commandVo; private final JSONObject payload; + private final String jobName; - public ProcessWaitTask(Process process, CommandVo commandVo, JSONObject payload) { - super("THREAD-AUTOEXEC-WAIT-" + commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY)); + public ProcessWaitTask(Process process, CommandVo commandVo, JSONObject payload, String jobName) { + super("THREAD-AUTOEXEC-WAIT-" + jobName); this.process = process; this.commandVo = commandVo; this.payload = payload; + this.jobName = jobName; } @Override protected void execute() { - + Long pid = null; try { int exitCode = process.waitFor(); int exitStatus = process.exitValue(); commandVo.setExitValue(exitStatus); - logger.debug("进程[{}] 退出,状态码: {}", getPid(process), exitCode); + pid = getPid(process); + logger.debug("process[{}] finished,exitCode: {}", pid, exitCode); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - logger.error(String.format("等待被中断: 入参:%s, errorMsg:%s", payload, e.getMessage()), e); + logger.error(String.format("thread interrupt: param:%s, errorMsg:%s", payload, e.getMessage()), e); } finally { // 确保关闭流 closeQuietly(process.getInputStream()); closeQuietly(process.getErrorStream()); closeQuietly(process.getOutputStream()); - AutoexecQueueThread.removeProcess(process); + AutoexecQueueThread.removeProcess(process, jobName, pid); } } diff --git a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java index bac3c8c..dd660c9 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java +++ b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java @@ -248,9 +248,9 @@ public class Config { UPDATE_RUNNER_STATUS_PERIOD = Integer.parseInt(prop.getProperty("update.runner.status.period", "1800000")); - MAX_PROCESS_QUEUE_SIZE = Integer.parseInt(prop.getProperty("max.process.queue.size", "2000")); + MAX_PROCESS_QUEUE_SIZE = Integer.parseInt(prop.getProperty("max.process.queue.size", "1000")); - MAX_PROCESS_EXECUTE_COUNT = Integer.parseInt(prop.getProperty("max.process.execute.size", "10")); + MAX_PROCESS_EXECUTE_COUNT = Integer.parseInt(prop.getProperty("max.process.execute.count", "20")); } catch (IOException e) { logger.error(e.getMessage(), e); } diff --git a/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java b/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java index f4b3350..14acccf 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java +++ b/src/main/java/com/neatlogic/autoexecrunner/core/ExecProcessCommand.java @@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit; * @author lvzk * @since 2021/4/21 17:12 **/ -@Deprecated public class ExecProcessCommand implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ExecProcessCommand.class); private final ProcessBuilder builder; diff --git a/src/main/java/com/neatlogic/autoexecrunner/filter/JsonWebTokenValidFilter.java b/src/main/java/com/neatlogic/autoexecrunner/filter/JsonWebTokenValidFilter.java index d942f06..917777b 100755 --- a/src/main/java/com/neatlogic/autoexecrunner/filter/JsonWebTokenValidFilter.java +++ b/src/main/java/com/neatlogic/autoexecrunner/filter/JsonWebTokenValidFilter.java @@ -2,6 +2,7 @@ package com.neatlogic.autoexecrunner.filter; import com.alibaba.fastjson.JSONObject; +import com.neatlogic.autoexecrunner.asynchronization.threadlocal.RequestContext; import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext; import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext; import com.neatlogic.autoexecrunner.common.config.Config; @@ -41,7 +42,8 @@ public class JsonWebTokenValidFilter extends OncePerRequestFilter { UserVo userVo = null; JSONObject redirectObj = new JSONObject(); String authType = null; - + //初始化request上下文 + RequestContext.init(request, request.getRequestURI(), response); //判断租户 try { String tenant = request.getHeader("Tenant"); diff --git a/src/main/java/com/neatlogic/autoexecrunner/logback/RequestUrlConverter.java b/src/main/java/com/neatlogic/autoexecrunner/logback/RequestUrlConverter.java new file mode 100644 index 0000000..cb3cfb1 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/logback/RequestUrlConverter.java @@ -0,0 +1,37 @@ +/*Copyright (C) $today.year 深圳极向量科技有限公司 All Rights Reserved. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see .*/ + +package com.neatlogic.autoexecrunner.logback; + +import ch.qos.logback.classic.pattern.ClassicConverter; +import ch.qos.logback.classic.spi.ILoggingEvent; +import com.neatlogic.autoexecrunner.asynchronization.threadlocal.RequestContext; + +public class RequestUrlConverter extends ClassicConverter { + /** + * The convert method is responsible for extracting data from the event and + * storing it for later use by the write method. + * + * @param event + */ + @Override + public String convert(ILoggingEvent event) { + RequestContext requestContext = RequestContext.get(); + if (requestContext != null) { + return requestContext.getUrl(); + } + return ""; + } +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/logback/TenantConverter.java b/src/main/java/com/neatlogic/autoexecrunner/logback/TenantConverter.java new file mode 100644 index 0000000..b20ceb4 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/logback/TenantConverter.java @@ -0,0 +1,38 @@ +/*Copyright (C) $today.year 深圳极向量科技有限公司 All Rights Reserved. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see .*/ + +package com.neatlogic.autoexecrunner.logback; + + +import ch.qos.logback.classic.pattern.ClassicConverter; +import ch.qos.logback.classic.spi.ILoggingEvent; +import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext; + +public class TenantConverter extends ClassicConverter { + /** + * The convert method is responsible for extracting data from the event and + * storing it for later use by the write method. + * + * @param event + */ + @Override + public String convert(ILoggingEvent event) { + TenantContext tenantContext = TenantContext.get(); + if (tenantContext != null) { + return tenantContext.getTenantUuid(); + } + return ""; + } +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java index 141674c..a98bfc2 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java +++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java @@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -44,29 +45,28 @@ public class AutoexecQueueThread implements IStartUp { while (running) { if (workerThread == null || !workerThread.isAlive()) { - System.out.println("工作线程未运行,准备启动..."); - logger.debug("工作线程未运行,准备启动..."); + System.out.println("autoexec job thread is down,ready to start..."); + logger.debug("autoexec job thread is down,ready to start..."); workerThread = new Thread(() -> { - Thread.currentThread().setName(""); + Thread.currentThread().setName("AutoexecQueueThread"); + System.out.println("autoexec job thread start succeed!"); while (running) { CommandVo commandVo = null; try { // 你的业务逻辑 - if (processQueue.size() <= Config.MAX_PROCESS_QUEUE_SIZE()) { + if (processQueue.size() <= Config.MAX_PROCESS_EXECUTE_COUNT()) { commandVo = blockingQueue.take(); - logger.debug("作业{} 即将运行...", commandVo.getTenant() + "-" + commandVo.getJobId()); + logger.debug("current autoexec sub process count:{} <= {},autoexec job:{} will create...", processQueue.size(), Config.MAX_PROCESS_EXECUTE_COUNT(), (commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY))); createSubProcessAndStart(commandVo); } else { - logger.debug("作业进程最大数量:{}, 需等待运行中的进程结束后,才继续创建队列内的作业进程!", Config.MAX_PROCESS_QUEUE_SIZE()); + logger.debug("autoexec sub process limit count :{}, need to wait process finish,then keep on creating sub process!", Config.MAX_PROCESS_EXECUTE_COUNT()); } Thread.sleep(2000); } catch (InterruptedException e) { - System.out.printf("创建自动化作业子进程的线程被中断...入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()); - logger.error(String.format("创建自动化作业子进程的线程被中断...入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e); + logger.error(String.format("autoexec job thread is interrupted...params:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e); break; } catch (Exception e) { - System.out.printf("创建自动化作业子进程失败:入参:%s ,errorMsg:%s%n", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()); - logger.error(String.format("创建自动化作业子进程失败:入参:%s ,errorMsg:%s", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e); + logger.error(String.format("create sub process failed:params:%s ,errorMsg:%s", commandVo != null ? JSON.toJSONString(commandVo) : StringUtils.EMPTY, e.getMessage()), e); break; // 退出由外层 watchdog 负责重启 } } @@ -104,33 +104,34 @@ public class AutoexecQueueThread implements IStartUp { } env.put("tenant", commandVo.getTenant()); JSONObject payload = new JSONObject(); - Process process = null; + Process process; try { payload.put("jobId", commandVo.getJobId()); payload.put("status", 1); payload.put("command", commandVo); payload.put("passThroughEnv", commandVo.getPassThroughEnv().toJSONString()); process = builder.start(); + String jobName = (commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY)); addProcess(process); - CachedThreadPool.execute(new ProcessWaitTask(process, commandVo, payload)); + logger.debug("autoexec job sub process {} is running, pid {},added to processQueue,now processQueue size is {}", jobName, getPid(process), processQueue.size()); + CachedThreadPool.execute(new ProcessWaitTask(process, commandVo, payload, jobName)); } catch (IOException e) { - logger.error(String.format("进程启动失败: %s ,error: %s", payload, e.getMessage()), e); - System.err.println("进程启动失败: " + payload + ",error:" + e.getMessage()); - } finally { - // 确保关闭流 - if (process != null) { - closeQuietly(process.getInputStream()); - closeQuietly(process.getErrorStream()); - closeQuietly(process.getOutputStream()); - } + logger.error(String.format("autoexec job sub process start failed: %s ,error: %s", payload, e.getMessage()), e); } } - private void closeQuietly(java.io.Closeable c) { + // 兼容不同JDK版本的PID获取 + private long getPid(Process p) { try { - if (c != null) c.close(); - } catch (IOException ignore) { + if (p.getClass().getName().contains("UNIXProcess")) { + Field pidField = p.getClass().getDeclaredField("pid"); + pidField.setAccessible(true); + return pidField.getLong(p); + } + } catch (Exception e) { + // 忽略异常 } + return -1; // 未知PID } public void stop() { @@ -148,10 +149,16 @@ public class AutoexecQueueThread implements IStartUp { } } - public static void removeProcess(Process process) { + public static void removeProcess(Process process, String jobName, Long pid) { boolean result = processQueue.remove(process); if (!result) { - logger.error("processQueue remove failed!current queue size is :{}", processQueue.size()); + logger.error("autoexec process:{} (pid:{})finished. processQueue remove failed!current queue size is :{}", jobName, pid, blockingQueue.size()); + } else { + logger.debug("autoexec process:{} (pid:{})finished. processQueue remove succeed!current queue size is :{}", jobName, pid, blockingQueue.size()); } } + + public static Integer getProcessQueueSize() { + return processQueue.size(); + } } diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml index e18ecc8..aefed5d 100755 --- a/src/main/resources/logback-spring.xml +++ b/src/main/resources/logback-spring.xml @@ -1,10 +1,12 @@ + + ${LOG_HOME}/debug.log - debug.log.%i + ${LOG_HOME}/debug.log.%i 1 5 @@ -18,14 +20,14 @@ 100MB - [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] - %msg%n + [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n ${LOG_HOME}/info.log - info.log.%i + ${LOG_HOME}/info.log.%i 1 5 @@ -39,7 +41,7 @@ 100MB - [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] - %msg%n + [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n @@ -47,7 +49,7 @@ class="ch.qos.logback.core.rolling.RollingFileAppender"> ${LOG_HOME}/warn.log - warn.log.%i + ${LOG_HOME}/warn.log.%i 1 5 @@ -61,14 +63,14 @@ 100MB - [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] - %msg%n + [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n ${LOG_HOME}/error.log - error.log.%i + ${LOG_HOME}/error.log.%i 1 5 @@ -82,7 +84,27 @@ 100MB - [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line]- %msg%n + [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n + + + + + ${LOG_HOME}/process.log + + ${LOG_HOME}/process.%i + 1 + 5 + + + DEBUG + ACCEPT + DENY + + + 100MB + + + [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n @@ -110,7 +132,7 @@ - [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] - %msg%n + [%-5level]%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36}[%line] [%tenant] %requestUrl- %msg%n UTF-8 false @@ -137,5 +159,12 @@ + + + + + + + \ No newline at end of file -- Gitee From dc40d3a8746ccef8b642c8a27cf0a3439c33382a Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Fri, 11 Apr 2025 20:57:23 +0800 Subject: [PATCH 3/6] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?= =?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?= =?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/job/GetJobQueueStatusApi.java | 68 +++++++++++++++++++ .../autoexecrunner/api/job/JobExecApi.java | 2 +- .../queue/NeatLogicUniqueBlockingQueue.java | 17 ++++- .../autoexecrunner/dto/CommandVo.java | 14 ++-- .../startup/handler/AutoexecQueueThread.java | 8 ++- 5 files changed, 101 insertions(+), 8 deletions(-) create mode 100755 src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java new file mode 100755 index 0000000..9898c0a --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java @@ -0,0 +1,68 @@ +/*Copyright (C) 2024 深圳极向量科技有限公司 All Rights Reserved. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see .*/ +package com.neatlogic.autoexecrunner.api.job; + +import com.alibaba.fastjson.JSONObject; +import com.neatlogic.autoexecrunner.asynchronization.queue.NeatLogicUniqueBlockingQueue; +import com.neatlogic.autoexecrunner.constvalue.ApiParamType; +import com.neatlogic.autoexecrunner.dto.CommandVo; +import com.neatlogic.autoexecrunner.restful.annotation.Input; +import com.neatlogic.autoexecrunner.restful.annotation.Param; +import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentBase; +import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + + +@Component +public class GetJobQueueStatusApi extends PrivateApiComponentBase { + @Override + public String getName() { + return "获取作业排队状态"; + } + + @Input({ + @Param(name = "jobId", type = ApiParamType.LONG, isRequired = true, desc = "作业id"), + @Param(name = "groupSort", type = ApiParamType.LONG, desc = "阶段组id"), + @Param(name = "phaseName", type = ApiParamType.STRING, desc = "阶段名"), + }) + @Override + public Object myDoService(JSONObject jsonObj) throws Exception { + String jobId = jsonObj.getString("jobId"); + Integer groupSort = jsonObj.getInteger("groupSort"); + NeatLogicUniqueBlockingQueue blockingQueue = AutoexecQueueThread.getBlockingQueue(); + List list = blockingQueue.getQueue(); + JSONObject result = new JSONObject(); + for (int i = 0; i < list.size(); i++) { + CommandVo commandVo = list.get(i); + JSONObject commandJson = new JSONObject(); + commandJson.put("fcd",commandVo.getFcd().getTime()); + commandJson.put("command",commandVo.getCommandList().stream().map(Object::toString).collect(Collectors.joining("','"))); + if (Objects.equals(commandVo.getJobId(), jobId) && (groupSort == null || commandVo.getJobGroupIdList().contains(groupSort))) { + result.put(String.valueOf(i + 1), commandJson); + } + } + result.put("count", list.size()); + return result; + } + + @Override + public String getToken() { + return "/job/queue/status/get"; + } +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java index 6ed4b7a..ad2ee79 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java @@ -82,7 +82,7 @@ public class JobExecApi extends PrivateApiComponentBase { } commandList.add("--reuseconslog"); commandVo.setCommandList(commandList); - AutoexecQueueThread.addUpdateTagent(commandVo); + AutoexecQueueThread.addCommand(commandVo); return null; } diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java index dea5fb3..e34029b 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java +++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java @@ -19,12 +19,16 @@ package com.neatlogic.autoexecrunner.asynchronization.queue; import com.alibaba.fastjson.JSON; import com.neatlogic.autoexecrunner.asynchronization.threadlocal.TenantContext; +import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; public class NeatLogicUniqueBlockingQueue { private static final Logger logger = LoggerFactory.getLogger(NeatLogicUniqueBlockingQueue.class); @@ -69,7 +73,7 @@ public class NeatLogicUniqueBlockingQueue { return task.getT(); } - private static class Task { + public static class Task { private final T t; private final String tenantUuid; @@ -93,10 +97,19 @@ public class NeatLogicUniqueBlockingQueue { } } - public int size(){ + public int size() { return blockingQueue.size(); } + public List getQueue() { + List list = new ArrayList<>(); + List> taskList = new ArrayList<>(blockingQueue); + if (CollectionUtils.isNotEmpty(taskList)) { + list = taskList.stream().map(Task::getT).collect(Collectors.toList()); + } + return list; + } + // public static void main(String[] args) throws InterruptedException { // NeatLogicUniqueBlockingQueue queue = new NeatLogicUniqueBlockingQueue<>(1); // diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java index 34d6c91..d3e3cb7 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java +++ b/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java @@ -9,10 +9,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import java.io.File; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; /** * @author lvzk @@ -33,6 +30,7 @@ public class CommandVo { private List jobGroupIdList;//需要执行的组 private JSONArray jobPhaseNodeSqlList; private JSONObject environment;//设置环境变量 + private Date fcd; private String consoleLogPath; @@ -219,4 +217,12 @@ public class CommandVo { this.consoleLogPath = Config.AUTOEXEC_HOME() + File.separator + JobUtil.getJobPath(getJobId(), new StringBuilder()) + File.separator + "log" + File.separator + "console.txt"; return consoleLogPath; } + + public Date getFcd() { + return fcd; + } + + public void setFcd(Date fcd) { + this.fcd = fcd; + } } diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java index a98bfc2..1685a99 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java +++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java @@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.lang.reflect.Field; +import java.util.Date; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -138,7 +139,8 @@ public class AutoexecQueueThread implements IStartUp { running = false; } - public static void addUpdateTagent(CommandVo commandVo) { + public static void addCommand(CommandVo commandVo) { + commandVo.setFcd(new Date()); blockingQueue.offer(commandVo); } @@ -161,4 +163,8 @@ public class AutoexecQueueThread implements IStartUp { public static Integer getProcessQueueSize() { return processQueue.size(); } + + public static NeatLogicUniqueBlockingQueue getBlockingQueue(){ + return blockingQueue; + } } -- Gitee From 791eb29c2f77d86db67eebdcfe7902182af246f0 Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Tue, 15 Apr 2025 17:15:49 +0800 Subject: [PATCH 4/6] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?= =?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?= =?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...usApi.java => GetJobWaitingDetailApi.java} | 13 +++------- .../autoexecrunner/api/job/JobExecApi.java | 5 +++- .../exception/job/JobQueueFullException.java | 14 ++++++++++ .../startup/handler/AutoexecQueueThread.java | 26 +++++++++++++------ 4 files changed, 40 insertions(+), 18 deletions(-) rename src/main/java/com/neatlogic/autoexecrunner/api/job/{GetJobQueueStatusApi.java => GetJobWaitingDetailApi.java} (80%) create mode 100755 src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java similarity index 80% rename from src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java rename to src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java index 9898c0a..498fe1e 100755 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobQueueStatusApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java @@ -15,7 +15,6 @@ along with this program. If not, see .*/ package com.neatlogic.autoexecrunner.api.job; import com.alibaba.fastjson.JSONObject; -import com.neatlogic.autoexecrunner.asynchronization.queue.NeatLogicUniqueBlockingQueue; import com.neatlogic.autoexecrunner.constvalue.ApiParamType; import com.neatlogic.autoexecrunner.dto.CommandVo; import com.neatlogic.autoexecrunner.restful.annotation.Input; @@ -25,12 +24,11 @@ import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; import org.springframework.stereotype.Component; import java.util.List; -import java.util.Objects; import java.util.stream.Collectors; @Component -public class GetJobQueueStatusApi extends PrivateApiComponentBase { +public class GetJobWaitingDetailApi extends PrivateApiComponentBase { @Override public String getName() { return "获取作业排队状态"; @@ -45,17 +43,14 @@ public class GetJobQueueStatusApi extends PrivateApiComponentBase { public Object myDoService(JSONObject jsonObj) throws Exception { String jobId = jsonObj.getString("jobId"); Integer groupSort = jsonObj.getInteger("groupSort"); - NeatLogicUniqueBlockingQueue blockingQueue = AutoexecQueueThread.getBlockingQueue(); - List list = blockingQueue.getQueue(); + List list = AutoexecQueueThread.getBlockingQueueByJobIdAndGroupSort(jobId,groupSort); JSONObject result = new JSONObject(); for (int i = 0; i < list.size(); i++) { CommandVo commandVo = list.get(i); JSONObject commandJson = new JSONObject(); commandJson.put("fcd",commandVo.getFcd().getTime()); commandJson.put("command",commandVo.getCommandList().stream().map(Object::toString).collect(Collectors.joining("','"))); - if (Objects.equals(commandVo.getJobId(), jobId) && (groupSort == null || commandVo.getJobGroupIdList().contains(groupSort))) { - result.put(String.valueOf(i + 1), commandJson); - } + result.put(String.valueOf(i + 1), commandJson); } result.put("count", list.size()); return result; @@ -63,6 +58,6 @@ public class GetJobQueueStatusApi extends PrivateApiComponentBase { @Override public String getToken() { - return "/job/queue/status/get"; + return "/job/waiting/detail/get"; } } diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java index ad2ee79..c6596f5 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java @@ -18,6 +18,7 @@ import com.alibaba.fastjson.JSONObject; import com.neatlogic.autoexecrunner.asynchronization.threadlocal.UserContext; import com.neatlogic.autoexecrunner.constvalue.JobAction; import com.neatlogic.autoexecrunner.dto.CommandVo; +import com.neatlogic.autoexecrunner.exception.job.JobQueueFullException; import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentBase; import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; import com.neatlogic.autoexecrunner.util.FileUtil; @@ -82,7 +83,9 @@ public class JobExecApi extends PrivateApiComponentBase { } commandList.add("--reuseconslog"); commandVo.setCommandList(commandList); - AutoexecQueueThread.addCommand(commandVo); + if(!AutoexecQueueThread.addCommand(commandVo)){ + throw new JobQueueFullException(); + } return null; } diff --git a/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java b/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java new file mode 100755 index 0000000..8b25d14 --- /dev/null +++ b/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java @@ -0,0 +1,14 @@ +package com.neatlogic.autoexecrunner.exception.job; + + +import com.neatlogic.autoexecrunner.common.config.Config; +import com.neatlogic.autoexecrunner.exception.core.ApiRuntimeException; +import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; + +public class JobQueueFullException extends ApiRuntimeException { + + public JobQueueFullException() { + super("作业队列已满(当前队列数" + AutoexecQueueThread.getBlockingQueueSize() + ">= 配置队列最大数" + Config.MAX_PROCESS_QUEUE_SIZE() + "),无法执行"); + } + +} diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java index 1685a99..3b84ec8 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java +++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java @@ -17,15 +17,14 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.lang.reflect.Field; -import java.util.Date; -import java.util.Map; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class AutoexecQueueThread implements IStartUp { - private static final BlockingQueue processQueue = new LinkedBlockingQueue<>(Config.MAX_PROCESS_QUEUE_SIZE() + 5); + private static final BlockingQueue processQueue = new LinkedBlockingQueue<>(Config.MAX_PROCESS_EXECUTE_COUNT() + 5); private static final Logger logger = LoggerFactory.getLogger(AutoexecQueueThread.class); - private static final NeatLogicUniqueBlockingQueue blockingQueue = new NeatLogicUniqueBlockingQueue<>(50000); + private static final NeatLogicUniqueBlockingQueue blockingQueue = new NeatLogicUniqueBlockingQueue<>(Config.MAX_PROCESS_QUEUE_SIZE()); private volatile boolean running = true; @Override @@ -139,9 +138,9 @@ public class AutoexecQueueThread implements IStartUp { running = false; } - public static void addCommand(CommandVo commandVo) { + public static boolean addCommand(CommandVo commandVo) { commandVo.setFcd(new Date()); - blockingQueue.offer(commandVo); + return blockingQueue.offer(commandVo); } public static void addProcess(Process process) { @@ -164,7 +163,18 @@ public class AutoexecQueueThread implements IStartUp { return processQueue.size(); } - public static NeatLogicUniqueBlockingQueue getBlockingQueue(){ - return blockingQueue; + public static Integer getBlockingQueueSize() { + return blockingQueue.size(); + } + + public static List getBlockingQueueByJobIdAndGroupSort(String jobId, Integer groupSort) { + List list = blockingQueue.getQueue(); + List jobCommandList = new ArrayList<>(); + for (CommandVo commandVo : list) { + if (Objects.equals(commandVo.getJobId(), jobId) && (groupSort == null || commandVo.getJobGroupIdList().contains(groupSort))) { + jobCommandList.add(commandVo); + } + } + return jobCommandList; } } -- Gitee From 86e147ec294eaf30014e0ad59442cb45e676e03b Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Wed, 16 Apr 2025 19:40:50 +0800 Subject: [PATCH 5/6] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?= =?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?= =?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/job/GetJobWaitingDetailApi.java | 13 +++++-- .../autoexecrunner/api/job/JobAbortApi.java | 10 +++-- .../autoexecrunner/api/job/JobExecApi.java | 12 ++++-- .../queue/NeatLogicUniqueBlockingQueue.java | 26 +++++++++++-- .../autoexecrunner/dto/CommandVo.java | 39 ++++++++++++++++--- .../startup/handler/AutoexecQueueThread.java | 24 +++++++----- 6 files changed, 97 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java index 498fe1e..ab7ee80 100755 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/GetJobWaitingDetailApi.java @@ -24,6 +24,7 @@ import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; import org.springframework.stereotype.Component; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; @@ -43,16 +44,22 @@ public class GetJobWaitingDetailApi extends PrivateApiComponentBase { public Object myDoService(JSONObject jsonObj) throws Exception { String jobId = jsonObj.getString("jobId"); Integer groupSort = jsonObj.getInteger("groupSort"); - List list = AutoexecQueueThread.getBlockingQueueByJobIdAndGroupSort(jobId,groupSort); + List list = AutoexecQueueThread.getBlockingQueueByJobIdAndGroupSort(); JSONObject result = new JSONObject(); for (int i = 0; i < list.size(); i++) { CommandVo commandVo = list.get(i); JSONObject commandJson = new JSONObject(); + commandJson.put("groupSortList",commandVo.getJobGroupSortList()); + commandJson.put("nodeSqlList",commandVo.getJobPhaseNodeSqlList()); + commandJson.put("phaseNameList",commandVo.getJobPhaseNameList()); + commandJson.put("resourceIdList",commandVo.getJobPhaseResourceIdList()); commandJson.put("fcd",commandVo.getFcd().getTime()); commandJson.put("command",commandVo.getCommandList().stream().map(Object::toString).collect(Collectors.joining("','"))); - result.put(String.valueOf(i + 1), commandJson); + if (Objects.equals(commandVo.getJobId(), jobId) && (groupSort == null || commandVo.getJobGroupSortList().contains(groupSort))) { + result.put(String.valueOf(i + 1), commandJson); + } } - result.put("count", list.size()); + result.put("count", AutoexecQueueThread.getBlockingQueueSize()); return result; } diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobAbortApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobAbortApi.java index 8c46885..b31930f 100755 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobAbortApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobAbortApi.java @@ -20,6 +20,7 @@ import com.neatlogic.autoexecrunner.constvalue.JobAction; import com.neatlogic.autoexecrunner.core.ExecProcessCommand; import com.neatlogic.autoexecrunner.dto.CommandVo; import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentBase; +import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; import com.neatlogic.autoexecrunner.threadpool.CommonThreadPool; import org.springframework.stereotype.Component; @@ -45,13 +46,16 @@ public class JobAbortApi extends PrivateApiComponentBase { //set command List commandList = Arrays.asList("autoexec", "--jobid", commandVo.getJobId(), "--execuser", UserContext.get().getUserUuid(), "--abort"); commandList = new ArrayList<>(commandList); - if(commandVo.getPassThroughEnv() != null){ + if (commandVo.getPassThroughEnv() != null) { commandList.add("--passthroughenv"); commandList.add(commandVo.getPassThroughEnv().toString()); } commandVo.setCommandList(commandList); - ExecProcessCommand processCommand = new ExecProcessCommand(commandVo); - CommonThreadPool.execute(processCommand); + //队列里不存在才执行abort命令 + if (!AutoexecQueueThread.removeCommand(commandVo)) { + ExecProcessCommand processCommand = new ExecProcessCommand(commandVo); + CommonThreadPool.execute(processCommand); + } return null; } diff --git a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java index c6596f5..d9c534f 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java +++ b/src/main/java/com/neatlogic/autoexecrunner/api/job/JobExecApi.java @@ -23,6 +23,8 @@ import com.neatlogic.autoexecrunner.restful.core.privateapi.PrivateApiComponentB import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; import com.neatlogic.autoexecrunner.util.FileUtil; import org.apache.commons.collections4.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.util.ArrayList; @@ -36,6 +38,8 @@ import java.util.stream.Collectors; **/ @Component public class JobExecApi extends PrivateApiComponentBase { + private static final Logger logger = LoggerFactory.getLogger(JobExecApi.class); + @Override public String getName() { return "创建执行作业剧本进程"; @@ -65,9 +69,9 @@ public class JobExecApi extends PrivateApiComponentBase { commandList.add("--passthroughenv"); commandList.add(commandVo.getPassThroughEnv().toString()); } - if (CollectionUtils.isNotEmpty(commandVo.getJobGroupIdList())) { + if (CollectionUtils.isNotEmpty(commandVo.getJobGroupSortList())) { commandList.add("--phasegroups"); - commandList.add(commandVo.getJobGroupIdList().stream().map(Object::toString).collect(Collectors.joining("','"))); + commandList.add(commandVo.getJobGroupSortList().stream().map(Object::toString).collect(Collectors.joining("','"))); } if (CollectionUtils.isNotEmpty(commandVo.getJobPhaseNameList())) { commandList.add("--phases"); @@ -83,8 +87,10 @@ public class JobExecApi extends PrivateApiComponentBase { } commandList.add("--reuseconslog"); commandVo.setCommandList(commandList); - if(!AutoexecQueueThread.addCommand(commandVo)){ + if (AutoexecQueueThread.addCommand(commandVo) == -1) { throw new JobQueueFullException(); + } else if (AutoexecQueueThread.addCommand(commandVo) == 0) { + logger.debug("队列里已存在相同的执行命令:{}", String.join(",", commandList)); } return null; } diff --git a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java index e34029b..1af28a5 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java +++ b/src/main/java/com/neatlogic/autoexecrunner/asynchronization/queue/NeatLogicUniqueBlockingQueue.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Predicate; import java.util.stream.Collectors; public class NeatLogicUniqueBlockingQueue { @@ -40,7 +41,11 @@ public class NeatLogicUniqueBlockingQueue { this.taskMap = new ConcurrentHashMap<>(); } - public boolean offer(T t) { + /** + * 添加队列成员 + * 返回-1:队列已满,1:添加成功,0:重复添加 + */ + public int offer(T t) { Task task = new Task<>(t); // 保证任务唯一性 if (taskMap.putIfAbsent(task.getUniqueKey(), Boolean.TRUE) == null) { @@ -51,14 +56,29 @@ public class NeatLogicUniqueBlockingQueue { // 如果队列已满,移除任务标记 taskMap.remove(task.getUniqueKey()); logger.error("Queue is full!"); + return -1; } - return added; + return 1; } else { if (t != null) { logger.debug("NeatLogicUniqueBlockingQueue repeat: {}", JSON.toJSONString(t)); } + return 0; } - return false; // 已存在任务,直接返回 false + } + + public boolean remove(Predicate> condition) { + for (Task task : blockingQueue) { + if (condition.test(task)) { + boolean removed = blockingQueue.remove(task); + if (removed) { + taskMap.remove(task.getUniqueKey()); + logger.debug("Removed task: {}", JSON.toJSONString(task)); + return true; + } + } + } + return false; } public T take() throws InterruptedException { diff --git a/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java b/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java index d3e3cb7..db7a3ca 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java +++ b/src/main/java/com/neatlogic/autoexecrunner/dto/CommandVo.java @@ -27,7 +27,7 @@ public class CommandVo { private JSONObject passThroughEnv;//web端传到runner贯穿autoexec 回调web端会携带该变量 private List jobPhaseNameList;//需要执行的phaseNameList private List jobPhaseResourceIdList;//需要执行的resourceIdList - private List jobGroupIdList;//需要执行的组 + private List jobGroupSortList;//需要执行的组 private JSONArray jobPhaseNodeSqlList; private JSONObject environment;//设置环境变量 private Date fcd; @@ -71,9 +71,9 @@ public class CommandVo { if (CollectionUtils.isNotEmpty(jobPhaseResourceIdArray)) { this.jobPhaseResourceIdList = jobPhaseResourceIdArray.toJavaList(Long.class); } - JSONArray jobGroupIdArray = jsonObj.getJSONArray("jobGroupIdList"); + JSONArray jobGroupIdArray = jsonObj.getJSONArray("jobGroupSortList"); if (CollectionUtils.isNotEmpty(jobGroupIdArray)) { - this.jobGroupIdList = jobGroupIdArray.toJavaList(Integer.class); + this.jobGroupSortList = jobGroupIdArray.toJavaList(Integer.class); } JSONArray jobPhaseNodeSqlList = jsonObj.getJSONArray("jobPhaseNodeSqlList"); @@ -187,8 +187,8 @@ public class CommandVo { return jobPhaseResourceIdList; } - public List getJobGroupIdList() { - return jobGroupIdList; + public List getJobGroupSortList() { + return jobGroupSortList; } public JSONArray getJobPhaseNodeSqlList() { @@ -225,4 +225,33 @@ public class CommandVo { public void setFcd(Date fcd) { this.fcd = fcd; } + + private String getFilteredCommandString() { + if (commandList == null) return ""; + List filtered = new ArrayList<>(); + Iterator iterator = commandList.iterator(); + while (iterator.hasNext()) { + String item = iterator.next(); + if ("--execid".equals(item)) { + // 跳过 "--execid" 和它后面的那个参数 + if (iterator.hasNext()) iterator.next(); + continue; + } + filtered.add(item); + } + return String.join(",", filtered); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof CommandVo)) return false; + CommandVo that = (CommandVo) o; + return Objects.equals(getFilteredCommandString(), that.getFilteredCommandString()); + } + + @Override + public int hashCode() { + return getFilteredCommandString().hashCode(); + } } diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java index 3b84ec8..7c34974 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java +++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java @@ -138,11 +138,22 @@ public class AutoexecQueueThread implements IStartUp { running = false; } - public static boolean addCommand(CommandVo commandVo) { + /** + * 进入队列 + * 返回-1:队列已满,1:添加成功,0:重复添加 + */ + public static int addCommand(CommandVo commandVo) { commandVo.setFcd(new Date()); return blockingQueue.offer(commandVo); } + /** + * 删除命令 + */ + public static boolean removeCommand(CommandVo commandVo) { + return blockingQueue.remove(task -> task.getT().getJobId().equals(commandVo.getJobId())); + } + public static void addProcess(Process process) { boolean result = processQueue.offer(process); if (!result) { @@ -167,14 +178,7 @@ public class AutoexecQueueThread implements IStartUp { return blockingQueue.size(); } - public static List getBlockingQueueByJobIdAndGroupSort(String jobId, Integer groupSort) { - List list = blockingQueue.getQueue(); - List jobCommandList = new ArrayList<>(); - for (CommandVo commandVo : list) { - if (Objects.equals(commandVo.getJobId(), jobId) && (groupSort == null || commandVo.getJobGroupIdList().contains(groupSort))) { - jobCommandList.add(commandVo); - } - } - return jobCommandList; + public static List getBlockingQueueByJobIdAndGroupSort() { + return blockingQueue.getQueue(); } } -- Gitee From 82830847eeb1eb3f6d30fac791124cfd2393308f Mon Sep 17 00:00:00 2001 From: lvzk <897706680@qq.com> Date: Thu, 17 Apr 2025 11:48:05 +0800 Subject: [PATCH 6/6] =?UTF-8?q?[=E5=8A=9F=E8=83=BD]=E5=88=9B=E5=BB=BA?= =?UTF-8?q?=E4=BD=9C=E4=B8=9A=E5=A2=9E=E5=8A=A0=E6=95=B0=E9=87=8F=E9=99=90?= =?UTF-8?q?=E5=88=B6=E6=8E=92=E9=98=9F=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../autoexecrunner/common/config/Config.java | 16 ++++++++-------- .../exception/job/JobQueueFullException.java | 2 +- .../startup/handler/AutoexecQueueThread.java | 10 +++++----- src/main/resources/application.properties | 10 +++++++++- 4 files changed, 23 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java index dd660c9..77cfc7b 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java +++ b/src/main/java/com/neatlogic/autoexecrunner/common/config/Config.java @@ -37,8 +37,8 @@ public class Config { private static String DATA_HOME;//文件根目录 private static String DEPLOY_HOME;//发布目录 private static String GITLAB_PASSWORD;// gitlab private_token - private static Integer MAX_PROCESS_QUEUE_SIZE;//最大自动化作业队列数,多余的则丢弃 - private static Integer MAX_PROCESS_EXECUTE_COUNT;//最大执行作业数,超过的则进入队列 + private static Integer SUBPROCESS_COMMAND_QUEUE_MAX_SIZE;//自动化作业命令等待队列的最大容量,超过此数量的新命令将被拒绝或丢弃 + private static Integer SUBPROCESS_EXECUTION_MAX_CONCURRENT;//自动化作业最大并发子进程数,超过此数量的命令将进入等待队列 //neatlogic private static String NEATLOGIC_ROOT; @@ -170,11 +170,11 @@ public class Config { return AUTOEXEC_TOKEN; } - public static Integer MAX_PROCESS_QUEUE_SIZE() { - return MAX_PROCESS_QUEUE_SIZE; + public static Integer SUBPROCESS_COMMAND_QUEUE_MAX_SIZE() { + return SUBPROCESS_COMMAND_QUEUE_MAX_SIZE; } - public static Integer MAX_PROCESS_EXECUTE_COUNT() { - return MAX_PROCESS_EXECUTE_COUNT; + public static Integer SUBPROCESS_EXECUTION_MAX_CONCURRENT() { + return SUBPROCESS_EXECUTION_MAX_CONCURRENT; } @PostConstruct @@ -248,9 +248,9 @@ public class Config { UPDATE_RUNNER_STATUS_PERIOD = Integer.parseInt(prop.getProperty("update.runner.status.period", "1800000")); - MAX_PROCESS_QUEUE_SIZE = Integer.parseInt(prop.getProperty("max.process.queue.size", "1000")); + SUBPROCESS_COMMAND_QUEUE_MAX_SIZE = Integer.parseInt(prop.getProperty("subprocess.command.queue.max-size", "1000")); - MAX_PROCESS_EXECUTE_COUNT = Integer.parseInt(prop.getProperty("max.process.execute.count", "20")); + SUBPROCESS_EXECUTION_MAX_CONCURRENT = Integer.parseInt(prop.getProperty("subprocess.execution.max-concurrent", "20")); } catch (IOException e) { logger.error(e.getMessage(), e); } diff --git a/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java b/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java index 8b25d14..a72dae6 100755 --- a/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java +++ b/src/main/java/com/neatlogic/autoexecrunner/exception/job/JobQueueFullException.java @@ -8,7 +8,7 @@ import com.neatlogic.autoexecrunner.startup.handler.AutoexecQueueThread; public class JobQueueFullException extends ApiRuntimeException { public JobQueueFullException() { - super("作业队列已满(当前队列数" + AutoexecQueueThread.getBlockingQueueSize() + ">= 配置队列最大数" + Config.MAX_PROCESS_QUEUE_SIZE() + "),无法执行"); + super("作业队列已满(当前队列数" + AutoexecQueueThread.getBlockingQueueSize() + ">= 配置队列最大数" + Config.SUBPROCESS_COMMAND_QUEUE_MAX_SIZE() + "),无法执行"); } } diff --git a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java index 7c34974..9f1a4e0 100644 --- a/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java +++ b/src/main/java/com/neatlogic/autoexecrunner/startup/handler/AutoexecQueueThread.java @@ -22,9 +22,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class AutoexecQueueThread implements IStartUp { - private static final BlockingQueue processQueue = new LinkedBlockingQueue<>(Config.MAX_PROCESS_EXECUTE_COUNT() + 5); + private static final BlockingQueue processQueue = new LinkedBlockingQueue<>(Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT() + 5); private static final Logger logger = LoggerFactory.getLogger(AutoexecQueueThread.class); - private static final NeatLogicUniqueBlockingQueue blockingQueue = new NeatLogicUniqueBlockingQueue<>(Config.MAX_PROCESS_QUEUE_SIZE()); + private static final NeatLogicUniqueBlockingQueue blockingQueue = new NeatLogicUniqueBlockingQueue<>(Config.SUBPROCESS_COMMAND_QUEUE_MAX_SIZE()); private volatile boolean running = true; @Override @@ -54,12 +54,12 @@ public class AutoexecQueueThread implements IStartUp { CommandVo commandVo = null; try { // 你的业务逻辑 - if (processQueue.size() <= Config.MAX_PROCESS_EXECUTE_COUNT()) { + if (processQueue.size() <= Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT()) { commandVo = blockingQueue.take(); - logger.debug("current autoexec sub process count:{} <= {},autoexec job:{} will create...", processQueue.size(), Config.MAX_PROCESS_EXECUTE_COUNT(), (commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY))); + logger.debug("current autoexec sub process count:{} <= {},autoexec job:{} will create...", processQueue.size(), Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT(), (commandVo.getTenant() + "-" + commandVo.getJobId() + "-" + (MapUtils.isNotEmpty(commandVo.getPassThroughEnv()) ? commandVo.getPassThroughEnv().getString("groupSort") : StringUtils.EMPTY))); createSubProcessAndStart(commandVo); } else { - logger.debug("autoexec sub process limit count :{}, need to wait process finish,then keep on creating sub process!", Config.MAX_PROCESS_EXECUTE_COUNT()); + logger.debug("autoexec sub process limit count :{}, need to wait process finish,then keep on creating sub process!", Config.SUBPROCESS_EXECUTION_MAX_CONCURRENT()); } Thread.sleep(2000); } catch (InterruptedException e) { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 2606178..81f7bfe 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -14,7 +14,6 @@ spring.servlet.multipart.max-file-size=100MB #NEATLOGIC WEB neatlogic.root=http://127.0.0.1:8080/neatlogic -#认证 连接时校验 #RUNNER @@ -32,3 +31,12 @@ autoexec.home=/Users/cocokong/IdeaProjects/autoexec/data/job deploy.home=/app/autoexec/data/verdata data.home=${runner.home}/data file.mimetype.text.plain=sql text c cc c++ cpp h pl py txt java el gitignore js css properties jsp yml json md vue sh config htm html xml classpath project pm less scss + + +# Maximum number of concurrent subprocesses for autoexec jobs. +# Commands exceeding this limit will be placed in the waiting queue. +subprocess.execution.max-concurrent=20 + +# Maximum capacity of the command waiting queue for autoexec jobs. +# New commands exceeding this limit will be rejected or discarded. +subprocess.command.queue.max-size=1000 -- Gitee