From 9b5a54d1b5f0552e277a0ba421f9ba21b4660c1a Mon Sep 17 00:00:00 2001 From: zhanghan Date: Wed, 4 Dec 2024 14:46:11 +0800 Subject: [PATCH] add handler:query data from db --- event/sdk/common.go | 8 +-- event/server/config.yaml.templete | 5 +- event/server/config/config.go | 3 +- event/server/controller/events.go | 22 +++++++++ event/server/db/db.go | 82 ++++++++++++++++++++++++------- event/server/router/router.go | 7 ++- 6 files changed, 101 insertions(+), 26 deletions(-) create mode 100644 event/server/controller/events.go diff --git a/event/sdk/common.go b/event/sdk/common.go index e621bf08..e7edf2a3 100644 --- a/event/sdk/common.go +++ b/event/sdk/common.go @@ -86,7 +86,7 @@ type MessageData struct { Data interface{} `json:"data"` } -type MDUserSystemSession struct { +type MDUserSystemSession struct { //平台登录、退出 UserName string `json:"user_name"` Email string `json:"email"` } @@ -101,13 +101,13 @@ type MDHostChange struct { // 主机新增、移除、上线、离线 Status string `json:"status"` //在线状态 } -type MDHostPackageOpt struct { +type MDHostPackageOpt struct { //软件包安装、升级、卸载 HostUUID string `json:"host_uuid"` Name string `json:"name"` Version string `json:"version"` } -type MDHostIPChange struct { +type MDHostIPChange struct { //主机ip变更 HostUUID string `json:"host_uuid"` NewIP string `json:"new_ip"` } @@ -117,5 +117,5 @@ type MDPluginChange struct { // 插件新增、移除、上线、离线 Version string `json:"version"` Url string `json:"url"` Description string `json:"description"` - Status string `json:"status"` + Status bool `json:"status"` } diff --git a/event/server/config.yaml.templete b/event/server/config.yaml.templete index 02053303..8d649238 100644 --- a/event/server/config.yaml.templete +++ b/event/server/config.yaml.templete @@ -12,5 +12,6 @@ log: influxd: url: http://localhost:8086 token: "" - organization: "" - bucket: "" \ No newline at end of file + organization: "kylin" + bucket: "test" + measurement: "test_measurement" \ No newline at end of file diff --git a/event/server/config/config.go b/event/server/config/config.go index b2099750..e8352a93 100644 --- a/event/server/config/config.go +++ b/event/server/config/config.go @@ -1,6 +1,6 @@ /* * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. - * PilotGo-plugins licensed under the Mulan Permissive Software License, Version 2. + * PilotGo-plugins licensed under the Mulan Permissive Software License, Version 2. * See LICENSE file for more details. * Author: zhanghan2021 * Date: Tue Jun 4 15:19:07 2024 +0800 @@ -34,6 +34,7 @@ type Influxd struct { Token string `yaml:"token"` Organization string `yaml:"organization"` Bucket string `yaml:"bucket"` + Measurement string `yaml:"measurement"` } var config_file string diff --git a/event/server/controller/events.go b/event/server/controller/events.go new file mode 100644 index 00000000..55cbeb15 --- /dev/null +++ b/event/server/controller/events.go @@ -0,0 +1,22 @@ +/* + * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. + * PilotGo-plugins licensed under the Mulan Permissive Software License, Version 2. + * See LICENSE file for more details. + * Author: zhanghan2021 + * Date: Wed Dec 4 14:45:21 2024 +0800 + */ +package controller + +import ( + "gitee.com/openeuler/PilotGo/sdk/response" + "github.com/gin-gonic/gin" + "openeuler.org/PilotGo/PilotGo-plugin-event/db" +) + +func EventsQueryHandler(c *gin.Context) { + _start := c.Query("start") + _stop := c.Query("stop") + searchKey := c.Query("search") + result, _ := db.Query(_start, _stop, searchKey) + response.Success(c, result, "获取到数据") +} diff --git a/event/server/db/db.go b/event/server/db/db.go index 16fecf18..e6d647fb 100644 --- a/event/server/db/db.go +++ b/event/server/db/db.go @@ -1,6 +1,6 @@ /* * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. - * PilotGo-plugins licensed under the Mulan Permissive Software License, Version 2. + * PilotGo-plugins licensed under the Mulan Permissive Software License, Version 2. * See LICENSE file for more details. * Author: zhanghan2021 * Date: Fri Jun 7 17:32:09 2024 +0800 @@ -11,6 +11,9 @@ import ( "context" "encoding/json" "fmt" + "regexp" + "sort" + "time" "gitee.com/openeuler/PilotGo-plugins/event/sdk" "gitee.com/openeuler/PilotGo/sdk/logger" @@ -24,6 +27,7 @@ var InfluxDB *InfluxDBClient type InfluxDBClient struct { Organization string Bucket string + Measurement string DBClient influxdb2.Client } @@ -33,45 +37,66 @@ func InfluxdbInit(conf *config.Influxd) { InfluxDB = &InfluxDBClient{ Organization: conf.Organization, Bucket: conf.Bucket, + Measurement: conf.Measurement, DBClient: c, } } -func Query(measurement, start, end string) error { - - query := fmt.Sprintf(`from(bucket:"%s") - |> range(start: -1h) - |> filter(fn: (r) => r._measurement == "%s")`, - InfluxDB.Bucket, measurement) +func Query(start, stop string, filterTagKey string) (interface{}, error) { + query := fmt.Sprintf(` + from(bucket:"%s") + |> range(start: %s, stop: %s) + |> filter(fn: (r) => r._measurement == "%s") + `, InfluxDB.Bucket, start, stop, InfluxDB.Measurement) + if filterTagKey != "" { + query += fmt.Sprintf(`|> filter(fn: (r) => r.msg_type == "%s")`, filterTagKey) + } queryAPI := InfluxDB.DBClient.QueryAPI(InfluxDB.Organization) - result, err := queryAPI.Query(context.Background(), query) if err != nil { - return err + return result, err } + defer result.Close() + var queryResults []map[string]interface{} for result.Next() { - if result.TableChanged() { - fmt.Printf("table: %s\n", result.TableMetadata().String()) + tags := make(map[string]string) + for k, v := range result.Record().Values() { + if k != "_measurement" && k != "_field" && k != "_value" && k != "_time" && k != "result" { + if tagValue, ok := v.(string); ok { + tags[k] = tagValue + } + } } + queryResults = append(queryResults, map[string]interface{}{ + "value": processValue(result.Record().Value()), + "msg_type": tags["msg_type"], + "time": tags["timestamp"], + }) + } - fmt.Printf("time: %v, field: %v, value: %v\n", result.Record().Time().Format("2006-01-02 15:04:05"), result.Record().Field(), result.Record().Value()) + sort.Slice(queryResults, func(i, j int) bool { + timeFormat := "2006-01-02 15:04:05.999999999 -0700 MST" + time1, err1 := time.Parse(timeFormat, queryResults[i]["time"].(string)) + time2, err2 := time.Parse(timeFormat, queryResults[j]["time"].(string)) - } + if err1 != nil || err2 != nil { + return i < j + } + return time1.After(time2) + }) if result.Err() != nil { - fmt.Printf("query parsing error: %s\n", result.Err().Error()) + return nil, fmt.Errorf("查询数据出错: %v", result.Err()) } - - return nil + return queryResults, nil } func WriteToDB(MessageData string) error { writeAPI := InfluxDB.DBClient.WriteAPIBlocking(InfluxDB.Organization, InfluxDB.Bucket) var msg sdk.MessageData - // err := common.ToMessage(MessageData, &msg) err := json.Unmarshal([]byte(MessageData), &msg) if err != nil { logger.Error("解析数据出错: %v", err.Error()) @@ -85,7 +110,7 @@ func WriteToDB(MessageData string) error { fields := map[string]interface{}{ "metadata": msg.Data, } - point := write.NewPoint("test_measurement", tags, fields, msg.TimeStamp) + point := write.NewPoint(InfluxDB.Measurement, tags, fields, msg.TimeStamp) if err := writeAPI.WritePoint(context.Background(), point); err != nil { logger.Error("写入数据出错: %v", err.Error()) @@ -93,3 +118,24 @@ func WriteToDB(MessageData string) error { } return nil } + +func processValue(v interface{}) interface{} { + value := v.(string) + re := regexp.MustCompile(`(\w+):([^\s\]]+)`) + matches := re.FindAllStringSubmatch(value, -1) + + result := make(map[string]string) + for _, match := range matches { + result[match[1]] = match[2] + } + if len(matches) > 0 { + result := make(map[string]string) + for _, match := range matches { + result[match[1]] = match[2] + } + // return result + jsonBytes, _ := json.Marshal(result) + return string(jsonBytes) + } + return value +} diff --git a/event/server/router/router.go b/event/server/router/router.go index 96e0ea91..b41534d4 100644 --- a/event/server/router/router.go +++ b/event/server/router/router.go @@ -1,6 +1,6 @@ /* * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. - * PilotGo-plugins licensed under the Mulan Permissive Software License, Version 2. + * PilotGo-plugins licensed under the Mulan Permissive Software License, Version 2. * See LICENSE file for more details. * Author: zhanghan2021 * Date: Wed Jul 24 09:17:31 2024 +0800 @@ -61,6 +61,11 @@ func registerAPIs(router *gin.Engine) { listener.DELETE("unregister", controller.UnregisterListenerHandler) listener.DELETE("unpluginRegister", controller.UnPliginRegisterListenerHandler) } + + eventQuery := api.Group("") + { + eventQuery.GET("query", controller.EventsQueryHandler) + } } func StaticRouter(router *gin.Engine) { -- Gitee