Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/agent/src/acp-extensions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,7 @@ export const POSTHOG_NOTIFICATIONS = {

/** Marks a boundary for log compaction */
COMPACT_BOUNDARY: "_posthog/compact_boundary",

/** Token usage update for a session turn */
USAGE_UPDATE: "_posthog/usage_update",
} as const;
22 changes: 13 additions & 9 deletions packages/agent/src/adapters/claude/claude-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import {
} from "@anthropic-ai/claude-agent-sdk";
import { v7 as uuidv7 } from "uuid";
import packageJson from "../../../package.json" with { type: "json" };
import { POSTHOG_NOTIFICATIONS } from "../../acp-extensions";
import { unreachable, withTimeout } from "../../utils/common";
import { Logger } from "../../utils/logger";
import { Pushable } from "../../utils/streams";
Expand Down Expand Up @@ -442,16 +443,19 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
});
}

await this.client.extNotification("_posthog/usage_update", {
sessionId: params.sessionId,
used: {
inputTokens: message.usage.input_tokens,
outputTokens: message.usage.output_tokens,
cachedReadTokens: message.usage.cache_read_input_tokens,
cachedWriteTokens: message.usage.cache_creation_input_tokens,
await this.client.extNotification(
POSTHOG_NOTIFICATIONS.USAGE_UPDATE,
{
sessionId: params.sessionId,
used: {
inputTokens: message.usage.input_tokens,
outputTokens: message.usage.output_tokens,
cachedReadTokens: message.usage.cache_read_input_tokens,
cachedWriteTokens: message.usage.cache_creation_input_tokens,
},
cost: message.total_cost_usd,
},
cost: message.total_cost_usd,
});
);

const usage: Usage = {
inputTokens: this.session.accumulatedUsage.inputTokens,
Expand Down
125 changes: 51 additions & 74 deletions packages/agent/src/adapters/codex/codex-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import {
type AgentSideConnection,
type AuthenticateRequest,
type CancelNotification,
ClientSideConnection,
type ForkSessionRequest,
type ForkSessionResponse,
Expand All @@ -37,9 +36,8 @@ import {
import packageJson from "../../../package.json" with { type: "json" };
import { POSTHOG_NOTIFICATIONS } from "../../acp-extensions";
import {
CODEX_NATIVE_MODES,
type CodexNativeMode,
type PermissionMode,
CODE_EXECUTION_MODES,
type CodeExecutionMode,
} from "../../execution-mode";
import type { ProcessSpawnedCallback } from "../../types";
import { Logger } from "../../utils/logger";
Expand Down Expand Up @@ -85,19 +83,26 @@ type CodexSession = BaseSession & {
settingsManager: CodexSettingsManager;
};

function toPermissionMode(mode?: string): PermissionMode {
if (mode && (CODEX_NATIVE_MODES as readonly string[]).includes(mode)) {
return mode as CodexNativeMode;
function toCodeExecutionMode(mode?: string): CodeExecutionMode {
if (mode && (CODE_EXECUTION_MODES as readonly string[]).includes(mode)) {
return mode as CodeExecutionMode;
}
return "auto";
return "default";
}

const CODEX_NATIVE_MODE: Record<CodeExecutionMode, string> = {
default: "default",
acceptEdits: "default",
plan: "plan",
bypassPermissions: "default",
};

export class CodexAcpAgent extends BaseAcpAgent {
readonly adapterName = "codex";
declare session: CodexSession;
private codexProcess: CodexProcess;
private codexConnection!: ClientSideConnection;
private sessionState!: CodexSessionState;
private codexConnection: ClientSideConnection;
private sessionState: CodexSessionState;

constructor(client: AgentSideConnection, options: CodexAcpAgentOptions) {
super(client);
Expand Down Expand Up @@ -126,29 +131,14 @@ export class CodexAcpAgent extends BaseAcpAgent {
cancelled: false,
};

this.sessionState = createSessionState("", cwd);

// Create the ClientSideConnection to codex-acp.
// The Client handler delegates all requests from codex-acp to the upstream
// PostHog Code client via our AgentSideConnection.
this.codexConnection = new ClientSideConnection(
(_agent) =>
createCodexClient(
this.client,
this.logger,
this.sessionState ?? {
sessionId: "",
cwd: "",
modeId: "auto",
configOptions: [],
accumulatedUsage: {
inputTokens: 0,
outputTokens: 0,
cachedReadTokens: 0,
cachedWriteTokens: 0,
},
permissionMode: "auto",
cancelled: false,
},
),
createCodexClient(this.client, this.logger, this.sessionState),
codexStream,
);
}
Expand Down Expand Up @@ -195,7 +185,7 @@ export class CodexAcpAgent extends BaseAcpAgent {
taskId: meta?.taskId ?? meta?.persistence?.taskId,
modeId: response.modes?.currentModeId ?? "default",
modelId: response.models?.currentModelId,
permissionMode: toPermissionMode(meta?.permissionMode),
permissionMode: toCodeExecutionMode(meta?.permissionMode),
});
this.sessionId = response.sessionId;
this.sessionState.configOptions = response.configOptions ?? [];
Expand All @@ -219,9 +209,11 @@ export class CodexAcpAgent extends BaseAcpAgent {

async loadSession(params: LoadSessionRequest): Promise<LoadSessionResponse> {
const response = await this.codexConnection.loadSession(params);
const meta = params._meta as NewSessionMeta | undefined;

// Update session state
this.sessionState = createSessionState(params.sessionId, params.cwd);
this.sessionState = createSessionState(params.sessionId, params.cwd, {
permissionMode: toCodeExecutionMode(meta?.permissionMode),
});
this.sessionId = params.sessionId;
this.sessionState.configOptions = response.configOptions ?? [];

Expand All @@ -238,11 +230,15 @@ export class CodexAcpAgent extends BaseAcpAgent {
mcpServers: params.mcpServers ?? [],
});

this.sessionState = createSessionState(params.sessionId, params.cwd);
const meta = params._meta as NewSessionMeta | undefined;
this.sessionState = createSessionState(params.sessionId, params.cwd, {
taskRunId: meta?.taskRunId,
taskId: meta?.taskId ?? meta?.persistence?.taskId,
permissionMode: toCodeExecutionMode(meta?.permissionMode),
});
this.sessionId = params.sessionId;
this.sessionState.configOptions = loadResponse.configOptions ?? [];

const meta = params._meta as NewSessionMeta | undefined;
if (meta?.taskRunId) {
await this.client.extNotification(POSTHOG_NOTIFICATIONS.SDK_SESSION, {
taskRunId: meta.taskRunId,
Expand All @@ -268,7 +264,12 @@ export class CodexAcpAgent extends BaseAcpAgent {
_meta: params._meta,
});

this.sessionState = createSessionState(newResponse.sessionId, params.cwd);
const meta = params._meta as NewSessionMeta | undefined;
this.sessionState = createSessionState(newResponse.sessionId, params.cwd, {
taskRunId: meta?.taskRunId,
taskId: meta?.taskId ?? meta?.persistence?.taskId,
permissionMode: toCodeExecutionMode(meta?.permissionMode),
});
this.sessionId = newResponse.sessionId;
this.sessionState.configOptions = newResponse.configOptions ?? [];

Expand All @@ -284,31 +285,21 @@ export class CodexAcpAgent extends BaseAcpAgent {
async unstable_listSessions(
params: ListSessionsRequest,
): Promise<ListSessionsResponse> {
return this.codexConnection.listSessions(params);
return this.listSessions(params);
}

async prompt(params: PromptRequest): Promise<PromptResponse> {
if (this.sessionState) {
this.sessionState.cancelled = false;
this.sessionState.interruptReason = undefined;
resetUsage(this.sessionState);
}
this.session.cancelled = false;
this.session.interruptReason = undefined;
resetUsage(this.sessionState);

const response = await this.codexConnection.prompt(params);

if (this.sessionState && response.usage) {
// Accumulate token usage from the prompt response
this.sessionState.accumulatedUsage.inputTokens +=
response.usage.inputTokens ?? 0;
this.sessionState.accumulatedUsage.outputTokens +=
response.usage.outputTokens ?? 0;
this.sessionState.accumulatedUsage.cachedReadTokens +=
response.usage.cachedReadTokens ?? 0;
this.sessionState.accumulatedUsage.cachedWriteTokens +=
response.usage.cachedWriteTokens ?? 0;
}
// Usage is already accumulated via sessionUpdate notifications in
// codex-client.ts. Do NOT also add response.usage here or tokens
// get double-counted.

if (this.sessionState?.taskRunId) {
if (this.sessionState.taskRunId) {
const { accumulatedUsage } = this.sessionState;

await this.client.extNotification(POSTHOG_NOTIFICATIONS.TURN_COMPLETE, {
Expand All @@ -328,7 +319,7 @@ export class CodexAcpAgent extends BaseAcpAgent {
});

if (response.usage) {
await this.client.extNotification("_posthog/usage_update", {
await this.client.extNotification(POSTHOG_NOTIFICATIONS.USAGE_UPDATE, {
sessionId: params.sessionId,
used: {
inputTokens: response.usage.inputTokens ?? 0,
Expand All @@ -345,47 +336,32 @@ export class CodexAcpAgent extends BaseAcpAgent {
}

protected async interrupt(): Promise<void> {
if (this.sessionState) {
this.sessionState.cancelled = true;
}
await this.codexConnection.cancel({
sessionId: this.sessionId,
});
}

async cancel(params: CancelNotification): Promise<void> {
if (this.sessionState) {
this.sessionState.cancelled = true;
const meta = params._meta as { interruptReason?: string } | undefined;
if (meta?.interruptReason) {
this.sessionState.interruptReason = meta.interruptReason;
}
}
await this.codexConnection.cancel(params);
}

async setSessionMode(
params: SetSessionModeRequest,
): Promise<SetSessionModeResponse> {
const permissionMode = toPermissionMode(params.modeId);
const requestedMode = toCodeExecutionMode(params.modeId);
const nativeMode = CODEX_NATIVE_MODE[requestedMode];

const response = await this.codexConnection.setSessionMode({
...params,
modeId: permissionMode,
modeId: nativeMode,
});

if (this.sessionState) {
this.sessionState.modeId = permissionMode;
this.sessionState.permissionMode = permissionMode;
}
this.sessionState.modeId = nativeMode;
this.sessionState.permissionMode = requestedMode;
return response ?? {};
}

async setSessionConfigOption(
params: SetSessionConfigOptionRequest,
): Promise<SetSessionConfigOptionResponse> {
const response = await this.codexConnection.setSessionConfigOption(params);
if (this.sessionState && response.configOptions) {
if (response.configOptions) {
this.sessionState.configOptions = response.configOptions;
}
return response;
Expand All @@ -397,6 +373,7 @@ export class CodexAcpAgent extends BaseAcpAgent {

async closeSession(): Promise<void> {
this.logger.info("Closing Codex session", { sessionId: this.sessionId });
this.session.abortController.abort();
this.session.settingsManager.dispose();
try {
this.codexProcess.kill();
Expand Down
15 changes: 8 additions & 7 deletions packages/agent/src/adapters/codex/codex-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import type {
WriteTextFileRequest,
WriteTextFileResponse,
} from "@agentclientprotocol/sdk";
import type { PermissionMode } from "../../execution-mode";
import type { CodeExecutionMode } from "../../execution-mode";
import type { Logger } from "../../utils/logger";
import type { CodexSessionState } from "./session-state";

Expand All @@ -38,10 +38,11 @@ export interface CodexClientCallbacks {
onUsageUpdate?: (update: Record<string, unknown>) => void;
}

const AUTO_APPROVED_KINDS: Partial<Record<PermissionMode, Set<ToolKind>>> = {
auto: new Set(["read", "search", "fetch", "think"]),
"read-only": new Set(["read", "search", "fetch", "think"]),
"full-access": new Set([
const AUTO_APPROVED_KINDS: Record<CodeExecutionMode, Set<ToolKind>> = {
default: new Set(["read", "search", "fetch", "think"]),
acceptEdits: new Set(["read", "edit", "search", "fetch", "think"]),
plan: new Set(["read", "search", "fetch", "think"]),
bypassPermissions: new Set([
"read",
"edit",
"delete",
Expand All @@ -56,10 +57,10 @@ const AUTO_APPROVED_KINDS: Partial<Record<PermissionMode, Set<ToolKind>>> = {
};

function shouldAutoApprove(
mode: PermissionMode,
mode: CodeExecutionMode,
kind: ToolKind | null | undefined,
): boolean {
if (mode === "full-access") return true;
if (mode === "bypassPermissions") return true;
if (!kind) return false;
return AUTO_APPROVED_KINDS[mode]?.has(kind) ?? false;
}
Expand Down
13 changes: 5 additions & 8 deletions packages/agent/src/adapters/codex/session-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

import type { SessionConfigOption } from "@agentclientprotocol/sdk";
import type { PermissionMode } from "../../execution-mode";
import type { CodeExecutionMode } from "../../execution-mode";

export interface CodexUsage {
inputTokens: number;
Expand All @@ -22,9 +22,7 @@ export interface CodexSessionState {
accumulatedUsage: CodexUsage;
contextSize?: number;
contextUsed?: number;
permissionMode: PermissionMode;
cancelled: boolean;
interruptReason?: string;
permissionMode: CodeExecutionMode;
taskRunId?: string;
taskId?: string;
}
Expand All @@ -37,13 +35,13 @@ export function createSessionState(
taskId?: string;
modeId?: string;
modelId?: string;
permissionMode?: PermissionMode;
permissionMode?: CodeExecutionMode;
},
): CodexSessionState {
return {
sessionId,
cwd,
modeId: opts?.modeId ?? "auto",
modeId: opts?.modeId ?? "default",
modelId: opts?.modelId,
configOptions: [],
accumulatedUsage: {
Expand All @@ -52,8 +50,7 @@ export function createSessionState(
cachedReadTokens: 0,
cachedWriteTokens: 0,
},
permissionMode: opts?.permissionMode ?? "auto",
cancelled: false,
permissionMode: opts?.permissionMode ?? "default",
taskRunId: opts?.taskRunId,
taskId: opts?.taskId,
};
Expand Down
2 changes: 2 additions & 0 deletions packages/agent/src/adapters/codex/spawn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ function buildConfigArgs(options: CodexProcessOptions): string[] {
if (options.instructions) {
const escaped = options.instructions
.replace(/\\/g, "\\\\")
.replace(/\n/g, "\\n")
.replace(/\r/g, "\\r")
.replace(/"/g, '\\"');
args.push("-c", `instructions="${escaped}"`);
}
Expand Down
Loading