From 1c6c6c4da878ace12e6547050159c28f6968a228 Mon Sep 17 00:00:00 2001 From: gaohuatao Date: Wed, 3 Feb 2021 15:04:00 +0800 Subject: [PATCH] kubelet support exec websocket protocol Signed-off-by: gaohuatao --- ...elet-support-exec-websocket-protocol.patch | 266 ++++++++++++++++++ kubernetes.spec | 9 +- 2 files changed, 273 insertions(+), 2 deletions(-) create mode 100644 0001-kubelet-support-exec-websocket-protocol.patch diff --git a/0001-kubelet-support-exec-websocket-protocol.patch b/0001-kubelet-support-exec-websocket-protocol.patch new file mode 100644 index 0000000..ad08e35 --- /dev/null +++ b/0001-kubelet-support-exec-websocket-protocol.patch @@ -0,0 +1,266 @@ +From ac8fda0c77cb588f59aff7c86c05933a7a2d77c4 Mon Sep 17 00:00:00 2001 +From: gaohuatao +Date: Wed, 3 Feb 2021 14:59:37 +0800 +Subject: [PATCH] kubelet support exec websocket protocol + +Signed-off-by: gaohuatao +--- + .../cri/streaming/remotecommand/proxy.go | 197 ++++++++++++++++++ + pkg/kubelet/server/server.go | 21 +- + 2 files changed, 214 insertions(+), 4 deletions(-) + create mode 100644 pkg/kubelet/cri/streaming/remotecommand/proxy.go + +diff --git a/pkg/kubelet/cri/streaming/remotecommand/proxy.go b/pkg/kubelet/cri/streaming/remotecommand/proxy.go +new file mode 100644 +index 00000000..5b99747c +--- /dev/null ++++ b/pkg/kubelet/cri/streaming/remotecommand/proxy.go +@@ -0,0 +1,197 @@ ++package remotecommand ++ ++import ( ++ "bytes" ++ "errors" ++ "fmt" ++ "io" ++ "net/http" ++ "net/url" ++ "strings" ++ "time" ++ ++ "github.com/gorilla/websocket" ++ apierrors "k8s.io/apimachinery/pkg/api/errors" ++ "k8s.io/apimachinery/pkg/apis/meta/v1" ++ remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" ++ "k8s.io/apimachinery/pkg/util/runtime" ++ "k8s.io/klog/v2" ++ "k8s.io/kubernetes/staging/src/k8s.io/client-go/util/exec" ++) ++ ++var ( ++ streamIdleTimeout = 4 * time.Hour ++ streamCreationTimeout = remotecommandconsts.DefaultStreamCreationTimeout ++) ++ ++// proxyStreamToWebSocket proxies stream to url with websocket. ++func ProxyToWebSocket(w http.ResponseWriter, r *http.Request, url *url.URL, opts *Options) { ++ klog.V(8).Infof("start proxy request to websocket %+v", r) ++ ctx, ok := createStreams( ++ r, ++ w, ++ opts, ++ remotecommandconsts.SupportedStreamingProtocols, ++ streamIdleTimeout, ++ streamCreationTimeout) ++ if !ok { ++ msg := "failed to create stream to fontend" ++ klog.Error(msg) ++ http.Error(w, msg, http.StatusInternalServerError) ++ return ++ } ++ defer func() { ++ if err := ctx.conn.Close(); err != nil { ++ klog.Errorf("failed to close connection, %v", err) ++ } ++ }() ++ ++ klog.V(8).Infof("start connecting to websocket %s", url.String()) ++ backendConn, err := connectBackend(url.String(), "channel.k8s.io", r) ++ if err != nil { ++ msg := fmt.Sprintf("connectBackend failed: %v", err) ++ klog.Error(msg) ++ http.Error(w, msg, http.StatusInternalServerError) ++ return ++ } ++ defer backendConn.Close() ++ ++ var errConnection error ++ frontendStdinToBackendComplete := make(chan struct{}) ++ frontendResizeToBackendComplete := make(chan struct{}) ++ backendToFrontendComplete := make(chan struct{}) ++ ++ go func() { ++ for { ++ _, msg, err := backendConn.ReadMessage() ++ if err != nil { ++ e, ok := err.(*websocket.CloseError) ++ if !ok || e.Code != websocket.CloseNormalClosure { ++ errConnection = err ++ } ++ break ++ } ++ ++ if len(msg) < 1 { ++ errConnection = fmt.Errorf("received err msg from backEnd (the length less than 1), msg: %s", string(msg)) ++ break ++ } ++ ++ switch msg[0] { ++ case stdoutChannel: ++ _, err = ctx.stdoutStream.Write(msg[1:]) ++ case stderrChannel: ++ _, err = ctx.stderrStream.Write(msg[1:]) ++ case errorChannel: ++ err = ctx.writeStatus(apierrors.NewInternalError(errors.New(string(msg[1:])))) ++ default: ++ err = fmt.Errorf("received invalid msg from backEnd, msg: %s", string(msg)) ++ } ++ ++ if err != nil { ++ errConnection = err ++ break ++ } ++ } ++ close(backendToFrontendComplete) ++ }() ++ ++ if opts.Stdin { ++ go func() { ++ r := &rwc{ ++ c: backendConn, ++ index: stdinChannel, ++ } ++ _, err := io.Copy(r, ctx.stdinStream) ++ if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { ++ errConnection = fmt.Errorf("copy data from frontend(stdinStream) to backend failed, err: %v", err) ++ } ++ close(frontendStdinToBackendComplete) ++ }() ++ } ++ ++ if opts.TTY { ++ go func() { ++ r := &rwc{ ++ c: backendConn, ++ index: resizeChannel, ++ } ++ _, err := io.Copy(r, ctx.resizeStream) ++ if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { ++ errConnection = fmt.Errorf("copy data from frontend(resizeStream) to backend failed, err: %v", err) ++ } ++ close(frontendResizeToBackendComplete) ++ }() ++ } ++ select { ++ case <-backendToFrontendComplete: ++ case <-frontendStdinToBackendComplete: ++ case <-frontendResizeToBackendComplete: ++ } ++ ++ if errConnection != nil { ++ klog.Errorf("SpdyProxy: the connection disconnected: %v", errConnection) ++ if exitErr, ok := errConnection.(exec.ExitError); ok && exitErr.Exited() { ++ rc := exitErr.ExitStatus() ++ ctx.writeStatus(&apierrors.StatusError{ErrStatus: v1.Status{ ++ Status: v1.StatusFailure, ++ Reason: remotecommandconsts.NonZeroExitCodeReason, ++ Details: &v1.StatusDetails{ ++ Causes: []v1.StatusCause{ ++ { ++ Type: remotecommandconsts.ExitCodeCauseType, ++ Message: fmt.Sprintf("%d", rc), ++ }, ++ }, ++ }, ++ Message: fmt.Sprintf("command terminated with non-zero exit code: %v", exitErr), ++ }}) ++ } else if closeErr, ok := errConnection.(*websocket.CloseError); !ok || closeErr.Text != io.ErrUnexpectedEOF.Error() { ++ //ignore this ErrUnexpectedEOF because isulad always close the connection while reading a frame ++ err = fmt.Errorf("error executing command in container: %v", errConnection) ++ runtime.HandleError(err) ++ ctx.writeStatus(apierrors.NewInternalError(err)) ++ } ++ } else { ++ ctx.writeStatus(&apierrors.StatusError{ErrStatus: v1.Status{ ++ Status: v1.StatusSuccess, ++ }}) ++ } ++} ++ ++func connectBackend(addr, subprotocol string, r *http.Request) (*websocket.Conn, error) { ++ h := http.Header{} ++ originHeadValue := r.Header.Get("Origin") ++ if originHeadValue != "" { ++ h["Origin"] = []string{originHeadValue} ++ } ++ websocket.DefaultDialer.Subprotocols = []string{subprotocol} ++ websocket.DefaultDialer.ReadBufferSize = 128 * 1024 ++ websocket.DefaultDialer.WriteBufferSize = 128 * 1024 ++ ws, resp, err := websocket.DefaultDialer.Dial(addr, h) ++ if err != nil { ++ var body bytes.Buffer ++ body.ReadFrom(resp.Body) ++ defer resp.Body.Close() ++ msg := fmt.Errorf("dial failed: %v, response is: %v", err, body.String()) ++ return nil, msg ++ } ++ return ws, nil ++} ++ ++type rwc struct { ++ c *websocket.Conn ++ index byte ++} ++ ++func (c *rwc) Write(p []byte) (int, error) { ++ frame := make([]byte, len(p)+1) ++ frame[0] = byte(c.index) ++ copy(frame[1:], p) ++ ++ err := c.c.WriteMessage(websocket.BinaryMessage, frame) ++ if err != nil { ++ return 0, err ++ } ++ return len(p), nil ++} +diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go +index 2b033e2c..1d19fed6 100644 +--- a/pkg/kubelet/server/server.go ++++ b/pkg/kubelet/server/server.go +@@ -782,26 +782,39 @@ func (s *Server) getAttach(request *restful.Request, response *restful.Response) + + // getExec handles requests to run a command inside a container. + func (s *Server) getExec(request *restful.Request, response *restful.Response) { +- params := getExecRequestParams(request) + streamOpts, err := remotecommandserver.NewOptions(request.Request) + if err != nil { + utilruntime.HandleError(err) + response.WriteError(http.StatusBadRequest, err) + return + } ++ url, err := s.getExecUrl(request, response, streamOpts) ++ if err != nil { ++ klog.Errorf("failed to get backend url %v", err) ++ return ++ } ++ if url.Scheme == "ws" || url.Scheme == "wss" { ++ remotecommandserver.ProxyToWebSocket(response.ResponseWriter, request.Request, url, streamOpts) ++ } else { ++ proxyStream(response.ResponseWriter, request.Request, url) ++ } ++} ++ ++func (s *Server) getExecUrl(request *restful.Request, response *restful.Response, streamOpts *remotecommandserver.Options) (*url.URL, error) { ++ params := getExecRequestParams(request) + pod, ok := s.host.GetPodByName(params.podNamespace, params.podName) + if !ok { + response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist")) +- return ++ return nil, fmt.Errorf("pod not found") + } + + podFullName := kubecontainer.GetPodFullName(pod) + url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts) + if err != nil { + streaming.WriteError(err, response.ResponseWriter) +- return ++ return nil, err + } +- proxyStream(response.ResponseWriter, request.Request, url) ++ return url, nil + } + + // getRun handles requests to run a command inside a container. +-- +2.20.1 + diff --git a/kubernetes.spec b/kubernetes.spec index 2651a0b..0965a98 100644 --- a/kubernetes.spec +++ b/kubernetes.spec @@ -3,7 +3,7 @@ Name: kubernetes Version: 1.20.2 -Release: 1 +Release: 2 Summary: Container cluster management License: ASL 2.0 URL: https://k8s.io/kubernetes @@ -24,6 +24,8 @@ Source13: kubernetes-accounting.conf Source14: kubeadm.conf Source15: kubernetes.conf +Patch6000: 0001-kubelet-support-exec-websocket-protocol.patch + %description Container cluster management. @@ -83,7 +85,7 @@ Summary: Help documents for kubernetes Help documents for kubernetes. %prep -%setup -q -n kubernetes-1.20.2 +%autosetup -n kubernetes-1.20.2 -Sgit -p1 mkdir -p src/k8s.io/kubernetes mv $(ls | grep -v "^src$") src/k8s.io/kubernetes/. @@ -252,6 +254,9 @@ getent passwd kube >/dev/null || useradd -r -g kube -d / -s /sbin/nologin \ %systemd_postun kubelet kube-proxy %changelog +* Thu Feb 2 2021 gaohuatao - 1.20.2-2 +- Add kubelet support ws + * Fri Jan 20 2021 lixiang - 1.20.2-1 - Bump version to v1.20.2 -- Gitee