diff --git a/dsms-engine-application/src/main/java/com/dsms/modules/node/service/impl/NodeServiceImpl.java b/dsms-engine-application/src/main/java/com/dsms/modules/node/service/impl/NodeServiceImpl.java index ebe455076b9586fd4c461eebdc6ded3e9fba9624..ac0117738d73c55001889f8a731e75cfcb848a21 100644 --- a/dsms-engine-application/src/main/java/com/dsms/modules/node/service/impl/NodeServiceImpl.java +++ b/dsms-engine-application/src/main/java/com/dsms/modules/node/service/impl/NodeServiceImpl.java @@ -16,10 +16,12 @@ package com.dsms.modules.node.service.impl; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.dsms.common.constant.*; import com.dsms.common.exception.DsmsEngineException; import com.dsms.common.remotecall.model.RemoteResponse; import com.dsms.common.taskmanager.TaskException; +import com.dsms.common.taskmanager.TaskContext; import com.dsms.common.taskmanager.model.AsyncTask; import com.dsms.common.taskmanager.model.Task; import com.dsms.common.taskmanager.service.ITaskService; @@ -50,6 +52,9 @@ public class NodeServiceImpl implements INodeService { @Autowired private ITaskService taskService; + @Autowired + private TaskContext taskContext; + @Override public List list() { @@ -135,6 +140,10 @@ public class NodeServiceImpl implements INodeService { @Override public boolean addOsd(String nodeName, String devicePath) { + + if (!taskContext.validateTask(TaskTypeEnum.ADD_OSD, nodeName, devicePath)) { + throw new DsmsEngineException(ResultCode.NODE_ADD_OSD_TASK_EXIST); + } //1.校验 node 和磁盘 if (!validateNode(nodeName)) { throw new DsmsEngineException(ResultCode.NODE_NOT_EXIST); @@ -166,6 +175,10 @@ public class NodeServiceImpl implements INodeService { @Override public boolean removeOsd(Integer osdId) { + + if (!taskContext.validateTask(TaskTypeEnum.REMOVE_OSD, String.valueOf(osdId))) { + throw new DsmsEngineException(ResultCode.NODE_REMOVE_OSD_TASK_EXIST); + } //1.校验osdId if (!validateOsdId(osdId)) { throw new DsmsEngineException(ResultCode.NODE_OSD_NOT_EXIST); @@ -192,10 +205,20 @@ public class NodeServiceImpl implements INodeService { @Override public boolean validateNode(String nodeName) { - List nodeList = list(); + List monList = null; boolean nodeExist = false; - for (Node node : nodeList) { - if (node.getNodeName().equals(nodeName)) { + + try { + monList = nodeApi.getNodeList(RemoteCallUtil.generateRemoteRequest()); + } catch (Throwable e) { + log.warn("get node list from dsms-storage fail,fail message:{}", e.getMessage(), e); + return false; + } + if (ObjectUtils.isEmpty(monList)) { + return false; + } + for (ResponseMon mon : monList) { + if (mon.getName().equals(nodeName)) { nodeExist = true; break; } @@ -207,22 +230,27 @@ public class NodeServiceImpl implements INodeService { @Override public boolean validateDeviceAvailable(String nodeName, String devicePath) { - List nodeList = list(); - boolean nodeExist = false; + OrchDeviceLsResult orchDeviceLs = null; boolean deviceAvailable = false; - for (Node node : nodeList) { - if (node.getNodeName().equals(nodeName)) { - nodeExist = true; - for (Device device : node.getDevices()) { - if (device.getDevicePath().equals(devicePath) && DeviceStatusEnum.AVAILABLE.getStatus().equals(device.getDeviceStatus())) { - deviceAvailable = true; - break; - } - } + + try { + orchDeviceLs = nodeApi.getOrchDeviceLs(RemoteCallUtil.generateRemoteRequest(), nodeName); + } catch (Throwable e) { + log.warn("get device count from dsms-storage fail,fail message:{}", e.getMessage(), e); + return false; + } + List devices = Device.orchDeviceLsResultParseDevice(orchDeviceLs); + if (ObjectUtils.isEmpty(devices)) { + return false; + } + for (Device device : devices) { + if (device.getDevicePath().equals(devicePath) && DeviceStatusEnum.AVAILABLE.getStatus().equals(device.getDeviceStatus())) { + deviceAvailable = true; + break; } } - return nodeExist && deviceAvailable; + return deviceAvailable; } @Override diff --git a/dsms-engine-application/src/main/java/com/dsms/modules/node/task/AddOsdTask.java b/dsms-engine-application/src/main/java/com/dsms/modules/node/task/AddOsdTask.java index 5614e32b4e9a287702b86e539b7b6129435f2c8e..a110632ce12809691ca3e7f6cf27739ba1c91fa7 100644 --- a/dsms-engine-application/src/main/java/com/dsms/modules/node/task/AddOsdTask.java +++ b/dsms-engine-application/src/main/java/com/dsms/modules/node/task/AddOsdTask.java @@ -1,13 +1,16 @@ package com.dsms.modules.node.task; import com.alibaba.fastjson2.JSON; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.dsms.common.constant.RemoteResponseStatusEnum; import com.dsms.common.constant.TaskStatusEnum; +import com.dsms.common.constant.TaskTypeEnum; import com.dsms.common.remotecall.model.FinishedDetail; import com.dsms.common.remotecall.model.RemoteResponse; import com.dsms.common.taskmanager.TaskException; import com.dsms.common.taskmanager.TaskStrategy; import com.dsms.common.taskmanager.model.Task; +import com.dsms.common.taskmanager.service.ITaskService; import com.dsms.dfsbroker.node.api.NodeApi; import com.dsms.dfsbroker.node.request.OsdAddRequest; import com.dsms.modules.util.RemoteCallUtil; @@ -27,6 +30,9 @@ public class AddOsdTask implements TaskStrategy { @Autowired private NodeApi nodeApi; + @Autowired + private ITaskService taskService; + @Override public Task execute(Task task) { String taskParam = task.getTaskParam(); @@ -60,4 +66,19 @@ public class AddOsdTask implements TaskStrategy { return task; } + + @Override + public boolean validateTask(String[] validateParam) { + LambdaQueryWrapper taskQuery = new LambdaQueryWrapper<>(); + taskQuery.in(Task::getTaskStatus, TaskStatusEnum.QUEUE.getStatus(), TaskStatusEnum.EXECUTING.getStatus()) + .eq(Task::getTaskType, TaskTypeEnum.ADD_OSD.getType()); + List list = taskService.list(taskQuery); + + for (Task task : list) { + if (task.getTaskMessage().contains(validateParam[0]) && task.getTaskMessage().contains(validateParam[1])) { + return false; + } + } + return true; + } } diff --git a/dsms-engine-application/src/main/java/com/dsms/modules/node/task/RemoveOsdTask.java b/dsms-engine-application/src/main/java/com/dsms/modules/node/task/RemoveOsdTask.java index 68791f5d5f156249e63d6425cef4d2053933526a..78684d949e2c072d239bb6adc150d440c0694394 100644 --- a/dsms-engine-application/src/main/java/com/dsms/modules/node/task/RemoveOsdTask.java +++ b/dsms-engine-application/src/main/java/com/dsms/modules/node/task/RemoveOsdTask.java @@ -1,13 +1,18 @@ package com.dsms.modules.node.task; import com.alibaba.fastjson2.JSON; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.dsms.common.constant.RemoteResponseStatusEnum; +import com.dsms.common.constant.ResultCode; import com.dsms.common.constant.TaskStatusEnum; +import com.dsms.common.constant.TaskTypeEnum; +import com.dsms.common.exception.DsmsEngineException; import com.dsms.common.remotecall.model.FinishedDetail; import com.dsms.common.remotecall.model.RemoteResponse; import com.dsms.common.taskmanager.TaskException; import com.dsms.common.taskmanager.TaskStrategy; import com.dsms.common.taskmanager.model.Task; +import com.dsms.common.taskmanager.service.ITaskService; import com.dsms.dfsbroker.node.api.NodeApi; import com.dsms.dfsbroker.node.model.remote.OsdRmStatusResult; import com.dsms.dfsbroker.node.request.RmOsdStatusRequest; @@ -28,6 +33,8 @@ import java.util.Objects; public class RemoveOsdTask implements TaskStrategy { @Autowired private NodeApi nodeApi; + @Autowired + private ITaskService taskService; @Override public Task execute(Task task) { @@ -74,4 +81,15 @@ public class RemoveOsdTask implements TaskStrategy { return task; } + @Override + public boolean validateTask(String[] validateParam) { + LambdaQueryWrapper taskQuery = new LambdaQueryWrapper<>(); + taskQuery.in(Task::getTaskStatus, TaskStatusEnum.QUEUE.getStatus(), TaskStatusEnum.EXECUTING.getStatus()) + .eq(Task::getTaskType, TaskTypeEnum.REMOVE_OSD.getType()) + .eq(Task::getTaskParam, validateParam[0]); + long count = taskService.count(taskQuery); + + return count <= 0; + } + } diff --git a/dsms-engine-application/src/test/java/com/dsms/modules/task/service/TaskServiceTest.java b/dsms-engine-application/src/test/java/com/dsms/modules/task/service/TaskServiceTest.java index 69d5b09b3c33f418c5da9aed01c35cee92a89afd..73c81175a911f9522523708552f3f44ffb22e977 100644 --- a/dsms-engine-application/src/test/java/com/dsms/modules/task/service/TaskServiceTest.java +++ b/dsms-engine-application/src/test/java/com/dsms/modules/task/service/TaskServiceTest.java @@ -16,13 +16,17 @@ package com.dsms.modules.task.service; +import com.dsms.common.constant.TaskStatusEnum; +import com.dsms.common.constant.TaskTypeEnum; +import com.dsms.common.taskmanager.TaskContext; +import com.dsms.common.taskmanager.model.AsyncTask; import com.dsms.common.taskmanager.model.Task; import com.dsms.common.taskmanager.service.ITaskService; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.web.server.LocalServerPort; import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; @@ -34,8 +38,10 @@ import static org.junit.jupiter.api.Assertions.*; public class TaskServiceTest { @Autowired private ITaskService taskService; - - + + @Autowired + private TaskContext taskContext; + @Test @DisplayName("任务模块crud") @@ -67,5 +73,40 @@ public class TaskServiceTest { Task taskToDelete = taskService.getById(taskEntity.getId()); assertNull(taskToDelete); } - + + @Test + @Transactional + void testOsdRemoveTaskExist() throws Exception { + Integer osdId = 0; + // 创建一个任务对象 + Task task = new AsyncTask(); + task.setTaskName(TaskTypeEnum.REMOVE_OSD.getName()); + task.setTaskType(TaskTypeEnum.REMOVE_OSD.getType()); + task.setTaskParam(String.valueOf(osdId)); + task.setTaskMessage(""); + task.setTaskStatus(TaskStatusEnum.QUEUE.getStatus()); + taskService.save(task); + boolean b = taskContext.validateTask(TaskTypeEnum.REMOVE_OSD, String.valueOf(osdId)); + Assertions.assertFalse(b); + + } + + @Test + @Transactional + void testOsdAddTaskExist() throws Exception { + String nodeName = "node1"; + String devicePath = "/dev/sda"; + // 创建一个任务对象 + Task task = new AsyncTask(); + task.setTaskName(TaskTypeEnum.ADD_OSD.getName()); + task.setTaskType(TaskTypeEnum.ADD_OSD.getType()); + task.setTaskParam(""); + task.setTaskMessage(nodeName + devicePath); + task.setTaskStatus(TaskStatusEnum.QUEUE.getStatus()); + taskService.save(task); + boolean validateTask = taskContext.validateTask(TaskTypeEnum.ADD_OSD, nodeName, devicePath); + Assertions.assertFalse(validateTask); + + } + } \ No newline at end of file diff --git a/dsms-engine-application/src/test/resources/db/schema-h2.sql b/dsms-engine-application/src/test/resources/db/schema-h2.sql index 9943f5736e83d1aac59a777e638b825bc9aab6a1..9839b070030bda0dc42b8e9a5466a297b7f7c3ca 100644 --- a/dsms-engine-application/src/test/resources/db/schema-h2.sql +++ b/dsms-engine-application/src/test/resources/db/schema-h2.sql @@ -69,6 +69,6 @@ CREATE TABLE `task` ( `task_message` varchar(32) NOT NULL DEFAULT '' COMMENT '任务信息', `task_param` varchar(32) NOT NULL DEFAULT '' COMMENT '任务参数', `task_start_time` datetime NOT NULL DEFAULT current_timestamp() COMMENT '任务开始时间', - `task_end_time` datetime NOT NULL COMMENT '任务结束时间', + `task_end_time` datetime COMMENT '任务结束时间', PRIMARY KEY (`id`) ); \ No newline at end of file diff --git a/dsms-engine-common/src/main/java/com/dsms/common/constant/ResultCode.java b/dsms-engine-common/src/main/java/com/dsms/common/constant/ResultCode.java index 29c8b13a423f47dd927b544e7772282e9abc0a2a..d494786abe8de37f31ebd56219ea973c80595e71 100644 --- a/dsms-engine-common/src/main/java/com/dsms/common/constant/ResultCode.java +++ b/dsms-engine-common/src/main/java/com/dsms/common/constant/ResultCode.java @@ -55,6 +55,8 @@ public enum ResultCode { NODE_NOT_EXIST(1204, "节点不存在"), NODE_DEVICE_NOT_AVAILABLE(1205, "磁盘不可用"), NODE_OSD_NOT_EXIST(1206, "osd不存在"), + NODE_REMOVE_OSD_TASK_EXIST(1207, "节点移除磁盘任务已存在"), + NODE_ADD_OSD_TASK_EXIST(1208, "节点管理磁盘任务已存在"), ; /** diff --git a/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskFactory.java b/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskContext.java similarity index 61% rename from dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskFactory.java rename to dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskContext.java index c75d84309b64c20c078836b1771fc13a105f9a2d..83e49d2c980472d345e9200182e2d3e480873a43 100644 --- a/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskFactory.java +++ b/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskContext.java @@ -1,5 +1,6 @@ package com.dsms.common.taskmanager; +import com.dsms.common.constant.TaskTypeEnum; import com.dsms.common.taskmanager.model.Task; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -8,7 +9,7 @@ import java.util.Map; import java.util.Optional; @Component -public class TaskFactory { +public class TaskContext { /** * Spring会自动将TaskStrategy接口的实现类注入到这个Map中,key为bean id,value值则为对应的策略实现类 */ @@ -23,8 +24,21 @@ public class TaskFactory { */ public Task execute(Task executeTask) { TaskStrategy taskStrategy = Optional.ofNullable(dsmsStorageTaskMap.get(executeTask.getTaskType())) - .orElseThrow(() -> new IllegalArgumentException("Invalid task type")); + .orElseThrow(() -> new IllegalArgumentException("Invalid task type")); return taskStrategy.execute(executeTask); } + /** + * 校验任务唯一性 + * + * @param taskTypeEnum 任务类型 + * @param validateParam 校验参数 + * @return 校验结果 + */ + public boolean validateTask(TaskTypeEnum taskTypeEnum, String... validateParam) { + TaskStrategy taskStrategy = Optional.ofNullable(dsmsStorageTaskMap.get(taskTypeEnum.getType())) + .orElseThrow(() -> new IllegalArgumentException("Invalid task type")); + return taskStrategy.validateTask(validateParam); + } + } diff --git a/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskStrategy.java b/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskStrategy.java index c7a1ffb20d9c00e4caac7bbe9b5d2775852b1ccf..aab656c37340c63145daa445be14a931eb622bbb 100644 --- a/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskStrategy.java +++ b/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskStrategy.java @@ -9,4 +9,5 @@ public interface TaskStrategy { */ Task execute(Task task); + boolean validateTask(String[] validateParam); } diff --git a/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/job/TaskJob.java b/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/job/TaskJob.java index de97fe385c8910c0f2f719d10cdbf866b2e80564..7b4e30090897b3d2b32c7e53e996bd77ed0a60ea 100644 --- a/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/job/TaskJob.java +++ b/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/job/TaskJob.java @@ -17,7 +17,8 @@ package com.dsms.common.taskmanager.job; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.dsms.common.constant.TaskStatusEnum; -import com.dsms.common.taskmanager.TaskFactory; +import com.dsms.common.taskmanager.TaskContext; +import com.dsms.common.taskmanager.TaskContext; import com.dsms.common.taskmanager.model.Task; import com.dsms.common.taskmanager.service.ITaskService; import lombok.extern.slf4j.Slf4j; @@ -36,7 +37,7 @@ public class TaskJob { private ITaskService taskService; @Autowired - private TaskFactory taskFactory; + private TaskContext taskContext; @Autowired private ThreadPoolTaskExecutor dsmsExecutor; @@ -63,7 +64,7 @@ public class TaskJob { //更新任务状态为执行中,防止重复执行 task.setTaskStatus(TaskStatusEnum.EXECUTING.getStatus()); taskService.updateById(task); - Task executeEndTask = taskFactory.execute(task); + Task executeEndTask = taskContext.execute(task); taskService.updateById(executeEndTask); taskService.updateFrontTaskInfo(executeEndTask); });