From 5169f93d33aa2d19e9dda7ef1d11af1eea31ca14 Mon Sep 17 00:00:00 2001 From: zhanghan Date: Tue, 10 Dec 2024 20:06:57 +0800 Subject: [PATCH] Encapsulate microservice universal interface --- sdk/etcd/client/client.go | 61 ----------- sdk/etcd/client/register.go | 160 ----------------------------- sdk/etcd/client/watch.go | 115 --------------------- sdk/etcd/etcd.go | 72 ------------- sdk/etcd/gateway/gateway.go | 155 ---------------------------- sdk/etcd/middleware/middleware.go | 134 ------------------------ sdk/etcd/proxy/balancer.go | 83 --------------- sdk/etcd/proxy/proxy.go | 82 --------------- sdk/go-micro/registry/etcd.go | 164 ++++++++++++++++++++++++++++++ sdk/go-micro/registry/registry.go | 83 +++++++++++++++ sdk/go-micro/registry/service.go | 84 +++++++++++++++ 11 files changed, 331 insertions(+), 862 deletions(-) delete mode 100644 sdk/etcd/client/client.go delete mode 100644 sdk/etcd/client/register.go delete mode 100644 sdk/etcd/client/watch.go delete mode 100644 sdk/etcd/etcd.go delete mode 100644 sdk/etcd/gateway/gateway.go delete mode 100644 sdk/etcd/middleware/middleware.go delete mode 100644 sdk/etcd/proxy/balancer.go delete mode 100644 sdk/etcd/proxy/proxy.go create mode 100644 sdk/go-micro/registry/etcd.go create mode 100644 sdk/go-micro/registry/registry.go create mode 100644 sdk/go-micro/registry/service.go diff --git a/sdk/etcd/client/client.go b/sdk/etcd/client/client.go deleted file mode 100644 index 12f36e3b..00000000 --- a/sdk/etcd/client/client.go +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. - * PilotGo licensed under the Mulan Permissive Software License, Version 2. - * See LICENSE file for more details. - * Author: zhanghan2021 - * Date: Mon Dec 09 13:56:05 2024 +0800 - */ -package client - -import ( - "context" - "time" - - clientv3 "go.etcd.io/etcd/client/v3" -) - -type Client struct { - client *clientv3.Client - ctx context.Context -} - -// NewClient creates a new etcd client -func NewClient(endpoints []string, dialTimeout time.Duration) (*Client, error) { - cli, err := clientv3.New(clientv3.Config{ - Endpoints: endpoints, - DialTimeout: dialTimeout, - }) - if err != nil { - return nil, err - } - - return &Client{ - client: cli, - ctx: context.Background(), - }, nil -} - -// Close closes the client -func (c *Client) Close() error { - return c.client.Close() -} - -// Get gets the value for a key -func (c *Client) Get(key string) (*clientv3.GetResponse, error) { - return c.client.Get(c.ctx, key) -} - -// Put puts a key-value pair -func (c *Client) Put(key, value string) (*clientv3.PutResponse, error) { - return c.client.Put(c.ctx, key, value) -} - -// Delete deletes a key -func (c *Client) Delete(key string) (*clientv3.DeleteResponse, error) { - return c.client.Delete(c.ctx, key) -} - -// Watch watches for changes on a key -func (c *Client) Watch(key string) clientv3.WatchChan { - return c.client.Watch(c.ctx, key) -} diff --git a/sdk/etcd/client/register.go b/sdk/etcd/client/register.go deleted file mode 100644 index 15e919e6..00000000 --- a/sdk/etcd/client/register.go +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. - * PilotGo licensed under the Mulan Permissive Software License, Version 2. - * See LICENSE file for more details. - * Author: zhanghan2021 - * Date: Tue Dec 10 10:17:05 2024 +0800 - */ -package client - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "time" - - clientv3 "go.etcd.io/etcd/client/v3" -) - -type RegisterOptions struct { - Endpoints []string - ServiceName string - ServiceAddr string - Version string - DialTimeout time.Duration -} - -type ServiceInfo struct { - ServiveName string `json:"serviceName"` - Address string `json:"address"` - Port string `json:"port"` - Metadata map[string]string `json:"metadata,omitempty"` -} - -type ServiceRegister struct { - client *Client - leaseID clientv3.LeaseID - servicePath string - serviceInfo *ServiceInfo - keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse - cancel context.CancelFunc // To cancel the keep-alive goroutine -} - -// NewServiceRegister creates a new service register -func NewServiceRegister(client *Client, info *ServiceInfo, ttl int64) (*ServiceRegister, error) { - // 首先检查客户端是否可用 - if client == nil || client.client == nil { - return nil, errors.New("etcd client is not initialized") - } - - ctx, cancel := context.WithCancel(client.ctx) - sr := &ServiceRegister{ - client: client, - serviceInfo: info, - servicePath: fmt.Sprintf("/services/%s", info.ServiveName), - cancel: cancel, - } - - // 检查etcd连接状态 - ctx, cancel = context.WithTimeout(ctx, 3*time.Second) - defer cancel() - - // 尝试简单的etcd操作来验证连接 - _, err := client.client.Get(ctx, "/health", clientv3.WithCountOnly()) - if err != nil { - return nil, err - } - - var opts []clientv3.OpOption - if ttl > 0 { - // Create lease - resp, err := client.client.Grant(ctx, ttl) - if err != nil { - return nil, err - } - sr.leaseID = resp.ID - - // Keep lease alive - keepAliveChan, err := client.client.KeepAlive(context.Background(), resp.ID) - if err != nil { - return nil, err - } - sr.keepAliveChan = keepAliveChan - - opts = append(opts, clientv3.WithLease(sr.leaseID)) - } - - // Register service - if err := sr.register(opts...); err != nil { - return nil, errors.New("failed to register service:" + err.Error()) - } - - // Start keepalive goroutine if using TTL - if ttl > 0 { - go sr.keepAlive() - } - - return sr, nil -} - -// register puts service info into etcd -func (sr *ServiceRegister) register(opts ...clientv3.OpOption) error { - value, err := json.Marshal(sr.serviceInfo) - if err != nil { - return err - } - - _, err = sr.client.client.Put( - context.Background(), - sr.servicePath, - string(value), - opts..., - ) - return err -} -func (sr *ServiceRegister) Deregister() error { - // Cancel the context to stop keep-alive goroutine - if sr.cancel != nil { - sr.cancel() - } - - // Delete the service key from etcd - _, err := sr.client.client.Delete( - context.Background(), - sr.servicePath, - ) - if err != nil { - return fmt.Errorf("failed to deregister service %s: %v", sr.serviceInfo.ServiveName, err) - } - - // Revoke the lease if it exists - if sr.leaseID != 0 { - _, err = sr.client.client.Revoke(context.Background(), sr.leaseID) - if err != nil { - return fmt.Errorf("failed to revoke lease for service %s: %v", sr.serviceInfo.ServiveName, err) - } - } - - return nil -} - -// keepAlive keeps the lease alive -func (sr *ServiceRegister) keepAlive() { - for { - select { - case resp, ok := <-sr.keepAliveChan: - if !ok { - fmt.Printf("Keep-alive channel closed for service %s/%s\n", sr.serviceInfo.ServiveName, sr.serviceInfo.ServiveName) - return - } - if resp == nil { - fmt.Printf("Keep-alive response is nil for service %s/%s\n", sr.serviceInfo.ServiveName, sr.serviceInfo.ServiveName) - return - } - case <-sr.client.ctx.Done(): - fmt.Printf("Context done for service %s/%s\n", sr.serviceInfo.ServiveName, sr.serviceInfo.ServiveName) - return - } - } -} diff --git a/sdk/etcd/client/watch.go b/sdk/etcd/client/watch.go deleted file mode 100644 index d736a536..00000000 --- a/sdk/etcd/client/watch.go +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. - * PilotGo licensed under the Mulan Permissive Software License, Version 2. - * See LICENSE file for more details. - * Author: zhanghan2021 - * Date: Tue Dec 10 13:56:05 2024 +0800 - */ -package client - -import ( - "context" - "encoding/json" - "fmt" - - clientv3 "go.etcd.io/etcd/client/v3" -) - -// EventType represents the type of event -type EventType int32 - -const ( - EventTypePut EventType = 0 - EventTypeDelete EventType = 1 -) - -type WatchCallback func(eventType EventType, key, value string) - -type Watcher struct { - client *Client - prefix string - callback WatchCallback - ctx context.Context - cancel context.CancelFunc -} - -// NewWatcher creates a new watcher -func NewWatcher(client *Client, prefix string, callback WatchCallback) *Watcher { - ctx, cancel := context.WithCancel(context.Background()) - return &Watcher{ - client: client, - prefix: prefix, - callback: callback, - ctx: ctx, - cancel: cancel, - } -} - -// Start starts watching for changes -func (w *Watcher) Start() { - watchChan := w.client.client.Watch(w.ctx, w.prefix, clientv3.WithPrefix()) - go func() { - for watchResp := range watchChan { - for _, event := range watchResp.Events { - var eventType EventType - switch event.Type { - case clientv3.EventTypePut: - eventType = EventTypePut - case clientv3.EventTypeDelete: - eventType = EventTypeDelete - } - w.callback(eventType, string(event.Kv.Key), string(event.Kv.Value)) - } - } - }() -} - -// Stop stops watching -func (w *Watcher) Stop() { - w.cancel() -} - -// WatchService watches for service changes -func WatchService(client *Client, serviceName string) (*Watcher, error) { - servicePath := fmt.Sprintf("/services/%s/", serviceName) - services := make(map[string]*ServiceInfo) - - callback := func(eventType EventType, key, value string) { - switch eventType { - case EventTypePut: - var service ServiceInfo - if err := json.Unmarshal([]byte(value), &service); err != nil { - fmt.Printf("Failed to unmarshal service info: %v\n", err) - return - } - services[key] = &service - fmt.Printf("Service added/updated: %s at %s:%v\n", service.ServiveName, service.Address, service.Port) - - case EventTypeDelete: - if service, ok := services[key]; ok { - delete(services, key) - fmt.Printf("Service removed: %s\n", service.ServiveName) - } - } - } - - watcher := NewWatcher(client, servicePath, callback) - watcher.Start() - return watcher, nil -} - -// WatchConfig watches for configuration changes -func WatchConfig(client *Client, prefix string) (*Watcher, error) { - callback := func(eventType EventType, key, value string) { - switch eventType { - case EventTypePut: - fmt.Printf("Configuration updated - Key: %s, Value: %s\n", key, value) - case EventTypeDelete: - fmt.Printf("Configuration deleted - Key: %s\n", key) - } - } - - watcher := NewWatcher(client, prefix, callback) - watcher.Start() - return watcher, nil -} diff --git a/sdk/etcd/etcd.go b/sdk/etcd/etcd.go deleted file mode 100644 index 1ea2d9d6..00000000 --- a/sdk/etcd/etcd.go +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. - * PilotGo licensed under the Mulan Permissive Software License, Version 2. - * See LICENSE file for more details. - * Author: zhanghan2021 - * Date: Tue Dec 10 14:36:05 2024 +0800 - */ -package etcd - -import ( - "context" - "errors" - "os" - "os/signal" - "strings" - "syscall" - - "gitee.com/openeuler/PilotGo/sdk/etcd/client" - "gitee.com/openeuler/PilotGo/sdk/logger" -) - -// SetupEtcdRegistration initializes etcd registration with graceful shutdown -func Register(ctx context.Context, opts *client.RegisterOptions) error { - serviceRegister, err := registerService(opts) - if err != nil { - return err - } - - // Handle graceful shutdown - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - go func() { - select { - case <-sigChan: - case <-ctx.Done(): - } - if serviceRegister != nil { - serviceRegister.Deregister() - } - }() - - return nil -} - -// RegisterToEtcd registers the server to etcd -func registerService(opts *client.RegisterOptions) (*client.ServiceRegister, error) { - // 1. Create etcd client - etcdClient, err := client.NewClient(opts.Endpoints, opts.DialTimeout) - if err != nil { - return nil, errors.New("failed to create etcd client: " + err.Error()) - } - - serviceInfo := &client.ServiceInfo{ - ServiveName: opts.ServiceName, - Address: strings.Split(opts.ServiceAddr, ":")[0], - Port: strings.Split(opts.ServiceAddr, ":")[1], - Metadata: map[string]string{ - "version": opts.Version, - }, - } - - // 2. Register service with TTL - serviceRegister, err := client.NewServiceRegister(etcdClient, serviceInfo, 10) - if err != nil { - etcdClient.Close() - return nil, errors.New("failed to register service: " + err.Error()) - } - logger.Info("service registered to etcd successfully") - - return serviceRegister, nil -} diff --git a/sdk/etcd/gateway/gateway.go b/sdk/etcd/gateway/gateway.go deleted file mode 100644 index ebcbd792..00000000 --- a/sdk/etcd/gateway/gateway.go +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. - * PilotGo licensed under the Mulan Permissive Software License, Version 2. - * See LICENSE file for more details. - * Author: zhanghan2021 - * Date: Tue Dec 10 13:56:05 2024 +0800 - */ -package gateway - -import ( - "encoding/json" - "fmt" - "net/http" - "net/http/httputil" - "net/url" - "sync" - "time" - - "gitee.com/openeuler/PilotGo/sdk/etcd/client" -) - -// Gateway represents the API gateway -type Gateway struct { - etcdClient *client.Client - services map[string][]*client.ServiceInfo - serviceLock sync.RWMutex - watcher *client.Watcher -} - -// NewGateway creates a new API gateway -func NewGateway(etcdClient *client.Client) *Gateway { - g := &Gateway{ - etcdClient: etcdClient, - services: make(map[string][]*client.ServiceInfo), - } - - // Start watching for service changes - g.watchServices() - return g -} - -// watchServices watches for service changes in etcd -func (g *Gateway) watchServices() { - callback := func(eventType client.EventType, key, value string) { - switch eventType { - case client.EventTypePut: - var service client.ServiceInfo - if err := json.Unmarshal([]byte(value), &service); err != nil { - fmt.Printf("Failed to unmarshal service info: %v\n", err) - return - } - g.addService(&service) - - case client.EventTypeDelete: - g.removeService(key) - } - } - - g.watcher = client.NewWatcher(g.etcdClient, "/services/", callback) - g.watcher.Start() -} - -// addService adds a service to the gateway -func (g *Gateway) addService(service *client.ServiceInfo) { - g.serviceLock.Lock() - defer g.serviceLock.Unlock() - - services := g.services[service.ServiveName] - // Check if service already exists - for i, s := range services { - if s.Address == service.Address { - services[i] = service - return - } - } - // Add new service - g.services[service.ServiveName] = append(services, service) -} - -// removeService removes a service from the gateway -func (g *Gateway) removeService(key string) { - g.serviceLock.Lock() - defer g.serviceLock.Unlock() - - for name, services := range g.services { - for i, service := range services { - if fmt.Sprintf("/services/%s", service.ServiveName) == key { - g.services[name] = append(services[:i], services[i+1:]...) - return - } - } - } -} - -// getService returns a service instance using round-robin load balancing -func (g *Gateway) getService(name string) (*client.ServiceInfo, error) { - g.serviceLock.RLock() - defer g.serviceLock.RUnlock() - - services := g.services[name] - if len(services) == 0 { - return nil, fmt.Errorf("no available services for %s", name) - } - - // Simple round-robin load balancing - service := services[time.Now().UnixNano()%int64(len(services))] - return service, nil -} - -// ProxyHandler handles the proxying of requests to services -func (g *Gateway) ProxyHandler(w http.ResponseWriter, r *http.Request) { - // Extract service name from path - serviceName := r.Header.Get("X-Service-Name") - if serviceName == "" { - http.Error(w, "Service name not specified", http.StatusBadRequest) - return - } - - service, err := g.getService(serviceName) - if err != nil { - http.Error(w, err.Error(), http.StatusServiceUnavailable) - return - } - - // Create the target URL - target := fmt.Sprintf("http://%s:%v", service.Address, service.Port) - targetURL, err := url.Parse(target) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - // Create the reverse proxy - proxy := httputil.NewSingleHostReverseProxy(targetURL) - - // Update the headers to allow for SSL redirection - r.URL.Host = targetURL.Host - r.URL.Scheme = targetURL.Scheme - r.Header.Set("X-Forwarded-Host", r.Header.Get("Host")) - - proxy.ServeHTTP(w, r) -} - -// Start starts the gateway server -func (g *Gateway) Start(addr string) error { - http.HandleFunc("/", g.ProxyHandler) - return http.ListenAndServe(addr, nil) -} - -// Stop stops the gateway -func (g *Gateway) Stop() { - if g.watcher != nil { - g.watcher.Stop() - } -} diff --git a/sdk/etcd/middleware/middleware.go b/sdk/etcd/middleware/middleware.go deleted file mode 100644 index 5ccdd2ad..00000000 --- a/sdk/etcd/middleware/middleware.go +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. - * PilotGo licensed under the Mulan Permissive Software License, Version 2. - * See LICENSE file for more details. - * Author: zhanghan2021 - * Date: Tue Dec 10 14:46:05 2024 +0800 - */ -package middleware - -import ( - "net/http" - "sync" - "time" - - "gitee.com/openeuler/PilotGo/sdk/logger" - "golang.org/x/time/rate" -) - -// Middleware represents a chain of http handlers -type Middleware func(http.Handler) http.Handler - -// Chain chains multiple middleware together -func Chain(middlewares ...Middleware) Middleware { - return func(next http.Handler) http.Handler { - for i := len(middlewares) - 1; i >= 0; i-- { - next = middlewares[i](next) - } - return next - } -} - -// RateLimiter implements rate limiting middleware -type RateLimiter struct { - visitors map[string]*rate.Limiter - mu sync.RWMutex - r rate.Limit - b int -} - -func NewRateLimiter(r rate.Limit, b int) *RateLimiter { - return &RateLimiter{ - visitors: make(map[string]*rate.Limiter), - r: r, - b: b, - } -} - -func (rl *RateLimiter) getLimiter(ip string) *rate.Limiter { - rl.mu.Lock() - defer rl.mu.Unlock() - - limiter, exists := rl.visitors[ip] - if !exists { - limiter = rate.NewLimiter(rl.r, rl.b) - rl.visitors[ip] = limiter - } - - return limiter -} - -func (rl *RateLimiter) RateLimit(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - limiter := rl.getLimiter(r.RemoteAddr) - if !limiter.Allow() { - http.Error(w, "Too many requests", http.StatusTooManyRequests) - return - } - next.ServeHTTP(w, r) - }) -} - -// AuthMiddleware implements authentication middleware -type AuthMiddleware struct { - tokenValidator func(string) bool -} - -func NewAuthMiddleware(validator func(string) bool) *AuthMiddleware { - return &AuthMiddleware{ - tokenValidator: validator, - } -} - -func (am *AuthMiddleware) Authenticate(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - token := r.Header.Get("Authorization") - if token == "" { - http.Error(w, "Authorization token required", http.StatusUnauthorized) - return - } - - if !am.tokenValidator(token) { - http.Error(w, "Invalid token", http.StatusUnauthorized) - return - } - - next.ServeHTTP(w, r) - }) -} - -// LoggingMiddleware implements request logging -func LoggingMiddleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - start := time.Now() - - // Call the next handler - next.ServeHTTP(w, r) - - // Log the request - duration := time.Since(start) - logger.Info( - "%s %s %s %v", - r.Method, - r.RequestURI, - r.RemoteAddr, - duration, - ) - }) -} - -// CORSMiddleware implements CORS support -func CORSMiddleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") - w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") - - if r.Method == "OPTIONS" { - w.WriteHeader(http.StatusOK) - return - } - - next.ServeHTTP(w, r) - }) -} diff --git a/sdk/etcd/proxy/balancer.go b/sdk/etcd/proxy/balancer.go deleted file mode 100644 index 06a64a93..00000000 --- a/sdk/etcd/proxy/balancer.go +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. - * PilotGo licensed under the Mulan Permissive Software License, Version 2. - * See LICENSE file for more details. - * Author: zhanghan2021 - * Date: Tue Dec 10 14:36:05 2024 +0800 - */ -package proxy - -import ( - "sync" - "sync/atomic" - - "gitee.com/openeuler/PilotGo/sdk/etcd/client" -) - -// LoadBalancer interface defines methods for load balancing -type LoadBalancer interface { - Next(services []*client.ServiceInfo) *client.ServiceInfo -} - -// RoundRobinBalancer implements round-robin load balancing -type RoundRobinBalancer struct { - counter uint64 -} - -func NewRoundRobinBalancer() *RoundRobinBalancer { - return &RoundRobinBalancer{} -} - -func (b *RoundRobinBalancer) Next(services []*client.ServiceInfo) *client.ServiceInfo { - if len(services) == 0 { - return nil - } - count := atomic.AddUint64(&b.counter, 1) - return services[count%uint64(len(services))] -} - -// WeightedRoundRobinBalancer implements weighted round-robin load balancing -type WeightedRoundRobinBalancer struct { - mu sync.Mutex - current int - weights map[string]int -} - -func NewWeightedRoundRobinBalancer() *WeightedRoundRobinBalancer { - return &WeightedRoundRobinBalancer{ - weights: make(map[string]int), - } -} - -func (b *WeightedRoundRobinBalancer) Next(services []*client.ServiceInfo) *client.ServiceInfo { - if len(services) == 0 { - return nil - } - - b.mu.Lock() - defer b.mu.Unlock() - - // Simple weighted round-robin implementation - totalWeight := 0 - for _, service := range services { - weight := b.weights[service.Address] - if weight == 0 { - weight = 1 - } - totalWeight += weight - } - - b.current = (b.current + 1) % totalWeight - for _, service := range services { - weight := b.weights[service.Address] - if weight == 0 { - weight = 1 - } - if b.current < weight { - return service - } - b.current -= weight - } - - return services[0] -} diff --git a/sdk/etcd/proxy/proxy.go b/sdk/etcd/proxy/proxy.go deleted file mode 100644 index 46e34d49..00000000 --- a/sdk/etcd/proxy/proxy.go +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. - * PilotGo licensed under the Mulan Permissive Software License, Version 2. - * See LICENSE file for more details. - * Author: zhanghan2021 - * Date: Tue Dec 10 13:56:05 2024 +0800 - */ -package proxy - -import ( - "fmt" - "net/http" - "net/http/httputil" - "net/url" - "sync" - - "gitee.com/openeuler/PilotGo/sdk/etcd/client" - clientv3 "go.etcd.io/etcd/client/v3" -) - -// Proxy represents the API proxy -type Proxy struct { - client *clientv3.Client - services map[string][]*client.ServiceInfo - serviceLock sync.RWMutex -} - -// NewProxy creates a new API proxy -func NewProxy(cli *clientv3.Client) *Proxy { - return &Proxy{ - client: cli, - services: make(map[string][]*client.ServiceInfo), - } -} - -// Start starts the proxy server -func (p *Proxy) Start(addr string) error { - http.HandleFunc("/", p.ProxyHandler) - return http.ListenAndServe(addr, nil) -} - -// ProxyHandler handles the proxying of requests to services -func (p *Proxy) ProxyHandler(w http.ResponseWriter, r *http.Request) { - serviceName := r.Header.Get("X-Service-Name") - if serviceName == "" { - http.Error(w, "Service name not specified", http.StatusBadRequest) - return - } - - service, err := p.getService(serviceName) - if err != nil { - http.Error(w, fmt.Sprintf("Error getting service: %v", err), http.StatusInternalServerError) - return - } - - targetURL, err := url.Parse(fmt.Sprintf("http://%s:%s", service.Address, service.Port)) - if err != nil { - http.Error(w, fmt.Sprintf("Error parsing service URL: %v", err), http.StatusInternalServerError) - return - } - - proxy := httputil.NewSingleHostReverseProxy(targetURL) - proxy.ServeHTTP(w, r) -} - -// getService returns a service instance using round-robin load balancing -func (p *Proxy) getService(name string) (*client.ServiceInfo, error) { - p.serviceLock.RLock() - defer p.serviceLock.RUnlock() - - services, ok := p.services[name] - if !ok || len(services) == 0 { - return nil, fmt.Errorf("no available services for %s", name) - } - - // Simple round-robin for now - service := services[0] - // Move the first service to the end - p.services[name] = append(services[1:], service) - - return service, nil -} diff --git a/sdk/go-micro/registry/etcd.go b/sdk/go-micro/registry/etcd.go new file mode 100644 index 00000000..a842aefd --- /dev/null +++ b/sdk/go-micro/registry/etcd.go @@ -0,0 +1,164 @@ +/* + * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. + * PilotGo licensed under the Mulan Permissive Software License, Version 2. + * See LICENSE file for more details. + * Author: zhanghan2021 + * Date: Thu Dec 12 17:36:05 2024 +0800 + */ +package registry + +import ( + "context" + "encoding/json" + "fmt" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +type etcdRegistry struct { + client *clientv3.Client + leaseID clientv3.LeaseID + ctx context.Context + servicePath string + serviceInfo *ServiceInfo + keepAlive <-chan *clientv3.LeaseKeepAliveResponse + options *Options + cancel context.CancelFunc +} + +func newEtcdRegistry(opts *Options) (Registry, error) { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: opts.Endpoints, + DialTimeout: opts.DialTimeout, + }) + if err != nil { + return nil, fmt.Errorf("failed to create etcd client: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + return &etcdRegistry{ + client: cli, + ctx: ctx, + cancel: cancel, + options: opts, + }, nil +} + +func (e *etcdRegistry) Register(info *ServiceInfo) error { + e.serviceInfo = info + + // Create lease + lease, err := e.client.Grant(e.ctx, 10) + if err != nil { + return fmt.Errorf("failed to create lease: %v", err) + } + e.leaseID = lease.ID + + key := fmt.Sprintf("/services/%s", info.ServiceName) + value, err := json.Marshal(info) + if err != nil { + return fmt.Errorf("failed to marshal service info: %v", err) + } + e.servicePath = key + + // Register service with lease + _, err = e.client.Put(e.ctx, key, string(value), clientv3.WithLease(lease.ID)) + if err != nil { + return fmt.Errorf("failed to put service info: %v", err) + } + + // Keep lease alive + keepAliveChan, err := e.client.KeepAlive(e.ctx, lease.ID) + if err != nil { + return fmt.Errorf("failed to keep lease alive: %v", err) + } + e.keepAlive = keepAliveChan + + go e.keepAliveLoop() + + return nil +} + +func (e *etcdRegistry) Deregister() error { + // Delete the service key from etcd + _, err := e.client.Delete(e.ctx, e.servicePath) + if err != nil { + return fmt.Errorf("failed to deregister service %s: %v", e.serviceInfo.ServiceName, err) + } + + // Revoke the lease if it exists + if e.leaseID != 0 { + _, err := e.client.Revoke(e.ctx, e.leaseID) + if err != nil { + return fmt.Errorf("failed to revoke lease: %v", err) + } + } + + e.cancel() + return nil +} + +func (e *etcdRegistry) Get(key string) (string, error) { + resp, err := e.client.Get(e.ctx, key) + if err != nil { + return "", err + } + if len(resp.Kvs) == 0 { + return "", nil + } + return string(resp.Kvs[0].Value), nil +} + +func (e *etcdRegistry) Put(key string, value string) error { + _, err := e.client.Put(e.ctx, key, value) + return err +} + +func (e *etcdRegistry) Delete(key string) error { + _, err := e.client.Delete(e.ctx, key) + return err +} + +func (e *etcdRegistry) Close() error { + e.cancel() + return e.client.Close() +} + +func (e *etcdRegistry) Watch(key string, callback WatchCallback) error { + watchChan := e.client.Watch(e.ctx, key, clientv3.WithPrefix()) + go func() { + for resp := range watchChan { + for _, ev := range resp.Events { + var eventType EventType + switch ev.Type { + case clientv3.EventTypePut: + eventType = EventTypePut + case clientv3.EventTypeDelete: + eventType = EventTypeDelete + } + callback(Event{ + Type: eventType, + Key: string(ev.Kv.Key), + Value: string(ev.Kv.Value), + }) + } + } + }() + return nil +} + +func (e *etcdRegistry) keepAliveLoop() { + for { + select { + case resp, ok := <-e.keepAlive: + if !ok { + return + } + if resp == nil { + return + } + case <-e.ctx.Done(): + return + } + } +} diff --git a/sdk/go-micro/registry/registry.go b/sdk/go-micro/registry/registry.go new file mode 100644 index 00000000..5b2bab5e --- /dev/null +++ b/sdk/go-micro/registry/registry.go @@ -0,0 +1,83 @@ +/* + * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. + * PilotGo licensed under the Mulan Permissive Software License, Version 2. + * See LICENSE file for more details. + * Author: zhanghan2021 + * Date: Thu Dec 12 17:36:05 2024 +0800 + */ +package registry + +import ( + "errors" + "time" +) + +// ServiceInfo represents the basic information of a service +type ServiceInfo struct { + ServiceName string `json:"serviceName"` + Address string `json:"address"` + Port string `json:"port"` + Metadata map[string]string `json:"metadata,omitempty"` +} + +// Options represents the configuration options for service registry +type Options struct { + Endpoints []string + ServiceName string + ServiceAddr string + Version string + DialTimeout time.Duration +} + +// EventType represents the type of service registry events +type EventType int32 + +const ( + EventTypePut EventType = 0 + EventTypeDelete EventType = 1 +) + +// Event represents a service registry event +type Event struct { + Type EventType + Key string + Value string +} + +// WatchCallback is the callback function for watch events +type WatchCallback func(event Event) + +// Registry defines the interface for service registry operations +type Registry interface { + // Register registers a service + Register(info *ServiceInfo) error + // Deregister removes a service registration + Deregister() error + // Get retrieves service information + Get(key string) (string, error) + // Put stores service information + Put(key string, value string) error + // Delete removes service information + Delete(key string) error + // Watch watches for service changes + Watch(key string, callback WatchCallback) error + // Close closes the registry client + Close() error +} + +// RegistryType represents the type of registry +type RegistryType string + +const ( + RegistryTypeEtcd RegistryType = "etcd" +) + +// NewRegistry creates a new registry client based on the registry type +func NewRegistry(registryType RegistryType, opts *Options) (Registry, error) { + switch registryType { + case RegistryTypeEtcd: + return newEtcdRegistry(opts) + default: + return nil, errors.New("unsupported registry type") + } +} diff --git a/sdk/go-micro/registry/service.go b/sdk/go-micro/registry/service.go new file mode 100644 index 00000000..cd271228 --- /dev/null +++ b/sdk/go-micro/registry/service.go @@ -0,0 +1,84 @@ +/* + * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. + * PilotGo licensed under the Mulan Permissive Software License, Version 2. + * See LICENSE file for more details. + * Author: zhanghan2021 + * Date: Thu Dec 12 17:36:05 2024 +0800 + */ + +package registry + +import ( + "fmt" + "os" + "os/signal" + "strings" + "syscall" + + "gitee.com/openeuler/PilotGo/sdk/logger" +) + +// ServiceRegistrar handles service registration and lifecycle management +type ServiceRegistrar struct { + registry Registry + info *ServiceInfo +} + +// NewServiceRegistrar creates a new service registrar +func NewServiceRegistrar(opts *Options) error { + // Create registry client + reg, err := NewRegistry(RegistryTypeEtcd, opts) + if err != nil { + return fmt.Errorf("failed to create registry: %v", err) + } + + // Create service info + info := &ServiceInfo{ + ServiceName: opts.ServiceName, + Address: strings.Split(opts.ServiceAddr, ":")[0], + Port: strings.Split(opts.ServiceAddr, ":")[1], + Metadata: map[string]string{"version": opts.Version}, + } + + sr := &ServiceRegistrar{ + registry: reg, + info: info, + } + + if err := sr.Start(); err != nil { + return err + } + + // Handle graceful shutdown + go func() { + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan // 等待信号 + + if err := sr.Stop(); err != nil { + logger.Error("failed to stop service registrar: %v", err) + } else { + logger.Info("service deregistered successfully") + } + os.Exit(0) + }() + + logger.Info("service registered to etcd successfully") + return nil +} + +// Start registers the service and starts keeping it alive +func (s *ServiceRegistrar) Start() error { + if err := s.registry.Register(s.info); err != nil { + return fmt.Errorf("failed to register service: %v", err) + } + return nil +} + +// Stop deregisters the service and cleans up resources +func (s *ServiceRegistrar) Stop() error { + if err := s.registry.Deregister(); err != nil { + return fmt.Errorf("failed to deregister service: %v", err) + } + return s.registry.Close() +} -- Gitee