diff --git a/event/sdk/common.go b/event/sdk/common.go new file mode 100644 index 0000000000000000000000000000000000000000..c02ce90503c5067ec14988c151713b90619bf964 --- /dev/null +++ b/event/sdk/common.go @@ -0,0 +1,56 @@ +package sdk + +import "encoding/json" + +// event消息类型定义 +const ( + // 主机安装软件包 + MsgPackageInstall = 0 + // 主机升级软件包 + MsgPackageUpdate = 1 + // 主机卸载软件包 + MsgPackageUninstall = 2 + // 主机ip变更 + MsgIPChange = 10 + + // 平台新增主机 + MsgHostAdd = 20 + // 平台移除主机 + MsgHostRemove = 21 + + // 插件添加 + MsgPluginAdd = 30 + // 插件卸载 + MsgPluginRemove = 31 +) + +// 将 MessageData json字符串转换成指定结构体的message消息数据 +func ToMessage(d string, s interface{}) error { + return json.Unmarshal([]byte(d), s) +} + +type MDPackageInstall struct { + HostUUID string + Name string + Version string + Time string +} + +type MDPackageUpdate struct { + HostUUID string + Name string + Version string + Time string +} + +type MDPackageUninstall struct { + HostUUID string + Name string + Version string + Time string +} + +type MDIPChange struct { + HostUUID string + NewIP string +} diff --git a/event/sdk/event.go b/event/sdk/event.go new file mode 100644 index 0000000000000000000000000000000000000000..630740e9e59fc1323159732b93c9309dd2db2466 --- /dev/null +++ b/event/sdk/event.go @@ -0,0 +1,121 @@ +package sdk + +import ( + "encoding/json" + "errors" + "io" + "net/http" + "strconv" + + "gitee.com/openeuler/PilotGo/sdk/common" + "gitee.com/openeuler/PilotGo/sdk/logger" + "gitee.com/openeuler/PilotGo/sdk/utils/httputils" + "github.com/gin-gonic/gin" +) + +func RegisterEventHandlers(router *gin.Engine) { + + api := router.Group("/plugin_manage/api/v1/") + { + api.POST("/event", eventHandler) + } + + startEventProcessor() +} + +func eventHandler(c *gin.Context) { + j, err := io.ReadAll(c.Request.Body) // 接收数据 + if err != nil { + logger.Error("没获取到:%s", err.Error()) + return + } + var msg common.EventMessage + if err := json.Unmarshal(j, &msg); err != nil { + logger.Error("反序列化结果失败%s", err.Error()) + return + } + + ProcessEvent(&msg) +} + +// 发布event事件 +func PublishEvent(msg common.EventMessage) error { + eventServer, err := eventPluginServer() + if err != nil { + return err + } + url := eventServer + "/api/v1/pluginapi/publish_event" + r, err := httputils.Put(url, &httputils.Params{ + Body: &msg, + }) + if err != nil { + return err + } + if r.StatusCode != http.StatusOK { + return errors.New("server process error:" + strconv.Itoa(r.StatusCode)) + } + + resp := &common.CommonResult{} + if err := json.Unmarshal(r.Body, resp); err != nil { + return err + } + if resp.Code != http.StatusOK { + return errors.New(resp.Message) + } + + data := &struct { + Status string `json:"status"` + Error string `json:"error"` + }{} + if err := resp.ParseData(data); err != nil { + return err + } + return nil +} +func eventPluginServer() (string, error) { + plugins, err := plugin_client.GetPlugins() + if err != nil { + return "", err + } + + var eventServer string + for _, p := range plugins { + if p.Name == "event" { + eventServer = p.Url + break + } + } + + if eventServer == "" { + return "", errors.New("event plugin not found") + } + + return eventServer, nil +} + +func registerEventCallback(eventType int, callback EventCallback) { + plugin_client.EventCallbackMap[eventType] = callback +} + +func unregisterEventCallback(eventType int) { + delete(plugin_client.EventCallbackMap, eventType) +} + +func ProcessEvent(event *common.EventMessage) { + plugin_client.EventChan <- event +} + +func startEventProcessor() { + go func() { + for { + e := <-plugin_client.EventChan + + // TODO: process event message + cb, ok := plugin_client.EventCallbackMap[e.MessageType] + if ok { + cb(e) + } + } + }() + +} diff --git a/event/sdk/go.mod b/event/sdk/go.mod new file mode 100644 index 0000000000000000000000000000000000000000..a950459fb1d63c865cb3113673f8bdbfe9826f9e --- /dev/null +++ b/event/sdk/go.mod @@ -0,0 +1,5 @@ +module gitee.com/openeuler/PilotGo-plugin-event/sdk + +go 1.21.6 + +require gitee.com/openeuler/PilotGo/sdk v0.0.0-20240724015408-b4fa623c3dd4 diff --git a/event/sdk/go.sum b/event/sdk/go.sum new file mode 100644 index 0000000000000000000000000000000000000000..9a402747c54d6b4ad45c8d820937d7b0979bc719 --- /dev/null +++ b/event/sdk/go.sum @@ -0,0 +1,4 @@ +gitee.com/openeuler/PilotGo/sdk v0.0.0-20240718083350-ec88f0c9c292 h1:0mtSckC/QWY+peJuA/nI6YgdeGoIO9xBHbBqzz1Cw+U= +gitee.com/openeuler/PilotGo/sdk v0.0.0-20240718083350-ec88f0c9c292/go.mod h1:tqVD4Yq10/XdJnS35zMrxqSU8TFyMKtsG6HLiRzcnFk= +gitee.com/openeuler/PilotGo/sdk v0.0.0-20240724015408-b4fa623c3dd4 h1:ibs45+Xj1lqIXUOsP9//zUBzBiGmo87/5VBkgPNj/tk= +gitee.com/openeuler/PilotGo/sdk v0.0.0-20240724015408-b4fa623c3dd4/go.mod h1:tqVD4Yq10/XdJnS35zMrxqSU8TFyMKtsG6HLiRzcnFk= diff --git a/event/sdk/listener.go b/event/sdk/listener.go new file mode 100644 index 0000000000000000000000000000000000000000..817ae799049afeb9c088ad6380e5f82897e01319 --- /dev/null +++ b/event/sdk/listener.go @@ -0,0 +1,105 @@ +package sdk + +import ( + "encoding/json" + "errors" + "net/http" + "strconv" + "strings" + + "gitee.com/openeuler/PilotGo/sdk/common" + "gitee.com/openeuler/PilotGo/sdk/plugin/client" + "gitee.com/openeuler/PilotGo/sdk/utils/httputils" +) + +var plugin_client = client.GetClient() + +type EventCallback func(e *common.EventMessage) + +// 注册event事件监听 +func ListenEvent(eventTypes []int, callbacks []EventCallback) error { + var eventtypes []string + for _, i := range eventTypes { + eventtypes = append(eventtypes, strconv.Itoa(i)) + } + + eventServer, err := eventPluginServer() + if err != nil { + return err + } + + url := eventServer + "/plugin/event/listener/register?eventTypes=" + strings.Join(eventtypes, ",") + r, err := httputils.Put(url, &httputils.Params{ + Body: plugin_client.PluginInfo, + }) + if err != nil { + return err + } + if r.StatusCode != http.StatusOK { + return errors.New("server process error:" + strconv.Itoa(r.StatusCode)) + } + + resp := &common.CommonResult{} + if err := json.Unmarshal(r.Body, resp); err != nil { + return err + } + if resp.Code != http.StatusOK { + return errors.New(resp.Message) + } + + data := &struct { + Status string `json:"status"` + Error string `json:"error"` + }{} + if err := resp.ParseData(data); err != nil { + return err + } + for i, eventType := range eventTypes { + registerEventCallback(eventType, callbacks[i]) + } + return nil +} + +// 取消注册event事件监听 +func UnListenEvent(eventTypes []int) error { + var eventtypes []string + for _, i := range eventTypes { + eventtypes = append(eventtypes, strconv.Itoa(i)) + } + eventServer, err := eventPluginServer() + if err != nil { + return err + } + + url := eventServer + "/api/v1/pluginapi/listener?eventTypes=" + strings.Join(eventtypes, ",") + r, err := httputils.Delete(url, &httputils.Params{ + Body: plugin_client.PluginInfo, + }) + if err != nil { + return err + } + if r.StatusCode != http.StatusOK { + return errors.New("server process error:" + strconv.Itoa(r.StatusCode)) + } + + resp := &common.CommonResult{} + if err := json.Unmarshal(r.Body, resp); err != nil { + return err + } + if resp.Code != http.StatusOK { + return errors.New(resp.Message) + } + + data := &struct { + Status string `json:"status"` + Error string `json:"error"` + }{} + if err := resp.ParseData(data); err != nil { + return err + } + + for _, eventType := range eventTypes { + unregisterEventCallback(eventType) + } + return nil +}