Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions apps/server/scripts/acp-mock-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ const emitInterleavedAssistantToolCalls =
// Scenario switches for the mock ACP agent. Each boolean flag is enabled by
// setting the named environment variable to exactly "1".
const emitGenericToolPlaceholders = process.env.T3_ACP_EMIT_GENERIC_TOOL_PLACEHOLDERS === "1";
const emitAskQuestion = process.env.T3_ACP_EMIT_ASK_QUESTION === "1";
const emitXAiAskUserQuestion = process.env.T3_ACP_EMIT_XAI_ASK_USER_QUESTION === "1";
// When set, the prompt handler emits an `_x.ai/session/prompt_complete`
// notification and then never answers the prompt RPC.
const emitXAiPromptCompleteThenHang = process.env.T3_ACP_EMIT_XAI_PROMPT_COMPLETE_THEN_HANG === "1";
// When set, the first prompt is answered normally; the second prompt emits a
// prompt_complete notification carrying the first prompt's id before the one
// for the current prompt, and then never answers the prompt RPC.
const emitStaleXAiPromptCompleteBeforeSecondHang =
process.env.T3_ACP_EMIT_STALE_XAI_PROMPT_COMPLETE_BEFORE_SECOND_HANG === "1";
// When set, every prompt request fails with an internal error.
const failPrompt = process.env.T3_ACP_FAIL_PROMPT === "1";
const failSetConfigOption = process.env.T3_ACP_FAIL_SET_CONFIG_OPTION === "1";
const exitOnSetConfigOption = process.env.T3_ACP_EXIT_ON_SET_CONFIG_OPTION === "1";
// Raw string (or undefined when unset); NOTE(review): presumably overrides the
// text the mock replies with — its use is outside this view, confirm there.
const promptResponseText = process.env.T3_ACP_PROMPT_RESPONSE_TEXT;
Expand All @@ -36,6 +40,7 @@ let parameterizedModelPicker = false;
// Mutable module-level state of the mock agent.
// NOTE(review): the three `current*` values look like the currently selected
// config options and their initial values — their readers are outside this view.
let currentReasoning = "medium";
let currentContext = "272k";
let currentFast = false;
// Number of prompt requests received so far; incremented at the start of every
// prompt and used to pick per-prompt behavior in multi-prompt scenarios.
let promptCount = 0;
// Session ids, stored as strings. NOTE(review): presumably sessions for which a
// cancel was received — the code that fills and reads this set is not shown here.
const cancelledSessions = new Set<string>();

