Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/bridge/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,10 @@ class BridgeProcess {

constructor(options: BridgeOptions) {
this.options = options;
// Compute socket path from session name (platform-aware: Unix socket or Windows named pipe)
this.socketPath = getSocketPath(options.sessionName);
// Each bridge instance gets a unique socket path based on its PID, so that
// overlapping bridges (e.g. background reconnect racing with explicit restart)
// never delete each other's sockets during cleanup.
this.socketPath = getSocketPath(options.sessionName, process.pid);

// Create promise that resolves when MCP client connects
this.mcpClientReady = new Promise<void>((resolve, reject) => {
Expand Down Expand Up @@ -1516,6 +1518,7 @@ class BridgeProcess {
});

// Remove socket file (Unix only - Windows named pipes don't leave files)
// Safe because each bridge has a unique PID-based socket path.
if (process.platform !== 'win32') {
try {
if (await fileExists(this.socketPath)) {
Expand Down
17 changes: 16 additions & 1 deletion src/cli/commands/clean.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
getLogsDir,
fileExists,
cleanupOrphanedLogFiles,
cleanupOrphanedSockets,
} from '../../lib/index.js';
import { formatOutput, formatSuccess, formatWarning } from '../output.js';
import { loadSessions, deleteSession, consolidateSessions } from '../../lib/sessions.js';
Expand All @@ -33,6 +34,7 @@ interface CleanResult {
crashedBridges: number;
expiredSessions: number;
orphanedBridgeLogs: number;
orphanedSockets: number;
sessions: number;
profiles: number;
logs: number;
Expand All @@ -47,17 +49,22 @@ async function cleanStale(): Promise<{
crashedBridges: number;
expiredSessions: number;
orphanedBridgeLogs: number;
orphanedSockets: number;
}> {
// Consolidate sessions, removes expired ones
const consolidateResult = await consolidateSessions(true);

// Clean up orphaned log files (for sessions that no longer exist, older than 7 days)
const orphanedBridgeLogs = await cleanupOrphanedLogFiles(consolidateResult.sessions);

// Clean up orphaned socket files (PID-based sockets from dead bridges, older than 5 min)
const orphanedSockets = await cleanupOrphanedSockets(consolidateResult.sessions);

return {
crashedBridges: consolidateResult.crashedBridges,
expiredSessions: consolidateResult.expiredSessions,
orphanedBridgeLogs,
orphanedSockets,
};
}

Expand Down Expand Up @@ -127,6 +134,7 @@ async function cleanAll(): Promise<CleanResult> {
crashedBridges: 0,
expiredSessions: 0,
orphanedBridgeLogs: 0,
orphanedSockets: 0,
sessions: 0,
profiles: 0,
logs: 0,
Expand All @@ -148,6 +156,7 @@ async function cleanAll(): Promise<CleanResult> {
result.crashedBridges = staleResult.crashedBridges;
result.expiredSessions = staleResult.expiredSessions;
result.orphanedBridgeLogs = staleResult.orphanedBridgeLogs;
result.orphanedSockets = staleResult.orphanedSockets;

// Remove any remaining empty directories
const mcpcHome = getMcpcHome();
Expand Down Expand Up @@ -188,6 +197,7 @@ export async function clean(options: CleanOptions): Promise<void> {
crashedBridges: 0,
expiredSessions: 0,
orphanedBridgeLogs: 0,
orphanedSockets: 0,
sessions: 0,
profiles: 0,
logs: 0,
Expand Down Expand Up @@ -223,6 +233,7 @@ export async function clean(options: CleanOptions): Promise<void> {
result.crashedBridges = staleResult.crashedBridges;
result.expiredSessions = staleResult.expiredSessions;
result.orphanedBridgeLogs = staleResult.orphanedBridgeLogs;
result.orphanedSockets = staleResult.orphanedSockets;
}

// Clean specific resources if requested
Expand All @@ -246,14 +257,18 @@ export async function clean(options: CleanOptions): Promise<void> {

if (!cleaningSpecific) {
const hasCleanups =
result.crashedBridges > 0 || result.expiredSessions > 0 || result.orphanedBridgeLogs > 0;
result.crashedBridges > 0 ||
result.expiredSessions > 0 ||
result.orphanedBridgeLogs > 0 ||
result.orphanedSockets > 0;

if (hasCleanups) {
const parts: string[] = [];
if (result.crashedBridges > 0) parts.push(`${result.crashedBridges} crashed bridge(s)`);
if (result.expiredSessions > 0) parts.push(`${result.expiredSessions} expired session(s)`);
if (result.orphanedBridgeLogs > 0)
parts.push(`${result.orphanedBridgeLogs} orphaned log(s)`);
if (result.orphanedSockets > 0) parts.push(`${result.orphanedSockets} orphaned socket(s)`);
messages.push(`Cleaned ${parts.join(', ')}`);
} else {
messages.push('No stale resources found');
Expand Down
41 changes: 20 additions & 21 deletions src/lib/bridge-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@
*/

import { spawn, type ChildProcess } from 'child_process';
import { unlink } from 'fs/promises';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
import type { ServerConfig, AuthCredentials, ProxyConfig, X402WalletCredentials } from './types.js';
import {
getSocketPath,
waitForFile,
isProcessAlive,
fileExists,
getLogsDir,
isSessionExpiredError,
enrichErrorMessage,
Expand Down Expand Up @@ -137,17 +135,6 @@ export async function startBridge(options: StartBridgeOptions): Promise<StartBri

logger.debug(`Launching bridge for session: ${sessionName}`);

// Get socket path (computed from session name, platform-aware)
const socketPath = getSocketPath(sessionName);

// Remove existing socket file if it exists (Unix only, Windows named pipes don't leave files)
// We MUST do it here, so waitForFile() below doesn't pick the old file!
// Plus, if it fails, the user will see the error
if (process.platform !== 'win32' && (await fileExists(socketPath))) {
logger.debug(`Removing existing socket: ${socketPath}`);
await unlink(socketPath);
}

// Create a sanitized transport config without any headers
// Headers will be sent to the bridge via IPC instead
const sanitizedTarget: ServerConfig = { ...serverConfig };
Expand Down Expand Up @@ -215,6 +202,11 @@ export async function startBridge(options: StartBridgeOptions): Promise<StartBri

const pid = bridgeProcess.pid;

// Each bridge gets a unique socket path based on its PID, so overlapping
// bridges (e.g. background reconnect racing with explicit restart) never
// conflict. The bridge process computes the same path via process.pid.
const socketPath = getSocketPath(sessionName, pid);

// Wait for socket file to be created (with timeout)
try {
await waitForFile(socketPath, { timeoutMs: 5000 });
Expand Down Expand Up @@ -280,7 +272,7 @@ export async function stopBridge(
// For graceful shutdown (closeSession), send IPC message first so bridge
// can send HTTP DELETE. For restart, just kill immediately.
if (options?.graceful) {
const socketPath = getSocketPath(sessionName);
const socketPath = getSocketPath(sessionName, session.pid);
const shutdownOk = await sendBridgeShutdown(socketPath);
if (shutdownOk) {
await waitForProcessExit(session.pid, 2000);
Expand Down Expand Up @@ -610,13 +602,13 @@ export async function ensureBridgeReady(sessionName: string): Promise<string> {
);
}

// Socket path is computed from session name (platform-aware)
const socketPath = getSocketPath(sessionName);
// Socket path is PID-based: each bridge instance gets its own unique path
const socketPath = session.pid ? getSocketPath(sessionName, session.pid) : null;

// Quick check: is the process alive?
const processAlive = session.pid ? isProcessAlive(session.pid) : false;

if (processAlive) {
if (processAlive && socketPath) {
// Process alive, try getServerDetails (blocks until MCP connected)
const result = await checkBridgeHealth(socketPath);
if (result.healthy) {
Expand All @@ -642,16 +634,23 @@ export async function ensureBridgeReady(sessionName: string): Promise<string> {
// Bridge not healthy - restart it
// Use 'connecting' if the session has never successfully connected (no lastSeenAt),
// 'reconnecting' if it was previously active.
// Set lastConnectionAttemptAt to prevent parallel CLI processes from
// also triggering a restart via consolidateSessions/reconnectCrashedSessions.
const restartStatus = session.lastSeenAt ? 'reconnecting' : 'connecting';
await updateSession(sessionName, { status: restartStatus });
await restartBridge(sessionName);
await updateSession(sessionName, {
status: restartStatus,
lastConnectionAttemptAt: new Date().toISOString(),
});
const { pid: newPid } = await restartBridge(sessionName);

const newSocketPath = getSocketPath(sessionName, newPid);

// Try getServerDetails on restarted bridge (blocks until MCP connected)
const result = await checkBridgeHealth(socketPath);
const result = await checkBridgeHealth(newSocketPath);
if (result.healthy) {
await updateSession(sessionName, { status: 'active' });
logger.debug(`Bridge for ${sessionName} passed health check`);
return socketPath;
return newSocketPath;
}

// Not healthy after restart - classify the error
Expand Down
65 changes: 64 additions & 1 deletion src/lib/cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import { readdir, unlink, stat } from 'fs/promises';
import { join } from 'path';
import { getLogsDir, fileExists } from './utils.js';
import { getLogsDir, getBridgesDir, getSocketPath, fileExists } from './utils.js';
import { createLogger } from './logger.js';

const logger = createLogger('cleanup');
Expand Down Expand Up @@ -80,3 +80,66 @@ export async function cleanupOrphanedLogFiles(

return deletedCount;
}

/**
* Clean up orphaned socket files in the bridges directory.
* With PID-based socket paths (@session.1234.sock), stale sockets can accumulate
* when a bridge exits without cleanup (e.g. SIGKILL, crash, or orphaned background restart).
*
* A socket is considered orphaned if it doesn't match any active session's current PID.
* Only sockets older than `minAgeSeconds` are removed to avoid racing with a bridge
* that was just spawned but hasn't updated sessions.json yet.
*
* @param activeSessions - Map of session names to session data (with optional pid)
* @param options.minAgeSeconds - Only delete sockets older than this (default: 300 = 5 min)
* @returns Number of socket files deleted
*/
export async function cleanupOrphanedSockets(
activeSessions: Record<string, { pid?: number } | undefined>,
options: { minAgeSeconds?: number } = {}
): Promise<number> {
const { minAgeSeconds = 300 } = options;

if (process.platform === 'win32') {
return 0; // Windows named pipes don't leave files
}

const bridgesDir = getBridgesDir();
if (!(await fileExists(bridgesDir))) {
return 0;
}

// Build set of active socket paths for fast lookup
const activeSocketPaths = new Set<string>();
for (const [name, session] of Object.entries(activeSessions)) {
if (session?.pid) {
activeSocketPaths.add(getSocketPath(name, session.pid));
}
}

const cutoffTime = Date.now() - minAgeSeconds * 1000;
let deletedCount = 0;

const files = await readdir(bridgesDir);
for (const file of files) {
if (!file.endsWith('.sock')) continue;

const filePath = join(bridgesDir, file);

// Skip sockets that belong to a known active session+PID
if (activeSocketPaths.has(filePath)) continue;

try {
const fileStats = await stat(filePath);
if (fileStats.mtime.getTime() < cutoffTime) {
await unlink(filePath);
deletedCount++;
logger.debug(`Removed orphaned socket: ${file}`);
}
} catch {
// Ignore stat/unlink errors
}
}

return deletedCount;
}
10 changes: 5 additions & 5 deletions src/lib/session-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ export class SessionClient extends EventEmitter implements IMcpClient {

// Restart bridge
await updateSession(this.sessionName, { status: 'reconnecting' });
await restartBridge(this.sessionName);
const { pid: newPid } = await restartBridge(this.sessionName);

// Reconnect using computed socket path
const socketPath = getSocketPath(this.sessionName);
// Reconnect using the new bridge's PID-based socket path
const socketPath = getSocketPath(this.sessionName, newPid);
this.bridgeClient = new BridgeClient(socketPath);
this.setupNotificationForwarding();
await this.bridgeClient.connect();
Expand Down Expand Up @@ -340,9 +340,9 @@ export class SessionClient extends EventEmitter implements IMcpClient {

logger.debug(`Socket error during callToolWithTask, will restart bridge...`);
await this.bridgeClient.close();
await restartBridge(this.sessionName);
const { pid: newPid } = await restartBridge(this.sessionName);

const socketPath = getSocketPath(this.sessionName);
const socketPath = getSocketPath(this.sessionName, newPid);
this.bridgeClient = new BridgeClient(socketPath);
this.setupNotificationForwarding();
await this.bridgeClient.connect();
Expand Down
12 changes: 10 additions & 2 deletions src/lib/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,8 @@ export async function consolidateSessions(
}

// Delete socket file (Unix only - Windows named pipes don't leave files)
if (process.platform !== 'win32') {
const socketPath = getSocketPath(name);
if (process.platform !== 'win32' && session.pid) {
const socketPath = getSocketPath(name, session.pid);
try {
await unlink(socketPath);
logger.debug(`Removed stale socket: ${socketPath}`);
Expand All @@ -321,6 +321,14 @@ export async function consolidateSessions(
// Check bridge status - always remove pid if process is not alive
if (session.pid && !isProcessAlive(session.pid)) {
logger.debug(`Clearing crashed bridge PID for session: ${name} (PID: ${session.pid})`);
// Clean up the PID-based socket file for this dead bridge
if (process.platform !== 'win32') {
try {
await unlink(getSocketPath(name, session.pid));
} catch {
// Ignore - file may already be deleted by bridge cleanup
}
}
delete session.pid;
hasChanges = true;
// Don't overwrite terminal/transient statuses
Expand Down
8 changes: 5 additions & 3 deletions src/lib/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,20 @@ export function getBridgesDir(): string {
* to avoid conflicts between different mcpc instances.
*
* @param sessionName - The session name (e.g., "@my-session")
* @param pid - The bridge process PID (each bridge gets a unique socket path)
* @returns The platform-appropriate socket/pipe path
*/
export function getSocketPath(sessionName: string): string {
export function getSocketPath(sessionName: string, pid: number): string {
const suffix = `.${pid}`;
if (process.platform === 'win32') {
// Windows named pipes are global, so include a hash of the home directory
// to avoid conflicts between different mcpc instances
const homeHash = createHash('sha256').update(getMcpcHome()).digest('hex').slice(0, 8);
return `\\\\.\\pipe\\mcpc-${homeHash}-${sessionName}`;
return `\\\\.\\pipe\\mcpc-${homeHash}-${sessionName}${suffix}`;
}

// Unix/macOS: use socket file in bridges directory (naturally isolated per home dir)
return join(getBridgesDir(), `${sessionName}.sock`);
return join(getBridgesDir(), `${sessionName}${suffix}.sock`);
}

/**
Expand Down
Loading