From fb41cc3f184bb0a3792c4085a4df6dd57983d1ab Mon Sep 17 00:00:00 2001 From: zhanghan Date: Tue, 10 Dec 2024 14:05:34 +0800 Subject: [PATCH] etcd services watch modify listen --- sdk/etcd/client.go | 7 +++ sdk/etcd/watch.go | 115 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+) create mode 100644 sdk/etcd/watch.go diff --git a/sdk/etcd/client.go b/sdk/etcd/client.go index 3e32fd52..1ad00a4d 100644 --- a/sdk/etcd/client.go +++ b/sdk/etcd/client.go @@ -1,3 +1,10 @@ +/* + * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. + * PilotGo licensed under the Mulan Permissive Software License, Version 2. + * See LICENSE file for more details. + * Author: zhanghan2021 + * Date: Mon Dec 09 13:56:05 2024 +0800 + */ package etcd import ( diff --git a/sdk/etcd/watch.go b/sdk/etcd/watch.go new file mode 100644 index 00000000..74b8fe4a --- /dev/null +++ b/sdk/etcd/watch.go @@ -0,0 +1,115 @@ +/* + * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. + * PilotGo licensed under the Mulan Permissive Software License, Version 2. + * See LICENSE file for more details. + * Author: zhanghan2021 + * Date: Tue Dec 10 13:56:05 2024 +0800 + */ +package etcd + +import ( + "context" + "encoding/json" + "fmt" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +// EventType represents the type of event +type EventType int32 + +const ( + EventTypePut EventType = 0 + EventTypeDelete EventType = 1 +) + +type WatchCallback func(eventType EventType, key, value string) + +type Watcher struct { + client *Client + prefix string + callback WatchCallback + ctx context.Context + cancel context.CancelFunc +} + +// NewWatcher creates a new watcher +func NewWatcher(client *Client, prefix string, callback WatchCallback) *Watcher { + ctx, cancel := context.WithCancel(context.Background()) + return &Watcher{ + client: client, + prefix: prefix, + callback: callback, + ctx: ctx, + cancel: cancel, + } +} + +// Start starts watching for changes +func (w *Watcher) Start() { + watchChan := w.client.client.Watch(w.ctx, w.prefix, clientv3.WithPrefix()) + go func() { + for watchResp := range watchChan { + for _, event := range watchResp.Events { + var eventType EventType + switch event.Type { + case clientv3.EventTypePut: + eventType = EventTypePut + case clientv3.EventTypeDelete: + eventType = EventTypeDelete + } + w.callback(eventType, string(event.Kv.Key), string(event.Kv.Value)) + } + } + }() +} + +// Stop stops watching +func (w *Watcher) Stop() { + w.cancel() +} + +// WatchService watches for service changes +func WatchService(client *Client, serviceName string) (*Watcher, error) { + servicePath := fmt.Sprintf("/services/%s/", serviceName) + services := make(map[string]*ServiceInfo) + + callback := func(eventType EventType, key, value string) { + switch eventType { + case EventTypePut: + var service ServiceInfo + if err := json.Unmarshal([]byte(value), &service); err != nil { + fmt.Printf("Failed to unmarshal service info: %v\n", err) + return + } + services[key] = &service + fmt.Printf("Service added/updated: %s at %s:%v\n", service.ServiveName, service.Address, service.Port) + + case EventTypeDelete: + if service, ok := services[key]; ok { + delete(services, key) + fmt.Printf("Service removed: %s\n", service.ServiveName) + } + } + } + + watcher := NewWatcher(client, servicePath, callback) + watcher.Start() + return watcher, nil +} + +// WatchConfig watches for configuration changes +func WatchConfig(client *Client, prefix string) (*Watcher, error) { + callback := func(eventType EventType, key, value string) { + switch eventType { + case EventTypePut: + fmt.Printf("Configuration updated - Key: %s, Value: %s\n", key, value) + case EventTypeDelete: + fmt.Printf("Configuration deleted - Key: %s\n", key) + } + } + + watcher := NewWatcher(client, prefix, callback) + watcher.Start() + return watcher, nil +} -- Gitee