From 42b9f086f6262db88210914febc6fcce8f61d960 Mon Sep 17 00:00:00 2001 From: mabofu Date: Thu, 29 Dec 2022 14:04:15 +0800 Subject: [PATCH] update node osd add and remove feature --- .../node/service/impl/NodeServiceImpl.java | 58 ++++++++++++++----- .../dsms/modules/node/task/AddOsdTask.java | 21 +++++++ .../dsms/modules/node/task/RemoveOsdTask.java | 18 ++++++ .../modules/task/service/TaskServiceTest.java | 49 ++++++++++++++-- .../src/test/resources/db/schema-h2.sql | 2 +- .../com/dsms/common/constant/ResultCode.java | 2 + .../{TaskFactory.java => TaskContext.java} | 18 +++++- .../dsms/common/taskmanager/TaskStrategy.java | 1 + .../dsms/common/taskmanager/job/TaskJob.java | 7 ++- 9 files changed, 151 insertions(+), 25 deletions(-) rename dsms-engine-common/src/main/java/com/dsms/common/taskmanager/{TaskFactory.java => TaskContext.java} (61%) diff --git a/dsms-engine-application/src/main/java/com/dsms/modules/node/service/impl/NodeServiceImpl.java b/dsms-engine-application/src/main/java/com/dsms/modules/node/service/impl/NodeServiceImpl.java index ebe4550..ac01177 100644 --- a/dsms-engine-application/src/main/java/com/dsms/modules/node/service/impl/NodeServiceImpl.java +++ b/dsms-engine-application/src/main/java/com/dsms/modules/node/service/impl/NodeServiceImpl.java @@ -16,10 +16,12 @@ package com.dsms.modules.node.service.impl; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.dsms.common.constant.*; import com.dsms.common.exception.DsmsEngineException; import com.dsms.common.remotecall.model.RemoteResponse; import com.dsms.common.taskmanager.TaskException; +import com.dsms.common.taskmanager.TaskContext; import com.dsms.common.taskmanager.model.AsyncTask; import com.dsms.common.taskmanager.model.Task; import com.dsms.common.taskmanager.service.ITaskService; @@ -50,6 +52,9 @@ public class NodeServiceImpl implements INodeService { @Autowired private ITaskService taskService; + @Autowired + private TaskContext taskContext; + @Override public List list() { @@ -135,6 +140,10 @@ public class NodeServiceImpl implements INodeService { @Override public boolean addOsd(String nodeName, String devicePath) { + + if (!taskContext.validateTask(TaskTypeEnum.ADD_OSD, nodeName, devicePath)) { + throw new DsmsEngineException(ResultCode.NODE_ADD_OSD_TASK_EXIST); + } //1.校验 node 和磁盘 if (!validateNode(nodeName)) { throw new DsmsEngineException(ResultCode.NODE_NOT_EXIST); @@ -166,6 +175,10 @@ public class NodeServiceImpl implements INodeService { @Override public boolean removeOsd(Integer osdId) { + + if (!taskContext.validateTask(TaskTypeEnum.REMOVE_OSD, String.valueOf(osdId))) { + throw new DsmsEngineException(ResultCode.NODE_REMOVE_OSD_TASK_EXIST); + } //1.校验osdId if (!validateOsdId(osdId)) { throw new DsmsEngineException(ResultCode.NODE_OSD_NOT_EXIST); @@ -192,10 +205,20 @@ public class NodeServiceImpl implements INodeService { @Override public boolean validateNode(String nodeName) { - List nodeList = list(); + List monList = null; boolean nodeExist = false; - for (Node node : nodeList) { - if (node.getNodeName().equals(nodeName)) { + + try { + monList = nodeApi.getNodeList(RemoteCallUtil.generateRemoteRequest()); + } catch (Throwable e) { + log.warn("get node list from dsms-storage fail,fail message:{}", e.getMessage(), e); + return false; + } + if (ObjectUtils.isEmpty(monList)) { + return false; + } + for (ResponseMon mon : monList) { + if (mon.getName().equals(nodeName)) { nodeExist = true; break; } @@ -207,22 +230,27 @@ public class NodeServiceImpl implements INodeService { @Override public boolean validateDeviceAvailable(String nodeName, String devicePath) { - List nodeList = list(); - boolean nodeExist = false; + OrchDeviceLsResult orchDeviceLs = null; boolean deviceAvailable = false; - for (Node node : nodeList) { - if (node.getNodeName().equals(nodeName)) { - nodeExist = true; - for (Device device : node.getDevices()) { - if (device.getDevicePath().equals(devicePath) && DeviceStatusEnum.AVAILABLE.getStatus().equals(device.getDeviceStatus())) { - deviceAvailable = true; - break; - } - } + + try { + orchDeviceLs = nodeApi.getOrchDeviceLs(RemoteCallUtil.generateRemoteRequest(), nodeName); + } catch (Throwable e) { + log.warn("get device count from dsms-storage fail,fail message:{}", e.getMessage(), e); + return false; + } + List devices = Device.orchDeviceLsResultParseDevice(orchDeviceLs); + if (ObjectUtils.isEmpty(devices)) { + return false; + } + for (Device device : devices) { + if (device.getDevicePath().equals(devicePath) && DeviceStatusEnum.AVAILABLE.getStatus().equals(device.getDeviceStatus())) { + deviceAvailable = true; + break; } } - return nodeExist && deviceAvailable; + return deviceAvailable; } @Override diff --git a/dsms-engine-application/src/main/java/com/dsms/modules/node/task/AddOsdTask.java b/dsms-engine-application/src/main/java/com/dsms/modules/node/task/AddOsdTask.java index 5614e32..a110632 100644 --- a/dsms-engine-application/src/main/java/com/dsms/modules/node/task/AddOsdTask.java +++ b/dsms-engine-application/src/main/java/com/dsms/modules/node/task/AddOsdTask.java @@ -1,13 +1,16 @@ package com.dsms.modules.node.task; import com.alibaba.fastjson2.JSON; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.dsms.common.constant.RemoteResponseStatusEnum; import com.dsms.common.constant.TaskStatusEnum; +import com.dsms.common.constant.TaskTypeEnum; import com.dsms.common.remotecall.model.FinishedDetail; import com.dsms.common.remotecall.model.RemoteResponse; import com.dsms.common.taskmanager.TaskException; import com.dsms.common.taskmanager.TaskStrategy; import com.dsms.common.taskmanager.model.Task; +import com.dsms.common.taskmanager.service.ITaskService; import com.dsms.dfsbroker.node.api.NodeApi; import com.dsms.dfsbroker.node.request.OsdAddRequest; import com.dsms.modules.util.RemoteCallUtil; @@ -27,6 +30,9 @@ public class AddOsdTask implements TaskStrategy { @Autowired private NodeApi nodeApi; + @Autowired + private ITaskService taskService; + @Override public Task execute(Task task) { String taskParam = task.getTaskParam(); @@ -60,4 +66,19 @@ public class AddOsdTask implements TaskStrategy { return task; } + + @Override + public boolean validateTask(String[] validateParam) { + LambdaQueryWrapper taskQuery = new LambdaQueryWrapper<>(); + taskQuery.in(Task::getTaskStatus, TaskStatusEnum.QUEUE.getStatus(), TaskStatusEnum.EXECUTING.getStatus()) + .eq(Task::getTaskType, TaskTypeEnum.ADD_OSD.getType()); + List list = taskService.list(taskQuery); + + for (Task task : list) { + if (task.getTaskMessage().contains(validateParam[0]) && task.getTaskMessage().contains(validateParam[1])) { + return false; + } + } + return true; + } } diff --git a/dsms-engine-application/src/main/java/com/dsms/modules/node/task/RemoveOsdTask.java b/dsms-engine-application/src/main/java/com/dsms/modules/node/task/RemoveOsdTask.java index 68791f5..78684d9 100644 --- a/dsms-engine-application/src/main/java/com/dsms/modules/node/task/RemoveOsdTask.java +++ b/dsms-engine-application/src/main/java/com/dsms/modules/node/task/RemoveOsdTask.java @@ -1,13 +1,18 @@ package com.dsms.modules.node.task; import com.alibaba.fastjson2.JSON; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.dsms.common.constant.RemoteResponseStatusEnum; +import com.dsms.common.constant.ResultCode; import com.dsms.common.constant.TaskStatusEnum; +import com.dsms.common.constant.TaskTypeEnum; +import com.dsms.common.exception.DsmsEngineException; import com.dsms.common.remotecall.model.FinishedDetail; import com.dsms.common.remotecall.model.RemoteResponse; import com.dsms.common.taskmanager.TaskException; import com.dsms.common.taskmanager.TaskStrategy; import com.dsms.common.taskmanager.model.Task; +import com.dsms.common.taskmanager.service.ITaskService; import com.dsms.dfsbroker.node.api.NodeApi; import com.dsms.dfsbroker.node.model.remote.OsdRmStatusResult; import com.dsms.dfsbroker.node.request.RmOsdStatusRequest; @@ -28,6 +33,8 @@ import java.util.Objects; public class RemoveOsdTask implements TaskStrategy { @Autowired private NodeApi nodeApi; + @Autowired + private ITaskService taskService; @Override public Task execute(Task task) { @@ -74,4 +81,15 @@ public class RemoveOsdTask implements TaskStrategy { return task; } + @Override + public boolean validateTask(String[] validateParam) { + LambdaQueryWrapper taskQuery = new LambdaQueryWrapper<>(); + taskQuery.in(Task::getTaskStatus, TaskStatusEnum.QUEUE.getStatus(), TaskStatusEnum.EXECUTING.getStatus()) + .eq(Task::getTaskType, TaskTypeEnum.REMOVE_OSD.getType()) + .eq(Task::getTaskParam, validateParam[0]); + long count = taskService.count(taskQuery); + + return count <= 0; + } + } diff --git a/dsms-engine-application/src/test/java/com/dsms/modules/task/service/TaskServiceTest.java b/dsms-engine-application/src/test/java/com/dsms/modules/task/service/TaskServiceTest.java index 69d5b09..73c8117 100644 --- a/dsms-engine-application/src/test/java/com/dsms/modules/task/service/TaskServiceTest.java +++ b/dsms-engine-application/src/test/java/com/dsms/modules/task/service/TaskServiceTest.java @@ -16,13 +16,17 @@ package com.dsms.modules.task.service; +import com.dsms.common.constant.TaskStatusEnum; +import com.dsms.common.constant.TaskTypeEnum; +import com.dsms.common.taskmanager.TaskContext; +import com.dsms.common.taskmanager.model.AsyncTask; import com.dsms.common.taskmanager.model.Task; import com.dsms.common.taskmanager.service.ITaskService; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.web.server.LocalServerPort; import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; @@ -34,8 +38,10 @@ import static org.junit.jupiter.api.Assertions.*; public class TaskServiceTest { @Autowired private ITaskService taskService; - - + + @Autowired + private TaskContext taskContext; + @Test @DisplayName("任务模块crud") @@ -67,5 +73,40 @@ public class TaskServiceTest { Task taskToDelete = taskService.getById(taskEntity.getId()); assertNull(taskToDelete); } - + + @Test + @Transactional + void testOsdRemoveTaskExist() throws Exception { + Integer osdId = 0; + // 创建一个任务对象 + Task task = new AsyncTask(); + task.setTaskName(TaskTypeEnum.REMOVE_OSD.getName()); + task.setTaskType(TaskTypeEnum.REMOVE_OSD.getType()); + task.setTaskParam(String.valueOf(osdId)); + task.setTaskMessage(""); + task.setTaskStatus(TaskStatusEnum.QUEUE.getStatus()); + taskService.save(task); + boolean b = taskContext.validateTask(TaskTypeEnum.REMOVE_OSD, String.valueOf(osdId)); + Assertions.assertFalse(b); + + } + + @Test + @Transactional + void testOsdAddTaskExist() throws Exception { + String nodeName = "node1"; + String devicePath = "/dev/sda"; + // 创建一个任务对象 + Task task = new AsyncTask(); + task.setTaskName(TaskTypeEnum.ADD_OSD.getName()); + task.setTaskType(TaskTypeEnum.ADD_OSD.getType()); + task.setTaskParam(""); + task.setTaskMessage(nodeName + devicePath); + task.setTaskStatus(TaskStatusEnum.QUEUE.getStatus()); + taskService.save(task); + boolean validateTask = taskContext.validateTask(TaskTypeEnum.ADD_OSD, nodeName, devicePath); + Assertions.assertFalse(validateTask); + + } + } \ No newline at end of file diff --git a/dsms-engine-application/src/test/resources/db/schema-h2.sql b/dsms-engine-application/src/test/resources/db/schema-h2.sql index 9943f57..9839b07 100644 --- a/dsms-engine-application/src/test/resources/db/schema-h2.sql +++ b/dsms-engine-application/src/test/resources/db/schema-h2.sql @@ -69,6 +69,6 @@ CREATE TABLE `task` ( `task_message` varchar(32) NOT NULL DEFAULT '' COMMENT '任务信息', `task_param` varchar(32) NOT NULL DEFAULT '' COMMENT '任务参数', `task_start_time` datetime NOT NULL DEFAULT current_timestamp() COMMENT '任务开始时间', - `task_end_time` datetime NOT NULL COMMENT '任务结束时间', + `task_end_time` datetime COMMENT '任务结束时间', PRIMARY KEY (`id`) ); \ No newline at end of file diff --git a/dsms-engine-common/src/main/java/com/dsms/common/constant/ResultCode.java b/dsms-engine-common/src/main/java/com/dsms/common/constant/ResultCode.java index 29c8b13..d494786 100644 --- a/dsms-engine-common/src/main/java/com/dsms/common/constant/ResultCode.java +++ b/dsms-engine-common/src/main/java/com/dsms/common/constant/ResultCode.java @@ -55,6 +55,8 @@ public enum ResultCode { NODE_NOT_EXIST(1204, "节点不存在"), NODE_DEVICE_NOT_AVAILABLE(1205, "磁盘不可用"), NODE_OSD_NOT_EXIST(1206, "osd不存在"), + NODE_REMOVE_OSD_TASK_EXIST(1207, "节点移除磁盘任务已存在"), + NODE_ADD_OSD_TASK_EXIST(1208, "节点管理磁盘任务已存在"), ; /** diff --git a/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskFactory.java b/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskContext.java similarity index 61% rename from dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskFactory.java rename to dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskContext.java index c75d843..83e49d2 100644 --- a/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskFactory.java +++ b/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskContext.java @@ -1,5 +1,6 @@ package com.dsms.common.taskmanager; +import com.dsms.common.constant.TaskTypeEnum; import com.dsms.common.taskmanager.model.Task; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -8,7 +9,7 @@ import java.util.Map; import java.util.Optional; @Component -public class TaskFactory { +public class TaskContext { /** * Spring会自动将TaskStrategy接口的实现类注入到这个Map中,key为bean id,value值则为对应的策略实现类 */ @@ -23,8 +24,21 @@ public class TaskFactory { */ public Task execute(Task executeTask) { TaskStrategy taskStrategy = Optional.ofNullable(dsmsStorageTaskMap.get(executeTask.getTaskType())) - .orElseThrow(() -> new IllegalArgumentException("Invalid task type")); + .orElseThrow(() -> new IllegalArgumentException("Invalid task type")); return taskStrategy.execute(executeTask); } + /** + * 校验任务唯一性 + * + * @param taskTypeEnum 任务类型 + * @param validateParam 校验参数 + * @return 校验结果 + */ + public boolean validateTask(TaskTypeEnum taskTypeEnum, String... validateParam) { + TaskStrategy taskStrategy = Optional.ofNullable(dsmsStorageTaskMap.get(taskTypeEnum.getType())) + .orElseThrow(() -> new IllegalArgumentException("Invalid task type")); + return taskStrategy.validateTask(validateParam); + } + } diff --git a/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskStrategy.java b/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskStrategy.java index c7a1ffb..aab656c 100644 --- a/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskStrategy.java +++ b/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/TaskStrategy.java @@ -9,4 +9,5 @@ public interface TaskStrategy { */ Task execute(Task task); + boolean validateTask(String[] validateParam); } diff --git a/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/job/TaskJob.java b/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/job/TaskJob.java index de97fe3..7b4e300 100644 --- a/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/job/TaskJob.java +++ b/dsms-engine-common/src/main/java/com/dsms/common/taskmanager/job/TaskJob.java @@ -17,7 +17,8 @@ package com.dsms.common.taskmanager.job; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.dsms.common.constant.TaskStatusEnum; -import com.dsms.common.taskmanager.TaskFactory; +import com.dsms.common.taskmanager.TaskContext; +import com.dsms.common.taskmanager.TaskContext; import com.dsms.common.taskmanager.model.Task; import com.dsms.common.taskmanager.service.ITaskService; import lombok.extern.slf4j.Slf4j; @@ -36,7 +37,7 @@ public class TaskJob { private ITaskService taskService; @Autowired - private TaskFactory taskFactory; + private TaskContext taskContext; @Autowired private ThreadPoolTaskExecutor dsmsExecutor; @@ -63,7 +64,7 @@ public class TaskJob { //更新任务状态为执行中,防止重复执行 task.setTaskStatus(TaskStatusEnum.EXECUTING.getStatus()); taskService.updateById(task); - Task executeEndTask = taskFactory.execute(task); + Task executeEndTask = taskContext.execute(task); taskService.updateById(executeEndTask); taskService.updateFrontTaskInfo(executeEndTask); }); -- Gitee