Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

### CLI
* Show a once-per-day notice after a command when a newer CLI release is available, with a link to the release and the upgrade command for the detected install method. Suppressed for non-interactive/CI runs, JSON output, the Databricks Runtime, and development builds, and can be disabled with `DATABRICKS_CLI_DISABLE_UPDATE_CHECK` ([#5470](https://github.com/databricks/cli/pull/5470)).
* Improved error messages for `ssh connect`: when an SSH connection attempt fails, the client now fetches and prints the server's recent error logs ([#5555](https://github.com/databricks/cli/pull/5555)).

### Bundles
* Remove API enum values and types that are still in development from the `databricks-bundles` Python package; these were never accepted by the backend ([#5484](https://github.com/databricks/cli/pull/5484)).
Expand Down
5 changes: 4 additions & 1 deletion experimental/ssh/FAILURE_MODES.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ job state.

**With the error-handling improvements** the client aborts after a handshake timeout (no SSH
banner from the server) with an actionable hint to install `openssh-server`, and exits
promptly instead of hanging.
promptly instead of hanging. The server also keeps its recent warning/error log lines in
memory and serves them at `/logs` (next to `/metadata`); when `ssh` exits with a
connection-level failure (code 255), the client fetches that endpoint and prints the server's
actual errors in the terminal — the job keeps running throughout.

**Fix.** Install `openssh-server` in the image (`apt-get install -y openssh-server`).

Expand Down
87 changes: 65 additions & 22 deletions experimental/ssh/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func Run(ctx context.Context, client *databricks.WorkspaceClient, opts ClientOpt
return runIDE(ctx, client, userName, keyPath, serverPort, clusterID, opts)
} else {
log.Infof(ctx, "Additional SSH arguments: %v", opts.AdditionalArgs)
return spawnSSHClient(ctx, userName, keyPath, serverPort, clusterID, opts)
return spawnSSHClient(ctx, client, userName, keyPath, serverPort, clusterID, opts)
}
}

Expand Down Expand Up @@ -452,22 +452,11 @@ func getServerMetadata(ctx context.Context, client *databricks.WorkspaceClient,
return 0, "", "", errors.Join(errServerMetadata, errors.New("cluster ID not available in metadata"))
}

workspaceID, err := auth.ResolveWorkspaceID(ctx, client)
if err != nil {
return 0, "", "", err
}
metadataURL := fmt.Sprintf("%s/driver-proxy-api/o/%s/%s/%d/metadata", client.Config.Host, workspaceID, effectiveClusterID, wsMetadata.Port)
log.Debugf(ctx, "Metadata URL: %s", metadataURL)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, metadataURL, nil)
req, err := newDriverProxyRequest(ctx, client, effectiveClusterID, wsMetadata.Port, "metadata", liteswap)
if err != nil {
return 0, "", "", err
}
if liteswap != "" {
req.Header.Set("x-databricks-traffic-id", "testenv://liteswap/"+liteswap)
}
if err := client.Config.Authenticate(req); err != nil {
return 0, "", "", err
}
log.Debugf(ctx, "Metadata URL: %s", req.URL)
httpClient := &http.Client{Transport: client.Config.HTTPTransport}
resp, err := httpClient.Do(req)
if err != nil {
Expand All @@ -489,6 +478,55 @@ func getServerMetadata(ctx context.Context, client *databricks.WorkspaceClient,
return wsMetadata.Port, string(bodyBytes), effectiveClusterID, nil
}

// newDriverProxyRequest prepares an authenticated GET request for the given endpoint
// of the SSH server, addressed through the workspace driver proxy as
// <host>/driver-proxy-api/o/<workspace ID>/<cluster ID>/<port>/<endpoint>.
//
// When liteswap is non-empty it is forwarded in the x-databricks-traffic-id header.
// Any error from resolving the workspace ID, constructing the request, or
// authenticating it is returned unchanged together with a nil request.
func newDriverProxyRequest(ctx context.Context, client *databricks.WorkspaceClient, clusterID string, port int, endpoint, liteswap string) (*http.Request, error) {
	wsID, err := auth.ResolveWorkspaceID(ctx, client)
	if err != nil {
		return nil, err
	}

	target := fmt.Sprintf("%s/driver-proxy-api/o/%s/%s/%d/%s", client.Config.Host, wsID, clusterID, port, endpoint)
	proxyReq, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
	if err != nil {
		return nil, err
	}

	if liteswap != "" {
		proxyReq.Header.Set("x-databricks-traffic-id", "testenv://liteswap/"+liteswap)
	}

	// Authentication runs last, after the traffic-id header has been set.
	if authErr := client.Config.Authenticate(proxyReq); authErr != nil {
		return nil, authErr
	}
	return proxyReq, nil
}

// fetchServerErrorLogs fetches recent warning/error log lines from the running SSH
// server's /logs endpoint and returns them with surrounding whitespace trimmed.
//
// It is best-effort: any failure (including older server versions that don't serve
// /logs, which surface as a non-200 status) is logged at debug level and yields an
// empty string, so callers can fall back to a generic hint.
func fetchServerErrorLogs(ctx context.Context, client *databricks.WorkspaceClient, clusterID string, serverPort int, liteswap string) string {
	// Cap on how much of the response body is read. This is a diagnostic shown in a
	// terminal, so an unexpectedly large 200 response must not be buffered in full.
	const maxServerLogsBytes = 1 << 20

	req, err := newDriverProxyRequest(ctx, client, clusterID, serverPort, "logs", liteswap)
	if err != nil {
		log.Debugf(ctx, "Failed to build server logs request: %v", err)
		return ""
	}
	httpClient := &http.Client{Transport: client.Config.HTTPTransport}
	resp, err := httpClient.Do(req)
	if err != nil {
		log.Debugf(ctx, "Failed to fetch server logs: %v", err)
		return ""
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		log.Debugf(ctx, "Server logs endpoint returned status %d", resp.StatusCode)
		return ""
	}
	body, err := io.ReadAll(io.LimitReader(resp.Body, maxServerLogsBytes))
	if err != nil {
		log.Debugf(ctx, "Failed to read server logs response: %v", err)
		return ""
	}
	return strings.TrimSpace(string(body))
}

// submitSSHTunnelJob submits the bootstrap job and waits for the SSH server task to start.
// It returns the job run ID (when known) so callers can fetch and surface the run's error
// details if the server never comes up.
Expand Down Expand Up @@ -581,7 +619,7 @@ func submitSSHTunnelJob(ctx context.Context, client *databricks.WorkspaceClient,
return waiter.RunId, waitForJobToStart(ctx, client, waiter.RunId, opts.TaskStartupTimeout)
}

func spawnSSHClient(ctx context.Context, userName, privateKeyPath string, serverPort int, clusterID string, opts ClientOptions) error {
func spawnSSHClient(ctx context.Context, client *databricks.WorkspaceClient, userName, privateKeyPath string, serverPort int, clusterID string, opts ClientOptions) error {
// Create a copy with metadata for the ProxyCommand
optsWithMetadata := opts
optsWithMetadata.ServerMetadata = FormatMetadata(userName, serverPort, clusterID)
Expand Down Expand Up @@ -616,14 +654,19 @@ func spawnSSHClient(ctx context.Context, userName, privateKeyPath string, server

err = sshCmd.Run()
// ssh reserves exit code 255 for its own connection-level failures (a remote command's exit
// code is passed through as-is, 0-254). The most common cause here is the cluster's container
// image missing an OpenSSH server, so the server can't launch sshd once we connect — the
// connection then drops right after "Connected!". Surface an actionable hint rather than
// leaving the user with ssh's opaque "Connection closed" message.
// code is passed through as-is, 0-254). The server keeps running after a failed connection
// attempt, so its error (e.g. sshd missing from the container image) is only visible in its
// own logs — fetch them from the /logs endpoint and show them instead of leaving the user
// with ssh's opaque "Connection closed" message.
if exitErr, ok := errors.AsType[*exec.ExitError](err); ok && exitErr.ExitCode() == 255 {
cmdio.LogString(ctx, cmdio.Yellow(ctx, "The SSH connection closed unexpectedly. If it dropped right after connecting, "+
"the cluster's container image is likely missing an OpenSSH server: ensure 'openssh-server' "+
"is installed (it provides /usr/sbin/sshd), then check the SSH server job run logs."))
if logs := fetchServerErrorLogs(ctx, client, clusterID, serverPort, opts.Liteswap); logs != "" {
cmdio.LogString(ctx, cmdio.Yellow(ctx, "The SSH connection closed unexpectedly. Recent SSH server errors:"))
cmdio.LogString(ctx, truncateTail(logs, maxRunFailureTraceBytes))
} else {
cmdio.LogString(ctx, cmdio.Yellow(ctx, "The SSH connection closed unexpectedly. If it dropped right after connecting, "+
"the cluster's container image is likely missing an OpenSSH server: ensure 'openssh-server' "+
"is installed (it provides /usr/sbin/sshd), then check the SSH server job run logs."))
}
}
return err
}
Expand Down
106 changes: 106 additions & 0 deletions experimental/ssh/internal/server/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package server

import (
"context"
"errors"
"io"
"log/slog"
"net/http"
"strings"
"sync"

"github.com/databricks/cli/libs/log"
)

// maxLogBufferBytes bounds the in-memory log buffer served at /logs.
const maxLogBufferBytes = 16 * 1024

// logBuffer accumulates recent log records in memory so the client can fetch them
// via the /logs endpoint after a failed connection attempt. The Jobs API exposes no
// stdout logs for a running notebook task, so this is the only way for "ssh connect"
// to read the server's errors while the bootstrap job is still alive.
//
// The buffer holds at most limit bytes: when a new record pushes it over the limit,
// the oldest records are dropped first. All methods are safe for concurrent use.
type logBuffer struct {
	mu    sync.Mutex // guards lines and size
	lines []string   // retained records, oldest first
	size  int        // total number of bytes across lines
	limit int        // upper bound for size; a value <= 0 retains nothing
}

// newLogBuffer returns an empty buffer that retains at most limit bytes of records.
func newLogBuffer(limit int) *logBuffer {
	return &logBuffer{limit: limit}
}

// Write implements io.Writer; slog handlers emit one Write call per record.
//
// Each call is stored as one record. A record that is larger than the whole buffer
// is truncated to fit rather than evicting everything (itself included), so the
// most recent error is never lost. Write always reports len(p) bytes consumed and
// never returns an error.
func (b *logBuffer) Write(p []byte) (int, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.limit <= 0 {
		return len(p), nil
	}
	rec := truncateLogRecord(string(p), b.limit)
	b.lines = append(b.lines, rec)
	b.size += len(rec)
	for b.size > b.limit && len(b.lines) > 0 {
		b.size -= len(b.lines[0])
		// Clear the slot so the evicted string can be collected even while the
		// backing array is still referenced by the re-sliced b.lines.
		b.lines[0] = ""
		b.lines = b.lines[1:]
	}
	return len(p), nil
}

// truncateLogRecord shortens rec to at most limit bytes (limit must be positive).
// A trailing newline is preserved so records stay separated, and a multi-byte
// character split by the cut is dropped so the result remains valid UTF-8.
func truncateLogRecord(rec string, limit int) string {
	if len(rec) <= limit {
		return rec
	}
	suffix := ""
	if strings.HasSuffix(rec, "\n") {
		suffix = "\n"
	}
	return strings.ToValidUTF8(rec[:limit-len(suffix)], "") + suffix
}

// String returns all retained records concatenated in the order they were written.
func (b *logBuffer) String() string {
	b.mu.Lock()
	defer b.mu.Unlock()
	return strings.Join(b.lines, "")
}

// serveHTTP writes the retained records as a plain-text response body.
func (b *logBuffer) serveHTTP(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	// The first body write commits the 200 status and headers, so a write failure
	// cannot be converted into an error response afterwards; there is nothing
	// useful left to do with the error.
	_, _ = io.WriteString(w, b.String())
}

// captureWarnLogs wraps the logger found in ctx so that every record still reaches
// the existing handler while records at warning level or above are additionally
// rendered as text into a fresh in-memory buffer. It returns the context carrying
// the wrapped logger together with that buffer.
func captureWarnLogs(ctx context.Context) (context.Context, *logBuffer) {
	captured := newLogBuffer(maxLogBufferBytes)
	warnAndAbove := &slog.HandlerOptions{Level: slog.LevelWarn}
	captureHandler := slog.NewTextHandler(captured, warnAndAbove)

	tee := teeHandler{
		handlers: []slog.Handler{
			log.GetLogger(ctx).Handler(),
			captureHandler,
		},
	}
	return log.NewContext(ctx, slog.New(tee)), captured
}

// teeHandler is a slog.Handler that fans every record out to each wrapped handler.
type teeHandler struct {
	handlers []slog.Handler
}

// Enabled reports whether at least one wrapped handler accepts records at level.
func (th teeHandler) Enabled(ctx context.Context, level slog.Level) bool {
	accepted := false
	for i := 0; i < len(th.handlers) && !accepted; i++ {
		accepted = th.handlers[i].Enabled(ctx, level)
	}
	return accepted
}

// Handle passes the record to every wrapped handler that is enabled for the
// record's level and returns the joined errors of those that failed, or nil.
func (th teeHandler) Handle(ctx context.Context, r slog.Record) error {
	var failures []error
	for _, sink := range th.handlers {
		if !sink.Enabled(ctx, r.Level) {
			continue
		}
		// Every sink receives its own clone of the record.
		if err := sink.Handle(ctx, r.Clone()); err != nil {
			failures = append(failures, err)
		}
	}
	return errors.Join(failures...)
}

// WithAttrs returns a teeHandler whose wrapped handlers all carry attrs.
func (th teeHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
	derived := make([]slog.Handler, 0, len(th.handlers))
	for _, sink := range th.handlers {
		derived = append(derived, sink.WithAttrs(attrs))
	}
	return teeHandler{handlers: derived}
}

// WithGroup returns a teeHandler whose wrapped handlers all open the group name.
func (th teeHandler) WithGroup(name string) slog.Handler {
	derived := make([]slog.Handler, 0, len(th.handlers))
	for _, sink := range th.handlers {
		derived = append(derived, sink.WithGroup(name))
	}
	return teeHandler{handlers: derived}
}
64 changes: 64 additions & 0 deletions experimental/ssh/internal/server/logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package server

import (
"io"
"log/slog"
"net/http"
"net/http/httptest"
"testing"

"github.com/databricks/cli/libs/log"
"github.com/stretchr/testify/assert"
)

// TestLogBufferEvictsOldestLines writes three records into a 25-byte buffer and
// checks that only the oldest one has been dropped.
func TestLogBufferEvictsOldestLines(t *testing.T) {
	records := []string{"first line\n", "second line\n", "third line\n"}
	lb := newLogBuffer(25)
	for i := range records {
		_, writeErr := lb.Write([]byte(records[i]))
		assert.NoError(t, writeErr)
	}

	got := lb.String()
	assert.NotContains(t, got, "first line")
	assert.Contains(t, got, "second line")
	assert.Contains(t, got, "third line")
}

func TestCaptureWarnLogsRecordsWarnAndAbove(t *testing.T) {
ctx := log.NewContext(t.Context(), slog.New(slog.NewTextHandler(io.Discard, nil)))
ctx, buf := captureWarnLogs(ctx)

log.Infof(ctx, "info message")
log.Warnf(ctx, "warn message")
log.Errorf(ctx, "error message")

out := buf.String()
assert.NotContains(t, out, "info message")
assert.Contains(t, out, "warn message")
assert.Contains(t, out, "error message")
}

func TestCaptureWarnLogsKeepsHandlerAttrs(t *testing.T) {
ctx := log.NewContext(t.Context(), slog.New(slog.NewTextHandler(io.Discard, nil)))
ctx, buf := captureWarnLogs(ctx)
// Mirrors how the proxy server scopes its logger per connection.
ctx = log.NewContext(ctx, log.GetLogger(ctx).With("session", "abc"))

log.Errorf(ctx, "connection failed")

out := buf.String()
assert.Contains(t, out, "connection failed")
assert.Contains(t, out, "session=abc")
}

func TestLogBufferServeHTTP(t *testing.T) {
ctx := log.NewContext(t.Context(), slog.New(slog.NewTextHandler(io.Discard, nil)))
ctx, buf := captureWarnLogs(ctx)
log.Errorf(ctx, "boom")

rec := httptest.NewRecorder()
buf.serveHTTP(rec, httptest.NewRequest(http.MethodGet, "/logs", nil))

assert.Equal(t, 200, rec.Code)
assert.Equal(t, "text/plain; charset=utf-8", rec.Header().Get("Content-Type"))
assert.Contains(t, rec.Body.String(), "boom")
}
4 changes: 4 additions & 0 deletions experimental/ssh/internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type ServerOptions struct {
}

func Run(ctx context.Context, client *databricks.WorkspaceClient, opts ServerOptions) error {
ctx, logBuf := captureWarnLogs(ctx)

port, err := findAvailablePort(opts.DefaultPort, opts.PortRange)
if err != nil {
return fmt.Errorf("failed to find available port: %w", err)
Expand Down Expand Up @@ -88,9 +90,11 @@ func Run(ctx context.Context, client *databricks.WorkspaceClient, opts ServerOpt
connections := proxy.NewConnectionsManager(opts.MaxClients, opts.ShutdownDelay)
http.Handle("/ssh", proxy.NewProxyServer(ctx, connections, createServerCommand))
http.HandleFunc("/metadata", serveMetadata)
http.HandleFunc("/logs", logBuf.serveHTTP)

http.Handle("/driver-proxy-http/ssh", proxy.NewProxyServer(ctx, connections, createServerCommand))
http.HandleFunc("/driver-proxy-http/metadata", serveMetadata)
http.HandleFunc("/driver-proxy-http/logs", logBuf.serveHTTP)

go handleTimeout(ctx, connections.TimedOut, opts.ShutdownDelay)

Expand Down
Loading