From 8e14cfcc442f8b03cf31410bdba47c92e831f2fe Mon Sep 17 00:00:00 2001 From: zhanghan Date: Tue, 10 Dec 2024 17:56:49 +0800 Subject: [PATCH] Microservice transformation, adding gateway, load balancing and other related code --- sdk/etcd/{ => client}/client.go | 2 +- sdk/etcd/{ => client}/register.go | 91 +------------------ sdk/etcd/{ => client}/watch.go | 2 +- sdk/etcd/etcd.go | 72 +++++++++++++++ sdk/etcd/gateway/gateway.go | 26 +++--- .../{gateway => middleware}/middleware.go | 2 +- sdk/etcd/{gateway => proxy}/balancer.go | 70 ++------------ sdk/etcd/proxy/proxy.go | 82 +++++++++++++++++ 8 files changed, 181 insertions(+), 166 deletions(-) rename sdk/etcd/{ => client}/client.go (98%) rename sdk/etcd/{ => client}/register.go (61%) rename sdk/etcd/{ => client}/watch.go (99%) create mode 100644 sdk/etcd/etcd.go rename sdk/etcd/{gateway => middleware}/middleware.go (99%) rename sdk/etcd/{gateway => proxy}/balancer.go (46%) create mode 100644 sdk/etcd/proxy/proxy.go diff --git a/sdk/etcd/client.go b/sdk/etcd/client/client.go similarity index 98% rename from sdk/etcd/client.go rename to sdk/etcd/client/client.go index 1ad00a4d..12f36e3b 100644 --- a/sdk/etcd/client.go +++ b/sdk/etcd/client/client.go @@ -5,7 +5,7 @@ * Author: zhanghan2021 * Date: Mon Dec 09 13:56:05 2024 +0800 */ -package etcd +package client import ( "context" diff --git a/sdk/etcd/register.go b/sdk/etcd/client/register.go similarity index 61% rename from sdk/etcd/register.go rename to sdk/etcd/client/register.go index 75f92009..15e919e6 100644 --- a/sdk/etcd/register.go +++ b/sdk/etcd/client/register.go @@ -5,20 +5,15 @@ * Author: zhanghan2021 * Date: Tue Dec 10 10:17:05 2024 +0800 */ -package etcd +package client import ( "context" "encoding/json" "errors" "fmt" - "os" - "os/signal" - "strings" - "syscall" "time" - "gitee.com/openeuler/PilotGo/sdk/logger" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -46,88 +41,8 @@ type ServiceRegister struct { cancel context.CancelFunc // To cancel the keep-alive goroutine } -// SetupEtcdRegistration initializes etcd registration with graceful shutdown -func Register(ctx context.Context, opts *RegisterOptions) error { - serviceRegister, err := registerService(opts) - if err != nil { - return err - } - - // Handle graceful shutdown - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - go func() { - select { - case <-sigChan: - case <-ctx.Done(): - } - if serviceRegister != nil { - serviceRegister.deregister() - } - }() - - return nil -} - -// Deregister removes the service from etcd - -// GetAllServices returns all registered services -func (c *Client) GetAllServices() (map[string][]*ServiceInfo, error) { - services := make(map[string][]*ServiceInfo) - - // Get all services with prefix "/services/" - resp, err := c.client.Get(context.Background(), "/services/", clientv3.WithPrefix()) - if err != nil { - return nil, fmt.Errorf("failed to get services: %v", err) - } - - // Process each service - for _, kv := range resp.Kvs { - var serviceInfo ServiceInfo - err := json.Unmarshal(kv.Value, &serviceInfo) - if err != nil { - fmt.Printf("Failed to unmarshal service info: %v\n", err) - continue - } - - services[serviceInfo.ServiveName] = append(services[serviceInfo.ServiveName], &serviceInfo) - } - - return services, nil -} - -// RegisterToEtcd registers the server to etcd -func registerService(opts *RegisterOptions) (*ServiceRegister, error) { - // Create etcd client - etcdClient, err := NewClient(opts.Endpoints, opts.DialTimeout) - if err != nil { - return nil, errors.New("failed to create etcd client: " + err.Error()) - } - - // Create service info - serviceInfo := &ServiceInfo{ - ServiveName: opts.ServiceName, - Address: strings.Split(opts.ServiceAddr, ":")[0], - Port: strings.Split(opts.ServiceAddr, ":")[1], - Metadata: map[string]string{ - "version": opts.Version, - }, - } - - // Register service with TTL - serviceRegister, err := newServiceRegister(etcdClient, serviceInfo, 10) - if err != nil { - etcdClient.Close() - return nil, errors.New("failed to register service: " + err.Error()) - } - logger.Info("service registered to etcd successfully") - - return serviceRegister, nil -} - // NewServiceRegister creates a new service register -func newServiceRegister(client *Client, info *ServiceInfo, ttl int64) (*ServiceRegister, error) { +func NewServiceRegister(client *Client, info *ServiceInfo, ttl int64) (*ServiceRegister, error) { // 首先检查客户端是否可用 if client == nil || client.client == nil { return nil, errors.New("etcd client is not initialized") @@ -198,7 +113,7 @@ func (sr *ServiceRegister) register(opts ...clientv3.OpOption) error { ) return err } -func (sr *ServiceRegister) deregister() error { +func (sr *ServiceRegister) Deregister() error { // Cancel the context to stop keep-alive goroutine if sr.cancel != nil { sr.cancel() diff --git a/sdk/etcd/watch.go b/sdk/etcd/client/watch.go similarity index 99% rename from sdk/etcd/watch.go rename to sdk/etcd/client/watch.go index 74b8fe4a..d736a536 100644 --- a/sdk/etcd/watch.go +++ b/sdk/etcd/client/watch.go @@ -5,7 +5,7 @@ * Author: zhanghan2021 * Date: Tue Dec 10 13:56:05 2024 +0800 */ -package etcd +package client import ( "context" diff --git a/sdk/etcd/etcd.go b/sdk/etcd/etcd.go new file mode 100644 index 00000000..1ea2d9d6 --- /dev/null +++ b/sdk/etcd/etcd.go @@ -0,0 +1,72 @@ +/* + * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. + * PilotGo licensed under the Mulan Permissive Software License, Version 2. + * See LICENSE file for more details. + * Author: zhanghan2021 + * Date: Tue Dec 10 14:36:05 2024 +0800 + */ +package etcd + +import ( + "context" + "errors" + "os" + "os/signal" + "strings" + "syscall" + + "gitee.com/openeuler/PilotGo/sdk/etcd/client" + "gitee.com/openeuler/PilotGo/sdk/logger" +) + +// SetupEtcdRegistration initializes etcd registration with graceful shutdown +func Register(ctx context.Context, opts *client.RegisterOptions) error { + serviceRegister, err := registerService(opts) + if err != nil { + return err + } + + // Handle graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + select { + case <-sigChan: + case <-ctx.Done(): + } + if serviceRegister != nil { + serviceRegister.Deregister() + } + }() + + return nil +} + +// RegisterToEtcd registers the server to etcd +func registerService(opts *client.RegisterOptions) (*client.ServiceRegister, error) { + // 1. Create etcd client + etcdClient, err := client.NewClient(opts.Endpoints, opts.DialTimeout) + if err != nil { + return nil, errors.New("failed to create etcd client: " + err.Error()) + } + + serviceInfo := &client.ServiceInfo{ + ServiveName: opts.ServiceName, + Address: strings.Split(opts.ServiceAddr, ":")[0], + Port: strings.Split(opts.ServiceAddr, ":")[1], + Metadata: map[string]string{ + "version": opts.Version, + }, + } + + // 2. Register service with TTL + serviceRegister, err := client.NewServiceRegister(etcdClient, serviceInfo, 10) + if err != nil { + etcdClient.Close() + return nil, errors.New("failed to register service: " + err.Error()) + } + logger.Info("service registered to etcd successfully") + + return serviceRegister, nil +} diff --git a/sdk/etcd/gateway/gateway.go b/sdk/etcd/gateway/gateway.go index 6c6a2ec2..ebcbd792 100644 --- a/sdk/etcd/gateway/gateway.go +++ b/sdk/etcd/gateway/gateway.go @@ -16,22 +16,22 @@ import ( "sync" "time" - "gitee.com/openeuler/PilotGo/sdk/etcd" + "gitee.com/openeuler/PilotGo/sdk/etcd/client" ) // Gateway represents the API gateway type Gateway struct { - etcdClient *etcd.Client - services map[string][]*etcd.ServiceInfo + etcdClient *client.Client + services map[string][]*client.ServiceInfo serviceLock sync.RWMutex - watcher *etcd.Watcher + watcher *client.Watcher } // NewGateway creates a new API gateway -func NewGateway(etcdClient *etcd.Client) *Gateway { +func NewGateway(etcdClient *client.Client) *Gateway { g := &Gateway{ etcdClient: etcdClient, - services: make(map[string][]*etcd.ServiceInfo), + services: make(map[string][]*client.ServiceInfo), } // Start watching for service changes @@ -41,27 +41,27 @@ func NewGateway(etcdClient *etcd.Client) *Gateway { // watchServices watches for service changes in etcd func (g *Gateway) watchServices() { - callback := func(eventType etcd.EventType, key, value string) { + callback := func(eventType client.EventType, key, value string) { switch eventType { - case etcd.EventTypePut: - var service etcd.ServiceInfo + case client.EventTypePut: + var service client.ServiceInfo if err := json.Unmarshal([]byte(value), &service); err != nil { fmt.Printf("Failed to unmarshal service info: %v\n", err) return } g.addService(&service) - case etcd.EventTypeDelete: + case client.EventTypeDelete: g.removeService(key) } } - g.watcher = etcd.NewWatcher(g.etcdClient, "/services/", callback) + g.watcher = client.NewWatcher(g.etcdClient, "/services/", callback) g.watcher.Start() } // addService adds a service to the gateway -func (g *Gateway) addService(service *etcd.ServiceInfo) { +func (g *Gateway) addService(service *client.ServiceInfo) { g.serviceLock.Lock() defer g.serviceLock.Unlock() @@ -93,7 +93,7 @@ func (g *Gateway) removeService(key string) { } // getService returns a service instance using round-robin load balancing -func (g *Gateway) getService(name string) (*etcd.ServiceInfo, error) { +func (g *Gateway) getService(name string) (*client.ServiceInfo, error) { g.serviceLock.RLock() defer g.serviceLock.RUnlock() diff --git a/sdk/etcd/gateway/middleware.go b/sdk/etcd/middleware/middleware.go similarity index 99% rename from sdk/etcd/gateway/middleware.go rename to sdk/etcd/middleware/middleware.go index 33496309..5ccdd2ad 100644 --- a/sdk/etcd/gateway/middleware.go +++ b/sdk/etcd/middleware/middleware.go @@ -5,7 +5,7 @@ * Author: zhanghan2021 * Date: Tue Dec 10 14:46:05 2024 +0800 */ -package gateway +package middleware import ( "net/http" diff --git a/sdk/etcd/gateway/balancer.go b/sdk/etcd/proxy/balancer.go similarity index 46% rename from sdk/etcd/gateway/balancer.go rename to sdk/etcd/proxy/balancer.go index 69b60a20..06a64a93 100644 --- a/sdk/etcd/gateway/balancer.go +++ b/sdk/etcd/proxy/balancer.go @@ -5,18 +5,18 @@ * Author: zhanghan2021 * Date: Tue Dec 10 14:36:05 2024 +0800 */ -package gateway +package proxy import ( "sync" "sync/atomic" - "gitee.com/openeuler/PilotGo/sdk/etcd" + "gitee.com/openeuler/PilotGo/sdk/etcd/client" ) // LoadBalancer interface defines methods for load balancing type LoadBalancer interface { - Next(services []*etcd.ServiceInfo) *etcd.ServiceInfo + Next(services []*client.ServiceInfo) *client.ServiceInfo } // RoundRobinBalancer implements round-robin load balancing @@ -28,7 +28,7 @@ func NewRoundRobinBalancer() *RoundRobinBalancer { return &RoundRobinBalancer{} } -func (b *RoundRobinBalancer) Next(services []*etcd.ServiceInfo) *etcd.ServiceInfo { +func (b *RoundRobinBalancer) Next(services []*client.ServiceInfo) *client.ServiceInfo { if len(services) == 0 { return nil } @@ -49,13 +49,7 @@ func NewWeightedRoundRobinBalancer() *WeightedRoundRobinBalancer { } } -func (b *WeightedRoundRobinBalancer) SetWeight(serviceID string, weight int) { - b.mu.Lock() - defer b.mu.Unlock() - b.weights[serviceID] = weight -} - -func (b *WeightedRoundRobinBalancer) Next(services []*etcd.ServiceInfo) *etcd.ServiceInfo { +func (b *WeightedRoundRobinBalancer) Next(services []*client.ServiceInfo) *client.ServiceInfo { if len(services) == 0 { return nil } @@ -63,10 +57,10 @@ func (b *WeightedRoundRobinBalancer) Next(services []*etcd.ServiceInfo) *etcd.Se b.mu.Lock() defer b.mu.Unlock() - // Default weight is 1 if not specified + // Simple weighted round-robin implementation totalWeight := 0 for _, service := range services { - weight := b.weights[service.ServiveName] + weight := b.weights[service.Address] if weight == 0 { weight = 1 } @@ -75,7 +69,7 @@ func (b *WeightedRoundRobinBalancer) Next(services []*etcd.ServiceInfo) *etcd.Se b.current = (b.current + 1) % totalWeight for _, service := range services { - weight := b.weights[service.ServiveName] + weight := b.weights[service.Address] if weight == 0 { weight = 1 } @@ -87,51 +81,3 @@ func (b *WeightedRoundRobinBalancer) Next(services []*etcd.ServiceInfo) *etcd.Se return services[0] } - -// LeastConnectionBalancer implements least connection load balancing -type LeastConnectionBalancer struct { - mu sync.RWMutex - connections map[string]int -} - -func NewLeastConnectionBalancer() *LeastConnectionBalancer { - return &LeastConnectionBalancer{ - connections: make(map[string]int), - } -} - -func (b *LeastConnectionBalancer) IncrementConnections(serviceID string) { - b.mu.Lock() - defer b.mu.Unlock() - b.connections[serviceID]++ -} - -func (b *LeastConnectionBalancer) DecrementConnections(serviceID string) { - b.mu.Lock() - defer b.mu.Unlock() - if b.connections[serviceID] > 0 { - b.connections[serviceID]-- - } -} - -func (b *LeastConnectionBalancer) Next(services []*etcd.ServiceInfo) *etcd.ServiceInfo { - if len(services) == 0 { - return nil - } - - b.mu.RLock() - defer b.mu.RUnlock() - - var minConnections int = -1 - var selectedService *etcd.ServiceInfo - - for _, service := range services { - connections := b.connections[service.ServiveName] - if minConnections == -1 || connections < minConnections { - minConnections = connections - selectedService = service - } - } - - return selectedService -} diff --git a/sdk/etcd/proxy/proxy.go b/sdk/etcd/proxy/proxy.go new file mode 100644 index 00000000..46e34d49 --- /dev/null +++ b/sdk/etcd/proxy/proxy.go @@ -0,0 +1,82 @@ +/* + * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. + * PilotGo licensed under the Mulan Permissive Software License, Version 2. + * See LICENSE file for more details. + * Author: zhanghan2021 + * Date: Tue Dec 10 13:56:05 2024 +0800 + */ +package proxy + +import ( + "fmt" + "net/http" + "net/http/httputil" + "net/url" + "sync" + + "gitee.com/openeuler/PilotGo/sdk/etcd/client" + clientv3 "go.etcd.io/etcd/client/v3" +) + +// Proxy represents the API proxy +type Proxy struct { + client *clientv3.Client + services map[string][]*client.ServiceInfo + serviceLock sync.RWMutex +} + +// NewProxy creates a new API proxy +func NewProxy(cli *clientv3.Client) *Proxy { + return &Proxy{ + client: cli, + services: make(map[string][]*client.ServiceInfo), + } +} + +// Start starts the proxy server +func (p *Proxy) Start(addr string) error { + http.HandleFunc("/", p.ProxyHandler) + return http.ListenAndServe(addr, nil) +} + +// ProxyHandler handles the proxying of requests to services +func (p *Proxy) ProxyHandler(w http.ResponseWriter, r *http.Request) { + serviceName := r.Header.Get("X-Service-Name") + if serviceName == "" { + http.Error(w, "Service name not specified", http.StatusBadRequest) + return + } + + service, err := p.getService(serviceName) + if err != nil { + http.Error(w, fmt.Sprintf("Error getting service: %v", err), http.StatusInternalServerError) + return + } + + targetURL, err := url.Parse(fmt.Sprintf("http://%s:%s", service.Address, service.Port)) + if err != nil { + http.Error(w, fmt.Sprintf("Error parsing service URL: %v", err), http.StatusInternalServerError) + return + } + + proxy := httputil.NewSingleHostReverseProxy(targetURL) + proxy.ServeHTTP(w, r) +} + +// getService returns a service instance using round-robin load balancing +func (p *Proxy) getService(name string) (*client.ServiceInfo, error) { + p.serviceLock.RLock() + defer p.serviceLock.RUnlock() + + services, ok := p.services[name] + if !ok || len(services) == 0 { + return nil, fmt.Errorf("no available services for %s", name) + } + + // Simple round-robin for now + service := services[0] + // Move the first service to the end + p.services[name] = append(services[1:], service) + + return service, nil +} -- Gitee