From 1b2c893c8f47b07de1ecb13cd4ab0719f1522ffb Mon Sep 17 00:00:00 2001 From: Mike Olson Date: Thu, 18 Jun 2026 23:12:20 -0400 Subject: [PATCH] fix(grok): Complete turns from xAI prompt completion --- apps/server/scripts/acp-mock-agent.ts | 69 ++++++ .../src/provider/Layers/GrokAdapter.test.ts | 210 ++++++++++++++++++ .../server/src/provider/Layers/GrokAdapter.ts | 77 +++++-- .../provider/acp/AcpJsonRpcConnection.test.ts | 82 +++++++ .../src/provider/acp/AcpSessionRuntime.ts | 178 ++++++++++++++- packages/effect-acp/src/client.test.ts | 129 ++++++++++- 6 files changed, 723 insertions(+), 22 deletions(-) diff --git a/apps/server/scripts/acp-mock-agent.ts b/apps/server/scripts/acp-mock-agent.ts index 2b5da74eef0..1394bd85fb2 100644 --- a/apps/server/scripts/acp-mock-agent.ts +++ b/apps/server/scripts/acp-mock-agent.ts @@ -19,6 +19,10 @@ const emitInterleavedAssistantToolCalls = const emitGenericToolPlaceholders = process.env.T3_ACP_EMIT_GENERIC_TOOL_PLACEHOLDERS === "1"; const emitAskQuestion = process.env.T3_ACP_EMIT_ASK_QUESTION === "1"; const emitXAiAskUserQuestion = process.env.T3_ACP_EMIT_XAI_ASK_USER_QUESTION === "1"; +const emitXAiPromptCompleteThenHang = process.env.T3_ACP_EMIT_XAI_PROMPT_COMPLETE_THEN_HANG === "1"; +const emitStaleXAiPromptCompleteBeforeSecondHang = + process.env.T3_ACP_EMIT_STALE_XAI_PROMPT_COMPLETE_BEFORE_SECOND_HANG === "1"; +const failPrompt = process.env.T3_ACP_FAIL_PROMPT === "1"; const failSetConfigOption = process.env.T3_ACP_FAIL_SET_CONFIG_OPTION === "1"; const exitOnSetConfigOption = process.env.T3_ACP_EXIT_ON_SET_CONFIG_OPTION === "1"; const promptResponseText = process.env.T3_ACP_PROMPT_RESPONSE_TEXT; @@ -36,6 +40,7 @@ let parameterizedModelPicker = false; let currentReasoning = "medium"; let currentContext = "272k"; let currentFast = false; +let promptCount = 0; const cancelledSessions = new Set(); function logExit(reason: string): void { @@ -45,6 +50,10 @@ function logExit(reason: string): void { appendFileSync(exitLogPath, `${reason}\n`, "utf8"); } +function writeJsonRpcNotification(method: string, params: unknown): void { + process.stdout.write(`${JSON.stringify({ jsonrpc: "2.0", method, params })}\n`); +} + process.once("SIGTERM", () => { logExit("SIGTERM"); process.exit(0); @@ -364,11 +373,71 @@ const program = Effect.gen(function* () { yield* agent.handlePrompt((request) => Effect.gen(function* () { const requestedSessionId = String(request.sessionId ?? sessionId); + promptCount += 1; if (Number.isFinite(promptDelayMs) && promptDelayMs > 0) { yield* Effect.sleep(`${promptDelayMs} millis`); } + if (failPrompt) { + return yield* AcpError.AcpRequestError.internalError("Mock prompt failure"); + } + + if (emitStaleXAiPromptCompleteBeforeSecondHang && promptCount === 1) { + return { + stopReason: "end_turn", + _meta: { + promptId: "mock-stale-xai-prompt-1", + requestId: "mock-stale-xai-prompt-1", + }, + }; + } + + if (emitStaleXAiPromptCompleteBeforeSecondHang && promptCount === 2) { + writeJsonRpcNotification("_x.ai/session/prompt_complete", { + sessionId: requestedSessionId, + promptId: "mock-stale-xai-prompt-1", + stopReason: "end_turn", + agentResult: null, + }); + + writeJsonRpcNotification("_x.ai/session/prompt_complete", { + sessionId: requestedSessionId, + promptId: "mock-current-xai-prompt-2", + stopReason: "end_turn", + agentResult: null, + }); + + return yield* Effect.never; + } + + if (emitXAiPromptCompleteThenHang) { + writeJsonRpcNotification("session/update", { + sessionId: requestedSessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "hello from " }, + }, + }); + + writeJsonRpcNotification("_x.ai/session/prompt_complete", { + sessionId: requestedSessionId, + promptId: "mock-xai-prompt-1", + stopReason: "end_turn", + agentResult: null, + }); + + writeJsonRpcNotification("session/update", { + sessionId: requestedSessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "mock" }, + }, + }); + + return yield* Effect.never; + } + if (emitInterleavedAssistantToolCalls) { const toolCallId = "tool-call-1"; diff --git a/apps/server/src/provider/Layers/GrokAdapter.test.ts b/apps/server/src/provider/Layers/GrokAdapter.test.ts index bfd5ae25755..69a637c2c4c 100644 --- a/apps/server/src/provider/Layers/GrokAdapter.test.ts +++ b/apps/server/src/provider/Layers/GrokAdapter.test.ts @@ -10,6 +10,7 @@ import * as Deferred from "effect/Deferred"; import * as Effect from "effect/Effect"; import * as Fiber from "effect/Fiber"; import * as Layer from "effect/Layer"; +import * as Ref from "effect/Ref"; import * as Schema from "effect/Schema"; import * as Stream from "effect/Stream"; @@ -175,6 +176,215 @@ it.layer(grokAdapterTestLayer)("GrokAdapterLive", (it) => { }), ); + it.effect("reports a Grok session running only while the prompt is in flight", () => + Effect.gen(function* () { + const threadId = ThreadId.make("grok-session-ready-after-prompt"); + const wrapperPath = yield* Effect.promise(() => + makeMockGrokWrapper({ + T3_ACP_EMIT_TOOL_CALLS: "1", + }), + ); + const adapter = yield* makeTestAdapter(wrapperPath); + const requestOpened = + yield* Deferred.make>(); + const eventsFiber = yield* Stream.runForEach(adapter.streamEvents, (event) => + event.type === "request.opened" + ? Deferred.succeed(requestOpened, event).pipe(Effect.ignore) + : Effect.void, + ).pipe(Effect.forkChild); + + yield* adapter.startSession({ + threadId, + provider: ProviderDriverKind.make("grok"), + cwd: process.cwd(), + runtimeMode: "approval-required", + modelSelection: { instanceId: ProviderInstanceId.make("grok"), model: "grok-build" }, + }); + + const sendTurnFiber = yield* adapter + .sendTurn({ threadId, input: "check lifecycle", attachments: [] }) + .pipe(Effect.forkChild); + const requestOpenedEvent = yield* Deferred.await(requestOpened); + + const runningSessions = yield* adapter.listSessions(); + const runningSession = runningSessions.find((session) => session.threadId === threadId); + assert.equal(runningSession?.status, "running"); + assert.isDefined(runningSession?.activeTurnId); + + yield* adapter.respondToRequest( + threadId, + ApprovalRequestId.make(String(requestOpenedEvent.requestId)), + "accept", + ); + yield* Fiber.join(sendTurnFiber); + + const readySessions = yield* adapter.listSessions(); + const readySession = readySessions.find((session) => session.threadId === threadId); + assert.equal(readySession?.status, "ready"); + assert.isUndefined(readySession?.activeTurnId); + + yield* Fiber.interrupt(eventsFiber); + yield* adapter.stopSession(threadId); + }), + ); + + it.effect("completes a Grok turn from xAI prompt completion when the prompt RPC hangs", () => + Effect.gen(function* () { + const threadId = ThreadId.make("grok-xai-prompt-complete-fallback"); + const wrapperPath = yield* Effect.promise(() => + makeMockGrokWrapper({ + T3_ACP_EMIT_XAI_PROMPT_COMPLETE_THEN_HANG: "1", + }), + ); + const adapter = yield* makeTestAdapter(wrapperPath); + + const runtimeEvents: ProviderRuntimeEvent[] = []; + const turnCompleted = yield* Deferred.make(); + const runtimeEventsFiber = yield* Stream.runForEach(adapter.streamEvents, (event) => + Effect.sync(() => { + runtimeEvents.push(event); + }).pipe( + Effect.andThen( + event.type === "turn.completed" + ? Deferred.succeed(turnCompleted, undefined) + : Effect.void, + ), + ), + ).pipe(Effect.forkChild); + + yield* adapter.startSession({ + threadId, + provider: ProviderDriverKind.make("grok"), + cwd: process.cwd(), + runtimeMode: "full-access", + modelSelection: { instanceId: ProviderInstanceId.make("grok"), model: "grok-build" }, + }); + + const sendTurnResult = yield* adapter.sendTurn({ + threadId, + input: "exercise fallback", + attachments: [], + }); + + yield* Deferred.await(turnCompleted); + const readySessions = yield* adapter.listSessions(); + const readySession = readySessions.find((session) => session.threadId === threadId); + const turnCompletedEvent = runtimeEvents.find( + (event): event is Extract => + event.type === "turn.completed", + ); + const eventTypes = runtimeEvents.map((event) => event.type); + + assert.equal(sendTurnResult.threadId, threadId); + assert.include(eventTypes, "turn.completed"); + assert.equal(turnCompletedEvent?.payload.stopReason, "end_turn"); + assert.equal(readySession?.status, "ready"); + assert.isUndefined(readySession?.activeTurnId); + + yield* Fiber.interrupt(runtimeEventsFiber); + yield* adapter.stopSession(threadId); + }), + ); + + it.effect("settles the in-flight prompt before emitting completion", () => + Effect.gen(function* () { + const threadId = ThreadId.make("grok-completion-before-next-turn"); + const wrapperPath = yield* Effect.promise(() => makeMockGrokWrapper()); + const adapter = yield* makeTestAdapter(wrapperPath); + const completedCountRef = yield* Ref.make(0); + const secondTurnCompleted = yield* Deferred.make(); + + const runtimeEventsFiber = yield* Stream.runForEach(adapter.streamEvents, (event) => { + if (event.type !== "turn.completed" || String(event.threadId) !== String(threadId)) { + return Effect.void; + } + + return Ref.modify(completedCountRef, (count) => { + const nextCount = count + 1; + return [nextCount, nextCount] as const; + }).pipe( + Effect.flatMap((count) => { + if (count === 1) { + return adapter + .sendTurn({ + threadId, + input: "second turn after completion", + attachments: [], + }) + .pipe(Effect.forkChild, Effect.asVoid); + } + if (count === 2) { + return Deferred.succeed(secondTurnCompleted, undefined).pipe(Effect.asVoid); + } + return Effect.void; + }), + ); + }).pipe(Effect.forkChild); + + yield* adapter.startSession({ + threadId, + provider: ProviderDriverKind.make("grok"), + cwd: process.cwd(), + runtimeMode: "full-access", + modelSelection: { instanceId: ProviderInstanceId.make("grok"), model: "grok-build" }, + }); + + yield* adapter.sendTurn({ + threadId, + input: "first turn", + attachments: [], + }); + yield* Deferred.await(secondTurnCompleted); + + const completedCount = yield* Ref.get(completedCountRef); + const readySessions = yield* adapter.listSessions(); + const readySession = readySessions.find((session) => session.threadId === threadId); + + assert.equal(completedCount, 2); + assert.equal(readySession?.status, "ready"); + assert.isUndefined(readySession?.activeTurnId); + + yield* Fiber.interrupt(runtimeEventsFiber); + yield* adapter.stopSession(threadId); + }), + ); + + it.effect("restores a Grok session to ready when the prompt RPC fails", () => + Effect.gen(function* () { + const threadId = ThreadId.make("grok-prompt-failure-ready"); + const wrapperPath = yield* Effect.promise(() => + makeMockGrokWrapper({ + T3_ACP_FAIL_PROMPT: "1", + }), + ); + const adapter = yield* makeTestAdapter(wrapperPath); + + yield* adapter.startSession({ + threadId, + provider: ProviderDriverKind.make("grok"), + cwd: process.cwd(), + runtimeMode: "full-access", + modelSelection: { instanceId: ProviderInstanceId.make("grok"), model: "grok-build" }, + }); + + const error = yield* Effect.flip( + adapter.sendTurn({ + threadId, + input: "fail prompt", + attachments: [], + }), + ); + const readySessions = yield* adapter.listSessions(); + const readySession = readySessions.find((session) => session.threadId === threadId); + + assert.equal(error._tag, "ProviderAdapterRequestError"); + assert.equal(readySession?.status, "ready"); + assert.isUndefined(readySession?.activeTurnId); + + yield* adapter.stopSession(threadId); + }), + ); + it.effect("rejects startSession when provider mismatches", () => Effect.gen(function* () { const wrapperPath = yield* Effect.promise(() => makeMockGrokWrapper()); diff --git a/apps/server/src/provider/Layers/GrokAdapter.ts b/apps/server/src/provider/Layers/GrokAdapter.ts index a21a2bb9fc7..daf0a69db10 100644 --- a/apps/server/src/provider/Layers/GrokAdapter.ts +++ b/apps/server/src/provider/Layers/GrokAdapter.ts @@ -22,6 +22,7 @@ import * as FileSystem from "effect/FileSystem"; import * as Option from "effect/Option"; import * as Path from "effect/Path"; import * as PubSub from "effect/PubSub"; +import * as Ref from "effect/Ref"; import * as Schema from "effect/Schema"; import * as Scope from "effect/Scope"; import * as Semaphore from "effect/Semaphore"; @@ -240,6 +241,31 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte const withThreadLock = (threadId: string, effect: Effect.Effect) => Effect.flatMap(getThreadSemaphore(threadId), (semaphore) => semaphore.withPermit(effect)); + const settlePromptInFlight = (threadId: ThreadId, turnId: TurnId) => + Effect.gen(function* () { + const liveCtx = sessions.get(threadId); + if (!liveCtx) { + return; + } + const remainingPrompts = Math.max(0, liveCtx.promptsInFlight - 1); + liveCtx.promptsInFlight = remainingPrompts; + if ( + remainingPrompts > 0 || + liveCtx.activeTurnId !== turnId || + liveCtx.session.activeTurnId !== turnId + ) { + return; + } + const updatedAt = yield* nowIso; + const { activeTurnId: _activeTurnId, ...readySession } = liveCtx.session; + liveCtx.activeTurnId = undefined; + liveCtx.session = { + ...readySession, + status: "ready", + updatedAt, + }; + }); + const logNative = (threadId: ThreadId, method: string, payload: unknown) => Effect.gen(function* () { if (!nativeEventLogger) return; @@ -771,6 +797,7 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte } ctx.session = { ...ctx.session, + status: "running", activeTurnId: turnId, updatedAt: yield* nowIso, ...(displayModel ? { model: displayModel } : {}), @@ -794,15 +821,10 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte promptParts, turnId, }; - }).pipe( - Effect.tapCause(() => - Effect.sync(() => { - ctx.promptsInFlight = Math.max(0, ctx.promptsInFlight - 1); - }), - ), - ); + }).pipe(Effect.tapCause(() => settlePromptInFlight(input.threadId, turnId))); }), ); + const promptSettled = yield* Ref.make(false); return yield* Effect.gen(function* () { const result = yield* prepared.acp @@ -843,15 +865,32 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte ]; ctx.session = { ...ctx.session, + status: "running", activeTurnId: prepared.turnId, updatedAt: yield* nowIso, ...(prepared.displayModel ? { model: prepared.displayModel } : {}), }; + const remainingPrompts = Math.max(0, ctx.promptsInFlight - 1); + ctx.promptsInFlight = remainingPrompts; + yield* Ref.set(promptSettled, true); - // Only the last remaining prompt settles the turn — a steer- - // superseded prompt resolving (usually cancelled) while another - // is in flight or pending must leave the merged turn running. - if (ctx.promptsInFlight === 1) { + // Only the last remaining prompt settles the turn. A steer- + // superseded prompt resolving while another is in flight or + // pending must leave the merged turn running. + if ( + remainingPrompts === 0 && + ctx.activeTurnId === prepared.turnId && + ctx.session.activeTurnId === prepared.turnId + ) { + const completedAt = yield* nowIso; + const { activeTurnId: _completedTurnId, ...readySession } = ctx.session; + ctx.activeTurnId = undefined; + ctx.session = { + ...readySession, + status: "ready", + updatedAt: completedAt, + ...(prepared.displayModel ? { model: prepared.displayModel } : {}), + }; yield* offerRuntimeEvent({ type: "turn.completed", ...(yield* makeEventStamp()), @@ -874,12 +913,16 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte ); }).pipe( Effect.ensuring( - Effect.sync(() => { - const liveCtx = sessions.get(input.threadId); - if (liveCtx) { - liveCtx.promptsInFlight = Math.max(0, liveCtx.promptsInFlight - 1); - } - }), + Ref.get(promptSettled).pipe( + Effect.flatMap((settled) => + settled + ? Effect.void + : withThreadLock( + input.threadId, + settlePromptInFlight(input.threadId, prepared.turnId), + ), + ), + ), ), ); }); diff --git a/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts b/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts index f2e286c589c..216360b30a2 100644 --- a/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts +++ b/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts @@ -113,6 +113,88 @@ describe("AcpSessionRuntime", () => { ), ); + it.effect("resolves a prompt from xAI prompt completion when the prompt RPC hangs", () => + Effect.gen(function* () { + const runtime = yield* AcpSessionRuntime; + yield* runtime.start(); + + const promptResult = yield* runtime.prompt({ + prompt: [{ type: "text", text: "hi" }], + }); + + expect(promptResult).toMatchObject({ + stopReason: "end_turn", + _meta: { + sessionId: "mock-session-1", + promptId: "mock-xai-prompt-1", + requestId: "mock-xai-prompt-1", + }, + }); + }).pipe( + Effect.provide( + AcpSessionRuntime.layer({ + spawn: { + command: mockAgentCommand, + args: mockAgentArgs, + env: { + T3_ACP_EMIT_XAI_PROMPT_COMPLETE_THEN_HANG: "1", + }, + }, + cwd: process.cwd(), + clientInfo: { name: "t3-test", version: "0.0.0" }, + authMethodId: "test", + }), + ), + Effect.scoped, + Effect.provide(NodeServices.layer), + ), + ); + + it.effect("ignores stale xAI prompt completion for an already completed prompt", () => + Effect.gen(function* () { + const runtime = yield* AcpSessionRuntime; + yield* runtime.start(); + + const firstPromptResult = yield* runtime.prompt({ + prompt: [{ type: "text", text: "first" }], + }); + expect(firstPromptResult).toMatchObject({ + stopReason: "end_turn", + _meta: { + promptId: "mock-stale-xai-prompt-1", + }, + }); + + const secondPromptResult = yield* runtime.prompt({ + prompt: [{ type: "text", text: "second" }], + }); + expect(secondPromptResult).toMatchObject({ + stopReason: "end_turn", + _meta: { + promptId: "mock-current-xai-prompt-2", + requestId: "mock-current-xai-prompt-2", + }, + }); + }).pipe( + Effect.provide( + AcpSessionRuntime.layer({ + spawn: { + command: mockAgentCommand, + args: mockAgentArgs, + env: { + T3_ACP_EMIT_STALE_XAI_PROMPT_COMPLETE_BEFORE_SECOND_HANG: "1", + }, + }, + cwd: process.cwd(), + clientInfo: { name: "t3-test", version: "0.0.0" }, + authMethodId: "test", + }), + ), + Effect.scoped, + Effect.provide(NodeServices.layer), + ), + ); + it.effect("segments assistant text around ACP tool calls", () => Effect.gen(function* () { const runtime = yield* AcpSessionRuntime; diff --git a/apps/server/src/provider/acp/AcpSessionRuntime.ts b/apps/server/src/provider/acp/AcpSessionRuntime.ts index b8097f10b75..5e10cbdab59 100644 --- a/apps/server/src/provider/acp/AcpSessionRuntime.ts +++ b/apps/server/src/provider/acp/AcpSessionRuntime.ts @@ -7,6 +7,7 @@ import * as Queue from "effect/Queue"; import * as Ref from "effect/Ref"; import * as Scope from "effect/Scope"; import * as Context from "effect/Context"; +import * as Schema from "effect/Schema"; import * as Stream from "effect/Stream"; import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; import * as EffectAcpClient from "effect-acp/client"; @@ -31,6 +32,26 @@ function formatConfigOptionValue(value: string | boolean): string { return JSON.stringify(value); } +const XAiPromptCompleteNotification = Schema.Struct({ + sessionId: Schema.String, + promptId: Schema.optional(Schema.String), + stopReason: Schema.optional(Schema.String), + agentResult: Schema.optional(Schema.NullOr(Schema.Unknown)), +}); + +type XAiPromptCompleteNotification = typeof XAiPromptCompleteNotification.Type; + +const decodeXAiPromptCompleteNotification = Schema.decodeUnknownEffect( + XAiPromptCompleteNotification, +); + +interface PendingXAiPromptCompletion { + readonly sessionId: string; + readonly deferred: Deferred.Deferred; +} + +const completedXAiPromptIdLimit = 128; + export interface AcpSpawnInput { readonly command: string; readonly args: ReadonlyArray; @@ -170,6 +191,10 @@ const makeAcpSessionRuntime = ( const assistantSegmentRef = yield* Ref.make({ nextSegmentIndex: 0 }); const configOptionsRef = yield* Ref.make(sessionConfigOptionsFromSetup(undefined)); const startStateRef = yield* Ref.make({ _tag: "NotStarted" }); + const pendingXAiPromptCompletionsRef = yield* Ref.make< + ReadonlyArray + >([]); + const completedXAiPromptIdsRef = yield* Ref.make>([]); const logRequest = (event: AcpSessionRequestLogEvent) => options.requestLogger ? options.requestLogger(event) : Effect.void; @@ -249,6 +274,21 @@ const makeAcpSessionRuntime = ( params: notification, }), ); + yield* acp.handleUnknownExtNotification((method, payload) => { + if (method !== "_x.ai/session/prompt_complete") { + return Effect.void; + } + return decodeXAiPromptCompleteNotification(payload).pipe( + Effect.flatMap((notification) => + resolveXAiPromptCompletionFallback({ + pendingRef: pendingXAiPromptCompletionsRef, + completedPromptIdsRef: completedXAiPromptIdsRef, + notification, + }), + ), + Effect.catch(() => Effect.void), + ); + }); const initializeClientCapabilities = { fs: { @@ -520,10 +560,32 @@ const makeAcpSessionRuntime = ( assistantSegmentRef, }).pipe( Effect.andThen( - runLoggedRequest( - "session/prompt", - requestPayload, - acp.agent.prompt(requestPayload), + registerXAiPromptCompletionFallback( + pendingXAiPromptCompletionsRef, + started.sessionId, + ), + ), + Effect.flatMap((fallback) => + // Grok can emit its private prompt-complete notification even when the + // standard session/prompt RPC never settles. Race the RPC with that + // notification so callers still emit turn completion and clear UI state. + Effect.raceFirst( + runLoggedRequest( + "session/prompt", + requestPayload, + acp.agent.prompt(requestPayload), + ), + Deferred.await(fallback.deferred), + ).pipe( + Effect.tap((response) => + rememberCompletedXAiPromptId(completedXAiPromptIdsRef, response), + ), + Effect.ensuring( + unregisterXAiPromptCompletionFallback( + pendingXAiPromptCompletionsRef, + fallback.deferred, + ), + ), ), ), Effect.tap(() => @@ -757,3 +819,111 @@ const closeActiveAssistantSegment = ({ } satisfies AcpAssistantSegmentState, ] as const; }).pipe(Effect.flatMap((event) => (event ? Queue.offer(queue, event) : Effect.void))); + +const registerXAiPromptCompletionFallback = ( + pendingRef: Ref.Ref>, + sessionId: string, +) => + Deferred.make().pipe( + Effect.tap((deferred) => + Ref.update(pendingRef, (pending) => [...pending, { sessionId, deferred }]), + ), + Effect.map((deferred) => ({ deferred })), + ); + +const unregisterXAiPromptCompletionFallback = ( + pendingRef: Ref.Ref>, + deferred: Deferred.Deferred, +) => Ref.update(pendingRef, (pending) => pending.filter((entry) => entry.deferred !== deferred)); + +const resolveXAiPromptCompletionFallback = ({ + pendingRef, + completedPromptIdsRef, + notification, +}: { + readonly pendingRef: Ref.Ref>; + readonly completedPromptIdsRef: Ref.Ref>; + readonly notification: XAiPromptCompleteNotification; +}) => + Ref.get(completedPromptIdsRef).pipe( + Effect.flatMap((completedPromptIds) => { + if ( + notification.promptId !== undefined && + completedPromptIds.includes(notification.promptId) + ) { + return Effect.void; + } + return Ref.modify(pendingRef, (pending) => { + const index = pending.findIndex((entry) => entry.sessionId === notification.sessionId); + if (index < 0) { + return [Effect.void, pending] as const; + } + const entry = pending[index]; + if (!entry) { + return [Effect.void, pending] as const; + } + return [ + Deferred.succeed(entry.deferred, promptResponseFromXAi(notification)).pipe(Effect.asVoid), + [...pending.slice(0, index), ...pending.slice(index + 1)], + ] as const; + }).pipe(Effect.flatten); + }), + ); + +const rememberCompletedXAiPromptId = ( + completedPromptIdsRef: Ref.Ref>, + response: EffectAcpSchema.PromptResponse, +) => { + const promptId = promptIdFromResponse(response); + if (promptId === undefined) { + return Effect.void; + } + return Ref.update(completedPromptIdsRef, (completedPromptIds) => { + if (completedPromptIds.includes(promptId)) { + return completedPromptIds; + } + return [...completedPromptIds, promptId].slice(-completedXAiPromptIdLimit); + }); +}; + +function promptIdFromResponse(response: EffectAcpSchema.PromptResponse): string | undefined { + const meta = response._meta; + if (meta === null || typeof meta !== "object") { + return undefined; + } + const promptId = meta.promptId ?? meta.requestId; + return typeof promptId === "string" && promptId.length > 0 ? promptId : undefined; +} + +function promptResponseFromXAi( + notification: XAiPromptCompleteNotification, +): EffectAcpSchema.PromptResponse { + const stopReason = normalizeXAiStopReason(notification.stopReason); + const meta: Record = { + sessionId: notification.sessionId, + }; + if (notification.promptId !== undefined) { + meta.promptId = notification.promptId; + meta.requestId = notification.promptId; + } + if (notification.agentResult !== undefined) { + meta.agentResult = notification.agentResult; + } + return { + stopReason, + _meta: meta, + }; +} + +function normalizeXAiStopReason(value: string | undefined): EffectAcpSchema.StopReason { + switch (value) { + case "cancelled": + case "end_turn": + case "max_tokens": + case "max_turn_requests": + case "refusal": + return value; + default: + return "end_turn"; + } +} diff --git a/packages/effect-acp/src/client.test.ts b/packages/effect-acp/src/client.test.ts index aca87d45c62..0b825bd0592 100644 --- a/packages/effect-acp/src/client.test.ts +++ b/packages/effect-acp/src/client.test.ts @@ -17,18 +17,59 @@ import { it, assert } from "@effect/vitest"; import * as AcpClient from "./client.ts"; import * as AcpSchema from "./_generated/schema.gen.ts"; import * as AcpError from "./errors.ts"; -import { encodeJsonl, jsonRpcRequest, jsonRpcResponse } from "./_internal/shared.ts"; +import { + encodeJsonl, + jsonRpcNotification, + jsonRpcRequest, + jsonRpcResponse, +} from "./_internal/shared.ts"; import { makeInMemoryStdio } from "./_internal/stdio.ts"; const InitializeRequest = jsonRpcRequest("initialize", AcpSchema.InitializeRequest); const InitializeResponse = jsonRpcResponse(AcpSchema.InitializeResponse); const ExtRequest = jsonRpcRequest("x/test", Schema.Struct({ hello: Schema.String })); const ExtResponse = jsonRpcResponse(Schema.Struct({ ok: Schema.Boolean })); +const PromptRequest = jsonRpcRequest("session/prompt", AcpSchema.PromptRequest); +const PromptResponse = jsonRpcResponse(AcpSchema.PromptResponse); +const decodePromptRequestLine = Schema.decodeEffect(Schema.fromJsonString(PromptRequest)); +const XAiPromptCompleteNotification = jsonRpcNotification( + "_x.ai/session/prompt_complete", + Schema.Struct({ + sessionId: Schema.String, + promptId: Schema.String, + stopReason: Schema.String, + agentResult: Schema.NullOr(Schema.Unknown), + }), +); +const XAiQueueChangedNotification = jsonRpcNotification( + "_x.ai/queue/changed", + Schema.Struct({ + sessionId: Schema.String, + entries: Schema.Array(Schema.Unknown), + }), +); +const XAiSessionsChangedNotification = jsonRpcNotification( + "_x.ai/sessions/changed", + Schema.Struct({ + upserted: Schema.Array(Schema.Unknown), + removed: Schema.Array(Schema.Unknown), + }), +); const mockPeerPath = Effect.map(Effect.service(Path.Path), (path) => path.join(import.meta.dirname, "../test/fixtures/acp-mock-peer.ts"), ); const mockPeerArgs = (path: string) => [path]; +function concatBytes(chunks: ReadonlyArray): Uint8Array { + const batch = new Uint8Array(chunks.reduce((total, chunk) => total + chunk.length, 0)); + let offset = 0; + for (const chunk of chunks) { + batch.set(chunk, offset); + offset += chunk.length; + } + return batch; +} + it.layer(NodeServices.layer)("effect-acp client", (it) => { const makeHandle = (env?: Record) => Effect.gen(function* () { @@ -446,4 +487,90 @@ it.layer(NodeServices.layer)("effect-acp client", (it) => { yield* Scope.close(scope, Exit.void); }), ); + + it.effect( + "routes a standard prompt response after Grok extension notifications in the same batch", + () => + Effect.gen(function* () { + const { stdio, input, output } = yield* makeInMemoryStdio(); + const scope = yield* Scope.make(); + const acp = yield* AcpClient.make(stdio).pipe(Effect.provideService(Scope.Scope, scope)); + + const promptFiber = yield* acp.agent + .prompt({ + sessionId: "grok-session-1", + prompt: [{ type: "text", text: "run the ls command" }], + }) + .pipe(Effect.forkScoped); + + const outbound = yield* Queue.take(output); + const decodedPrompt = yield* decodePromptRequestLine(outbound); + + const responseBatch = concatBytes( + yield* Effect.all([ + encodeJsonl(XAiQueueChangedNotification, { + jsonrpc: "2.0", + method: "_x.ai/queue/changed", + params: { sessionId: "grok-session-1", entries: [] }, + }), + encodeJsonl(XAiPromptCompleteNotification, { + jsonrpc: "2.0", + method: "_x.ai/session/prompt_complete", + params: { + sessionId: "grok-session-1", + promptId: "prompt-1", + stopReason: "end_turn", + agentResult: null, + }, + }), + encodeJsonl(XAiSessionsChangedNotification, { + jsonrpc: "2.0", + method: "_x.ai/sessions/changed", + params: { + upserted: [ + { + sessionId: "grok-session-1", + title: null, + cwd: process.cwd(), + isWorktree: false, + modelId: "grok-composer-2.5-fast", + yolo: false, + activity: "idle", + resident: true, + lastChangeUnixMs: 1_710_000_000_000, + origin: { kind: "local" }, + }, + ], + removed: [], + }, + }), + encodeJsonl(PromptResponse, { + jsonrpc: "2.0", + id: decodedPrompt.id, + result: { + stopReason: "end_turn", + _meta: { + sessionId: "grok-session-1", + requestId: "prompt-1", + promptId: "prompt-1", + modelId: "grok-composer-2.5-fast", + }, + }, + }), + ]), + ); + yield* Queue.offer(input, responseBatch); + + assert.deepEqual(yield* Fiber.join(promptFiber), { + stopReason: "end_turn", + _meta: { + sessionId: "grok-session-1", + requestId: "prompt-1", + promptId: "prompt-1", + modelId: "grok-composer-2.5-fast", + }, + }); + yield* Scope.close(scope, Exit.void); + }), + ); });