Skip to content

Commit 997109f

Browse files
committed
πŸ€– feat: SSH connection pool with backoff and singleflighting
Prevents thundering herd issues with SSH connections by: - Adding SSHConnectionPool class with health tracking - Implementing exponential backoff (1s β†’ 5s β†’ 10s β†’ 20s β†’ 40s β†’ 60s cap) - Singleflighting concurrent connection attempts to same host - Probing unknown connections before first use - Skipping probes for known-healthy connections Integration points: - SSHRuntime.exec() and execSSHCommand() call acquireConnection() - PTYService calls acquireConnection() before spawning SSH terminals _Generated with mux_
1 parent 7e1ef79 commit 997109f

File tree

5 files changed

+456
-29
lines changed

5 files changed

+456
-29
lines changed

β€Žsrc/node/runtime/SSHRuntime.tsβ€Ž

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import { expandTildeForSSH, cdCommandForSSH } from "./tildeExpansion";
2424
import { getProjectName } from "@/node/utils/runtime/helpers";
2525
import { getErrorMessage } from "@/common/utils/errors";
2626
import { execAsync, DisposableProcess } from "@/node/utils/disposableExec";
27-
import { getControlPath } from "./sshConnectionPool";
27+
import { getControlPath, sshConnectionPool, type SSHRuntimeConfig } from "./sshConnectionPool";
2828
import { getBashPath } from "@/node/utils/main/bashPath";
2929

3030
/**
@@ -40,19 +40,8 @@ const shescape = {
4040
},
4141
};
4242

43-
/**
44-
* SSH Runtime Configuration
45-
*/
46-
export interface SSHRuntimeConfig {
47-
/** SSH host (can be hostname, user@host, or SSH config alias) */
48-
host: string;
49-
/** Working directory on remote host */
50-
srcBaseDir: string;
51-
/** Optional: Path to SSH private key (if not using ~/.ssh/config or ssh-agent) */
52-
identityFile?: string;
53-
/** Optional: SSH port (default: 22) */
54-
port?: number;
55-
}
43+
// Re-export SSHRuntimeConfig from connection pool to maintain API compatibility
44+
export type { SSHRuntimeConfig } from "./sshConnectionPool";
5645

5746
/**
5847
* SSH runtime implementation that executes commands and file operations
@@ -92,7 +81,6 @@ export class SSHRuntime implements Runtime {
9281
/**
9382
* Execute command over SSH with streaming I/O
9483
*/
95-
// eslint-disable-next-line @typescript-eslint/require-await
9684
async exec(command: string, options: ExecOptions): Promise<ExecStream> {
9785
const startTime = performance.now();
9886

@@ -101,6 +89,10 @@ export class SSHRuntime implements Runtime {
10189
throw new RuntimeErrorClass("Operation aborted before execution", "exec");
10290
}
10391

92+
// Ensure connection is healthy before executing
93+
// This provides backoff protection and singleflighting for concurrent requests
94+
await sshConnectionPool.acquireConnection(this.config);
95+
10496
// Build command parts
10597
const parts: string[] = [];
10698

@@ -218,11 +210,22 @@ export class SSHRuntime implements Runtime {
218210
resolve(EXIT_CODE_TIMEOUT);
219211
return;
220212
}
221-
resolve(code ?? (signal ? -1 : 0));
213+
214+
const exitCode = code ?? (signal ? -1 : 0);
215+
216+
// SSH exit code 255 indicates connection failure - report to pool for backoff
217+
// This prevents thundering herd when a previously healthy host goes down
218+
if (exitCode === 255) {
219+
sshConnectionPool.reportFailure(this.config, "SSH connection failed (exit code 255)");
220+
}
221+
222+
resolve(exitCode);
222223
// Cleanup runs automatically via DisposableProcess
223224
});
224225

