diff --git a/cmd/server/app/network/controller/pluginWebsocketProxy.go b/cmd/server/app/network/controller/pluginWebsocketProxy.go index 8117ed551804b892af5de8684d96a57bc7dea2ad..23919b62cf1ffa718d76ce910073e17d972f7a2d 100644 --- a/cmd/server/app/network/controller/pluginWebsocketProxy.go +++ b/cmd/server/app/network/controller/pluginWebsocketProxy.go @@ -3,6 +3,7 @@ package controller import ( "crypto/tls" "fmt" + "net" "net/http" "strings" "sync" @@ -67,7 +68,9 @@ func (we *WebsocketError) Error() string { func PluginWebsocketGatewayHandler(c *gin.Context) { var wg sync.WaitGroup errChan := make(chan error, 1) + doneChan := make(chan struct{}) defer func() { + close(doneChan) wg.Wait() close(errChan) }() @@ -98,7 +101,7 @@ func PluginWebsocketGatewayHandler(c *gin.Context) { targetURL_str = "wss://" + strings.Split(targetURL_str, "://")[1] } - logger.Debug("proxy plugin request to: %s", targetURL_str) + logger.Debug("websocket proxy plugin request: %s->%s", c.Request.RemoteAddr, target_addr) client_wsconn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { @@ -107,7 +110,7 @@ func PluginWebsocketGatewayHandler(c *gin.Context) { } defer client_wsconn.Close() - header, err := targetDirector(c.Request) + header, err := targetDirector(c) if err != nil { if err := client_wsconn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, err.Error())); err != nil { logger.Error("websocket writemessage close: %s", err.Error()) @@ -116,7 +119,7 @@ func PluginWebsocketGatewayHandler(c *gin.Context) { target_wsconn, _, err := dialer.Dial(targetURL_str, header) if err != nil { - if err := client_wsconn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, fmt.Sprintf("dial to target WebSocket failed: %s", err.Error()))); err != nil { + if err := client_wsconn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, fmt.Sprintf("dial to plugin server WebSocket failed: %s", err.Error()))); err != nil { logger.Error("websocket writemessage close: %s", err.Error()) } return @@ -124,9 +127,9 @@ func PluginWebsocketGatewayHandler(c *gin.Context) { defer target_wsconn.Close() wg.Add(1) - go transferMessages(client_wsconn, target_wsconn, errChan, &wg) + go transferMessages(client_wsconn, target_wsconn, errChan, &wg, doneChan) wg.Add(1) - go transferMessages(target_wsconn, client_wsconn, errChan, &wg) + go transferMessages(target_wsconn, client_wsconn, errChan, &wg, doneChan) err = <-errChan logger.Error(err.Error()) @@ -134,61 +137,74 @@ func PluginWebsocketGatewayHandler(c *gin.Context) { switch wserr.Code { case WebsocketProxyReadError: if err := wserr.DstConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, wserr.Text)); err != nil { - logger.Error("websocket writemessage close: %s", err.Error()) + logger.Error("write close message error: %s(wserr.text: %s)", err.Error(), wserr.Text) } case WebsocketProxyWriteError: if err := wserr.SrcConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, wserr.Text)); err != nil { - logger.Error("websocket writemessage close: %s", err.Error()) + logger.Error("write close message error: %s(wserr.text: %s)", err.Error(), wserr.Text) } case WebsocketProxySingleError: if err := wserr.SingleConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, wserr.Text)); err != nil { - logger.Error("websocket writemessage close: %s", err.Error()) + logger.Error("write close message error: %s(wserr.text: %s)", err.Error(), wserr.Text) } } } -func targetDirector(_r *http.Request) (http.Header, error) { - cookie, err := _r.Cookie("Admin-Token") - if err != nil { - return nil, fmt.Errorf("fail to get client cookie: %s", err.Error()) +func targetDirector(_ctx *gin.Context) (http.Header, error) { + header := http.Header{} + + header.Set("clientId", _ctx.Query("clientId")) + if clientIP, clientPort, err := net.SplitHostPort(_ctx.Request.RemoteAddr); err == nil { + if prior, ok := _ctx.Request.Header["X-Forwarded-For"]; ok { + clientIP = strings.Join(prior, ", ") + ", " + clientIP + ":" + clientPort + } + header.Set("X-Forwarded-For", clientIP+":"+clientPort) + } + + header.Set("X-Forwarded-Proto", "http") + if _ctx.Request.TLS != nil { + header.Set("X-Forwarded-Proto", "https") } - header := http.Header{} - header.Set("token", cookie.Value) return header, nil } -func transferMessages(_srcConn, _dstConn *websocket.Conn, _err_ch chan error, _wg *sync.WaitGroup) { +func transferMessages(_srcConn, _dstConn *websocket.Conn, _err_ch chan error, _wg *sync.WaitGroup, _donechan chan struct{}) { defer _wg.Done() for { - messageType, message, err := _srcConn.ReadMessage() - if err != nil { - if websocket.IsCloseError(err, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) { + select { + case <-_donechan: + return + default: + messageType, message, err := _srcConn.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) { + _err_ch <- &WebsocketError{ + Code: WebsocketProxyReadError, + SrcConn: _srcConn, + DstConn: _dstConn, + Text: fmt.Sprintf("websocket src conn %s closed(%v->%v): %s", _srcConn.RemoteAddr().String(), _srcConn.RemoteAddr().String(), _dstConn.RemoteAddr().String(), err.Error()), + } + return + } _err_ch <- &WebsocketError{ Code: WebsocketProxyReadError, SrcConn: _srcConn, DstConn: _dstConn, - Text: fmt.Sprintf("websocket src conn %s closed(%v->%v): %s", _srcConn.RemoteAddr().String(), _srcConn.RemoteAddr().String(), _dstConn.RemoteAddr().String(), err.Error()), + Text: fmt.Sprintf("error while reading message(%v->%v, msgType: %d): %s, %s", _srcConn.RemoteAddr().String(), _dstConn.RemoteAddr().String(), messageType, err.Error(), message), } return } - _err_ch <- &WebsocketError{ - Code: WebsocketProxyReadError, - SrcConn: _srcConn, - DstConn: _dstConn, - Text: fmt.Sprintf("error while reading message(%v->%v, msgType: %d): %s, %s", _srcConn.RemoteAddr().String(), _dstConn.RemoteAddr().String(), messageType, err.Error(), message), - } - return - } - if err := _dstConn.WriteMessage(messageType, message); err != nil { - _err_ch <- &WebsocketError{ - Code: WebsocketProxyWriteError, - SrcConn: _srcConn, - DstConn: _dstConn, - Text: fmt.Sprintf("error while writing message(%v->%v): %s", _srcConn.RemoteAddr().String(), _dstConn.RemoteAddr().String(), err.Error()), + if err := _dstConn.WriteMessage(messageType, message); err != nil { + _err_ch <- &WebsocketError{ + Code: WebsocketProxyWriteError, + SrcConn: _srcConn, + DstConn: _dstConn, + Text: fmt.Sprintf("error while writing message(%v->%v): %s", _srcConn.RemoteAddr().String(), _dstConn.RemoteAddr().String(), err.Error()), + } + return } - return } } } diff --git a/vendor/gitee.com/openeuler/PilotGo/sdk/utils/httputils/judgeProtocol.go b/vendor/gitee.com/openeuler/PilotGo/sdk/utils/httputils/judgeProtocol.go index 96d9f61c5311781fae5789d417fed20868f73b94..5d3249364384293bfe4a093ba37ab450f6fe5b48 100644 --- a/vendor/gitee.com/openeuler/PilotGo/sdk/utils/httputils/judgeProtocol.go +++ b/vendor/gitee.com/openeuler/PilotGo/sdk/utils/httputils/judgeProtocol.go @@ -17,7 +17,7 @@ func ServerIsHttp(rawurl string) (bool, error) { } url_string := fmt.Sprintf("http://%s", net.JoinHostPort(url.Hostname(), url.Port())) - req, err := http.NewRequest("GET", url_string, nil) + req, err := http.NewRequest(http.MethodGet, url_string, nil) if err != nil { return false, err }