From 2c1ac86d674436103943c8f0d13b1855b567f2cc Mon Sep 17 00:00:00 2001
From: zhanghan
Date: Wed, 15 Oct 2025 19:15:45 +0800
Subject: [PATCH] Start the task manager on the agent side

---
Review notes (text between "---" and the diffstat is not part of the commit
message):
- ActionExec now stores the *TaskStatus itself instead of its address, so the
  type assertions in CancelAction / GetActionStatus cannot panic.
- TaskStatus.Status is guarded by a mutex (CurrentStatus / SetStatus) because
  it is shared between the HTTP handlers and the worker goroutine.
- ActionExecWithStatus keeps the "超时" / "已取消" status set by
  runScriptWithCancel and marks the task "失败" when preparation fails.
- runScriptWithCancel uses a timer that is stopped on return.
- The "index" blob ids of controller/action.go, service/action.go,
  service/common.go and service/task.go, and the similarity index of
  service/common.go, were not recomputed for this revision; regenerate the
  patch with git format-patch before relying on them.

 .../agent/exec-script/controller/script.go    |  25 -----
 automation/agent/exec-script/router.go        |  14 ---
 automation/agent/exec/controller/action.go    |  86 ++++++++++++++++
 .../model/script.go => exec/model/action.go}  |  23 ++---
 automation/agent/exec/model/common.go         |   7 ++
 automation/agent/exec/router.go               |  16 ++++
 automation/agent/exec/service/action.go       |  87 ++++++++++++++++
 .../script.go => exec/service/common.go}      | 103 ++++++-------------
 automation/agent/exec/service/task.go         |  47 ++++++++++
 automation/agent/pkg/router/router.go         |   4 +-
 .../module/common/exec/{exec.go => action.go} |   8 +-
 .../module/job_action/service/exec.go         |   4 +-
 12 files changed, 294 insertions(+), 130 deletions(-)
 delete mode 100644 automation/agent/exec-script/controller/script.go
 delete mode 100644 automation/agent/exec-script/router.go
 create mode 100644 automation/agent/exec/controller/action.go
 rename automation/agent/{exec-script/model/script.go => exec/model/action.go} (62%)
 create mode 100644 automation/agent/exec/model/common.go
 create mode 100644 automation/agent/exec/router.go
 create mode 100644 automation/agent/exec/service/action.go
 rename automation/agent/{exec-script/service/script.go => exec/service/common.go} (53%)
 create mode 100644 automation/agent/exec/service/task.go
 rename automation/server/internal/module/common/exec/{exec.go => action.go} (84%)

