Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,20 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, errMsg, statusCode)
return
}
defer backend.Close()
isExec := strings.HasSuffix(strings.TrimRight(r.URL.Path, "/"), "/exec")
var backendWriteMutex sync.Mutex // Protects backend writes from copyMsgs and deferred exec cleanup
defer func() {
if isExec {
backendWriteMutex.Lock()
_ = backend.SetWriteDeadline(time.Now().Add(websocketTimeout))
sendExecExitCommand(backend)
_ = backend.SetWriteDeadline(time.Time{})
backendWriteMutex.Unlock()
}
closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
_ = backend.WriteControl(websocket.CloseMessage, closeMsg, time.Now().Add(websocketTimeout))
backend.Close()
}()

upgrader := &websocket.Upgrader{
Subprotocols: []string{subProtocol},
Expand Down Expand Up @@ -295,7 +308,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// Can't just use io.Copy here since browsers care about frame headers.
go func() { errc <- copyMsgs(nil, frontend, backend) }()
go func() { errc <- copyMsgs(&writeMutex, backend, frontend) }()
go func() { errc <- copyMsgs(&backendWriteMutex, backend, frontend) }()

for {
select {
Expand All @@ -314,6 +327,31 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

// sendExecExitCommand sends "exit\r" to the exec session's STDIN channel
// to terminate the shell process and prevent orphaned processes when the
// frontend WebSocket disconnects.
func sendExecExitCommand(backend *websocket.Conn) {
var msg []byte
var msgType int

switch backend.Subprotocol() {
case "base64.channel.k8s.io":
exitCmd := base64.StdEncoding.EncodeToString([]byte("exit\r"))
msg = []byte("0" + exitCmd)
msgType = websocket.TextMessage
case "v4.channel.k8s.io", "v5.channel.k8s.io":
msg = append([]byte{0}, []byte("exit\r")...)
msgType = websocket.BinaryMessage
default:
klog.V(4).Infof("Skipping exec exit command for unsupported websocket subprotocol: %q", backend.Subprotocol())
return
}

if err := backend.WriteMessage(msgType, msg); err != nil {
klog.V(4).Infof("Failed to send exit command to exec session: %v", err)
}
}

func copyMsgs(writeMutex *sync.Mutex, dest, src *websocket.Conn) error {
for {
messageType, msg, err := src.ReadMessage()
Expand All @@ -325,7 +363,9 @@ func copyMsgs(writeMutex *sync.Mutex, dest, src *websocket.Conn) error {
err = dest.WriteMessage(messageType, msg)
} else {
writeMutex.Lock()
_ = dest.SetWriteDeadline(time.Now().Add(websocketTimeout))
err = dest.WriteMessage(messageType, msg)
_ = dest.SetWriteDeadline(time.Time{})
writeMutex.Unlock()
}

Expand Down