Skip to content

Commit 73eee61

Browse files
committed
πŸ€– feat: SSH connection pool with backoff and singleflighting
Prevents thundering herd issues with SSH connections by: - Adding SSHConnectionPool class with health tracking - Implementing exponential backoff (1s β†’ 5s β†’ 10s β†’ 20s β†’ 40s β†’ 60s cap) - Singleflighting concurrent connection attempts to same host - Probing unknown connections before first use - Skipping probes for known-healthy connections Integration points: - SSHRuntime.exec() and execSSHCommand() call acquireConnection() - PTYService calls acquireConnection() before spawning SSH terminals _Generated with mux_
1 parent ea846e2 commit 73eee61

File tree

5 files changed

+462
-31
lines changed

5 files changed

+462
-31
lines changed

β€Žsrc/node/runtime/SSHRuntime.tsβ€Ž

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import { expandTildeForSSH, cdCommandForSSH } from "./tildeExpansion";
2727
import { getProjectName } from "@/node/utils/runtime/helpers";
2828
import { getErrorMessage } from "@/common/utils/errors";
2929
import { execAsync, DisposableProcess } from "@/node/utils/disposableExec";
30-
import { getControlPath } from "./sshConnectionPool";
30+
import { getControlPath, sshConnectionPool, type SSHRuntimeConfig } from "./sshConnectionPool";
3131
import { getBashPath } from "@/node/utils/main/bashPath";
3232
import { SSHBackgroundHandle } from "./SSHBackgroundHandle";
3333
import { execBuffered } from "@/node/utils/runtime/helpers";
@@ -47,21 +47,8 @@ const shescape = {
4747
},
4848
};
4949

50-
/**
51-
* SSH Runtime Configuration
52-
*/
53-
export interface SSHRuntimeConfig {
54-
/** SSH host (can be hostname, user@host, or SSH config alias) */
55-
host: string;
56-
/** Working directory on remote host */
57-
srcBaseDir: string;
58-
/** Directory on remote for background process output (default: /tmp/mux-bashes) */
59-
bgOutputDir?: string;
60-
/** Optional: Path to SSH private key (if not using ~/.ssh/config or ssh-agent) */
61-
identityFile?: string;
62-
/** Optional: SSH port (default: 22) */
63-
port?: number;
64-
}
50+
// Re-export SSHRuntimeConfig from connection pool (defined there to avoid circular deps)
51+
export type { SSHRuntimeConfig } from "./sshConnectionPool";
6552

6653
/**
6754
* SSH runtime implementation that executes commands and file operations
@@ -132,7 +119,6 @@ export class SSHRuntime implements Runtime {
132119
/**
133120
* Execute command over SSH with streaming I/O
134121
*/
135-
// eslint-disable-next-line @typescript-eslint/require-await
136122
async exec(command: string, options: ExecOptions): Promise<ExecStream> {
137123
const startTime = performance.now();
138124

@@ -141,6 +127,10 @@ export class SSHRuntime implements Runtime {
141127
throw new RuntimeErrorClass("Operation aborted before execution", "exec");
142128
}
143129

130+
// Ensure connection is healthy before executing
131+
// This provides backoff protection and singleflighting for concurrent requests
132+
await sshConnectionPool.acquireConnection(this.config);
133+
144134
// Build command parts
145135
const parts: string[] = [];
146136

@@ -238,11 +228,22 @@ export class SSHRuntime implements Runtime {
238228
resolve(EXIT_CODE_TIMEOUT);
239229
return;
240230
}
241-
resolve(code ?? (signal ? -1 : 0));
231+
232+
const exitCode = code ?? (signal ? -1 : 0);
233+
234+
// SSH exit code 255 indicates connection failure - report to pool for backoff
235+
// This prevents thundering herd when a previously healthy host goes down
236+
if (exitCode === 255) {
237+
sshConnectionPool.reportFailure(this.config, "SSH connection failed (exit code 255)");
238+
}
239+
240+
resolve(exitCode);
242241
// Cleanup runs automatically via DisposableProcess
243242
});
244243

