diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index 36b5e9176b0..81f922f37b0 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -256,7 +256,20 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, errMsg, statusCode) return } - defer backend.Close() + isExec := strings.HasSuffix(strings.TrimRight(r.URL.Path, "/"), "/exec") + var backendWriteMutex sync.Mutex // Protects backend writes from copyMsgs and deferred exec cleanup + defer func() { + if isExec { + backendWriteMutex.Lock() + _ = backend.SetWriteDeadline(time.Now().Add(websocketTimeout)) + sendExecExitCommand(backend) + _ = backend.SetWriteDeadline(time.Time{}) + backendWriteMutex.Unlock() + } + closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") + _ = backend.WriteControl(websocket.CloseMessage, closeMsg, time.Now().Add(websocketTimeout)) + backend.Close() + }() upgrader := &websocket.Upgrader{ Subprotocols: []string{subProtocol}, @@ -295,7 +308,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Can't just use io.Copy here since browsers care about frame headers. go func() { errc <- copyMsgs(nil, frontend, backend) }() - go func() { errc <- copyMsgs(&writeMutex, backend, frontend) }() + go func() { errc <- copyMsgs(&backendWriteMutex, backend, frontend) }() for { select { @@ -314,6 +327,31 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } +// sendExecExitCommand sends "exit\r" to the exec session's STDIN channel +// to terminate the shell process and prevent orphaned processes when the +// frontend WebSocket disconnects. +func sendExecExitCommand(backend *websocket.Conn) { + var msg []byte + var msgType int + + switch backend.Subprotocol() { + case "base64.channel.k8s.io": + exitCmd := base64.StdEncoding.EncodeToString([]byte("exit\r")) + msg = []byte("0" + exitCmd) + msgType = websocket.TextMessage + case "v4.channel.k8s.io", "v5.channel.k8s.io": + msg = append([]byte{0}, []byte("exit\r")...) + msgType = websocket.BinaryMessage + default: + klog.V(4).Infof("Skipping exec exit command for unsupported websocket subprotocol: %q", backend.Subprotocol()) + return + } + + if err := backend.WriteMessage(msgType, msg); err != nil { + klog.V(4).Infof("Failed to send exit command to exec session: %v", err) + } +} + func copyMsgs(writeMutex *sync.Mutex, dest, src *websocket.Conn) error { for { messageType, msg, err := src.ReadMessage() @@ -325,7 +363,9 @@ func copyMsgs(writeMutex *sync.Mutex, dest, src *websocket.Conn) error { err = dest.WriteMessage(messageType, msg) } else { writeMutex.Lock() + _ = dest.SetWriteDeadline(time.Now().Add(websocketTimeout)) err = dest.WriteMessage(messageType, msg) + _ = dest.SetWriteDeadline(time.Time{}) writeMutex.Unlock() }