diff --git a/cmd/server/app/network/httpserver.go b/cmd/server/app/network/httpserver.go index 9eba573b6ca6931305fa5cc0ac969d8ec61ecdb3..6b4ad178ba03d05a95921e7760608f957b68569c 100644 --- a/cmd/server/app/network/httpserver.go +++ b/cmd/server/app/network/httpserver.go @@ -32,7 +32,7 @@ func HttpServerInit(conf *options.HttpServer, stopCh <-chan struct{}) error { } go func() { - r := setupRouter() + r := SetupRouter() // 启动websocket服务 go websocket.CliManager.Start(stopCh) shutdownCtx, cancel := context.WithCancel(context.Background()) @@ -101,7 +101,7 @@ func HttpServerInit(conf *options.HttpServer, stopCh <-chan struct{}) error { return nil } -func setupRouter() *gin.Engine { +func SetupRouter() *gin.Engine { gin.SetMode(gin.ReleaseMode) router := gin.New() router.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) diff --git a/sdk/go-micro/gateway/gateway.go b/sdk/go-micro/gateway/gateway.go new file mode 100644 index 0000000000000000000000000000000000000000..c68c2bf9471240fcd218f9c0c7ae048456cc7e6c --- /dev/null +++ b/sdk/go-micro/gateway/gateway.go @@ -0,0 +1,225 @@ +/* + * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. + * PilotGo licensed under the Mulan Permissive Software License, Version 2. + * See LICENSE file for more details. + * Author: zhanghan2021 + * Date: Mon Dec 16 10:36:05 2024 +0800 + */ +package gateway + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httputil" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "gitee.com/openeuler/PilotGo/sdk/go-micro/proxy" + "gitee.com/openeuler/PilotGo/sdk/go-micro/registry" + "gitee.com/openeuler/PilotGo/sdk/logger" + "github.com/gin-gonic/gin" +) + +// Gateway represents the API gateway +type Gateway struct { + registry registry.Registry + services map[string][]*registry.ServiceInfo + serviceLock sync.RWMutex + balancer proxy.LoadBalancer + proxy *proxy.Proxy + server *http.Server + cancel context.CancelFunc +} + +// NewGateway creates a new API gateway +func NewGateway(reg registry.Registry) *Gateway { + return &Gateway{ + registry: reg, + services: make(map[string][]*registry.ServiceInfo), + balancer: proxy.NewRoundRobinBalancer(), + proxy: proxy.NewProxy(reg), + } +} + +// Run starts the gateway and handles graceful shutdown +func (g *Gateway) Run(addr string, router *gin.Engine) error { + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + errChan := make(chan error, 1) + + // 启动gateway + go func() { + if err := g.Start(addr, router); err != nil { + errChan <- err + } + }() + + select { + case err := <-errChan: + return fmt.Errorf("gateway error: %v", err) + case sig := <-sigChan: + logger.Info("Received signal: %v, Shutting down gateway...", sig) + if err := g.Stop(); err != nil { + return fmt.Errorf("error stopping gateway: %v", err) + } + } + + return nil +} + +// Start starts the gateway server +func (g *Gateway) Start(addr string, router *gin.Engine) error { + ctx, cancel := context.WithCancel(context.Background()) + g.cancel = cancel + + // Start watching for service changes + if err := g.watchServices(router); err != nil { + return fmt.Errorf("failed to start service watcher: %v", err) + } + + g.server = &http.Server{ + Addr: addr, + Handler: router, + } + + go func() { + logger.Info("Starting gateway server on %s\n", addr) + if err := g.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Error("HTTP server error: %v\n", err) + } + }() + + <-ctx.Done() + return nil +} + +// Stop stops the gateway +func (g *Gateway) Stop() error { + if g.cancel != nil { + g.cancel() + } + + // 创建一个带超时的上下文用于关闭 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // 优雅关闭HTTP服务器 + if g.server != nil { + if err := g.server.Shutdown(ctx); err != nil { + logger.Error("HTTP server Shutdown error: %v", err) + } + } + + // 关闭注册中心连接 + if err := g.registry.Close(); err != nil { + logger.Error("Registry close error: %v", err) + return err + } + + return nil +} + +// watchServices watches for service changes in the registry +func (g *Gateway) watchServices(router *gin.Engine) error { + callback := func(event registry.Event) { + switch event.Type { + case registry.EventTypePut: + var service registry.ServiceInfo + if err := json.Unmarshal([]byte(event.Value), &service); err != nil { + logger.Error("Failed to unmarshal service info: %v\n", err) + return + } + g.addService(&service) + g.updateRouter(router, service.ServiceName) + logger.Info("Service added/updated: %s at %s:%s\n", service.ServiceName, service.Address, service.Port) + + case registry.EventTypeDelete: + g.removeService(event.Key) + logger.Info("Service removed: %s\n", event.Key) + } + } + + return g.registry.Watch("/services/", callback) +} + +// updateRouter updates the router with the given service's routes +func (g *Gateway) updateRouter(router *gin.Engine, serviceName string) { + // 动态绑定服务名到代理 + router.Any(fmt.Sprintf("/%s/*path", serviceName), func(c *gin.Context) { + targetService, err := g.getService(serviceName) + if err != nil { + c.JSON(http.StatusServiceUnavailable, gin.H{"error": fmt.Sprintf("Service %s unavailable", serviceName)}) + return + } + + // 构造目标URL + targetURL := fmt.Sprintf("http://%s:%s%s", targetService.Address, targetService.Port, c.Param("path")) + logger.Info("Proxying request to: %s", targetURL) + + // 使用反向代理转发请求 + proxy := &httputil.ReverseProxy{ + Director: func(req *http.Request) { + req.URL.Scheme = "http" + req.URL.Host = fmt.Sprintf("%s:%s", targetService.Address, targetService.Port) + req.URL.Path = c.Param("path") + req.Header = c.Request.Header + }, + } + proxy.ServeHTTP(c.Writer, c.Request) + }) +} + +// addService adds a service to the gateway +func (g *Gateway) addService(service *registry.ServiceInfo) { + g.serviceLock.Lock() + defer g.serviceLock.Unlock() + + services := g.services[service.ServiceName] + // Check if service already exists + for i, s := range services { + if s.Address == service.Address && s.Port == service.Port { + services[i] = service + return + } + } + // Add new service + g.services[service.ServiceName] = append(services, service) +} + +// removeService removes a service from the gateway +func (g *Gateway) removeService(key string) { + g.serviceLock.Lock() + defer g.serviceLock.Unlock() + + for name, services := range g.services { + for i, service := range services { + if fmt.Sprintf("/services/%s/%s:%s", service.ServiceName, service.Address, service.Port) == key { + g.services[name] = append(services[:i], services[i+1:]...) + if len(g.services[name]) == 0 { + delete(g.services, name) + } + return + } + } + } +} + +// getService returns a service instance using the load balancer +func (g *Gateway) getService(name string) (*registry.ServiceInfo, error) { + g.serviceLock.RLock() + defer g.serviceLock.RUnlock() + + services := g.services[name] + if len(services) == 0 { + return nil, fmt.Errorf("no available services for %s", name) + } + + return g.balancer.Next(services), nil +} diff --git a/sdk/go-micro/middleware/middleware.go b/sdk/go-micro/middleware/middleware.go new file mode 100644 index 0000000000000000000000000000000000000000..6624c36b0c9fc0f324a8b44c3a67234b39ee7fa1 --- /dev/null +++ b/sdk/go-micro/middleware/middleware.go @@ -0,0 +1,134 @@ +/* + * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. + * PilotGo licensed under the Mulan Permissive Software License, Version 2. + * See LICENSE file for more details. + * Author: zhanghan2021 + * Date: Mon Dec 16 10:36:05 2024 +0800 + */ +package middleware + +import ( + "net/http" + "sync" + "time" + + "gitee.com/openeuler/PilotGo/sdk/logger" + "golang.org/x/time/rate" +) + +// Middleware represents a chain of http handlers +type Middleware func(http.Handler) http.Handler + +// Chain chains multiple middleware together +func Chain(middlewares ...Middleware) Middleware { + return func(next http.Handler) http.Handler { + for i := len(middlewares) - 1; i >= 0; i-- { + next = middlewares[i](next) + } + return next + } +} + +// RateLimiter implements rate limiting middleware +type RateLimiter struct { + visitors map[string]*rate.Limiter + mu sync.RWMutex + r rate.Limit + b int +} + +func NewRateLimiter(r rate.Limit, b int) *RateLimiter { + return &RateLimiter{ + visitors: make(map[string]*rate.Limiter), + r: r, + b: b, + } +} + +func (rl *RateLimiter) getLimiter(ip string) *rate.Limiter { + rl.mu.Lock() + defer rl.mu.Unlock() + + limiter, exists := rl.visitors[ip] + if !exists { + limiter = rate.NewLimiter(rl.r, rl.b) + rl.visitors[ip] = limiter + } + + return limiter +} + +func (rl *RateLimiter) RateLimit(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + limiter := rl.getLimiter(r.RemoteAddr) + if !limiter.Allow() { + http.Error(w, "Too many requests", http.StatusTooManyRequests) + return + } + next.ServeHTTP(w, r) + }) +} + +// AuthMiddleware implements authentication middleware +type AuthMiddleware struct { + tokenValidator func(string) bool +} + +func NewAuthMiddleware(validator func(string) bool) *AuthMiddleware { + return &AuthMiddleware{ + tokenValidator: validator, + } +} + +func (am *AuthMiddleware) Authenticate(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + token := r.Header.Get("Authorization") + if token == "" { + http.Error(w, "Authorization token required", http.StatusUnauthorized) + return + } + + if !am.tokenValidator(token) { + http.Error(w, "Invalid token", http.StatusUnauthorized) + return + } + + next.ServeHTTP(w, r) + }) +} + +// LoggingMiddleware implements request logging +func LoggingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + + // Call the next handler + next.ServeHTTP(w, r) + + // Log the request + duration := time.Since(start) + logger.Info( + "%s %s %s %v", + r.Method, + r.RequestURI, + r.RemoteAddr, + duration, + ) + }) +} + +// CORSMiddleware implements CORS support +func CORSMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") + + if r.Method == "OPTIONS" { + w.WriteHeader(http.StatusOK) + return + } + + next.ServeHTTP(w, r) + }) +} diff --git a/sdk/go-micro/proxy/balancer.go b/sdk/go-micro/proxy/balancer.go new file mode 100644 index 0000000000000000000000000000000000000000..aeab154601d0e678aca6425cf0c58bd61ab26604 --- /dev/null +++ b/sdk/go-micro/proxy/balancer.go @@ -0,0 +1,149 @@ +/* + * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. + * PilotGo licensed under the Mulan Permissive Software License, Version 2. + * See LICENSE file for more details. + * Author: zhanghan2021 + * Date: Mon Dec 16 10:36:05 2024 +0800 + */ +package proxy + +import ( + "sync" + "sync/atomic" + + "gitee.com/openeuler/PilotGo/sdk/go-micro/registry" +) + +// LoadBalancer interface defines methods for load balancing +type LoadBalancer interface { + Next(services []*registry.ServiceInfo) *registry.ServiceInfo + UpdateWeight(address string, weight int) + GetWeight(address string) int +} + +// RoundRobinBalancer implements round-robin load balancing +type RoundRobinBalancer struct { + counter uint64 + weights map[string]int + mu sync.RWMutex +} + +func NewRoundRobinBalancer() *RoundRobinBalancer { + return &RoundRobinBalancer{ + weights: make(map[string]int), + } +} + +func (b *RoundRobinBalancer) UpdateWeight(address string, weight int) { + b.mu.Lock() + defer b.mu.Unlock() + b.weights[address] = weight +} + +func (b *RoundRobinBalancer) GetWeight(address string) int { + b.mu.RLock() + defer b.mu.RUnlock() + return b.weights[address] +} + +func (b *RoundRobinBalancer) Next(services []*registry.ServiceInfo) *registry.ServiceInfo { + if len(services) == 0 { + return nil + } + + b.mu.RLock() + defer b.mu.RUnlock() + + // Get total weight + totalWeight := 0 + for _, service := range services { + weight := b.weights[service.Address] + if weight == 0 { + weight = 1 + } + totalWeight += weight + } + + // Use atomic counter for thread safety + count := atomic.AddUint64(&b.counter, 1) + targetWeight := count % uint64(totalWeight) + + // Find service based on weight + currentWeight := 0 + for _, service := range services { + weight := b.weights[service.Address] + if weight == 0 { + weight = 1 + } + currentWeight += weight + if uint64(currentWeight) > targetWeight { + return service + } + } + + return services[0] +} + +// WeightedRoundRobinBalancer implements weighted round-robin load balancing +type WeightedRoundRobinBalancer struct { + mu sync.RWMutex + current int + weights map[string]int +} + +func NewWeightedRoundRobinBalancer() *WeightedRoundRobinBalancer { + return &WeightedRoundRobinBalancer{ + weights: make(map[string]int), + } +} + +func (b *WeightedRoundRobinBalancer) UpdateWeight(address string, weight int) { + b.mu.Lock() + defer b.mu.Unlock() + b.weights[address] = weight +} + +func (b *WeightedRoundRobinBalancer) GetWeight(address string) int { + b.mu.RLock() + defer b.mu.RUnlock() + return b.weights[address] +} + +func (b *WeightedRoundRobinBalancer) Next(services []*registry.ServiceInfo) *registry.ServiceInfo { + if len(services) == 0 { + return nil + } + + b.mu.Lock() + defer b.mu.Unlock() + + // Simple weighted round-robin implementation + totalWeight := 0 + maxWeight := 0 + for _, service := range services { + weight := b.weights[service.Address] + if weight == 0 { + weight = 1 + } + if weight > maxWeight { + maxWeight = weight + } + totalWeight += weight + } + + for { + b.current = (b.current + 1) % maxWeight + for _, service := range services { + weight := b.weights[service.Address] + if weight == 0 { + weight = 1 + } + if weight >= b.current { + return service + } + } + if b.current >= maxWeight { + b.current = 0 + } + } +} diff --git a/sdk/go-micro/proxy/proxy.go b/sdk/go-micro/proxy/proxy.go new file mode 100644 index 0000000000000000000000000000000000000000..88406d423c30ae54fcfe8873d5a56226088aeb7d --- /dev/null +++ b/sdk/go-micro/proxy/proxy.go @@ -0,0 +1,192 @@ +/* + * Copyright (c) KylinSoft Co., Ltd. 2024.All rights reserved. + * PilotGo licensed under the Mulan Permissive Software License, Version 2. + * See LICENSE file for more details. + * Author: zhanghan2021 + * Date: Mon Dec 16 10:36:05 2024 +0800 + */ +package proxy + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "net/http/httputil" + "net/url" + "sync" + "time" + + "gitee.com/openeuler/PilotGo/sdk/go-micro/registry" +) + +// Proxy represents the API proxy +type Proxy struct { + registry registry.Registry + services map[string][]*registry.ServiceInfo + serviceLock sync.RWMutex + balancer LoadBalancer + retries int +} + +// NewProxy creates a new API proxy +func NewProxy(reg registry.Registry) *Proxy { + return &Proxy{ + registry: reg, + services: make(map[string][]*registry.ServiceInfo), + balancer: NewRoundRobinBalancer(), + retries: 3, // Default number of retries + } +} + +// Start starts the proxy server +func (p *Proxy) Start(addr string) error { + // Watch for service changes + if err := p.watchServices(); err != nil { + return fmt.Errorf("failed to start service watcher: %v", err) + } + + log.Printf("Starting proxy server on %s\n", addr) + http.HandleFunc("/", p.ProxyHandler) + return http.ListenAndServe(addr, nil) +} + +// watchServices watches for service changes in the registry +func (p *Proxy) watchServices() error { + callback := func(event registry.Event) { + switch event.Type { + case registry.EventTypePut: + var service registry.ServiceInfo + if err := json.Unmarshal([]byte(event.Value), &service); err != nil { + log.Printf("Failed to unmarshal service info: %v\n", err) + return + } + p.addService(&service) + log.Printf("Service added/updated: %s at %s:%s\n", service.ServiceName, service.Address, service.Port) + + case registry.EventTypeDelete: + p.removeService(event.Key) + log.Printf("Service removed: %s\n", event.Key) + } + } + + return p.registry.Watch("/services/", callback) +} + +// addService adds a service to the proxy +func (p *Proxy) addService(service *registry.ServiceInfo) { + p.serviceLock.Lock() + defer p.serviceLock.Unlock() + + services := p.services[service.ServiceName] + // Check if service already exists + for i, s := range services { + if s.Address == service.Address && s.Port == service.Port { + services[i] = service + return + } + } + // Add new service + p.services[service.ServiceName] = append(services, service) +} + +// removeService removes a service from the proxy +func (p *Proxy) removeService(key string) { + p.serviceLock.Lock() + defer p.serviceLock.Unlock() + + for name, services := range p.services { + for i, service := range services { + if fmt.Sprintf("/services/%s", service.ServiceName) == key { + p.services[name] = append(services[:i], services[i+1:]...) + if len(p.services[name]) == 0 { + delete(p.services, name) + } + return + } + } + } +} + +// ProxyHandler handles the proxying of requests to services +func (p *Proxy) ProxyHandler(w http.ResponseWriter, r *http.Request) { + serviceName := r.Header.Get("X-Service-Name") + if serviceName == "" { + http.Error(w, "Service name not specified", http.StatusBadRequest) + return + } + + var lastErr error + for i := 0; i <= p.retries; i++ { + service, err := p.getService(serviceName) + if err != nil { + lastErr = err + continue + } + + target := fmt.Sprintf("http://%s:%s", service.Address, service.Port) + targetURL, err := url.Parse(target) + if err != nil { + lastErr = err + continue + } + + proxy := httputil.NewSingleHostReverseProxy(targetURL) + + // Add custom error handling + proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) { + log.Printf("Proxy error: %v", err) + lastErr = err + // Don't write error here, let retry logic handle it + } + + // Add custom request modification + originalDirector := proxy.Director + proxy.Director = func(req *http.Request) { + originalDirector(req) + req.URL.Host = targetURL.Host + req.URL.Scheme = targetURL.Scheme + req.Header.Set("X-Forwarded-Host", req.Header.Get("Host")) + req.Header.Set("X-Real-IP", req.RemoteAddr) + req.Header.Set("X-Retry-Count", fmt.Sprintf("%d", i)) + } + + // Attempt to proxy the request + var proxyErr error + done := make(chan bool) + go func() { + proxy.ServeHTTP(w, r) + done <- true + }() + + select { + case <-done: + return // Request completed successfully + case <-time.After(10 * time.Second): + proxyErr = fmt.Errorf("request timed out") + } + + if proxyErr != nil { + lastErr = proxyErr + continue + } + + return // Request completed successfully + } + + // All retries failed + http.Error(w, fmt.Sprintf("Service unavailable after %d retries: %v", p.retries, lastErr), http.StatusServiceUnavailable) +} + +// getService returns a service instance using the configured load balancer +func (p *Proxy) getService(name string) (*registry.ServiceInfo, error) { + p.serviceLock.RLock() + defer p.serviceLock.RUnlock() + + services, ok := p.services[name] + if !ok || len(services) == 0 { + return nil, fmt.Errorf("no available services for %s", name) + } + + return p.balancer.Next(services), nil +} diff --git a/sdk/go-micro/registry/service.go b/sdk/go-micro/registry/service.go index cd2712280a0c364f5024f27e327ab02d8cd6ea44..20d180e7b3ad72235ab5d7d4696543265d951665 100644 --- a/sdk/go-micro/registry/service.go +++ b/sdk/go-micro/registry/service.go @@ -25,11 +25,11 @@ type ServiceRegistrar struct { } // NewServiceRegistrar creates a new service registrar -func NewServiceRegistrar(opts *Options) error { +func NewServiceRegistrar(opts *Options) (Registry, error) { // Create registry client reg, err := NewRegistry(RegistryTypeEtcd, opts) if err != nil { - return fmt.Errorf("failed to create registry: %v", err) + return nil, fmt.Errorf("failed to create registry: %v", err) } // Create service info @@ -46,7 +46,7 @@ func NewServiceRegistrar(opts *Options) error { } if err := sr.Start(); err != nil { - return err + return nil, err } // Handle graceful shutdown @@ -64,7 +64,7 @@ func NewServiceRegistrar(opts *Options) error { }() logger.Info("service registered to etcd successfully") - return nil + return reg, nil } // Start registers the service and starts keeping it alive