diff --git a/.changeset/agent-goal-tracking.md b/.changeset/agent-goal-tracking.md new file mode 100644 index 0000000000..9a87ad814c --- /dev/null +++ b/.changeset/agent-goal-tracking.md @@ -0,0 +1,81 @@ +--- +'@electric-ax/agents-server-ui': patch +'@electric-ax/agents-runtime': patch +'@electric-ax/agents': patch +--- + +Add `/goal` slash command to Horton sessions. Lets the user set an +objective with an optional token budget; the agent works autonomously +toward the goal and stops when it calls `mark_goal_complete` or when +the run exceeds the budget. + +```text +/goal set "ship feature X" --tokens 50k # default 50k tokens +/goal set "explore" --unlimited # opt out of the cap +/goal show # current state +/goal complete # mark done manually +/goal clear # remove the goal +``` + +## Behaviour + +- **One goal per session**, persisted as a `kind: 'goal'` entry on the + `manifests` collection — resumes automatically across desktop + restarts. +- **Mid-run token enforcement**: an `onStepEnd` hook on the outbound + bridge surfaces per-step token counts; Horton accumulates them and + aborts the active `ctx.agent.run()` via an `AbortController` once + `tokensUsed >= tokenBudget`. The cap counts **new input (fresh + + cache-write tokens) + output** per step — prompt-cache reads (which + re-count the whole conversation on every warm step) are excluded, so + the budget tracks new work rather than context size. +- **Live progress**: the goal banner ticks up after each step. The + manifest update is written via `writeEvent` directly (not the + wake-session's staged manifest transaction, which only commits at + end-of-wake — too late for a long-running run). +- **`mark_goal_complete` tool**: registered on Horton's tool list. + Flips status to `complete`, surfaces in the chat as an ordinary + agent reply via the new `ctx.replyText` helper. 
+- **State-changing `/goal` commands interrupt the active run** — + typing `/goal complete`, `/goal clear`, or `/goal set` while a run + is in flight signals SIGINT alongside sending the message, so the + prior run aborts instead of finishing the old work first. `/goal + show` is read-only and does not interrupt. +- **Budget-limited stop message**: when the cap is hit mid-run, the + agent posts a synthetic reply explaining what happened and + suggesting a larger budget to resume. + +## Plumbing + +- `entity-schema.ts` — new `ManifestGoalEntryValue` (objective, + status, tokenBudget, tokensUsed, createdAt, updatedAt) added to the + manifest discriminated union. +- `goal-api.ts` (new) — `setGoal` / `clearGoal` / `getGoal` / + `markGoalComplete` / `updateGoalUsage`. All goal mutations share a + single ordered write channel (direct `writeEvent` upserts, live for + the UI) plus an in-wake read-your-writes cache, so a mutation firing + mid-run can never snapshot — and replay — a stale `tokensUsed` over + a fresher one. `updateGoalUsage` additionally never decreases the + counter. +- `goal-command.ts` (new) — `/goal` parser (`--tokens N|50k|1.2m| + unlimited`, `--unlimited` flag, subcommand aliases `done`/`status`) + and dispatcher. +- `tools/goal-tools.ts` (new) — `createMarkGoalCompleteTool` exposes + the completion signal to the LLM. +- `outbound-bridge.ts` — new optional `OutboundBridgeHooks.onStepEnd` + callback, threaded through `pi-adapter` and the `AgentConfig` passed + to `useAgent`. +- `context-factory.ts` — `AgentHandle.run` now accepts an optional + `abortSignal` and combines it with the runtime's `runSignal`. New + `ctx.replyText(text)` writes a complete runs + texts + textDeltas + sequence so synthetic replies render in the chat. New goal-related + methods exposed on `HandlerContext`. 
+- `horton.ts` — `tryHandleSlashCommand` intercepts `/goal *` before + the LLM; `/goal set` enqueues a one-shot kickoff so the agent starts + immediately; `assistantHandler` wires the budget-enforcing + `onStepEnd`, aborts on overflow, and posts the explanation reply. +- `agents-server-ui` — new `GoalBanner` component above the timeline + (objective + budget bar + status badge). `MessageInput` aborts the + active run when a state-changing `/goal` command is submitted. + `EntityTimeline` / `EntityContextDrawer` handle the new `goal` + manifest kind. diff --git a/packages/agents-runtime/src/client.ts b/packages/agents-runtime/src/client.ts index c25001f662..b242362ca6 100644 --- a/packages/agents-runtime/src/client.ts +++ b/packages/agents-runtime/src/client.ts @@ -40,6 +40,12 @@ export { createSlashCommandTokenRegex, SLASH_COMMAND_TRIGGER_REGEX, } from './composer-input' +// The /goal text grammar — pure parsing, shared with the UI so composer +// behavior (e.g. which subcommands interrupt a running agent) can't +// drift from the runtime dispatcher. 
/**
 * Returns a signal that aborts as soon as either `a` or `b` aborts,
 * carrying the reason of whichever source fired.
 *
 * Uses the platform's `AbortSignal.any` when it exists; otherwise the two
 * sources are linked by hand through a fresh `AbortController`.
 *
 * @param a - first source signal (wins when both are already aborted)
 * @param b - second source signal
 * @returns a signal that is aborted once either source is aborted
 */