function logExit(reason: string): void {
Expand All @@ -45,6 +50,10 @@ function logExit(reason: string): void {
appendFileSync(exitLogPath, `${reason}\n`, "utf8");
}

/**
 * Writes a JSON-RPC 2.0 notification (a message with no `id`) to stdout as a
 * single newline-terminated JSON line.
 *
 * @param method - JSON-RPC method name of the notification.
 * @param params - Payload serialized under `params`; an `undefined` value is
 *   omitted from the output by `JSON.stringify`.
 */
function writeJsonRpcNotification(method: string, params: unknown): void {
  const notification = { jsonrpc: "2.0", method, params };
  const line = JSON.stringify(notification) + "\n";
  process.stdout.write(line);
}

process.once("SIGTERM", () => {
logExit("SIGTERM");
process.exit(0);
Expand Down Expand Up @@ -364,11 +373,71 @@ const program = Effect.gen(function* () {
yield* agent.handlePrompt((request) =>
Effect.gen(function* () {
// Session this prompt targets; falls back to the `sessionId` binding from the
// enclosing scope when the request does not carry one.
const requestedSessionId = String(request.sessionId ?? sessionId);
// Counted before the delay and the failure check, so delayed and failed
// prompts are counted as well.
promptCount += 1;

// Optional artificial latency; skipped when the delay is not a finite positive number.
if (Number.isFinite(promptDelayMs) && promptDelayMs > 0) {
yield* Effect.sleep(`${promptDelayMs} millis`);
}

// Failure scenario: reject the prompt RPC with an internal error.
if (failPrompt) {
return yield* AcpError.AcpRequestError.internalError("Mock prompt failure");
}

// Stale-completion scenario, prompt 1: answer the RPC normally and tag the
// response metadata with the id "mock-stale-xai-prompt-1".
if (emitStaleXAiPromptCompleteBeforeSecondHang && promptCount === 1) {
return {
stopReason: "end_turn",
_meta: {
promptId: "mock-stale-xai-prompt-1",
requestId: "mock-stale-xai-prompt-1",
},
};
}

// Stale-completion scenario, prompt 2: first emit a prompt_complete that still
// carries prompt 1's id, then one with a new id for this prompt, and finally
// leave the prompt RPC unanswered forever.
if (emitStaleXAiPromptCompleteBeforeSecondHang && promptCount === 2) {
writeJsonRpcNotification("_x.ai/session/prompt_complete", {
sessionId: requestedSessionId,
promptId: "mock-stale-xai-prompt-1",
stopReason: "end_turn",
agentResult: null,
});

writeJsonRpcNotification("_x.ai/session/prompt_complete", {
sessionId: requestedSessionId,
promptId: "mock-current-xai-prompt-2",
stopReason: "end_turn",
agentResult: null,
});

// Effect.never never completes, so no response is produced for this request.
return yield* Effect.never;
}

// Completion-then-hang scenario: emit an assistant text chunk, the xAI
// prompt_complete notification, and then one more text chunk AFTER the
// completion notification, before leaving the prompt RPC unanswered forever.
if (emitXAiPromptCompleteThenHang) {
writeJsonRpcNotification("session/update", {
sessionId: requestedSessionId,
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "hello from " },
},
});

writeJsonRpcNotification("_x.ai/session/prompt_complete", {
sessionId: requestedSessionId,
promptId: "mock-xai-prompt-1",
stopReason: "end_turn",
agentResult: null,
});

writeJsonRpcNotification("session/update", {
sessionId: requestedSessionId,
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "mock" },
},
});

// Effect.never never completes, so no response is produced for this request.
return yield* Effect.never;
}

if (emitInterleavedAssistantToolCalls) {
const toolCallId = "tool-call-1";

Expand Down
210 changes: 210 additions & 0 deletions apps/server/src/provider/Layers/GrokAdapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import * as Deferred from "effect/Deferred";
import * as Effect from "effect/Effect";
import * as Fiber from "effect/Fiber";
import * as Layer from "effect/Layer";
import * as Ref from "effect/Ref";
import * as Schema from "effect/Schema";
import * as Stream from "effect/Stream";

Expand Down Expand Up @@ -175,6 +176,215 @@ it.layer(grokAdapterTestLayer)("GrokAdapterLive", (it) => {
}),
);

// Checks the session status around a single turn: "running" with an
// activeTurnId while the turn is outstanding, "ready" without one afterwards.
it.effect("reports a Grok session running only while the prompt is in flight", () =>
Effect.gen(function* () {
const threadId = ThreadId.make("grok-session-ready-after-prompt");
// NOTE(review): T3_ACP_EMIT_TOOL_CALLS presumably makes the mock agent emit a
// tool call that needs approval — the mock branch is not visible here.
const wrapperPath = yield* Effect.promise(() =>
makeMockGrokWrapper({
T3_ACP_EMIT_TOOL_CALLS: "1",
}),
);
const adapter = yield* makeTestAdapter(wrapperPath);
// Resolved with the first "request.opened" runtime event seen on the stream.
const requestOpened =
yield* Deferred.make<Extract<ProviderRuntimeEvent, { type: "request.opened" }>>();
// Background consumer of adapter events; all other event types are ignored.
const eventsFiber = yield* Stream.runForEach(adapter.streamEvents, (event) =>
event.type === "request.opened"
? Deferred.succeed(requestOpened, event).pipe(Effect.ignore)
: Effect.void,
).pipe(Effect.forkChild);

yield* adapter.startSession({
threadId,
provider: ProviderDriverKind.make("grok"),
cwd: process.cwd(),
runtimeMode: "approval-required",
modelSelection: { instanceId: ProviderInstanceId.make("grok"), model: "grok-build" },
});

// The turn is forked so the session can be inspected while it is still
// outstanding; it is joined only after the request has been answered.
const sendTurnFiber = yield* adapter
.sendTurn({ threadId, input: "check lifecycle", attachments: [] })
.pipe(Effect.forkChild);
const requestOpenedEvent = yield* Deferred.await(requestOpened);

// Mid-turn snapshot: a request is open and the turn has not been joined yet.
const runningSessions = yield* adapter.listSessions();
const runningSession = runningSessions.find((session) => session.threadId === threadId);
assert.equal(runningSession?.status, "running");
assert.isDefined(runningSession?.activeTurnId);

// Answer the open request with "accept", then wait for the turn to finish.
yield* adapter.respondToRequest(
threadId,
ApprovalRequestId.make(String(requestOpenedEvent.requestId)),
"accept",
);
yield* Fiber.join(sendTurnFiber);

// Post-turn snapshot: the session must be idle again.
const readySessions = yield* adapter.listSessions();
const readySession = readySessions.find((session) => session.threadId === threadId);
assert.equal(readySession?.status, "ready");
assert.isUndefined(readySession?.activeTurnId);

// Cleanup; only reached when all assertions above passed.
yield* Fiber.interrupt(eventsFiber);
yield* adapter.stopSession(threadId);
}),
);

// Checks that a turn still completes when the agent signals completion only
// through the xAI prompt_complete notification. NOTE(review): this relies on the
// mock agent honoring T3_ACP_EMIT_XAI_PROMPT_COMPLETE_THEN_HANG by emitting that
// notification and never answering the prompt RPC — confirm the wrapper forwards it.
it.effect("completes a Grok turn from xAI prompt completion when the prompt RPC hangs", () =>
Effect.gen(function* () {
const threadId = ThreadId.make("grok-xai-prompt-complete-fallback");
const wrapperPath = yield* Effect.promise(() =>
makeMockGrokWrapper({
T3_ACP_EMIT_XAI_PROMPT_COMPLETE_THEN_HANG: "1",
}),
);
const adapter = yield* makeTestAdapter(wrapperPath);

// Every runtime event is recorded; `turnCompleted` is resolved when a
// "turn.completed" event is observed.
const runtimeEvents: ProviderRuntimeEvent[] = [];
const turnCompleted = yield* Deferred.make<void>();
const runtimeEventsFiber = yield* Stream.runForEach(adapter.streamEvents, (event) =>
Effect.sync(() => {
runtimeEvents.push(event);
}).pipe(
Effect.andThen(
event.type === "turn.completed"
? Deferred.succeed(turnCompleted, undefined)
: Effect.void,
),
),
).pipe(Effect.forkChild);

yield* adapter.startSession({
threadId,
provider: ProviderDriverKind.make("grok"),
cwd: process.cwd(),
runtimeMode: "full-access",
modelSelection: { instanceId: ProviderInstanceId.make("grok"), model: "grok-build" },
});

// sendTurn is awaited inline (not forked): the test can only get past this
// line if sendTurn itself returns.
const sendTurnResult = yield* adapter.sendTurn({
threadId,
input: "exercise fallback",
attachments: [],
});

// Wait for the completion event before taking the session snapshot.
yield* Deferred.await(turnCompleted);
const readySessions = yield* adapter.listSessions();
const readySession = readySessions.find((session) => session.threadId === threadId);
// First recorded "turn.completed" event, narrowed so its payload can be read.
const turnCompletedEvent = runtimeEvents.find(
(event): event is Extract<ProviderRuntimeEvent, { type: "turn.completed" }> =>
event.type === "turn.completed",
);
const eventTypes = runtimeEvents.map((event) => event.type);

assert.equal(sendTurnResult.threadId, threadId);
assert.include(eventTypes, "turn.completed");
assert.equal(turnCompletedEvent?.payload.stopReason, "end_turn");
assert.equal(readySession?.status, "ready");
assert.isUndefined(readySession?.activeTurnId);

// Cleanup; only reached when all assertions above passed.
yield* Fiber.interrupt(runtimeEventsFiber);
yield* adapter.stopSession(threadId);
}),
);

// Starts a second turn from inside the handler of the first turn's
// "turn.completed" event and requires that the second turn completes too, so
// the session must accept a new turn at the moment completion is emitted.
it.effect("settles the in-flight prompt before emitting completion", () =>
Effect.gen(function* () {
const threadId = ThreadId.make("grok-completion-before-next-turn");
const wrapperPath = yield* Effect.promise(() => makeMockGrokWrapper());
const adapter = yield* makeTestAdapter(wrapperPath);
// Number of "turn.completed" events seen for this thread.
const completedCountRef = yield* Ref.make(0);
const secondTurnCompleted = yield* Deferred.make<void>();

const runtimeEventsFiber = yield* Stream.runForEach(adapter.streamEvents, (event) => {
// Only completion events for this test's thread are counted.
if (event.type !== "turn.completed" || String(event.threadId) !== String(threadId)) {
return Effect.void;
}

// Increment the counter and hand the incremented value to the next step.
return Ref.modify(completedCountRef, (count) => {
const nextCount = count + 1;
return [nextCount, nextCount] as const;
}).pipe(
Effect.flatMap((count) => {
// First completion: immediately start a second turn. It is forked and its
// result is not observed here; the test proceeds only once a second
// "turn.completed" event arrives.
if (count === 1) {
return adapter
.sendTurn({
threadId,
input: "second turn after completion",
attachments: [],
})
.pipe(Effect.forkChild, Effect.asVoid);
}
// Second completion: release the main test body.
if (count === 2) {
return Deferred.succeed(secondTurnCompleted, undefined).pipe(Effect.asVoid);
}
return Effect.void;
}),
);
}).pipe(Effect.forkChild);

yield* adapter.startSession({
threadId,
provider: ProviderDriverKind.make("grok"),
cwd: process.cwd(),
runtimeMode: "full-access",
modelSelection: { instanceId: ProviderInstanceId.make("grok"), model: "grok-build" },
});

yield* adapter.sendTurn({
threadId,
input: "first turn",
attachments: [],
});
// Blocks until the turn started from the event handler has completed as well.
yield* Deferred.await(secondTurnCompleted);

const completedCount = yield* Ref.get(completedCountRef);
const readySessions = yield* adapter.listSessions();
const readySession = readySessions.find((session) => session.threadId === threadId);

assert.equal(completedCount, 2);
assert.equal(readySession?.status, "ready");
assert.isUndefined(readySession?.activeTurnId);

// Cleanup; only reached when all assertions above passed.
yield* Fiber.interrupt(runtimeEventsFiber);
yield* adapter.stopSession(threadId);
}),
);

// Checks that a failed prompt surfaces as a ProviderAdapterRequestError and
// leaves the session in "ready" with no active turn.
it.effect("restores a Grok session to ready when the prompt RPC fails", () =>
Effect.gen(function* () {
const threadId = ThreadId.make("grok-prompt-failure-ready");
// NOTE(review): T3_ACP_FAIL_PROMPT is expected to make the mock agent reject the
// prompt with an internal error — confirm the wrapper forwards this variable.
const wrapperPath = yield* Effect.promise(() =>
makeMockGrokWrapper({
T3_ACP_FAIL_PROMPT: "1",
}),
);
const adapter = yield* makeTestAdapter(wrapperPath);

yield* adapter.startSession({
threadId,
provider: ProviderDriverKind.make("grok"),
cwd: process.cwd(),
runtimeMode: "full-access",
modelSelection: { instanceId: ProviderInstanceId.make("grok"), model: "grok-build" },
});

// Effect.flip swaps the error and success channels, so the expected failure
// becomes the value under inspection (and an unexpected success fails the test).
const error = yield* Effect.flip(
adapter.sendTurn({
threadId,
input: "fail prompt",
attachments: [],
}),
);
const readySessions = yield* adapter.listSessions();
const readySession = readySessions.find((session) => session.threadId === threadId);

assert.equal(error._tag, "ProviderAdapterRequestError");
assert.equal(readySession?.status, "ready");
assert.isUndefined(readySession?.activeTurnId);

// Cleanup; only reached when all assertions above passed.
yield* adapter.stopSession(threadId);
}),
);

it.effect("rejects startSession when provider mismatches", () =>
Effect.gen(function* () {
const wrapperPath = yield* Effect.promise(() => makeMockGrokWrapper());
Expand Down
Loading
Loading