From 3032d2b450f43dbb74a6ac8fa754ffccf1e654e4 Mon Sep 17 00:00:00 2001 From: zhanghan2021 Date: Wed, 24 Jul 2024 10:38:22 +0800 Subject: [PATCH] add event bus related functions adn update vendor --- event/sdk/go.mod | 2 +- event/server/go.mod | 2 +- event/server/go.sum | 4 +- .../{eventservice.go => listenerservice.go} | 19 ++- .../openeuler/PilotGo/sdk/common/event.go | 56 ------ .../PilotGo/sdk/plugin/client/client.go | 14 +- .../PilotGo/sdk/plugin/client/event.go | 160 ------------------ .../PilotGo/sdk/plugin/client/handler.go | 24 --- event/server/vendor/modules.txt | 2 +- 9 files changed, 20 insertions(+), 263 deletions(-) rename event/server/service/{eventservice.go => listenerservice.go} (95%) delete mode 100644 event/server/vendor/gitee.com/openeuler/PilotGo/sdk/common/event.go delete mode 100644 event/server/vendor/gitee.com/openeuler/PilotGo/sdk/plugin/client/event.go diff --git a/event/sdk/go.mod b/event/sdk/go.mod index a950459f..d8f645aa 100644 --- a/event/sdk/go.mod +++ b/event/sdk/go.mod @@ -1,4 +1,4 @@ -module gitee.com/openeuler/PilotGo-plugin-event/sdk +module gitee.com/openeuler/PilotGo-plugins/event/sdk go 1.21.6 diff --git a/event/server/go.mod b/event/server/go.mod index 655ead3e..eaefcde5 100644 --- a/event/server/go.mod +++ b/event/server/go.mod @@ -2,7 +2,7 @@ module openeuler.org/PilotGo/PilotGo-plugin-event go 1.20 -require gitee.com/openeuler/PilotGo/sdk v0.0.0-20240723080027-961f32bdf2b3 +require gitee.com/openeuler/PilotGo/sdk v0.0.0-20240724015408-b4fa623c3dd4 require ( github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect diff --git a/event/server/go.sum b/event/server/go.sum index 51a65620..1d5d519d 100644 --- a/event/server/go.sum +++ b/event/server/go.sum @@ -36,8 +36,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -gitee.com/openeuler/PilotGo/sdk v0.0.0-20240723080027-961f32bdf2b3 h1:EcPT47pO/qdqdPo1eqAywPphnp5UT33fXGbwOq7VAx8= -gitee.com/openeuler/PilotGo/sdk v0.0.0-20240723080027-961f32bdf2b3/go.mod h1:tqVD4Yq10/XdJnS35zMrxqSU8TFyMKtsG6HLiRzcnFk= +gitee.com/openeuler/PilotGo/sdk v0.0.0-20240724015408-b4fa623c3dd4 h1:ibs45+Xj1lqIXUOsP9//zUBzBiGmo87/5VBkgPNj/tk= +gitee.com/openeuler/PilotGo/sdk v0.0.0-20240724015408-b4fa623c3dd4/go.mod h1:tqVD4Yq10/XdJnS35zMrxqSU8TFyMKtsG6HLiRzcnFk= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= diff --git a/event/server/service/eventservice.go b/event/server/service/listenerservice.go similarity index 95% rename from event/server/service/eventservice.go rename to event/server/service/listenerservice.go index 2a535d51..d629d93b 100644 --- a/event/server/service/eventservice.go +++ b/event/server/service/listenerservice.go @@ -15,18 +15,20 @@ type Listener struct { URL string } +var ( + eventTypeMap map[int][]Listener + globalEventBus *EventBus +) + type EventBus struct { sync.Mutex listeners []*Listener stop chan struct{} + wait sync.WaitGroup event chan *common.EventMessage } -var eventTypeMap map[int][]Listener - -var globalEventBus *EventBus - -func Init() { +func EventBusInit() { eventTypeMap = make(map[int][]Listener) globalEventBus = &EventBus{ event: make(chan *common.EventMessage, 20), @@ -125,14 +127,11 @@ func (e *EventBus) IsExitEventMap(l *Listener) bool { } func (e *EventBus) Run() { - go func() { - e.Stop() - }() go func(e *EventBus) { for { select { case <-e.stop: - return + e.wait.Done() case m := <-e.event: e.broadcast(m) } @@ -142,7 +141,9 @@ func (e *EventBus) Run() { } func (e *EventBus) Stop() { + e.wait.Add(1) e.stop <- struct{}{} + e.wait.Done() } func (e *EventBus) publish(m *common.EventMessage) { diff --git a/event/server/vendor/gitee.com/openeuler/PilotGo/sdk/common/event.go b/event/server/vendor/gitee.com/openeuler/PilotGo/sdk/common/event.go deleted file mode 100644 index 21968b54..00000000 --- a/event/server/vendor/gitee.com/openeuler/PilotGo/sdk/common/event.go +++ /dev/null @@ -1,56 +0,0 @@ -package common - -import "encoding/json" - -// event消息类型定义 -const ( - // 主机安装软件包 - MsgPackageInstall = 0 - // 主机升级软件包 - MsgPackageUpdate = 1 - // 主机卸载软件包 - MsgPackageUninstall = 2 - // 主机ip变更 - MsgIPChange = 3 - - // 平台新增主机 - MsgHostAdd = 10 - // 平台移除主机 - MsgHostRemove = 11 - - // 插件添加 - MsgPluginAdd = 20 - // 插件卸载 - MsgPluginRemove = 21 -) - -// 将 MessageData json字符串转换成指定结构体的message消息数据 -func ToMessage(d string, s interface{}) error { - return json.Unmarshal([]byte(d), s) -} - -type MDPackageInstall struct { - HostUUID string - Name string - Version string - Time string -} - -type MDPackageUpdate struct { - HostUUID string - Name string - Version string - Time string -} - -type MDPackageUninstall struct { - HostUUID string - Name string - Version string - Time string -} - -type MDIPChange struct { - HostUUID string - NewIP string -} diff --git a/event/server/vendor/gitee.com/openeuler/PilotGo/sdk/plugin/client/client.go b/event/server/vendor/gitee.com/openeuler/PilotGo/sdk/plugin/client/client.go index 5c936d4b..b6d6ef20 100644 --- a/event/server/vendor/gitee.com/openeuler/PilotGo/sdk/plugin/client/client.go +++ b/event/server/vendor/gitee.com/openeuler/PilotGo/sdk/plugin/client/client.go @@ -23,8 +23,8 @@ type Client struct { token string // 用于event消息处理 - eventChan chan *common.EventMessage - eventCallbackMap map[int]EventCallback + EventChan chan *common.EventMessage + EventCallbackMap map[int]EventCallback // 用于异步command及script执行结果处理机 asyncCmdResultChan chan *common.AsyncCmdResult @@ -50,8 +50,8 @@ func DefaultClient(desc *PluginInfo) *Client { global_client = &Client{ PluginInfo: desc, - eventChan: make(chan *common.EventMessage, 20), - eventCallbackMap: make(map[int]EventCallback), + EventChan: make(chan *common.EventMessage, 20), + EventCallbackMap: make(map[int]EventCallback), asyncCmdResultChan: make(chan *common.AsyncCmdResult, 20), cmdProcessorCallbackMap: make(map[string]CallbackHandler), @@ -90,10 +90,6 @@ func (client *Client) RegisterHandlers(router *gin.Engine) { c.Set("__internal__client_instance", client) }, tagsHandler) - api.POST("/event", func(c *gin.Context) { - c.Set("__internal__client_instance", client) - }, eventHandler) - api.PUT("/command_result", func(c *gin.Context) { c.Set("__internal__client_instance", client) }, commandResultHandler) @@ -108,7 +104,7 @@ func (client *Client) RegisterHandlers(router *gin.Engine) { // } // TODO: start command result process service - client.startEventProcessor() + // client.startEventProcessor() client.startCommandResultProcessor() } diff --git a/event/server/vendor/gitee.com/openeuler/PilotGo/sdk/plugin/client/event.go b/event/server/vendor/gitee.com/openeuler/PilotGo/sdk/plugin/client/event.go deleted file mode 100644 index 8d76833b..00000000 --- a/event/server/vendor/gitee.com/openeuler/PilotGo/sdk/plugin/client/event.go +++ /dev/null @@ -1,160 +0,0 @@ -package client - -import ( - "encoding/json" - "errors" - "net/http" - "strconv" - "strings" - - "gitee.com/openeuler/PilotGo/sdk/common" - "gitee.com/openeuler/PilotGo/sdk/utils/httputils" -) - -type EventCallback func(e *common.EventMessage) - -// 注册event事件监听 -func (c *Client) ListenEvent(eventTypes []int, callbacks []EventCallback) error { - var eventtypes []string - for _, i := range eventTypes { - eventtypes = append(eventtypes, strconv.Itoa(i)) - } - - url := c.Server() + "/api/v1/pluginapi/listener?eventTypes=" + strings.Join(eventtypes, ",") - r, err := httputils.Put(url, &httputils.Params{ - Body: c.PluginInfo, - Cookie: map[string]string{ - TokenCookie: c.token, - }, - }) - if err != nil { - return err - } - if r.StatusCode != http.StatusOK { - return errors.New("server process error:" + strconv.Itoa(r.StatusCode)) - } - - resp := &common.CommonResult{} - if err := json.Unmarshal(r.Body, resp); err != nil { - return err - } - if resp.Code != http.StatusOK { - return errors.New(resp.Message) - } - - data := &struct { - Status string `json:"status"` - Error string `json:"error"` - }{} - if err := resp.ParseData(data); err != nil { - return err - } - for i, eventType := range eventTypes { - c.registerEventCallback(eventType, callbacks[i]) - } - return nil -} - -// 取消注册event事件监听 -func (c *Client) UnListenEvent(eventTypes []int) error { - var eventtypes []string - for _, i := range eventTypes { - eventtypes = append(eventtypes, strconv.Itoa(i)) - } - - url := c.Server() + "/api/v1/pluginapi/listener?eventTypes=" + strings.Join(eventtypes, ",") - r, err := httputils.Delete(url, &httputils.Params{ - Body: c.PluginInfo, - Cookie: map[string]string{ - TokenCookie: c.token, - }, - }) - if err != nil { - return err - } - if r.StatusCode != http.StatusOK { - return errors.New("server process error:" + strconv.Itoa(r.StatusCode)) - } - - resp := &common.CommonResult{} - if err := json.Unmarshal(r.Body, resp); err != nil { - return err - } - if resp.Code != http.StatusOK { - return errors.New(resp.Message) - } - - data := &struct { - Status string `json:"status"` - Error string `json:"error"` - }{} - if err := resp.ParseData(data); err != nil { - return err - } - - for _, eventType := range eventTypes { - c.unregisterEventCallback(eventType) - } - return nil -} - -// 发布event事件 -func (c *Client) PublishEvent(msg common.EventMessage) error { - url := c.Server() + "/api/v1/pluginapi/publish_event" - r, err := httputils.Put(url, &httputils.Params{ - Body: &msg, - Cookie: map[string]string{ - TokenCookie: c.token, - }, - }) - if err != nil { - return err - } - if r.StatusCode != http.StatusOK { - return errors.New("server process error:" + strconv.Itoa(r.StatusCode)) - } - - resp := &common.CommonResult{} - if err := json.Unmarshal(r.Body, resp); err != nil { - return err - } - if resp.Code != http.StatusOK { - return errors.New(resp.Message) - } - - data := &struct { - Status string `json:"status"` - Error string `json:"error"` - }{} - if err := resp.ParseData(data); err != nil { - return err - } - return nil -} - -func (c *Client) registerEventCallback(eventType int, callback EventCallback) { - c.eventCallbackMap[eventType] = callback -} - -func (c *Client) unregisterEventCallback(eventType int) { - delete(c.eventCallbackMap, eventType) -} - -func (c *Client) ProcessEvent(event *common.EventMessage) { - c.eventChan <- event -} - -func (c *Client) startEventProcessor() { - go func() { - for { - e := <-c.eventChan - - // TODO: process event message - cb, ok := c.eventCallbackMap[e.MessageType] - if ok { - cb(e) - } - } - }() - -} diff --git a/event/server/vendor/gitee.com/openeuler/PilotGo/sdk/plugin/client/handler.go b/event/server/vendor/gitee.com/openeuler/PilotGo/sdk/plugin/client/handler.go index b8c9def7..109ffe06 100644 --- a/event/server/vendor/gitee.com/openeuler/PilotGo/sdk/plugin/client/handler.go +++ b/event/server/vendor/gitee.com/openeuler/PilotGo/sdk/plugin/client/handler.go @@ -85,30 +85,6 @@ func bindHandler(c *gin.Context) { response.Success(c, nil, "bind server success") } -func eventHandler(c *gin.Context) { - j, err := io.ReadAll(c.Request.Body) // 接收数据 - if err != nil { - logger.Error("没获取到:%s", err.Error()) - return - } - var msg common.EventMessage - if err := json.Unmarshal(j, &msg); err != nil { - logger.Error("反序列化结果失败%s", err.Error()) - return - } - - v, ok := c.Get("__internal__client_instance") - if !ok { - return - } - client, ok := v.(*Client) - if !ok { - return - } - - client.ProcessEvent(&msg) -} - func commandResultHandler(c *gin.Context) { j, err := io.ReadAll(c.Request.Body) // 接收数据 if err != nil { diff --git a/event/server/vendor/modules.txt b/event/server/vendor/modules.txt index 1486300f..df873bc5 100644 --- a/event/server/vendor/modules.txt +++ b/event/server/vendor/modules.txt @@ -1,4 +1,4 @@ -# gitee.com/openeuler/PilotGo/sdk v0.0.0-20240723080027-961f32bdf2b3 +# gitee.com/openeuler/PilotGo/sdk v0.0.0-20240724015408-b4fa623c3dd4 ## explicit; go 1.20 gitee.com/openeuler/PilotGo/sdk/common gitee.com/openeuler/PilotGo/sdk/logger -- Gitee