function combineAbortSignals(a: AbortSignal, b: AbortSignal): AbortSignal {
  // Prefer the platform helper when available (Node 20+, modern browsers).
  const any = (
    AbortSignal as unknown as {
      any?: (sigs: Array<AbortSignal>) => AbortSignal
    }
  ).any
  if (typeof any === `function`) return any.call(AbortSignal, [a, b])

  const controller = new AbortController()
  const sources = [a, b]

  // A source that is already aborted settles the result immediately; no
  // listener is attached to the other source in that case.
  for (const source of sources) {
    if (source.aborted) {
      controller.abort(source.reason)
      return controller.signal
    }
  }

  // Track every attached listener so the first abort can detach all of
  // them. Without this, the source that did NOT fire keeps a dead listener
  // (and a reference to the controller) for as long as it lives.
  const links: Array<[AbortSignal, () => void]> = []
  const detachAll = (): void => {
    for (const [source, listener] of links) {
      source.removeEventListener(`abort`, listener)
    }
    links.length = 0
  }
  for (const source of sources) {
    const onAbort = (): void => {
      detachAll()
      controller.abort(source.reason)
    }
    links.push([source, onAbort])
    source.addEventListener(`abort`, onAbort, { once: true })
  }
  return controller.signal
}
+ let localRunFloor = 0 const nextRunKey = (): string => { - if (recordRunCounter == null) { - let max = 0 - const rows = config.db.collections.runs.toArray as Array<{ key: string }> - for (const row of rows) { - const m = row.key.match(/^run-(\d+)/) - if (!m) continue - max = Math.max(max, parseInt(m[1]!, 10) + 1) - } - recordRunCounter = max - } - const key = `run-${recordRunCounter}` - recordRunCounter += 1 + const key = allocateRunKey(config.db, localRunFloor) + localRunFloor = parseInt(key.slice(`run-`.length), 10) + 1 return key } @@ -476,6 +497,12 @@ export function createHandlerContext( wakeSession: config.wakeSession, }) + const goalApi = createGoalApi({ + db: config.db, + wakeSession: config.wakeSession, + writeEvent: config.writeEvent, + }) + const listAttachments: AttachmentsApi[`list`] = (filter) => { const attachments = config.db.collections.manifests.toArray .filter((entry) => entry.kind === `attachment`) @@ -713,7 +740,10 @@ export function createHandlerContext( } const agent: AgentHandle = { - async run(input?: string): Promise { + async run( + input?: string, + abortSignal?: AbortSignal + ): Promise { if (!agentConfig) { throw new Error( `[agent-runtime] agent.run() called without useAgent().` @@ -755,6 +785,8 @@ export function createHandlerContext( getApiKey: activeAgentConfig.getApiKey, onPayload: activeAgentConfig.onPayload, + + onStepEnd: activeAgentConfig.onStepEnd, }) const handle = adapterFactory({ entityUrl: config.entityUrl, @@ -802,7 +834,11 @@ export function createHandlerContext( ) } - await handle.run(runInput, config.runSignal) + const combinedSignal = + config.runSignal && abortSignal + ? combineAbortSignals(config.runSignal, abortSignal) + : (abortSignal ?? 
config.runSignal) + await handle.run(runInput, combinedSignal) runtimeLog.info(logPrefix, `agent.run completed`) return { @@ -947,6 +983,11 @@ export function createHandlerContext( removeContext: contextApi.removeContext, getContext: contextApi.getContext, listContext: contextApi.listContext, + setGoal: goalApi.setGoal, + clearGoal: goalApi.clearGoal, + getGoal: goalApi.getGoal, + markGoalComplete: goalApi.markGoalComplete, + updateGoalUsage: goalApi.updateGoalUsage, __debug: { useContextRegistrations: () => useContextRegistrations, }, @@ -1040,6 +1081,53 @@ export function createHandlerContext( }, } }, + // Renders `text` as an ordinary assistant message in the chat without + // calling the LLM. Used for runtime-driven replies like slash-command + // responses and budget-limit notices. The five writes synthesize the + // same run + text + delta event sequence the outbound bridge would + // emit for a real LLM turn; the UI needs all of them to render. + replyText(text: string): void { + if (typeof text !== `string` || text.length === 0) return + const runKey = nextRunKey() + const msgKey = `${runKey}:msg` + config.writeEvent( + entityStateSchema.runs.insert({ + key: runKey, + value: { status: `started` } as never, + }) as ChangeEvent + ) + config.writeEvent( + entityStateSchema.texts.insert({ + key: msgKey, + value: { status: `streaming`, run_id: runKey } as never, + }) as ChangeEvent + ) + config.writeEvent( + entityStateSchema.textDeltas.insert({ + key: `${msgKey}:0`, + value: { + text_id: msgKey, + run_id: runKey, + delta: text, + } as never, + }) as ChangeEvent + ) + config.writeEvent( + entityStateSchema.texts.update({ + key: msgKey, + value: { status: `completed`, run_id: runKey } as never, + }) as ChangeEvent + ) + config.writeEvent( + entityStateSchema.runs.update({ + key: runKey, + value: { + status: `completed`, + finish_reason: `stop`, + } as never, + }) as ChangeEvent + ) + }, sleep(): void { sleepRequested = true }, @@ -1051,5 +1139,8 @@ export 
function createHandlerContext( }, } - return { ctx, getSleepRequested: () => sleepRequested } + return { + ctx, + getSleepRequested: () => sleepRequested, + } } diff --git a/packages/agents-runtime/src/entity-schema.ts b/packages/agents-runtime/src/entity-schema.ts index 0db9426126..aa4a224698 100644 --- a/packages/agents-runtime/src/entity-schema.ts +++ b/packages/agents-runtime/src/entity-schema.ts @@ -355,6 +355,25 @@ type ManifestFutureSendScheduleEntryValue = { failedAt?: string lastError?: string } +type GoalStatusValue = `active` | `complete` | `budget_limited` +type ManifestGoalEntryValue = { + key?: string + kind: `goal` + id: string + objective: string + status: GoalStatusValue + // `null` means unbounded — the user must opt in explicitly. + tokenBudget: number | null + // Maintained by the handler's in-memory step accumulator (the single + // write path for usage); enforcement aborts mid-run via the step-end hook. + tokensUsed: number + // Optional completion note recorded by mark_goal_complete — what was + // accomplished, or what blocked the goal. + summary?: string + // ISO strings, matching every other manifest kind. 
+ createdAt: string + updatedAt: string +} type ReplayWatermarkValue = { key?: string source_id: string @@ -728,6 +747,7 @@ function createManifestSchema(): Schema< | ManifestContextEntryValue | ManifestCronScheduleEntryValue | ManifestFutureSendScheduleEntryValue + | ManifestGoalEntryValue > { return z.union([ z.object({ @@ -830,6 +850,19 @@ function createManifestSchema(): Schema< failedAt: z.string().optional(), lastError: z.string().optional(), }), + z.object({ + key: z.string().optional(), + ...timelineOrderField, + kind: z.literal(`goal`), + id: z.string(), + objective: z.string(), + status: z.enum([`active`, `complete`, `budget_limited`]), + tokenBudget: z.number().int().positive().nullable(), + tokensUsed: z.number().int().nonnegative(), + summary: z.string().optional(), + createdAt: z.string(), + updatedAt: z.string(), + }), ]) as unknown as Schema< | ManifestChildEntryValue | ManifestSourceEntryValue @@ -839,6 +872,7 @@ function createManifestSchema(): Schema< | ManifestContextEntryValue | ManifestCronScheduleEntryValue | ManifestFutureSendScheduleEntryValue + | ManifestGoalEntryValue > } @@ -893,6 +927,8 @@ export type ManifestCronScheduleEntry = SequencedPersistedRow export type ManifestFutureSendScheduleEntry = SequencedPersistedRow +export type GoalStatus = GoalStatusValue +export type ManifestGoalEntry = SequencedPersistedRow type ManifestUnion = | ManifestChildEntry | ManifestSourceEntry @@ -902,6 +938,7 @@ type ManifestUnion = | ManifestContextEntry | ManifestCronScheduleEntry | ManifestFutureSendScheduleEntry + | ManifestGoalEntry export type Manifest = ManifestUnion & { id?: string entity_url?: string @@ -933,10 +970,15 @@ export type Manifest = ManifestUnion & { targetUrl?: string producerId?: string messageType?: string - status?: FutureSendScheduleStatus | AttachmentStatusValue + status?: FutureSendScheduleStatus | AttachmentStatusValue | GoalStatusValue sentAt?: string failedAt?: string lastError?: string + objective?: string + tokenBudget?: 
/**
 * Type guard: true when `row` is a non-null object whose `kind` is
 * exactly `goal`. Only the discriminant is inspected; the remaining goal
 * fields are not validated here.
 *
 * @param row - any value read from the manifests collection
 * @returns whether `row` can be treated as a goal manifest record
 */
function isGoalManifest(
  row: unknown
): row is Record<string, unknown> & { kind: `goal` } {
  if (typeof row !== `object` || row === null) return false
  return (row as { kind?: unknown }).kind === `goal`
}
/**
 * Builds the goal API (`setGoal` / `clearGoal` / `getGoal` /
 * `markGoalComplete` / `updateGoalUsage`) for one wake.
 *
 * Reads come from an in-wake cache of the last write (falling back to the
 * manifests collection); writes go through `opts.writeEvent` when it is
 * provided and through the wake session's manifest registration otherwise.
 *
 * @param opts.db - entity stream DB; only `collections.manifests.toArray` is read
 * @param opts.wakeSession - fallback write path when `writeEvent` is absent
 * @param opts.writeEvent - direct event writer (preferred write path)
 * @param opts.now - timestamp source; defaults to the current time as an ISO string
 * @returns the goal API bound to this wake
 */
export function createGoalApi(opts: {
  db: EntityStreamDBWithActions
  wakeSession: WakeSession
  writeEvent?: (event: ChangeEvent) => void
  now?: () => string
}): GoalApi {
  const now = opts.now ?? (() => new Date().toISOString())

  // Read-your-writes cache for the duration of this wake. Events written via
  // writeEvent only reach the local manifests collection after a round-trip
  // through the stream, so a read straight after a write would observe the
  // previous value — e.g. mark_goal_complete firing mid-run would snapshot a
  // stale tokensUsed and clobber the fresher per-step counter.
  let lastWritten: ManifestGoalEntry | undefined
  let clearedThisWake = false

  // Coerces a token count to a non-negative integer. NaN, ±Infinity,
  // negatives and non-numbers become 0, so a bad reading can never be
  // persisted (Math.max(x, NaN) is NaN) or lower the counter.
  function normalizeTokenCount(value: unknown): number {
    if (typeof value !== `number` || !Number.isFinite(value) || value <= 0) {
      return 0
    }
    return Math.round(value)
  }

  // Returns the current goal record: cleared → nothing, else the cached
  // last write, else the first goal row in the manifests collection.
  function readRaw():
    | (ManifestGoalEntry & Record<string, unknown>)
    | undefined {
    if (clearedThisWake) return undefined
    if (lastWritten) {
      return lastWritten as ManifestGoalEntry & Record<string, unknown>
    }
    for (const row of opts.db.collections.manifests.toArray) {
      if (isGoalManifest(row)) {
        return row as ManifestGoalEntry & Record<string, unknown>
      }
    }
    return undefined
  }

  function readLive(): GoalEntry | undefined {
    const raw = readRaw()
    return raw ? toGoalEntry(raw) : undefined
  }

  // Rebuilds a clean goal entry from a stored record, copying only the goal
  // fields. Shared by markGoalComplete and updateGoalUsage so both write the
  // same shape and neither echoes unrelated properties of the stored row
  // back into the upsert.
  function rebuild(
    existing: Record<string, unknown>,
    timestamp: string
  ): ManifestGoalEntry {
    const budget = existing.tokenBudget
    const summary = existing.summary
    return {
      key: GOAL_MANIFEST_KEY,
      kind: `goal`,
      id: String(existing.id ?? GOAL_ID),
      objective: String(existing.objective ?? ``),
      status: (existing.status ?? `active`) as ManifestGoalEntry[`status`],
      tokenBudget:
        budget === null
          ? null
          : typeof budget === `number`
            ? budget
            : DEFAULT_TOKEN_BUDGET,
      tokensUsed: normalizeTokenCount(existing.tokensUsed),
      ...(typeof summary === `string` && summary ? { summary } : {}),
      createdAt:
        typeof existing.createdAt === `string` ? existing.createdAt : timestamp,
      updatedAt: timestamp,
    }
  }

  // Single write channel: every goal mutation goes directly through
  // writeEvent (live, ordered) when it is wired. Mixing this with the
  // wake-session's staged manifest transaction (which replays at
  // end-of-wake) re-introduces the ordering race where a snapshot staged
  // mid-run lands after — and overwrites — fresher live writes. The staged
  // path remains only as a fallback for tests that don't wire writeEvent.
  function persist(entry: ManifestGoalEntry): GoalEntry {
    if (opts.writeEvent) {
      opts.writeEvent(
        entityStateSchema.manifests.upsert({
          key: GOAL_MANIFEST_KEY,
          value: entry as never,
        }) as ChangeEvent
      )
    } else {
      opts.wakeSession.registerManifestEntry(entry)
    }
    lastWritten = entry
    clearedThisWake = false
    return toGoalEntry(entry as unknown as Record<string, unknown>)
  }

  return {
    setGoal(input) {
      const existing = readRaw()
      const timestamp = now()
      // Re-setting the same objective (e.g. to raise the budget) preserves
      // accumulated usage; a new objective starts from zero.
      const isSameObjective = existing?.objective === input.objective
      const tokenBudget =
        input.tokenBudget === undefined
          ? DEFAULT_TOKEN_BUDGET
          : input.tokenBudget
      const entry: ManifestGoalEntry = {
        key: GOAL_MANIFEST_KEY,
        kind: `goal`,
        id: GOAL_ID,
        objective: input.objective,
        status: input.status ?? `active`,
        tokenBudget,
        tokensUsed: isSameObjective
          ? normalizeTokenCount(existing?.tokensUsed)
          : 0,
        createdAt:
          isSameObjective && typeof existing?.createdAt === `string`
            ? existing.createdAt
            : timestamp,
        updatedAt: timestamp,
      }
      return persist(entry)
    },

    clearGoal() {
      const existed = readRaw() !== undefined
      if (opts.writeEvent) {
        // Only emit a delete when there is something to delete.
        if (existed) {
          opts.writeEvent(
            entityStateSchema.manifests.delete({
              key: GOAL_MANIFEST_KEY,
            }) as ChangeEvent
          )
        }
      } else {
        opts.wakeSession.removeManifestEntry(GOAL_MANIFEST_KEY)
      }
      lastWritten = undefined
      clearedThisWake = true
      return existed
    },

    getGoal() {
      return readLive()
    },

    markGoalComplete(summary) {
      const existing = readRaw()
      if (!existing) return undefined
      const trimmed = summary?.trim()
      // A blank summary leaves any previously recorded summary in place.
      const next: ManifestGoalEntry = {
        ...rebuild(existing, now()),
        status: `complete`,
        ...(trimmed ? { summary: trimmed } : {}),
      }
      return persist(next)
    },

    updateGoalUsage(tokensUsed, opts2) {
      const existing = readRaw()
      if (!existing) return undefined
      const previousTokens = normalizeTokenCount(existing.tokensUsed)
      // Never decrease — guards against a stale value clobbering a fresher one.
      const nextTokens = Math.max(
        previousTokens,
        normalizeTokenCount(tokensUsed)
      )
      const nextStatus = (opts2?.status ??
        existing.status ??
        `active`) as ManifestGoalEntry[`status`]
      // Nothing changed: skip the write and report the current state.
      if (nextTokens === previousTokens && nextStatus === existing.status) {
        return toGoalEntry(existing as unknown as Record<string, unknown>)
      }
      const next: ManifestGoalEntry = {
        ...rebuild(existing, now()),
        status: nextStatus,
        tokensUsed: nextTokens,
      }
      return persist(next)
    },
  }
}
/**
 * Reports whether `text` is a `/goal` slash command.
 *
 * Leading whitespace is ignored. `/goal` must be a whole word: it has to
 * be followed by the end of the input or by a whitespace character, so
 * `/goals` and `/goalkeeper` do not match.
 *
 * @param text - raw composer / message text
 * @returns true when the text starts with the `/goal` command word
 */
export function isGoalCommandText(text: string): boolean {
  if (!text) return false
  const trimmed = text.trimStart()
  if (!trimmed.startsWith(GOAL_PREFIX)) return false
  const next = trimmed.charAt(GOAL_PREFIX.length)
  // Accept any whitespace as the boundary (including `\r` from CRLF input),
  // matching what `trim()` strips in parseGoalCommand.
  return next === `` || /\s/.test(next)
}
/**
 * Parses the arguments that follow `/goal set`.
 *
 * Recognised flags: `--unlimited`, `--tokens <value>`, `-t <value>` and
 * `--tokens=<value>`. Every other token becomes part of the objective,
 * joined with single spaces. `--unlimited` takes precedence over any
 * `--tokens` value.
 *
 * @param rest - tokens after the `set` subcommand
 * @returns a `set` command (without `tokenBudget` when no budget flag was
 *   given, so the caller applies its default), or an `error` command
 */
function parseSet(rest: Array<string>): GoalCommand {
  if (rest.length === 0) {
    return {
      kind: `error`,
      message: `Usage: /goal set [--tokens N|unlimited]`,
    }
  }
  // tokenBudget: undefined = use default; null = unlimited; number = explicit cap.
  let tokenBudget: number | null | undefined
  let unlimited = false
  const objectiveParts: Array<string> = []
  for (let i = 0; i < rest.length; i++) {
    const token = rest[i]!
    if (token === `--unlimited`) {
      unlimited = true
      continue
    }
    if (token === `--tokens` || token === `-t`) {
      const value = rest[i + 1]
      if (!value) {
        return {
          kind: `error`,
          message: `--tokens requires a value (number, "50k", "1m", or "unlimited")`,
        }
      }
      const parsed = parseTokens(value)
      if (parsed === `invalid`) {
        return {
          kind: `error`,
          message: `--tokens value "${value}" is not a valid count (use a positive integer, "50k", "1m", or "unlimited")`,
        }
      }
      tokenBudget = parsed
      // Skip the value token that was just consumed.
      i += 1
      continue
    }
    if (token.startsWith(`--tokens=`)) {
      const value = token.slice(`--tokens=`.length)
      const parsed = parseTokens(value)
      if (parsed === `invalid`) {
        return {
          kind: `error`,
          message: `--tokens value "${value}" is not a valid count`,
        }
      }
      tokenBudget = parsed
      continue
    }
    objectiveParts.push(token)
  }
  const objective = objectiveParts.join(` `).trim()
  if (!objective) {
    return {
      kind: `error`,
      message: `Objective is required: /goal set [--tokens N]`,
    }
  }
  if (unlimited) tokenBudget = null
  return tokenBudget === undefined
    ? { kind: `set`, objective }
    : { kind: `set`, objective, tokenBudget }
}

/**
 * Parses a `--tokens` value.
 *
 * Accepts a decimal number with an optional `k` (×1,000) or `m`
 * (×1,000,000) suffix, case-insensitively, or one of the unlimited
 * keywords.
 *
 * @returns the budget as a positive integer, `null` for unlimited, or
 *   `invalid` when the text is not a usable count
 */
function parseTokens(raw: string): number | null | `invalid` {
  const value = raw.trim().toLowerCase()
  if (UNLIMITED_TOKENS.has(value)) return null
  const match = /^([0-9]+(?:\.[0-9]+)?)([km])?$/.exec(value)
  if (!match) return `invalid`
  const base = parseFloat(match[1]!)
  if (!Number.isFinite(base) || base <= 0) return `invalid`
  const suffix = match[2]
  const multiplier = suffix === `k` ? 1_000 : suffix === `m` ? 1_000_000 : 1
  const total = Math.round(base * multiplier)
  // Validate the rounded result, not just the base: "0.4" rounds to 0, and
  // very long digit strings exceed the safe-integer range.
  if (!Number.isSafeInteger(total) || total < 1) return `invalid`
  return total
}

/**
 * Splits command text into tokens on whitespace, honouring single and
 * double quotes: quoted text may contain whitespace and the quote
 * characters themselves are dropped. An unterminated quote runs to the end
 * of the input.
 */
function tokenize(input: string): Array<string> {
  const out: Array<string> = []
  const whitespace = /\s/
  let current = ``
  let quote: `"` | `'` | null = null
  for (let i = 0; i < input.length; i++) {
    const ch = input.charAt(i)
    if (quote) {
      if (ch === quote) {
        quote = null
      } else {
        current += ch
      }
      continue
    }
    if (ch === `"` || ch === `'`) {
      quote = ch
      continue
    }
    // Any whitespace separates tokens, so `\r` from CRLF input does not
    // end up glued to the preceding token.
    if (whitespace.test(ch)) {
      if (current.length > 0) {
        out.push(current)
        current = ``
      }
      continue
    }
    current += ch
  }
  if (current.length > 0) out.push(current)
  return out
}
`${formatTokenCount(goal.tokensUsed)} tokens used (unlimited)` + : `${formatTokenCount(goal.tokensUsed)} / ${formatTokenCount( + goal.tokenBudget + )} tokens` + return { + handled: true, + message: `Goal: ${goal.objective} (${budgetLabel}, ${goal.status})${ + goal.summary ? `\nSummary: ${goal.summary}` : `` + }`, + } + } + case `error`: + return { handled: true, message: command.message } + } +} diff --git a/packages/agents-runtime/src/index.ts b/packages/agents-runtime/src/index.ts index 7942ae81e0..5950712c22 100644 --- a/packages/agents-runtime/src/index.ts +++ b/packages/agents-runtime/src/index.ts @@ -63,6 +63,10 @@ export type { ContextInserted, ContextRemoved, ContextEntryAttrs, + GoalEntry, + GoalInput, + GoalStatus, + ManifestGoalEntry, CollectionInsert, CollectionKey, CollectionRow, @@ -338,8 +342,17 @@ export type { export { DEFAULT_STATE_SCHEMAS } from './default-state-schemas' export { createContextEntriesApi } from './context-entries' +export { createGoalApi, GOAL_MANIFEST_KEY } from './goal-api' +export type { GoalApi } from './goal-api' +export { + GOAL_SLASH_COMMAND, + dispatchGoalCommand, + isGoalCommandText, + parseGoalCommand, +} from './goal-command' +export type { GoalCommand, GoalDispatchResult } from './goal-command' export { assembleContext } from './context-assembly' -export { approxTokens, sliceChars } from './token-budget' +export { approxTokens, formatTokenCount, sliceChars } from './token-budget' export { createContextTools } from './tools/context-tools' export { completeWithLowCostModel, diff --git a/packages/agents-runtime/src/outbound-bridge.ts b/packages/agents-runtime/src/outbound-bridge.ts index 75eb1400f4..3b65a6ab50 100644 --- a/packages/agents-runtime/src/outbound-bridge.ts +++ b/packages/agents-runtime/src/outbound-bridge.ts @@ -97,6 +97,40 @@ export async function loadOutboundIdSeed( return seed } +/** + * Synchronously allocate the next `run-N` key, coordinated with the + * outbound bridge's id-seed cache. 
Writers like `ctx.recordRun()` and + * `ctx.replyText()` emit run rows via `writeEvent`, which has no + * synchronous local apply — the collection only catches up when the + * event round-trips, so seeding a counter from `runs.toArray` alone can + * reuse a key the bridge just allocated. Consulting (and advancing) the + * shared cache keeps all allocators collision-free within the process. + */ +export function allocateRunKey( + db: Pick, + floor = 0 +): string { + const cacheKey = db.collections.runs.id + const fromDb = nextCounterFromKeys( + db.collections.runs.toArray.map((run) => run.key), + `run` + ) + // Without a stable collection id the shared cache would cross-contaminate + // unrelated DBs (e.g. test fixtures); rely on the caller's floor instead. + const cached = cacheKey ? outboundIdSeedCache.get(cacheKey) : undefined + const next = Math.max(fromDb, cached?.run ?? 0, floor) + if (cacheKey) { + outboundIdSeedCache.set(cacheKey, { + run: next + 1, + step: cached?.step ?? 0, + msg: cached?.msg ?? 0, + tc: cached?.tc ?? 0, + cacheKey, + }) + } + return `run-${next}` +} + export interface OutboundBridge { onRunStart: () => void onRunEnd: (opts?: { finishReason?: string }) => void @@ -104,6 +138,9 @@ export interface OutboundBridge { onStepEnd: (opts?: { finishReason?: string tokenInput?: number + // Uncached portion of the input side (no cacheRead/cacheWrite). Not + // persisted to the step row — forwarded to hooks for budget accounting. + tokenInputUncached?: number tokenOutput?: number durationMs?: number }) => void @@ -121,9 +158,28 @@ export interface OutboundBridge { onToolCallEnd(name: string, result: unknown, isError: boolean): void } +export interface OutboundBridgeHooks { + /** + * Called after a step ends and has been written to the entity stream. 
+ * Receives the token counts (zero if the provider did not report them): + * `input` is the full prompt volume the model saw (including prompt-cache + * reads — what the meta row displays), `uncachedInput` is the new input + * this step only (fresh tokens plus cache writes; cache *reads* excluded), + * `output` is completion tokens. Budget accounting should use + * `uncachedInput + output` so warm-cache turns don't re-count the entire + * conversation every step. + */ + onStepEnd?: (stats: { + input: number + uncachedInput: number + output: number + }) => void +} + export function createOutboundBridge( existingEvents: Array | OutboundIdSeed, - writeEvent: (event: ChangeEvent) => void + writeEvent: (event: ChangeEvent) => void, + hooks?: OutboundBridgeHooks ): OutboundBridge { const counters: IdCounters = Array.isArray(existingEvents) ? scanCounters(existingEvents) @@ -216,6 +272,7 @@ export function createOutboundBridge( onStepEnd(opts?: { finishReason?: string tokenInput?: number + tokenInputUncached?: number tokenOutput?: number durationMs?: number }) { @@ -240,6 +297,11 @@ export function createOutboundBridge( } as never, }) as ChangeEvent ) + hooks?.onStepEnd?.({ + input: opts?.tokenInput ?? 0, + uncachedInput: opts?.tokenInputUncached ?? opts?.tokenInput ?? 0, + output: opts?.tokenOutput ?? 0, + }) }, onTextStart() { diff --git a/packages/agents-runtime/src/pi-adapter.ts b/packages/agents-runtime/src/pi-adapter.ts index ab823f4304..52ab5296fa 100644 --- a/packages/agents-runtime/src/pi-adapter.ts +++ b/packages/agents-runtime/src/pi-adapter.ts @@ -42,6 +42,14 @@ export interface PiAdapterOptions { provider: string ) => Promise | string | undefined onPayload?: SimpleStreamOptions[`onPayload`] + // Invoked after each step ends with the token counts reported by the + // provider. Used by goal-budget enforcement to abort mid-run; see + // OutboundBridgeHooks for the field semantics. 
+ onStepEnd?: (stats: { + input: number + uncachedInput: number + output: number + }) => void } interface PiAgentAdapterConfig { @@ -213,7 +221,8 @@ export function createPiAgentAdapter( return (config: PiAgentAdapterConfig): PiAgentHandle => { const bridge = createOutboundBridge( config.outboundIdSeed, - config.writeEvent + config.writeEvent, + opts.onStepEnd ? { onStepEnd: opts.onStepEnd } : undefined ) const history = toAgentHistory(config.messages) @@ -393,6 +402,21 @@ export function createPiAgentAdapter( (typeof usage?.inputTokens === `number` ? usage.inputTokens : undefined) + // Non-cache-hit input — what goal-budget enforcement + // accumulates. On warm turns `cacheRead` re-counts the whole + // conversation every step, so budgeting on the display sum + // would burn a budget in a couple of steps regardless of how + // much *new* work happened. `cacheWrite` IS counted: on + // cache-enabled providers the newly appended prompt tokens + // are reported there (with `usage.input` collapsing to ~0), + // so excluding it would make the budget track output only. + // Legacy flat `inputTokens` has no cache split, so the whole + // side counts as uncached. + const usageInputUncached = + sumPresentNumbers([usage?.input, usage?.cacheWrite]) ?? + (typeof usage?.inputTokens === `number` + ? usage.inputTokens + : undefined) const usageOutput = typeof usage?.output === `number` ? 
usage.output @@ -403,6 +427,9 @@ export function createPiAgentAdapter( finishReason, durationMs: Date.now() - stepStartTime, ...(usageInput !== undefined && { tokenInput: usageInput }), + ...(usageInputUncached !== undefined && { + tokenInputUncached: usageInputUncached, + }), ...(usageOutput !== undefined && { tokenOutput: usageOutput, }), diff --git a/packages/agents-runtime/src/token-budget.ts b/packages/agents-runtime/src/token-budget.ts index c22a584f93..93671b4b93 100644 --- a/packages/agents-runtime/src/token-budget.ts +++ b/packages/agents-runtime/src/token-budget.ts @@ -23,3 +23,21 @@ export function approxTokens(value: unknown): number { export function sliceChars(value: string, from: number, to: number): string { return value.slice(from, to) } + +/** + * Single shared token-count formatter: `Intl.NumberFormat` with + * `notation: 'compact'` gives "1.2k", "12k", "1.2m" — locale-aware + * and bounded in width. Lowercased suffix to match muted meta rows. + * Used by the /goal command replies, the goal banner, and the + * per-response token meta row so the same count never renders two + * different ways in one UI. 
+ */ +const compactTokenFormatter = new Intl.NumberFormat(undefined, { + notation: `compact`, + maximumFractionDigits: 1, +}) + +export function formatTokenCount(n: number): string { + if (n < 1000) return String(n) + return compactTokenFormatter.format(n).toLowerCase() +} diff --git a/packages/agents-runtime/src/tools.ts b/packages/agents-runtime/src/tools.ts index aa0f8741f8..7119739b7a 100644 --- a/packages/agents-runtime/src/tools.ts +++ b/packages/agents-runtime/src/tools.ts @@ -7,3 +7,4 @@ export { createFetchUrlTool } from './tools/fetch-url' export { createScheduleTools } from './tools/schedules' export { createEventSourceTools } from './tools/event-sources' export { createSendTool } from './tools/send' +export { createMarkGoalCompleteTool } from './tools/goal-tools' diff --git a/packages/agents-runtime/src/tools/goal-tools.ts b/packages/agents-runtime/src/tools/goal-tools.ts new file mode 100644 index 0000000000..dbafb417df --- /dev/null +++ b/packages/agents-runtime/src/tools/goal-tools.ts @@ -0,0 +1,51 @@ +import { Type } from '@sinclair/typebox' +import type { AgentTool, HandlerContext } from '../types' + +function asToolResult(value: unknown) { + return { + content: [ + { + type: `text` as const, + text: + typeof value === `string` ? value : JSON.stringify(value, null, 2), + }, + ], + details: {}, + } +} + +// `mark_goal_complete` is the LLM's signal that the active goal has been met. +// The runtime uses this to break out of the continuation loop. +export function createMarkGoalCompleteTool( + ctx: Pick +): AgentTool { + return { + name: `mark_goal_complete`, + label: `Mark Goal Complete`, + description: `Mark the active goal as complete. Call this when you have finished the work described in the goal. After this is called, the runtime will stop driving you toward the goal.`, + parameters: Type.Object({ + summary: Type.Optional( + Type.String({ + description: `Optional short summary of what was accomplished. 
Recorded with the goal but not required.`, + }) + ), + }), + execute: async (_toolCallId, params) => { + const before = ctx.getGoal() + if (!before) { + return asToolResult({ + completed: false, + message: `No active goal to mark complete.`, + }) + } + const { summary } = (params ?? {}) as { summary?: string } + const updated = ctx.markGoalComplete(summary) + return asToolResult({ + completed: true, + objective: updated?.objective ?? before.objective, + status: updated?.status ?? `complete`, + ...(updated?.summary ? { summary: updated.summary } : {}), + }) + }, + } +} diff --git a/packages/agents-runtime/src/types.ts b/packages/agents-runtime/src/types.ts index ec366ab670..26a9873eca 100644 --- a/packages/agents-runtime/src/types.ts +++ b/packages/agents-runtime/src/types.ts @@ -36,6 +36,7 @@ import type { ContextInserted as EntityContextInserted, ContextRemoved as EntityContextRemoved, EntitySignal, + GoalStatus as EntityGoalStatus, Manifest as EntityManifest, ManifestAttachmentEntry as EntityManifestAttachmentEntry, ManifestChildEntry as EntityManifestChildEntry, @@ -43,6 +44,7 @@ import type { ManifestCronScheduleEntry as EntityManifestCronScheduleEntry, ManifestEffectEntry as EntityManifestEffectEntry, ManifestFutureSendScheduleEntry as EntityManifestFutureSendScheduleEntry, + ManifestGoalEntry as EntityManifestGoalEntry, ManifestSharedStateEntry as EntityManifestSharedStateEntry, ManifestSourceEntry as EntityManifestSourceEntry, Signal as EntitySignalEntry, @@ -321,11 +323,13 @@ export type ManifestCronScheduleEntry = EntityManifestCronScheduleEntry export type ManifestEffectEntry = EntityManifestEffectEntry export type ManifestFutureSendScheduleEntry = EntityManifestFutureSendScheduleEntry +export type ManifestGoalEntry = EntityManifestGoalEntry export type ManifestSourceEntry = EntityManifestSourceEntry export type ManifestSharedStateEntry = EntityManifestSharedStateEntry export type ContextInserted = EntityContextInserted export type ContextRemoved = 
EntityContextRemoved export type ContextEntryAttrs = EntityContextEntryAttrs +export type GoalStatus = EntityGoalStatus export interface ContextEntryInput { name: string @@ -338,6 +342,27 @@ export interface ContextEntry extends ContextEntryInput { insertedAt: number } +export interface GoalInput { + objective: string + status?: GoalStatus + // `null` means unbounded; omitted means "use the runtime default". + tokenBudget?: number | null +} + +export interface GoalEntry { + id: string + objective: string + status: GoalStatus + tokenBudget: number | null + tokensUsed: number + // Completion note recorded by mark_goal_complete — what was + // accomplished, or what blocked the goal. + summary?: string + // ISO strings, matching every other manifest kind. + createdAt: string + updatedAt: string +} + export type AttachmentCreateInput = { bytes: Uint8Array | ArrayBuffer | Blob mimeType?: string @@ -916,6 +941,17 @@ export interface AgentConfig { provider: string ) => Promise | string | undefined onPayload?: SimpleStreamOptions[`onPayload`] + // Invoked after each step ends with the provider-reported token counts. + // `input` is the full prompt volume (incl. prompt-cache reads/writes, as + // displayed in the meta row); `uncachedInput` is the new input this step + // only (fresh tokens + cache writes; cache reads excluded). Budget + // accounting should use `uncachedInput + output` so warm cache turns + // don't re-count the whole conversation each step. 
+ onStepEnd?: (stats: { + input: number + uncachedInput: number + output: number + }) => void testResponses?: TestResponses } @@ -946,7 +982,7 @@ export interface OutboundBridgeHandle { } export interface AgentHandle { - run: (input?: string) => Promise + run: (input?: string, abortSignal?: AbortSignal) => Promise } /** @@ -1024,6 +1060,14 @@ export interface HandlerContext< removeContext: (id: string) => void getContext: (id: string) => ContextEntry | undefined listContext: () => Array + setGoal: (input: GoalInput) => GoalEntry + clearGoal: () => boolean + getGoal: () => GoalEntry | undefined + markGoalComplete: (summary?: string) => GoalEntry | undefined + updateGoalUsage: ( + tokensUsed: number, + opts?: { status?: GoalEntry[`status`] } + ) => GoalEntry | undefined agent: AgentHandle spawn: ( type: string, @@ -1128,6 +1172,13 @@ export interface HandlerContext< * `useAgent` flow records runs internally via the outbound bridge. */ recordRun: () => RunHandle + /** + * Write a synthetic agent text reply to the entity. Emits a complete + * runs + texts + text_delta sequence so the chat UI renders it as an + * ordinary assistant message. Use for runtime-driven replies (slash + * commands, error messages) that don't involve the LLM. 
+ */ + replyText: (text: string) => void sleep: () => void setTag: (key: string, value: string) => Promise deleteTag: (key: string) => Promise diff --git a/packages/agents-runtime/test/goal-api.test.ts b/packages/agents-runtime/test/goal-api.test.ts new file mode 100644 index 0000000000..5ef6fc835a --- /dev/null +++ b/packages/agents-runtime/test/goal-api.test.ts @@ -0,0 +1,227 @@ +import { describe, expect, it, vi } from 'vitest' +import { DEFAULT_TOKEN_BUDGET, createGoalApi } from '../src/goal-api' +import type { ChangeEvent } from '@durable-streams/state' + +// Minimal harness: a mutable manifests array that registerManifestEntry +// upserts into (mirroring the wake-session's optimistic apply), plus an +// optional captured writeEvent channel for the live-update path. +function makeApi(opts?: { withWriteEvent?: boolean }) { + const rows: Array> = [] + const events: Array = [] + const wakeSession = { + registerManifestEntry: vi.fn((entry: Record) => { + const idx = rows.findIndex((row) => row.kind === `goal`) + if (idx >= 0) rows[idx] = entry + else rows.push(entry) + return true + }), + removeManifestEntry: vi.fn(() => { + const idx = rows.findIndex((row) => row.kind === `goal`) + if (idx >= 0) { + rows.splice(idx, 1) + return true + } + return false + }), + } + let tick = 0 + const api = createGoalApi({ + db: { collections: { manifests: { toArray: rows } } } as never, + wakeSession: wakeSession as never, + ...(opts?.withWriteEvent + ? 
{ writeEvent: (event: ChangeEvent) => events.push(event) } + : {}), + now: () => `2026-06-10T00:00:0${tick++}.000Z`, + }) + return { api, rows, events, wakeSession } +} + +describe(`createGoalApi.setGoal`, () => { + it(`defaults the budget and starts usage at zero`, () => { + const { api } = makeApi() + const goal = api.setGoal({ objective: `ship X` }) + + expect(goal.tokenBudget).toBe(DEFAULT_TOKEN_BUDGET) + expect(goal.tokensUsed).toBe(0) + expect(goal.status).toBe(`active`) + expect(goal.createdAt).toBe(`2026-06-10T00:00:00.000Z`) + }) + + it(`explicit null budget means unlimited`, () => { + const { api } = makeApi() + expect( + api.setGoal({ objective: `x`, tokenBudget: null }).tokenBudget + ).toBeNull() + }) + + it(`re-setting the same objective preserves usage and createdAt`, () => { + const { api } = makeApi() + api.setGoal({ objective: `ship X`, tokenBudget: 1_000 }) + api.updateGoalUsage(700) + const raised = api.setGoal({ objective: `ship X`, tokenBudget: 5_000 }) + + expect(raised.tokensUsed).toBe(700) + expect(raised.tokenBudget).toBe(5_000) + expect(raised.createdAt).toBe(`2026-06-10T00:00:00.000Z`) + }) + + it(`a new objective resets usage and createdAt`, () => { + const { api } = makeApi() + api.setGoal({ objective: `ship X` }) + api.updateGoalUsage(700) + const next = api.setGoal({ objective: `ship Y` }) + + expect(next.tokensUsed).toBe(0) + expect(next.createdAt).not.toBe(`2026-06-10T00:00:00.000Z`) + }) +}) + +describe(`createGoalApi.updateGoalUsage`, () => { + it(`never decreases tokensUsed`, () => { + const { api } = makeApi() + api.setGoal({ objective: `x` }) + api.updateGoalUsage(500) + const after = api.updateGoalUsage(300) + + expect(after?.tokensUsed).toBe(500) + }) + + it(`flips status when requested`, () => { + const { api } = makeApi() + api.setGoal({ objective: `x` }) + const limited = api.updateGoalUsage(60_000, { + status: `budget_limited`, + }) + + expect(limited?.status).toBe(`budget_limited`) + 
expect(limited?.tokensUsed).toBe(60_000) + }) + + it(`is a no-op write when nothing changed`, () => { + const { api, wakeSession } = makeApi() + api.setGoal({ objective: `x` }) + api.updateGoalUsage(500) + const writesBefore = wakeSession.registerManifestEntry.mock.calls.length + api.updateGoalUsage(500) + + expect(wakeSession.registerManifestEntry.mock.calls.length).toBe( + writesBefore + ) + }) + + it(`returns undefined when no goal exists`, () => { + const { api } = makeApi() + expect(api.updateGoalUsage(100)).toBeUndefined() + }) + + it(`writes through writeEvent when wired (live path)`, () => { + const { api, events, wakeSession } = makeApi({ withWriteEvent: true }) + api.setGoal({ objective: `x` }) + api.updateGoalUsage(500) + + expect(events.length).toBe(2) + const value = (events[1] as { value?: { tokensUsed?: number } }).value + expect(value?.tokensUsed).toBe(500) + // Single write channel: nothing may go through the wake-session's + // staged transaction when writeEvent is wired — staged entries replay + // at end-of-wake and would clobber fresher live writes. 
+ expect(wakeSession.registerManifestEntry).not.toHaveBeenCalled() + }) + + it(`preserves a recorded summary across usage writes`, () => { + const { api, rows } = makeApi() + api.setGoal({ objective: `x` }) + api.markGoalComplete(`done it`) + api.updateGoalUsage(900) + + expect(rows[0]?.summary).toBe(`done it`) + }) +}) + +describe(`createGoalApi.markGoalComplete`, () => { + it(`flips status and records the trimmed summary`, () => { + const { api } = makeApi() + api.setGoal({ objective: `x` }) + const done = api.markGoalComplete(` shipped the thing `) + + expect(done?.status).toBe(`complete`) + expect(done?.summary).toBe(`shipped the thing`) + }) + + it(`omits an empty summary`, () => { + const { api } = makeApi() + api.setGoal({ objective: `x` }) + expect(api.markGoalComplete(` `)?.summary).toBeUndefined() + }) + + it(`returns undefined when no goal exists`, () => { + const { api } = makeApi() + expect(api.markGoalComplete()).toBeUndefined() + }) + + // Regression: events written via writeEvent only reach the local + // manifests collection after a stream round-trip. mark_goal_complete + // firing mid-run must read its own wake's latest write (the per-step + // usage counter), not a stale collection row — previously it snapshotted + // the lagging value and overwrote a fresher tokensUsed. 
+ it(`does not clobber fresher usage written earlier in the same wake`, () => { + const { api, events } = makeApi({ withWriteEvent: true }) + api.setGoal({ objective: `x` }) + api.updateGoalUsage(5_728) + const done = api.markGoalComplete(`all wrapped up`) + + expect(done?.tokensUsed).toBe(5_728) + const last = events.at(-1) as { value?: Record } + expect(last.value?.tokensUsed).toBe(5_728) + expect(last.value?.status).toBe(`complete`) + }) + + it(`status survives a usage write that follows completion (lagging reads)`, () => { + const { api, events } = makeApi({ withWriteEvent: true }) + api.setGoal({ objective: `x` }) + api.markGoalComplete() + api.updateGoalUsage(9_000) + + const last = events.at(-1) as { value?: Record } + expect(last.value?.status).toBe(`complete`) + expect(last.value?.tokensUsed).toBe(9_000) + }) +}) + +describe(`createGoalApi.clearGoal / getGoal`, () => { + it(`round-trips through getGoal and clears`, () => { + const { api } = makeApi() + expect(api.getGoal()).toBeUndefined() + api.setGoal({ objective: `x` }) + expect(api.getGoal()?.objective).toBe(`x`) + expect(api.clearGoal()).toBe(true) + expect(api.getGoal()).toBeUndefined() + expect(api.clearGoal()).toBe(false) + }) + + it(`clears via a delete event when writeEvent is wired`, () => { + // Seed a goal row as if persisted by a previous wake. + const { api, rows, events, wakeSession } = makeApi({ + withWriteEvent: true, + }) + rows.push({ + key: `goal`, + kind: `goal`, + id: `goal`, + objective: `x`, + status: `active`, + tokenBudget: 1_000, + tokensUsed: 10, + createdAt: `2026-06-09T00:00:00.000Z`, + updatedAt: `2026-06-09T00:00:00.000Z`, + }) + + expect(api.clearGoal()).toBe(true) + expect(events.length).toBe(1) + // The stale collection row hasn't round-tripped away yet, but reads in + // this wake must already observe the goal as gone. 
+ expect(api.getGoal()).toBeUndefined() + expect(api.clearGoal()).toBe(false) + expect(wakeSession.removeManifestEntry).not.toHaveBeenCalled() + }) +}) diff --git a/packages/agents-runtime/test/goal-command.test.ts b/packages/agents-runtime/test/goal-command.test.ts new file mode 100644 index 0000000000..7c101f4d9b --- /dev/null +++ b/packages/agents-runtime/test/goal-command.test.ts @@ -0,0 +1,247 @@ +import { describe, expect, it } from 'vitest' +import { + dispatchGoalCommand, + isGoalCommandText, + parseGoalCommand, +} from '../src/goal-command' +import type { GoalEntry, GoalInput, HandlerContext } from '../src/index' + +describe(`isGoalCommandText`, () => { + it(`matches /goal with a trailing space or end-of-line`, () => { + expect(isGoalCommandText(`/goal`)).toBe(true) + expect(isGoalCommandText(`/goal `)).toBe(true) + expect(isGoalCommandText(`/goal set X`)).toBe(true) + expect(isGoalCommandText(` /goal show`)).toBe(true) + }) + + it(`does not match unrelated text`, () => { + expect(isGoalCommandText(`/goalkeeper`)).toBe(false) + expect(isGoalCommandText(`hello /goal`)).toBe(false) + expect(isGoalCommandText(``)).toBe(false) + expect(isGoalCommandText(`/help`)).toBe(false) + }) +}) + +describe(`parseGoalCommand`, () => { + it(`parses /goal set with a quoted objective`, () => { + expect(parseGoalCommand(`/goal set "ship feature X"`)).toEqual({ + kind: `set`, + objective: `ship feature X`, + }) + }) + + it(`parses /goal set with --tokens (number)`, () => { + expect(parseGoalCommand(`/goal set "ship X" --tokens 50000`)).toEqual({ + kind: `set`, + objective: `ship X`, + tokenBudget: 50000, + }) + }) + + it(`parses --tokens with k/m suffixes`, () => { + expect(parseGoalCommand(`/goal set foo --tokens 50k`)).toEqual({ + kind: `set`, + objective: `foo`, + tokenBudget: 50_000, + }) + expect(parseGoalCommand(`/goal set foo --tokens 1.2m`)).toEqual({ + kind: `set`, + objective: `foo`, + tokenBudget: 1_200_000, + }) + }) + + it(`accepts --tokens=N inline form`, () => { 
+ expect(parseGoalCommand(`/goal set foo --tokens=20k`)).toEqual({ + kind: `set`, + objective: `foo`, + tokenBudget: 20_000, + }) + }) + + it(`parses --tokens unlimited as null`, () => { + expect(parseGoalCommand(`/goal set foo --tokens unlimited`)).toEqual({ + kind: `set`, + objective: `foo`, + tokenBudget: null, + }) + expect(parseGoalCommand(`/goal set foo --tokens infinite`)).toEqual({ + kind: `set`, + objective: `foo`, + tokenBudget: null, + }) + }) + + it(`accepts --unlimited as a standalone flag`, () => { + expect(parseGoalCommand(`/goal set foo --unlimited`)).toEqual({ + kind: `set`, + objective: `foo`, + tokenBudget: null, + }) + }) + + it(`leaves tokenBudget undefined when not specified (runtime defaults it)`, () => { + const parsed = parseGoalCommand(`/goal set "ship X"`) + expect(parsed.kind).toBe(`set`) + if (parsed.kind === `set`) { + expect(parsed.tokenBudget).toBeUndefined() + } + }) + + it(`returns clear / show / complete`, () => { + expect(parseGoalCommand(`/goal clear`)).toEqual({ kind: `clear` }) + expect(parseGoalCommand(`/goal show`)).toEqual({ kind: `show` }) + expect(parseGoalCommand(`/goal status`)).toEqual({ kind: `show` }) + expect(parseGoalCommand(`/goal complete`)).toEqual({ kind: `complete` }) + expect(parseGoalCommand(`/goal done`)).toEqual({ kind: `complete` }) + }) + + it(`reports error on bare /goal`, () => { + expect(parseGoalCommand(`/goal`).kind).toBe(`error`) + }) + + it(`reports error on unknown subcommand`, () => { + expect(parseGoalCommand(`/goal foo bar`).kind).toBe(`error`) + }) + + it(`reports error on /goal set with no objective`, () => { + expect(parseGoalCommand(`/goal set`).kind).toBe(`error`) + expect(parseGoalCommand(`/goal set --tokens 50k`).kind).toBe(`error`) + }) + + it(`reports error on invalid token budget`, () => { + expect(parseGoalCommand(`/goal set foo --tokens`).kind).toBe(`error`) + expect(parseGoalCommand(`/goal set foo --tokens abc`).kind).toBe(`error`) + expect(parseGoalCommand(`/goal set foo --tokens 
0`).kind).toBe(`error`) + expect(parseGoalCommand(`/goal set foo --tokens -1`).kind).toBe(`error`) + }) +}) + +describe(`dispatchGoalCommand`, () => { + function makeStubCtx(initialGoal?: GoalEntry) { + let goal: GoalEntry | undefined = initialGoal + const ctx = { + setGoal: (input: GoalInput) => { + const tokenBudget = + input.tokenBudget === undefined ? 50_000 : input.tokenBudget + const next: GoalEntry = { + id: `goal`, + objective: input.objective, + status: input.status ?? `active`, + tokenBudget, + tokensUsed: 0, + createdAt: `2026-01-01T00:00:00.000Z`, + updatedAt: `2026-01-01T00:00:00.000Z`, + } + goal = next + return next + }, + clearGoal: () => { + if (!goal) return false + goal = undefined + return true + }, + getGoal: () => goal, + markGoalComplete: () => { + if (!goal) return undefined + const next: GoalEntry = { + ...goal, + status: `complete`, + updatedAt: `2026-01-02T00:00:00.000Z`, + } + goal = next + return next + }, + } satisfies Pick< + HandlerContext, + `setGoal` | `clearGoal` | `getGoal` | `markGoalComplete` + > + return { ctx, getGoal: () => goal } + } + + it(`dispatches set and stores the goal with the requested budget`, () => { + const stub = makeStubCtx() + const result = dispatchGoalCommand(stub.ctx, { + kind: `set`, + objective: `ship X`, + tokenBudget: 20_000, + }) + expect(result.handled).toBe(true) + expect(stub.getGoal()?.objective).toBe(`ship X`) + expect(stub.getGoal()?.tokenBudget).toBe(20_000) + }) + + it(`dispatches set with unlimited budget`, () => { + const stub = makeStubCtx() + dispatchGoalCommand(stub.ctx, { + kind: `set`, + objective: `explore`, + tokenBudget: null, + }) + expect(stub.getGoal()?.tokenBudget).toBeNull() + }) + + it(`dispatches clear and removes the goal`, () => { + const stub = makeStubCtx({ + id: `goal`, + objective: `existing`, + status: `active`, + tokenBudget: 10_000, + tokensUsed: 0, + createdAt: `2026-01-01T00:00:00.000Z`, + updatedAt: `2026-01-01T00:00:00.000Z`, + }) + const result = 
dispatchGoalCommand(stub.ctx, { kind: `clear` }) + expect(result.handled).toBe(true) + expect(stub.getGoal()).toBeUndefined() + }) + + it(`dispatches complete and flips status`, () => { + const stub = makeStubCtx({ + id: `goal`, + objective: `existing`, + status: `active`, + tokenBudget: 10_000, + tokensUsed: 2_000, + createdAt: `2026-01-01T00:00:00.000Z`, + updatedAt: `2026-01-01T00:00:00.000Z`, + }) + dispatchGoalCommand(stub.ctx, { kind: `complete` }) + expect(stub.getGoal()?.status).toBe(`complete`) + }) + + it(`reports "no goal" on show when none is set`, () => { + const stub = makeStubCtx() + const result = dispatchGoalCommand(stub.ctx, { kind: `show` }) + expect(result.message).toMatch(/no goal/i) + }) + + it(`reports current goal on show with formatted tokens`, () => { + const stub = makeStubCtx({ + id: `goal`, + objective: `ship X`, + status: `active`, + tokenBudget: 50_000, + tokensUsed: 12_345, + createdAt: `2026-01-01T00:00:00.000Z`, + updatedAt: `2026-01-01T00:00:00.000Z`, + }) + const result = dispatchGoalCommand(stub.ctx, { kind: `show` }) + expect(result.message).toMatch(/ship X/) + expect(result.message).toMatch(/12\.3k.*50k/) + }) + + it(`labels show output as unlimited when budget is null`, () => { + const stub = makeStubCtx({ + id: `goal`, + objective: `explore`, + status: `active`, + tokenBudget: null, + tokensUsed: 7, + createdAt: `2026-01-01T00:00:00.000Z`, + updatedAt: `2026-01-01T00:00:00.000Z`, + }) + const result = dispatchGoalCommand(stub.ctx, { kind: `show` }) + expect(result.message).toMatch(/unlimited/) + }) +}) diff --git a/packages/agents-runtime/test/record-run.test.ts b/packages/agents-runtime/test/record-run.test.ts index 927188da93..2c31db33ff 100644 --- a/packages/agents-runtime/test/record-run.test.ts +++ b/packages/agents-runtime/test/record-run.test.ts @@ -15,20 +15,27 @@ interface RecordRunHarness { writeEvent: ReturnType } +// Unique collection ids per harness: the run-key allocator coordinates +// through a process-wide 
seed cache keyed by `runs.id` (mirroring how each +// entity has a unique runs collection in production), so reusing one id +// across test cases would leak allocations between them. +let harnessSeq = 0 + function buildHarness(opts?: { existingRunKeys?: Array }): { ctx: ReturnType[`ctx`] writeEvent: ReturnType } { + const harnessId = harnessSeq++ const collections: Record = {} for (const [name] of Object.entries(ENTITY_COLLECTIONS)) { if (name === `runs`) continue collections[name] = createLocalOnlyTestCollection([], { - id: `test-${name}`, + id: `test-${harnessId}-${name}`, }) } collections.runs = createLocalOnlyTestCollection( (opts?.existingRunKeys ?? []).map((key) => ({ key, status: `completed` })), - { id: `test-runs` } + { id: `test-${harnessId}-runs` } ) const db = { diff --git a/packages/agents-server-ui/src/components/EntityContextDrawer.tsx b/packages/agents-server-ui/src/components/EntityContextDrawer.tsx index 7f93db305d..292b147c89 100644 --- a/packages/agents-server-ui/src/components/EntityContextDrawer.tsx +++ b/packages/agents-server-ui/src/components/EntityContextDrawer.tsx @@ -569,6 +569,8 @@ function manifestKindLabel(manifest: Manifest): string { return `Context` case `schedule`: return manifest.scheduleType === `cron` ? `Cron schedule` : `Future send` + case `goal`: + return `Goal` } } @@ -706,6 +708,11 @@ function createManifestEntry( action: { kind: `inspect` }, entity: null, } + + // Goal entries are surfaced via the `GoalBanner` above the timeline, + // not the manifest drawer. 
+ case `goal`: + return null } } diff --git a/packages/agents-server-ui/src/components/EntityTimeline.tsx b/packages/agents-server-ui/src/components/EntityTimeline.tsx index d625c2c801..d0f4527041 100644 --- a/packages/agents-server-ui/src/components/EntityTimeline.tsx +++ b/packages/agents-server-ui/src/components/EntityTimeline.tsx @@ -749,6 +749,8 @@ function manifestKindLabel(manifest: Manifest): string { return `Context` case `schedule`: return `Schedule` + case `goal`: + return `Goal` } } @@ -763,6 +765,7 @@ function manifestTitle(manifest: Manifest): string { case `attachment`: case `context`: case `schedule`: + case `goal`: return manifest.id } } @@ -785,6 +788,8 @@ function manifestMeta(manifest: Manifest): string { return manifest.scheduleType === `cron` ? `${manifest.expression}${manifest.timezone ? ` · ${manifest.timezone}` : ``}` : `${manifest.fireAt} · ${manifest.status}` + case `goal`: + return manifest.status ?? `active` } } @@ -843,6 +848,18 @@ function manifestDetails( { label: `Target`, value: manifest.targetUrl }, { label: `Status`, value: manifest.status ?? `pending` }, ] + case `goal`: + return [ + { label: `Objective`, value: manifest.objective ?? `` }, + { label: `Status`, value: manifest.status ?? `active` }, + { + label: `Tokens`, + value: + manifest.tokenBudget === null || manifest.tokenBudget === undefined + ? `${manifest.tokensUsed ?? 0}` + : `${manifest.tokensUsed ?? 
0} / ${manifest.tokenBudget}`, + }, + ] } } diff --git a/packages/agents-server-ui/src/components/GoalBanner.module.css b/packages/agents-server-ui/src/components/GoalBanner.module.css new file mode 100644 index 0000000000..c4d0672146 --- /dev/null +++ b/packages/agents-server-ui/src/components/GoalBanner.module.css @@ -0,0 +1,79 @@ +.banner { + display: flex; + align-items: center; + gap: 0.75rem; + padding: 0.5rem 0.875rem; + border-bottom: 1px solid var(--surface-border, rgba(0, 0, 0, 0.08)); + background: var(--surface-subtle, rgba(0, 0, 0, 0.03)); + font-size: 0.85rem; + color: var(--text-default); +} + +.label { + font-weight: 600; + letter-spacing: 0.02em; + text-transform: uppercase; + font-size: 0.7rem; + color: var(--text-muted, rgba(0, 0, 0, 0.55)); + flex-shrink: 0; +} + +.objective { + flex: 1 1 auto; + min-width: 0; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} + +.usage { + flex-shrink: 0; + font-variant-numeric: tabular-nums; + color: var(--text-muted, rgba(0, 0, 0, 0.6)); +} + +.status { + flex-shrink: 0; + text-transform: capitalize; + padding: 0.1rem 0.45rem; + border-radius: 0.5rem; + font-size: 0.7rem; + font-weight: 600; + background: var(--surface-strong, rgba(0, 0, 0, 0.08)); +} + +.statusActive { + background: var(--success-surface, rgba(34, 139, 34, 0.12)); + color: var(--success-text, #1b6b1b); +} + +.statusBudgetLimited { + background: var(--danger-surface, rgba(200, 50, 50, 0.12)); + color: var(--danger-text, #a02020); +} + +.statusComplete { + background: var(--info-surface, rgba(60, 120, 200, 0.12)); + color: var(--info-text, #1f4f8f); +} + +.bar { + position: relative; + flex-shrink: 0; + width: 6rem; + height: 0.35rem; + border-radius: 0.5rem; + overflow: hidden; + background: var(--surface-strong, rgba(0, 0, 0, 0.08)); +} + +.barFill { + position: absolute; + inset: 0; + background: var(--accent, #4078c0); + transform-origin: left center; +} + +.barFillOver { + background: var(--danger, #c84040); +} diff 
--git a/packages/agents-server-ui/src/components/GoalBanner.tsx b/packages/agents-server-ui/src/components/GoalBanner.tsx new file mode 100644 index 0000000000..6f6fdd1913 --- /dev/null +++ b/packages/agents-server-ui/src/components/GoalBanner.tsx @@ -0,0 +1,98 @@ +import { useMemo } from 'react' +import { useLiveQuery } from '@tanstack/react-db' +import { formatTokenCount } from '@electric-ax/agents-runtime/client' +import styles from './GoalBanner.module.css' +import type { + EntityStreamDBWithActions, + GoalStatus, + Manifest, +} from '@electric-ax/agents-runtime/client' + +type GoalRow = { + objective: string + status: GoalStatus + tokenBudget: number | null + tokensUsed: number +} + +interface GoalBannerProps { + db: EntityStreamDBWithActions | null +} + +export function GoalBanner({ db }: GoalBannerProps): React.ReactElement | null { + const { data: manifests = [] } = useLiveQuery( + (q) => { + if (!db) return undefined + return q.from({ manifest: db.collections.manifests }) + }, + [db] + ) + const goal = useMemo(() => { + for (const m of manifests as Array) { + if (m.kind === `goal`) { + const rawBudget = m.tokenBudget + return { + objective: String(m.objective ?? ``), + status: (m.status ?? `active`) as GoalStatus, + tokenBudget: + rawBudget === null + ? null + : typeof rawBudget === `number` + ? rawBudget + : null, + tokensUsed: typeof m.tokensUsed === `number` ? m.tokensUsed : 0, + } + } + } + return null + }, [manifests]) + + if (!goal) return null + + const usageLabel = + goal.tokenBudget === null + ? `${formatTokenCount(goal.tokensUsed)} tokens` + : `${formatTokenCount(goal.tokensUsed)} / ${formatTokenCount(goal.tokenBudget)} tokens` + const fillRatio = + goal.tokenBudget !== null && goal.tokenBudget > 0 + ? Math.min(1, goal.tokensUsed / goal.tokenBudget) + : null + const over = goal.tokenBudget !== null && goal.tokensUsed >= goal.tokenBudget + + return ( +
+ Goal + + {goal.objective} + + {fillRatio !== null && ( +
+
+
+ )} + {usageLabel} + + {goal.status.replace(`_`, ` `)} + +
+ ) +} + +function statusClass(status: GoalStatus): string { + switch (status) { + case `active`: + return styles.statusActive! + case `complete`: + return styles.statusComplete! + case `budget_limited`: + return styles.statusBudgetLimited! + } +} diff --git a/packages/agents-server-ui/src/components/MessageInput.tsx b/packages/agents-server-ui/src/components/MessageInput.tsx index 74e79243ec..e957c5218a 100644 --- a/packages/agents-server-ui/src/components/MessageInput.tsx +++ b/packages/agents-server-ui/src/components/MessageInput.tsx @@ -9,7 +9,11 @@ import { createUpdateInboxMessageAction, readTextPayload, } from '../lib/sendMessage' -import { serializeComposerInput } from '@electric-ax/agents-runtime/client' +import { + isGoalCommandText, + parseGoalCommand, + serializeComposerInput, +} from '@electric-ax/agents-runtime/client' import { ComposerEditor } from './ComposerEditor' import { ComposerShell } from './ComposerShell' import { Icon, Stack, Text, Tooltip } from '../ui' @@ -27,6 +31,17 @@ import type { } from '@electric-ax/agents-runtime/client' import type { OptimisticInboxMessage } from '../lib/sendMessage' +// /goal commands that mutate state should interrupt any in-flight agent +// run so the user doesn't have to wait for the old work to finish before +// the new goal/state takes effect. /goal show is read-only and never +// aborts. Delegates to the runtime's parser so the recognized grammar +// (including subcommand aliases) can't drift from the dispatcher. +function isAbortingGoalCommand(text: string): boolean { + if (!isGoalCommandText(text)) return false + const kind = parseGoalCommand(text).kind + return kind === `set` || kind === `clear` || kind === `complete` +} + export function MessageInput({ db, baseUrl, @@ -186,6 +201,15 @@ export function MessageInput({ ...(files.length > 0 ? 
{ attachments: files } : {}), }) if (!tx) return + // State-changing /goal commands should interrupt any in-flight agent + // run — otherwise the agent keeps working on the old goal/work even + // though the user just told it to stop or pivot. /goal show is purely + // a read, so it never aborts. `onStop` itself no-ops when nothing is + // running, so this is safe to call unconditionally for the matching + // commands. + if (!editingMessage && isAbortingGoalCommand(text)) { + onStop?.() + } if (!editingMessage) onSend?.() setValue(``) clearAttachments() @@ -209,6 +233,7 @@ export function MessageInput({ updateAction, editingMessage, onSend, + onStop, effectiveSlashCommands, ] ) diff --git a/packages/agents-server-ui/src/components/TokenUsage.tsx b/packages/agents-server-ui/src/components/TokenUsage.tsx index 7e515693b6..1dce9e4856 100644 --- a/packages/agents-server-ui/src/components/TokenUsage.tsx +++ b/packages/agents-server-ui/src/components/TokenUsage.tsx @@ -1,3 +1,4 @@ +import { formatTokenCount } from '@electric-ax/agents-runtime/client' import { Text } from '../ui' import styles from './TokenUsage.module.css' @@ -39,19 +40,3 @@ export function TokenUsage({ ) } - -/** - * `Intl.NumberFormat` with `notation: 'compact'` gives us "1.2K", - * "12K", "1.2M" etc., locale-aware and bounded in width — better - * than a hand-rolled rounder. We force lowercase `k`/`m` afterward - * so the suffix tone matches the muted meta row. 
- */ -const compactFormatter = new Intl.NumberFormat(undefined, { - notation: `compact`, - maximumFractionDigits: 1, -}) - -function formatTokenCount(n: number): string { - if (n < 1000) return String(n) - return compactFormatter.format(n).toLowerCase() -} diff --git a/packages/agents-server-ui/src/components/views/ChatView.tsx b/packages/agents-server-ui/src/components/views/ChatView.tsx index 0f64c6c239..ec1b544ba8 100644 --- a/packages/agents-server-ui/src/components/views/ChatView.tsx +++ b/packages/agents-server-ui/src/components/views/ChatView.tsx @@ -3,6 +3,7 @@ import { useNavigate } from '@tanstack/react-router' import { eq, useLiveQuery } from '@tanstack/react-db' import { useEntityTimeline } from '../../hooks/useEntityTimeline' import { EntityTimeline } from '../EntityTimeline' +import { GoalBanner } from '../GoalBanner' import { MessageInput } from '../MessageInput' import { EntityContextDrawer } from '../EntityContextDrawer' import { useElectricAgents } from '../../lib/ElectricAgentsProvider' @@ -349,6 +350,7 @@ function GenericChatBody({ return ( <> + { } } +function extractWakeText(wake: WakeEvent): string | null { + if (wake.type !== `inbox`) return null + const payload = wake.payload + if (typeof payload === `string`) return payload + if (payload && typeof payload === `object`) { + const record = payload as { text?: unknown; source?: unknown } + if (typeof record.text === `string`) return record.text + // Composer-structured messages (WireComposerInputPayload) carry the + // raw typed text in `source` alongside the parsed `nodes`. The /goal + // grammar is text-based, so the raw source is what we dispatch on. 
+    if (typeof record.source === `string`) return record.source
+  }
+  return null
+}
+
+async function tryHandleSlashCommand(
+  ctx: HandlerContext,
+  wake: WakeEvent
+): Promise<boolean> {
+  const text = extractWakeText(wake)
+  if (text === null) return false
+  if (isGoalCommandText(text)) {
+    const command = parseGoalCommand(text)
+    const result = dispatchGoalCommand(ctx, command)
+    if (result.message) {
+      serverLog.info(`[horton ${ctx.entityUrl}] ${result.message}`)
+      // Surface the response in chat so the user gets visible feedback
+      // for every slash command, not just the ones that change the banner.
+      writeSlashCommandReply(ctx, result.message)
+    }
+    // /goal set kicks off the agent in a fresh wake so it starts working
+    // immediately. Token-budget enforcement (in assistantHandler) handles
+    // the stop condition; no iterative continuation is needed.
+    if (command.kind === `set`) {
+      await kickoffGoalRun(ctx)
+    }
+    return result.handled
+  }
+  return false
+}
+
+const GOAL_KICKOFF_TEXT = `Start working toward the active goal now. Call \`mark_goal_complete\` when you believe it is done.`
+
+async function kickoffGoalRun(ctx: HandlerContext): Promise<void> {
+  const goal = ctx.getGoal()
+  if (!goal || goal.status !== `active`) return
+  try {
+    await ctx.send(
+      ctx.entityUrl,
+      { kind: `goal_kickoff`, text: GOAL_KICKOFF_TEXT },
+      { type: `inbox` }
+    )
+  } catch (err) {
+    serverLog.warn(
+      `[horton ${ctx.entityUrl}] failed to enqueue goal kickoff: ${
+        err instanceof Error ? err.message : String(err)
+      }`
+    )
+  }
+}
+
+function writeSlashCommandReply(ctx: HandlerContext, text: string): void {
+  try {
+    ctx.replyText(text)
+  } catch (err) {
+    serverLog.warn(
+      `[horton ${ctx.entityUrl}] failed to render slash command reply: ${
+        err instanceof Error ?
err.message : String(err) + }` + ) + } +} + function createAssistantHandler(options: { streamFn?: StreamFn docsSupport: HortonDocsSupport | null @@ -514,6 +621,10 @@ function createAssistantHandler(options: { ctx: HandlerContext, wake: WakeEvent ): Promise { + // Slash commands handled directly by the runtime (no LLM turn). These + // run as soon as the wake fires, mutate manifest state, and return. + if (await tryHandleSlashCommand(ctx, wake)) return + const loadedSkills = await skillLoader.load(ctx) const readSet = new Set() @@ -671,6 +782,49 @@ function createAssistantHandler(options: { }) } + // Only an *active* goal drives any goal behavior. A goal that is + // `complete` or `budget_limited` stays in the manifest (for the banner + // and /goal show) but must not accumulate usage from unrelated chat + // turns, trip the budget on them, or appear in the prompt. + const goal = ctx.getGoal() + const enforcedGoal = goal && goal.status === `active` ? goal : undefined + const activeGoalPromptInfo = enforcedGoal + ? { + objective: enforcedGoal.objective, + tokenBudget: enforcedGoal.tokenBudget, + tokensUsed: enforcedGoal.tokensUsed, + } + : undefined + + // Mid-run budget enforcement + live banner updates: after each step we + // (a) advance our in-memory accumulator, (b) write it to the goal entry + // so the UI banner reflects it optimistically, and (c) abort the run if + // the budget is exhausted. + const budgetAbort = new AbortController() + let runTokensUsed = enforcedGoal?.tokensUsed ?? 0 + let budgetTripped = false + const onStepEnd = enforcedGoal + ? (stats: { input: number; uncachedInput: number; output: number }) => { + if (budgetTripped) return + // Budget on new work only: uncached input + output. The display + // sum (`stats.input`) includes prompt-cache reads which re-count + // the entire conversation on every warm step and would exhaust + // any budget in a handful of steps. 
+ runTokensUsed += stats.uncachedInput + stats.output + ctx.updateGoalUsage(runTokensUsed) + if ( + enforcedGoal.tokenBudget !== null && + runTokensUsed >= enforcedGoal.tokenBudget + ) { + budgetTripped = true + serverLog.info( + `[horton ${ctx.entityUrl}] goal budget exhausted (${runTokensUsed} tokens) — aborting run` + ) + budgetAbort.abort() + } + } + : undefined + ctx.useAgent({ systemPrompt: buildHortonSystemPrompt(sandboxCwd, { hasDocsSupport: Boolean(docsSupport), @@ -680,6 +834,7 @@ function createAssistantHandler(options: { modelId: String(modelConfig.model), hasEventSourceTools, hasScheduleTools, + ...(activeGoalPromptInfo && { activeGoal: activeGoalPromptInfo }), }), ...modelConfig, // mcp.tools() inserts sentinel objects that the runtime's @@ -687,8 +842,41 @@ function createAssistantHandler(options: { // useAgent doesn't model this, so cast at the boundary. tools: tools as AgentTool[], ...(streamFn && { streamFn }), + ...(onStepEnd && { onStepEnd }), }) - await ctx.agent.run() + try { + await ctx.agent.run(undefined, budgetAbort.signal) + } catch (err) { + if (!budgetTripped) throw err + // Swallow the AbortError when WE aborted for budget — the goal status + // flip below is the user-visible outcome. + serverLog.info( + `[horton ${ctx.entityUrl}] agent.run aborted by budget enforcement` + ) + } + // Persist accurate token usage from the in-memory accumulator. + // The steps collection round-trips back into the local DB + // asynchronously, so summing it post-run can undercount. + if (enforcedGoal) { + ctx.updateGoalUsage( + runTokensUsed, + budgetTripped ? { status: `budget_limited` } : undefined + ) + } + if (budgetTripped && enforcedGoal && enforcedGoal.tokenBudget !== null) { + const budget = enforcedGoal.tokenBudget + const suggestedNext = Math.max(budget * 2, budget + 10_000) + writeSlashCommandReply( + ctx, + `⚠️ Stopped — goal hit the token budget (${formatTokenCount( + runTokensUsed + )} / ${formatTokenCount( + budget + )} tokens used). 
Raise the budget with \`/goal set "..." --tokens ${formatTokenCount(
+          suggestedNext
+        )}\`, or call \`/goal complete\` to finalize.`
+      )
+    }
     await titlePromise
   }
 }
@@ -778,7 +966,10 @@ export function registerHorton(
         permission: `manage`,
       },
     ],
-    slashCommands: buildSkillSlashCommands(skillsRegistry),
+    slashCommands: [
+      GOAL_SLASH_COMMAND,
+      ...buildSkillSlashCommands(skillsRegistry),
+    ],
     handler: assistantHandler,
   })
 
diff --git a/packages/agents/test/horton-goal-enforcement.test.ts b/packages/agents/test/horton-goal-enforcement.test.ts
new file mode 100644
index 0000000000..86e672344a
--- /dev/null
+++ b/packages/agents/test/horton-goal-enforcement.test.ts
@@ -0,0 +1,209 @@
+import { describe, expect, it, vi } from 'vitest'
+import { createEntityRegistry } from '@electric-ax/agents-runtime'
+import { registerHorton } from '../src/agents/horton'
+import type { GoalEntry } from '@electric-ax/agents-runtime'
+import type { BuiltinModelCatalog } from '../src/model-catalog'
+
+const modelCatalog: BuiltinModelCatalog = {
+  defaultChoice: {
+    provider: `anthropic`,
+    id: `claude-sonnet-4-6`,
+    label: `Anthropic Claude Sonnet 4.6`,
+    value: `anthropic:claude-sonnet-4-6`,
+    reasoning: true,
+    input: [`text`, `image`],
+  },
+  choices: [
+    {
+      provider: `anthropic`,
+      id: `claude-sonnet-4-6`,
+      label: `Anthropic Claude Sonnet 4.6`,
+      value: `anthropic:claude-sonnet-4-6`,
+      reasoning: true,
+      input: [`text`, `image`],
+    },
+  ],
+}
+
+function goalEntry(overrides: Partial<GoalEntry> = {}): GoalEntry {
+  return {
+    id: `goal`,
+    objective: `ship X`,
+    status: `active`,
+    tokenBudget: 1_000,
+    tokensUsed: 0,
+    createdAt: `2026-01-01T00:00:00.000Z`,
+    updatedAt: `2026-01-01T00:00:00.000Z`,
+    ...overrides,
+  } as GoalEntry
+}
+
+async function runHandler(goal: GoalEntry | undefined) {
+  const registry = createEntityRegistry()
+  registerHorton(registry, { workingDirectory: `/tmp`, modelCatalog })
+  const def = registry.get(`horton`)
+
+  const useAgent = vi.fn()
+  const updateGoalUsage =
vi.fn() + const replyText = vi.fn() + // The run mock fires the captured onStepEnd (when wired) so the test can + // exercise the budget-trip path the way a real step boundary would. The + // budget accumulates uncachedInput + output — `input` (display sum incl. + // cache reads) must NOT count, which the active-goal assertion verifies. + const run = vi.fn(async () => { + const config = useAgent.mock.calls[0]?.[0] as + | { + onStepEnd?: (stats: { + input: number + uncachedInput: number + output: number + }) => void + } + | undefined + config?.onStepEnd?.({ input: 50_000, uncachedInput: 5_000, output: 100 }) + }) + + const fakeCtx = { + args: {}, + electricTools: [], + events: [], + firstWake: false, + tags: {}, + db: { collections: { inbox: { toArray: [] }, runs: { toArray: [] } } }, + sandbox: { + workingDirectory: `/work`, + readFile: vi.fn(async () => { + throw new Error(`ENOENT`) + }), + }, + slashCommands: { replaceOwned: vi.fn() }, + insertContext: vi.fn(), + removeContext: vi.fn(), + getContext: vi.fn(), + useContext: vi.fn(), + useAgent, + agent: { run }, + getGoal: vi.fn(() => goal), + updateGoalUsage, + replyText, + } as any + + await def!.definition.handler(fakeCtx, { type: `inbox` } as any) + const agentConfig = useAgent.mock.calls[0]?.[0] as { + onStepEnd?: unknown + systemPrompt: string + tools: Array<{ name?: string }> + } + const hasCompleteTool = agentConfig.tools.some( + (tool) => tool.name === `mark_goal_complete` + ) + return { agentConfig, hasCompleteTool, updateGoalUsage, replyText, run } +} + +describe(`horton goal enforcement gating`, () => { + it(`wires enforcement and trips the budget for an active goal`, async () => { + const { agentConfig, hasCompleteTool, updateGoalUsage, replyText } = + await runHandler(goalEntry({ status: `active`, tokenBudget: 1_000 })) + + expect(agentConfig.onStepEnd).toBeTypeOf(`function`) + expect(hasCompleteTool).toBe(true) + expect(agentConfig.systemPrompt).toContain(`Active goal`) + // The simulated 5.1k-token 
step exceeds the 1k budget → status flip + + // user-visible stop message. + expect(updateGoalUsage).toHaveBeenCalledWith(5_100, { + status: `budget_limited`, + }) + expect(replyText).toHaveBeenCalledWith( + expect.stringContaining(`token budget`) + ) + }) + + it(`does NOT wire enforcement for a budget_limited goal`, async () => { + const { agentConfig, hasCompleteTool, updateGoalUsage, replyText } = + await runHandler( + goalEntry({ status: `budget_limited`, tokensUsed: 1_500 }) + ) + + expect(agentConfig.onStepEnd).toBeUndefined() + expect(hasCompleteTool).toBe(false) + expect(agentConfig.systemPrompt).not.toContain(`Active goal`) + expect(updateGoalUsage).not.toHaveBeenCalled() + expect(replyText).not.toHaveBeenCalled() + }) + + it(`does NOT wire enforcement for a complete goal`, async () => { + const { agentConfig, updateGoalUsage } = await runHandler( + goalEntry({ status: `complete`, tokensUsed: 900 }) + ) + + expect(agentConfig.onStepEnd).toBeUndefined() + expect(agentConfig.systemPrompt).not.toContain(`Active goal`) + expect(updateGoalUsage).not.toHaveBeenCalled() + }) + + it(`does NOT wire enforcement when no goal exists`, async () => { + const { agentConfig, hasCompleteTool, updateGoalUsage } = + await runHandler(undefined) + + expect(agentConfig.onStepEnd).toBeUndefined() + expect(hasCompleteTool).toBe(false) + expect(updateGoalUsage).not.toHaveBeenCalled() + }) +}) + +describe(`/goal interception across wake payload shapes`, () => { + async function runWake(payload: unknown) { + const registry = createEntityRegistry() + registerHorton(registry, { workingDirectory: `/tmp`, modelCatalog }) + const def = registry.get(`horton`) + const useAgent = vi.fn() + const replyText = vi.fn() + const fakeCtx = { + args: {}, + electricTools: [], + events: [], + firstWake: false, + tags: {}, + db: { collections: { inbox: { toArray: [] }, runs: { toArray: [] } } }, + sandbox: { + workingDirectory: `/work`, + readFile: vi.fn(async () => { + throw new Error(`ENOENT`) + }), 
+ }, + slashCommands: { replaceOwned: vi.fn() }, + insertContext: vi.fn(), + removeContext: vi.fn(), + getContext: vi.fn(), + useContext: vi.fn(), + useAgent, + agent: { run: vi.fn(async () => {}) }, + getGoal: vi.fn(() => goalEntry({ status: `active` })), + updateGoalUsage: vi.fn(), + markGoalComplete: vi.fn(), + clearGoal: vi.fn(), + setGoal: vi.fn(), + replyText, + } as any + await def!.definition.handler(fakeCtx, { type: `inbox`, payload } as any) + return { useAgent, replyText } + } + + it(`intercepts plain-text payloads`, async () => { + const { useAgent, replyText } = await runWake({ text: `/goal show` }) + expect(replyText).toHaveBeenCalledWith(expect.stringContaining(`Goal:`)) + expect(useAgent).not.toHaveBeenCalled() + }) + + it(`intercepts composer-structured payloads (source + nodes)`, async () => { + // Shape produced by the native composer (#4533): raw text in `source`, + // parsed slash-command nodes alongside. The /goal grammar dispatches on + // the raw source. + const { useAgent, replyText } = await runWake({ + source: `/goal show`, + nodes: [{ kind: `slash_command`, start: 0, end: 5, raw: `/goal` }], + }) + expect(replyText).toHaveBeenCalledWith(expect.stringContaining(`Goal:`)) + expect(useAgent).not.toHaveBeenCalled() + }) +}) diff --git a/packages/agents/test/horton-model-selection.test.ts b/packages/agents/test/horton-model-selection.test.ts index 14c399cd70..4e12671ced 100644 --- a/packages/agents/test/horton-model-selection.test.ts +++ b/packages/agents/test/horton-model-selection.test.ts @@ -97,7 +97,7 @@ describe(`horton model selection`, () => { events: [], firstWake: false, tags: {}, - db: { collections: { inbox: { toArray: [] } } }, + db: { collections: { inbox: { toArray: [] }, runs: { toArray: [] } } }, sandbox: { workingDirectory: `/work`, readFile: vi.fn(async () => { @@ -111,6 +111,8 @@ describe(`horton model selection`, () => { useContext: vi.fn(), useAgent, agent: { run }, + getGoal: vi.fn(() => undefined), + updateGoalUsage: 
vi.fn(), } as any await def!.definition.handler(fakeCtx, { type: `inbox` } as any) diff --git a/packages/agents/test/horton-tool-composition.test.ts b/packages/agents/test/horton-tool-composition.test.ts index 24f3f2c8f8..2f07870489 100644 --- a/packages/agents/test/horton-tool-composition.test.ts +++ b/packages/agents/test/horton-tool-composition.test.ts @@ -48,7 +48,7 @@ async function captureAgentConfig( events: [], firstWake: false, tags: {}, - db: { collections: { inbox: { toArray: [] } } }, + db: { collections: { inbox: { toArray: [] }, runs: { toArray: [] } } }, sandbox: { workingDirectory: `/work`, readFile: vi.fn(async () => { @@ -63,6 +63,11 @@ async function captureAgentConfig( useAgent, agent: { run: vi.fn(async () => {}) }, setTag: vi.fn(async () => {}), + // The assistant handler reads the active goal up front and wires + // budget enforcement on agent.run. Stub these to a no-goal state so + // the captured config reflects the tool-composition path only. + getGoal: vi.fn(() => undefined), + updateGoalUsage: vi.fn(), ...ctxOverrides, } as any await registry diff --git a/packages/agents/test/observe-pg-sync-tool.test.ts b/packages/agents/test/observe-pg-sync-tool.test.ts index fa7cbd0f87..3fbd10b2a1 100644 --- a/packages/agents/test/observe-pg-sync-tool.test.ts +++ b/packages/agents/test/observe-pg-sync-tool.test.ts @@ -118,7 +118,13 @@ describe(`observe_pg_sync tool`, () => { it(`is included in Horton's tool list`, () => { const tools = createHortonTools( { workingDirectory: `/tmp` } as any, - { send: vi.fn(), observe: vi.fn() } as any, + // getGoal: tool composition is goal-aware (mark_goal_complete is only + // registered for an active goal), so the ctx stub needs it present. + { + send: vi.fn(), + observe: vi.fn(), + getGoal: vi.fn(() => undefined), + } as any, new Set() )