diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index fe3511e4b21..5ea61dc09b0 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -6,6 +6,7 @@ ### CLI * Show a once-per-day notice after a command when a newer CLI release is available, with a link to the release and the upgrade command for the detected install method. Suppressed for non-interactive/CI runs, JSON output, the Databricks Runtime, and development builds, and can be disabled with `DATABRICKS_CLI_DISABLE_UPDATE_CHECK` ([#5470](https://github.com/databricks/cli/pull/5470)). +* Improved error messages for `ssh connect`: when an SSH connection attempt fails, the client now fetches and prints the server's recent error logs ([#5555](https://github.com/databricks/cli/pull/5555)). ### Bundles * Remove API enum values and types that are still in development from the `databricks-bundles` Python package; these were never accepted by the backend ([#5484](https://github.com/databricks/cli/pull/5484)). diff --git a/experimental/ssh/FAILURE_MODES.md b/experimental/ssh/FAILURE_MODES.md index 18249cc0a5d..d7fd5119c93 100644 --- a/experimental/ssh/FAILURE_MODES.md +++ b/experimental/ssh/FAILURE_MODES.md @@ -82,7 +82,10 @@ job state. **With the error-handling improvements** the client aborts after a handshake timeout (no SSH banner from the server) with an actionable hint to install `openssh-server`, and exits -promptly instead of hanging. +promptly instead of hanging. The server also keeps its recent warning/error log lines in +memory and serves them at `/logs` (next to `/metadata`); when `ssh` exits with a +connection-level failure (code 255), the client fetches that endpoint and prints the server's +actual errors in the terminal — the job keeps running throughout. **Fix.** Install `openssh-server` in the image (`apt-get install -y openssh-server`). 
diff --git a/experimental/ssh/internal/client/client.go b/experimental/ssh/internal/client/client.go index 00c1e05d0d0..fd9265d1222 100644 --- a/experimental/ssh/internal/client/client.go +++ b/experimental/ssh/internal/client/client.go @@ -374,7 +374,7 @@ func Run(ctx context.Context, client *databricks.WorkspaceClient, opts ClientOpt return runIDE(ctx, client, userName, keyPath, serverPort, clusterID, opts) } else { log.Infof(ctx, "Additional SSH arguments: %v", opts.AdditionalArgs) - return spawnSSHClient(ctx, userName, keyPath, serverPort, clusterID, opts) + return spawnSSHClient(ctx, client, userName, keyPath, serverPort, clusterID, opts) } } @@ -452,22 +452,11 @@ func getServerMetadata(ctx context.Context, client *databricks.WorkspaceClient, return 0, "", "", errors.Join(errServerMetadata, errors.New("cluster ID not available in metadata")) } - workspaceID, err := auth.ResolveWorkspaceID(ctx, client) - if err != nil { - return 0, "", "", err - } - metadataURL := fmt.Sprintf("%s/driver-proxy-api/o/%s/%s/%d/metadata", client.Config.Host, workspaceID, effectiveClusterID, wsMetadata.Port) - log.Debugf(ctx, "Metadata URL: %s", metadataURL) - req, err := http.NewRequestWithContext(ctx, http.MethodGet, metadataURL, nil) + req, err := newDriverProxyRequest(ctx, client, effectiveClusterID, wsMetadata.Port, "metadata", liteswap) if err != nil { return 0, "", "", err } - if liteswap != "" { - req.Header.Set("x-databricks-traffic-id", "testenv://liteswap/"+liteswap) - } - if err := client.Config.Authenticate(req); err != nil { - return 0, "", "", err - } + log.Debugf(ctx, "Metadata URL: %s", req.URL) httpClient := &http.Client{Transport: client.Config.HTTPTransport} resp, err := httpClient.Do(req) if err != nil { @@ -489,6 +478,55 @@ func getServerMetadata(ctx context.Context, client *databricks.WorkspaceClient, return wsMetadata.Port, string(bodyBytes), effectiveClusterID, nil } +// newDriverProxyRequest builds an authenticated GET request to one of the SSH server's +// 
HTTP endpoints behind the workspace driver proxy. +func newDriverProxyRequest(ctx context.Context, client *databricks.WorkspaceClient, clusterID string, port int, endpoint, liteswap string) (*http.Request, error) { + workspaceID, err := auth.ResolveWorkspaceID(ctx, client) + if err != nil { + return nil, err + } + url := fmt.Sprintf("%s/driver-proxy-api/o/%s/%s/%d/%s", client.Config.Host, workspaceID, clusterID, port, endpoint) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + if liteswap != "" { + req.Header.Set("x-databricks-traffic-id", "testenv://liteswap/"+liteswap) + } + if err := client.Config.Authenticate(req); err != nil { + return nil, err + } + return req, nil +} + +// fetchServerErrorLogs fetches recent warning/error log lines from the running SSH +// server's /logs endpoint. It is best-effort: any failure (including older server +// versions that don't serve /logs) yields an empty string. +func fetchServerErrorLogs(ctx context.Context, client *databricks.WorkspaceClient, clusterID string, serverPort int, liteswap string) string { + req, err := newDriverProxyRequest(ctx, client, clusterID, serverPort, "logs", liteswap) + if err != nil { + log.Debugf(ctx, "Failed to build server logs request: %v", err) + return "" + } + httpClient := &http.Client{Transport: client.Config.HTTPTransport} + resp, err := httpClient.Do(req) + if err != nil { + log.Debugf(ctx, "Failed to fetch server logs: %v", err) + return "" + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + log.Debugf(ctx, "Server logs endpoint returned status %d", resp.StatusCode) + return "" + } + body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) // cap the read at 1 MiB: a best-effort diagnostic must not buffer an unbounded proxied response + if err != nil { + log.Debugf(ctx, "Failed to read server logs response: %v", err) + return "" + } + return strings.TrimSpace(string(body)) +} + // submitSSHTunnelJob submits the bootstrap job and waits for the SSH server task to start. 
// It returns the job run ID (when known) so callers can fetch and surface the run's error // details if the server never comes up. @@ -581,7 +619,7 @@ func submitSSHTunnelJob(ctx context.Context, client *databricks.WorkspaceClient, return waiter.RunId, waitForJobToStart(ctx, client, waiter.RunId, opts.TaskStartupTimeout) } -func spawnSSHClient(ctx context.Context, userName, privateKeyPath string, serverPort int, clusterID string, opts ClientOptions) error { +func spawnSSHClient(ctx context.Context, client *databricks.WorkspaceClient, userName, privateKeyPath string, serverPort int, clusterID string, opts ClientOptions) error { // Create a copy with metadata for the ProxyCommand optsWithMetadata := opts optsWithMetadata.ServerMetadata = FormatMetadata(userName, serverPort, clusterID) @@ -616,14 +654,19 @@ func spawnSSHClient(ctx context.Context, userName, privateKeyPath string, server err = sshCmd.Run() // ssh reserves exit code 255 for its own connection-level failures (a remote command's exit - // code is passed through as-is, 0-254). The most common cause here is the cluster's container - // image missing an OpenSSH server, so the server can't launch sshd once we connect — the - // connection then drops right after "Connected!". Surface an actionable hint rather than - // leaving the user with ssh's opaque "Connection closed" message. + // code is passed through as-is, 0-254). The server keeps running after a failed connection + // attempt, so its error (e.g. sshd missing from the container image) is only visible in its + // own logs — fetch them from the /logs endpoint and show them instead of leaving the user + // with ssh's opaque "Connection closed" message. if exitErr, ok := errors.AsType[*exec.ExitError](err); ok && exitErr.ExitCode() == 255 { - cmdio.LogString(ctx, cmdio.Yellow(ctx, "The SSH connection closed unexpectedly. 
If it dropped right after connecting, "+ - "the cluster's container image is likely missing an OpenSSH server: ensure 'openssh-server' "+ - "is installed (it provides /usr/sbin/sshd), then check the SSH server job run logs.")) + if logs := fetchServerErrorLogs(ctx, client, clusterID, serverPort, opts.Liteswap); logs != "" { + cmdio.LogString(ctx, cmdio.Yellow(ctx, "The SSH connection closed unexpectedly. Recent SSH server errors:")) + cmdio.LogString(ctx, truncateTail(logs, maxRunFailureTraceBytes)) + } else { + cmdio.LogString(ctx, cmdio.Yellow(ctx, "The SSH connection closed unexpectedly. If it dropped right after connecting, "+ + "the cluster's container image is likely missing an OpenSSH server: ensure 'openssh-server' "+ + "is installed (it provides /usr/sbin/sshd), then check the SSH server job run logs.")) + } } return err } diff --git a/experimental/ssh/internal/server/logs.go b/experimental/ssh/internal/server/logs.go new file mode 100644 index 00000000000..82b6276d622 --- /dev/null +++ b/experimental/ssh/internal/server/logs.go @@ -0,0 +1,106 @@ +package server + +import ( + "context" + "errors" + "io" + "log/slog" + "net/http" + "strings" + "sync" + + "github.com/databricks/cli/libs/log" +) + +// maxLogBufferBytes bounds the in-memory log buffer served at /logs. +const maxLogBufferBytes = 16 * 1024 + +// logBuffer accumulates recent log records in memory so the client can fetch them +// via the /logs endpoint after a failed connection attempt. The Jobs API exposes no +// stdout logs for a running notebook task, so this is the only way for "ssh connect" +// to read the server's errors while the bootstrap job is still alive. +type logBuffer struct { + mu sync.Mutex + lines []string + size int + limit int +} + +func newLogBuffer(limit int) *logBuffer { + return &logBuffer{limit: limit} +} + +// Write implements io.Writer; slog handlers emit one Write call per record. 
+func (b *logBuffer) Write(p []byte) (int, error) { + b.mu.Lock() + defer b.mu.Unlock() + b.lines = append(b.lines, string(p)) + b.size += len(p) + for b.size > b.limit && len(b.lines) > 0 { + b.size -= len(b.lines[0]) + b.lines = b.lines[1:] + } + return len(p), nil +} + +func (b *logBuffer) String() string { + b.mu.Lock() + defer b.mu.Unlock() + return strings.Join(b.lines, "") +} + +func (b *logBuffer) serveHTTP(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + if _, err := io.WriteString(w, b.String()); err != nil { + log.Debugf(r.Context(), "Failed to write logs response: %v", err) // the 200 status is already committed by the failed Write, so http.Error could not change it + } +} + +// captureWarnLogs returns a context whose logger also records warning-and-above +// log lines into the returned buffer. +func captureWarnLogs(ctx context.Context) (context.Context, *logBuffer) { + buf := newLogBuffer(maxLogBufferBytes) + bufHandler := slog.NewTextHandler(buf, &slog.HandlerOptions{Level: slog.LevelWarn}) + logger := slog.New(teeHandler{[]slog.Handler{log.GetLogger(ctx).Handler(), bufHandler}}) + return log.NewContext(ctx, logger), buf +} + +// teeHandler forwards log records to all underlying handlers. +type teeHandler struct { + handlers []slog.Handler +} + +func (t teeHandler) Enabled(ctx context.Context, level slog.Level) bool { + for _, h := range t.handlers { + if h.Enabled(ctx, level) { + return true + } + } + return false +} + +func (t teeHandler) Handle(ctx context.Context, r slog.Record) error { + var errs []error + for _, h := range t.handlers { + if h.Enabled(ctx, r.Level) { + errs = append(errs, h.Handle(ctx, r.Clone())) + } + } + return errors.Join(errs...) 
+} + +func (t teeHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + handlers := make([]slog.Handler, len(t.handlers)) + for i, h := range t.handlers { + handlers[i] = h.WithAttrs(attrs) + } + return teeHandler{handlers} +} + +func (t teeHandler) WithGroup(name string) slog.Handler { + handlers := make([]slog.Handler, len(t.handlers)) + for i, h := range t.handlers { + handlers[i] = h.WithGroup(name) + } + return teeHandler{handlers} +} diff --git a/experimental/ssh/internal/server/logs_test.go b/experimental/ssh/internal/server/logs_test.go new file mode 100644 index 00000000000..090948505ae --- /dev/null +++ b/experimental/ssh/internal/server/logs_test.go @@ -0,0 +1,64 @@ +package server + +import ( + "io" + "log/slog" + "net/http" + "net/http/httptest" + "testing" + + "github.com/databricks/cli/libs/log" + "github.com/stretchr/testify/assert" +) + +func TestLogBufferEvictsOldestLines(t *testing.T) { + buf := newLogBuffer(25) + for _, line := range []string{"first line\n", "second line\n", "third line\n"} { + _, err := buf.Write([]byte(line)) + assert.NoError(t, err) + } + out := buf.String() + assert.NotContains(t, out, "first line") + assert.Contains(t, out, "second line") + assert.Contains(t, out, "third line") +} + +func TestCaptureWarnLogsRecordsWarnAndAbove(t *testing.T) { + ctx := log.NewContext(t.Context(), slog.New(slog.NewTextHandler(io.Discard, nil))) + ctx, buf := captureWarnLogs(ctx) + + log.Infof(ctx, "info message") + log.Warnf(ctx, "warn message") + log.Errorf(ctx, "error message") + + out := buf.String() + assert.NotContains(t, out, "info message") + assert.Contains(t, out, "warn message") + assert.Contains(t, out, "error message") +} + +func TestCaptureWarnLogsKeepsHandlerAttrs(t *testing.T) { + ctx := log.NewContext(t.Context(), slog.New(slog.NewTextHandler(io.Discard, nil))) + ctx, buf := captureWarnLogs(ctx) + // Mirrors how the proxy server scopes its logger per connection. 
+ ctx = log.NewContext(ctx, log.GetLogger(ctx).With("session", "abc")) + + log.Errorf(ctx, "connection failed") + + out := buf.String() + assert.Contains(t, out, "connection failed") + assert.Contains(t, out, "session=abc") +} + +func TestLogBufferServeHTTP(t *testing.T) { + ctx := log.NewContext(t.Context(), slog.New(slog.NewTextHandler(io.Discard, nil))) + ctx, buf := captureWarnLogs(ctx) + log.Errorf(ctx, "boom") + + rec := httptest.NewRecorder() + buf.serveHTTP(rec, httptest.NewRequest(http.MethodGet, "/logs", nil)) + + assert.Equal(t, 200, rec.Code) + assert.Equal(t, "text/plain; charset=utf-8", rec.Header().Get("Content-Type")) + assert.Contains(t, rec.Body.String(), "boom") +} diff --git a/experimental/ssh/internal/server/server.go b/experimental/ssh/internal/server/server.go index 6cf137618d9..b07b6863c00 100644 --- a/experimental/ssh/internal/server/server.go +++ b/experimental/ssh/internal/server/server.go @@ -54,6 +54,8 @@ type ServerOptions struct { } func Run(ctx context.Context, client *databricks.WorkspaceClient, opts ServerOptions) error { + ctx, logBuf := captureWarnLogs(ctx) + port, err := findAvailablePort(opts.DefaultPort, opts.PortRange) if err != nil { return fmt.Errorf("failed to find available port: %w", err) @@ -88,9 +90,11 @@ func Run(ctx context.Context, client *databricks.WorkspaceClient, opts ServerOpt connections := proxy.NewConnectionsManager(opts.MaxClients, opts.ShutdownDelay) http.Handle("/ssh", proxy.NewProxyServer(ctx, connections, createServerCommand)) http.HandleFunc("/metadata", serveMetadata) + http.HandleFunc("/logs", logBuf.serveHTTP) http.Handle("/driver-proxy-http/ssh", proxy.NewProxyServer(ctx, connections, createServerCommand)) http.HandleFunc("/driver-proxy-http/metadata", serveMetadata) + http.HandleFunc("/driver-proxy-http/logs", logBuf.serveHTTP) go handleTimeout(ctx, connections.TimedOut, opts.ShutdownDelay)