From cddc1f62bcfca09660537ecacd918b89665d2a06 Mon Sep 17 00:00:00 2001 From: zhanghan Date: Fri, 8 Aug 2025 16:07:04 +0800 Subject: [PATCH] Modify the method for obtaining plugin information and status --- cmd/server/app/cmd/commands/server.go | 4 - .../controller/pluginWebsocketProxy.go | 27 +--- .../network/controller/pluginapi/script.go | 17 +- cmd/server/app/network/httpserver.go | 10 +- .../app/network/middleware/pluginauth.go | 2 - cmd/server/app/service/plugin/event.go | 23 +-- cmd/server/app/service/plugin/heartbeat.go | 150 ------------------ cmd/server/app/service/plugin/proxy.go | 8 - cmd/server/app/service/tag/tag.go | 19 +-- sdk/plugin/client/client.go | 8 +- 10 files changed, 32 insertions(+), 236 deletions(-) delete mode 100644 cmd/server/app/service/plugin/heartbeat.go delete mode 100644 cmd/server/app/service/plugin/proxy.go diff --git a/cmd/server/app/cmd/commands/server.go b/cmd/server/app/cmd/commands/server.go index 8efbf021..0e2f1d99 100644 --- a/cmd/server/app/cmd/commands/server.go +++ b/cmd/server/app/cmd/commands/server.go @@ -17,7 +17,6 @@ import ( "gitee.com/openeuler/PilotGo/cmd/server/app/network" "gitee.com/openeuler/PilotGo/cmd/server/app/network/websocket" "gitee.com/openeuler/PilotGo/cmd/server/app/service/auth" - "gitee.com/openeuler/PilotGo/cmd/server/app/service/plugin" "gitee.com/openeuler/PilotGo/pkg/dbmanager" "gitee.com/openeuler/PilotGo/pkg/utils" "gitee.com/openeuler/PilotGo/sdk/logger" @@ -134,9 +133,6 @@ func startServices(mysqlInfo *options.MysqlDBInfo, stopCh <-chan struct{}) error // verify permission initialize auth.Casbin(mysqlInfo) - // plugin server initialize - plugin.Init(stopCh) - return nil } diff --git a/cmd/server/app/network/controller/pluginWebsocketProxy.go b/cmd/server/app/network/controller/pluginWebsocketProxy.go index 3a442bc1..8522c712 100644 --- a/cmd/server/app/network/controller/pluginWebsocketProxy.go +++ b/cmd/server/app/network/controller/pluginWebsocketProxy.go @@ -16,9 +16,8 @@ import ( "sync" "time" - "gitee.com/openeuler/PilotGo/cmd/server/app/service/plugin" + "gitee.com/openeuler/PilotGo/pkg/global" "gitee.com/openeuler/PilotGo/sdk/logger" - "gitee.com/openeuler/PilotGo/sdk/utils/httputils" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" ) @@ -79,28 +78,12 @@ func PluginWebsocketGatewayHandler(c *gin.Context) { close(errChan) }() - name := c.Param("plugin_name") - p, err := plugin.GetPlugin(name) - if err != nil { - c.String(http.StatusNotFound, "plugin not found: "+err.Error()) - return - } + name := c.Param("serviceName") + p := global.GW.GetService(name) - target_addr := strings.Replace(strings.Split(p.Url, "//")[1], "/plugin/"+name, "", 1) - targetURL_str := fmt.Sprintf("ws://%s/ws/proxy", target_addr) - ishttp, err := httputils.ServerIsHttp("http://" + target_addr) - if err != nil { - c.String(http.StatusNotFound, "parse plugin url error: "+err.Error()) - return - } - if ishttp && strings.Split(targetURL_str, "://")[0] == "wss" { - targetURL_str = "ws://" + strings.Split(targetURL_str, "://")[1] - } - if !ishttp && strings.Split(targetURL_str, "://")[0] == "ws" { - targetURL_str = "wss://" + strings.Split(targetURL_str, "://")[1] - } + targetURL_str := fmt.Sprintf("ws://%s/ws/proxy", p.Address+":"+p.Port) - logger.Debug("websocket proxy plugin request: %s->%s", c.Request.RemoteAddr, target_addr) + logger.Debug("websocket proxy plugin request: %s->%s", c.Request.RemoteAddr, targetURL_str) client_wsconn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { diff --git a/cmd/server/app/network/controller/pluginapi/script.go b/cmd/server/app/network/controller/pluginapi/script.go index cb2433fc..ba8e0d82 100644 --- a/cmd/server/app/network/controller/pluginapi/script.go +++ b/cmd/server/app/network/controller/pluginapi/script.go @@ -10,13 +10,11 @@ package pluginapi import ( - "net/url" "strings" "time" "gitee.com/openeuler/PilotGo/cmd/server/app/agentmanager" "gitee.com/openeuler/PilotGo/cmd/server/app/service/batch" - "gitee.com/openeuler/PilotGo/cmd/server/app/service/plugin" "gitee.com/openeuler/PilotGo/cmd/server/app/service/script" "gitee.com/openeuler/PilotGo/pkg/global" "gitee.com/openeuler/PilotGo/sdk/common" @@ -195,18 +193,9 @@ func RunCommandAsyncHandler(c *gin.Context) { // 获取插件地址和回调url name := c.Query("plugin_name") - p, err := plugin.GetPlugin(name) - if err != nil { - response.Fail(c, nil, "plugin not found: "+err.Error()) - return - } - parsedURL, err := url.Parse(p.Url) - if err != nil { - logger.Error("URL解析失败:%v", err) - response.Fail(c, nil, "解析插件url失败") - return - } - caller := "http://" + parsedURL.Host + "/plugin_manage/api/v1/command_result" + p := global.GW.GetService(name) + + caller := "http://" + p.Address + ":" + p.Port + "/plugin_manage/api/v1/command_result" taskId := time.Now().Format("20060102150405") macuuids := batch.GetBatchMachineUUIDS(d.Batch) diff --git a/cmd/server/app/network/httpserver.go b/cmd/server/app/network/httpserver.go index f3294b6e..57849746 100644 --- a/cmd/server/app/network/httpserver.go +++ b/cmd/server/app/network/httpserver.go @@ -236,12 +236,16 @@ func registerAPIs(router *gin.Engine) { plugin := tokenApi.Group("") // 插件管理 { + plugin.GET("/plugins_paged", controller.GetPluginServicesPaged) p := plugin.Group("/plugins") { p.GET("/", controller.GetPluginServices) p.POST("/toggle", controller.TogglePluginService) } + + gateway := router.Group("/plugin") + gateway.GET("/ws/:serviceName", controller.PluginWebsocketGatewayHandler) } } @@ -271,10 +275,4 @@ func registerPluginApi(router *gin.Engine) { pluginAPI.GET("/batch_uuid", pluginapi.MachineListOfBatch) pluginAPI.POST("/getnodefiles", pluginapi.GetNodeFiles) } - // plugin - { - pluginAPI.GET("/plugins", pluginapi.PluginList) - pluginAPI.POST("/heartbeat", pluginapi.PluginHeartbeat) - pluginAPI.POST("/has_permission", pluginapi.HasPermission) - } } diff --git a/cmd/server/app/network/middleware/pluginauth.go b/cmd/server/app/network/middleware/pluginauth.go index dee1d79e..f4b36fa0 100644 --- a/cmd/server/app/network/middleware/pluginauth.go +++ b/cmd/server/app/network/middleware/pluginauth.go @@ -10,7 +10,6 @@ package middleware import ( "net/http" - "gitee.com/openeuler/PilotGo/sdk/logger" "gitee.com/openeuler/PilotGo/sdk/plugin/jwt" "github.com/gin-gonic/gin" ) @@ -23,7 +22,6 @@ func PluginAuthServiceCheck(c *gin.Context) { "code": 401, "msg": "plugin token check error:" + err.Error()}) c.Abort() - logger.Info("ssss %v", err.Error()) return } diff --git a/cmd/server/app/service/plugin/event.go b/cmd/server/app/service/plugin/event.go index b34e00c3..854d3a31 100644 --- a/cmd/server/app/service/plugin/event.go +++ b/cmd/server/app/service/plugin/event.go @@ -10,9 +10,11 @@ package plugin import ( "encoding/json" "errors" + "fmt" "net/http" "strconv" + "gitee.com/openeuler/PilotGo/pkg/global" "gitee.com/openeuler/PilotGo/sdk/common" "gitee.com/openeuler/PilotGo/sdk/utils/httputils" ) @@ -58,24 +60,11 @@ func publishEvent(eventServer string, msg common.EventMessage) error { } func isPluginEventConnected() (string, bool) { - var eventServer string - var connected bool - - plugins, err := GetPlugins() - if err != nil { + ok := global.GW.GetServiceStatus("event-service") + if !ok { return "", false } - for _, p := range plugins { - if p.Name == "event" { - eventServer = p.Url - connected = p.ConnectStatus - break - } - } - - if eventServer == "" || !connected { - return "", false - } - return eventServer, connected + service := global.GW.GetService("event-service") + return fmt.Sprintf("http://%s:%s", service.Address, service.Port), true } diff --git a/cmd/server/app/service/plugin/heartbeat.go b/cmd/server/app/service/plugin/heartbeat.go deleted file mode 100644 index 077052f9..00000000 --- a/cmd/server/app/service/plugin/heartbeat.go +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. - * PilotGo licensed under the Mulan Permissive Software License, Version 2. - * See LICENSE file for more details. - * Author: linjieren - * Date: Thu Jul 25 16:18:53 2024 +0800 - */ -package plugin - -import ( - "fmt" - "time" - - eventSDK "gitee.com/openeuler/PilotGo-plugins/event/sdk" - "gitee.com/openeuler/PilotGo/cmd/server/app/service/internal/dao" - "gitee.com/openeuler/PilotGo/pkg/dbmanager/redismanager" - "gitee.com/openeuler/PilotGo/pkg/global" - "gitee.com/openeuler/PilotGo/sdk/logger" - "gitee.com/openeuler/PilotGo/sdk/plugin/client" - - commonSDK "gitee.com/openeuler/PilotGo/sdk/common" - "k8s.io/apimachinery/pkg/util/wait" -) - -func CheckPluginHeartbeats(stopCh <-chan struct{}) { - go wait.Until(func() { - checkAndRebind() - }, client.HeartbeatInterval, stopCh) -} - -func checkAndRebind() { - plugins, err := GetPlugins() - if err != nil { - logger.Error("get plugins failed:%v", err.Error()) - } - for _, p := range plugins { - key := client.HeartbeatKey + p.Url - plugin_status, err := redismanager.Get(key, &client.PluginStatus{}) - if err != nil { - logger.Error("Error getting %v last heartbeat: %v", p.Url, err) - continue - } - - if !plugin_status.(*client.PluginStatus).Connected || time.Since(plugin_status.(*client.PluginStatus).LastConnect) > client.HeartbeatInterval+1*time.Second { - err := Handshake(p.Url, p) - if err != nil { - if time.Since(plugin_status.(*client.PluginStatus).LastConnect) <= client.HeartbeatInterval*2+1*time.Second { - _addr := p.Url - if p.Url == "localhost" || p.Url == "127.0.0.1" { - node, err := dao.MachineInfoByUUID(p.UUID) - if err == nil { - _addr = node.IP - } else { - logger.Error("fail to get machineinfo by uuid: %s", err.Error()) - } - } - global.SendRemindMsg( - global.PluginSendMsg, - fmt.Sprintf("%s 插件离线 %s", p.Name, _addr), - ) - } - - logger.Error("rebind plugin and pilotgo server failed:%v", err.Error()) - value := client.PluginStatus{ - Connected: false, - LastConnect: plugin_status.(*client.PluginStatus).LastConnect, - } - redismanager.Set(key, value) - - // 缓存,发布“插件离线”事件 - offlineKey := "offline:" + p.UUID - offlineValue := struct { - OfflineTime time.Time - }{ - OfflineTime: time.Now(), - } - ok, err := redismanager.SetNX(offlineKey, offlineValue) - if ok && err == nil { - msgData := commonSDK.MessageData{ - MsgType: eventSDK.MsgPluginOffline, - MessageType: eventSDK.GetMessageTypeString(eventSDK.MsgPluginOffline), - TimeStamp: time.Now(), - Data: eventSDK.MDPluginChange{ - PluginName: p.Name, - Version: p.Version, - Url: p.Url, - Description: p.Description, - Status: false, - }, - } - msgDataString, err := msgData.ToMessageDataString() - if err != nil { - logger.Error("event message data marshal failed:%v", err.Error()) - } - ms := commonSDK.EventMessage{ - MessageType: eventSDK.MsgPluginOffline, - MessageData: msgDataString, - } - PublishEvent(ms) - } - } else { - if time.Since(plugin_status.(*client.PluginStatus).LastConnect) > client.HeartbeatInterval*2+1*time.Second { - _addr := p.Url - if p.Url == "localhost" || p.Url == "127.0.0.1" { - node, err := dao.MachineInfoByUUID(p.UUID) - if err == nil { - _addr = node.IP - } else { - logger.Error("fail to get machineinfo by uuid: %s", err.Error()) - } - } - global.SendRemindMsg( - global.PluginSendMsg, - fmt.Sprintf("%s 插件上线 %s", p.Name, _addr), - ) - } - - value := client.PluginStatus{ - Connected: true, - LastConnect: time.Now(), - } - redismanager.Set(key, value) - - //删除缓存,发布“插件上线”事件 - redismanager.Delete("offline:" + p.UUID) - msgData := commonSDK.MessageData{ - MsgType: eventSDK.MsgPluginOnline, - MessageType: eventSDK.GetMessageTypeString(eventSDK.MsgPluginOnline), - TimeStamp: time.Now(), - Data: eventSDK.MDPluginChange{ - PluginName: p.Name, - Version: p.Version, - Url: p.Url, - Description: p.Description, - Status: true, - }, - } - msgDataString, err := msgData.ToMessageDataString() - if err != nil { - logger.Error("event message data marshal failed:%v", err.Error()) - } - ms := commonSDK.EventMessage{ - MessageType: eventSDK.MsgPluginOnline, - MessageData: msgDataString, - } - PublishEvent(ms) - } - } - } -} diff --git a/cmd/server/app/service/plugin/proxy.go b/cmd/server/app/service/plugin/proxy.go deleted file mode 100644 index 7b5816e6..00000000 --- a/cmd/server/app/service/plugin/proxy.go +++ /dev/null @@ -1,8 +0,0 @@ -/* - * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. - * PilotGo licensed under the Mulan Permissive Software License, Version 2. - * See LICENSE file for more details. - * Author: linjieren - * Date: Thu Jul 25 16:18:53 2024 +0800 - */ -package plugin diff --git a/cmd/server/app/service/tag/tag.go b/cmd/server/app/service/tag/tag.go index 0285f95b..6c81efb2 100644 --- a/cmd/server/app/service/tag/tag.go +++ b/cmd/server/app/service/tag/tag.go @@ -9,10 +9,11 @@ package tag import ( "encoding/json" + "fmt" "net/http" "strconv" - "gitee.com/openeuler/PilotGo/cmd/server/app/service/plugin" + "gitee.com/openeuler/PilotGo/pkg/global" "gitee.com/openeuler/PilotGo/sdk/common" "gitee.com/openeuler/PilotGo/sdk/logger" "gitee.com/openeuler/PilotGo/sdk/utils/httputils" @@ -21,15 +22,13 @@ import ( // 向所有插件发送uuidlist func RequestTag(UUIDList []string) ([]common.Tag, error) { //TODO:获取在线插件列表 - plugins, err := plugin.GetPlugins() - if err != nil { - return nil, err - } + p := global.GW.GetAllServices() + msg := []common.Tag{} //向url发送请求 - for _, v := range plugins { + for _, v := range p { //TODO:规定插件接收请求的api - url := v.Url + "/plugin_manage/api/v1/gettags" + url := fmt.Sprintf("http://%s:%s/plugin_manage/api/v1/gettags", v["address"], v["port"]) uuidTags := &struct { UUIDS []string `json:"uuids"` }{ @@ -60,10 +59,12 @@ func RequestTag(UUIDList []string) ([]common.Tag, error) { if err := resp.ParseData(&tags); err != nil { logger.Error(err.Error()) } + + servicename := v["serviceName"].(string) for _, vt := range tags { - vt.PluginName = v.Name + vt.PluginName = servicename msg = append(msg, vt) } } - return msg, err + return msg, nil } diff --git a/sdk/plugin/client/client.go b/sdk/plugin/client/client.go index 3752fc45..2de32e75 100644 --- a/sdk/plugin/client/client.go +++ b/sdk/plugin/client/client.go @@ -58,19 +58,19 @@ func NewClient(serviceName string, reg registry.Registry) (*Client, error) { } // RegisterHandlers 注册一些插件标准的API接口,清单如下: -func (c *Client) RegisterHandlers(router *gin.Engine) { +func (cli *Client) RegisterHandlers(router *gin.Engine) { api := router.Group("/plugin_manage/api/v1/") { api.GET("/gettags", func(c *gin.Context) { - c.Set("__internal__client_instance", c) + c.Set("__internal__client_instance", cli) }, TagsHandler) api.PUT("/command_result", func(c *gin.Context) { - c.Set("__internal__client_instance", c) + c.Set("__internal__client_instance", cli) }, RunCommandResultHandler) } // TODO: start command result process service // c.startEventProcessor() - c.startCommandResultProcessor() + cli.startCommandResultProcessor() } -- Gitee