diff --git a/src/CodexAcpClient.ts b/src/CodexAcpClient.ts index 514e53b5..58b4fa5f 100644 --- a/src/CodexAcpClient.ts +++ b/src/CodexAcpClient.ts @@ -31,12 +31,15 @@ import type { SkillsListResponse, Thread, ThreadSourceKind, + ThreadUnsubscribeResponse, TurnCompletedNotification, UserInput, } from "./app-server/v2"; import packageJson from "../package.json"; import type {AuthenticationLogoutResponse, AuthenticationStatusResponse} from "./AcpExtensions"; +const CLOSE_TURN_COMPLETION_TIMEOUT_MS = 10_000; + /** * API for accessing the Codex App Server using ACP requests. * Converts ACP requests into corresponding app-server operations. @@ -375,18 +378,55 @@ export class CodexAcpClient { this.codexClient.onElicitationRequest(sessionId, elicitationHandler); } - async sendPrompt( + unsubscribeFromSessionEvents(sessionId: string): void { + this.codexClient.removeServerNotification(sessionId); + this.codexClient.removeApprovalRequest(sessionId); + this.codexClient.removeElicitationRequest(sessionId); + } + + async closeSession(sessionId: string, currentTurnId: string | null): Promise { + try { + if (currentTurnId && !this.codexClient.hasTurnCompleted(sessionId, currentTurnId)) { + try { + await this.codexClient.turnInterrupt({ + threadId: sessionId, + turnId: currentTurnId, + }); + await this.waitForTurnCompletionDuringClose(sessionId, currentTurnId); + } catch (err) { + logger.error(`Failed to interrupt active turn while closing session ${sessionId}`, err); + } + } + + try { + const response = await this.codexClient.threadUnsubscribe({threadId: sessionId}) as ThreadUnsubscribeResponse | undefined; + if (response?.status && response.status !== "unsubscribed") { + logger.log("Thread unsubscribe completed without an active subscription", { + sessionId, + status: response.status, + }); + } + } catch (err) { + logger.error(`Failed to unsubscribe thread while closing session ${sessionId}`, err); + } + } finally { + this.unsubscribeFromSessionEvents(sessionId); + this.codexClient.clearThreadState(sessionId); + } + } + + async startPrompt( request: acp.PromptRequest, agentMode: AgentMode, modelId: ModelId, disableSummary: boolean, cwd: string, - ): Promise { + ): Promise { const input = buildPromptItems(request.prompt); const effort = modelId.effort as ReasoningEffort | null; //TODO remove unsafe conversion await this.refreshSkills(cwd, request._meta); - await this.codexClient.turnStart({ + const response = await this.codexClient.turnStart({ outputSchema: null, threadId: request.sessionId, input: input, @@ -400,9 +440,34 @@ export class CodexAcpClient { model: modelId.model, }); - // Wait for turn completion + return response.turn.id; + } + + async awaitTurnCompleted(sessionId: string, turnId: string): Promise { // If turnInterrupt() was called, Codex will send turn/completed event with status "interrupted" - return await this.codexClient.awaitTurnCompleted(); + return await this.codexClient.awaitTurnCompleted(sessionId, turnId); + } + + private async waitForTurnCompletionDuringClose(sessionId: string, turnId: string): Promise { + let timeout: ReturnType | null = null; + try { + await Promise.race([ + this.codexClient.awaitTurnCompleted(sessionId, turnId), + new Promise((_, reject) => { + timeout = setTimeout(() => { + reject(new Error(`Timed out waiting for turn ${turnId} to complete during close`)); + }, CLOSE_TURN_COMPLETION_TIMEOUT_MS); + }), + ]); + } finally { + if (timeout) { + clearTimeout(timeout); + } + } + } + + hasTurnCompleted(sessionId: string, turnId: string): boolean { + return this.codexClient.hasTurnCompleted(sessionId, turnId); } async listSkills(params?: SkillsListParams): Promise { diff --git a/src/CodexAcpServer.ts b/src/CodexAcpServer.ts index 4159656a..14c84f57 100644 --- a/src/CodexAcpServer.ts +++ b/src/CodexAcpServer.ts @@ -44,7 +44,8 @@ export interface SessionState { supportedReasoningEfforts: Array, supportedInputModalities: Array, agentMode: AgentMode, - currentTurnId: string | null; + // Promise covers the turn/start request window before the turn id response arrives. + currentTurnId: Promise | string | null; lastTokenUsage: TokenCount | null; totalTokenUsage: TokenCount | null; modelContextWindow: number | null; @@ -68,6 +69,8 @@ export class CodexAcpServer implements acp.Agent { private readonly sessions: Map; private readonly pendingMcpStartupSessions: Map; + private readonly closingSessions: Set; + private readonly closeSessionPromises: Map>; constructor( connection: acp.AgentSideConnection, @@ -77,6 +80,8 @@ export class CodexAcpServer implements acp.Agent { ) { this.sessions = new Map(); this.pendingMcpStartupSessions = new Map(); + this.closingSessions = new Set(); + this.closeSessionPromises = new Map(); this.connection = connection; this.codexAcpClient = codexAcpClient; this.defaultAuthRequest = defaultAuthRequest ?? null; @@ -84,7 +89,9 @@ export class CodexAcpServer implements acp.Agent { this.availableCommands = new CodexCommands( connection, codexAcpClient, - (operation) => this.runWithProcessCheck(operation) + (operation) => this.runWithProcessCheck(operation), + (sessionId) => this.sessions.has(sessionId), + (sessionId) => this.closingSessions.has(sessionId), ); } @@ -101,6 +108,7 @@ export class CodexAcpServer implements acp.Agent { image: true }, sessionCapabilities: { + close: {}, resume: { }, list: { } }, @@ -238,6 +246,42 @@ export class CodexAcpServer implements acp.Agent { return await this.runWithProcessCheck(() => this.codexAcpClient.listSessions(params)); } + async closeSession(params: acp.CloseSessionRequest): Promise { + logger.log("Closing session...", {sessionId: params.sessionId}); + const existingClose = this.closeSessionPromises.get(params.sessionId); + if (existingClose) { + await existingClose; + logger.log("Session close already completed", {sessionId: params.sessionId}); + return {}; + } + + const closePromise = this.closeSessionInternal(params.sessionId); + this.closeSessionPromises.set(params.sessionId, closePromise); + try { + await closePromise; + } finally { + this.closeSessionPromises.delete(params.sessionId); + } + + logger.log("Session closed", {sessionId: params.sessionId}); + return {}; + } + + private async closeSessionInternal(sessionId: string): Promise { + logger.log("Closing session internals...", {sessionId}); + const sessionState = this.getSessionState(sessionId); + this.closingSessions.add(sessionId); + + try { + const activeTurnId = await this.resolveActiveTurnId(sessionState); + await this.runWithProcessCheck(() => + this.codexAcpClient.closeSession(sessionId, activeTurnId) + ); + } finally { + this.forgetSession(sessionId); + } + } + async newSession( params: acp.NewSessionRequest, ): Promise { @@ -279,6 +323,9 @@ export class CodexAcpServer implements acp.Agent { }); const sessionState = this.sessions.get(_params.sessionId); if (!sessionState) throw new Error(`Session ${_params.sessionId} not found`); + if (this.closingSessions.has(_params.sessionId)) { + throw RequestError.invalidRequest("Session is closing"); + } const newMode = AgentMode.find(_params.modeId); if (!newMode) { @@ -295,6 +342,9 @@ export class CodexAcpServer implements acp.Agent { }); const sessionState = this.sessions.get(params.sessionId); if (!sessionState) throw new Error(`Session ${params.sessionId} not found`); + if (this.closingSessions.has(params.sessionId)) { + throw RequestError.invalidRequest("Session is closing"); + } const requestedModelId= ModelId.fromString(params.modelId); const requestedModelName = requestedModelId.model; @@ -641,6 +691,34 @@ export class CodexAcpServer implements acp.Agent { return sessionState; } + private async resolveActiveTurnId(sessionState: SessionState): Promise { + const currentTurnId = sessionState.currentTurnId; + if (!currentTurnId) { + return null; + } + + if (typeof currentTurnId === "string") { + return currentTurnId; + } + + try { + return await currentTurnId; + } catch (err) { + logger.error(`Failed to resolve current turn for session ${sessionState.sessionId}`, err); + return null; + } + } + + private isSessionClosedOrClosing(sessionId: string): boolean { + return this.closingSessions.has(sessionId) || !this.sessions.has(sessionId); + } + + private forgetSession(sessionId: string): void { + this.sessions.delete(sessionId); + this.pendingMcpStartupSessions.delete(sessionId); + this.closingSessions.delete(sessionId); + } + private resolveSessionMcpServers( mcpServers: Array, recoverFromStartup: boolean, @@ -671,14 +749,26 @@ export class CodexAcpServer implements acp.Agent { return; } + const requestedServers = pendingStartup.requestedServers; + const afterVersion = pendingStartup.afterVersion; + try { const mcpStartup = await this.runWithProcessCheck(() => this.codexAcpClient.awaitMcpServerStartup( - Array.from(pendingStartup.requestedServers), - pendingStartup.afterVersion, + Array.from(requestedServers), + afterVersion, ) ); - await this.publishMcpStartupStatus(sessionId, mcpStartup, pendingStartup.requestedServers); + const sessionState = this.sessions.get(sessionId); + const currentPendingStartup = this.pendingMcpStartupSessions.get(sessionId); + if (sessionState && currentPendingStartup) { + sessionState.sessionMcpServers = mcpStartup.ready.filter(serverName => + currentPendingStartup.requestedServers.has(serverName) + ); + await this.publishMcpStartupStatus(sessionId, mcpStartup, currentPendingStartup.requestedServers); + } else { + logger.log("Skipping MCP startup status for closed session", {sessionId}); + } } catch (err) { logger.error(`Failed to publish MCP startup status for session ${sessionId}`, err); } finally { @@ -713,6 +803,9 @@ export class CodexAcpServer implements acp.Agent { prompt: params.prompt, }); const sessionState = this.getSessionState(params.sessionId); + if (this.isSessionClosedOrClosing(params.sessionId)) { + return this.buildCancelledPromptResponse(sessionState); + } sessionState.currentTurnId = null; sessionState.lastTokenUsage = null; @@ -722,14 +815,23 @@ export class CodexAcpServer implements acp.Agent { const elicitationHandler = new CodexElicitationHandler(this.connection, sessionState); await this.codexAcpClient.subscribeToSessionEvents(params.sessionId, (event) => { + if (this.closingSessions.has(params.sessionId)) { + return; + } elicitationHandler.handleNotification(event); return eventHandler.handleNotification(event); }, approvalHandler, elicitationHandler); + if (this.isSessionClosedOrClosing(params.sessionId)) { + return this.buildCancelledPromptResponse(sessionState); + } if (await this.availableCommands.tryHandle(params.prompt, sessionState)) { logger.log("Prompt handled by a command"); + if (this.isSessionClosedOrClosing(params.sessionId)) { + return this.buildCancelledPromptResponse(sessionState); + } return { stopReason: "end_turn", usage: this.buildPromptUsage(sessionState.lastTokenUsage), @@ -737,6 +839,10 @@ export class CodexAcpServer implements acp.Agent { }; } + if (this.isSessionClosedOrClosing(params.sessionId)) { + return this.buildCancelledPromptResponse(sessionState); + } + const modelId = ModelId.fromString(sessionState.currentModelId); const modelLacksReasoning = sessionState.supportedReasoningEfforts.length > 0 && sessionState.supportedReasoningEfforts.every(e => e.reasoningEffort === "none"); @@ -753,26 +859,44 @@ export class CodexAcpServer implements acp.Agent { throw RequestError.invalidRequest("The current model does not support image input"); } const agentMode = sessionState.agentMode; + const turnIdPromise = this.runWithProcessCheck( + () => this.codexAcpClient.startPrompt(params, agentMode, modelId, disableSummary, sessionState.cwd) + ); + turnIdPromise.catch(() => { + // The prompt path awaits this promise, but close also reads currentTurnId. + // Attach a handler immediately so future readers cannot create an unhandled rejection. + }); + sessionState.currentTurnId = turnIdPromise; + let turnId: string; + try { + turnId = await turnIdPromise; + } catch (err) { + if (this.isSessionClosedOrClosing(params.sessionId)) { + return this.buildCancelledPromptResponse(sessionState); + } + throw err; + } + sessionState.currentTurnId = turnId; + const turnCompleted = await this.runWithProcessCheck( - () => this.codexAcpClient.sendPrompt(params, agentMode, modelId, disableSummary, sessionState.cwd)); + () => this.codexAcpClient.awaitTurnCompleted(params.sessionId, turnId) + ); // Check if turn was interrupted (cancelled) if (turnCompleted.turn.status === "interrupted") { - await this.connection.sessionUpdate({ - sessionId: params.sessionId, - update: { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: "*Conversation interrupted*" + if (!this.closingSessions.has(params.sessionId)) { + await this.connection.sessionUpdate({ + sessionId: params.sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: "*Conversation interrupted*" + } } - } - }); - return { - stopReason: "cancelled", - usage: this.buildPromptUsage(sessionState.lastTokenUsage), - _meta: this.buildQuotaMeta(sessionState), - }; + }); + } + return this.buildCancelledPromptResponse(sessionState); } const error = eventHandler.getFailure() @@ -787,6 +911,9 @@ export class CodexAcpServer implements acp.Agent { _meta: this.buildQuotaMeta(sessionState), }; } catch (err) { + if (this.isSessionClosedOrClosing(params.sessionId)) { + return this.buildCancelledPromptResponse(sessionState); + } logger.error(`Prompt for session ${params.sessionId} failed`, err); throw err; } finally { @@ -821,6 +948,14 @@ export class CodexAcpServer implements acp.Agent { return toPromptUsage(lastTokenUsage); } + private buildCancelledPromptResponse(sessionState: SessionState): acp.PromptResponse { + return { + stopReason: "cancelled", + usage: this.buildPromptUsage(sessionState.lastTokenUsage), + _meta: this.buildQuotaMeta(sessionState), + }; + } + private async runWithProcessCheck(operation: () => Promise): Promise { try { return await operation(); @@ -844,24 +979,25 @@ export class CodexAcpServer implements acp.Agent { return; } - if (!sessionState.currentTurnId) { + const currentTurnId = await this.resolveActiveTurnId(sessionState); + if (!currentTurnId) { logger.log("Cancel request rejected: no current turn", {sessionId: params.sessionId}); return; } logger.log("Cancel session requested", { sessionId: params.sessionId, - currentTurnId: sessionState.currentTurnId + currentTurnId, }); try { // After turnInterrupt(), Codex will send turn/completed event, which will naturally complete awaitTurnCompleted() await this.codexAcpClient.turnInterrupt({ threadId: params.sessionId, - turnId: sessionState.currentTurnId + turnId: currentTurnId }); logger.log("Cancel - turnInterrupt succeeded", { sessionId: params.sessionId, - currentTurnId: sessionState.currentTurnId + currentTurnId, }); } catch (err) { logger.error(`Cancel - turnInterrupt failed`, err); diff --git a/src/CodexAppServerClient.ts b/src/CodexAppServerClient.ts index afd333ff..68ce44c9 100644 --- a/src/CodexAppServerClient.ts +++ b/src/CodexAppServerClient.ts @@ -5,6 +5,7 @@ import type { InitializeResponse, ServerNotification } from "./app-server"; +import {logger} from "./Logger"; import type { AccountLoginCompletedNotification, AccountUpdatedNotification, ConfigReadParams, @@ -34,6 +35,8 @@ import type { ThreadResumeResponse, ThreadStartParams, ThreadStartResponse, + ThreadUnsubscribeParams, + ThreadUnsubscribeResponse, TurnCompletedNotification, TurnInterruptParams, TurnInterruptResponse, @@ -83,6 +86,8 @@ const McpServerElicitationRequest = new RequestType< void >('mcpServer/elicitation/request'); +const MAX_COMPLETED_TURNS_PER_THREAD = 20; + /** * A type-safe client over the Codex App Server's JSON-RPC API. * Maps each request to its expected response and exposes clear, typed methods for supported JSON-RPC operations. @@ -94,6 +99,8 @@ export class CodexAppServerClient { private mcpServerStartupVersion = 0; private readonly mcpServerStartupStates = new Map(); private readonly mcpServerStartupResolvers: Array = []; + private readonly turnCompletedResolvers = new Map>(); + private readonly completedTurnsByThread = new Map>(); constructor(connection: MessageConnection) { this.connection = connection; @@ -108,6 +115,10 @@ export class CodexAppServerClient { }); this.resolveMcpServerStartupResolvers(); } + if (isTurnCompletedNotification(serverNotification)) { + this.rememberCompletedTurn(serverNotification.params); + this.resolveTurnCompleted(serverNotification.params); + } this.notify(serverNotification); for (const callback of this.codexEventHandlers) { callback({ eventType: "notification", ...serverNotification }); @@ -143,10 +154,18 @@ export class CodexAppServerClient { this.approvalHandlers.set(threadId, handler); } + removeApprovalRequest(threadId: string): void { + this.approvalHandlers.delete(threadId); + } + onElicitationRequest(threadId: string, handler: ElicitationHandler): void { this.elicitationHandlers.set(threadId, handler); } + removeElicitationRequest(threadId: string): void { + this.elicitationHandlers.delete(threadId); + } + async initialize(params: InitializeParams): Promise { return await this.sendRequest({ method: "initialize", params: params }); } @@ -179,6 +198,10 @@ export class CodexAppServerClient { return await this.sendRequest({ method: "thread/read", params: params }); } + async threadUnsubscribe(params: ThreadUnsubscribeParams): Promise { + return await this.sendRequest({ method: "thread/unsubscribe", params }); + } + async listMcpServerStatus(params: ListMcpServerStatusParams): Promise { return await this.sendRequest({ method: "mcpServerStatus/list", params }); } @@ -223,15 +246,23 @@ export class CodexAppServerClient { return await this.sendRequest({ method: "account/read", params: params }); } - //TODO create type-safe helper - async awaitTurnCompleted(): Promise { - return await new Promise((resolve) => { - this.connection.onNotification("turn/completed", (event: TurnCompletedNotification) => { - resolve(event); - }); + async awaitTurnCompleted(threadId: string, turnId: string): Promise { + const completedTurn = this.getCompletedTurn(threadId, turnId); + if (completedTurn) { + return completedTurn; + } + + return await new Promise((resolve, reject) => { + const resolvers = this.turnCompletedResolvers.get(threadId) ?? []; + resolvers.push({turnId, resolve, reject}); + this.turnCompletedResolvers.set(threadId, resolvers); }); } + hasTurnCompleted(threadId: string, turnId: string): boolean { + return this.getCompletedTurn(threadId, turnId) !== null; + } + async listModels(params: ModelListParams = {cursor: null, limit: null}): Promise { return await this.sendRequest({ method: "model/list", params }); } @@ -248,6 +279,19 @@ export class CodexAppServerClient { this.notificationHandlers.set(sessionId, callback); } + removeServerNotification(sessionId: string): void { + this.notificationHandlers.delete(sessionId); + } + + clearThreadState(threadId: string): void { + const resolvers = this.turnCompletedResolvers.get(threadId) ?? []; + for (const resolver of resolvers) { + resolver.reject(new Error(`Stopped waiting for turn completion after thread ${threadId} was cleared`)); + } + this.turnCompletedResolvers.delete(threadId); + this.completedTurnsByThread.delete(threadId); + } + private codexEventHandlers: Array<(event: CodexConnectionEvent) => void> = []; onClientTransportEvent(callback: (event: CodexConnectionEvent) => void){ this.codexEventHandlers.push(callback); @@ -255,11 +299,57 @@ export class CodexAppServerClient { private notificationHandlers = new Map void>(); private notify(notification: ServerNotification) { + const threadId = this.getThreadId(notification); + if (threadId) { + const notificationHandler = this.notificationHandlers.get(threadId); + if (notificationHandler) { + notificationHandler(notification); + } else { + logger.log("Dropping scoped notification for unregistered thread", { + method: notification.method, + threadId, + }); + } + return; + } + for (const notificationHandler of this.notificationHandlers.values()) { notificationHandler(notification); } } + private getThreadId(notification: ServerNotification): string | null { + switch (notification.method) { + case "thread/started": + return notification.params.thread.id; + default: { + const params = notification.params; + if (params === null || typeof params !== "object") { + return null; + } + const threadId = (params as { threadId?: unknown }).threadId; + return typeof threadId === "string" ? threadId : null; + } + } + } + + private getCompletedTurn(threadId: string, turnId: string): TurnCompletedNotification | null { + return this.completedTurnsByThread.get(threadId)?.get(turnId) ?? null; + } + + private rememberCompletedTurn(event: TurnCompletedNotification): void { + const threadTurns = this.completedTurnsByThread.get(event.threadId) ?? new Map(); + threadTurns.set(event.turn.id, event); + while (threadTurns.size > MAX_COMPLETED_TURNS_PER_THREAD) { + const oldestTurnId = threadTurns.keys().next().value; + if (oldestTurnId === undefined) { + break; + } + threadTurns.delete(oldestTurnId); + } + this.completedTurnsByThread.set(event.threadId, threadTurns); + } + private resolveMcpServerStartupResolvers(): void { const pendingResolvers: Array = []; for (const resolver of this.mcpServerStartupResolvers) { @@ -305,6 +395,28 @@ export class CodexAppServerClient { return { ready, failed, cancelled }; } + private resolveTurnCompleted(event: TurnCompletedNotification): void { + const resolvers = this.turnCompletedResolvers.get(event.threadId); + if (!resolvers) { + return; + } + + const pendingResolvers: Array = []; + for (const resolver of resolvers) { + if (resolver.turnId === event.turn.id) { + resolver.resolve(event); + } else { + pendingResolvers.push(resolver); + } + } + + if (pendingResolvers.length === 0) { + this.turnCompletedResolvers.delete(event.threadId); + } else { + this.turnCompletedResolvers.set(event.threadId, pendingResolvers); + } + } + private async sendRequest(request: CodexRequest): Promise { for (const callback of this.codexEventHandlers) { callback({ eventType: "request", ...request}); @@ -346,9 +458,23 @@ type McpServerStartupResolver = { resolve: (result: McpStartupResult) => void; }; +type TurnCompletedResolver = { + turnId: string; + resolve: (event: TurnCompletedNotification) => void; + reject: (error: Error) => void; +}; + + function isMcpServerStatusUpdatedNotification(notification: ServerNotification): notification is { method: "mcpServer/startupStatus/updated"; params: McpServerStatusUpdatedNotification; } { return notification.method === "mcpServer/startupStatus/updated"; } + +function isTurnCompletedNotification(data: ServerNotification): data is { + method: "turn/completed"; + params: TurnCompletedNotification; +} { + return data.method === "turn/completed"; +} diff --git a/src/CodexCommands.ts b/src/CodexCommands.ts index c81e6429..1ab36b1a 100644 --- a/src/CodexCommands.ts +++ b/src/CodexCommands.ts @@ -1,6 +1,6 @@ import type * as acp from "@agentclientprotocol/sdk"; import type {AgentSideConnection, AvailableCommand} from "@agentclientprotocol/sdk"; -import {ACPSessionConnection} from "./ACPSessionConnection"; +import {ACPSessionConnection, type UpdateSessionEvent} from "./ACPSessionConnection"; import type {CodexAcpClient} from "./CodexAcpClient"; import type {RateLimitSnapshot, SkillsListEntry} from "./app-server/v2"; import type {SessionState} from "./CodexAcpServer"; @@ -12,27 +12,35 @@ export class CodexCommands { private readonly connection: AgentSideConnection; private readonly codexAcpClient: CodexAcpClient; private readonly runWithProcessCheck: (operation: () => Promise) => Promise; + private readonly hasTrackedSession: (sessionId: string) => boolean; + private readonly isSessionClosing: (sessionId: string) => boolean; constructor( connection: AgentSideConnection, codexAcpClient: CodexAcpClient, - runWithProcessCheck: (operation: () => Promise) => Promise + runWithProcessCheck: (operation: () => Promise) => Promise, + hasTrackedSession: (sessionId: string) => boolean, + isSessionClosing: (sessionId: string) => boolean, ) { this.connection = connection; this.codexAcpClient = codexAcpClient; this.runWithProcessCheck = runWithProcessCheck; + this.hasTrackedSession = hasTrackedSession; + this.isSessionClosing = isSessionClosing; } async publish(sessionId: string): Promise { try { + if (!this.hasTrackedSession(sessionId) || this.isSessionClosing(sessionId)) { + return; + } const skillsResponse = await this.runWithProcessCheck(() => this.codexAcpClient.listSkills()); const availableCommands = this.buildAvailableCommands(skillsResponse?.data ?? []); - if (availableCommands.length === 0) { + if (availableCommands.length === 0 || !this.hasTrackedSession(sessionId) || this.isSessionClosing(sessionId)) { return; } - const session = new ACPSessionConnection(this.connection, sessionId); - await session.update({ + await this.updateSession(sessionId, { sessionUpdate: "available_commands_update", availableCommands }); @@ -44,6 +52,9 @@ export class CodexCommands { async tryHandle(prompt: acp.ContentBlock[], sessionState: SessionState): Promise { const command = this.parseCommand(prompt); if (command) { + if (!this.isSessionActive(sessionState.sessionId)) { + return true; + } return this.handleCommand(command, sessionState); } return false; @@ -121,21 +132,28 @@ export class CodexCommands { async handleCommand(command: ParsedCommand, sessionState: SessionState): Promise { const sessionId = sessionState.sessionId; + if (!this.isSessionActive(sessionId)) { + return true; + } switch (command.name) { case "status": { - const session = new ACPSessionConnection(this.connection, sessionId); const message = this.buildStatusMessage(sessionState); - await session.update({ + await this.updateSession(sessionId, { sessionUpdate: "agent_message_chunk", content: { type: "text", text: message } }); return true; } case "logout": { + if (!this.isSessionActive(sessionId)) { + return true; + } await this.runWithProcessCheck(() => this.codexAcpClient.logout()); - const session = new ACPSessionConnection(this.connection, sessionId); - await session.update({ + if (!this.isSessionActive(sessionId)) { + return true; + } + await this.updateSession(sessionId, { sessionUpdate: "agent_message_chunk", content: { type: "text", text: "Logged out from Codex account." } }); @@ -143,6 +161,9 @@ export class CodexCommands { } case "skills": { const response = await this.runWithProcessCheck(() => this.codexAcpClient.listSkills()); + if (!this.isSessionActive(sessionId)) { + return true; + } const skills = (response?.data ?? []).flatMap(entry => entry.skills); const lines = skills.map(skill => { const description = skill.shortDescription ?? skill.description ?? ""; @@ -151,8 +172,7 @@ export class CodexCommands { const text = lines.length > 0 ? ["Available skills:", ...lines].join("\n") : "No skills configured."; - const session = new ACPSessionConnection(this.connection, sessionId); - await session.update({ + await this.updateSession(sessionId, { sessionUpdate: "agent_message_chunk", content: { type: "text", text } }); @@ -160,6 +180,9 @@ export class CodexCommands { } case "mcp": { const servers = await this.runWithProcessCheck(() => this.codexAcpClient.listMcpServers()); + if (!this.isSessionActive(sessionId)) { + return true; + } const configuredServers = servers.data.map(server => { const toolCount = Object.keys(server.tools ?? {}).length; const resourceCount = (server.resources ?? []).length; @@ -172,8 +195,7 @@ export class CodexCommands { const text = lines.length > 0 ? ["Configured MCP servers:", ...lines].join("\n") : "No MCP servers configured."; - const session = new ACPSessionConnection(this.connection, sessionId); - await session.update({ + await this.updateSession(sessionId, { sessionUpdate: "agent_message_chunk", content: { type: "text", text } }); @@ -194,13 +216,24 @@ export class CodexCommands { if (lines.length > 0) { text.push(...lines); } - const session = new ACPSessionConnection(this.connection, sessionId); - await session.update({ + await this.updateSession(sessionId, { sessionUpdate: "agent_message_chunk", content: { type: "text", text: text.join("\n") } }); } + private async updateSession(sessionId: string, update: UpdateSessionEvent): Promise { + if (!this.isSessionActive(sessionId)) { + return; + } + const session = new ACPSessionConnection(this.connection, sessionId); + await session.update(update); + } + + private isSessionActive(sessionId: string): boolean { + return !this.isSessionClosing(sessionId) && this.hasTrackedSession(sessionId); + } + private buildStatusMessage(sessionState: SessionState): string { const agentMode = sessionState.agentMode; const accountText = this.formatAccountInfo(sessionState.account); diff --git a/src/CodexEventHandler.ts b/src/CodexEventHandler.ts index d1adbc71..f91b339c 100644 --- a/src/CodexEventHandler.ts +++ b/src/CodexEventHandler.ts @@ -83,10 +83,8 @@ export class CodexEventHandler { case "error": return await this.createErrorEvent(notification.params); case "turn/started": - this.sessionState.currentTurnId = notification.params.turn.id; return null; case "turn/completed": - this.sessionState.currentTurnId = null; return null; case "thread/tokenUsage/updated": return this.createUsageUpdate(notification.params); diff --git a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts index 826165ca..e6df9125 100644 --- a/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts +++ b/src/__tests__/CodexACPAgent/CodexAcpClient.test.ts @@ -303,10 +303,13 @@ describe('ACP server test', { timeout: 40_000 }, () => { turn: { id: "turn-id", items: [], status: "completed", error: null } } as any); - vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(createTestSessionState({ + const sessionState = createTestSessionState({ sessionId: "session-id", cwd: "/workspace" - })); + }); + vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); const promptRequest: acp.PromptRequest = { sessionId: "session-id", @@ -331,9 +334,9 @@ describe('ACP server test', { timeout: 40_000 }, () => { function loadNotifications(){ //TODO collect logs form dev run and then load them from file to speedup const serverNotifications: ServerNotification[] = [ - { method: "item/agentMessage/delta", params: { threadId: "string", turnId: "string", itemId: "string", delta: "He", }}, - { method: "item/agentMessage/delta", params: { threadId: "string", turnId: "string", itemId: "string", delta: "ll", }}, - { method: "item/agentMessage/delta", params: { threadId: "string", turnId: "string", itemId: "string", delta: "o!", }}, + { method: "item/agentMessage/delta", params: { threadId: "id", turnId: "string", itemId: "string", delta: "He", }}, + { method: "item/agentMessage/delta", params: { threadId: "id", turnId: "string", itemId: "string", delta: "ll", }}, + { method: "item/agentMessage/delta", params: { threadId: "id", turnId: "string", itemId: "string", delta: "o!", }}, ]; function onServerNotification(_sessionId: string, callback: (event: ServerNotification) => void){ for (const notification of serverNotifications) { @@ -362,6 +365,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { agentMode: AgentMode.DEFAULT_AGENT_MODE }); vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); await codexAcpAgent.prompt({ sessionId: "id", prompt: [{type: "text", text: ""}] }); @@ -387,6 +392,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { agentMode: AgentMode.DEFAULT_AGENT_MODE }); vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); // First prompt - registers first notification handler await codexAcpAgent.prompt({ sessionId: "id", prompt: [{type: "text", text: "First message"}] }); @@ -398,9 +405,9 @@ describe('ACP server test', { timeout: 40_000 }, () => { // Trigger notifications after both prompts - should produce only 3 events, not 6 const serverNotifications: ServerNotification[] = [ - { method: "item/agentMessage/delta", params: { threadId: "string", turnId: "string", itemId: "string", delta: "He", }}, - { method: "item/agentMessage/delta", params: { threadId: "string", turnId: "string", itemId: "string", delta: "ll", }}, - { method: "item/agentMessage/delta", params: { threadId: "string", turnId: "string", itemId: "string", delta: "o!", }}, + { method: "item/agentMessage/delta", params: { threadId: "id", turnId: "string", itemId: "string", delta: "He", }}, + { method: "item/agentMessage/delta", params: { threadId: "id", turnId: "string", itemId: "string", delta: "ll", }}, + { method: "item/agentMessage/delta", params: { threadId: "id", turnId: "string", itemId: "string", delta: "o!", }}, ]; for (const notification of serverNotifications) { mockFixture.sendServerNotification(notification); @@ -415,7 +422,7 @@ describe('ACP server test', { timeout: 40_000 }, () => { await expect(mockFixture.getAcpConnectionDump([])).toMatchFileSnapshot("data/follow-up-no-duplicates.json"); }); - it('should handle multiple sessions independently', async () => { + it('should route thread-scoped notifications to the matching session only', async () => { const mockFixture = createCodexMockTestFixture(); const codexAcpAgent = mockFixture.getCodexAcpAgent(); @@ -441,6 +448,10 @@ describe('ACP server test', { timeout: 40_000 }, () => { vi.spyOn(codexAcpAgent, "getSessionState").mockImplementation((sessionId: string) => { return sessionId === "session-1" ? sessionState1 : sessionState2; }); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState1.sessionId, sessionState1); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState2.sessionId, sessionState2); // Start prompts for two different sessions await codexAcpAgent.prompt({ sessionId: "session-1", prompt: [{type: "text", text: "Message to session 1"}] }); @@ -448,9 +459,9 @@ describe('ACP server test', { timeout: 40_000 }, () => { mockFixture.clearAcpConnectionDump(); - // Trigger notifications - both session handlers should receive them + // Trigger notifications for only session-1 const serverNotifications: ServerNotification[] = [ - { method: "item/agentMessage/delta", params: { threadId: "string", turnId: "string", itemId: "string", delta: "Hello", }}, + { method: "item/agentMessage/delta", params: { threadId: "session-1", turnId: "string", itemId: "string", delta: "Hello", }}, ]; for (const notification of serverNotifications) { mockFixture.sendServerNotification(notification); @@ -462,8 +473,50 @@ describe('ACP server test', { timeout: 40_000 }, () => { expect(dump.length).toBeGreaterThan(0); }); - // Should have 2 events - one for each session's handler - await expect(mockFixture.getAcpConnectionDump([])).toMatchFileSnapshot("data/multiple-sessions.json"); + expect(mockFixture.getAcpConnectionEvents([])).toEqual([ + { + method: "sessionUpdate", + args: [ + { + sessionId: "session-1", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "Hello" }, + }, + }, + ], + }, + ]); + }); + + it('routes thread/started notifications by nested thread id', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + const session1Handler = vi.fn(); + const session2Handler = vi.fn(); + + codexAppServerClient.onServerNotification("session-1", session1Handler); + codexAppServerClient.onServerNotification("session-2", session2Handler); + + mockFixture.sendServerNotification({ + method: "thread/started", + params: { + thread: { + id: "session-1", + createdAt: "2026-01-01T00:00:00.000Z", + source: "appServer", + status: { type: "idle" }, + title: null, + cwd: "/workspace", + metadata: {}, + conversationId: null, + userInstructions: null, + }, + }, + } as unknown as ServerNotification); + + expect(session1Handler).toHaveBeenCalledTimes(1); + expect(session2Handler).not.toHaveBeenCalled(); }); it('should send attachments as prompt items', async () => { @@ -471,6 +524,22 @@ describe('ACP server test', { timeout: 40_000 }, () => { const codexAcpAgent = mockFixture.getCodexAcpAgent(); const codexAppServerClient = mockFixture.getCodexAppServerClient(); + vi.spyOn(codexAppServerClient.connection, "sendRequest").mockImplementation((method: string) => { + if (method === "turn/start") { + return Promise.resolve({ + turn: { + id: "turn-id", + items: [], + status: "inProgress", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }); + } + return Promise.resolve(undefined); + }); vi.spyOn(codexAppServerClient, "awaitTurnCompleted").mockResolvedValue({ threadId: "session-id", turn: { @@ -486,6 +555,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { const sessionState: SessionState = createTestSessionState(); vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); const prompt: acp.ContentBlock[] = [ { type: "text", text: "Hello" }, @@ -496,7 +567,79 @@ describe('ACP server test', { timeout: 40_000 }, () => { await codexAcpAgent.prompt({ sessionId: "session-id", prompt }); - await expect(mockFixture.getCodexConnectionDump(ignoredFields)).toMatchFileSnapshot("data/send-attachments-turn-start.json"); + expect(mockFixture.getCodexConnectionEvents(ignoredFields)).toEqual([ + { + eventType: "request", + method: "skills/list", + params: { + cwds: ["/test/cwd"], + forceReload: true, + perCwdExtraUserRoots: [{ + cwd: "cwd", + extraUserRoots: [], + }], + }, + }, + { + eventType: "response", + }, + { + eventType: "request", + method: "turn/start", + params: { + outputSchema: null, + threadId: "threadId", + input: [ + { + type: "text", + text: "Hello", + text_elements: [], + }, + { + type: "image", + url: "https://example.com/image.png", + }, + { + type: "text", + text: "[@report.txt](file:///tmp/report.txt)", + text_elements: [], + }, + { + type: "text", + text: "[@notes.txt](file:///tmp/notes.txt)\n\nNotes body\n", + text_elements: [], + }, + ], + approvalPolicy: "on-request", + sandboxPolicy: { + type: "workspaceWrite", + writableRoots: [], + readOnlyAccess: "readOnlyAccess", + networkAccess: false, + excludeTmpdirEnvVar: false, + excludeSlashTmp: false, + }, + summary: null, + personality: null, + collaborationMode: null, + cwd: "cwd", + effort: "effort", + model: "model", + }, + }, + { + eventType: "response", + turn: { + id: "id", + items: [], + status: "inProgress", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }, + ]); }); it('should fail on wrong sessionId', async () => { @@ -516,6 +659,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { const codexAcpAgent = mockFixture.getCodexAcpAgent(); vi.spyOn(mockFixture.getCodexAcpClient(), "listSkills").mockResolvedValue({ data: [] }); + // @ts-expect-error seeding private session store for focused publish test + codexAcpAgent.sessions.set("session-id", createTestSessionState({ sessionId: "session-id" })); // @ts-expect-error - exercising private helper await codexAcpAgent.availableCommands.publish("session-id"); @@ -541,6 +686,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { errors: [] }] }); + // @ts-expect-error seeding private session store for focused publish test + codexAcpAgent.sessions.set("session-id", createTestSessionState({ sessionId: "session-id" })); // @ts-expect-error - exercising private helper await codexAcpAgent.availableCommands.publish("session-id"); @@ -554,6 +701,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { const sessionState: SessionState = createTestSessionState(); vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for slash-command test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); await codexAcpAgent.prompt({ sessionId: "session-id", prompt: [{ type: "text", text: "/status" }] }); await expect(mockFixture.getAcpConnectionDump([])).toMatchFileSnapshot("data/command-status.json"); @@ -566,6 +715,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { const sessionState: SessionState = createTestSessionState(); const logoutSpy = vi.spyOn(mockFixture.getCodexAcpClient(), "logout").mockResolvedValue({}); + // @ts-expect-error seeding private session store for slash-command test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); // @ts-expect-error - exercising private helper const handled = await codexAcpAgent.availableCommands.handleCommand({ name: "logout", input: null }, sessionState); @@ -575,6 +726,25 @@ describe('ACP server test', { timeout: 40_000 }, () => { await expect(mockFixture.getAcpConnectionDump([])).toMatchFileSnapshot("data/command-logout.json"); }); + it('does not run logout command side effects for a closing session', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const sessionState: SessionState = createTestSessionState(); + const logoutSpy = vi.spyOn(mockFixture.getCodexAcpClient(), "logout").mockResolvedValue({}); + + // @ts-expect-error seeding private session store for slash-command test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); + // @ts-expect-error simulating close already in progress + codexAcpAgent.closingSessions.add(sessionState.sessionId); + + // @ts-expect-error - exercising private helper + const handled = await codexAcpAgent.availableCommands.handleCommand({ name: "logout", input: null }, sessionState); + + expect(handled).toBe(true); + expect(logoutSpy).not.toHaveBeenCalled(); + expect(mockFixture.getAcpConnectionEvents([])).toEqual([]); + }); + it('handles skills command', async () => { const mockFixture = createCodexMockTestFixture(); const codexAcpAgent = mockFixture.getCodexAcpAgent(); @@ -592,6 +762,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { }] }; const skillsSpy = vi.spyOn(mockFixture.getCodexAcpClient(), "listSkills").mockResolvedValue(skillsResponse); + // @ts-expect-error seeding private session store for slash-command test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); // @ts-expect-error - exercising private helper const handled = await codexAcpAgent.availableCommands.handleCommand({ name: "skills", input: null }, sessionState); @@ -627,6 +799,8 @@ describe('ACP server test', { timeout: 40_000 }, () => { nextCursor: null }; const mcpSpy = vi.spyOn(mockFixture.getCodexAcpClient(), "listMcpServers").mockResolvedValue(mcpResponse); + // @ts-expect-error seeding private session store for slash-command test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); // @ts-expect-error - exercising private helper const handled = await codexAcpAgent.availableCommands.handleCommand({ name: "mcp", input: null }, sessionState); @@ -692,7 +866,7 @@ describe('ACP server test', { timeout: 40_000 }, () => { */ function setupPromptFixture(sessionOverrides?: Partial) { const mockFixture = createCodexMockTestFixture(); - const sessionState = createTestSessionState(sessionOverrides); + const sessionState = createTestSessionState({ sessionId: "id", ...sessionOverrides }); const turnStartSpy = vi.spyOn(mockFixture.getCodexAppServerClient(), "turnStart").mockResolvedValue({ turn: { id: "turn-id", @@ -716,7 +890,10 @@ describe('ACP server test', { timeout: 40_000 }, () => { durationMs: null, } }); - vi.spyOn(mockFixture.getCodexAcpAgent(), "getSessionState").mockReturnValue(sessionState); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); return { mockFixture, sessionState, turnStartSpy }; } @@ -826,7 +1003,7 @@ describe('ACP server test', { timeout: 40_000 }, () => { } }); - const { mockFixture } = setupPromptFixture({ rateLimits }); + const { mockFixture } = setupPromptFixture({ sessionId: "session-id", rateLimits }); await mockFixture.getCodexAcpAgent().prompt({ sessionId: "session-id", prompt: [{ type: "text", text: "/status" }] }); await expect(mockFixture.getAcpConnectionDump([])).toMatchFileSnapshot("data/command-status-with-rate-limits.json"); @@ -924,4 +1101,493 @@ describe('ACP server test', { timeout: 40_000 }, () => { } }); }); + + it('closes an active session by interrupting the turn, unsubscribing the thread, and removing listeners', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + vi.spyOn(codexAppServerClient, "awaitTurnCompleted").mockResolvedValue({ + threadId: "session-close", + turn: { + id: "turn-close", + items: [], + status: "interrupted", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }); + + await codexAcpClient.subscribeToSessionEvents( + "session-close", + vi.fn(), + { + handleCommandExecution: vi.fn(), + handleFileChange: vi.fn(), + }, + { + handleElicitation: vi.fn(), + } + ); + + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.notificationHandlers.has("session-close")).toBe(true); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.approvalHandlers.has("session-close")).toBe(true); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.elicitationHandlers.has("session-close")).toBe(true); + // @ts-expect-error verifying test-only access to private maps + codexAppServerClient.turnCompletedResolvers.set("session-close", [{ + turnId: "other-turn", + resolve: vi.fn(), + reject: vi.fn(), + }]); + // @ts-expect-error verifying test-only access to private maps + codexAppServerClient.completedTurnsByThread.set("session-close", new Map([ + ["completed-turn", { + threadId: "session-close", + turn: { + id: "completed-turn", + items: [], + status: "completed", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }] + ])); + + await codexAcpClient.closeSession("session-close", "turn-close"); + + const closeRequests = mockFixture.getCodexConnectionEvents([]) + .filter((event) => event.eventType === "request"); + expect(closeRequests).toEqual([ + { + eventType: "request", + method: "turn/interrupt", + params: {threadId: "session-close", turnId: "turn-close"}, + }, + { + eventType: "request", + method: "thread/unsubscribe", + params: {threadId: "session-close"}, + }, + ]); + + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.notificationHandlers.has("session-close")).toBe(false); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.approvalHandlers.has("session-close")).toBe(false); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.elicitationHandlers.has("session-close")).toBe(false); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.turnCompletedResolvers.has("session-close")).toBe(false); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.completedTurnsByThread.has("session-close")).toBe(false); + }); + + it('times out waiting for turn completion during close and still removes local listeners', async () => { + vi.useFakeTimers(); + try { + const mockFixture = createCodexMockTestFixture(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + vi.spyOn(codexAppServerClient, "awaitTurnCompleted").mockReturnValue(new Promise(() => {})); + + await codexAcpClient.subscribeToSessionEvents( + "session-close", + vi.fn(), + { + handleCommandExecution: vi.fn(), + handleFileChange: vi.fn(), + }, + { + handleElicitation: vi.fn(), + } + ); + + const closePromise = codexAcpClient.closeSession("session-close", "turn-close"); + await vi.advanceTimersByTimeAsync(10_000); + + await expect(closePromise).resolves.toBeUndefined(); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.notificationHandlers.has("session-close")).toBe(false); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.turnCompletedResolvers.has("session-close")).toBe(false); + } finally { + vi.useRealTimers(); + } + }); + + it('removes local listeners when thread unsubscribe fails during close', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + vi.spyOn(codexAppServerClient, "threadUnsubscribe").mockRejectedValue(new Error("unsubscribe failed")); + + await codexAcpClient.subscribeToSessionEvents( + "session-close", + vi.fn(), + { + handleCommandExecution: vi.fn(), + handleFileChange: vi.fn(), + }, + { + handleElicitation: vi.fn(), + } + ); + + await expect(codexAcpClient.closeSession("session-close", null)).resolves.toBeUndefined(); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.notificationHandlers.has("session-close")).toBe(false); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.approvalHandlers.has("session-close")).toBe(false); + // @ts-expect-error verifying test-only access to private maps + expect(codexAppServerClient.elicitationHandlers.has("session-close")).toBe(false); + }); + + it('waits for the matching turn completion before resolving a prompt', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const codexAppServerClient = mockFixture.getCodexAppServerClient(); + const sessionState = createTestSessionState({ + sessionId: "session-close", + currentModelId: "model-id[effort]", + agentMode: AgentMode.DEFAULT_AGENT_MODE, + }); + + codexAppServerClient.turnStart = vi.fn().mockResolvedValue({ + turn: { + id: "turn-close", + items: [], + status: "inProgress", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }); + vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); + + let promptSettled = false; + const promptPromise = codexAcpAgent.prompt({ + sessionId: "session-close", + prompt: [{type: "text", text: "wait for the right turn"}], + }).then((result) => { + promptSettled = true; + return result; + }); + + await vi.waitFor(() => { + expect(sessionState.currentTurnId).toBe("turn-close"); + }); + + mockFixture.sendServerNotification({ + method: "turn/completed", + params: { + threadId: "other-session", + turn: { + id: "turn-other", + items: [], + status: "completed", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }, + }); + await Promise.resolve(); + expect(promptSettled).toBe(false); + + mockFixture.sendServerNotification({ + method: "turn/completed", + params: { + threadId: "session-close", + turn: { + id: "turn-close", + items: [], + status: "completed", + error: null, + startedAt: null, + completedAt: null, + durationMs: null, + }, + }, + }); + + await expect(promptPromise).resolves.toEqual({ + stopReason: "end_turn", + usage: null, + _meta: { + quota: { + token_count: null, + model_usage: [], + }, + }, + }); + }); + + it('removes session bookkeeping after closeSession', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const sessionState = createTestSessionState({ + sessionId: "session-close", + currentTurnId: "turn-close", + }); + const closeSpy = vi.spyOn(codexAcpClient, "closeSession").mockResolvedValue(); + + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.pendingMcpStartupSessions.set(sessionState.sessionId, { + requestedServers: new Set(["alpha"]), + afterVersion: 0, + }); + + await expect( + codexAcpAgent.closeSession({sessionId: sessionState.sessionId}) + ).resolves.toEqual({}); + expect(closeSpy).toHaveBeenCalledWith(sessionState.sessionId, sessionState.currentTurnId); + expect(() => codexAcpAgent.getSessionState(sessionState.sessionId)).toThrow( + `Session ${sessionState.sessionId} not found` + ); + // @ts-expect-error verifying test-only access to private map + expect(codexAcpAgent.pendingMcpStartupSessions.has(sessionState.sessionId)).toBe(false); + }); + + it('uses the pending turn start when closing before turn/started arrives', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const sessionState = createTestSessionState({ + sessionId: "session-close", + currentTurnId: Promise.resolve("turn-pending"), + }); + const closeSpy = vi.spyOn(codexAcpClient, "closeSession").mockResolvedValue(); + + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); + + await expect( + codexAcpAgent.closeSession({sessionId: sessionState.sessionId}) + ).resolves.toEqual({}); + + expect(closeSpy).toHaveBeenCalledWith(sessionState.sessionId, "turn-pending"); + }); + + it('cancels a prompt if the session closes before turn start begins', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const sessionState = createTestSessionState({ + sessionId: "session-close", + }); + + let resolveTryHandle!: (handled: boolean) => void; + const tryHandlePromise = new Promise((resolve) => { + resolveTryHandle = resolve; + }); + + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); + // @ts-expect-error exercising private helper through the availableCommands collaborator + const tryHandleSpy = vi.spyOn(codexAcpAgent.availableCommands, "tryHandle").mockReturnValue(tryHandlePromise); + const startPromptSpy = vi.spyOn(codexAcpClient, "startPrompt"); + vi.spyOn(codexAcpClient, "closeSession").mockResolvedValue(); + + const promptPromise = codexAcpAgent.prompt({ + sessionId: sessionState.sessionId, + prompt: [{ type: "text", text: "normal prompt" }], + }); + + await vi.waitFor(() => { + expect(tryHandleSpy).toHaveBeenCalled(); + }); + + await expect( + codexAcpAgent.closeSession({sessionId: sessionState.sessionId}) + ).resolves.toEqual({}); + + resolveTryHandle(false); + + await expect(promptPromise).resolves.toEqual({ + stopReason: "cancelled", + usage: null, + _meta: { + quota: { + token_count: null, + model_usage: [], + }, + }, + }); + expect(startPromptSpy).not.toHaveBeenCalled(); + }); + + it('cancels a prompt when pending turn start rejects during close', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const sessionState = createTestSessionState({ + sessionId: "session-close", + }); + + let rejectTurnStart!: (error: Error) => void; + const turnStartPromise = new Promise((_, reject) => { + rejectTurnStart = reject; + }); + const startPromptSpy = vi.spyOn(codexAcpClient, "startPrompt").mockReturnValue(turnStartPromise); + vi.spyOn(codexAcpClient, "closeSession").mockResolvedValue(); + + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); + + const promptPromise = codexAcpAgent.prompt({ + sessionId: sessionState.sessionId, + prompt: [{ type: "text", text: "normal prompt" }], + }); + + await vi.waitFor(() => { + expect(startPromptSpy).toHaveBeenCalled(); + }); + + const closePromise = codexAcpAgent.closeSession({sessionId: sessionState.sessionId}); + rejectTurnStart(new Error("turn/start failed")); + + await expect(closePromise).resolves.toEqual({}); + await expect(promptPromise).resolves.toEqual({ + stopReason: "cancelled", + usage: null, + _meta: { + quota: { + token_count: null, + model_usage: [], + }, + }, + }); + }); + + it('skips late MCP startup updates after a session is closed', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const sessionState = createTestSessionState({ + sessionId: "session-close", + }); + + let resolveStartup!: (event: any) => void; + const startupPromise = new Promise((resolve) => { + resolveStartup = resolve; + }); + vi.spyOn(codexAcpClient, "awaitMcpServerStartup").mockReturnValue(startupPromise as Promise); + + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.pendingMcpStartupSessions.set(sessionState.sessionId, { + requestedServers: new Set(["alpha"]), + afterVersion: 0, + }); + + // @ts-expect-error exercising private helper to verify close-session race handling + const publishPromise = codexAcpAgent.doPublishMcpStartupStatus(sessionState.sessionId); + // @ts-expect-error exercising private helper to simulate close-session cleanup + codexAcpAgent.forgetSession(sessionState.sessionId); + + resolveStartup({ + ready: ["alpha"], + failed: [], + cancelled: [], + }); + await publishPromise; + + expect(mockFixture.getAcpConnectionEvents([])).toEqual([]); + }); + + it('skips late available commands updates after a session is closed', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const sessionState = createTestSessionState({ + sessionId: "session-close", + }); + + let resolveSkills!: (response: SkillsListResponse) => void; + const skillsPromise = new Promise((resolve) => { + resolveSkills = resolve; + }); + vi.spyOn(codexAcpClient, "listSkills").mockReturnValue(skillsPromise); + + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); + + // @ts-expect-error exercising private helper through the availableCommands collaborator + const publishPromise = codexAcpAgent.availableCommands.publish(sessionState.sessionId); + // @ts-expect-error exercising private helper to simulate close-session cleanup + codexAcpAgent.forgetSession(sessionState.sessionId); + + resolveSkills({ + data: [{ + cwd: "/workspace", + skills: [{ + name: "build", + description: "Build the project", + shortDescription: "Build", + path: "/workspace", + scope: "user", + enabled: true, + }], + errors: [], + }], + }); + await publishPromise; + + expect(mockFixture.getAcpConnectionEvents([])).toEqual([]); + }); + + it('skips late slash command updates after a session is closed', async () => { + const mockFixture = createCodexMockTestFixture(); + const codexAcpAgent = mockFixture.getCodexAcpAgent(); + const codexAcpClient = mockFixture.getCodexAcpClient(); + const sessionState = createTestSessionState({ + sessionId: "session-close", + }); + + let resolveSkills!: (response: SkillsListResponse) => void; + const skillsPromise = new Promise((resolve) => { + resolveSkills = resolve; + }); + vi.spyOn(codexAcpClient, "listSkills").mockReturnValue(skillsPromise); + + // @ts-expect-error seeding private session store for focused close-session test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); + + // @ts-expect-error exercising private helper through the availableCommands collaborator + const commandPromise = codexAcpAgent.availableCommands.handleCommand({ name: "skills", input: null }, sessionState); + // @ts-expect-error exercising private helper to simulate close-session cleanup + codexAcpAgent.forgetSession(sessionState.sessionId); + + resolveSkills({ + data: [{ + cwd: "/workspace", + skills: [{ + name: "build", + description: "Build the project", + shortDescription: "Build", + path: "/workspace", + scope: "user", + enabled: true, + }], + errors: [], + }], + }); + + await expect(commandPromise).resolves.toBe(true); + expect(mockFixture.getAcpConnectionEvents([])).toEqual([]); + }); }); diff --git a/src/__tests__/CodexACPAgent/approval-events.test.ts b/src/__tests__/CodexACPAgent/approval-events.test.ts index 7ee43746..db135dd9 100644 --- a/src/__tests__/CodexACPAgent/approval-events.test.ts +++ b/src/__tests__/CodexACPAgent/approval-events.test.ts @@ -32,6 +32,8 @@ describe('Approval Events', () => { agentMode: AgentMode.DEFAULT_AGENT_MODE }); vi.spyOn(codexAcpAgent, 'getSessionState').mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); const promptPromise = codexAcpAgent.prompt({ sessionId, diff --git a/src/__tests__/CodexACPAgent/command-action-events.test.ts b/src/__tests__/CodexACPAgent/command-action-events.test.ts index b6aa0f7a..bb92314d 100644 --- a/src/__tests__/CodexACPAgent/command-action-events.test.ts +++ b/src/__tests__/CodexACPAgent/command-action-events.test.ts @@ -23,7 +23,7 @@ describe('CodexEventHandler - command action events', () => { const listFilesNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -58,7 +58,7 @@ describe('CodexEventHandler - command action events', () => { const listFilesNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -93,7 +93,7 @@ describe('CodexEventHandler - command action events', () => { const searchNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -129,7 +129,7 @@ describe('CodexEventHandler - command action events', () => { const searchNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -165,7 +165,7 @@ describe('CodexEventHandler - command action events', () => { const searchNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -201,7 +201,7 @@ describe('CodexEventHandler - command action events', () => { const searchNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -237,7 +237,7 @@ describe('CodexEventHandler - command action events', () => { const searchNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: "mcpToolCall", @@ -265,7 +265,7 @@ describe('CodexEventHandler - command action events', () => { { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: "mcpToolCall", @@ -283,7 +283,7 @@ describe('CodexEventHandler - command action events', () => { { method: 'item/mcpToolCall/progress', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', itemId: 'call-id', message: "File /Users/aleksandr.slapoguzov/Projects/ultimate/.ai/local.md doesn't exist or can't be opened", @@ -292,7 +292,7 @@ describe('CodexEventHandler - command action events', () => { { method: 'item/completed', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: "mcpToolCall", @@ -324,7 +324,7 @@ describe('CodexEventHandler - command action events', () => { { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: "mcpToolCall", @@ -342,7 +342,7 @@ describe('CodexEventHandler - command action events', () => { { method: 'item/mcpToolCall/progress', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', itemId: 'call-id', message: repeatedMessage, @@ -351,7 +351,7 @@ describe('CodexEventHandler - command action events', () => { { method: 'item/mcpToolCall/progress', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', itemId: 'call-id', message: repeatedMessage, @@ -360,7 +360,7 @@ describe('CodexEventHandler - command action events', () => { { method: 'item/completed', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: "mcpToolCall", @@ -390,7 +390,7 @@ describe('CodexEventHandler - command action events', () => { const dynamicToolNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: "dynamicToolCall", diff --git a/src/__tests__/CodexACPAgent/data/send-attachments-turn-start.json b/src/__tests__/CodexACPAgent/data/send-attachments-turn-start.json deleted file mode 100644 index 25ade969..00000000 --- a/src/__tests__/CodexACPAgent/data/send-attachments-turn-start.json +++ /dev/null @@ -1,66 +0,0 @@ -{ - "eventType": "request", - "method": "skills/list", - "params": { - "cwds": [ - "/test/cwd" - ], - "forceReload": true, - "perCwdExtraUserRoots": [ - { - "cwd": "cwd", - "extraUserRoots": [] - } - ] - } -} -{ - "eventType": "response" -} -{ - "eventType": "request", - "method": "turn/start", - "params": { - "outputSchema": null, - "threadId": "threadId", - "input": [ - { - "type": "text", - "text": "Hello", - "text_elements": [] - }, - { - "type": "image", - "url": "https://example.com/image.png" - }, - { - "type": "text", - "text": "[@report.txt](file:///tmp/report.txt)", - "text_elements": [] - }, - { - "type": "text", - "text": "[@notes.txt](file:///tmp/notes.txt)\n\nNotes body\n", - "text_elements": [] - } - ], - "approvalPolicy": "on-request", - "sandboxPolicy": { - "type": "workspaceWrite", - "writableRoots": [], - "readOnlyAccess": "readOnlyAccess", - "networkAccess": false, - "excludeTmpdirEnvVar": false, - "excludeSlashTmp": false - }, - "summary": null, - "personality": null, - "collaborationMode": null, - "cwd": "cwd", - "effort": "effort", - "model": "model" - } -} -{ - "eventType": "response" -} \ No newline at end of file diff --git a/src/__tests__/CodexACPAgent/elicitation-events.test.ts b/src/__tests__/CodexACPAgent/elicitation-events.test.ts index 32548bee..dd486bbf 100644 --- a/src/__tests__/CodexACPAgent/elicitation-events.test.ts +++ b/src/__tests__/CodexACPAgent/elicitation-events.test.ts @@ -33,6 +33,8 @@ describe('Elicitation Events', () => { agentMode: AgentMode.DEFAULT_AGENT_MODE }); vi.spyOn(codexAcpAgent, 'getSessionState').mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); const promptPromise = codexAcpAgent.prompt({ sessionId, diff --git a/src/__tests__/CodexACPAgent/file-change-events.test.ts b/src/__tests__/CodexACPAgent/file-change-events.test.ts index b25094f2..ce00135a 100644 --- a/src/__tests__/CodexACPAgent/file-change-events.test.ts +++ b/src/__tests__/CodexACPAgent/file-change-events.test.ts @@ -44,7 +44,7 @@ describe('CodexEventHandler - file change events', () => { const newFileNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'fileChange', @@ -79,7 +79,7 @@ describe('CodexEventHandler - file change events', () => { const multiFileNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'fileChange', @@ -119,7 +119,7 @@ describe('CodexEventHandler - file change events', () => { const newFileNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'fileChange', @@ -147,7 +147,7 @@ describe('CodexEventHandler - file change events', () => { const deleteFileNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'fileChange', @@ -183,7 +183,7 @@ describe('CodexEventHandler - file change events', () => { const deletedFileNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'fileChange', @@ -213,7 +213,7 @@ describe('CodexEventHandler - file change events', () => { const deleteFileNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'fileChange', @@ -248,7 +248,7 @@ describe('CodexEventHandler - file change events', () => { const deletedFileNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'fileChange', diff --git a/src/__tests__/CodexACPAgent/initialize.test.ts b/src/__tests__/CodexACPAgent/initialize.test.ts index 8ca6b432..1d5cfef6 100644 --- a/src/__tests__/CodexACPAgent/initialize.test.ts +++ b/src/__tests__/CodexACPAgent/initialize.test.ts @@ -37,6 +37,7 @@ describe('CodexACPAgent - initialize', () => { image: true }, sessionCapabilities: { + close: {}, resume: {}, list: {}, }, diff --git a/src/__tests__/CodexACPAgent/model-rerouted-events.test.ts b/src/__tests__/CodexACPAgent/model-rerouted-events.test.ts index a037364c..61c49400 100644 --- a/src/__tests__/CodexACPAgent/model-rerouted-events.test.ts +++ b/src/__tests__/CodexACPAgent/model-rerouted-events.test.ts @@ -23,7 +23,7 @@ describe("CodexEventHandler - model rerouted events", () => { const modelReroutedNotification: ServerNotification = { method: "model/rerouted", params: { - threadId: "thread-1", + threadId: sessionId, turnId: "turn-1", fromModel: "gpt-5", toModel: "gpt-5-mini", diff --git a/src/__tests__/CodexACPAgent/terminal-output-events.test.ts b/src/__tests__/CodexACPAgent/terminal-output-events.test.ts index e0e3e661..de5ff938 100644 --- a/src/__tests__/CodexACPAgent/terminal-output-events.test.ts +++ b/src/__tests__/CodexACPAgent/terminal-output-events.test.ts @@ -23,7 +23,7 @@ describe('CodexEventHandler - terminal output events', () => { const commandStartNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -60,7 +60,7 @@ describe('CodexEventHandler - terminal output events', () => { const commandStartNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -90,7 +90,7 @@ describe('CodexEventHandler - terminal output events', () => { const outputDeltaNotification: ServerNotification = { method: 'item/commandExecution/outputDelta', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', itemId: 'command-123', delta: 'file1.txt\nfile2.txt\n', @@ -108,7 +108,7 @@ describe('CodexEventHandler - terminal output events', () => { const commandCompletedNotification: ServerNotification = { method: 'item/completed', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -137,7 +137,7 @@ describe('CodexEventHandler - terminal output events', () => { const commandFailedNotification: ServerNotification = { method: 'item/completed', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -166,7 +166,7 @@ describe('CodexEventHandler - terminal output events', () => { const dynamicToolCompletedNotification: ServerNotification = { method: 'item/completed', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'dynamicToolCall', @@ -193,7 +193,7 @@ describe('CodexEventHandler - terminal output events', () => { const commandStartNotification: ServerNotification = { method: 'item/started', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -214,7 +214,7 @@ describe('CodexEventHandler - terminal output events', () => { const outputDeltaNotification: ServerNotification = { method: 'item/commandExecution/outputDelta', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', itemId: 'command-flow', delta: 'hello\n', @@ -224,7 +224,7 @@ describe('CodexEventHandler - terminal output events', () => { const commandCompletedNotification: ServerNotification = { method: 'item/completed', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', item: { type: 'commandExecution', @@ -257,7 +257,7 @@ describe('CodexEventHandler - terminal output events', () => { const delta1: ServerNotification = { method: 'item/commandExecution/outputDelta', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', itemId: 'command-accumulate', delta: 'line1\n', @@ -267,7 +267,7 @@ describe('CodexEventHandler - terminal output events', () => { const delta2: ServerNotification = { method: 'item/commandExecution/outputDelta', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', itemId: 'command-accumulate', delta: 'line2\n', @@ -277,7 +277,7 @@ describe('CodexEventHandler - terminal output events', () => { const delta3: ServerNotification = { method: 'item/commandExecution/outputDelta', params: { - threadId: 'thread-1', + threadId: sessionId, turnId: 'turn-1', itemId: 'command-accumulate', delta: 'line3\n', diff --git a/src/__tests__/CodexACPAgent/token-usage-events.test.ts b/src/__tests__/CodexACPAgent/token-usage-events.test.ts index 2025b2d2..75990c4e 100644 --- a/src/__tests__/CodexACPAgent/token-usage-events.test.ts +++ b/src/__tests__/CodexACPAgent/token-usage-events.test.ts @@ -49,7 +49,10 @@ describe('Token Usage Events', () => { }; }); - vi.spyOn(codexAcpAgent, 'getSessionState').mockReturnValue(createTestSessionState({ sessionId })); + const sessionState = createTestSessionState({ sessionId }); + vi.spyOn(codexAcpAgent, 'getSessionState').mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); return codexAcpAgent; } @@ -179,7 +182,10 @@ describe('Token Usage Events', () => { }; }); - vi.spyOn(codexAcpAgent, 'getSessionState').mockReturnValue(createTestSessionState({ sessionId })); + const sessionState = createTestSessionState({ sessionId }); + vi.spyOn(codexAcpAgent, 'getSessionState').mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for prompt test + codexAcpAgent.sessions.set(sessionState.sessionId, sessionState); return async () => { await codexAcpAgent.prompt({ diff --git a/src/__tests__/acp-test-utils.ts b/src/__tests__/acp-test-utils.ts index d702f0d2..03a3ef24 100644 --- a/src/__tests__/acp-test-utils.ts +++ b/src/__tests__/acp-test-utils.ts @@ -334,6 +334,8 @@ export async function setupPromptAndSendNotifications( }); vi.spyOn(codexAcpAgent, "getSessionState").mockReturnValue(sessionState); + // @ts-expect-error seeding private session store for focused event-handler tests + codexAcpAgent.sessions.set(sessionId, sessionState); await codexAcpAgent.prompt({ sessionId,