From b7d0987cee53577b2e3e45bbc0d809d31f3f986d Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Thu, 23 Apr 2026 13:58:23 +0200 Subject: [PATCH 1/6] Add session close support and cleanup handling Advertise ACP close capability and implement `unstable_closeSession` to interrupt active turns, unsubscribe threads, remove per-session listeners, and forget session state. --- src/CodexAcpClient.ts | 22 ++++ src/CodexAcpServer.ts | 30 ++++- src/CodexAppServerClient.ts | 18 +++ .../CodexACPAgent/CodexAcpClient.test.ts | 113 ++++++++++++++++++ .../CodexACPAgent/initialize.test.ts | 1 + 5 files changed, 183 insertions(+), 1 deletion(-) diff --git a/src/CodexAcpClient.ts b/src/CodexAcpClient.ts index 514e53b5..8c5e3942 100644 --- a/src/CodexAcpClient.ts +++ b/src/CodexAcpClient.ts @@ -375,6 +375,28 @@ export class CodexAcpClient { this.codexClient.onElicitationRequest(sessionId, elicitationHandler); } + unsubscribeFromSessionEvents(sessionId: string): void { + this.codexClient.removeServerNotification(sessionId); + this.codexClient.removeApprovalRequest(sessionId); + this.codexClient.removeElicitationRequest(sessionId); + } + + async closeSession(sessionId: string, currentTurnId: string | null): Promise { + if (currentTurnId) { + try { + await this.codexClient.turnInterrupt({ + threadId: sessionId, + turnId: currentTurnId, + }); + } catch (err) { + logger.error(`Failed to interrupt turn while closing session ${sessionId}`, err); + } + } + + await this.codexClient.threadUnsubscribe({threadId: sessionId}); + this.unsubscribeFromSessionEvents(sessionId); + } + async sendPrompt( request: acp.PromptRequest, agentMode: AgentMode, diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index 4159656a..c274d1e6 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -101,6 +101,7 @@ export class CodexAcpServer implements acp.Agent { image: true }, sessionCapabilities: { + close: {}, resume: { }, list: { } }, @@ -238,6 +239,19 @@ export class CodexAcpServer implements acp.Agent { return await this.runWithProcessCheck(() => this.codexAcpClient.listSessions(params)); } + async unstable_closeSession(params: acp.CloseSessionRequest): Promise { + logger.log("Closing session...", {sessionId: params.sessionId}); + const sessionState = this.getSessionState(params.sessionId); + + await this.runWithProcessCheck(() => + this.codexAcpClient.closeSession(params.sessionId, sessionState.currentTurnId) + ); + this.forgetSession(params.sessionId); + + logger.log("Session closed", {sessionId: params.sessionId}); + return {}; + } + async newSession( params: acp.NewSessionRequest, ): Promise { @@ -641,6 +655,11 @@ export class CodexAcpServer implements acp.Agent { return sessionState; } + private forgetSession(sessionId: string): void { + this.sessions.delete(sessionId); + this.pendingMcpStartupSessions.delete(sessionId); + } + private resolveSessionMcpServers( mcpServers: Array, recoverFromStartup: boolean, @@ -678,7 +697,16 @@ export class CodexAcpServer implements acp.Agent { pendingStartup.afterVersion, ) ); - await this.publishMcpStartupStatus(sessionId, mcpStartup, pendingStartup.requestedServers); + const sessionState = this.sessions.get(sessionId); + const pendingStartup = this.pendingMcpStartupSessions.get(sessionId); + if (sessionState && pendingStartup) { + sessionState.sessionMcpServers = mcpStartup.ready.filter(serverName => + pendingStartup.requestedServers.has(serverName) + ); +await this.publishMcpStartupStatus(sessionId, mcpStartup, pendingStartup.requestedServers); + } else { + logger.log("Skipping MCP startup status for closed session", {sessionId}); + } } catch (err) { logger.error(`Failed to publish MCP startup status for session ${sessionId}`, err); } finally { diff --git a/src/CodexAppServerClient.ts b/src/CodexAppServerClient.ts index afd333ff..3ef4a2b7 100644 --- a/src/CodexAppServerClient.ts +++ b/src/CodexAppServerClient.ts @@ -34,6 +34,8 @@ import type { ThreadResumeResponse, ThreadStartParams, ThreadStartResponse, + ThreadUnsubscribeParams, + ThreadUnsubscribeResponse, TurnCompletedNotification, TurnInterruptParams, TurnInterruptResponse, @@ -143,10 +145,18 @@ export class CodexAppServerClient { this.approvalHandlers.set(threadId, handler); } + removeApprovalRequest(threadId: string): void { + this.approvalHandlers.delete(threadId); + } + onElicitationRequest(threadId: string, handler: ElicitationHandler): void { this.elicitationHandlers.set(threadId, handler); } + removeElicitationRequest(threadId: string): void { + this.elicitationHandlers.delete(threadId); + } + async initialize(params: InitializeParams): Promise { return await this.sendRequest({ method: "initialize", params: params }); } @@ -179,6 +189,10 @@ export class CodexAppServerClient { return await this.sendRequest({ method: "thread/read", params: params }); } + async threadUnsubscribe(params: ThreadUnsubscribeParams): Promise { + return await this.sendRequest({ method: "thread/unsubscribe", params }); + } + async listMcpServerStatus(params: ListMcpServerStatusParams): Promise { return await this.sendRequest({ method: "mcpServerStatus/list", params }); } @@ -248,6 +262,10 @@ export class CodexAppServerClient { this.notificationHandlers.set(sessionId, callback); } + removeServerNotification(sessionId: string): void { + this.notificationHandlers.delete(sessionId); + } + private codexEventHandlers: Array<(event: CodexConnectionEvent) => void> = []; onClientTransportEvent(callback: (event: CodexConnectionEvent) => void){ this.codexEventHandlers.push(callback); diff --git a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts index 826165ca..6df1e7ed 100644 --- a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts +++ b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts @@ -924,4 +924,117 @@ describe('ACP server test', { timeout: 40_000 }, () => { } }); }); + + it('closes an active session by interrupting the turn, unsubscribing the thread, and removing listeners', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + + await codexAcpClient.subscribeToSessionEvents( + "session-close", + vi.fn(), + { + handleCommandExecution: vi.fn(), + handleFileChange: vi.fn(), + }, + { + handleElicitation: vi.fn(), + } + ); + + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.notificationHandlers.has("session-close")).toBe(true); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.approvalHandlers.has("session-close")).toBe(true); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.elicitationHandlers.has("session-close")).toBe(true); + + await codexAcpClient.closeSession("session-close", "turn-close"); + + const closeRequests = mockFixture.getCodexConnectionEvents([]) + .filter((event) => event.eventType === "request"); + expect(closeRequests).toEqual([ + { + eventType: "request", + method: "turn/interrupt", + params: {threadId: "session-close", turnId: "turn-close"}, + }, + { + eventType: "request", + method: "thread/unsubscribe", + params: {threadId: "session-close"}, + }, + ]); + + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.notificationHandlers.has("session-close")).toBe(false); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.approvalHandlers.has("session-close")).toBe(false); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.elicitationHandlers.has("session-close")).toBe(false); + }); + + it('removes session bookkeeping after unstable_closeSession', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const sessionState = createTestSessionState({ + sessionId: "session-close", + currentTurnId: "turn-close", + }); + const closeSpy = vi.spyOn(codexAcpClient, "closeSession").mockResolvedValue(); + + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.pendingMcpStartupSessions.set(sessionState.sessionId, { + requestedServers: new Set(["alpha"]), + }); + + await expect( + codexAcpAgent.unstable_closeSession({sessionId: sessionState.sessionId}) + ).resolves.toEqual({}); + expect(closeSpy).toHaveBeenCalledWith(sessionState.sessionId, sessionState.currentTurnId); + expect(() => codexAcpAgent.getSessionState(sessionState.sessionId)).toThrow( + `Session ${sessionState.sessionId} not found` + ); + // @ts-expect-error verifying test-only access to private map + expect(codexAcpAgent.pendingMcpStartupSessions.has(sessionState.sessionId)).toBe(false); + }); + + it('skips late MCP startup updates after a session is closed', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const sessionState = createTestSessionState({ + sessionId: "session-close", + }); + + let resolveStartup!: (event: any) => void; + const startupPromise = new Promise((resolve) => { + resolveStartup = resolve; + }); + vi.spyOn(codexAcpClient, "awaitMcpStartupResult").mockReturnValue(startupPromise as Promise); + + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.pendingMcpStartupSessions.set(sessionState.sessionId, { + requestedServers: new Set(["alpha"]), + }); + + // @ts-expect-error exercising private helper to verify close-session race handling + const publishPromise = codexAcpAgent.doPublishMcpStartupStatus(sessionState.sessionId, 1); + // @ts-expect-error exercising private helper to simulate close-session cleanup + codexAcpAgent.forgetSession(sessionState.sessionId); + + resolveStartup({ + ready: ["alpha"], + failed: [], + cancelled: [], + }); + await publishPromise; + + expect(mockFixture.getAcpConnectionEvents([])).toEqual([]); + }); }); diff --git a/src/__tests__/CodexACPAgent/initialize.test.ts b/src/__tests__/CodexACPAgent/initialize.test.ts index 8ca6b432..1d5cfef6 100644 --- a/src/__tests__/CodexACPAgent/initialize.test.ts +++ b/src/__tests__/CodexACPAgent/initialize.test.ts @@ -37,6 +37,7 @@ describe('CodexACPAgent - initialize', () => { image: true }, sessionCapabilities: { + close: {}, resume: {}, list: {}, }, From c39706a675a537f3f49f1aca4913633729179e09 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Thu, 23 Apr 2026 15:03:31 +0200 Subject: [PATCH 2/6] Track turns by session and route notifications Store pending turn ids separately from turn completion so session shutdown can interrupt only active turns and wait for them to finish. Also scope app-server notifications to their thread and avoid sending command/session updates after a session has been removed or is closing. --- src/CodexAcpClient.ts | 32 +- src/CodexAcpServer.ts | 89 ++++-- src/CodexAppServerClient.ts | 94 +++++- src/CodexCommands.ts | 41 ++- .../CodexACPAgent/CodexAcpClient.test.ts | 284 +++++++++++++++++- .../data/send-attachments-turn-start.json | 66 ---- src/__tests__/acp-test-utils.ts | 17 +- 7 files changed, 483 insertions(+), 140 deletions(-) delete mode 100644 src/__tests__/CodexACPAgent/data/send-attachments-turn-start.json diff --git a/src/CodexAcpClient.ts b/src/CodexAcpClient.ts index 8c5e3942..88281b65 100644 --- a/src/CodexAcpClient.ts +++ b/src/CodexAcpClient.ts @@ -382,33 +382,30 @@ export class CodexAcpClient { } async closeSession(sessionId: string, currentTurnId: string | null): Promise { - if (currentTurnId) { - try { - await this.codexClient.turnInterrupt({ - threadId: sessionId, - turnId: currentTurnId, - }); - } catch (err) { - logger.error(`Failed to interrupt turn while closing session ${sessionId}`, err); - } + if (currentTurnId && !this.codexClient.hasTurnCompleted(sessionId, currentTurnId)) { + await this.codexClient.turnInterrupt({ + threadId: sessionId, + turnId: currentTurnId, + }); + await this.codexClient.awaitTurnCompleted(sessionId, currentTurnId); } await this.codexClient.threadUnsubscribe({threadId: sessionId}); this.unsubscribeFromSessionEvents(sessionId); } - async sendPrompt( + async startPrompt( request: acp.PromptRequest, agentMode: AgentMode, modelId: ModelId, disableSummary: boolean, cwd: string, - ): Promise { + ): Promise { const input = buildPromptItems(request.prompt); const effort = modelId.effort as ReasoningEffort | null; //TODO remove unsafe conversion await this.refreshSkills(cwd, request._meta); - await this.codexClient.turnStart({ + const response = await this.codexClient.turnStart({ outputSchema: null, threadId: request.sessionId, input: input, @@ -422,9 +419,16 @@ export class CodexAcpClient { model: modelId.model, }); - // Wait for turn completion + return response.turn.id; + } + + async awaitTurnCompleted(sessionId: string, turnId: string): Promise { // If turnInterrupt() was called, Codex will send turn/completed event with status "interrupted" - return await this.codexClient.awaitTurnCompleted(); + return await this.codexClient.awaitTurnCompleted(sessionId, turnId); + } + + hasTurnCompleted(sessionId: string, turnId: string): boolean { + return this.codexClient.hasTurnCompleted(sessionId, turnId); } async listSkills(params?: SkillsListParams): Promise { diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index c274d1e6..4ec1b693 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -44,6 +44,7 @@ export interface SessionState { supportedReasoningEfforts: Array, supportedInputModalities: Array, agentMode: AgentMode, + pendingTurnId: Promise | null; currentTurnId: string | null; lastTokenUsage: TokenCount | null; totalTokenUsage: TokenCount | null; @@ -68,6 +69,7 @@ export class CodexAcpServer implements acp.Agent { private readonly sessions: Map; private readonly pendingMcpStartupSessions: Map; + private readonly closingSessions: Set; constructor( connection: acp.AgentSideConnection, @@ -77,6 +79,7 @@ export class CodexAcpServer implements acp.Agent { ) { this.sessions = new Map(); this.pendingMcpStartupSessions = new Map(); + this.closingSessions = new Set(); this.connection = connection; this.codexAcpClient = codexAcpClient; this.defaultAuthRequest = defaultAuthRequest ?? null; @@ -84,7 +87,9 @@ export class CodexAcpServer implements acp.Agent { this.availableCommands = new CodexCommands( connection, codexAcpClient, - (operation) => this.runWithProcessCheck(operation) + (operation) => this.runWithProcessCheck(operation), + (sessionId) => this.sessions.has(sessionId), + (sessionId) => this.closingSessions.has(sessionId), ); } @@ -170,6 +175,7 @@ export class CodexAcpServer implements acp.Agent { supportedReasoningEfforts: currentModel?.supportedReasoningEfforts ?? [], supportedInputModalities: currentModel?.inputModalities ?? ["text", "image"], agentMode: AgentMode.getInitialAgentMode(), + pendingTurnId: null, currentTurnId: null, lastTokenUsage: null, totalTokenUsage: null, @@ -242,11 +248,18 @@ export class CodexAcpServer implements acp.Agent { async unstable_closeSession(params: acp.CloseSessionRequest): Promise { logger.log("Closing session...", {sessionId: params.sessionId}); const sessionState = this.getSessionState(params.sessionId); + this.closingSessions.add(params.sessionId); - await this.runWithProcessCheck(() => - this.codexAcpClient.closeSession(params.sessionId, sessionState.currentTurnId) - ); - this.forgetSession(params.sessionId); + try { + const activeTurnId = await this.resolveActiveTurnId(sessionState); + await this.runWithProcessCheck(() => + this.codexAcpClient.closeSession(params.sessionId, activeTurnId) + ); + this.forgetSession(params.sessionId); + } catch (err) { + this.closingSessions.delete(params.sessionId); + throw err; + } logger.log("Session closed", {sessionId: params.sessionId}); return {}; @@ -394,6 +407,7 @@ export class CodexAcpServer implements acp.Agent { supportedReasoningEfforts: currentModel?.supportedReasoningEfforts ?? [], supportedInputModalities: currentModel?.inputModalities ?? ["text", "image"], agentMode: AgentMode.getInitialAgentMode(), + pendingTurnId: null, currentTurnId: null, lastTokenUsage: null, totalTokenUsage: null, @@ -655,9 +669,26 @@ export class CodexAcpServer implements acp.Agent { return sessionState; } + private async resolveActiveTurnId(sessionState: SessionState): Promise { + if (sessionState.currentTurnId) { + return sessionState.currentTurnId; + } + if (!sessionState.pendingTurnId) { + return null; + } + + try { + return await sessionState.pendingTurnId; + } catch (err) { + logger.error(`Failed to resolve pending turn for session ${sessionState.sessionId}`, err); + return null; + } + } + private forgetSession(sessionId: string): void { this.sessions.delete(sessionId); this.pendingMcpStartupSessions.delete(sessionId); + this.closingSessions.delete(sessionId); } private resolveSessionMcpServers( @@ -741,6 +772,10 @@ await this.publishMcpStartupStatus(sessionId, mcpStartup, pendingStartup.request prompt: params.prompt, }); const sessionState = this.getSessionState(params.sessionId); + if (this.closingSessions.has(params.sessionId)) { + throw RequestError.invalidRequest("Session is closing"); + } + sessionState.pendingTurnId = null; sessionState.currentTurnId = null; sessionState.lastTokenUsage = null; @@ -750,6 +785,9 @@ await this.publishMcpStartupStatus(sessionId, mcpStartup, pendingStartup.request const elicitationHandler = new CodexElicitationHandler(this.connection, sessionState); await this.codexAcpClient.subscribeToSessionEvents(params.sessionId, (event) => { + if (this.closingSessions.has(params.sessionId)) { + return; + } elicitationHandler.handleNotification(event); return eventHandler.handleNotification(event); }, @@ -781,21 +819,32 @@ await this.publishMcpStartupStatus(sessionId, mcpStartup, pendingStartup.request throw RequestError.invalidRequest("The current model does not support image input"); } const agentMode = sessionState.agentMode; + const pendingTurnId = this.runWithProcessCheck( + () => this.codexAcpClient.startPrompt(params, agentMode, modelId, disableSummary, sessionState.cwd) + ); + sessionState.pendingTurnId = pendingTurnId; + const turnId = await pendingTurnId; + sessionState.pendingTurnId = null; + sessionState.currentTurnId = turnId; + const turnCompleted = await this.runWithProcessCheck( - () => this.codexAcpClient.sendPrompt(params, agentMode, modelId, disableSummary, sessionState.cwd)); + () => this.codexAcpClient.awaitTurnCompleted(params.sessionId, turnId) + ); // Check if turn was interrupted (cancelled) if (turnCompleted.turn.status === "interrupted") { - await this.connection.sessionUpdate({ - sessionId: params.sessionId, - update: { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: "*Conversation interrupted*" + if (!this.closingSessions.has(params.sessionId)) { + await this.connection.sessionUpdate({ + sessionId: params.sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: "*Conversation interrupted*" + } } - } - }); + }); + } return { stopReason: "cancelled", usage: this.buildPromptUsage(sessionState.lastTokenUsage), @@ -819,6 +868,7 @@ await this.publishMcpStartupStatus(sessionId, mcpStartup, pendingStartup.request throw err; } finally { logger.log("Prompt completed", {sessionId: params.sessionId}); + sessionState.pendingTurnId = null; sessionState.currentTurnId = null; } } @@ -872,24 +922,25 @@ await this.publishMcpStartupStatus(sessionId, mcpStartup, pendingStartup.request return; } - if (!sessionState.currentTurnId) { + const currentTurnId = await this.resolveActiveTurnId(sessionState); + if (!currentTurnId) { logger.log("Cancel request rejected: no current turn", {sessionId: params.sessionId}); return; } logger.log("Cancel session requested", { sessionId: params.sessionId, - currentTurnId: sessionState.currentTurnId + currentTurnId, }); try { // After turnInterrupt(), Codex will send turn/completed event, which will naturally complete awaitTurnCompleted() await this.codexAcpClient.turnInterrupt({ threadId: params.sessionId, - turnId: sessionState.currentTurnId + turnId: currentTurnId }); logger.log("Cancel - turnInterrupt succeeded", { sessionId: params.sessionId, - currentTurnId: sessionState.currentTurnId + currentTurnId, }); } catch (err) { logger.error(`Cancel - turnInterrupt failed`, err); diff --git a/src/CodexAppServerClient.ts b/src/CodexAppServerClient.ts index 3ef4a2b7..24c1fa77 100644 --- a/src/CodexAppServerClient.ts +++ b/src/CodexAppServerClient.ts @@ -96,19 +96,26 @@ export class CodexAppServerClient { private mcpServerStartupVersion = 0; private readonly mcpServerStartupStates = new Map(); private readonly mcpServerStartupResolvers: Array = []; + private readonly turnCompletedResolvers = new Map>(); + private readonly lastTurnCompletedByThread = new Map(); constructor(connection: MessageConnection) { this.connection = connection; this.connection.onUnhandledNotification((data) => { const serverNotification = data as ServerNotification; - if (isMcpServerStatusUpdatedNotification(serverNotification)) { - this.mcpServerStartupVersion += 1; - this.mcpServerStartupStates.set(serverNotification.params.name, { - status: serverNotification.params.status, - error: serverNotification.params.error, - version: this.mcpServerStartupVersion, - }); - this.resolveMcpServerStartupResolvers(); + if (isMcpServerStatusUpdatedNotification(serverNotification)) { + this.mcpServerStartupVersion += 1; + this.mcpServerStartupStates.set(serverNotification.params.name, { + status: serverNotification.params.status, + error: serverNotification.params.error, + version: this.mcpServerStartupVersion, + }); + this.resolveMcpServerStartupResolvers(); + } + if (isTurnCompletedNotification(serverNotification)) { + this.lastTurnCompletedByThread.set(serverNotification.params.threadId, serverNotification.params); + this.resolveTurnCompleted(serverNotification.params); + } } this.notify(serverNotification); for (const callback of this.codexEventHandlers) { @@ -237,15 +244,24 @@ export class CodexAppServerClient { return await this.sendRequest({ method: "account/read", params: params }); } - //TODO create type-safe helper - async awaitTurnCompleted(): Promise { + async awaitTurnCompleted(threadId: string, turnId: string): Promise { + const completedTurn = this.lastTurnCompletedByThread.get(threadId); + if (completedTurn && completedTurn.turn.id === turnId) { + return completedTurn; + } + return await new Promise((resolve) => { - this.connection.onNotification("turn/completed", (event: TurnCompletedNotification) => { - resolve(event); - }); + const resolvers = this.turnCompletedResolvers.get(threadId) ?? []; + resolvers.push({turnId, resolve}); + this.turnCompletedResolvers.set(threadId, resolvers); }); } + hasTurnCompleted(threadId: string, turnId: string): boolean { + const completedTurn = this.lastTurnCompletedByThread.get(threadId); + return completedTurn?.turn.id === turnId; + } + async listModels(params: ModelListParams = {cursor: null, limit: null}): Promise { return await this.sendRequest({ method: "model/list", params }); } @@ -273,11 +289,22 @@ export class CodexAppServerClient { private notificationHandlers = new Map void>(); private notify(notification: ServerNotification) { + const threadId = this.getThreadId(notification); + if (threadId) { + this.notificationHandlers.get(threadId)?.(notification); + return; + } + for (const notificationHandler of this.notificationHandlers.values()) { notificationHandler(notification); } } + private getThreadId(notification: ServerNotification): string | null { + const params = notification.params as { threadId?: unknown }; + return typeof params.threadId === "string" ? params.threadId : null; + } + private resolveMcpServerStartupResolvers(): void { const pendingResolvers: Array = []; for (const resolver of this.mcpServerStartupResolvers) { @@ -323,6 +350,28 @@ export class CodexAppServerClient { return { ready, failed, cancelled }; } + private resolveTurnCompleted(event: TurnCompletedNotification): void { + const resolvers = this.turnCompletedResolvers.get(event.threadId); + if (!resolvers) { + return; + } + + const pendingResolvers: Array = []; + for (const resolver of resolvers) { + if (resolver.turnId === event.turn.id) { + resolver.resolve(event); + } else { + pendingResolvers.push(resolver); + } + } + + if (pendingResolvers.length === 0) { + this.turnCompletedResolvers.delete(event.threadId); + } else { + this.turnCompletedResolvers.set(event.threadId, pendingResolvers); + } + } + private async sendRequest(request: CodexRequest): Promise { for (const callback of this.codexEventHandlers) { callback({ eventType: "request", ...request}); @@ -364,9 +413,28 @@ type McpServerStartupResolver = { resolve: (result: McpStartupResult) => void; }; +type TurnCompletedResolver = { + turnId: string; + resolve: (event: TurnCompletedNotification) => void; +}; + +type McpStartupCompleteNotification = { + method: "codex/event/mcp_startup_complete", + params: { + msg: McpStartupCompleteEvent & { type: "mcp_startup_complete" } + } +}; + function isMcpServerStatusUpdatedNotification(notification: ServerNotification): notification is { method: "mcpServer/startupStatus/updated"; params: McpServerStatusUpdatedNotification; } { return notification.method === "mcpServer/startupStatus/updated"; } + +function isTurnCompletedNotification(data: ServerNotification): data is { + method: "turn/completed"; + params: TurnCompletedNotification; +} { + return data.method === "turn/completed"; +} diff --git a/src/CodexCommands.ts b/src/CodexCommands.ts index c81e6429..10cbc8f1 100644 --- a/src/CodexCommands.ts +++ b/src/CodexCommands.ts @@ -1,6 +1,6 @@ import type * as acp from "@agentclientprotocol/sdk"; import type {AgentSideConnection, AvailableCommand} from "@agentclientprotocol/sdk"; -import {ACPSessionConnection} from "./ACPSessionConnection"; +import {ACPSessionConnection, type UpdateSessionEvent} from "./ACPSessionConnection"; import type {CodexAcpClient} from "./CodexAcpClient"; import type {RateLimitSnapshot, SkillsListEntry} from "./app-server/v2"; import type {SessionState} from "./CodexAcpServer"; @@ -12,27 +12,35 @@ export class CodexCommands { private readonly connection: AgentSideConnection; private readonly codexAcpClient: CodexAcpClient; private readonly runWithProcessCheck: (operation: () => Promise) => Promise; + private readonly hasTrackedSession: (sessionId: string) => boolean; + private readonly isSessionClosing: (sessionId: string) => boolean; constructor( connection: AgentSideConnection, codexAcpClient: CodexAcpClient, - runWithProcessCheck: (operation: () => Promise) => Promise + runWithProcessCheck: (operation: () => Promise) => Promise, + hasTrackedSession: (sessionId: string) => boolean, + isSessionClosing: (sessionId: string) => boolean, ) { this.connection = connection; this.codexAcpClient = codexAcpClient; this.runWithProcessCheck = runWithProcessCheck; + this.hasTrackedSession = hasTrackedSession; + this.isSessionClosing = isSessionClosing; } async publish(sessionId: string): Promise { try { + if (!this.hasTrackedSession(sessionId) || this.isSessionClosing(sessionId)) { + return; + } const skillsResponse = await this.runWithProcessCheck(() => this.codexAcpClient.listSkills()); const availableCommands = this.buildAvailableCommands(skillsResponse?.data ?? []); - if (availableCommands.length === 0) { + if (availableCommands.length === 0 || !this.hasTrackedSession(sessionId) || this.isSessionClosing(sessionId)) { return; } - const session = new ACPSessionConnection(this.connection, sessionId); - await session.update({ + await this.updateSession(sessionId, { sessionUpdate: "available_commands_update", availableCommands }); @@ -124,9 +132,8 @@ export class CodexCommands { switch (command.name) { case "status": { - const session = new ACPSessionConnection(this.connection, sessionId); const message = this.buildStatusMessage(sessionState); - await session.update({ + await this.updateSession(sessionId, { sessionUpdate: "agent_message_chunk", content: { type: "text", text: message } }); @@ -134,8 +141,7 @@ export class CodexCommands { } case "logout": { await this.runWithProcessCheck(() => this.codexAcpClient.logout()); - const session = new ACPSessionConnection(this.connection, sessionId); - await session.update({ + await this.updateSession(sessionId, { sessionUpdate: "agent_message_chunk", content: { type: "text", text: "Logged out from Codex account." } }); @@ -151,8 +157,7 @@ export class CodexCommands { const text = lines.length > 0 ? ["Available skills:", ...lines].join("\n") : "No skills configured."; - const session = new ACPSessionConnection(this.connection, sessionId); - await session.update({ + await this.updateSession(sessionId, { sessionUpdate: "agent_message_chunk", content: { type: "text", text } }); @@ -172,8 +177,7 @@ export class CodexCommands { const text = lines.length > 0 ? ["Configured MCP servers:", ...lines].join("\n") : "No MCP servers configured."; - const session = new ACPSessionConnection(this.connection, sessionId); - await session.update({ + await this.updateSession(sessionId, { sessionUpdate: "agent_message_chunk", content: { type: "text", text } }); @@ -194,13 +198,20 @@ export class CodexCommands { if (lines.length > 0) { text.push(...lines); } - const session = new ACPSessionConnection(this.connection, sessionId); - await session.update({ + await this.updateSession(sessionId, { sessionUpdate: "agent_message_chunk", content: { type: "text", text: text.join("\n") } }); } + private async updateSession(sessionId: string, update: UpdateSessionEvent): Promise { + if (this.isSessionClosing(sessionId)) { + return; + } + const session = new ACPSessionConnection(this.connection, sessionId); + await session.update(update); + } + private buildStatusMessage(sessionState: SessionState): string { const agentMode = sessionState.agentMode; const accountText = this.formatAccountInfo(sessionState.account); diff --git a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts index 6df1e7ed..db259305 100644 --- a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts +++ b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts @@ -331,9 +331,9 @@ describe('ACP server test', { timeout: 40_000 }, () => { function loadNotifications(){ //TODO collect logs form dev run and then load them from file to speedup const serverNotifications: ServerNotification[] = [ - { method: "item/agentMessage/delta", params: { threadId: "string", turnId: "string", itemId: "string", delta: "He", }}, - { method: "item/agentMessage/delta", params: { threadId: "string", turnId: "string", itemId: "string", delta: "ll", }}, - { method: "item/agentMessage/delta", params: { threadId: "string", turnId: "string", itemId: "string", delta: "o!", }}, + { method: "item/agentMessage/delta", params: { threadId: "id", turnId: "string", itemId: "string", delta: "He", }}, + { method: "item/agentMessage/delta", params: { threadId: "id", turnId: "string", itemId: "string", delta: "ll", }}, + { method: "item/agentMessage/delta", params: { threadId: "id", turnId: "string", itemId: "string", delta: "o!", }}, ]; function onServerNotification(_sessionId: string, callback: (event: ServerNotification) => void){ for (const notification of serverNotifications) { @@ -398,9 +398,9 @@ describe('ACP server test', { timeout: 40_000 }, () => { // Trigger notifications after both prompts - should produce only 3 events, not 6 const serverNotifications: ServerNotification[] = [ - { method: "item/agentMessage/delta", params: { threadId: "string", turnId: "string", itemId: "string", delta: "He", }}, - { method: "item/agentMessage/delta", params: { threadId: "string", turnId: "string", itemId: "string", delta: "ll", }}, - { method: "item/agentMessage/delta", params: { threadId: "string", turnId: "string", itemId: "string", delta: "o!", }}, + { method: "item/agentMessage/delta", params: { threadId: "id", turnId: "string", itemId: "string", delta: "He", }}, + { method: "item/agentMessage/delta", params: { threadId: "id", turnId: "string", itemId: "string", delta: "ll", }}, + { method: "item/agentMessage/delta", params: { threadId: "id", turnId: "string", itemId: "string", delta: "o!", }}, ]; for (const notification of serverNotifications) { mockFixture.sendServerNotification(notification); @@ -415,7 +415,7 @@ describe('ACP server test', { timeout: 40_000 }, () => { await expect(mockFixture.getAcpConnectionDump([])).toMatchFileSnapshot("data/follow-up-no-duplicates.json"); }); - it('should handle multiple sessions independently', async () => { + it('should route thread-scoped notifications to the matching session only', async () => { const mockFixture = createCodexMockTestFixture(); const codexAcpAgent = mockFixture.getCodexAcpAgent(); @@ -448,9 +448,9 @@ describe('ACP server test', { timeout: 40_000 }, () => { mockFixture.clearAcpConnectionDump(); - // Trigger notifications - both session handlers should receive them + // Trigger notifications for only session-1 const serverNotifications: ServerNotification[] = [ - { method: "item/agentMessage/delta", params: { threadId: "string", turnId: "string", itemId: "string", delta: "Hello", }}, + { method: "item/agentMessage/delta", params: { threadId: "session-1", turnId: "string", itemId: "string", delta: "Hello", }}, ]; for (const notification of serverNotifications) { mockFixture.sendServerNotification(notification); @@ -462,8 +462,20 @@ describe('ACP server test', { timeout: 40_000 }, () => { expect(dump.length).toBeGreaterThan(0); }); - // Should have 2 events - one for each session's handler - await expect(mockFixture.getAcpConnectionDump([])).toMatchFileSnapshot("data/multiple-sessions.json"); + expect(mockFixture.getAcpConnectionEvents([])).toEqual([ + { + method: "sessionUpdate", + args: [ + { + sessionId: "session-1", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "Hello" }, + }, + }, + ], + }, + ]); }); it('should send attachments as prompt items', async () => { @@ -471,6 +483,22 @@ describe('ACP server test', { timeout: 40_000 }, () => { const codexAcpAgent = mockFixture.getCodexAcpAgent(); const codexAppServerClient = mockFixture.getCodexAppServerClient(); + vi.spyOn(codexAppServerClient.connection, "sendRequest").mockImplementation((method: string) => { + if (method === "turn/start") { + return Promise.resolve({ + turn: { + id: "turn-id", + items: [], + status: "inProgress", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }); + } + return Promise.resolve(undefined); + }); vi.spyOn(codexAppServerClient, "awaitTurnCompleted").mockResolvedValue({ threadId: "session-id", turn: { @@ -496,7 +524,79 @@ describe('ACP server test', { timeout: 40_000 }, () => { await codexAcpAgent.prompt({ sessionId: "session-id", prompt }); - await expect(mockFixture.getCodexConnectionDump(ignoredFields)).toMatchFileSnapshot("data/send-attachments-turn-start.json"); + expect(mockFixture.getCodexConnectionEvents(ignoredFields)).toEqual([ + { + eventType: "request", + method: "skills/list", + params: { + cwds: ["/test/cwd"], + forceReload: true, + perCwdExtraUserRoots: [{ + cwd: "cwd", + extraUserRoots: [], + }], + }, + }, + { + eventType: "response", + }, + { + eventType: "request", + method: "turn/start", + params: { + outputSchema: null, + threadId: "threadId", + input: [ + { + type: "text", + text: "Hello", + text_elements: [], + }, + { + type: "image", + url: "https://example.com/image.png", + }, + { + type: "text", + text: "[@report.txt](file:///tmp/report.txt)", + text_elements: [], + }, + { + type: "text", + text: "[@notes.txt](file:///tmp/notes.txt)\n\nNotes body\n", + text_elements: [], + }, + ], + approvalPolicy: "on-request", + sandboxPolicy: { + type: "workspaceWrite", + writableRoots: [], + readOnlyAccess: "readOnlyAccess", + networkAccess: false, + excludeTmpdirEnvVar: false, + excludeSlashTmp: false, + }, + summary: null, + personality: null, + collaborationMode: null, + cwd: "cwd", + effort: "effort", + model: "model", + }, + }, + { + eventType: "response", + turn: { + id: "id", + items: [], + status: "inProgress", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }, + ]); }); it('should fail on wrong sessionId', async () => { @@ -516,6 +616,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { const codexAcpAgent = mockFixture.getCodexAcpAgent(); vi.spyOn(mockFixture.getCodexAcpClient(), "listSkills").mockResolvedValue({ data: [] }); + // @ts-expect-error seeding private session store for focused publish test + codexAcpAgent.sessions.set("session-id", createTestSessionState({ sessionId: "session-id" })); // @ts-expect-error - exercising private helper await codexAcpAgent.availableCommands.publish("session-id"); @@ -541,6 +643,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { errors: [] }] }); + // @ts-expect-error seeding private session store for focused publish test + codexAcpAgent.sessions.set("session-id", createTestSessionState({ sessionId: "session-id" })); // @ts-expect-error - exercising private helper await codexAcpAgent.availableCommands.publish("session-id"); @@ -929,6 +1033,18 @@ describe('ACP server test', { timeout: 40_000 }, () => { const mockFixture = createCodexMockTestFixture(); const codexAcpClient = mockFixture.getCodexAcpClient(); const codexAppServerClient = mockFixture.getCodexAppServerClient(); + vi.spyOn(codexAppServerClient, "awaitTurnCompleted").mockResolvedValue({ + threadId: "session-close", + turn: { + id: "turn-close", + items: [], + status: "interrupted", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }); await codexAcpClient.subscribeToSessionEvents( "session-close", @@ -974,6 +1090,88 @@ describe('ACP server test', { timeout: 40_000 }, () => { expect(codexAppServerClient.elicitationHandlers.has("session-close")).toBe(false); }); + it('waits for the matching turn completion before resolving a prompt', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + const sessionState = createTestSessionState({ + sessionId: "session-close", + currentModelId: "model-id[effort]", + agentMode: AgentMode.DEFAULT_AGENT_MODE, + }); + + codexAppServerClient.turnStart = vi.fn().mockResolvedValue({ + turn: { + id: "turn-close", + items: [], + status: "inProgress", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }); + vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(sessionState); + + let promptSettled = false; + const promptPromise = codexAcpAgent.prompt({ + sessionId: "session-close", + prompt: [{type: "text", text: "wait for the right turn"}], + }).then((result) => { + promptSettled = true; + return result; + }); + + await vi.waitFor(() => { + expect(sessionState.currentTurnId).toBe("turn-close"); + }); + + mockFixture.sendServerNotification({ + method: "turn/completed", + params: { + threadId: "other-session", + turn: { + id: "turn-other", + items: [], + status: "completed", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }, + }); + await Promise.resolve(); + expect(promptSettled).toBe(false); + + mockFixture.sendServerNotification({ + method: "turn/completed", + params: { + threadId: "session-close", + turn: { + id: "turn-close", + items: [], + status: "completed", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }, + }); + + await expect(promptPromise).resolves.toEqual({ + stopReason: "end_turn", + usage: null, + _meta: { + quota: { + token_count: null, + model_usage: [], + }, + }, + }); + }); + it('removes session bookkeeping after unstable_closeSession', async () => { const mockFixture = createCodexMockTestFixture(); const codexAcpAgent = mockFixture.getCodexAcpAgent(); @@ -1002,6 +1200,27 @@ describe('ACP server test', { timeout: 40_000 }, () => { expect(codexAcpAgent.pendingMcpStartupSessions.has(sessionState.sessionId)).toBe(false); }); + it('uses the pending turn start when closing before turn/started arrives', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const sessionState = createTestSessionState({ + sessionId: "session-close", + currentTurnId: null, + pendingTurnId: Promise.resolve("turn-pending"), + }); + const closeSpy = vi.spyOn(codexAcpClient, "closeSession").mockResolvedValue(); + + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); + + await expect( + codexAcpAgent.unstable_closeSession({sessionId: sessionState.sessionId}) + ).resolves.toEqual({}); + + expect(closeSpy).toHaveBeenCalledWith(sessionState.sessionId, "turn-pending"); + }); + it('skips late MCP startup updates after a session is closed', async () => { const mockFixture = createCodexMockTestFixture(); const codexAcpAgent = mockFixture.getCodexAcpAgent(); @@ -1037,4 +1256,45 @@ describe('ACP server test', { timeout: 40_000 }, () => { expect(mockFixture.getAcpConnectionEvents([])).toEqual([]); }); + + it('skips late available commands updates after a session is closed', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const sessionState = createTestSessionState({ + sessionId: "session-close", + }); + + let resolveSkills!: (response: SkillsListResponse) => void; + const skillsPromise = new Promise((resolve) => { + resolveSkills = resolve; + }); + vi.spyOn(codexAcpClient, "listSkills").mockReturnValue(skillsPromise); + + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); + + // @ts-expect-error exercising private helper through the availableCommands collaborator + const publishPromise = codexAcpAgent.availableCommands.publish(sessionState.sessionId); + // @ts-expect-error exercising private helper to simulate close-session cleanup + codexAcpAgent.forgetSession(sessionState.sessionId); + + resolveSkills({ + data: [{ + cwd: "/workspace", + skills: [{ + name: "build", + description: "Build the project", + shortDescription: "Build", + path: "/workspace", + scope: "user", + enabled: true, + }], + errors: [], + }], + }); + await publishPromise; + + expect(mockFixture.getAcpConnectionEvents([])).toEqual([]); + }); }); diff --git a/src/__tests__/CodexACPAgent/data/send-attachments-turn-start.json b/src/__tests__/CodexACPAgent/data/send-attachments-turn-start.json deleted file mode 100644 index 25ade969..00000000 --- a/src/__tests__/CodexACPAgent/data/send-attachments-turn-start.json +++ /dev/null @@ -1,66 +0,0 @@ -{ - "eventType": "request", - "method": "skills/list", - "params": { - "cwds": [ - "/test/cwd" - ], - "forceReload": true, - "perCwdExtraUserRoots": [ - { - "cwd": "cwd", - "extraUserRoots": [] - } - ] - } -} -{ - "eventType": "response" -} -{ - "eventType": "request", - "method": "turn/start", - "params": { - "outputSchema": null, - "threadId": "threadId", - "input": [ - { - "type": "text", - "text": "Hello", - "text_elements": [] - }, - { - "type": "image", - "url": "https://example.com/image.png" - }, - { - "type": "text", - "text": "[@report.txt](file:///tmp/report.txt)", - "text_elements": [] - }, - { - "type": "text", - "text": "[@notes.txt](file:///tmp/notes.txt)\n\nNotes body\n", - "text_elements": [] - } - ], - "approvalPolicy": "on-request", - "sandboxPolicy": { - "type": "workspaceWrite", - "writableRoots": [], - "readOnlyAccess": "readOnlyAccess", - "networkAccess": false, - "excludeTmpdirEnvVar": false, - "excludeSlashTmp": false - }, - "summary": null, - "personality": null, - "collaborationMode": null, - "cwd": "cwd", - "effort": "effort", - "model": "model" - } -} -{ - "eventType": "response" -} \ No newline at end of file diff --git a/src/__tests__/acp-test-utils.ts b/src/__tests__/acp-test-utils.ts index d702f0d2..6a690b06 100644 --- a/src/__tests__/acp-test-utils.ts +++ b/src/__tests__/acp-test-utils.ts @@ -299,6 +299,7 @@ function anonymizeValue(value: any, path: string[], fieldsToAnonymize: Set): SessionState { return { + pendingTurnId: null, currentTurnId: null, lastTokenUsage: null, totalTokenUsage: null, @@ -343,7 +344,21 @@ export async function setupPromptAndSendNotifications( fixture.clearAcpConnectionDump(); for (const notification of notifications) { - fixture.sendServerNotification(notification); + const routedNotification = (() => { + const params = notification.params as { threadId?: unknown }; + if (typeof params.threadId !== "string") { + return notification; + } + return { + ...notification, + params: { + ...notification.params, + threadId: sessionId, + }, + } satisfies ServerNotification; + })(); + + fixture.sendServerNotification(routedNotification); } await vi.waitFor(() => { From 3486dad6b07595ac0a4a87794e35f19e6b14bda1 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Thu, 23 Apr 2026 17:28:53 +0200 Subject: [PATCH 3/6] Cleanups --- src/CodexAcpClient.ts | 1 + src/CodexAcpServer.ts | 29 +++-- src/CodexAppServerClient.ts | 30 +++-- src/CodexCommands.ts | 2 +- .../CodexACPAgent/CodexAcpClient.test.ts | 117 +++++++++++++++++- src/__tests__/acp-test-utils.ts | 6 +- 6 files changed, 154 insertions(+), 31 deletions(-) diff --git a/src/CodexAcpClient.ts b/src/CodexAcpClient.ts index 88281b65..76119545 100644 --- a/src/CodexAcpClient.ts +++ b/src/CodexAcpClient.ts @@ -392,6 +392,7 @@ export class CodexAcpClient { await this.codexClient.threadUnsubscribe({threadId: sessionId}); this.unsubscribeFromSessionEvents(sessionId); + this.codexClient.clearThreadState(sessionId); } async startPrompt( diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index 4ec1b693..3d5c9dd4 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -685,6 +685,12 @@ export class CodexAcpServer implements acp.Agent { } } + private ensureSessionIsActive(sessionId: string, requireTrackedSession: boolean = false): void { + if (this.closingSessions.has(sessionId) || (requireTrackedSession && !this.sessions.has(sessionId))) { + throw RequestError.invalidRequest("Session is closing"); + } + } + private forgetSession(sessionId: string): void { this.sessions.delete(sessionId); this.pendingMcpStartupSessions.delete(sessionId); @@ -721,20 +727,23 @@ export class CodexAcpServer implements acp.Agent { return; } + const requestedServers = pendingStartup.requestedServers; + const afterVersion = pendingStartup.afterVersion; + try { const mcpStartup = await this.runWithProcessCheck(() => this.codexAcpClient.awaitMcpServerStartup( - Array.from(pendingStartup.requestedServers), - pendingStartup.afterVersion, + Array.from(requestedServers), + afterVersion, ) ); const sessionState = this.sessions.get(sessionId); - const pendingStartup = this.pendingMcpStartupSessions.get(sessionId); - if (sessionState && pendingStartup) { + const currentPendingStartup = this.pendingMcpStartupSessions.get(sessionId); + if (sessionState && currentPendingStartup) { sessionState.sessionMcpServers = mcpStartup.ready.filter(serverName => - pendingStartup.requestedServers.has(serverName) + currentPendingStartup.requestedServers.has(serverName) ); -await this.publishMcpStartupStatus(sessionId, mcpStartup, pendingStartup.requestedServers); + await this.publishMcpStartupStatus(sessionId, mcpStartup, currentPendingStartup.requestedServers); } else { logger.log("Skipping MCP startup status for closed session", {sessionId}); } @@ -772,9 +781,8 @@ await this.publishMcpStartupStatus(sessionId, mcpStartup, pendingStartup.request prompt: params.prompt, }); const sessionState = this.getSessionState(params.sessionId); - if (this.closingSessions.has(params.sessionId)) { - throw RequestError.invalidRequest("Session is closing"); - } + const requireTrackedSession = this.sessions.has(params.sessionId); + this.ensureSessionIsActive(params.sessionId, requireTrackedSession); sessionState.pendingTurnId = null; sessionState.currentTurnId = null; sessionState.lastTokenUsage = null; @@ -793,6 +801,7 @@ await this.publishMcpStartupStatus(sessionId, mcpStartup, pendingStartup.request }, approvalHandler, elicitationHandler); + this.ensureSessionIsActive(params.sessionId, requireTrackedSession); if (await this.availableCommands.tryHandle(params.prompt, sessionState)) { logger.log("Prompt handled by a command"); @@ -803,6 +812,8 @@ await this.publishMcpStartupStatus(sessionId, mcpStartup, pendingStartup.request }; } + this.ensureSessionIsActive(params.sessionId, requireTrackedSession); + const modelId = ModelId.fromString(sessionState.currentModelId); const modelLacksReasoning = sessionState.supportedReasoningEfforts.length > 0 && sessionState.supportedReasoningEfforts.every(e => e.reasoningEffort === "none"); diff --git a/src/CodexAppServerClient.ts b/src/CodexAppServerClient.ts index 24c1fa77..d1a06b45 100644 --- a/src/CodexAppServerClient.ts +++ b/src/CodexAppServerClient.ts @@ -103,19 +103,18 @@ export class CodexAppServerClient { this.connection = connection; this.connection.onUnhandledNotification((data) => { const serverNotification = data as ServerNotification; - if (isMcpServerStatusUpdatedNotification(serverNotification)) { - this.mcpServerStartupVersion += 1; - this.mcpServerStartupStates.set(serverNotification.params.name, { - status: serverNotification.params.status, - error: serverNotification.params.error, - version: this.mcpServerStartupVersion, - }); - this.resolveMcpServerStartupResolvers(); - } + if (isMcpServerStatusUpdatedNotification(serverNotification)) { + this.mcpServerStartupVersion += 1; + this.mcpServerStartupStates.set(serverNotification.params.name, { + status: serverNotification.params.status, + error: serverNotification.params.error, + version: this.mcpServerStartupVersion, + }); + this.resolveMcpServerStartupResolvers(); + } if (isTurnCompletedNotification(serverNotification)) { this.lastTurnCompletedByThread.set(serverNotification.params.threadId, serverNotification.params); this.resolveTurnCompleted(serverNotification.params); - } } this.notify(serverNotification); for (const callback of this.codexEventHandlers) { @@ -282,6 +281,11 @@ export class CodexAppServerClient { this.notificationHandlers.delete(sessionId); } + clearThreadState(threadId: string): void { + this.turnCompletedResolvers.delete(threadId); + this.lastTurnCompletedByThread.delete(threadId); + } + private codexEventHandlers: Array<(event: CodexConnectionEvent) => void> = []; onClientTransportEvent(callback: (event: CodexConnectionEvent) => void){ this.codexEventHandlers.push(callback); @@ -418,12 +422,6 @@ type TurnCompletedResolver = { resolve: (event: TurnCompletedNotification) => void; }; -type McpStartupCompleteNotification = { - method: "codex/event/mcp_startup_complete", - params: { - msg: McpStartupCompleteEvent & { type: "mcp_startup_complete" } - } -}; function isMcpServerStatusUpdatedNotification(notification: ServerNotification): notification is { method: "mcpServer/startupStatus/updated"; diff --git a/src/CodexCommands.ts b/src/CodexCommands.ts index 10cbc8f1..51d49136 100644 --- a/src/CodexCommands.ts +++ b/src/CodexCommands.ts @@ -205,7 +205,7 @@ export class CodexCommands { } private async updateSession(sessionId: string, update: UpdateSessionEvent): Promise { - if (this.isSessionClosing(sessionId)) { + if (this.isSessionClosing(sessionId) || !this.hasTrackedSession(sessionId)) { return; } const session = new ACPSessionConnection(this.connection, sessionId); diff --git a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts index db259305..83e0a889 100644 --- a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts +++ b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts @@ -658,6 +658,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { const sessionState: SessionState = createTestSessionState(); vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for slash-command test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); await codexAcpAgent.prompt({ sessionId: "session-id", prompt: [{ type: "text", text: "/status" }] }); await expect(mockFixture.getAcpConnectionDump([])).toMatchFileSnapshot("data/command-status.json"); @@ -670,6 +672,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { const sessionState: SessionState = createTestSessionState(); const logoutSpy = vi.spyOn(mockFixture.getCodexAcpClient(), "logout").mockResolvedValue({}); + // @ts-expect-error seeding private session store for slash-command test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); // @ts-expect-error - exercising private helper const handled = await codexAcpAgent.availableCommands.handleCommand({ name: "logout", input: null }, sessionState); @@ -696,6 +700,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { }] }; const skillsSpy = vi.spyOn(mockFixture.getCodexAcpClient(), "listSkills").mockResolvedValue(skillsResponse); + // @ts-expect-error seeding private session store for slash-command test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); // @ts-expect-error - exercising private helper const handled = await codexAcpAgent.availableCommands.handleCommand({ name: "skills", input: null }, sessionState); @@ -731,6 +737,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { nextCursor: null }; const mcpSpy = vi.spyOn(mockFixture.getCodexAcpClient(), "listMcpServers").mockResolvedValue(mcpResponse); + // @ts-expect-error seeding private session store for slash-command test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); // @ts-expect-error - exercising private helper const handled = await codexAcpAgent.availableCommands.handleCommand({ name: "mcp", input: null }, sessionState); @@ -820,7 +828,10 @@ describe('ACP server test', { timeout: 40_000 }, () => { durationMs: null, } }); - vi.spyOn(mockFixture.getCodexAcpAgent(), "getSessionState").mockReturnValue(sessionState); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); return { mockFixture, sessionState, turnStartSpy }; } @@ -1064,6 +1075,24 @@ describe('ACP server test', { timeout: 40_000 }, () => { expect(codexAppServerClient.approvalHandlers.has("session-close")).toBe(true); // @ts-expect-error verifying test-only access to private maps expect(codexAppServerClient.elicitationHandlers.has("session-close")).toBe(true); + // @ts-expect-error verifying test-only access to private maps + codexAppServerClient.turnCompletedResolvers.set("session-close", [{ + turnId: "other-turn", + resolve: vi.fn(), + }]); + // @ts-expect-error verifying test-only access to private maps + codexAppServerClient.lastTurnCompletedByThread.set("session-close", { + threadId: "session-close", + turn: { + id: "completed-turn", + items: [], + status: "completed", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }); await codexAcpClient.closeSession("session-close", "turn-close"); @@ -1088,6 +1117,10 @@ describe('ACP server test', { timeout: 40_000 }, () => { expect(codexAppServerClient.approvalHandlers.has("session-close")).toBe(false); // @ts-expect-error verifying test-only access to private maps expect(codexAppServerClient.elicitationHandlers.has("session-close")).toBe(false); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.turnCompletedResolvers.has("session-close")).toBe(false); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.lastTurnCompletedByThread.has("session-close")).toBe(false); }); it('waits for the matching turn completion before resolving a prompt', async () => { @@ -1221,6 +1254,45 @@ describe('ACP server test', { timeout: 40_000 }, () => { expect(closeSpy).toHaveBeenCalledWith(sessionState.sessionId, "turn-pending"); }); + it('rejects a prompt if the session closes before turn start begins', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const sessionState = createTestSessionState({ + sessionId: "session-close", + }); + + let resolveTryHandle!: (handled: boolean) => void; + const tryHandlePromise = new Promise((resolve) => { + resolveTryHandle = resolve; + }); + + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); + // @ts-expect-error exercising private helper through the availableCommands collaborator + const tryHandleSpy = vi.spyOn(codexAcpAgent.availableCommands, "tryHandle").mockReturnValue(tryHandlePromise); + const startPromptSpy = vi.spyOn(codexAcpClient, "startPrompt"); + vi.spyOn(codexAcpClient, "closeSession").mockResolvedValue(); + + const promptPromise = codexAcpAgent.prompt({ + sessionId: sessionState.sessionId, + prompt: [{ type: "text", text: "normal prompt" }], + }); + + await vi.waitFor(() => { + expect(tryHandleSpy).toHaveBeenCalled(); + }); + + await expect( + codexAcpAgent.unstable_closeSession({sessionId: sessionState.sessionId}) + ).resolves.toEqual({}); + + resolveTryHandle(false); + + await expect(promptPromise).rejects.toThrow("Invalid request"); + expect(startPromptSpy).not.toHaveBeenCalled(); + }); + it('skips late MCP startup updates after a session is closed', async () => { const mockFixture = createCodexMockTestFixture(); const codexAcpAgent = mockFixture.getCodexAcpAgent(); @@ -1233,7 +1305,7 @@ describe('ACP server test', { timeout: 40_000 }, () => { const startupPromise = new Promise((resolve) => { resolveStartup = resolve; }); - vi.spyOn(codexAcpClient, "awaitMcpStartupResult").mockReturnValue(startupPromise as Promise); + vi.spyOn(codexAcpClient, "awaitMcpServerStartup").mockReturnValue(startupPromise as Promise); // @ts-expect-error seeding private session store for focused close-session test codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); @@ -1297,4 +1369,45 @@ describe('ACP server test', { timeout: 40_000 }, () => { expect(mockFixture.getAcpConnectionEvents([])).toEqual([]); }); + + it('skips late slash command updates after a session is closed', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const sessionState = createTestSessionState({ + sessionId: "session-close", + }); + + let resolveSkills!: (response: SkillsListResponse) => void; + const skillsPromise = new Promise((resolve) => { + resolveSkills = resolve; + }); + vi.spyOn(codexAcpClient, "listSkills").mockReturnValue(skillsPromise); + + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); + + // @ts-expect-error exercising private helper through the availableCommands collaborator + const commandPromise = codexAcpAgent.availableCommands.handleCommand({ name: "skills", input: null }, sessionState); + // @ts-expect-error exercising private helper to simulate close-session cleanup + codexAcpAgent.forgetSession(sessionState.sessionId); + + resolveSkills({ + data: [{ + cwd: "/workspace", + skills: [{ + name: "build", + description: "Build the project", + shortDescription: "Build", + path: "/workspace", + scope: "user", + enabled: true, + }], + errors: [], + }], + }); + + await expect(commandPromise).resolves.toBe(true); + expect(mockFixture.getAcpConnectionEvents([])).toEqual([]); + }); }); diff --git a/src/__tests__/acp-test-utils.ts b/src/__tests__/acp-test-utils.ts index 6a690b06..0db2c7b5 100644 --- a/src/__tests__/acp-test-utils.ts +++ b/src/__tests__/acp-test-utils.ts @@ -345,8 +345,8 @@ export async function setupPromptAndSendNotifications( for (const notification of notifications) { const routedNotification = (() => { - const params = notification.params as { threadId?: unknown }; - if (typeof params.threadId !== "string") { + const params = notification.params; + if ("threadId" in params && typeof params.threadId !== "string") { return notification; } return { @@ -355,7 +355,7 @@ export async function setupPromptAndSendNotifications( ...notification.params, threadId: sessionId, }, - } satisfies ServerNotification; + } as ServerNotification; })(); fixture.sendServerNotification(routedNotification); From deb1946cbc5717b4be1155e74747219034a9a040 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Tue, 28 Apr 2026 18:22:04 +0200 Subject: [PATCH 4/6] Use stable method name --- src/CodexAcpServer.ts | 2 +- src/__tests__/CodexACPAgent/CodexAcpClient.test.ts | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index 3d5c9dd4..47176949 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -245,7 +245,7 @@ export class CodexAcpServer implements acp.Agent { return await this.runWithProcessCheck(() => this.codexAcpClient.listSessions(params)); } - async unstable_closeSession(params: acp.CloseSessionRequest): Promise { + async closeSession(params: acp.CloseSessionRequest): Promise { logger.log("Closing session...", {sessionId: params.sessionId}); const sessionState = this.getSessionState(params.sessionId); this.closingSessions.add(params.sessionId); diff --git a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts index 83e0a889..becbb61e 100644 --- a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts +++ b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts @@ -1205,7 +1205,7 @@ describe('ACP server test', { timeout: 40_000 }, () => { }); }); - it('removes session bookkeeping after unstable_closeSession', async () => { + it('removes session bookkeeping after closeSession', async () => { const mockFixture = createCodexMockTestFixture(); const codexAcpAgent = mockFixture.getCodexAcpAgent(); const codexAcpClient = mockFixture.getCodexAcpClient(); @@ -1223,7 +1223,7 @@ describe('ACP server test', { timeout: 40_000 }, () => { }); await expect( - codexAcpAgent.unstable_closeSession({sessionId: sessionState.sessionId}) + codexAcpAgent.closeSession({sessionId: sessionState.sessionId}) ).resolves.toEqual({}); expect(closeSpy).toHaveBeenCalledWith(sessionState.sessionId, sessionState.currentTurnId); expect(() => codexAcpAgent.getSessionState(sessionState.sessionId)).toThrow( @@ -1248,7 +1248,7 @@ describe('ACP server test', { timeout: 40_000 }, () => { codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); await expect( - codexAcpAgent.unstable_closeSession({sessionId: sessionState.sessionId}) + codexAcpAgent.closeSession({sessionId: sessionState.sessionId}) ).resolves.toEqual({}); expect(closeSpy).toHaveBeenCalledWith(sessionState.sessionId, "turn-pending"); @@ -1284,7 +1284,7 @@ describe('ACP server test', { timeout: 40_000 }, () => { }); await expect( - codexAcpAgent.unstable_closeSession({sessionId: sessionState.sessionId}) + codexAcpAgent.closeSession({sessionId: sessionState.sessionId}) ).resolves.toEqual({}); resolveTryHandle(false); From 625c5b7b21bfae09d43feb30a8b2d515099af276 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Tue, 28 Apr 2026 18:27:38 +0200 Subject: [PATCH 5/6] Use current turn id --- src/CodexAcpServer.ts | 29 +++++++++---------- .../CodexACPAgent/CodexAcpClient.test.ts | 3 +- src/__tests__/acp-test-utils.ts | 1 - 3 files changed, 14 insertions(+), 19 deletions(-) diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index 47176949..dccaed14 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -44,8 +44,8 @@ export interface SessionState { supportedReasoningEfforts: Array, supportedInputModalities: Array, agentMode: AgentMode, - pendingTurnId: Promise | null; - currentTurnId: string | null; + // Promise covers the turn/start request window before the turn id response arrives. + currentTurnId: Promise | string | null; lastTokenUsage: TokenCount | null; totalTokenUsage: TokenCount | null; modelContextWindow: number | null; @@ -175,7 +175,6 @@ export class CodexAcpServer implements acp.Agent { supportedReasoningEfforts: currentModel?.supportedReasoningEfforts ?? [], supportedInputModalities: currentModel?.inputModalities ?? ["text", "image"], agentMode: AgentMode.getInitialAgentMode(), - pendingTurnId: null, currentTurnId: null, lastTokenUsage: null, totalTokenUsage: null, @@ -407,7 +406,6 @@ export class CodexAcpServer implements acp.Agent { supportedReasoningEfforts: currentModel?.supportedReasoningEfforts ?? [], supportedInputModalities: currentModel?.inputModalities ?? ["text", "image"], agentMode: AgentMode.getInitialAgentMode(), - pendingTurnId: null, currentTurnId: null, lastTokenUsage: null, totalTokenUsage: null, @@ -670,17 +668,19 @@ export class CodexAcpServer implements acp.Agent { } private async resolveActiveTurnId(sessionState: SessionState): Promise { - if (sessionState.currentTurnId) { - return sessionState.currentTurnId; - } - if (!sessionState.pendingTurnId) { + const currentTurnId = sessionState.currentTurnId; + if (!currentTurnId) { return null; } + if (typeof currentTurnId === "string") { + return currentTurnId; + } + try { - return await sessionState.pendingTurnId; + return await currentTurnId; } catch (err) { - logger.error(`Failed to resolve pending turn for session ${sessionState.sessionId}`, err); + logger.error(`Failed to resolve current turn for session ${sessionState.sessionId}`, err); return null; } } @@ -783,7 +783,6 @@ export class CodexAcpServer implements acp.Agent { const sessionState = this.getSessionState(params.sessionId); const requireTrackedSession = this.sessions.has(params.sessionId); this.ensureSessionIsActive(params.sessionId, requireTrackedSession); - sessionState.pendingTurnId = null; sessionState.currentTurnId = null; sessionState.lastTokenUsage = null; @@ -830,12 +829,11 @@ export class CodexAcpServer implements acp.Agent { throw RequestError.invalidRequest("The current model does not support image input"); } const agentMode = sessionState.agentMode; - const pendingTurnId = this.runWithProcessCheck( + const turnIdPromise = this.runWithProcessCheck( () => this.codexAcpClient.startPrompt(params, agentMode, modelId, disableSummary, sessionState.cwd) ); - sessionState.pendingTurnId = pendingTurnId; - const turnId = await pendingTurnId; - sessionState.pendingTurnId = null; + sessionState.currentTurnId = turnIdPromise; + const turnId = await turnIdPromise; sessionState.currentTurnId = turnId; const turnCompleted = await this.runWithProcessCheck( @@ -879,7 +877,6 @@ export class CodexAcpServer implements acp.Agent { throw err; } finally { logger.log("Prompt completed", {sessionId: params.sessionId}); - sessionState.pendingTurnId = null; sessionState.currentTurnId = null; } } diff --git a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts index becbb61e..7644c92f 100644 --- a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts +++ b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts @@ -1239,8 +1239,7 @@ describe('ACP server test', { timeout: 40_000 }, () => { const codexAcpClient = mockFixture.getCodexAcpClient(); const sessionState = createTestSessionState({ sessionId: "session-close", - currentTurnId: null, - pendingTurnId: Promise.resolve("turn-pending"), + currentTurnId: Promise.resolve("turn-pending"), }); const closeSpy = vi.spyOn(codexAcpClient, "closeSession").mockResolvedValue(); diff --git a/src/__tests__/acp-test-utils.ts b/src/__tests__/acp-test-utils.ts index 0db2c7b5..22562310 100644 --- a/src/__tests__/acp-test-utils.ts +++ b/src/__tests__/acp-test-utils.ts @@ -299,7 +299,6 @@ function anonymizeValue(value: any, path: string[], fieldsToAnonymize: Set): SessionState { return { - pendingTurnId: null, currentTurnId: null, lastTokenUsage: null, totalTokenUsage: null, From b46d04329fc379bebb806c755c5c7d2ce2b1c28d Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Wed, 29 Apr 2026 08:19:37 +0200 Subject: [PATCH 6/6] Better handle the async current turns --- src/CodexAcpClient.ts | 58 ++++- src/CodexAcpServer.ts | 97 ++++++-- src/CodexAppServerClient.ts | 66 +++++- src/CodexCommands.ts | 24 +- src/CodexEventHandler.ts | 2 - .../CodexACPAgent/CodexAcpClient.test.ts | 221 ++++++++++++++++-- .../CodexACPAgent/approval-events.test.ts | 2 + .../command-action-events.test.ts | 30 +-- .../CodexACPAgent/elicitation-events.test.ts | 2 + .../CodexACPAgent/file-change-events.test.ts | 14 +- .../model-rerouted-events.test.ts | 2 +- .../terminal-output-events.test.ts | 24 +- .../CodexACPAgent/token-usage-events.test.ts | 10 +- src/__tests__/acp-test-utils.ts | 18 +- 14 files changed, 449 insertions(+), 121 deletions(-) diff --git a/src/CodexAcpClient.ts b/src/CodexAcpClient.ts index 76119545..58b4fa5f 100644 --- a/src/CodexAcpClient.ts +++ b/src/CodexAcpClient.ts @@ -31,12 +31,15 @@ import type { SkillsListResponse, Thread, ThreadSourceKind, + ThreadUnsubscribeResponse, TurnCompletedNotification, UserInput, } from "./app-server/v2"; import packageJson from "../package.json"; import type {AuthenticationLogoutResponse, AuthenticationStatusResponse} from "./AcpExtensions"; +const CLOSE_TURN_COMPLETION_TIMEOUT_MS = 10_000; + /** * API for accessing the Codex App Server using ACP requests. * Converts ACP requests into corresponding app-server operations. @@ -382,17 +385,34 @@ export class CodexAcpClient { } async closeSession(sessionId: string, currentTurnId: string | null): Promise { - if (currentTurnId && !this.codexClient.hasTurnCompleted(sessionId, currentTurnId)) { - await this.codexClient.turnInterrupt({ - threadId: sessionId, - turnId: currentTurnId, - }); - await this.codexClient.awaitTurnCompleted(sessionId, currentTurnId); - } + try { + if (currentTurnId && !this.codexClient.hasTurnCompleted(sessionId, currentTurnId)) { + try { + await this.codexClient.turnInterrupt({ + threadId: sessionId, + turnId: currentTurnId, + }); + await this.waitForTurnCompletionDuringClose(sessionId, currentTurnId); + } catch (err) { + logger.error(`Failed to interrupt active turn while closing session ${sessionId}`, err); + } + } - await this.codexClient.threadUnsubscribe({threadId: sessionId}); - this.unsubscribeFromSessionEvents(sessionId); - this.codexClient.clearThreadState(sessionId); + try { + const response = await this.codexClient.threadUnsubscribe({threadId: sessionId}) as ThreadUnsubscribeResponse | undefined; + if (response?.status && response.status !== "unsubscribed") { + logger.log("Thread unsubscribe completed without an active subscription", { + sessionId, + status: response.status, + }); + } + } catch (err) { + logger.error(`Failed to unsubscribe thread while closing session ${sessionId}`, err); + } + } finally { + this.unsubscribeFromSessionEvents(sessionId); + this.codexClient.clearThreadState(sessionId); + } } async startPrompt( @@ -428,6 +448,24 @@ export class CodexAcpClient { return await this.codexClient.awaitTurnCompleted(sessionId, turnId); } + private async waitForTurnCompletionDuringClose(sessionId: string, turnId: string): Promise { + let timeout: ReturnType | null = null; + try { + await Promise.race([ + this.codexClient.awaitTurnCompleted(sessionId, turnId), + new Promise((_, reject) => { + timeout = setTimeout(() => { + reject(new Error(`Timed out waiting for turn ${turnId} to complete during close`)); + }, CLOSE_TURN_COMPLETION_TIMEOUT_MS); + }), + ]); + } finally { + if (timeout) { + clearTimeout(timeout); + } + } + } + hasTurnCompleted(sessionId: string, turnId: string): boolean { return this.codexClient.hasTurnCompleted(sessionId, turnId); } diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index dccaed14..14c84f57 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -70,6 +70,7 @@ export class CodexAcpServer implements acp.Agent { private readonly sessions: Map; private readonly pendingMcpStartupSessions: Map; private readonly closingSessions: Set; + private readonly closeSessionPromises: Map>; constructor( connection: acp.AgentSideConnection, @@ -80,6 +81,7 @@ export class CodexAcpServer implements acp.Agent { this.sessions = new Map(); this.pendingMcpStartupSessions = new Map(); this.closingSessions = new Set(); + this.closeSessionPromises = new Map(); this.connection = connection; this.codexAcpClient = codexAcpClient; this.defaultAuthRequest = defaultAuthRequest ?? null; @@ -246,24 +248,40 @@ export class CodexAcpServer implements acp.Agent { async closeSession(params: acp.CloseSessionRequest): Promise { logger.log("Closing session...", {sessionId: params.sessionId}); - const sessionState = this.getSessionState(params.sessionId); - this.closingSessions.add(params.sessionId); + const existingClose = this.closeSessionPromises.get(params.sessionId); + if (existingClose) { + await existingClose; + logger.log("Session close already completed", {sessionId: params.sessionId}); + return {}; + } + const closePromise = this.closeSessionInternal(params.sessionId); + this.closeSessionPromises.set(params.sessionId, closePromise); try { - const activeTurnId = await this.resolveActiveTurnId(sessionState); - await this.runWithProcessCheck(() => - this.codexAcpClient.closeSession(params.sessionId, activeTurnId) - ); - this.forgetSession(params.sessionId); - } catch (err) { - this.closingSessions.delete(params.sessionId); - throw err; + await closePromise; + } finally { + this.closeSessionPromises.delete(params.sessionId); } logger.log("Session closed", {sessionId: params.sessionId}); return {}; } + private async closeSessionInternal(sessionId: string): Promise { + logger.log("Closing session internals...", {sessionId}); + const sessionState = this.getSessionState(sessionId); + this.closingSessions.add(sessionId); + + try { + const activeTurnId = await this.resolveActiveTurnId(sessionState); + await this.runWithProcessCheck(() => + this.codexAcpClient.closeSession(sessionId, activeTurnId) + ); + } finally { + this.forgetSession(sessionId); + } + } + async newSession( params: acp.NewSessionRequest, ): Promise { @@ -305,6 +323,9 @@ export class CodexAcpServer implements acp.Agent { }); const sessionState = this.sessions.get(_params.sessionId); if (!sessionState) throw new Error(`Session ${_params.sessionId} not found`); + if (this.closingSessions.has(_params.sessionId)) { + throw RequestError.invalidRequest("Session is closing"); + } const newMode = AgentMode.find(_params.modeId); if (!newMode) { @@ -321,6 +342,9 @@ export class CodexAcpServer implements acp.Agent { }); const sessionState = this.sessions.get(params.sessionId); if (!sessionState) throw new Error(`Session ${params.sessionId} not found`); + if (this.closingSessions.has(params.sessionId)) { + throw RequestError.invalidRequest("Session is closing"); + } const requestedModelId= ModelId.fromString(params.modelId); const requestedModelName = requestedModelId.model; @@ -685,10 +709,8 @@ export class CodexAcpServer implements acp.Agent { } } - private ensureSessionIsActive(sessionId: string, requireTrackedSession: boolean = false): void { - if (this.closingSessions.has(sessionId) || (requireTrackedSession && !this.sessions.has(sessionId))) { - throw RequestError.invalidRequest("Session is closing"); - } + private isSessionClosedOrClosing(sessionId: string): boolean { + return this.closingSessions.has(sessionId) || !this.sessions.has(sessionId); } private forgetSession(sessionId: string): void { @@ -781,8 +803,9 @@ export class CodexAcpServer implements acp.Agent { prompt: params.prompt, }); const sessionState = this.getSessionState(params.sessionId); - const requireTrackedSession = this.sessions.has(params.sessionId); - this.ensureSessionIsActive(params.sessionId, requireTrackedSession); + if (this.isSessionClosedOrClosing(params.sessionId)) { + return this.buildCancelledPromptResponse(sessionState); + } sessionState.currentTurnId = null; sessionState.lastTokenUsage = null; @@ -800,10 +823,15 @@ export class CodexAcpServer implements acp.Agent { }, approvalHandler, elicitationHandler); - this.ensureSessionIsActive(params.sessionId, requireTrackedSession); + if (this.isSessionClosedOrClosing(params.sessionId)) { + return this.buildCancelledPromptResponse(sessionState); + } if (await this.availableCommands.tryHandle(params.prompt, sessionState)) { logger.log("Prompt handled by a command"); + if (this.isSessionClosedOrClosing(params.sessionId)) { + return this.buildCancelledPromptResponse(sessionState); + } return { stopReason: "end_turn", usage: this.buildPromptUsage(sessionState.lastTokenUsage), @@ -811,7 +839,9 @@ export class CodexAcpServer implements acp.Agent { }; } - this.ensureSessionIsActive(params.sessionId, requireTrackedSession); + if (this.isSessionClosedOrClosing(params.sessionId)) { + return this.buildCancelledPromptResponse(sessionState); + } const modelId = ModelId.fromString(sessionState.currentModelId); const modelLacksReasoning = sessionState.supportedReasoningEfforts.length > 0 @@ -832,8 +862,20 @@ export class CodexAcpServer implements acp.Agent { const turnIdPromise = this.runWithProcessCheck( () => this.codexAcpClient.startPrompt(params, agentMode, modelId, disableSummary, sessionState.cwd) ); + turnIdPromise.catch(() => { + // The prompt path awaits this promise, but close also reads currentTurnId. + // Attach a handler immediately so future readers cannot create an unhandled rejection. + }); sessionState.currentTurnId = turnIdPromise; - const turnId = await turnIdPromise; + let turnId: string; + try { + turnId = await turnIdPromise; + } catch (err) { + if (this.isSessionClosedOrClosing(params.sessionId)) { + return this.buildCancelledPromptResponse(sessionState); + } + throw err; + } sessionState.currentTurnId = turnId; const turnCompleted = await this.runWithProcessCheck( @@ -854,11 +896,7 @@ export class CodexAcpServer implements acp.Agent { } }); } - return { - stopReason: "cancelled", - usage: this.buildPromptUsage(sessionState.lastTokenUsage), - _meta: this.buildQuotaMeta(sessionState), - }; + return this.buildCancelledPromptResponse(sessionState); } const error = eventHandler.getFailure() @@ -873,6 +911,9 @@ export class CodexAcpServer implements acp.Agent { _meta: this.buildQuotaMeta(sessionState), }; } catch (err) { + if (this.isSessionClosedOrClosing(params.sessionId)) { + return this.buildCancelledPromptResponse(sessionState); + } logger.error(`Prompt for session ${params.sessionId} failed`, err); throw err; } finally { @@ -907,6 +948,14 @@ export class CodexAcpServer implements acp.Agent { return toPromptUsage(lastTokenUsage); } + private buildCancelledPromptResponse(sessionState: SessionState): acp.PromptResponse { + return { + stopReason: "cancelled", + usage: this.buildPromptUsage(sessionState.lastTokenUsage), + _meta: this.buildQuotaMeta(sessionState), + }; + } + private async runWithProcessCheck(operation: () => Promise): Promise { try { return await operation(); diff --git a/src/CodexAppServerClient.ts b/src/CodexAppServerClient.ts index d1a06b45..68ce44c9 100644 --- a/src/CodexAppServerClient.ts +++ b/src/CodexAppServerClient.ts @@ -5,6 +5,7 @@ import type { InitializeResponse, ServerNotification } from "./app-server"; +import {logger} from "./Logger"; import type { AccountLoginCompletedNotification, AccountUpdatedNotification, ConfigReadParams, @@ -85,6 +86,8 @@ const McpServerElicitationRequest = new RequestType< void >('mcpServer/elicitation/request'); +const MAX_COMPLETED_TURNS_PER_THREAD = 20; + /** * A type-safe client over the Codex App Server's JSON-RPC API. * Maps each request to its expected response and exposes clear, typed methods for supported JSON-RPC operations. @@ -97,7 +100,7 @@ export class CodexAppServerClient { private readonly mcpServerStartupStates = new Map(); private readonly mcpServerStartupResolvers: Array = []; private readonly turnCompletedResolvers = new Map>(); - private readonly lastTurnCompletedByThread = new Map(); + private readonly completedTurnsByThread = new Map>(); constructor(connection: MessageConnection) { this.connection = connection; @@ -113,7 +116,7 @@ export class CodexAppServerClient { this.resolveMcpServerStartupResolvers(); } if (isTurnCompletedNotification(serverNotification)) { - this.lastTurnCompletedByThread.set(serverNotification.params.threadId, serverNotification.params); + this.rememberCompletedTurn(serverNotification.params); this.resolveTurnCompleted(serverNotification.params); } this.notify(serverNotification); @@ -244,21 +247,20 @@ export class CodexAppServerClient { } async awaitTurnCompleted(threadId: string, turnId: string): Promise { - const completedTurn = this.lastTurnCompletedByThread.get(threadId); - if (completedTurn && completedTurn.turn.id === turnId) { + const completedTurn = this.getCompletedTurn(threadId, turnId); + if (completedTurn) { return completedTurn; } - return await new Promise((resolve) => { + return await new Promise((resolve, reject) => { const resolvers = this.turnCompletedResolvers.get(threadId) ?? []; - resolvers.push({turnId, resolve}); + resolvers.push({turnId, resolve, reject}); this.turnCompletedResolvers.set(threadId, resolvers); }); } hasTurnCompleted(threadId: string, turnId: string): boolean { - const completedTurn = this.lastTurnCompletedByThread.get(threadId); - return completedTurn?.turn.id === turnId; + return this.getCompletedTurn(threadId, turnId) !== null; } async listModels(params: ModelListParams = {cursor: null, limit: null}): Promise { @@ -282,8 +284,12 @@ export class CodexAppServerClient { } clearThreadState(threadId: string): void { + const resolvers = this.turnCompletedResolvers.get(threadId) ?? []; + for (const resolver of resolvers) { + resolver.reject(new Error(`Stopped waiting for turn completion after thread ${threadId} was cleared`)); + } this.turnCompletedResolvers.delete(threadId); - this.lastTurnCompletedByThread.delete(threadId); + this.completedTurnsByThread.delete(threadId); } private codexEventHandlers: Array<(event: CodexConnectionEvent) => void> = []; @@ -295,7 +301,15 @@ export class CodexAppServerClient { private notify(notification: ServerNotification) { const threadId = this.getThreadId(notification); if (threadId) { - this.notificationHandlers.get(threadId)?.(notification); + const notificationHandler = this.notificationHandlers.get(threadId); + if (notificationHandler) { + notificationHandler(notification); + } else { + logger.log("Dropping scoped notification for unregistered thread", { + method: notification.method, + threadId, + }); + } return; } @@ -305,8 +319,35 @@ export class CodexAppServerClient { } private getThreadId(notification: ServerNotification): string | null { - const params = notification.params as { threadId?: unknown }; - return typeof params.threadId === "string" ? params.threadId : null; + switch (notification.method) { + case "thread/started": + return notification.params.thread.id; + default: { + const params = notification.params; + if (params === null || typeof params !== "object") { + return null; + } + const threadId = (params as { threadId?: unknown }).threadId; + return typeof threadId === "string" ? threadId : null; + } + } + } + + private getCompletedTurn(threadId: string, turnId: string): TurnCompletedNotification | null { + return this.completedTurnsByThread.get(threadId)?.get(turnId) ?? null; + } + + private rememberCompletedTurn(event: TurnCompletedNotification): void { + const threadTurns = this.completedTurnsByThread.get(event.threadId) ?? new Map(); + threadTurns.set(event.turn.id, event); + while (threadTurns.size > MAX_COMPLETED_TURNS_PER_THREAD) { + const oldestTurnId = threadTurns.keys().next().value; + if (oldestTurnId === undefined) { + break; + } + threadTurns.delete(oldestTurnId); + } + this.completedTurnsByThread.set(event.threadId, threadTurns); } private resolveMcpServerStartupResolvers(): void { @@ -420,6 +461,7 @@ type McpServerStartupResolver = { type TurnCompletedResolver = { turnId: string; resolve: (event: TurnCompletedNotification) => void; + reject: (error: Error) => void; }; diff --git a/src/CodexCommands.ts b/src/CodexCommands.ts index 51d49136..1ab36b1a 100644 --- a/src/CodexCommands.ts +++ b/src/CodexCommands.ts @@ -52,6 +52,9 @@ export class CodexCommands { async tryHandle(prompt: acp.ContentBlock[], sessionState: SessionState): Promise { const command = this.parseCommand(prompt); if (command) { + if (!this.isSessionActive(sessionState.sessionId)) { + return true; + } return this.handleCommand(command, sessionState); } return false; @@ -129,6 +132,9 @@ export class CodexCommands { async handleCommand(command: ParsedCommand, sessionState: SessionState): Promise { const sessionId = sessionState.sessionId; + if (!this.isSessionActive(sessionId)) { + return true; + } switch (command.name) { case "status": { @@ -140,7 +146,13 @@ export class CodexCommands { return true; } case "logout": { + if (!this.isSessionActive(sessionId)) { + return true; + } await this.runWithProcessCheck(() => this.codexAcpClient.logout()); + if (!this.isSessionActive(sessionId)) { + return true; + } await this.updateSession(sessionId, { sessionUpdate: "agent_message_chunk", content: { type: "text", text: "Logged out from Codex account." } @@ -149,6 +161,9 @@ export class CodexCommands { } case "skills": { const response = await this.runWithProcessCheck(() => this.codexAcpClient.listSkills()); + if (!this.isSessionActive(sessionId)) { + return true; + } const skills = (response?.data ?? []).flatMap(entry => entry.skills); const lines = skills.map(skill => { const description = skill.shortDescription ?? skill.description ?? ""; @@ -165,6 +180,9 @@ export class CodexCommands { } case "mcp": { const servers = await this.runWithProcessCheck(() => this.codexAcpClient.listMcpServers()); + if (!this.isSessionActive(sessionId)) { + return true; + } const configuredServers = servers.data.map(server => { const toolCount = Object.keys(server.tools ?? {}).length; const resourceCount = (server.resources ?? []).length; @@ -205,13 +223,17 @@ export class CodexCommands { } private async updateSession(sessionId: string, update: UpdateSessionEvent): Promise { - if (this.isSessionClosing(sessionId) || !this.hasTrackedSession(sessionId)) { + if (!this.isSessionActive(sessionId)) { return; } const session = new ACPSessionConnection(this.connection, sessionId); await session.update(update); } + private isSessionActive(sessionId: string): boolean { + return !this.isSessionClosing(sessionId) && this.hasTrackedSession(sessionId); + } + private buildStatusMessage(sessionState: SessionState): string { const agentMode = sessionState.agentMode; const accountText = this.formatAccountInfo(sessionState.account); diff --git a/src/CodexEventHandler.ts b/src/CodexEventHandler.ts index d1adbc71..f91b339c 100644 --- a/src/CodexEventHandler.ts +++ b/src/CodexEventHandler.ts @@ -83,10 +83,8 @@ export class CodexEventHandler { case "error": return await this.createErrorEvent(notification.params); case "turn/started": - this.sessionState.currentTurnId = notification.params.turn.id; return null; case "turn/completed": - this.sessionState.currentTurnId = null; return null; case "thread/tokenUsage/updated": return this.createUsageUpdate(notification.params); diff --git a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts index 7644c92f..e6df9125 100644 --- a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts +++ b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts @@ -303,10 +303,13 @@ describe('ACP server test', { timeout: 40_000 }, () => { turn: { id: "turn-id", items: [], status: "completed", error: null } } as any); - vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(createTestSessionState({ + const sessionState = createTestSessionState({ sessionId: "session-id", cwd: "/workspace" - })); + }); + vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); const promptRequest: acp.PromptRequest = { sessionId: "session-id", @@ -362,6 +365,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { agentMode: AgentMode.DEFAULT_AGENT_MODE }); vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); await codexAcpAgent.prompt({ sessionId: "id", prompt: [{type: "text", text: ""}] }); @@ -387,6 +392,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { agentMode: AgentMode.DEFAULT_AGENT_MODE }); vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); // First prompt - registers first notification handler await codexAcpAgent.prompt({ sessionId: "id", prompt: [{type: "text", text: "First message"}] }); @@ -441,6 +448,10 @@ describe('ACP server test', { timeout: 40_000 }, () => { vi.spyOn(codexAcpAgent, "getSessionState").mockImplementation((sessionId: string) => { return sessionId === "session-1" ? sessionState1 : sessionState2; }); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState1.sessionId, sessionState1); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState2.sessionId, sessionState2); // Start prompts for two different sessions await codexAcpAgent.prompt({ sessionId: "session-1", prompt: [{type: "text", text: "Message to session 1"}] }); @@ -478,6 +489,36 @@ describe('ACP server test', { timeout: 40_000 }, () => { ]); }); + it('routes thread/started notifications by nested thread id', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + const session1Handler = vi.fn(); + const session2Handler = vi.fn(); + + codexAppServerClient.onServerNotification("session-1", session1Handler); + codexAppServerClient.onServerNotification("session-2", session2Handler); + + mockFixture.sendServerNotification({ + method: "thread/started", + params: { + thread: { + id: "session-1", + createdAt: "2026-01-01T00:00:00.000Z", + source: "appServer", + status: { type: "idle" }, + title: null, + cwd: "/workspace", + metadata: {}, + conversationId: null, + userInstructions: null, + }, + }, + } as unknown as ServerNotification); + + expect(session1Handler).toHaveBeenCalledTimes(1); + expect(session2Handler).not.toHaveBeenCalled(); + }); + it('should send attachments as prompt items', async () => { const mockFixture = createCodexMockTestFixture(); const codexAcpAgent = mockFixture.getCodexAcpAgent(); @@ -514,6 +555,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { const sessionState: SessionState = createTestSessionState(); vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); const prompt: acp.ContentBlock[] = [ { type: "text", text: "Hello" }, @@ -683,6 +726,25 @@ describe('ACP server test', { timeout: 40_000 }, () => { await expect(mockFixture.getAcpConnectionDump([])).toMatchFileSnapshot("data/command-logout.json"); }); + it('does not run logout command side effects for a closing session', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const sessionState: SessionState = createTestSessionState(); + const logoutSpy = vi.spyOn(mockFixture.getCodexAcpClient(), "logout").mockResolvedValue({}); + + // @ts-expect-error seeding private session store for slash-command test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); + // @ts-expect-error simulating close already in progress + codexAcpAgent.closingSessions.add(sessionState.sessionId); + + // @ts-expect-error - exercising private helper + const handled = await codexAcpAgent.availableCommands.handleCommand({ name: "logout", input: null }, sessionState); + + expect(handled).toBe(true); + expect(logoutSpy).not.toHaveBeenCalled(); + expect(mockFixture.getAcpConnectionEvents([])).toEqual([]); + }); + it('handles skills command', async () => { const mockFixture = createCodexMockTestFixture(); const codexAcpAgent = mockFixture.getCodexAcpAgent(); @@ -804,7 +866,7 @@ describe('ACP server test', { timeout: 40_000 }, () => { */ function setupPromptFixture(sessionOverrides?: Partial) { const mockFixture = createCodexMockTestFixture(); - const sessionState = createTestSessionState(sessionOverrides); + const sessionState = createTestSessionState({ sessionId: "id", ...sessionOverrides }); const turnStartSpy = vi.spyOn(mockFixture.getCodexAppServerClient(), "turnStart").mockResolvedValue({ turn: { id: "turn-id", @@ -941,7 +1003,7 @@ describe('ACP server test', { timeout: 40_000 }, () => { } }); - const { mockFixture } = setupPromptFixture({ rateLimits }); + const { mockFixture } = setupPromptFixture({ sessionId: "session-id", rateLimits }); await mockFixture.getCodexAcpAgent().prompt({ sessionId: "session-id", prompt: [{ type: "text", text: "/status" }] }); await expect(mockFixture.getAcpConnectionDump([])).toMatchFileSnapshot("data/command-status-with-rate-limits.json"); @@ -1079,20 +1141,23 @@ describe('ACP server test', { timeout: 40_000 }, () => { codexAppServerClient.turnCompletedResolvers.set("session-close", [{ turnId: "other-turn", resolve: vi.fn(), + reject: vi.fn(), }]); // @ts-expect-error verifying test-only access to private maps - codexAppServerClient.lastTurnCompletedByThread.set("session-close", { - threadId: "session-close", - turn: { - id: "completed-turn", - items: [], - status: "completed", - error: null, - startedAt: null, - completedAt: null, - durationMs: null, - }, - }); + codexAppServerClient.completedTurnsByThread.set("session-close", new Map([ + ["completed-turn", { + threadId: "session-close", + turn: { + id: "completed-turn", + items: [], + status: "completed", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }] + ])); await codexAcpClient.closeSession("session-close", "turn-close"); @@ -1120,7 +1185,67 @@ describe('ACP server test', { timeout: 40_000 }, () => { // @ts-expect-error verifying test-only access to private maps expect(codexAppServerClient.turnCompletedResolvers.has("session-close")).toBe(false); // @ts-expect-error verifying test-only access to private maps - expect(codexAppServerClient.lastTurnCompletedByThread.has("session-close")).toBe(false); + expect(codexAppServerClient.completedTurnsByThread.has("session-close")).toBe(false); + }); + + it('times out waiting for turn completion during close and still removes local listeners', async () => { + vi.useFakeTimers(); + try { + const mockFixture = createCodexMockTestFixture(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + vi.spyOn(codexAppServerClient, "awaitTurnCompleted").mockReturnValue(new Promise(() => {})); + + await codexAcpClient.subscribeToSessionEvents( + "session-close", + vi.fn(), + { + handleCommandExecution: vi.fn(), + handleFileChange: vi.fn(), + }, + { + handleElicitation: vi.fn(), + } + ); + + const closePromise = codexAcpClient.closeSession("session-close", "turn-close"); + await vi.advanceTimersByTimeAsync(10_000); + + await expect(closePromise).resolves.toBeUndefined(); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.notificationHandlers.has("session-close")).toBe(false); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.turnCompletedResolvers.has("session-close")).toBe(false); + } finally { + vi.useRealTimers(); + } + }); + + it('removes local listeners when thread unsubscribe fails during close', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + vi.spyOn(codexAppServerClient, "threadUnsubscribe").mockRejectedValue(new Error("unsubscribe failed")); + + await codexAcpClient.subscribeToSessionEvents( + "session-close", + vi.fn(), + { + handleCommandExecution: vi.fn(), + handleFileChange: vi.fn(), + }, + { + handleElicitation: vi.fn(), + } + ); + + await expect(codexAcpClient.closeSession("session-close", null)).resolves.toBeUndefined(); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.notificationHandlers.has("session-close")).toBe(false); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.approvalHandlers.has("session-close")).toBe(false); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.elicitationHandlers.has("session-close")).toBe(false); }); it('waits for the matching turn completion before resolving a prompt', async () => { @@ -1145,6 +1270,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { }, }); vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); let promptSettled = false; const promptPromise = codexAcpAgent.prompt({ @@ -1220,6 +1347,7 @@ describe('ACP server test', { timeout: 40_000 }, () => { // @ts-expect-error seeding private session store for focused close-session test codexAcpAgent.pendingMcpStartupSessions.set(sessionState.sessionId, { requestedServers: new Set(["alpha"]), + afterVersion: 0, }); await expect( @@ -1253,7 +1381,7 @@ describe('ACP server test', { timeout: 40_000 }, () => { expect(closeSpy).toHaveBeenCalledWith(sessionState.sessionId, "turn-pending"); }); - it('rejects a prompt if the session closes before turn start begins', async () => { + it('cancels a prompt if the session closes before turn start begins', async () => { const mockFixture = createCodexMockTestFixture(); const codexAcpAgent = mockFixture.getCodexAcpAgent(); const codexAcpClient = mockFixture.getCodexAcpClient(); @@ -1288,10 +1416,62 @@ describe('ACP server test', { timeout: 40_000 }, () => { resolveTryHandle(false); - await expect(promptPromise).rejects.toThrow("Invalid request"); + await expect(promptPromise).resolves.toEqual({ + stopReason: "cancelled", + usage: null, + _meta: { + quota: { + token_count: null, + model_usage: [], + }, + }, + }); expect(startPromptSpy).not.toHaveBeenCalled(); }); + it('cancels a prompt when pending turn start rejects during close', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const sessionState = createTestSessionState({ + sessionId: "session-close", + }); + + let rejectTurnStart!: (error: Error) => void; + const turnStartPromise = new Promise((_, reject) => { + rejectTurnStart = reject; + }); + const startPromptSpy = vi.spyOn(codexAcpClient, "startPrompt").mockReturnValue(turnStartPromise); + vi.spyOn(codexAcpClient, "closeSession").mockResolvedValue(); + + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); + + const promptPromise = codexAcpAgent.prompt({ + sessionId: sessionState.sessionId, + prompt: [{ type: "text", text: "normal prompt" }], + }); + + await vi.waitFor(() => { + expect(startPromptSpy).toHaveBeenCalled(); + }); + + const closePromise = codexAcpAgent.closeSession({sessionId: sessionState.sessionId}); + rejectTurnStart(new Error("turn/start failed")); + + await expect(closePromise).resolves.toEqual({}); + await expect(promptPromise).resolves.toEqual({ + stopReason: "cancelled", + usage: null, + _meta: { + quota: { + token_count: null, + model_usage: [], + }, + }, + }); + }); + it('skips late MCP startup updates after a session is closed', async () => { const mockFixture = createCodexMockTestFixture(); const codexAcpAgent = mockFixture.getCodexAcpAgent(); @@ -1311,10 +1491,11 @@ describe('ACP server test', { timeout: 40_000 }, () => { // @ts-expect-error seeding private session store for focused close-session test codexAcpAgent.pendingMcpStartupSessions.set(sessionState.sessionId, { requestedServers: new Set(["alpha"]), + afterVersion: 0, }); // @ts-expect-error exercising private helper to verify close-session race handling - const publishPromise = codexAcpAgent.doPublishMcpStartupStatus(sessionState.sessionId, 1); + const publishPromise = codexAcpAgent.doPublishMcpStartupStatus(sessionState.sessionId); // @ts-expect-error exercising private helper to simulate close-session cleanup codexAcpAgent.forgetSession(sessionState.sessionId); diff --git a/src/__tests__/CodexACPAgent/approval-events.test.ts b/src/__tests__/CodexACPAgent/approval-events.test.ts index 7ee43746..db135dd9 100644 --- a/src/__tests__/CodexACPAgent/approval-events.test.ts +++ b/src/__tests__/CodexACPAgent/approval-events.test.ts @@ -32,6 +32,8 @@ describe('Approval Events', () => { agentMode: AgentMode.DEFAULT_AGENT_MODE }); vi.spyOn(codexAcpAgent, 'getSessionState').mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); const promptPromise = codexAcpAgent.prompt({ sessionId, diff --git a/src/__tests__/CodexACPAgent/command-action-events.test.ts b/src/__tests__/CodexACPAgent/command-action-events.test.ts index b6aa0f7a..bb92314d 100644 --- a/src/__tests__/CodexACPAgent/command-action-events.test.ts +++ b/src/__tests__/CodexACPAgent/command-action-events.test.ts @@ -23,7 +23,7 @@ describe('CodexEventHandler - command action events', () => { const listFilesNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -58,7 +58,7 @@ describe('CodexEventHandler - command action events', () => { const listFilesNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -93,7 +93,7 @@ describe('CodexEventHandler - command action events', () => { const searchNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -129,7 +129,7 @@ describe('CodexEventHandler - command action events', () => { const searchNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -165,7 +165,7 @@ describe('CodexEventHandler - command action events', () => { const searchNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -201,7 +201,7 @@ describe('CodexEventHandler - command action events', () => { const searchNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -237,7 +237,7 @@ describe('CodexEventHandler - command action events', () => { const searchNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: "mcpToolCall", @@ -265,7 +265,7 @@ describe('CodexEventHandler - command action events', () => { { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: "mcpToolCall", @@ -283,7 +283,7 @@ describe('CodexEventHandler - command action events', () => { { method: 'item/mcpToolCall/progress', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', itemId: 'call-id', message: "File /Users/aleksandr.slapoguzov/Projects/ultimate/.ai/local.md doesn't exist or can't be opened", @@ -292,7 +292,7 @@ describe('CodexEventHandler - command action events', () => { { method: 'item/completed', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: "mcpToolCall", @@ -324,7 +324,7 @@ describe('CodexEventHandler - command action events', () => { { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: "mcpToolCall", @@ -342,7 +342,7 @@ describe('CodexEventHandler - command action events', () => { { method: 'item/mcpToolCall/progress', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', itemId: 'call-id', message: repeatedMessage, @@ -351,7 +351,7 @@ describe('CodexEventHandler - command action events', () => { { method: 'item/mcpToolCall/progress', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', itemId: 'call-id', message: repeatedMessage, @@ -360,7 +360,7 @@ describe('CodexEventHandler - command action events', () => { { method: 'item/completed', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: "mcpToolCall", @@ -390,7 +390,7 @@ describe('CodexEventHandler - command action events', () => { const dynamicToolNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: "dynamicToolCall", diff --git a/src/__tests__/CodexACPAgent/elicitation-events.test.ts b/src/__tests__/CodexACPAgent/elicitation-events.test.ts index 32548bee..dd486bbf 100644 --- a/src/__tests__/CodexACPAgent/elicitation-events.test.ts +++ b/src/__tests__/CodexACPAgent/elicitation-events.test.ts @@ -33,6 +33,8 @@ describe('Elicitation Events', () => { agentMode: AgentMode.DEFAULT_AGENT_MODE }); vi.spyOn(codexAcpAgent, 'getSessionState').mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); const promptPromise = codexAcpAgent.prompt({ sessionId, diff --git a/src/__tests__/CodexACPAgent/file-change-events.test.ts b/src/__tests__/CodexACPAgent/file-change-events.test.ts index b25094f2..ce00135a 100644 --- a/src/__tests__/CodexACPAgent/file-change-events.test.ts +++ b/src/__tests__/CodexACPAgent/file-change-events.test.ts @@ -44,7 +44,7 @@ describe('CodexEventHandler - file change events', () => { const newFileNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'fileChange', @@ -79,7 +79,7 @@ describe('CodexEventHandler - file change events', () => { const multiFileNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'fileChange', @@ -119,7 +119,7 @@ describe('CodexEventHandler - file change events', () => { const newFileNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'fileChange', @@ -147,7 +147,7 @@ describe('CodexEventHandler - file change events', () => { const deleteFileNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'fileChange', @@ -183,7 +183,7 @@ describe('CodexEventHandler - file change events', () => { const deletedFileNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'fileChange', @@ -213,7 +213,7 @@ describe('CodexEventHandler - file change events', () => { const deleteFileNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'fileChange', @@ -248,7 +248,7 @@ describe('CodexEventHandler - file change events', () => { const deletedFileNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'fileChange', diff --git a/src/__tests__/CodexACPAgent/model-rerouted-events.test.ts b/src/__tests__/CodexACPAgent/model-rerouted-events.test.ts index a037364c..61c49400 100644 --- a/src/__tests__/CodexACPAgent/model-rerouted-events.test.ts +++ b/src/__tests__/CodexACPAgent/model-rerouted-events.test.ts @@ -23,7 +23,7 @@ describe("CodexEventHandler - model rerouted events", () => { const modelReroutedNotification: ServerNotification = { method: "model/rerouted", params: { - threadId: "thread-1", + threadId: sessionId, turnId: "turn-1", fromModel: "gpt-5", toModel: "gpt-5-mini", diff --git a/src/__tests__/CodexACPAgent/terminal-output-events.test.ts b/src/__tests__/CodexACPAgent/terminal-output-events.test.ts index e0e3e661..de5ff938 100644 --- a/src/__tests__/CodexACPAgent/terminal-output-events.test.ts +++ b/src/__tests__/CodexACPAgent/terminal-output-events.test.ts @@ -23,7 +23,7 @@ describe('CodexEventHandler - terminal output events', () => { const commandStartNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -60,7 +60,7 @@ describe('CodexEventHandler - terminal output events', () => { const commandStartNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -90,7 +90,7 @@ describe('CodexEventHandler - terminal output events', () => { const outputDeltaNotification: ServerNotification = { method: 'item/commandExecution/outputDelta', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', itemId: 'command-123', delta: 'file1.txt\nfile2.txt\n', @@ -108,7 +108,7 @@ describe('CodexEventHandler - terminal output events', () => { const commandCompletedNotification: ServerNotification = { method: 'item/completed', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -137,7 +137,7 @@ describe('CodexEventHandler - terminal output events', () => { const commandFailedNotification: ServerNotification = { method: 'item/completed', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -166,7 +166,7 @@ describe('CodexEventHandler - terminal output events', () => { const dynamicToolCompletedNotification: ServerNotification = { method: 'item/completed', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'dynamicToolCall', @@ -193,7 +193,7 @@ describe('CodexEventHandler - terminal output events', () => { const commandStartNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -214,7 +214,7 @@ describe('CodexEventHandler - terminal output events', () => { const outputDeltaNotification: ServerNotification = { method: 'item/commandExecution/outputDelta', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', itemId: 'command-flow', delta: 'hello\n', @@ -224,7 +224,7 @@ describe('CodexEventHandler - terminal output events', () => { const commandCompletedNotification: ServerNotification = { method: 'item/completed', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -257,7 +257,7 @@ describe('CodexEventHandler - terminal output events', () => { const delta1: ServerNotification = { method: 'item/commandExecution/outputDelta', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', itemId: 'command-accumulate', delta: 'line1\n', @@ -267,7 +267,7 @@ describe('CodexEventHandler - terminal output events', () => { const delta2: ServerNotification = { method: 'item/commandExecution/outputDelta', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', itemId: 'command-accumulate', delta: 'line2\n', @@ -277,7 +277,7 @@ describe('CodexEventHandler - terminal output events', () => { const delta3: ServerNotification = { method: 'item/commandExecution/outputDelta', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', itemId: 'command-accumulate', delta: 'line3\n', diff --git a/src/__tests__/CodexACPAgent/token-usage-events.test.ts b/src/__tests__/CodexACPAgent/token-usage-events.test.ts index 2025b2d2..75990c4e 100644 --- a/src/__tests__/CodexACPAgent/token-usage-events.test.ts +++ b/src/__tests__/CodexACPAgent/token-usage-events.test.ts @@ -49,7 +49,10 @@ describe('Token Usage Events', () => { }; }); - vi.spyOn(codexAcpAgent, 'getSessionState').mockReturnValue(createTestSessionState({ sessionId })); + const sessionState = createTestSessionState({ sessionId }); + vi.spyOn(codexAcpAgent, 'getSessionState').mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); return codexAcpAgent; } @@ -179,7 +182,10 @@ describe('Token Usage Events', () => { }; }); - vi.spyOn(codexAcpAgent, 'getSessionState').mockReturnValue(createTestSessionState({ sessionId })); + const sessionState = createTestSessionState({ sessionId }); + vi.spyOn(codexAcpAgent, 'getSessionState').mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); return async () => { await codexAcpAgent.prompt({ diff --git a/src/__tests__/acp-test-utils.ts b/src/__tests__/acp-test-utils.ts index 22562310..03a3ef24 100644 --- a/src/__tests__/acp-test-utils.ts +++ b/src/__tests__/acp-test-utils.ts @@ -334,6 +334,8 @@ export async function setupPromptAndSendNotifications( }); vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for focused event-handler tests + codexAcpAgent.sessions.set(sessionId, sessionState); await codexAcpAgent.prompt({ sessionId, @@ -343,21 +345,7 @@ export async function setupPromptAndSendNotifications( fixture.clearAcpConnectionDump(); for (const notification of notifications) { - const routedNotification = (() => { - const params = notification.params; - if ("threadId" in params && typeof params.threadId !== "string") { - return notification; - } - return { - ...notification, - params: { - ...notification.params, - threadId: sessionId, - }, - } as ServerNotification; - })(); - - fixture.sendServerNotification(routedNotification); + fixture.sendServerNotification(notification); } await vi.waitFor(() => {