225226
sshProcess.on("error", (err) => {
227+
// Spawn errors are connection-level failures
228+
sshConnectionPool.reportFailure(this.config, `SSH spawn error: ${err.message}`);
226229
reject(new RuntimeErrorClass(`Failed to execute SSH command: ${err.message}`, "exec", err));
227230
});
228231
});
@@ -406,6 +409,9 @@ export class SSHRuntime implements Runtime {
406409
* @private
407410
*/
408411
private async execSSHCommand(command: string, timeoutMs: number): Promise<string> {
412+
// Ensure connection is healthy before executing
413+
await sshConnectionPool.acquireConnection(this.config, timeoutMs);
414+
409415
const sshArgs = this.buildSSHArgs();
410416
sshArgs.push(this.config.host, command);
411417

@@ -440,6 +446,10 @@ export class SSHRuntime implements Runtime {
440446
if (timedOut) return; // Already rejected
441447

442448
if (code !== 0) {
449+
// SSH exit code 255 indicates connection failure - report to pool for backoff
450+
if (code === 255) {
451+
sshConnectionPool.reportFailure(this.config, "SSH connection failed (exit code 255)");
452+
}
443453
reject(new RuntimeErrorClass(`SSH command failed: ${stderr.trim()}`, "network"));
444454
return;
445455
}
@@ -452,6 +462,8 @@ export class SSHRuntime implements Runtime {
452462
clearTimeout(timer);
453463
if (timedOut) return; // Already rejected
454464

465+
// Spawn errors are connection-level failures
466+
sshConnectionPool.reportFailure(this.config, `SSH spawn error: ${getErrorMessage(err)}`);
455467
reject(
456468
new RuntimeErrorClass(
457469
`Cannot execute SSH command: ${getErrorMessage(err)}`,

β€Žsrc/node/runtime/sshConnectionPool.test.tsβ€Ž

Lines changed: 149 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import * as os from "os";
22
import * as path from "path";
3-
import { getControlPath } from "./sshConnectionPool";
4-
import type { SSHRuntimeConfig } from "./SSHRuntime";
3+
import { getControlPath, SSHConnectionPool, type SSHRuntimeConfig } from "./sshConnectionPool";
54

65
describe("sshConnectionPool", () => {
76
describe("getControlPath", () => {
@@ -134,3 +133,151 @@ describe("username isolation", () => {
134133
expect(controlPath).toMatch(/mux-ssh-[a-f0-9]{12}$/);
135134
});
136135
});
136+
137+
describe("SSHConnectionPool", () => {
138+
describe("health tracking", () => {
139+
test("getConnectionHealth returns undefined for unknown connection", () => {
140+
const pool = new SSHConnectionPool();
141+
const config: SSHRuntimeConfig = {
142+
host: "unknown.example.com",
143+
srcBaseDir: "/work",
144+
};
145+
146+
expect(pool.getConnectionHealth(config)).toBeUndefined();
147+
});
148+
149+
test("markHealthy sets connection to healthy state", () => {
150+
const pool = new SSHConnectionPool();
151+
const config: SSHRuntimeConfig = {
152+
host: "test.example.com",
153+
srcBaseDir: "/work",
154+
};
155+
156+
pool.markHealthy(config);
157+
const health = pool.getConnectionHealth(config);
158+
159+
expect(health).toBeDefined();
160+
expect(health!.status).toBe("healthy");
161+
expect(health!.consecutiveFailures).toBe(0);
162+
expect(health!.lastSuccess).toBeInstanceOf(Date);
163+
});
164+
165+
test("reportFailure puts connection into backoff", () => {
166+
const pool = new SSHConnectionPool();
167+
const config: SSHRuntimeConfig = {
168+
host: "test.example.com",
169+
srcBaseDir: "/work",
170+
};
171+
172+
// Mark healthy first
173+
pool.markHealthy(config);
174+
expect(pool.getConnectionHealth(config)?.status).toBe("healthy");
175+
176+
// Report a failure
177+
pool.reportFailure(config, "Connection refused");
178+
const health = pool.getConnectionHealth(config);
179+
180+
expect(health?.status).toBe("unhealthy");
181+
expect(health?.consecutiveFailures).toBe(1);
182+
expect(health?.lastError).toBe("Connection refused");
183+
expect(health?.backoffUntil).toBeDefined();
184+
});
185+
186+
test("resetBackoff clears backoff state after failed probe", async () => {
187+
const pool = new SSHConnectionPool();
188+
const config: SSHRuntimeConfig = {
189+
host: "nonexistent.invalid.host.test",
190+
srcBaseDir: "/work",
191+
};
192+
193+
// Trigger a failure via acquireConnection (will fail to connect)
194+
await expect(pool.acquireConnection(config, 1000)).rejects.toThrow();
195+
196+
// Verify we're now in backoff
197+
const healthBefore = pool.getConnectionHealth(config);
198+
expect(healthBefore?.status).toBe("unhealthy");
199+
expect(healthBefore?.backoffUntil).toBeDefined();
200+
201+
// Reset backoff
202+
pool.resetBackoff(config);
203+
const healthAfter = pool.getConnectionHealth(config);
204+
205+
expect(healthAfter).toBeDefined();
206+
expect(healthAfter!.status).toBe("unknown");
207+
expect(healthAfter!.consecutiveFailures).toBe(0);
208+
expect(healthAfter!.backoffUntil).toBeUndefined();
209+
});
210+
});
211+
212+
describe("acquireConnection", () => {
213+
test("returns immediately for known healthy connection", async () => {
214+
const pool = new SSHConnectionPool();
215+
const config: SSHRuntimeConfig = {
216+
host: "test.example.com",
217+
srcBaseDir: "/work",
218+
};
219+
220+
// Mark as healthy first
221+
pool.markHealthy(config);
222+
223+
// Should return immediately without probing
224+
const start = Date.now();
225+
await pool.acquireConnection(config);
226+
const elapsed = Date.now() - start;
227+
228+
// Should be nearly instant (< 50ms)
229+
expect(elapsed).toBeLessThan(50);
230+
});
231+
232+
test("throws immediately when in backoff", async () => {
233+
const pool = new SSHConnectionPool();
234+
const config: SSHRuntimeConfig = {
235+
host: "nonexistent.invalid.host.test",
236+
srcBaseDir: "/work",
237+
};
238+
239+
// Trigger a failure to put connection in backoff
240+
await expect(pool.acquireConnection(config, 1000)).rejects.toThrow();
241+
242+
// Second call should throw immediately with backoff message
243+
await expect(pool.acquireConnection(config)).rejects.toThrow(/in backoff/);
244+
});
245+
246+
test("getControlPath returns deterministic path", () => {
247+
const pool = new SSHConnectionPool();
248+
const config: SSHRuntimeConfig = {
249+
host: "test.example.com",
250+
srcBaseDir: "/work",
251+
};
252+
253+
const path1 = pool.getControlPath(config);
254+
const path2 = pool.getControlPath(config);
255+
256+
expect(path1).toBe(path2);
257+
expect(path1).toBe(getControlPath(config));
258+
});
259+
});
260+
261+
describe("singleflighting", () => {
262+
test("concurrent acquireConnection calls share same probe", async () => {
263+
const pool = new SSHConnectionPool();
264+
const config: SSHRuntimeConfig = {
265+
host: "test.example.com",
266+
srcBaseDir: "/work",
267+
};
268+
269+
// Mark healthy to avoid actual probe
270+
pool.markHealthy(config);
271+
272+
// Multiple concurrent calls should all succeed
273+
const results = await Promise.all([
274+
pool.acquireConnection(config),
275+
pool.acquireConnection(config),
276+
pool.acquireConnection(config),
277+
]);
278+
279+
// All should resolve (no errors)
280+
expect(results).toHaveLength(3);
281+
});
282+
});
283+
});

0 commit comments

Comments
Β (0)