diff --git a/event/sdk/event.go b/event/sdk/event.go index e484c2eff3f0225f403ecf19205f7a1f611c22b1..51486a00f3b602cfac6c37b210db2183f66f5c35 100644 --- a/event/sdk/event.go +++ b/event/sdk/event.go @@ -4,13 +4,10 @@ import ( "encoding/json" "errors" "io" - "net/http" - "strconv" "gitee.com/openeuler/PilotGo/sdk/common" "gitee.com/openeuler/PilotGo/sdk/logger" "gitee.com/openeuler/PilotGo/sdk/plugin/client" - "gitee.com/openeuler/PilotGo/sdk/utils/httputils" "github.com/gin-gonic/gin" ) @@ -39,40 +36,6 @@ func eventHandler(c *gin.Context) { ProcessEvent(&msg) } -// 发布event事件 -func PublishEvent(msg common.EventMessage) error { - eventServer, err := eventPluginServer() - if err != nil { - return err - } - url := eventServer + "/plugin/event/publish_event" - r, err := httputils.Put(url, &httputils.Params{ - Body: &msg, - }) - if err != nil { - return err - } - if r.StatusCode != http.StatusOK { - return errors.New("server process error:" + strconv.Itoa(r.StatusCode)) - } - - resp := &common.CommonResult{} - if err := json.Unmarshal(r.Body, resp); err != nil { - return err - } - if resp.Code != http.StatusOK { - return errors.New(resp.Message) - } - - data := &struct { - Status string `json:"status"` - Error string `json:"error"` - }{} - if err := resp.ParseData(data); err != nil { - return err - } - return nil -} func eventPluginServer() (string, error) { plugins, err := plugin_client.GetPlugins() if err != nil { diff --git a/event/sdk/listener.go b/event/sdk/listener.go index 028ef32a77ff2c783222c6d09c4784867d4e23dd..f52dbbfb6353d94ee567285acd279687f26f5102 100644 --- a/event/sdk/listener.go +++ b/event/sdk/listener.go @@ -101,3 +101,75 @@ func UnListenEvent(eventTypes []int) error { } return nil } + +// 插件服务退出,取消注册所有本插件的event事件监听 +func UnPluginListenEvent() error { + eventServer, err := eventPluginServer() + if err != nil { + return err + } + + url := eventServer + "/plugin/event/listener/unpluginRegister" + r, err := httputils.Delete(url, &httputils.Params{ + Body: plugin_client.PluginInfo, + }) + if err != nil { + return err + } + if r.StatusCode != http.StatusOK { + return errors.New("server process error:" + strconv.Itoa(r.StatusCode)) + } + + resp := &common.CommonResult{} + if err := json.Unmarshal(r.Body, resp); err != nil { + return err + } + if resp.Code != http.StatusOK { + return errors.New(resp.Message) + } + data := &struct { + EventType []int `json:"eventType"` + Status string `json:"status"` + }{} + if err := resp.ParseData(data); err != nil || data.Status != "ok" { + return err + } + for _, eventType := range data.EventType { + unregisterEventCallback(eventType) + } + return nil +} + +// 发布event事件 +func PublishEvent(msg common.EventMessage) error { + eventServer, err := eventPluginServer() + if err != nil { + return err + } + url := eventServer + "/plugin/event/publishEvent" + r, err := httputils.Put(url, &httputils.Params{ + Body: &msg, + }) + if err != nil { + return err + } + if r.StatusCode != http.StatusOK { + return errors.New("server process error:" + strconv.Itoa(r.StatusCode)) + } + + resp := &common.CommonResult{} + if err := json.Unmarshal(r.Body, resp); err != nil { + return err + } + if resp.Code != http.StatusOK { + return errors.New(resp.Message) + } + data := &struct { + Status string `json:"status"` + Error string `json:"error"` + }{} + if err := resp.ParseData(data); err != nil { + return err + } + return nil +} diff --git a/event/server/controller/listener.go b/event/server/controller/listener.go index c3f260a8aced80908cc9675a160c4442651edf03..c6ac6f049299bea8b0d577d2af7c0268b4c9069b 100644 --- a/event/server/controller/listener.go +++ b/event/server/controller/listener.go @@ -62,6 +62,26 @@ func UnregisterListenerHandler(c *gin.Context) { response.Success(c, gin.H{"status": "ok"}, "删除eventType成功") } +// 取消当前插件的所有event事件注册 +func UnPliginRegisterListenerHandler(c *gin.Context) { + p := client.PluginInfo{} + if err := c.ShouldBind(&p); err != nil { + response.Fail(c, gin.H{"status": false}, err.Error()) + return + } + l := &service.Listener{ + Name: p.Name, + URL: p.Url, + } + + eventTypes := service.GetEventMapTypes(l) + service.RemoveEventMaps(l) + if !service.IsExitEventMap(l) { + service.RemoveListener(l) + } + response.Success(c, gin.H{"eventType": eventTypes, "status": "ok"}, "删除插件event成功") +} + func PublishEventHandler(c *gin.Context) { msg := &common.EventMessage{} if err := c.ShouldBind(msg); err != nil { diff --git a/event/server/router/router.go b/event/server/router/router.go index 6ad73bada6cf28f4ecfe764ce890ceffe4eda269..a5ed86df5cd5fcab9df8859584f7fd4d0c99d657 100644 --- a/event/server/router/router.go +++ b/event/server/router/router.go @@ -4,8 +4,8 @@ import ( "net/http" "strings" + "gitee.com/openeuler/PilotGo-plugins/event/sdk" "gitee.com/openeuler/PilotGo/sdk/logger" - "gitee.com/openeuler/PilotGo/sdk/response" "github.com/gin-gonic/gin" plugin_manage "openeuler.org/PilotGo/PilotGo-plugin-event/client" "openeuler.org/PilotGo/PilotGo-plugin-event/config" @@ -40,31 +40,19 @@ func setupRouter() *gin.Engine { func registerAPIs(router *gin.Engine) { logger.Debug("router register") plugin_manage.EventClient.RegisterHandlers(router) + sdk.RegisterEventHandlers(router, plugin_manage.EventClient) api := router.Group("/plugin/" + plugin_manage.EventClient.PluginInfo.Name) eventpublish := api.Group("") { - eventpublish.PUT("publish_event", controller.PublishEventHandler) + eventpublish.PUT("publishEvent", controller.PublishEventHandler) } listener := api.Group("listener") { listener.PUT("register", controller.RegisterListenerHandler) listener.DELETE("unregister", controller.UnregisterListenerHandler) - } - test := api.Group("") - { - test.GET("test", func(ctx *gin.Context) { - s := plugin_manage.EventClient.Server() - logger.Info("%v", s) - ss, err := plugin_manage.EventClient.GetPlugins() - if err != nil { - response.Fail(ctx, nil, err.Error()) - return - } - response.Success(ctx, ss, "chajian") - }) - + listener.DELETE("unpluginRegister", controller.UnPliginRegisterListenerHandler) } } @@ -82,14 +70,3 @@ func StaticRouter(router *gin.Engine) { c.Status(http.StatusNotFound) }) } - -type PluginInfo struct { - Name string `json:"name"` - Version string `json:"version"` - Description string `json:"description"` - Author string `json:"author"` - Email string `json:"email"` - Url string `json:"url"` - PluginType string `json:"plugin_type"` - ReverseDest string `json:"reverse_dest"` -} diff --git a/event/server/service/listenerservice.go b/event/server/service/listenerservice.go index d629d93b542eafd89648f9acef97f4ab288e2146..ac24697085a091367e5fc923aa4459a21b22f195 100644 --- a/event/server/service/listenerservice.go +++ b/event/server/service/listenerservice.go @@ -53,6 +53,9 @@ func PublishEvent(m *common.EventMessage) { globalEventBus.publish(m) } +func GetEventMapTypes(l *Listener) []int { + return globalEventBus.GetEventMapTypes(l) +} func AddEventMap(eventtype int, l *Listener) { globalEventBus.AddEventMap(eventtype, l) } @@ -61,6 +64,10 @@ func RemoveEventMap(eventtype int, l *Listener) { globalEventBus.RemoveEventMap(eventtype, l) } +func RemoveEventMaps(l *Listener) { + globalEventBus.RemoveEventMaps(l) +} + func IsExitEventMap(l *Listener) bool { return globalEventBus.IsExitEventMap(l) } @@ -89,6 +96,20 @@ func (e *EventBus) RemoveListener(l *Listener) { } } +func (e *EventBus) GetEventMapTypes(l *Listener) []int { + e.Lock() + defer e.Unlock() + var eventTypes []int + for eventType, values := range eventTypeMap { + for _, v := range values { + if v.Name == l.Name && v.URL == l.URL { + eventTypes = append(eventTypes, eventType) + } + } + } + return eventTypes +} + // 添加event事件 func (e *EventBus) AddEventMap(eventtpye int, l *Listener) { e.Lock() @@ -112,6 +133,24 @@ func (e *EventBus) RemoveEventMap(eventtpye int, l *Listener) { } } +// 删除整个插件的event事件 +func (e *EventBus) RemoveEventMaps(l *Listener) { + e.Lock() + defer e.Unlock() + for i, value := range eventTypeMap { + for j, v := range value { + if (v.Name == l.Name) && (v.URL == l.URL) { + if j == len(value)-1 { + eventTypeMap[i] = eventTypeMap[i][:j] + } else { + eventTypeMap[i] = append(eventTypeMap[i][:j], eventTypeMap[i][j+1:]...) + } + break + } + } + } +} + // 判断监听是否存在 func (e *EventBus) IsExitEventMap(l *Listener) bool { e.Lock()