diff --git a/automation/agent/exec-script/controller/script.go b/automation/agent/exec-script/controller/script.go
deleted file mode 100644
index a593e49c..00000000
--- a/automation/agent/exec-script/controller/script.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package controller
-
-import (
-	"ant-agent/exec-script/model"
-	"ant-agent/exec-script/service"
-
-	"gitee.com/openeuler/PilotGo/sdk/logger"
-	"github.com/gin-gonic/gin"
-	"openeuler.org/PilotGo/PilotGo-plugin-automation/pkg/response"
-)
-
-func ExecScript(c *gin.Context) {
-	var script model.ScriptsRun
-	if err := c.ShouldBindJSON(&script); err != nil {
-		response.Fail(c, nil, "参数错误")
-		return
-	}
-	logger.Info("script: %v", script)
-	result, err := service.ExecScript(&script)
-	if err != nil {
-		response.Fail(c, nil, err.Error())
-		return
-	}
-	response.Success(c, result, "脚本执行成功")
-}
diff --git a/automation/agent/exec-script/router.go b/automation/agent/exec-script/router.go
deleted file mode 100644
index 7034821d..00000000
--- a/automation/agent/exec-script/router.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package execscript
-
-import (
-	"ant-agent/exec-script/controller"
-
-	"github.com/gin-gonic/gin"
-)
-
-func ExecScriptHandler(router *gin.RouterGroup) {
-	api := router.Group("/script")
-	{
-		api.POST("/exec", controller.ExecScript)
-	}
-}
diff --git a/automation/agent/exec/controller/action.go b/automation/agent/exec/controller/action.go
new file mode 100644
index 00000000..e465a2a0
--- /dev/null
+++ b/automation/agent/exec/controller/action.go
@@ -0,0 +1,86 @@
+package controller
+
+import (
+	"ant-agent/exec/model"
+	"ant-agent/exec/service"
+
+	"github.com/gin-gonic/gin"
+	"openeuler.org/PilotGo/PilotGo-plugin-automation/pkg/response"
+)
+
+// ActionExec registers the posted action task as "等待中" and starts it in a
+// background goroutine. The response only acknowledges that the task was
+// accepted; progress is available through GetActionStatus.
+func ActionExec(c *gin.Context) {
+	var task model.ActionTask
+	if err := c.ShouldBindJSON(&task); err != nil {
+		response.Fail(c, nil, "参数错误")
+		return
+	}
+
+	taskStatus := &service.TaskStatus{
+		JobId:      task.JobId,
+		ScriptId:   task.ScriptId,
+		Status:     "等待中",
+		CancelChan: make(chan bool, 1),
+	}
+	// Store the *TaskStatus itself, not its address: the other handlers
+	// assert the loaded value to *service.TaskStatus and would panic on a
+	// **service.TaskStatus.
+	service.TaskManager.Tasks.Store(task.JobId+"_"+task.ScriptId, taskStatus)
+
+	response.Success(c, gin.H{
+		"job_id": task.JobId,
+		"status": taskStatus.CurrentStatus(),
+	}, "任务已接受,正在执行")
+	go service.ActionExecWithStatus(&task, taskStatus)
+}
+
+// CancelAction asks the task identified by the job_id and script_id query
+// parameters to stop. Only tasks that are waiting or running are accepted.
+func CancelAction(c *gin.Context) {
+	jobId := c.Query("job_id")
+	scriptId := c.Query("script_id")
+
+	status, ok := service.TaskManager.Tasks.Load(jobId + "_" + scriptId)
+	if !ok {
+		response.Fail(c, nil, "任务不存在")
+		return
+	}
+	taskStatus := status.(*service.TaskStatus)
+
+	// Only running or waiting tasks can be cancelled.
+	if current := taskStatus.CurrentStatus(); current != "执行中" && current != "等待中" {
+		response.Fail(c, nil, "任务已完成或已取消,无法中止")
+		return
+	}
+
+	// Non-blocking send: CancelChan has capacity 1, so a full channel means a
+	// cancellation request is already pending.
+	select {
+	case taskStatus.CancelChan <- true:
+		service.UpdateTaskStatus(taskStatus, "已取消")
+		response.Success(c, nil, "任务中止请求已发送")
+	default:
+		response.Success(c, nil, "任务正在中止中")
+	}
+}
+
+// GetActionStatus reports the current status of the task identified by the
+// job_id and script_id query parameters.
+func GetActionStatus(c *gin.Context) {
+	jobId := c.Query("job_id")
+	scriptId := c.Query("script_id")
+
+	status, ok := service.TaskManager.Tasks.Load(jobId + "_" + scriptId)
+	if !ok {
+		response.Fail(c, nil, "任务不存在")
+		return
+	}
+	taskStatus := status.(*service.TaskStatus)
+	response.Success(c, gin.H{
+		"job_id":    taskStatus.JobId,
+		"script_id": taskStatus.ScriptId,
+		"status":    taskStatus.CurrentStatus(),
+	}, "获取任务状态成功")
+}
diff --git a/automation/agent/exec-script/model/script.go b/automation/agent/exec/model/action.go
similarity index 62%
rename from automation/agent/exec-script/model/script.go
rename to automation/agent/exec/model/action.go
index efffcf54..eab1a1d5 100644
--- a/automation/agent/exec-script/model/script.go
+++ b/automation/agent/exec/model/action.go
@@ -1,18 +1,6 @@
 package model
 
