From d58a3fc6c04b5c18d68ee27889dc2f8768cc2da1 Mon Sep 17 00:00:00 2001 From: zhanghan2021 Date: Wed, 24 Jul 2024 09:36:58 +0800 Subject: [PATCH] delete discard code --- sdk/common/event.go | 56 ------ sdk/plugin/client/client.go | 14 +- sdk/plugin/client/event.go | 160 ---------------- sdk/plugin/client/handler.go | 24 --- src/app/server/cmd/commands/server.go | 4 - .../network/controller/pluginapi/event.go | 144 --------------- src/app/server/network/httpserver.go | 4 - src/app/server/service/eventbus/eventbus.go | 171 ------------------ 8 files changed, 5 insertions(+), 572 deletions(-) delete mode 100644 sdk/common/event.go delete mode 100644 sdk/plugin/client/event.go delete mode 100644 src/app/server/network/controller/pluginapi/event.go delete mode 100644 src/app/server/service/eventbus/eventbus.go diff --git a/sdk/common/event.go b/sdk/common/event.go deleted file mode 100644 index 21968b54..00000000 --- a/sdk/common/event.go +++ /dev/null @@ -1,56 +0,0 @@ -package common - -import "encoding/json" - -// event消息类型定义 -const ( - // 主机安装软件包 - MsgPackageInstall = 0 - // 主机升级软件包 - MsgPackageUpdate = 1 - // 主机卸载软件包 - MsgPackageUninstall = 2 - // 主机ip变更 - MsgIPChange = 3 - - // 平台新增主机 - MsgHostAdd = 10 - // 平台移除主机 - MsgHostRemove = 11 - - // 插件添加 - MsgPluginAdd = 20 - // 插件卸载 - MsgPluginRemove = 21 -) - -// 将 MessageData json字符串转换成指定结构体的message消息数据 -func ToMessage(d string, s interface{}) error { - return json.Unmarshal([]byte(d), s) -} - -type MDPackageInstall struct { - HostUUID string - Name string - Version string - Time string -} - -type MDPackageUpdate struct { - HostUUID string - Name string - Version string - Time string -} - -type MDPackageUninstall struct { - HostUUID string - Name string - Version string - Time string -} - -type MDIPChange struct { - HostUUID string - NewIP string -} diff --git a/sdk/plugin/client/client.go b/sdk/plugin/client/client.go index 5c936d4b..b6d6ef20 100644 --- a/sdk/plugin/client/client.go +++ b/sdk/plugin/client/client.go @@ -23,8 +23,8 @@ type Client struct { token string // 用于event消息处理 - eventChan chan *common.EventMessage - eventCallbackMap map[int]EventCallback + EventChan chan *common.EventMessage + EventCallbackMap map[int]EventCallback // 用于异步command及script执行结果处理机 asyncCmdResultChan chan *common.AsyncCmdResult @@ -50,8 +50,8 @@ func DefaultClient(desc *PluginInfo) *Client { global_client = &Client{ PluginInfo: desc, - eventChan: make(chan *common.EventMessage, 20), - eventCallbackMap: make(map[int]EventCallback), + EventChan: make(chan *common.EventMessage, 20), + EventCallbackMap: make(map[int]EventCallback), asyncCmdResultChan: make(chan *common.AsyncCmdResult, 20), cmdProcessorCallbackMap: make(map[string]CallbackHandler), @@ -90,10 +90,6 @@ func (client *Client) RegisterHandlers(router *gin.Engine) { c.Set("__internal__client_instance", client) }, tagsHandler) - api.POST("/event", func(c *gin.Context) { - c.Set("__internal__client_instance", client) - }, eventHandler) - api.PUT("/command_result", func(c *gin.Context) { c.Set("__internal__client_instance", client) }, commandResultHandler) @@ -108,7 +104,7 @@ func (client *Client) RegisterHandlers(router *gin.Engine) { // } // TODO: start command result process service - client.startEventProcessor() + // client.startEventProcessor() client.startCommandResultProcessor() } diff --git a/sdk/plugin/client/event.go b/sdk/plugin/client/event.go deleted file mode 100644 index 8d76833b..00000000 --- a/sdk/plugin/client/event.go +++ /dev/null @@ -1,160 +0,0 @@ -package client - -import ( - "encoding/json" - "errors" - "net/http" - "strconv" - "strings" - - "gitee.com/openeuler/PilotGo/sdk/common" - "gitee.com/openeuler/PilotGo/sdk/utils/httputils" -) - -type EventCallback func(e *common.EventMessage) - -// 注册event事件监听 -func (c *Client) ListenEvent(eventTypes []int, callbacks []EventCallback) error { - var eventtypes []string - for _, i := range eventTypes { - eventtypes = append(eventtypes, strconv.Itoa(i)) - } - - url := c.Server() + "/api/v1/pluginapi/listener?eventTypes=" + strings.Join(eventtypes, ",") - r, err := httputils.Put(url, &httputils.Params{ - Body: c.PluginInfo, - Cookie: map[string]string{ - TokenCookie: c.token, - }, - }) - if err != nil { - return err - } - if r.StatusCode != http.StatusOK { - return errors.New("server process error:" + strconv.Itoa(r.StatusCode)) - } - - resp := &common.CommonResult{} - if err := json.Unmarshal(r.Body, resp); err != nil { - return err - } - if resp.Code != http.StatusOK { - return errors.New(resp.Message) - } - - data := &struct { - Status string `json:"status"` - Error string `json:"error"` - }{} - if err := resp.ParseData(data); err != nil { - return err - } - for i, eventType := range eventTypes { - c.registerEventCallback(eventType, callbacks[i]) - } - return nil -} - -// 取消注册event事件监听 -func (c *Client) UnListenEvent(eventTypes []int) error { - var eventtypes []string - for _, i := range eventTypes { - eventtypes = append(eventtypes, strconv.Itoa(i)) - } - - url := c.Server() + "/api/v1/pluginapi/listener?eventTypes=" + strings.Join(eventtypes, ",") - r, err := httputils.Delete(url, &httputils.Params{ - Body: c.PluginInfo, - Cookie: map[string]string{ - TokenCookie: c.token, - }, - }) - if err != nil { - return err - } - if r.StatusCode != http.StatusOK { - return errors.New("server process error:" + strconv.Itoa(r.StatusCode)) - } - - resp := &common.CommonResult{} - if err := json.Unmarshal(r.Body, resp); err != nil { - return err - } - if resp.Code != http.StatusOK { - return errors.New(resp.Message) - } - - data := &struct { - Status string `json:"status"` - Error string `json:"error"` - }{} - if err := resp.ParseData(data); err != nil { - return err - } - - for _, eventType := range eventTypes { - c.unregisterEventCallback(eventType) - } - return nil -} - -// 发布event事件 -func (c *Client) PublishEvent(msg common.EventMessage) error { - url := c.Server() + "/api/v1/pluginapi/publish_event" - r, err := httputils.Put(url, &httputils.Params{ - Body: &msg, - Cookie: map[string]string{ - TokenCookie: c.token, - }, - }) - if err != nil { - return err - } - if r.StatusCode != http.StatusOK { - return errors.New("server process error:" + strconv.Itoa(r.StatusCode)) - } - - resp := &common.CommonResult{} - if err := json.Unmarshal(r.Body, resp); err != nil { - return err - } - if resp.Code != http.StatusOK { - return errors.New(resp.Message) - } - - data := &struct { - Status string `json:"status"` - Error string `json:"error"` - }{} - if err := resp.ParseData(data); err != nil { - return err - } - return nil -} - -func (c *Client) registerEventCallback(eventType int, callback EventCallback) { - c.eventCallbackMap[eventType] = callback -} - -func (c *Client) unregisterEventCallback(eventType int) { - delete(c.eventCallbackMap, eventType) -} - -func (c *Client) ProcessEvent(event *common.EventMessage) { - c.eventChan <- event -} - -func (c *Client) startEventProcessor() { - go func() { - for { - e := <-c.eventChan - - // TODO: process event message - cb, ok := c.eventCallbackMap[e.MessageType] - if ok { - cb(e) - } - } - }() - -} diff --git a/sdk/plugin/client/handler.go b/sdk/plugin/client/handler.go index b8c9def7..109ffe06 100644 --- a/sdk/plugin/client/handler.go +++ b/sdk/plugin/client/handler.go @@ -85,30 +85,6 @@ func bindHandler(c *gin.Context) { response.Success(c, nil, "bind server success") } -func eventHandler(c *gin.Context) { - j, err := io.ReadAll(c.Request.Body) // 接收数据 - if err != nil { - logger.Error("没获取到:%s", err.Error()) - return - } - var msg common.EventMessage - if err := json.Unmarshal(j, &msg); err != nil { - logger.Error("反序列化结果失败%s", err.Error()) - return - } - - v, ok := c.Get("__internal__client_instance") - if !ok { - return - } - client, ok := v.(*Client) - if !ok { - return - } - - client.ProcessEvent(&msg) -} - func commandResultHandler(c *gin.Context) { j, err := io.ReadAll(c.Request.Body) // 接收数据 if err != nil { diff --git a/src/app/server/cmd/commands/server.go b/src/app/server/cmd/commands/server.go index d50a8258..63c54a55 100644 --- a/src/app/server/cmd/commands/server.go +++ b/src/app/server/cmd/commands/server.go @@ -10,7 +10,6 @@ import ( "gitee.com/openeuler/PilotGo/app/server/network" "gitee.com/openeuler/PilotGo/app/server/network/websocket" "gitee.com/openeuler/PilotGo/app/server/service/auth" - "gitee.com/openeuler/PilotGo/app/server/service/eventbus" "gitee.com/openeuler/PilotGo/app/server/service/plugin" "gitee.com/openeuler/PilotGo/dbmanager" "gitee.com/openeuler/PilotGo/sdk/logger" @@ -131,9 +130,6 @@ func startServices(mysqlInfo *options.MysqlDBInfo, stopCh <-chan struct{}) error // 初始化plugin服务 plugin.Init(stopCh) - //初始化eventbus - eventbus.Init(stopCh) - return nil } diff --git a/src/app/server/network/controller/pluginapi/event.go b/src/app/server/network/controller/pluginapi/event.go deleted file mode 100644 index d3d25282..00000000 --- a/src/app/server/network/controller/pluginapi/event.go +++ /dev/null @@ -1,144 +0,0 @@ -// - -package pluginapi - -import ( - "strconv" - "strings" - - "gitee.com/openeuler/PilotGo/app/server/service/eventbus" - "gitee.com/openeuler/PilotGo/sdk/common" - "gitee.com/openeuler/PilotGo/sdk/plugin/client" - "gitee.com/openeuler/PilotGo/sdk/response" - "github.com/gin-gonic/gin" -) - -func RegisterListenerHandler(c *gin.Context) { - p := client.PluginInfo{} - if err := c.ShouldBind(&p); err != nil { - response.Fail(c, gin.H{"status": false}, err.Error()) - return - } - - l := &eventbus.Listener{ - Name: p.Name, - URL: p.Url, - } - /* - u, err := jwt.ParseUser(c) - if err != nil { - response.Fail(c, nil, "user token error:"+err.Error()) - return - } - log := &auditlog.AuditLog{ - LogUUID: uuid.New().String(), - ParentUUID: "", - Module: auditlog.ModulePlugin, - Status: auditlog.StatusOK, - UserID: u.ID, - Action: "Register Listener", - } - auditlog.Add(log)*/ - eventbus.AddListener(l) - - eventtypes := strings.Split(c.Query("eventTypes"), ",") - for _, v := range eventtypes { - /*log_s := &auditlog.AuditLog{ - LogUUID: uuid.New().String(), - ParentUUID: log.LogUUID, - Module: auditlog.ModulePlugin, - Status: auditlog.StatusOK, - UserID: u.ID, - Action: "Register Listener", - } - auditlog.Add(log_s)*/ - - eventtype, err := strconv.Atoi(v) - if err != nil { - //auditlog.UpdateStatus(log_s, auditlog.StatusFailed) - response.Fail(c, gin.H{"status": false}, err.Error()) - return - } - eventbus.AddEventMap(eventtype, l) - } - response.Success(c, gin.H{"status": "ok"}, "注册eventType成功") -} - -func UnregisterListenerHandler(c *gin.Context) { - p := client.PluginInfo{} - if err := c.ShouldBind(&p); err != nil { - response.Fail(c, gin.H{"status": false}, err.Error()) - return - } - l := &eventbus.Listener{ - Name: p.Name, - URL: p.Url, - } - /* - u, err := jwt.ParseUser(c) - if err != nil { - response.Fail(c, nil, "user token error:"+err.Error()) - return - }*/ - - eventtypes := strings.Split(c.Query("eventTypes"), ",") - for _, v := range eventtypes { - /*log_s := &auditlog.AuditLog{ - LogUUID: uuid.New().String(), - ParentUUID: "", - Module: auditlog.ModulePlugin, - Status: auditlog.StatusOK, - UserID: u.ID, - Action: "delete eventType", - } - auditlog.Add(log_s)*/ - - eventtype, err := strconv.Atoi(v) - if err != nil { - //auditlog.UpdateStatus(log_s, auditlog.StatusFailed) - response.Fail(c, gin.H{"status": false}, err.Error()) - return - } - eventbus.RemoveEventMap(eventtype, l) - } - - if !eventbus.IsExitEventMap(l) { - /* log := &auditlog.AuditLog{ - LogUUID: uuid.New().String(), - ParentUUID: "", - Module: auditlog.ModulePlugin, - Status: auditlog.StatusOK, - UserID: u.ID, - Action: "delete Listener", - } - auditlog.Add(log)*/ - eventbus.RemoveListener(l) - } - response.Success(c, gin.H{"status": "ok"}, "删除eventType成功") -} - -func PublishEventHandler(c *gin.Context) { - msg := &common.EventMessage{} - if err := c.ShouldBind(msg); err != nil { - response.Fail(c, gin.H{"status": false}, err.Error()) - return - } - /* - u, err := jwt.ParseUser(c) - if err != nil { - response.Fail(c, nil, "user token error:"+err.Error()) - return - } - log := &auditlog.AuditLog{ - LogUUID: uuid.New().String(), - ParentUUID: "", - Module: auditlog.ModulePlugin, - Status: auditlog.StatusOK, - UserID: u.ID, - Action: "publish Event", - } - auditlog.Add(log)*/ - - eventbus.PublishEvent(msg) - response.Success(c, gin.H{"status": "ok"}, "publishEvent成功") -} diff --git a/src/app/server/network/httpserver.go b/src/app/server/network/httpserver.go index 9d333db5..f204595b 100644 --- a/src/app/server/network/httpserver.go +++ b/src/app/server/network/httpserver.go @@ -336,10 +336,6 @@ func registerPluginApi(router *gin.Engine) { pluginAPI.POST("/run_command", pluginapi.RunCommandHandler) pluginAPI.POST("/run_script", pluginapi.RunScriptHandler) - pluginAPI.PUT("/listener", pluginapi.RegisterListenerHandler) - pluginAPI.PUT("/publish_event", pluginapi.PublishEventHandler) - pluginAPI.DELETE("/listener", pluginapi.UnregisterListenerHandler) - pluginAPI.PUT("/install_package", pluginapi.InstallPackage) pluginAPI.PUT("/uninstall_package", pluginapi.UninstallPackage) diff --git a/src/app/server/service/eventbus/eventbus.go b/src/app/server/service/eventbus/eventbus.go deleted file mode 100644 index f36365a5..00000000 --- a/src/app/server/service/eventbus/eventbus.go +++ /dev/null @@ -1,171 +0,0 @@ -package eventbus - -import ( - "net/http" - "strconv" - "sync" - - "gitee.com/openeuler/PilotGo/sdk/common" - "gitee.com/openeuler/PilotGo/sdk/logger" - "gitee.com/openeuler/PilotGo/sdk/utils/httputils" - "k8s.io/klog/v2" -) - -type Listener struct { - Name string - URL string -} - -type EventBus struct { - sync.Mutex - listeners []*Listener - stop chan struct{} - event chan *common.EventMessage -} - -var eventTypeMap map[int][]Listener - -// 添加监听事件 -func (e *EventBus) AddListener(l *Listener) { - e.Lock() - defer e.Unlock() - e.listeners = append(e.listeners, l) -} - -// 删除监听事件 -func (e *EventBus) RemoveListener(l *Listener) { - e.Lock() - defer e.Unlock() - - for index, v := range e.listeners { - if v.Name == l.Name && v.URL == l.URL { - if index == len(e.listeners)-1 { - e.listeners = e.listeners[:index] - } else { - e.listeners = append(e.listeners[:index], e.listeners[index+1:]...) - } - break - } - } -} - -// 添加event事件 -func (e *EventBus) AddEventMap(eventtpye int, l *Listener) { - e.Lock() - defer e.Unlock() - eventTypeMap[eventtpye] = append(eventTypeMap[eventtpye], *l) -} - -// 删除event事件 -func (e *EventBus) RemoveEventMap(eventtpye int, l *Listener) { - e.Lock() - defer e.Unlock() - for i, v := range eventTypeMap[eventtpye] { - if (v.Name == l.Name) && (v.URL == l.URL) { - if i == len(eventTypeMap[eventtpye])-1 { - eventTypeMap[eventtpye] = eventTypeMap[eventtpye][:i] - } else { - eventTypeMap[eventtpye] = append(eventTypeMap[eventtpye][:i], eventTypeMap[eventtpye][i+1:]...) - } - break - } - } -} - -// 判断监听是否存在 -func (e *EventBus) IsExitEventMap(l *Listener) bool { - e.Lock() - defer e.Unlock() - for _, value := range eventTypeMap { - for _, v := range value { - if (v.Name == l.Name) && (v.URL == l.URL) { - return true - } - } - } - return false -} - -func (e *EventBus) Run(stopCh <-chan struct{}) { - go func() { - <-stopCh - e.Stop() - klog.Warningln("EventBus prepare stop") - }() - go func(e *EventBus) { - for { - select { - case <-e.stop: - klog.Warningln("EventBus success exit ") - return - case m := <-e.event: - e.broadcast(m) - } - } - }(e) - -} - -func (e *EventBus) Stop() { - e.stop <- struct{}{} -} - -func (e *EventBus) publish(m *common.EventMessage) { - e.event <- m -} - -func (e *EventBus) broadcast(msg *common.EventMessage) { - listeners, ok := eventTypeMap[msg.MessageType] - if ok { - for _, listener := range listeners { - r, err := httputils.Post(listener.URL+"/plugin_manage/api/v1/event", &httputils.Params{ - Body: msg, - }) - if err != nil { - logger.Error(listener.Name + "plugin process error:" + err.Error()) - } - if r.StatusCode != http.StatusOK { - logger.Error(listener.Name + "plugin process error:" + strconv.Itoa(r.StatusCode)) - } - } - } -} - -var globalEventBus *EventBus - -func Init(stopCh <-chan struct{}) { - eventTypeMap = make(map[int][]Listener) - globalEventBus = &EventBus{ - event: make(chan *common.EventMessage, 20), - stop: make(chan struct{}), - } - globalEventBus.Run(stopCh) -} - -func Stop() { - globalEventBus.Stop() -} - -func AddListener(l *Listener) { - globalEventBus.AddListener(l) -} - -func RemoveListener(l *Listener) { - globalEventBus.RemoveListener(l) -} - -func PublishEvent(m *common.EventMessage) { - globalEventBus.publish(m) -} - -func AddEventMap(eventtype int, l *Listener) { - globalEventBus.AddEventMap(eventtype, l) -} - -func RemoveEventMap(eventtype int, l *Listener) { - globalEventBus.RemoveEventMap(eventtype, l) -} - -func IsExitEventMap(l *Listener) bool { - return globalEventBus.IsExitEventMap(l) -} -- Gitee