From 02b048a24f0ac65907ba3f48ca7177e6903ba8ea Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Tue, 5 Oct 2021 10:39:53 +0800 Subject: [PATCH 01/11] fix(middle-driver):updata middledriver Init() func --- pkg/middle/define.go | 16 +++++++++++++--- pkg/middle/middleware_driver.go | 34 +++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 3 deletions(-) create mode 100644 pkg/middle/middleware_driver.go diff --git a/pkg/middle/define.go b/pkg/middle/define.go index 678c75b..fcc5f63 100644 --- a/pkg/middle/define.go +++ b/pkg/middle/define.go @@ -1,7 +1,17 @@ package middle -import getMiddlewareMap "gitee.com/timedb/wheatCache/plugins/config" +import ( + "sync" -func Init() { - getMiddlewareMap.GetMiddlewareMap() + "gitee.com/timedb/wheatCache/plugins" + getMiddlewareMap "gitee.com/timedb/wheatCache/plugins/config" +) + +func Init() map[string]plugins.MiddleToolsInterface { + return getMiddlewareMap.GetMiddlewareMap() } + +var ( + oneMiddle sync.Once + MiddleWareDriver *MiddleWare +) diff --git a/pkg/middle/middleware_driver.go b/pkg/middle/middleware_driver.go new file mode 100644 index 0000000..5c58e03 --- /dev/null +++ b/pkg/middle/middleware_driver.go @@ -0,0 +1,34 @@ +package middle + +import ( + "context" + "fmt" + + "gitee.com/timedb/wheatCache/pkg/event" +) + +type MiddleWare struct { + eventDriver event.DriverInterface + eventConsumer event.ConsumerInterface +} + +func NewMiddleWare() *MiddleWare { + oneMiddle.Do(func() { + driver := event.NewDriver(1000) + MiddleWareDriver = &MiddleWare{ + eventDriver: driver, + eventConsumer: event.NewConsumer(driver), + } + }) + return MiddleWareDriver +} + +func (m *MiddleWare) GetEventDriver() event.DriverInterface { + return m.eventDriver +} + +func (m *MiddleWare) work(ctx context.Context) { + getMiddlewareMap := Init() + fmt.Println(getMiddlewareMap) + +} -- Gitee From e08646b6a120d6d2b3bb8fcbf23cf0053fb2dcaf Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Tue, 5 Oct 2021 20:52:01 +0800 Subject: [PATCH 02/11] feat(middleware_driver): add middle driver --- conf/wheat-cache.yaml | 5 +++ pkg/middle/middleware_driver.go | 12 ++++--- pkg/middle/middleware_driver_test.go | 50 ++++++++++++++++++++++++++++ plugins/define.go | 8 +++-- plugins/log-middle/middleware.go | 8 ++--- 5 files changed, 71 insertions(+), 12 deletions(-) create mode 100644 pkg/middle/middleware_driver_test.go diff --git a/conf/wheat-cache.yaml b/conf/wheat-cache.yaml index 981dc43..0a71f5d 100644 --- a/conf/wheat-cache.yaml +++ b/conf/wheat-cache.yaml @@ -14,8 +14,13 @@ lruCache: eventDriverSize: 2000 workTime: 1 + logPrint: stath: [ "debug", "error" ] + +middleChain: + middles:["logMiddle","mapKey"] + diff --git a/pkg/middle/middleware_driver.go b/pkg/middle/middleware_driver.go index 5c58e03..7d1d45f 100644 --- a/pkg/middle/middleware_driver.go +++ b/pkg/middle/middleware_driver.go @@ -2,9 +2,10 @@ package middle import ( "context" - "fmt" + _ "gitee.com/timedb/wheatCache/conf" "gitee.com/timedb/wheatCache/pkg/event" + "github.com/spf13/viper" ) type MiddleWare struct { @@ -28,7 +29,10 @@ func (m *MiddleWare) GetEventDriver() event.DriverInterface { } func (m *MiddleWare) work(ctx context.Context) { - getMiddlewareMap := Init() - fmt.Println(getMiddlewareMap) - + middlewareMap := Init() + event := m.eventConsumer.Receive(ctx) + middleConf := viper.GetStringSlice("middleChain.middles") + for _, middle := range middleConf { + middlewareMap[middle].Exec(event) + } } diff --git a/pkg/middle/middleware_driver_test.go b/pkg/middle/middleware_driver_test.go new file mode 100644 index 0000000..f5856f6 --- /dev/null +++ b/pkg/middle/middleware_driver_test.go @@ -0,0 +1,50 @@ +package middle + +import ( + "context" + "fmt" + "testing" + + "gitee.com/timedb/wheatCache/pkg/event" +) + +func Test_middleware_driver(t *testing.T) { + + ctx := context.Background() + type Library struct { + driver *MiddleWare + } + + library := &Library{ + driver: NewMiddleWare(), + } + + go library.driver.work(ctx) + + type A struct { + produce event.ProduceInterface + } + + a := &A{ + produce: event.NewProduce(library.driver.eventDriver), + } + + type B struct { + cumsumer event.ConsumerInterface + } + + b := &B{ + cumsumer: event.NewConsumer(library.driver.eventDriver), + } + + book := event.NewEvent("book") + book.SetMsg("title", "goland") + book.SetValue("page", 100) + + go func() { + a.produce.Call(ctx, book) + }() + + b_book := b.cumsumer.Receive(ctx) + fmt.Println(b_book.GetMsg("title")) +} diff --git a/plugins/define.go b/plugins/define.go index 37538c9..bd438d8 100644 --- a/plugins/define.go +++ b/plugins/define.go @@ -1,7 +1,9 @@ package plugins +import "gitee.com/timedb/wheatCache/pkg/event" + type MiddleToolsInterface interface { - Init() // 初始化 - Exec(interface{}) (interface{}, error) // 处理用户发送事件 - Name() string // 获取中间件名称 + Init() // 初始化 + Exec(e event.Event) (event.Event, error) // 处理用户发送事件 + Name() string // 获取中间件名称 } diff --git a/plugins/log-middle/middleware.go b/plugins/log-middle/middleware.go index c6ff102..23e8f03 100644 --- a/plugins/log-middle/middleware.go +++ b/plugins/log-middle/middleware.go @@ -1,8 +1,7 @@ package log_middle import ( - "fmt" - + "gitee.com/timedb/wheatCache/pkg/event" "gitee.com/timedb/wheatCache/plugins" ) @@ -12,9 +11,8 @@ type logMiddle struct { func (i *logMiddle) Init() { } -func (i *logMiddle) Exec(interface{}) (interface{}, error) { - fmt.Println(1) - return nil, nil +func (i *logMiddle) Exec(e event.Event) (event.Event, error) { + return e, nil } func (i *logMiddle) Name() string { -- Gitee From 06b7c05a6d23da7f42290ad5985b7d55571439b6 Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Fri, 8 Oct 2021 19:57:25 +0800 Subject: [PATCH 03/11] fix(middle-template): update middle template --- plugins/config/middle.gen.go | 6 ++++-- plugins/config/middle.template | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/plugins/config/middle.gen.go b/plugins/config/middle.gen.go index addf8c1..15a055c 100644 --- a/plugins/config/middle.gen.go +++ b/plugins/config/middle.gen.go @@ -12,9 +12,11 @@ import ( func GetMiddlewareMap() map[string]plugins.MiddleToolsInterface { + logMiddle := logMiddle.NewMiddleware() + mapKey := mapKey.NewMiddleware() return map[string]plugins.MiddleToolsInterface{ - "logMiddle": logMiddle.NewMiddleware(), - "mapKey": mapKey.NewMiddleware(), + logMiddle.Name(): logMiddle, + mapKey.Name(): mapKey, } } diff --git a/plugins/config/middle.template b/plugins/config/middle.template index 10b56e8..d904530 100644 --- a/plugins/config/middle.template +++ b/plugins/config/middle.template @@ -12,10 +12,12 @@ import ( func GetMiddlewareMap() map[string]plugins.MiddleToolsInterface { - + {%for dir in dirs %} + {{dir[0]}}:={{dir[0]}}.NewMiddleware() + {%- endfor%} return map[string]plugins.MiddleToolsInterface{ {%for dir in dirs %} - "{{dir[0]}}":{{dir[0]}}.NewMiddleware(), + {{dir[0]}}.Name():{{dir[0]}}, {%- endfor%} } } \ No newline at end of file -- Gitee From 55f5d7ecd9714521a90c6592d2ce40d55944a475 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=8E=E7=99=BD=E5=8D=97?= <1762386214@qq.com> Date: Fri, 8 Oct 2021 21:04:54 +0800 Subject: [PATCH 04/11] fix(log) fix produce msg, remove more route --- pkg/logx/logx.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/logx/logx.go b/pkg/logx/logx.go index 8ae7ebd..1f70005 100644 --- a/pkg/logx/logx.go +++ b/pkg/logx/logx.go @@ -40,7 +40,12 @@ func (l *upLogger) Print(level string, format string, msg ...interface{}) { Print(level, format, msg...) eventMiddle := event.NewEvent(middleMsg.EventNameLog) - eventMiddle.SetValue(middleMsg.EventKeyLog, middleMsg.LogContext{}) + eventMiddle.SetValue(middleMsg.EventKeyLog, middleMsg.LogContext{ + Level: level, + Data: time.Now(), + Msg: fmt.Sprintf(format, msg...), + Route: findPlace(), + }) l.produce.Call(l.ctx, eventMiddle) } @@ -80,7 +85,7 @@ func findPlace() string { var ( place string - i = 0 + i = 3 ) for { -- Gitee From f351183ee990ec23e7adba6e8d52788124930cfd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=8E=E7=99=BD=E5=8D=97?= <1762386214@qq.com> Date: Fri, 8 Oct 2021 21:19:04 +0800 Subject: [PATCH 05/11] fix(log) fix produce msg, remove more route --- pkg/logx/logx.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/logx/logx.go b/pkg/logx/logx.go index 1f70005..a173a73 100644 --- a/pkg/logx/logx.go +++ b/pkg/logx/logx.go @@ -11,7 +11,12 @@ import ( "time" ) +var ( + floor = 3 +) + func With(ctx context.Context, p event.ProduceInterface) *upLogger { + floor = 4 return &upLogger{ ctx: ctx, produce: p, @@ -85,7 +90,7 @@ func findPlace() string { var ( place string - i = 3 + i = floor ) for { -- Gitee From 79bef5ba728787be333201bccea914eedf340037 Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Sat, 9 Oct 2021 17:43:34 +0800 Subject: [PATCH 06/11] feat(middle): add middle worker function --- pkg/middle/worker.go | 17 +++++++++++++++++ pkg/middle/worker_test.go | 16 ++++++++++++++++ 2 files changed, 33 insertions(+) create mode 100644 pkg/middle/worker.go create mode 100644 pkg/middle/worker_test.go diff --git a/pkg/middle/worker.go b/pkg/middle/worker.go new file mode 100644 index 0000000..7d78b2d --- /dev/null +++ b/pkg/middle/worker.go @@ -0,0 +1,17 @@ +package middle + +import ( + "context" +) + +func (m *MiddleWare) middleWorker() { + ctx := context.Background() + workEvent := m.eventConsumer.Receive(ctx) + m.loadPlugins() + for _, singles := range m.plugins { + for _, single := range singles { + single.Exec(workEvent) + } + } + +} diff --git a/pkg/middle/worker_test.go b/pkg/middle/worker_test.go new file mode 100644 index 0000000..4bdbcf5 --- /dev/null +++ b/pkg/middle/worker_test.go @@ -0,0 +1,16 @@ +package middle + +import ( + "testing" + + "gitee.com/timedb/wheatCache/pkg/event" +) + +func TestWorker(t *testing.T) { + event := event.NewEvent("LogContext") + m := NewMiddleWare() + m.eventDriver.Put(event) + + m.middleWorker() + +} -- Gitee From dca19238da6b59feccfe690b3019f15506d6fc5a Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Sat, 9 Oct 2021 17:50:54 +0800 Subject: [PATCH 07/11] feat(middle): add middle worker function --- pkg/middle/worker.go | 1 + pkg/middle/worker_test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/pkg/middle/worker.go b/pkg/middle/worker.go index 7d78b2d..67eadb1 100644 --- a/pkg/middle/worker.go +++ b/pkg/middle/worker.go @@ -11,6 +11,7 @@ func (m *MiddleWare) middleWorker() { for _, singles := range m.plugins { for _, single := range singles { single.Exec(workEvent) + } } diff --git a/pkg/middle/worker_test.go b/pkg/middle/worker_test.go index 4bdbcf5..332d7ef 100644 --- a/pkg/middle/worker_test.go +++ b/pkg/middle/worker_test.go @@ -8,6 +8,7 @@ import ( func TestWorker(t *testing.T) { event := event.NewEvent("LogContext") + m := NewMiddleWare() m.eventDriver.Put(event) -- Gitee From 60a4a4a3de6440420d7fcff91b7a574d2fbddd1d Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Sat, 9 Oct 2021 17:55:37 +0800 Subject: [PATCH 08/11] fix(middle):fix wheat-cache.yaml --- conf/wheat-cache.yaml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/conf/wheat-cache.yaml b/conf/wheat-cache.yaml index 0a71f5d..fa196c3 100644 --- a/conf/wheat-cache.yaml +++ b/conf/wheat-cache.yaml @@ -21,6 +21,10 @@ logPrint: "error" ] -middleChain: - middles:["logMiddle","mapKey"] +middleware-driver: + driverCount: 1000, + middleConsumerCount: 5 + +plugins-control: + LogContext: ["logMiddle"] -- Gitee From 12f594b24684e8c59b0013fc5dda67266d20af2e Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Sat, 9 Oct 2021 18:22:06 +0800 Subject: [PATCH 09/11] feat(middle): add middledriver loadPlugins function --- pkg/middle/middleware_driver.go | 30 ++++++++++++++----- pkg/middle/middleware_driver_test.go | 44 ++-------------------------- 2 files changed, 25 insertions(+), 49 deletions(-) diff --git a/pkg/middle/middleware_driver.go b/pkg/middle/middleware_driver.go index 7d1d45f..656b3dc 100644 --- a/pkg/middle/middleware_driver.go +++ b/pkg/middle/middleware_driver.go @@ -1,16 +1,18 @@ package middle import ( - "context" - _ "gitee.com/timedb/wheatCache/conf" "gitee.com/timedb/wheatCache/pkg/event" + "gitee.com/timedb/wheatCache/plugins" + pcg "gitee.com/timedb/wheatCache/plugins/config" "github.com/spf13/viper" ) type MiddleWare struct { eventDriver event.DriverInterface eventConsumer event.ConsumerInterface + plugins map[string][]plugins.MiddleToolsInterface + consumerCount int } func NewMiddleWare() *MiddleWare { @@ -20,6 +22,8 @@ func NewMiddleWare() *MiddleWare { eventDriver: driver, eventConsumer: event.NewConsumer(driver), } + + MiddleWareDriver.loadPlugins() }) return MiddleWareDriver } @@ -28,11 +32,21 @@ func (m *MiddleWare) GetEventDriver() event.DriverInterface { return m.eventDriver } -func (m *MiddleWare) work(ctx context.Context) { - middlewareMap := Init() - event := m.eventConsumer.Receive(ctx) - middleConf := viper.GetStringSlice("middleChain.middles") - for _, middle := range middleConf { - middlewareMap[middle].Exec(event) +func (m *MiddleWare) loadPlugins() { + plug := viper.GetStringMapStringSlice("plugins-control") + + pluginsMap := pcg.GetMiddlewareMap() + + pluginsContext := make(map[string][]plugins.MiddleToolsInterface) + + for middleMsg, pluNames := range plug { + pulgSingle := make([]plugins.MiddleToolsInterface, 0) + for _, name := range pluNames { + pulgSingle = append(pulgSingle, pluginsMap[name]) + } + + pluginsContext[middleMsg] = pulgSingle } + + m.plugins = pluginsContext } diff --git a/pkg/middle/middleware_driver_test.go b/pkg/middle/middleware_driver_test.go index f5856f6..536be6d 100644 --- a/pkg/middle/middleware_driver_test.go +++ b/pkg/middle/middleware_driver_test.go @@ -1,50 +1,12 @@ package middle import ( - "context" - "fmt" "testing" - "gitee.com/timedb/wheatCache/pkg/event" + "github.com/stretchr/testify/require" ) func Test_middleware_driver(t *testing.T) { - - ctx := context.Background() - type Library struct { - driver *MiddleWare - } - - library := &Library{ - driver: NewMiddleWare(), - } - - go library.driver.work(ctx) - - type A struct { - produce event.ProduceInterface - } - - a := &A{ - produce: event.NewProduce(library.driver.eventDriver), - } - - type B struct { - cumsumer event.ConsumerInterface - } - - b := &B{ - cumsumer: event.NewConsumer(library.driver.eventDriver), - } - - book := event.NewEvent("book") - book.SetMsg("title", "goland") - book.SetValue("page", 100) - - go func() { - a.produce.Call(ctx, book) - }() - - b_book := b.cumsumer.Receive(ctx) - fmt.Println(b_book.GetMsg("title")) + middleware := NewMiddleWare() + require.Equal(t, middleware.plugins["logcontext"][0].Name(), "logMiddle") } -- Gitee From 67747a5b6a55c82d74972b13ee5b9768285eb4ec Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Sat, 9 Oct 2021 18:33:42 +0800 Subject: [PATCH 10/11] fix(middle): add middle worker function --- pkg/middle-msg/logx.go | 2 +- pkg/middle/worker.go | 14 +++++++++++--- pkg/middle/worker_test.go | 2 +- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/pkg/middle-msg/logx.go b/pkg/middle-msg/logx.go index 67d887d..dcd9a83 100644 --- a/pkg/middle-msg/logx.go +++ b/pkg/middle-msg/logx.go @@ -3,7 +3,7 @@ package middle_msg import "time" var ( - EventNameLog = "LogContext" + EventNameLog = "logcontext" EventKeyLog = "LogContext" ) diff --git a/pkg/middle/worker.go b/pkg/middle/worker.go index 67eadb1..d617901 100644 --- a/pkg/middle/worker.go +++ b/pkg/middle/worker.go @@ -2,15 +2,23 @@ package middle import ( "context" + + msg "gitee.com/timedb/wheatCache/pkg/middle-msg" ) func (m *MiddleWare) middleWorker() { ctx := context.Background() workEvent := m.eventConsumer.Receive(ctx) m.loadPlugins() - for _, singles := range m.plugins { - for _, single := range singles { - single.Exec(workEvent) + switch workEvent.GetEventName() { + case msg.EventNameLog: + for middleMsg, singles := range m.plugins { + + if middleMsg == msg.EventNameLog { + for _, single := range singles { + single.Exec(workEvent) + } + } } } diff --git a/pkg/middle/worker_test.go b/pkg/middle/worker_test.go index 332d7ef..0f36a26 100644 --- a/pkg/middle/worker_test.go +++ b/pkg/middle/worker_test.go @@ -7,7 +7,7 @@ import ( ) func TestWorker(t *testing.T) { - event := event.NewEvent("LogContext") + event := event.NewEvent("logcontext") m := NewMiddleWare() m.eventDriver.Put(event) -- Gitee From 89ee969ef78dec65180369f0f1576ba291c7b286 Mon Sep 17 00:00:00 2001 From: Sodesnei <1452401269@qq.com> Date: Sat, 9 Oct 2021 18:46:57 +0800 Subject: [PATCH 11/11] fix(middleware): fix plugins Interface --- plugins/define.go | 7 ++++--- plugins/log-middle/middleware.go | 9 ++++++++- plugins/map-key/middleware.go | 13 ++++++++++--- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/plugins/define.go b/plugins/define.go index bd438d8..c8144dc 100644 --- a/plugins/define.go +++ b/plugins/define.go @@ -3,7 +3,8 @@ package plugins import "gitee.com/timedb/wheatCache/pkg/event" type MiddleToolsInterface interface { - Init() // 初始化 - Exec(e event.Event) (event.Event, error) // 处理用户发送事件 - Name() string // 获取中间件名称 + Init() // 初始化 + Exec(e *event.Event) (*event.Event, error) // 处理用户发送事件 + Name() string // 获取中间件名称 + Describe() string // 描述 } diff --git a/plugins/log-middle/middleware.go b/plugins/log-middle/middleware.go index 23e8f03..a9873c0 100644 --- a/plugins/log-middle/middleware.go +++ b/plugins/log-middle/middleware.go @@ -1,6 +1,8 @@ package log_middle import ( + "fmt" + "gitee.com/timedb/wheatCache/pkg/event" "gitee.com/timedb/wheatCache/plugins" ) @@ -11,7 +13,9 @@ type logMiddle struct { func (i *logMiddle) Init() { } -func (i *logMiddle) Exec(e event.Event) (event.Event, error) { +func (i *logMiddle) Exec(e *event.Event) (*event.Event, error) { + + fmt.Println(e.GetEventName()) return e, nil } @@ -19,6 +23,9 @@ func (i *logMiddle) Name() string { return "logMiddle" } +func (i *logMiddle) Describe() string { + return "" +} func NewMiddleware() plugins.MiddleToolsInterface { return &logMiddle{} } diff --git a/plugins/map-key/middleware.go b/plugins/map-key/middleware.go index 31fa509..bcccac1 100644 --- a/plugins/map-key/middleware.go +++ b/plugins/map-key/middleware.go @@ -1,6 +1,9 @@ package log_middle import ( + "fmt" + + "gitee.com/timedb/wheatCache/pkg/event" "gitee.com/timedb/wheatCache/plugins" ) @@ -10,15 +13,19 @@ type mapKey struct { func (i *mapKey) Init() { } -func (i *mapKey) Exec(interface{}) (interface{}, error) { - - return nil, nil +func (i *mapKey) Exec(e *event.Event) (*event.Event, error) { + fmt.Println(e.GetEventName()) + return e, nil } func (i *mapKey) Name() string { return "mapKey" } +func (i *mapKey) Describe() string { + return "" +} + func NewMiddleware() plugins.MiddleToolsInterface { return &mapKey{} } -- Gitee