-type CmdResult struct {
-	RetCode int
-	Stdout  string
-	Stderr  string
-}
-
-type ResultResponse struct {
-	JobId    string
-	ScriptId string
-	Result   *CmdResult
-}
-
-type ScriptsRun struct {
+type ActionTask struct {
 	JobId         string `json:"job_id"`
 	ScriptId      string `json:"script_id"`
 	ScriptTag     string `json:"script_tag"`
@@ -21,3 +9,12 @@ type ScriptsRun struct {
 	Params        string `json:"params"`
 	TimeOut       int    `json:"time_out"`
 }
+
+type ActionResultResponse struct {
+	JobId     string
+	ScriptId  string
+	Status    string
+	StartTime string
+	EndTime   string
+	Result    *CmdResult
+}
diff --git a/automation/agent/exec/model/common.go b/automation/agent/exec/model/common.go
new file mode 100644
index 00000000..73f6780b
--- /dev/null
+++ b/automation/agent/exec/model/common.go
@@ -0,0 +1,7 @@
+package model
+
+type CmdResult struct {
+	RetCode int
+	Stdout  string
+	Stderr  string
+}
diff --git a/automation/agent/exec/router.go b/automation/agent/exec/router.go
new file mode 100644
index 00000000..2f2a823c
--- /dev/null
+++ b/automation/agent/exec/router.go
@@ -0,0 +1,16 @@
+package exec
+
+import (
+	"ant-agent/exec/controller"
+
+	"github.com/gin-gonic/gin"
+)
+
+func ExecHandler(router *gin.RouterGroup) {
+	api := router.Group("/exec")
+	{
+		api.POST("/action", controller.ActionExec)
+		api.POST("/action/cancel", controller.CancelAction)
+		api.POST("/action/status", controller.GetActionStatus)
+	}
+}
diff --git a/automation/agent/exec/service/action.go b/automation/agent/exec/service/action.go
new file mode 100644
index 00000000..989db200
--- /dev/null
+++ b/automation/agent/exec/service/action.go
@@ -0,0 +1,87 @@
+package service
+
+import (
+	"ant-agent/exec/model"
+	"encoding/json"
+	"os"
+	"time"
+
+	"gitee.com/openeuler/PilotGo/sdk/logger"
+)
+
+// ActionExecWithStatus runs the script described by task and records its
+// progress in taskStatus. When the result file has been written the task is
+// removed from TaskManager; if preparation or writing the result fails, the
+// task stays registered with its final status.
+func ActionExecWithStatus(task *model.ActionTask, taskStatus *TaskStatus) {
+	startTime := time.Now().Format("2006-01-02 15:04:05")
+	taskStatus.SetStatus("执行中")
+	TaskManager.Tasks.Store(task.JobId+"_"+task.ScriptId, taskStatus)
+
+	logger.Info("开始异步执行脚本, JobId: %s, ScriptId: %s", task.JobId, task.ScriptId)
+
+	// A failure before the script starts must not leave the task reported as
+	// "执行中" forever, so each early return marks it as failed first.
+	suffix, interpreter, err := getScriptInfo(task.ScriptType)
+	if err != nil {
+		logger.Error("获取脚本类型失败: %s", err.Error())
+		taskStatus.SetStatus("失败")
+		return
+	}
+
+	scriptPath, err := createScriptFile(task.ScriptTag, task.ScriptId, suffix, task.ScriptContent)
+	if err != nil {
+		logger.Error("创建脚本失败: %s", err.Error())
+		taskStatus.SetStatus("失败")
+		return
+	}
+
+	resultFilePath, err := createJobResultPath(task.JobId, task.ScriptId)
+	if err != nil {
+		logger.Error("创建任务结果存储路径失败: %s", err.Error())
+		taskStatus.SetStatus("失败")
+		return
+	}
+	logger.Debug("执行脚本: %s %s %v", interpreter, scriptPath, task.Params)
+
+	stdout, stderr, execCode, err := runScriptWithCancel(interpreter, scriptPath, task.Params, task.TimeOut, taskStatus, taskStatus.CancelChan)
+	// runScriptWithCancel records "超时" and "已取消" itself; keep those
+	// instead of overwriting them with a generic "失败".
+	finalStatus := taskStatus.CurrentStatus()
+	switch {
+	case err == nil:
+		finalStatus = "成功"
+	case finalStatus != "超时" && finalStatus != "已取消":
+		finalStatus = "失败"
+	}
+	taskStatus.SetStatus(finalStatus)
+
+	result := &model.CmdResult{
+		Stdout:  stdout,
+		Stderr:  stderr,
+		RetCode: execCode,
+	}
+	resultResponse := &model.ActionResultResponse{
+		JobId:     task.JobId,
+		ScriptId:  task.ScriptId,
+		Status:    finalStatus,
+		StartTime: startTime,
+		EndTime:   time.Now().Format("2006-01-02 15:04:05"),
+		Result:    result,
+	}
+	logger.Debug("script execution result: %+v", resultResponse)
+
+	// Write the result to the result file.
+	data, err := json.MarshalIndent(resultResponse, "", " ")
+	if err != nil {
+		logger.Error("序列化结果失败: %s", err.Error())
+		return
+	}
+
+	if err := os.WriteFile(resultFilePath, data, 0644); err != nil {
+		logger.Error("写入结果文件失败: %s", err.Error())
+		return
+	}
+	TaskManager.Tasks.Delete(task.JobId + "_" + task.ScriptId)
+	logger.Info("脚本执行结果已写入文件: %s", resultFilePath)
+}
diff --git a/automation/agent/exec-script/service/script.go b/automation/agent/exec/service/common.go
similarity index 53%
rename from automation/agent/exec-script/service/script.go
rename to automation/agent/exec/service/common.go
index 3d688cb0..e5184b16 100644
--- a/automation/agent/exec-script/service/script.go
+++ b/automation/agent/exec/service/common.go
@@ -1,18 +1,13 @@
 package service
 
 import (
-	"ant-agent/exec-script/model"
 	"bytes"
-	"context"
 	"encoding/base64"
-	"encoding/json"
 	"fmt"
 	"os"
 	"os/exec"
 	"strings"
 	"time"
-
-	"gitee.com/openeuler/PilotGo/sdk/logger"
 )
 
 func getScriptInfo(scriptType string) (suffix string, interpreter string, err error) {
@@ -33,7 +28,6 @@ func getScriptInfo(scriptType string) (suffix string, interpreter string, err er
 	return
 }
 
-// 解析转义字符的函数
 // 简单的转义字符解析函数
 func parseEscapeSequences(s string) string {
 	s = strings.ReplaceAll(s, `\n`, "\n")
@@ -94,83 +88,52 @@ func createJobResultPath(jobId, scriptId string) (string, error) {
 	return resultFilePath, nil
 }
 
-func runScript(interpreter, scriptPath string, params string, timeoutSec int) (string, string, int, error) {
+// runScriptWithCancel runs "interpreter scriptPath params" and waits for the
+// first of: the script exiting, the timeout elapsing, or a value arriving on
+// cancelChan. A non-positive timeoutSec falls back to 30 seconds. On timeout
+// or cancellation the process is killed and taskStatus is set to "超时" or
+// "已取消". It returns stdout, stderr, the exit code (-1 when the script was
+// not allowed to finish) and a non-nil error for anything but a clean exit.
+func runScriptWithCancel(interpreter, scriptPath string, params string, timeoutSec int, taskStatus *TaskStatus, cancelChan chan bool) (string, string, int, error) {
 	if timeoutSec <= 0 {
 		timeoutSec = 30
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutSec)*time.Second)
-	defer cancel()
 
 	args := append([]string{scriptPath}, params)
-	cmd := exec.CommandContext(ctx, interpreter, args...)
+	cmd := exec.Command(interpreter, args...)
 
 	var stdout, stderr bytes.Buffer
 	cmd.Stdout = &stdout
 	cmd.Stderr = &stderr
 
-	err := cmd.Run()
-	if ctx.Err() == context.DeadlineExceeded {
-		return stdout.String(), "脚本执行超时", -1, fmt.Errorf("脚本执行超时")
+	if err := cmd.Start(); err != nil {
+		return "", "", -1, fmt.Errorf("启动脚本执行失败: %s", err.Error())
 	}
 
-	if err != nil {
-		retcode := -1
-		if cmd.ProcessState != nil {
-			retcode = cmd.ProcessState.ExitCode()
+	done := make(chan error, 1)
+	go func() {
+		done <- cmd.Wait()
+	}()
+
+	// An explicit timer is stopped on return, so it does not linger when the
+	// script finishes well before the timeout.
+	timer := time.NewTimer(time.Duration(timeoutSec) * time.Second)
+	defer timer.Stop()
+
+	select {
+	case <-timer.C:
+		if err := cmd.Process.Kill(); err != nil {
+			return stdout.String(), stderr.String(), -1, fmt.Errorf("脚本执行超时, 但终止进程失败: %s", err.Error())
 		}
-		errMsg := stderr.String()
-		if errMsg == "" {
-			errMsg = err.Error()
+		taskStatus.SetStatus("超时")
+		return stdout.String(), stderr.String(), -1, fmt.Errorf("脚本执行超时, 已终止进程")
+	case <-cancelChan:
+		if err := cmd.Process.Kill(); err != nil {
+			return stdout.String(), stderr.String(), -1, fmt.Errorf("任务已取消, 但终止进程失败: %s", err.Error())
 		}
-		return stdout.String(), errMsg, retcode, err
-	}
-
-	return stdout.String(), stderr.String(), 0, nil
-}
-
-func ExecScript(script *model.ScriptsRun) (interface{}, error) {
-	suffix, interpreter, err := getScriptInfo(script.ScriptType)
-	if err != nil {
-		return "", fmt.Errorf("获取脚本类型失败: %s", err.Error())
-	}
-
-	scriptPath, err := createScriptFile(script.ScriptTag, script.ScriptId, suffix, script.ScriptContent)
-	if err != nil {
-		return "", fmt.Errorf("创建脚本失败: %s", err.Error())
-	}
-
-	resultFilePath, err := createJobResultPath(script.JobId, script.ScriptId)
-	if err != nil {
-		return "", fmt.Errorf("创建任务结果存储路径失败: %s", err.Error())
-	}
-	logger.Debug("run script timeout: %v", script.TimeOut)
-	logger.Debug("process run script command: %s %s %v", interpreter, scriptPath, script.Params)
-
-	stdout, stderr, execCode, err := runScript(interpreter, scriptPath, script.Params, script.TimeOut)
-	result := &model.CmdResult{
-		Stdout:  stdout,
-		Stderr:  stderr,
-		RetCode: execCode,
-	}
-	if err != nil {
-		return "", fmt.Errorf("脚本执行错误: %s", err.Error())
-	}
-	resultResponse := &model.ResultResponse{
-		JobId:    script.JobId,
-		ScriptId: script.ScriptId,
-		Result:   result,
-	}
-	logger.Debug("script execution result: %+v", resultResponse)
-
-	// 将结果写入文件
-	data, err := json.MarshalIndent(resultResponse, "", " ")
-	if err != nil {
-		return "", fmt.Errorf("序列化结果失败: %s", err.Error())
-	}
-
-	if err := os.WriteFile(resultFilePath, data, 0644); err != nil {
-		return "", fmt.Errorf("写入结果文件失败: %s", err.Error())
+		taskStatus.SetStatus("已取消")
+		return stdout.String(), stderr.String(), -1, fmt.Errorf("任务已取消, 已终止进程")
+	case err := <-done:
+		return strings.TrimSpace(stdout.String()), strings.TrimSpace(stderr.String()), cmd.ProcessState.ExitCode(), err
 	}
-	logger.Info("脚本执行结果已写入文件: %s", resultFilePath)
-	return result, nil
 }
diff --git a/automation/agent/exec/service/task.go b/automation/agent/exec/service/task.go
new file mode 100644
index 00000000..b3a53695
--- /dev/null
+++ b/automation/agent/exec/service/task.go
@@ -0,0 +1,47 @@
+package service
+
+import (
+	"sync"
+)
+
+// Manager holds the action tasks known to this agent.
+type Manager struct {
+	// Tasks maps "<JobId>_<ScriptId>" to the task's *TaskStatus.
+	Tasks sync.Map
+}
+
+// TaskManager is the package-wide task registry.
+var TaskManager = &Manager{}
+
+// TaskStatus describes one action task. Status is accessed both by the HTTP
+// handlers and by the goroutine running the task, so read it with
+// CurrentStatus and write it with SetStatus.
+type TaskStatus struct {
+	JobId      string    `json:"job_id"`
+	ScriptId   string    `json:"script_id"`
+	Status     string    `json:"status"` // 等待中, 执行中, 成功, 失败, 超时, 已取消
+	CancelChan chan bool `json:"-"`      // receives a value to abort the task
+
+	mu sync.Mutex // guards Status
+}
+
+// CurrentStatus returns the task's status.
+func (t *TaskStatus) CurrentStatus() string {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	return t.Status
+}
+
+// SetStatus replaces the task's status.
+func (t *TaskStatus) SetStatus(status string) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.Status = status
+}
+
+// UpdateTaskStatus sets the status of taskStatus and stores it in TaskManager
+// under "<JobId>_<ScriptId>".
+func UpdateTaskStatus(taskStatus *TaskStatus, status string) {
+	taskStatus.SetStatus(status)
+	TaskManager.Tasks.Store(taskStatus.JobId+"_"+taskStatus.ScriptId, taskStatus)
+}
diff --git a/automation/agent/pkg/router/router.go b/automation/agent/pkg/router/router.go
index 3445f076..517fe696 100644
--- a/automation/agent/pkg/router/router.go
+++ b/automation/agent/pkg/router/router.go
@@ -1,7 +1,7 @@
 package router
 
 import (
-	execscript "ant-agent/exec-script"
+	"ant-agent/exec"
 	"ant-agent/pkg/global"
 
 	"gitee.com/openeuler/PilotGo/sdk/logger"
@@ -24,6 +24,6 @@ func initRouters() *gin.Engine {
 
 	// 注册各自的路由模块
 	api := Router.Group("/plugin/automation")
-	execscript.ExecScriptHandler(api)
+	exec.ExecHandler(api)
 	return Router
 }
diff --git a/automation/server/internal/module/common/exec/exec.go b/automation/server/internal/module/common/exec/action.go
similarity index 84%
rename from automation/server/internal/module/common/exec/exec.go
rename to automation/server/internal/module/common/exec/action.go
index b5d0dae2..d247eef2 100644
--- a/automation/server/internal/module/common/exec/exec.go
+++ b/automation/server/internal/module/common/exec/action.go
@@ -9,7 +9,7 @@ import (
 	"net/http"
 )
 
-type ScriptsRun struct {
+type ActionTask struct {
 	JobId         string `json:"job_id"`
 	ScriptId      string `json:"script_id"`
 	ScriptTag     string `json:"script_tag"`
@@ -19,8 +19,8 @@ type ScriptsRun struct {
 	TimeOut       int    `json:"time_out"`
 }
 
-func ExecScript(ip string, sr ScriptsRun) error {
-	addr := fmt.Sprintf("http://%s:8277/plugin/automation/script/exec", ip)
+func ExecAction(ip string, sr ActionTask) error {
+	addr := fmt.Sprintf("http://%s:8277/plugin/automation/exec/action", ip)
 	dataMarshed, err := json.Marshal(sr)
 	if err != nil {
 		return err
@@ -57,7 +57,7 @@ func ExecScript(ip string, sr ScriptsRun) error {
 		return fmt.Errorf("解析响应体失败: %s", err.Error())
 	}
 	if result.Code != http.StatusOK {
-		return fmt.Errorf("脚本执行失败, 报错: %s", result.Message)
+		return fmt.Errorf("操作任务下发失败, 报错: %s", result.Message)
 	}
 	return nil
 }
diff --git a/automation/server/internal/module/job_action/service/exec.go b/automation/server/internal/module/job_action/service/exec.go
index 3fa9fa1c..c5c39237 100644
--- a/automation/server/internal/module/job_action/service/exec.go
+++ b/automation/server/internal/module/job_action/service/exec.go
@@ -19,7 +19,7 @@ func ExecScript(ips []string, scriptID, params string, timeoutSec int) (string,
 	}
 
 	for _, ip := range ips {
-		sr := exec.ScriptsRun{
+		sr := exec.ActionTask{
 			JobId:         jobId,
 			ScriptId:      scriptID,
 			ScriptTag:     scriptTag,
@@ -28,7 +28,7 @@ func ExecScript(ips []string, scriptID, params string, timeoutSec int) (string,
 			Params:        params,
 			TimeOut:       timeoutSec,
 		}
-		err := exec.ExecAction(ip, sr)
+		err := exec.ExecAction(ip, sr)
 		if err != nil {
 			return jobId, err
 		}
-- 
Gitee