245244
sshProcess.on("error", (err) => {
245+
// Spawn errors are connection-level failures
246+
sshConnectionPool.reportFailure(this.config, `SSH spawn error: ${err.message}`);
246247
reject(new RuntimeErrorClass(`Failed to execute SSH command: ${err.message}`, "exec", err));
247248
});
248249
});
@@ -553,6 +554,9 @@ export class SSHRuntime implements Runtime {
553554
* @private
554555
*/
555556
private async execSSHCommand(command: string, timeoutMs: number): Promise<string> {
557+
// Ensure connection is healthy before executing
558+
await sshConnectionPool.acquireConnection(this.config, timeoutMs);
559+
556560
const sshArgs = this.buildSSHArgs();
557561
sshArgs.push(this.config.host, command);
558562

@@ -587,6 +591,10 @@ export class SSHRuntime implements Runtime {
587591
if (timedOut) return; // Already rejected
588592

589593
if (code !== 0) {
594+
// SSH exit code 255 indicates connection failure - report to pool for backoff
595+
if (code === 255) {
596+
sshConnectionPool.reportFailure(this.config, "SSH connection failed (exit code 255)");
597+
}
590598
reject(new RuntimeErrorClass(`SSH command failed: ${stderr.trim()}`, "network"));
591599
return;
592600
}
@@ -599,6 +607,8 @@ export class SSHRuntime implements Runtime {
599607
clearTimeout(timer);
600608
if (timedOut) return; // Already rejected
601609

610+
// Spawn errors are connection-level failures
611+
sshConnectionPool.reportFailure(this.config, `SSH spawn error: ${getErrorMessage(err)}`);
602612
reject(
603613
new RuntimeErrorClass(
604614
`Cannot execute SSH command: ${getErrorMessage(err)}`,

β€Žsrc/node/runtime/sshConnectionPool.test.tsβ€Ž

Lines changed: 149 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import * as os from "os";
22
import * as path from "path";
3-
import { getControlPath } from "./sshConnectionPool";
4-
import type { SSHRuntimeConfig } from "./SSHRuntime";
3+
import { getControlPath, SSHConnectionPool, type SSHRuntimeConfig } from "./sshConnectionPool";
54

65
describe("sshConnectionPool", () => {
76
describe("getControlPath", () => {
@@ -134,3 +133,151 @@ describe("username isolation", () => {
134133
expect(controlPath).toMatch(/mux-ssh-[a-f0-9]{12}$/);
135134
});
136135
});
136+
137+
describe("SSHConnectionPool", () => {
138+
describe("health tracking", () => {
139+
test("getConnectionHealth returns undefined for unknown connection", () => {
140+
const pool = new SSHConnectionPool();
141+
const config: SSHRuntimeConfig = {
142+
host: "unknown.example.com",
143+
srcBaseDir: "/work",
144+
};
145+
146+
expect(pool.getConnectionHealth(config)).toBeUndefined();
147+
});
148+
149+
test("markHealthy sets connection to healthy state", () => {
150+
const pool = new SSHConnectionPool();
151+
const config: SSHRuntimeConfig = {
152+
host: "test.example.com",
153+
srcBaseDir: "/work",
154+
};
155+
156+
pool.markHealthy(config);
157+
const health = pool.getConnectionHealth(config);
158+
159+
expect(health).toBeDefined();
160+
expect(health!.status).toBe("healthy");
161+
expect(health!.consecutiveFailures).toBe(0);
162+
expect(health!.lastSuccess).toBeInstanceOf(Date);
163+
});
164+
165+
test("reportFailure puts connection into backoff", () => {
166+
const pool = new SSHConnectionPool();
167+
const config: SSHRuntimeConfig = {
168+
host: "test.example.com",
169+
srcBaseDir: "/work",
170+
};
171+
172+
// Mark healthy first
173+
pool.markHealthy(config);
174+
expect(pool.getConnectionHealth(config)?.status).toBe("healthy");
175+
176+
// Report a failure
177+
pool.reportFailure(config, "Connection refused");
178+
const health = pool.getConnectionHealth(config);
179+
180+
expect(health?.status).toBe("unhealthy");
181+
expect(health?.consecutiveFailures).toBe(1);
182+
expect(health?.lastError).toBe("Connection refused");
183+
expect(health?.backoffUntil).toBeDefined();
184+
});
185+
186+
test("resetBackoff clears backoff state after failed probe", async () => {
187+
const pool = new SSHConnectionPool();
188+
const config: SSHRuntimeConfig = {
189+
host: "nonexistent.invalid.host.test",
190+
srcBaseDir: "/work",
191+
};
192+
193+
// Trigger a failure via acquireConnection (will fail to connect)
194+
await expect(pool.acquireConnection(config, 1000)).rejects.toThrow();
195+
196+
// Verify we're now in backoff
197+
const healthBefore = pool.getConnectionHealth(config);
198+
expect(healthBefore?.status).toBe("unhealthy");
199+
expect(healthBefore?.backoffUntil).toBeDefined();
200+
201+
// Reset backoff
202+
pool.resetBackoff(config);
203+
const healthAfter = pool.getConnectionHealth(config);
204+
205+
expect(healthAfter).toBeDefined();
206+
expect(healthAfter!.status).toBe("unknown");
207+
expect(healthAfter!.consecutiveFailures).toBe(0);
208+
expect(healthAfter!.backoffUntil).toBeUndefined();
209+
});
210+
});
211+
212+
describe("acquireConnection", () => {
213+
test("returns immediately for known healthy connection", async () => {
214+
const pool = new SSHConnectionPool();
215+
const config: SSHRuntimeConfig = {
216+
host: "test.example.com",
217+
srcBaseDir: "/work",
218+
};
219+
220+
// Mark as healthy first
221+
pool.markHealthy(config);
222+
223+
// Should return immediately without probing
224+
const start = Date.now();
225+
await pool.acquireConnection(config);
226+
const elapsed = Date.now() - start;
227+
228+
// Should be nearly instant (< 50ms)
229+
expect(elapsed).toBeLessThan(50);
230+
});
231+
232+
test("throws immediately when in backoff", async () => {
233+
const pool = new SSHConnectionPool();
234+
const config: SSHRuntimeConfig = {
235+
host: "nonexistent.invalid.host.test",
236+
srcBaseDir: "/work",
237+
};
238+
239+
// Trigger a failure to put connection in backoff
240+
await expect(pool.acquireConnection(config, 1000)).rejects.toThrow();
241+
242+
// Second call should throw immediately with backoff message
243+
await expect(pool.acquireConnection(config)).rejects.toThrow(/in backoff/);
244+
});
245+
246+
test("getControlPath returns deterministic path", () => {
247+
const pool = new SSHConnectionPool();
248+
const config: SSHRuntimeConfig = {
249+
host: "test.example.com",
250+
srcBaseDir: "/work",
251+
};
252+
253+
const path1 = pool.getControlPath(config);
254+
const path2 = pool.getControlPath(config);
255+
256+
expect(path1).toBe(path2);
257+
expect(path1).toBe(getControlPath(config));
258+
});
259+
});
260+
261+
describe("singleflighting", () => {
262+
test("concurrent acquireConnection calls share same probe", async () => {
263+
const pool = new SSHConnectionPool();
264+
const config: SSHRuntimeConfig = {
265+
host: "test.example.com",
266+
srcBaseDir: "/work",
267+
};
268+
269+
// Mark healthy to avoid actual probe
270+
pool.markHealthy(config);
271+
272+
// Multiple concurrent calls should all succeed
273+
const results = await Promise.all([
274+
pool.acquireConnection(config),
275+
pool.acquireConnection(config),
276+
pool.acquireConnection(config),
277+
]);
278+
279+
// All should resolve (no errors)
280+
expect(results).toHaveLength(3);
281+
});
282+
});
283+
});

0 commit comments

Comments
Β (0)