diff --git a/.changeset/custom-collections.md b/.changeset/custom-collections.md new file mode 100644 index 0000000000..6e8b9c40d1 --- /dev/null +++ b/.changeset/custom-collections.md @@ -0,0 +1,13 @@ +--- +"@electric-ax/agents-runtime": patch +"@electric-ax/agents-server": patch +"@electric-ax/agents-server-ui": patch +--- + +Custom collections on entity streams. + +Server: entity types declare the custom collections they accept via `custom_collection_schemas: { : }` (same shape as `inbox_schemas` / `state_schemas`). `POST /_electric/entities/:type/:id/collections/:name` validates every write against the declared schema before appending; writes to a name the entity type did not declare are rejected with 422, and reserved built-in collection names (`BUILT_IN_COLLECTION_TYPES` exported from `@electric-ax/agents-runtime`) are rejected too so the runtime stays the sole writer of agent-managed collections. + +Runtime: `createEntityTimelineQuery` accepts an optional `customSource` query-builder branch. Callers shape their custom-collection rows into the `EntityTimelineCustomRow` envelope (`{ collection, key, order, value }`) and the runtime splices them into the same unionAll/orderBy pipeline as the built-in timeline collections, so consumers don't have to client-side merge a second source. `EntityDefinition.customCollectionSchemas` lets typed entity definitions declare schemas that the runtime forwards through entity-type registration. + +UI: session comments are wired on top of the generic mechanism — registered as a `comments` custom collection in the StreamDB customState, projected through the runtime's `customSource` branch, and reshaped back into a `comment` row variant for renderers. Comment surface (bubbles, reply affordances, comments-only tile view) is gated on whether the entity's type opted in to a `comment` custom collection schema. diff --git a/packages/agents-runtime/src/client.ts b/packages/agents-runtime/src/client.ts index 0d467fd7f1..faa453fbdf 100644 --- a/packages/agents-runtime/src/client.ts +++ b/packages/agents-runtime/src/client.ts @@ -1,4 +1,5 @@ export { createEntityStreamDB } from './entity-stream-db' +export { passthrough } from './entity-schema' export { createAgentsClient } from './agents-client' export { compareTimelineOrders, diff --git a/packages/agents-runtime/src/create-handler.ts b/packages/agents-runtime/src/create-handler.ts index c4008862d4..4176d13b59 100644 --- a/packages/agents-runtime/src/create-handler.ts +++ b/packages/agents-runtime/src/create-handler.ts @@ -503,6 +503,11 @@ export function createRuntimeRouter( ...(definition.inboxSchemas && { inbox_schemas: mapSchemas(definition.inboxSchemas), }), + ...(definition.customCollectionSchemas && { + custom_collection_schemas: mapSchemas( + definition.customCollectionSchemas + ), + }), ...(definition.slashCommands && { slash_commands: definition.slashCommands, }), diff --git a/packages/agents-runtime/src/entity-schema.ts b/packages/agents-runtime/src/entity-schema.ts index 7d70d3cef2..0cb7a2f53d 100644 --- a/packages/agents-runtime/src/entity-schema.ts +++ b/packages/agents-runtime/src/entity-schema.ts @@ -1131,6 +1131,10 @@ export const builtInCollections: EntityCollectionsDefinition = { export const entityStateSchema: StateSchema = createStateSchema(builtInCollections) +export const BUILT_IN_COLLECTION_TYPES: ReadonlySet = new Set( + Object.values(builtInCollections).map((collection) => collection.type) +) + // ============================================================================ // Management Event Guard // ============================================================================ diff --git a/packages/agents-runtime/src/entity-timeline.ts b/packages/agents-runtime/src/entity-timeline.ts index 0520982298..16a7ed5cca 100644 --- a/packages/agents-runtime/src/entity-timeline.ts +++ b/packages/agents-runtime/src/entity-timeline.ts @@ -14,6 +14,7 @@ import { import { caseWhen } from '@tanstack/db' import type { Collection, + Context, InitialQueryBuilder, QueryBuilder, } from '@tanstack/db' @@ -176,8 +177,34 @@ export interface EntityTimelineData { export type EntityTimelineInboxMode = `processed` | `all` +/** + * Envelope for custom-collection rows projected into the timeline. Each + * custom collection is an entity-type-declared shape (see + * `custom_collection_schemas` on the entity type); the caller pre-shapes + * its rows into `{ collection, key, order, value }` and the runtime + * weaves the source into the same unionAll/orderBy pipeline as the + * built-in timeline collections so there's a single ordered live query. + */ +export interface EntityTimelineCustomRow { + collection: string + key: string + order: TimelineOrder + value: Record +} + +/** + * Pre-shaped query-builder branch a caller supplies to project custom + * collections into the timeline. The builder's selected row shape must + * match `EntityTimelineCustomRow`. Multiple custom collections compose by + * unioning them client-side before passing the result as `customSource`. + */ +export type EntityTimelineCustomSource = QueryBuilder< + Context & { result: EntityTimelineCustomRow; hasResult: true } +> + export interface EntityTimelineQueryOptions { inboxMode?: EntityTimelineInboxMode + customSource?: EntityTimelineCustomSource } export interface EntityTimelineTextChunk { @@ -259,6 +286,7 @@ export type EntityTimelineQueryRow = wake?: undefined signal?: undefined manifest?: undefined + custom?: undefined } | { $key: string @@ -267,6 +295,7 @@ export type EntityTimelineQueryRow = wake?: undefined signal?: undefined manifest?: undefined + custom?: undefined } | { $key: string @@ -275,6 +304,7 @@ export type EntityTimelineQueryRow = wake: EntityTimelineWakeRow signal?: undefined manifest?: undefined + custom?: undefined } | { $key: string @@ -283,6 +313,7 @@ export type EntityTimelineQueryRow = wake?: undefined signal: EntityTimelineSignalRow manifest?: undefined + custom?: undefined } | { $key: string @@ -291,6 +322,16 @@ export type EntityTimelineQueryRow = wake?: undefined signal?: undefined manifest: ManifestEntry + custom?: undefined + } + | { + $key: string + inbox?: undefined + run?: undefined + wake?: undefined + signal?: undefined + manifest?: undefined + custom: EntityTimelineCustomRow } function normalizeTimelineRun(run: IncludesRun): IncludesRun { @@ -1376,36 +1417,51 @@ function buildEntityTimelineQuery( })), })) + const baseBranches = { + inbox: inboxSource, + run: runSource, + wake: wakeSource, + signal: signalSource, + manifest: db.collections.manifests, + } + const branches = opts.customSource + ? { ...baseBranches, custom: opts.customSource } + : baseBranches + return q - .unionAll({ - inbox: inboxSource, - run: runSource, - wake: wakeSource, - signal: signalSource, - manifest: db.collections.manifests, - }) - .orderBy(({ inbox, run, wake, signal, manifest }) => + .unionAll(branches) + .orderBy((args) => coalesce( - inbox.order, - run.order, - wake.order, - signal.order, - manifest._timeline_order, + args.inbox.order, + args.run.order, + args.wake.order, + args.signal.order, + args.manifest._timeline_order, + `custom` in args ? args.custom.order : undefined, `~` ) ) - .orderBy(({ inbox, run, wake, signal, manifest }) => + .orderBy((args) => coalesce( - caseWhen(inbox.key, `inbox`), - caseWhen(run.key, `run`), - caseWhen(wake.key, `wake`), - caseWhen(signal.key, `signal`), - caseWhen(manifest.key, `manifest`), + caseWhen(args.inbox.key, `inbox`), + caseWhen(args.run.key, `run`), + caseWhen(args.wake.key, `wake`), + caseWhen(args.signal.key, `signal`), + caseWhen(args.manifest.key, `manifest`), + `custom` in args ? caseWhen(args.custom.key, `custom`) : undefined, `` ) ) - .orderBy(({ inbox, run, wake, signal, manifest }) => - coalesce(inbox.key, run.key, wake.key, signal.key, manifest.key, ``) + .orderBy((args) => + coalesce( + args.inbox.key, + args.run.key, + args.wake.key, + args.signal.key, + args.manifest.key, + `custom` in args ? args.custom.key : undefined, + `` + ) ) } diff --git a/packages/agents-runtime/src/index.ts b/packages/agents-runtime/src/index.ts index 3275e31be3..c8a00858a6 100644 --- a/packages/agents-runtime/src/index.ts +++ b/packages/agents-runtime/src/index.ts @@ -81,6 +81,7 @@ export type { } from './types' export { + BUILT_IN_COLLECTION_TYPES, ENTITY_COLLECTIONS, builtInCollections, entityStateSchema, @@ -181,6 +182,8 @@ export { export type { EntityTimelineData, EntityTimelineContentItem, + EntityTimelineCustomRow, + EntityTimelineCustomSource, EntityTimelineInboxMode, EntityTimelineQueryOptions, EntityTimelineQueryRow, diff --git a/packages/agents-runtime/src/tags.ts b/packages/agents-runtime/src/tags.ts index 291b0bdd97..7c0d234615 100644 --- a/packages/agents-runtime/src/tags.ts +++ b/packages/agents-runtime/src/tags.ts @@ -89,6 +89,10 @@ export const entityMembershipRowSchema = z.object({ type_revision: z.number().int().nullable().optional(), inbox_schemas: z.record(z.string(), z.unknown()).nullable().optional(), state_schemas: z.record(z.string(), z.unknown()).nullable().optional(), + custom_collection_schemas: z + .record(z.string(), z.unknown()) + .nullable() + .optional(), created_at: z.number(), updated_at: z.number(), }) diff --git a/packages/agents-runtime/src/types.ts b/packages/agents-runtime/src/types.ts index ec366ab670..f4009712dc 100644 --- a/packages/agents-runtime/src/types.ts +++ b/packages/agents-runtime/src/types.ts @@ -1151,6 +1151,10 @@ export interface EntityDefinition< creationSchema?: TCreationSchema inboxSchemas?: Record stateSchemas?: Record + customCollectionSchemas?: Record< + string, + StandardSchemaV1 | Readonly> + > permissionGrants?: ReadonlyArray slashCommands?: Array diff --git a/packages/agents-runtime/test/entity-timeline-custom-source.test.ts b/packages/agents-runtime/test/entity-timeline-custom-source.test.ts new file mode 100644 index 0000000000..5f8678acfb --- /dev/null +++ b/packages/agents-runtime/test/entity-timeline-custom-source.test.ts @@ -0,0 +1,232 @@ +import { describe, expect, it } from 'vitest' +import { createCollection, createLiveQueryCollection } from '@tanstack/db' +import { coalesce } from '@durable-streams/state/db' +import { createEntityTimelineQuery } from '../src/entity-timeline' + +let nextOffset = 1 +function offset(): { offset: string; subOffset: number } { + return { + offset: `0000000000000000_${(nextOffset++).toString().padStart(16, `0`)}`, + subOffset: 1, + } +} + +/** + * Minimal sync collection that exposes raw sync hooks the same way StreamDB + * does. The timeline query reacts to these collections, so registering a + * custom collection here is the closest end-to-end check that the + * `customSource` plumbing lands rows in the unioned, ordered timeline. + */ +function createSyncCollection< + T extends Record & { key: string }, +>(id: string) { + let begin: () => void + let write: (msg: { type: string; value: T }) => void + let commit: () => void + const collection = createCollection({ + id, + getKey: (item) => item.key, + sync: { + sync: (params: any) => { + begin = params.begin + write = params.write + commit = params.commit + params.markReady() + return () => {} + }, + }, + startSync: true, + gcTime: 0, + }) + return { + collection, + insert(value: T) { + begin!() + write!({ type: `insert`, value }) + commit!() + }, + } +} + +function emptyDbCollections() { + const take = () => offset() + return { + runs: createSyncCollection<{ key: string; status: string }>( + `runs-${take().offset}` + ), + texts: createSyncCollection<{ + key: string + run_id: string + status: string + }>(`texts-${take().offset}`), + textDeltas: createSyncCollection<{ + key: string + text_id: string + run_id: string + delta: string + }>(`textDeltas-${take().offset}`), + toolCalls: createSyncCollection<{ + key: string + tool_name: string + status: string + }>(`toolCalls-${take().offset}`), + steps: createSyncCollection<{ + key: string + run_id: string + step_number: number + status: string + }>(`steps-${take().offset}`), + errors: createSyncCollection<{ + key: string + run_id: string + error_code: string + message: string + }>(`errors-${take().offset}`), + inbox: createSyncCollection<{ + key: string + from: string + payload: unknown + timestamp: string + status: string + _timeline_order?: string + }>(`inbox-${take().offset}`), + wakes: createSyncCollection<{ + key: string + timestamp: string + source: string + timeout: boolean + changes: Array + _timeline_order?: string + }>(`wakes-${take().offset}`), + signals: createSyncCollection<{ + key: string + signal: string + status: string + timestamp: string + _timeline_order?: string + }>(`signals-${take().offset}`), + manifests: createSyncCollection<{ + key: string + kind: string + _timeline_order?: string + }>(`manifests-${take().offset}`), + } +} + +describe(`createEntityTimelineQuery customSource`, () => { + it(`interleaves custom collection rows with built-in rows by _timeline_order`, async () => { + const syncs = emptyDbCollections() + const comments = createSyncCollection<{ + key: string + body: string + from_principal: string + timestamp: string + _timeline_order?: string + }>(`comments-test`) + + const db = { + collections: { + runs: syncs.runs.collection, + texts: syncs.texts.collection, + textDeltas: syncs.textDeltas.collection, + toolCalls: syncs.toolCalls.collection, + steps: syncs.steps.collection, + errors: syncs.errors.collection, + inbox: syncs.inbox.collection, + wakes: syncs.wakes.collection, + signals: syncs.signals.collection, + manifests: syncs.manifests.collection, + }, + } as any + + const liveQuery = createLiveQueryCollection({ + query: (q) => + createEntityTimelineQuery(db, { + customSource: q + .from({ comment: comments.collection }) + .select(({ comment }) => ({ + collection: `comment` as const, + order: coalesce(comment._timeline_order, `~`), + key: comment.key, + value: comment, + })) as any, + })(q), + startSync: true, + }) + await liveQuery.preload() + + syncs.inbox.insert({ + key: `msg-1`, + from: `user`, + payload: `hi`, + timestamp: `2026-04-15T18:00:00Z`, + status: `processed`, + _timeline_order: `010`, + }) + comments.insert({ + key: `c-1`, + body: `nice`, + from_principal: `/principal/user%3Ame`, + timestamp: `2026-04-15T18:01:00Z`, + _timeline_order: `020`, + }) + syncs.inbox.insert({ + key: `msg-2`, + from: `user`, + payload: `bye`, + timestamp: `2026-04-15T18:02:00Z`, + status: `processed`, + _timeline_order: `030`, + }) + + await new Promise((resolve) => setTimeout(resolve, 80)) + + const rows = Array.from(liveQuery.entries()).map(([, v]: any) => v) + + expect(rows).toHaveLength(3) + expect(rows[0]).toMatchObject({ inbox: { key: `msg-1` } }) + expect(rows[1]).toMatchObject({ + custom: { collection: `comment`, key: `c-1`, value: { body: `nice` } }, + }) + expect(rows[2]).toMatchObject({ inbox: { key: `msg-2` } }) + }) + + it(`omits the custom slot when no customSource is provided`, async () => { + const syncs = emptyDbCollections() + const db = { + collections: { + runs: syncs.runs.collection, + texts: syncs.texts.collection, + textDeltas: syncs.textDeltas.collection, + toolCalls: syncs.toolCalls.collection, + steps: syncs.steps.collection, + errors: syncs.errors.collection, + inbox: syncs.inbox.collection, + wakes: syncs.wakes.collection, + signals: syncs.signals.collection, + manifests: syncs.manifests.collection, + }, + } as any + + const liveQuery = createLiveQueryCollection({ + query: (q) => createEntityTimelineQuery(db)(q), + startSync: true, + }) + await liveQuery.preload() + + syncs.inbox.insert({ + key: `msg-1`, + from: `user`, + payload: `hi`, + timestamp: `2026-04-15T18:00:00Z`, + status: `processed`, + _timeline_order: `010`, + }) + await new Promise((resolve) => setTimeout(resolve, 80)) + + const rows = Array.from(liveQuery.entries()).map(([, v]: any) => v) + expect(rows).toHaveLength(1) + expect(rows[0]).toMatchObject({ inbox: { key: `msg-1` } }) + expect(rows[0].custom).toBeUndefined() + }) +}) diff --git a/packages/agents-server-ui/src/components/AgentResponse.tsx b/packages/agents-server-ui/src/components/AgentResponse.tsx index 312e5d0fc5..314bc5a51c 100644 --- a/packages/agents-server-ui/src/components/AgentResponse.tsx +++ b/packages/agents-server-ui/src/components/AgentResponse.tsx @@ -1,4 +1,4 @@ -import { Check, Copy, GitFork } from 'lucide-react' +import { Check, Copy, GitFork, Reply } from 'lucide-react' import { memo, useEffect, @@ -385,6 +385,8 @@ export const AgentResponseLive = memo(function AgentResponseLive({ timestamp, renderWidth = 0, forkFromHere, + onReply, + onReplyToToolCall, onSearchTextChange, }: { rowKey: string @@ -393,6 +395,8 @@ export const AgentResponseLive = memo(function AgentResponseLive({ timestamp?: number | null renderWidth?: number forkFromHere?: ForkFromHereAction + onReply?: () => void + onReplyToToolCall?: (item: EntityTimelineToolCallItem) => void onSearchTextChange?: (rowKey: string, text: string) => void }): React.ReactElement { const { data: items = [] } = useLiveQuery( @@ -502,6 +506,11 @@ export const AgentResponseLive = memo(function AgentResponseLive({ onReplyToToolCall(item.toolCall!) + : undefined + } /> ) })} @@ -551,6 +560,7 @@ export const AgentResponseLive = memo(function AgentResponseLive({ copied={copied} onCopy={() => void copyResponseText()} forkFromHere={done ? forkFromHere : undefined} + onReply={onReply} /> @@ -562,20 +572,37 @@ function ResponseMetaActions({ copied, onCopy, forkFromHere, + onReply, }: { showCopy: boolean copied: boolean onCopy: () => void forkFromHere?: ForkFromHereAction + onReply?: () => void }): React.ReactElement | null { const showFork = forkFromHere !== undefined - if (!showCopy && !showFork) return null + if (!showCopy && !showFork && !onReply) return null const forkDisabled = forkFromHere?.disabled === true || !forkFromHere?.onFork const forkLabel = forkDisabled ? `Fork permission required` : `Fork from here` return ( + {onReply && ( + + + + + + )} {showFork && ( @@ -622,12 +649,14 @@ export const AgentResponse = memo(function AgentResponse({ timestamp, renderWidth = 0, forkFromHere, + onReply, }: { section: AgentResponseSection isStreaming: boolean timestamp?: number | null renderWidth?: number forkFromHere?: ForkFromHereAction + onReply?: () => void }): React.ReactElement { const canCache = !isStreaming && section.done === true const [copied, setCopied] = useState(false) @@ -755,6 +784,7 @@ export const AgentResponse = memo(function AgentResponse({ copied={copied} onCopy={() => void copyResponseText()} forkFromHere={section.done ? forkFromHere : undefined} + onReply={onReply} /> diff --git a/packages/agents-server-ui/src/components/CommentBubble.module.css b/packages/agents-server-ui/src/components/CommentBubble.module.css new file mode 100644 index 0000000000..9beebc9298 --- /dev/null +++ b/packages/agents-server-ui/src/components/CommentBubble.module.css @@ -0,0 +1,176 @@ +.root { + display: flex; + width: 100%; + box-sizing: border-box; +} + +.root[data-own='true'] { + justify-content: flex-end; +} + +.root[data-own='false'] { + justify-content: flex-start; +} + +.column { + display: grid; + gap: 4px; + max-width: min(72%, 640px); +} + +.root[data-own='true'] .column { + justify-items: end; +} + +.root[data-own='false'] .column { + justify-items: start; +} + +.message { + display: grid; + gap: 4px; + width: fit-content; + max-width: 100%; +} + +.root[data-own='true'] .message { + justify-self: end; +} + +.root[data-own='false'] .message { + justify-self: start; +} + +.bubble { + min-width: 0; + padding: 9px 12px; + border: 1px solid #000; + border-radius: var(--ds-radius-4); + background: #000; + color: #fff; + box-shadow: var(--ds-shadow-1); +} + +.root[data-single-line='true'] .bubble { + border-radius: var(--ds-radius-5); + padding-block: 7px; +} + +:global(html[data-theme='dark']) .bubble { + border-color: #fff; + background: #fff; + color: #000; +} + +.body, +.deletedBody { + white-space: pre-wrap; + overflow-wrap: anywhere; + font-size: var(--ds-chat-text); + line-height: var(--ds-chat-text-lh); +} + +.deletedBody { + color: inherit; + opacity: 0.64; + font-style: italic; +} + +.preview { + display: flex; + align-items: flex-start; + gap: 2px; + max-width: 100%; + margin: 0; + padding: 0; + border: 0; + background: transparent; + color: var(--ds-text-3); + font: inherit; + text-align: left; +} + +.previewButton { + border-radius: var(--ds-radius-2); + cursor: pointer; +} + +.previewButton:hover .previewContent { + border-left-color: var(--ds-text-3); +} + +.previewButton:focus-visible { + outline: 2px solid var(--ds-focus-ring); + outline-offset: 2px; +} + +.previewIcon { + flex: 0 0 auto; + margin-top: 1px; + color: var(--ds-text-4); +} + +.previewContent { + display: grid; + gap: 2px; + min-width: 0; + padding-left: 6px; + border-left: 2px solid var(--ds-gray-a8); +} + +.previewLabel, +.previewText { + min-width: 0; + overflow: hidden; + text-overflow: ellipsis; +} + +.previewLabel { + white-space: nowrap; + font-size: var(--ds-text-xs); + font-weight: 600; + line-height: var(--ds-text-xs-lh); +} + +.previewText { + display: -webkit-box; + -webkit-box-orient: vertical; + -webkit-line-clamp: 2; + font-size: var(--ds-text-xs); + line-height: var(--ds-text-xs-lh); +} + +.meta { + display: inline-flex; + align-items: center; + gap: 6px; + width: 100%; + box-sizing: border-box; + padding-inline: 8px; +} + +.meta > :not(.metaActions) { + opacity: 0.5; +} + +.metaActions { + margin-left: auto; + display: inline-flex; + align-items: center; + gap: 2px; +} + +.metaActionButton { + color: var(--ds-text-4); + opacity: 0.7; +} + +.metaActionButton:hover { + opacity: 1; +} + +@media (max-width: 700px) { + .column { + max-width: min(88%, 640px); + } +} diff --git a/packages/agents-server-ui/src/components/CommentBubble.tsx b/packages/agents-server-ui/src/components/CommentBubble.tsx new file mode 100644 index 0000000000..2fcf830078 --- /dev/null +++ b/packages/agents-server-ui/src/components/CommentBubble.tsx @@ -0,0 +1,180 @@ +import { memo } from 'react' +import { Reply } from 'lucide-react' +import type { + CommentSnapshot, + CommentTarget, + EntityTimelineCommentRow, +} from '../lib/comments' +import { Icon, IconButton, Text, Tooltip } from '../ui' +import { principalKeyFromInput } from '../lib/principals' +import { TimeText } from './TimeText' +import type { ElectricUser } from '../lib/ElectricAgentsProvider' +import styles from './CommentBubble.module.css' + +export const CommentBubble = memo(function CommentBubble({ + comment, + currentPrincipal, + usersById, + showMeta = true, + onReply, + onTargetClick, +}: { + comment: EntityTimelineCommentRow + currentPrincipal?: string + usersById?: Map + showMeta?: boolean + onReply?: (comment: EntityTimelineCommentRow) => void + onTargetClick?: (target: CommentTarget) => void +}): React.ReactElement { + const isOwn = + principalKeyFromInput(comment.from_principal) === + principalKeyFromInput(currentPrincipal) + const sender = formatSender(comment.from_principal, { + currentPrincipal, + usersById, + }) + const timestamp = Date.parse(comment.timestamp) + const deleted = Boolean(comment.deleted_at) + const singleLine = !deleted && !/[\r\n]/.test(comment.body) + + return ( +
+
+ {comment.target_snapshot && ( + onTargetClick(comment.reply_to!) + : undefined + } + /> + )} +
+
+
+ {deleted ? `Comment deleted` : comment.body} +
+
+ {showMeta && ( +
+ + {sender.label} + + {Number.isFinite(timestamp) && ( + <> + + - + + + + )} + {comment.edited_at && !deleted && ( + <> + + - + + + edited + + + )} + {onReply && !deleted && ( + + + onReply(comment)} + > + + + + + )} +
+ )} +
+
+
+ ) +}) + +function ReplyPreview({ + snapshot, + onClick, +}: { + snapshot: CommentSnapshot + onClick?: () => void +}): React.ReactElement { + const content = ( + <> + +
+
{snapshot.label}
+ {snapshot.text && ( +
{snapshot.text}
+ )} +
+ + ) + + if (onClick) { + return ( + + ) + } + + return
{content}
+} + +function formatSender( + from: string | null | undefined, + options: { + currentPrincipal?: string + usersById?: Map + } = {} +): { + label: string + title?: string +} { + const key = principalKeyFromInput(from) + if (!key) return { label: from || `user` } + if (key === principalKeyFromInput(options.currentPrincipal)) { + return { label: `Me`, title: key } + } + const colon = key.indexOf(`:`) + if (colon <= 0) return { label: key, title: key } + const kind = key.slice(0, colon) + const id = key.slice(colon + 1) + if (kind === `user`) { + const user = options.usersById?.get(id) + const label = user?.display_name || user?.email + if (label) return { label, title: key } + } + return { + label: `${kind}:${formatPrincipalId(id)}`, + title: key, + } +} + +function formatPrincipalId(id: string): string { + if (id.length <= 18) return id + return `${id.slice(0, 8)}...${id.slice(-6)}` +} diff --git a/packages/agents-server-ui/src/components/EntityTimeline.module.css b/packages/agents-server-ui/src/components/EntityTimeline.module.css index ca9e59c46d..3e38026550 100644 --- a/packages/agents-server-ui/src/components/EntityTimeline.module.css +++ b/packages/agents-server-ui/src/components/EntityTimeline.module.css @@ -191,6 +191,20 @@ box-sizing: border-box; } +.virtualRow[data-highlighted='true'] { + animation: targetPulse 1600ms ease-out; +} + +@keyframes targetPulse { + 0%, + 65% { + background: color-mix(in oklab, var(--ds-accent-a4) 70%, transparent); + } + 100% { + background: transparent; + } +} + /* Jump-to-bottom affordance — centered over the chat column so it relates to the conversation rather than the viewport edge. It stays mounted and toggles visibility with opacity/translate for a soft diff --git a/packages/agents-server-ui/src/components/EntityTimeline.tsx b/packages/agents-server-ui/src/components/EntityTimeline.tsx index 214b806066..286c5240aa 100644 --- a/packages/agents-server-ui/src/components/EntityTimeline.tsx +++ b/packages/agents-server-ui/src/components/EntityTimeline.tsx @@ -23,6 +23,7 @@ import { FileJson, GitBranch, Radio, + Reply, } from 'lucide-react' import { loadTimelineRowHeights, @@ -46,6 +47,7 @@ import { Icon, IconButton, ScrollArea, Stack, Text, Tooltip } from '../ui' import { UserMessage } from './UserMessage' import type { ForkFromHereAction, UserMessageAttachment } from './UserMessage' import { AgentResponseLive } from './AgentResponse' +import { CommentBubble } from './CommentBubble' import { InlineEventCard } from './InlineEventCard' import { InlineStatusBadge } from './InlineStatusBadge' import { @@ -57,13 +59,19 @@ import { formatChatTimestamp, } from '../lib/formatTime' import { readTextPayload } from '../lib/sendMessage' +import { principalKeyFromInput } from '../lib/principals' import styles from './EntityTimeline.module.css' import type { ElectricUser } from '../lib/ElectricAgentsProvider' import type { - EntityTimelineSection, + CommentTarget, EntityTimelineQueryRow, + SelectedCommentTarget, +} from '../lib/comments' +import type { + EntityTimelineSection, EntityTimelineRunItem, EntityTimelineRunRow, + EntityTimelineToolCallItem, IncludesEntity, Manifest, } from '@electric-ax/agents-runtime/client' @@ -72,6 +80,10 @@ import type { PaneFindAdapter, PaneFindMatch } from '../hooks/usePaneFind' type RenderTimelineRow = EntityTimelineQueryRow type WakeSection = Extract +export type TimelineRowAdjacency = { + previousRow?: EntityTimelineQueryRow + nextRow?: EntityTimelineQueryRow +} function renderRowKey(row: RenderTimelineRow): string { return row.$key @@ -212,7 +224,8 @@ class TimelineRowErrorBoundary extends Component< */ function estimateRowHeight( row: RenderTimelineRow | undefined, - contentWidth: number + contentWidth: number, + nextRow?: RenderTimelineRow ): number { if (!row) return 120 @@ -227,12 +240,16 @@ function estimateRowHeight( 1, Math.ceil(readInboxText(row.inbox.payload).length / charsPerLine) ) - return Math.max(64, 48 + lines * lineHeight) + timelineRowGap(row) + return Math.max(64, 48 + lines * lineHeight) + timelineRowGap(row, nextRow) + } + if (row.comment) { + const lines = Math.max(1, Math.ceil(row.comment.body.length / charsPerLine)) + return Math.max(58, 42 + lines * lineHeight) + timelineRowGap(row, nextRow) } if (row.wake || row.signal || row.manifest) { - return 76 + timelineRowGap(row) + return 76 + timelineRowGap(row, nextRow) } - return 120 + timelineRowGap(row) + return 120 + timelineRowGap(row, nextRow) } const BOTTOM_PIN_THRESHOLD = 8 @@ -242,10 +259,37 @@ const MANIFEST_ROW_GAP = 10 const ROW_SETTLE_MS = 500 type EntityStatus = NonNullable -function timelineRowGap(row: RenderTimelineRow): number { +function timelineRowGap( + row: RenderTimelineRow, + nextRow?: RenderTimelineRow +): number { + if (shouldCollapseCommentMeta(row, nextRow)) return 6 return row.manifest || row.wake || row.signal ? MANIFEST_ROW_GAP : ROW_GAP } +function isPlainCommentRow(row: RenderTimelineRow | undefined): boolean { + const comment = row?.comment + if (!comment) return false + return !comment.deleted_at && !comment.reply_to && !comment.target_snapshot +} + +function shouldCollapseCommentMeta( + row: RenderTimelineRow | undefined, + nextRow: RenderTimelineRow | undefined +): boolean { + if (!isPlainCommentRow(row) || !isPlainCommentRow(nextRow)) return false + const principal = principalKeyFromInput(row?.comment?.from_principal) + if (!principal) return false + return principal === principalKeyFromInput(nextRow?.comment?.from_principal) +} + +function shouldShowCommentMeta( + row: RenderTimelineRow, + nextRow: RenderTimelineRow | undefined +): boolean { + return !shouldCollapseCommentMeta(row, nextRow) +} + type TimelinePaneFindMatch = PaneFindMatch & { rowKey: string rowIndex: number @@ -256,6 +300,7 @@ function timelineRowSearchText( row: RenderTimelineRow, runSearchTextByKey: Map ): string { + if (row.comment) return row.comment.body if (row.inbox) return readInboxText(row.inbox.payload) if (row.wake) { return wakeSectionText({ @@ -270,6 +315,7 @@ function timelineRowSearchText( } function timelineRowLabel(row: RenderTimelineRow): string { + if (row.comment) return `Comment` if (row.inbox?.from_agent) return `Agent message` if (row.inbox) return `User message` if (row.wake) return `Wake` @@ -278,6 +324,160 @@ function timelineRowLabel(row: RenderTimelineRow): string { return `Agent response` } +function truncateCommentPreview(text: string, maxLength = 280): string { + const compact = text.replace(/\s+/g, ` `).trim() + return compact.length <= maxLength + ? compact + : `${compact.slice(0, maxLength - 3)}...` +} + +function createReplyTargetForRow( + row: RenderTimelineRow, + runSearchTextByKey: Map +): SelectedCommentTarget | null { + if (row.comment) { + return { + target: { kind: `comment`, key: row.comment.key }, + snapshot: { + label: `Comment`, + text: truncateCommentPreview(row.comment.body), + from: row.comment.from_principal, + timestamp: row.comment.timestamp, + collection: `comment`, + }, + } + } + + if (row.inbox) { + return { + target: { kind: `timeline`, collection: `inbox`, key: row.inbox.key }, + snapshot: { + label: row.inbox.from_agent ? `Agent message` : `User message`, + text: truncateCommentPreview(readInboxText(row.inbox.payload)), + from: row.inbox.from, + timestamp: row.inbox.timestamp, + collection: `inbox`, + }, + } + } + + if (row.run) { + return { + target: { kind: `timeline`, collection: `run`, key: row.run.key }, + snapshot: { + label: `Assistant response`, + text: truncateCommentPreview( + runSearchTextByKey.get(row.$key) ?? runSearchTextFromSnapshot(row.run) + ), + collection: `run`, + }, + } + } + + if (row.wake) { + return { + target: { kind: `timeline`, collection: `wake`, key: row.wake.key }, + snapshot: { + label: `Wake`, + text: truncateCommentPreview(stringifyPayload(row.wake.payload)), + timestamp: row.wake.payload.timestamp, + collection: `wake`, + }, + } + } + + if (row.signal) { + return { + target: { kind: `timeline`, collection: `signal`, key: row.signal.key }, + snapshot: { + label: `Signal`, + text: truncateCommentPreview(signalSearchText(row.signal)), + timestamp: row.signal.timestamp, + collection: `signal`, + }, + } + } + + if (row.manifest) { + return { + target: { + kind: `timeline`, + collection: `manifest`, + key: row.manifest.key, + }, + snapshot: { + label: manifestKindLabel(row.manifest), + text: truncateCommentPreview(manifestSearchText(row.manifest)), + collection: `manifest`, + }, + } + } + + return null +} + +function createReplyTargetForToolCall( + row: RenderTimelineRow, + toolCall: EntityTimelineToolCallItem +): SelectedCommentTarget { + const runId = row.run?.key ?? toolCall.run_id + return { + target: { + kind: `timeline`, + collection: `tool_call`, + key: toolCall.key, + ...(runId ? { run_id: runId } : {}), + }, + snapshot: { + label: `Tool call`, + text: truncateCommentPreview( + [ + toolCall.tool_name, + stringifySearchPayload(toolCall.args), + stringifySearchPayload(toolCall.result), + stringifySearchPayload(toolCall.error), + ] + .filter((text) => text.length > 0) + .join(` `) + ), + collection: `tool_call`, + }, + } +} + +function timelineRowMatchesCommentTarget( + row: RenderTimelineRow, + target: CommentTarget +): boolean { + if (target.kind === `comment`) { + return row.comment?.key === target.key + } + + switch (target.collection) { + case `inbox`: + return row.inbox?.key === target.key + case `run`: + return row.run?.key === target.key + case `wake`: + return row.wake?.key === target.key + case `signal`: + return row.signal?.key === target.key + case `manifest`: + return row.manifest?.key === target.key + case `text`: + case `tool_call`: { + const run = row.run + if (!run) return false + if (target.run_id && run.key === target.run_id) return true + return run.items.toArray.some((item) => + target.collection === `text` + ? item.text?.key === target.key + : item.toolCall?.key === target.key + ) + } + } +} + function firstSelfSendWakeChange( section: WakeSection, entityUrl?: string | null @@ -336,9 +536,11 @@ function wakeSectionText( function WakeTimelineRow({ section, entityUrl, + onReply, }: { section: WakeSection entityUrl?: string | null + onReply?: () => void }): React.ReactElement { const reason = wakeReason(section, entityUrl) const details = wakeDetails(section, entityUrl) @@ -350,7 +552,13 @@ function WakeTimelineRow({ icon={Radio} title="woke" summary={`${reason} · ${formatChatTimestamp(section.timestamp)}`} + actions={ + onReply ? ( + + ) : undefined + } defaultExpanded={false} + collapsible headerSurface >
@@ -375,9 +583,11 @@ function WakeTimelineRow({ function AgentInboxMessageRow({ inbox, entityUrl, + onReply, }: { inbox: NonNullable entityUrl?: string | null + onReply?: () => void }): React.ReactElement { const parsed = Date.parse(inbox.timestamp) const timestamp = Number.isFinite(parsed) ? parsed : Date.now() @@ -398,7 +608,16 @@ function AgentInboxMessageRow({ icon={Radio} title={isSelfSend ? `sent to itself` : `agent message`} summary={`${isSelfSend ? `self-send` : fromAgent} · ${formatChatTimestamp(timestamp)}`} + actions={ + onReply ? ( + + ) : undefined + } defaultExpanded={false} + collapsible headerSurface >
@@ -419,8 +638,10 @@ function AgentInboxMessageRow({ function SignalTimelineRow({ signal, + onReply, }: { signal: NonNullable + onReply?: () => void }): React.ReactElement { return (
@@ -428,6 +649,11 @@ function SignalTimelineRow({ icon={CircleStop} title={`signal ${signal.signal}`} summary={signalSummary(signal)} + actions={ + onReply ? ( + + ) : undefined + } headerSurface />
@@ -574,11 +800,13 @@ function ManifestTimelineRow({ manifest, entityUrl, entityStatus, + onReply, }: { manifest: Manifest entityUrl: string | null tileId: string | null entityStatus?: EntityStatus + onReply?: () => void }): React.ReactElement { const workspace = useOptionalWorkspace() const navigate = useNavigate() @@ -648,10 +876,17 @@ function ManifestTimelineRow({ ) : null + const replyAction = onReply ? ( + + ) : null const actions = - statusBadge || openAction ? ( + statusBadge || openAction || replyAction ? ( <> {statusBadge} + {replyAction} {openAction} ) : undefined @@ -891,6 +1126,32 @@ function titleCase(value: string): string { .join(` `) } +function TimelineReplyAction({ + label, + onReply, +}: { + label: string + onReply?: () => void +}): React.ReactElement | null { + if (!onReply) return null + return ( + + + + + + ) +} + function stableEntityUrlKey(urls: Iterable): string { return Array.from(new Set(urls)).sort().join(`\0`) } @@ -901,6 +1162,8 @@ function entityUrlsFromKey(key: string): Array { const TimelineRow = memo(function TimelineRow({ row, + previousRow, + nextRow, responseTimestamp, isInitialUserMessage, entityStopped, @@ -917,8 +1180,13 @@ const TimelineRow = memo(function TimelineRow({ onStopGeneration, onForkFromHere, onRunSearchTextChange, + onReplyToRow, + onReplyToToolCall, + onCommentTargetClick, }: { row: RenderTimelineRow + previousRow?: RenderTimelineRow + nextRow?: RenderTimelineRow responseTimestamp: number | null isInitialUserMessage: boolean entityStopped: boolean @@ -938,10 +1206,34 @@ const TimelineRow = memo(function TimelineRow({ * we just invoke. */ onForkFromHere?: ForkFromHereAction onRunSearchTextChange: (rowKey: string, text: string) => void + onReplyToRow?: () => void + onReplyToToolCall?: (toolCall: EntityTimelineToolCallItem) => void + onCommentTargetClick?: (target: CommentTarget) => void }): React.ReactElement { + void previousRow + + if (row.comment) { + return ( + onReplyToRow() : undefined} + onTargetClick={onCommentTargetClick} + /> + ) + } + if (row.inbox) { if (row.inbox.from_agent) { - return + return ( + + ) } const timestamp = Date.parse(row.inbox.timestamp) return ( @@ -961,6 +1253,7 @@ const TimelineRow = memo(function TimelineRow({ } stopPending={stopPending} onStop={onStopGeneration} + onReply={onReplyToRow} /> ) } @@ -974,12 +1267,13 @@ const TimelineRow = memo(function TimelineRow({ timestamp: Date.parse(row.wake.payload.timestamp), }} entityUrl={entityUrl} + onReply={onReplyToRow} /> ) } if (row.signal) { - return + return } if (row.manifest) { @@ -993,6 +1287,7 @@ const TimelineRow = memo(function TimelineRow({ ? entityStatusByUrl.get(getManifestEntityUrl(row.manifest)!) : undefined } + onReply={onReplyToRow} /> ) } @@ -1006,12 +1301,15 @@ const TimelineRow = memo(function TimelineRow({ renderWidth={renderWidth} forkFromHere={onForkFromHere} onSearchTextChange={onRunSearchTextChange} + onReply={onReplyToRow} + onReplyToToolCall={onReplyToToolCall} /> ) }) export function EntityTimeline({ rows, + rowAdjacency, loading, error, entityStopped, @@ -1024,8 +1322,13 @@ export function EntityTimeline({ stopPending = false, onStopGeneration, forkFromHereByRunKey, + onReplyToRow, + focusTarget, + onFocusTargetHandled, + onCommentTargetClick, }: { rows: Array + rowAdjacency?: Array loading: boolean error: string | null entityStopped: boolean @@ -1044,6 +1347,10 @@ export function EntityTimeline({ * the fork pointer and runs the fork → navigate flow. */ forkFromHereByRunKey?: Map + onReplyToRow?: (target: SelectedCommentTarget) => void + focusTarget?: CommentTarget | null + onFocusTargetHandled?: () => void + onCommentTargetClick?: (target: CommentTarget) => void }): React.ReactElement { const { entitiesCollection, runnersCollection, usersCollection } = useElectricAgents() @@ -1133,6 +1440,9 @@ export function EntityTimeline({ const spawnMarkerRef = useRef(null) const [showJumpToBottom, setShowJumpToBottom] = useState(false) const [showTopDivider, setShowTopDivider] = useState(false) + const [highlightedRowKey, setHighlightedRowKey] = useState( + null + ) const [runSearchTextByKey, setRunSearchTextByKey] = useState( () => new Map() ) @@ -1140,6 +1450,7 @@ export function EntityTimeline({ const lastMeasureAtRef = useRef(new Map()) const settledKeysRef = useRef(new Set()) const settleCheckTimerRef = useRef | null>(null) + const highlightTimerRef = useRef | null>(null) const handledScrollSignalRef = useRef(scrollToBottomSignal) const previousStreamingAgentKeyRef = useRef(null) const textColumnWidth = Math.max(0, contentWidth - CHAT_SURFACE_GUTTER) @@ -1309,7 +1620,14 @@ export function EntityTimeline({ estimateSize: (index) => cachedSizeMapRef.current.get( displayRows[index] ? renderRowKey(displayRows[index]!) : `` - ) ?? estimateRowHeight(displayRows[index], textColumnWidth), + ) ?? + estimateRowHeight( + displayRows[index], + textColumnWidth, + displayRows[index] + ? (rowAdjacency?.[index]?.nextRow ?? displayRows[index + 1]) + : undefined + ), getItemKey: (index) => displayRows[index] ? renderRowKey(displayRows[index]!) : index, gap: 0, @@ -1318,6 +1636,50 @@ export function EntityTimeline({ enabled: displayRows.length > 0, }) + const revealCommentTarget = useCallback( + (target: CommentTarget): boolean => { + const targetIndex = displayRows.findIndex((row) => + timelineRowMatchesCommentTarget(row, target) + ) + if (targetIndex < 0) return false + + const row = displayRows[targetIndex] + if (!row) return false + + const rowKey = renderRowKey(row) + isNearBottom.current = false + setShowJumpToBottom(true) + rowVirtualizer.scrollToIndex(targetIndex, { align: `center` }) + setHighlightedRowKey(rowKey) + + if (highlightTimerRef.current !== null) { + clearTimeout(highlightTimerRef.current) + } + highlightTimerRef.current = setTimeout(() => { + highlightTimerRef.current = null + setHighlightedRowKey((current) => (current === rowKey ? null : current)) + }, 1600) + + return true + }, + [displayRows, rowVirtualizer] + ) + + const handleCommentTargetClick = useCallback( + (target: CommentTarget) => { + if (revealCommentTarget(target)) return + onCommentTargetClick?.(target) + }, + [onCommentTargetClick, revealCommentTarget] + ) + + useEffect(() => { + if (!focusTarget) return + if (revealCommentTarget(focusTarget)) { + onFocusTargetHandled?.() + } + }, [focusTarget, onFocusTargetHandled, revealCommentTarget]) + const paneFindAdapter = useMemo(() => { const getHighlightRoot = (match: PaneFindMatch): HTMLElement | null => { if (!contentElement || !isTimelineFindMatch(match)) return null @@ -1605,6 +1967,9 @@ export function EntityTimeline({ if (settleCheckTimerRef.current !== null) { clearTimeout(settleCheckTimerRef.current) } + if (highlightTimerRef.current !== null) { + clearTimeout(highlightTimerRef.current) + } }, [] ) @@ -1704,6 +2069,12 @@ export function EntityTimeline({ {rowVirtualizer.getVirtualItems().map((virtualRow) => { const row = displayRows[virtualRow.index] const rowKey = renderRowKey(row) + const previousRow = + rowAdjacency?.[virtualRow.index]?.previousRow ?? + displayRows[virtualRow.index - 1] + const nextRow = + rowAdjacency?.[virtualRow.index]?.nextRow ?? + displayRows[virtualRow.index + 1] // Stable row key. The previous implementation appended // `:${contentWidth}` to force remount on every column-width @@ -1721,15 +2092,20 @@ export function EntityTimeline({ data-index={virtualRow.index} data-item-key={rowKey} data-pane-find-row-key={rowKey} + data-highlighted={ + highlightedRowKey === rowKey ? `true` : undefined + } className={styles.virtualRow} style={{ transform: `translateY(${virtualRow.start}px)`, - paddingBottom: timelineRowGap(row), + paddingBottom: timelineRowGap(row, nextRow), }} > { + const target = createReplyTargetForRow( + row, + runSearchTextByKey + ) + if (target) onReplyToRow(target) + } + : undefined + } + onReplyToToolCall={ + onReplyToRow && row.run + ? (toolCall) => + onReplyToRow( + createReplyTargetForToolCall(row, toolCall) + ) + : undefined + } />
diff --git a/packages/agents-server-ui/src/components/InlineEventCard.test.tsx b/packages/agents-server-ui/src/components/InlineEventCard.test.tsx new file mode 100644 index 0000000000..959ff16a53 --- /dev/null +++ b/packages/agents-server-ui/src/components/InlineEventCard.test.tsx @@ -0,0 +1,39 @@ +import { describe, expect, it } from 'vitest' +import { renderToStaticMarkup } from 'react-dom/server' +import { Wrench } from 'lucide-react' +import { InlineEventCard } from './InlineEventCard' + +function hasNestedButton(markup: string): boolean { + let buttonDepth = 0 + const buttonTag = /<\/?button\b[^>]*>/g + for (const match of markup.matchAll(buttonTag)) { + const tag = match[0] + if (tag.startsWith(` 0) return true + buttonDepth += 1 + } + return false +} + +describe(`InlineEventCard`, () => { + it(`keeps header actions outside the expandable header button`, () => { + const markup = renderToStaticMarkup( + Reply} + collapsible + defaultExpanded + > +
result
+
+ ) + + expect(hasNestedButton(markup)).toBe(false) + expect(markup).toContain(`aria-label="Collapse details"`) + }) +}) diff --git a/packages/agents-server-ui/src/components/InlineEventCard.tsx b/packages/agents-server-ui/src/components/InlineEventCard.tsx index 2956052305..9d7691ff8c 100644 --- a/packages/agents-server-ui/src/components/InlineEventCard.tsx +++ b/packages/agents-server-ui/src/components/InlineEventCard.tsx @@ -32,7 +32,17 @@ export function InlineEventCard({ const [expanded, setExpanded] = useState(defaultExpanded) const showBody = children !== undefined && (!expandable || expanded) const headerOnly = children === undefined - const headerContent = ( + const toggle = () => setExpanded((value) => !value) + const toggleIcon = expandable ? ( + + ) : null + const headerLeadContent = ( <> {summary ? {summary} : null} {badge} + + ) + const headerContent = ( + <> + {headerLeadContent} {actions ? ( {actions} ) : null} - {expandable ? ( - - ) : null} + {toggleIcon} ) @@ -67,10 +74,36 @@ export function InlineEventCard({ className={toolBlock.card} data-header-surface={headerOnly || headerSurface ? `true` : undefined} > - {expandable ? ( + {expandable && actions ? ( + + + {actions} + + + ) : expandable ? (
+ ) : isCommentMode && commentTarget ? ( +
+
+ + {replyPreviewLabel} + + {replyPreviewText && ( + + {replyPreviewText} + + )} +
+ +
) : null } attachments={ - imageAttachmentsEnabled ? ( + imageAttachmentsEnabled && !isCommentMode ? ( + + + )} ) } + +function formatReplyBannerLabel(target: SelectedCommentTarget | null): string { + const label = target?.snapshot.label.trim() + if (!label) return `Reply` + return `Reply to ${label.charAt(0).toLowerCase()}${label.slice(1)}` +} diff --git a/packages/agents-server-ui/src/components/ToolCallView.module.css b/packages/agents-server-ui/src/components/ToolCallView.module.css index 6424f7f4a4..69df58a083 100644 --- a/packages/agents-server-ui/src/components/ToolCallView.module.css +++ b/packages/agents-server-ui/src/components/ToolCallView.module.css @@ -18,6 +18,19 @@ white-space: pre-wrap; } +.actionButton { + flex-shrink: 0; + width: 22px; + height: 22px; + border-radius: 4px; + color: var(--ds-gray-11); +} + +.actionButton:hover { + background: var(--ds-gray-a5); + color: var(--ds-gray-12); +} + .diffBlock { padding: 0; white-space: pre; diff --git a/packages/agents-server-ui/src/components/ToolCallView.tsx b/packages/agents-server-ui/src/components/ToolCallView.tsx index 30ce7c7f55..d736e58c8c 100644 --- a/packages/agents-server-ui/src/components/ToolCallView.tsx +++ b/packages/agents-server-ui/src/components/ToolCallView.tsx @@ -1,6 +1,6 @@ -import { Wrench } from 'lucide-react' +import { Reply, Wrench } from 'lucide-react' import type { EntityTimelineContentItem } from '@electric-ax/agents-runtime/client' -import { Badge, Stack, Text } from '../ui' +import { Badge, Icon, IconButton, Stack, Text, Tooltip } from '../ui' import type { BadgeTone } from '../ui' import { InlineEventCard } from './InlineEventCard' import { InlineStatusBadge } from './InlineStatusBadge' @@ -257,9 +257,28 @@ function ToolBody({ item }: { item: ToolCallItem }): React.ReactElement { export function ToolCallView({ item, + onReply, }: { item: ToolCallItem + onReply?: () => void }): React.ReactElement { + const replyAction = onReply ? ( + + + + + + ) : undefined + // send_message: same container style but always expanded with the message text if (item.toolName === `send_message` && typeof item.args.text === `string`) { const badge = statusBadge(item) @@ -270,6 +289,7 @@ export function ToolCallView({ title="send_message" titleFont="mono" collapsible={false} + actions={replyAction} badge={ badge ? ( @@ -299,6 +319,8 @@ export function ToolCallView({ titleFont="mono" summary={summary} defaultExpanded={shouldDefaultExpand} + collapsible + actions={replyAction} badge={ badge ? ( {badge.label} diff --git a/packages/agents-server-ui/src/components/UserMessage.module.css b/packages/agents-server-ui/src/components/UserMessage.module.css index c6eeed92c4..ffb672fcdd 100644 --- a/packages/agents-server-ui/src/components/UserMessage.module.css +++ b/packages/agents-server-ui/src/components/UserMessage.module.css @@ -77,6 +77,26 @@ } } +.meta { + width: 100%; +} + +.metaActions { + margin-left: auto; + display: inline-flex; + align-items: center; + gap: 2px; +} + +.metaActionButton { + color: var(--ds-text-4); + opacity: 0.7; +} + +.metaActionButton:hover { + opacity: 1; +} + .body { font-size: var(--ds-chat-text); line-height: var(--ds-chat-text-lh); @@ -205,9 +225,12 @@ } .meta { - opacity: 0.4; /* Match the bubble's 12px horizontal padding so the timestamp/sender * row aligns with the agent text column rather than with the wider * bubble background. */ padding-inline: 12px; } + +.meta > :not(.metaActions) { + opacity: 0.4; +} diff --git a/packages/agents-server-ui/src/components/UserMessage.tsx b/packages/agents-server-ui/src/components/UserMessage.tsx index f3317142d0..5ef2c70a47 100644 --- a/packages/agents-server-ui/src/components/UserMessage.tsx +++ b/packages/agents-server-ui/src/components/UserMessage.tsx @@ -4,10 +4,11 @@ import { Download, File as FileIcon, Image as ImageIcon, + Reply, Square, } from 'lucide-react' import type { EntityTimelineSection } from '@electric-ax/agents-runtime/client' -import { Icon, Stack, Text } from '../ui' +import { Icon, IconButton, Stack, Text, Tooltip } from '../ui' import { downloadAttachment, formatAttachmentSize } from '../lib/attachments' import { streamdownComponents, @@ -49,6 +50,7 @@ export const UserMessage = memo(function UserMessage({ currentPrincipal, usersById, onStop, + onReply, }: { section: UserMessageSection attachments?: Array @@ -57,6 +59,7 @@ export const UserMessage = memo(function UserMessage({ currentPrincipal?: string usersById?: Map onStop?: () => void + onReply?: () => void }): React.ReactElement { const sender = formatSender(section.from, { currentPrincipal, usersById }) @@ -112,6 +115,23 @@ export const UserMessage = memo(function UserMessage({ )} + {onReply && ( + + + + + + + + )} ) diff --git a/packages/agents-server-ui/src/components/toolBlock.module.css b/packages/agents-server-ui/src/components/toolBlock.module.css index dca3ea9c04..4ef2aea5cd 100644 --- a/packages/agents-server-ui/src/components/toolBlock.module.css +++ b/packages/agents-server-ui/src/components/toolBlock.module.css @@ -92,6 +92,59 @@ background: var(--ds-accent-a2); } +.headerWithActions { + padding: 0; + gap: 0; +} + +.headerContentToggle { + display: flex; + align-items: center; + gap: 8px; + min-width: 0; + flex: 1 1 auto; + align-self: stretch; + padding: 7px 0 7px 10px; + background: none; + border: none; + color: inherit; + font: inherit; + text-align: left; + cursor: pointer; + outline: none; + transition: background 0.08s ease; +} + +.headerContentToggle:hover, +.headerChevronButton:hover { + background: var(--ds-bg-hover); +} + +.headerContentToggle:focus-visible, +.headerChevronButton:focus-visible { + background: var(--ds-accent-a2); +} + +.headerChevronButton { + display: inline-flex; + align-items: center; + justify-content: center; + width: 32px; + align-self: stretch; + flex-shrink: 0; + padding: 0; + background: none; + border: none; + color: inherit; + cursor: pointer; + outline: none; + transition: background 0.08s ease; +} + +.headerChevronButton .toggleArrow { + margin-left: 0; +} + /* Chevron lives in a fixed-size 16px slot so the row height doesn't jiggle when expand/collapse swaps glyphs. */ .toggleArrow { @@ -130,6 +183,11 @@ margin-left: 0; } +.headerWithActions .headerActions { + margin-left: 0; + margin-right: 4px; +} + /* Tool name is the only mono token in the row — it's an identifier, so reading it as code helps. Summary + everything else stays in the body font for legibility at small sizes. */ diff --git a/packages/agents-server-ui/src/components/views/ChatView.test.ts b/packages/agents-server-ui/src/components/views/ChatView.test.ts new file mode 100644 index 0000000000..32591e35c7 --- /dev/null +++ b/packages/agents-server-ui/src/components/views/ChatView.test.ts @@ -0,0 +1,126 @@ +import { describe, expect, it } from 'vitest' +import { + buildCommentsTimeline, + commentFocusViewParams, + decodeCommentTargetParam, +} from './ChatView' +import type { CommentTarget, EntityTimelineQueryRow } from '../../lib/comments' + +function commentRow( + key: string, + fromPrincipal = `/principal/user%3Ame` +): EntityTimelineQueryRow { + return { + $key: `comment:${key}`, + comment: { + key, + order: key, + body: key, + from_principal: fromPrincipal, + timestamp: `2026-04-15T18:00:00.000Z`, + }, + } as EntityTimelineQueryRow +} + +function wakeRow(key: string): EntityTimelineQueryRow { + return { + $key: `wake:${key}`, + wake: { + key, + order: key, + payload: { + type: `wake`, + timestamp: `2026-04-15T18:00:00.000Z`, + source: `/chat/test`, + timeout: false, + changes: [], + }, + }, + } as EntityTimelineQueryRow +} + +function attachmentRow(key: string): EntityTimelineQueryRow { + return { + $key: `manifest:${key}`, + manifest: { + key, + kind: `attachment`, + id: key, + streamPath: `/chat/test/attachments/${key}`, + status: `complete`, + subject: { type: `inbox`, key: `msg-1` }, + mimeType: `text/plain`, + byteLength: 12, + createdAt: `2026-04-15T18:00:00.000Z`, + }, + } as EntityTimelineQueryRow +} + +describe(`buildCommentsTimeline`, () => { + it(`keeps comments in stream order while using full-timeline adjacency`, () => { + const first = commentRow(`first`) + const wake = wakeRow(`wake-1`) + const second = commentRow(`second`) + const third = commentRow(`third`) + const attachment = attachmentRow(`att-1`) + const fourth = commentRow(`fourth`) + + const timeline = buildCommentsTimeline([ + first, + wake, + second, + third, + attachment, + fourth, + ]) + + expect(timeline.rows.map((row) => row.comment?.key)).toEqual([ + `first`, + `second`, + `third`, + `fourth`, + ]) + expect(timeline.adjacency[0]).toEqual({ + previousRow: undefined, + nextRow: wake, + }) + expect(timeline.adjacency[1]).toEqual({ + previousRow: wake, + nextRow: third, + }) + expect(timeline.adjacency[2]).toEqual({ + previousRow: second, + nextRow: fourth, + }) + expect(timeline.adjacency[3]).toEqual({ + previousRow: third, + }) + }) +}) + +describe(`comment focus view params`, () => { + it(`round-trips timeline targets for comments-view navigation`, () => { + const target: CommentTarget = { + kind: `timeline`, + collection: `tool_call`, + key: `tool-call-1`, + run_id: `run-1`, + } + + const params = commentFocusViewParams(target) + + expect(decodeCommentTargetParam(params.focus)).toEqual(target) + }) + + it(`rejects invalid encoded target collections`, () => { + const encoded = encodeURIComponent( + JSON.stringify({ + kind: `timeline`, + collection: `unknown`, + key: `thing-1`, + }) + ) + + expect(decodeCommentTargetParam(encoded)).toBeNull() + }) +}) diff --git a/packages/agents-server-ui/src/components/views/ChatView.tsx b/packages/agents-server-ui/src/components/views/ChatView.tsx index 0f64c6c239..eb3e07ae6d 100644 --- a/packages/agents-server-ui/src/components/views/ChatView.tsx +++ b/packages/agents-server-ui/src/components/views/ChatView.tsx @@ -2,18 +2,24 @@ import { useCallback, useEffect, useMemo, useState } from 'react' import { useNavigate } from '@tanstack/react-router' import { eq, useLiveQuery } from '@tanstack/react-db' import { useEntityTimeline } from '../../hooks/useEntityTimeline' -import { EntityTimeline } from '../EntityTimeline' +import { EntityTimeline, type TimelineRowAdjacency } from '../EntityTimeline' import { MessageInput } from '../MessageInput' import { EntityContextDrawer } from '../EntityContextDrawer' import { useElectricAgents } from '../../lib/ElectricAgentsProvider' +import { useWorkspace } from '../../hooks/useWorkspace' +import { isAttachmentManifest } from '../../lib/attachments' import { schemaModelSupportsImageInput } from '../../lib/modelCapabilities' +import type { + CommentTarget, + EntityTimelineQueryRow, + SelectedCommentTarget, +} from '../../lib/comments' import { useEntityPermission, useEntityPermissions, type EntityPermission, } from '../../hooks/useEntityPermission' import type { ViewProps } from '../../lib/workspace/viewRegistry' -import type { EntityTimelineQueryRow } from '@electric-ax/agents-runtime/client' import type { EventPointer } from '@electric-ax/agents-runtime' import type { OptimisticInboxMessage } from '../../lib/sendMessage' import type { SlashCommandRow } from '@electric-ax/agents-runtime/client' @@ -24,6 +30,95 @@ const CHAT_VIEW_PERMISSIONS: ReadonlyArray = [ `signal`, `fork`, ] +const COMMENT_FOCUS_PARAM = `focus` +const COMMENT_TARGET_COLLECTIONS = new Set([ + `inbox`, + `run`, + `text`, + `tool_call`, + `wake`, + `signal`, + `manifest`, +]) + +export function encodeCommentTargetParam(target: CommentTarget): string { + return encodeURIComponent(JSON.stringify(target)) +} + +export function decodeCommentTargetParam( + value: string | undefined +): CommentTarget | null { + if (!value) return null + try { + const decoded = JSON.parse(decodeURIComponent(value)) as unknown + if (!isCommentTarget(decoded)) return null + return decoded + } catch { + return null + } +} + +export function commentFocusViewParams( + target: CommentTarget +): Record { + return { [COMMENT_FOCUS_PARAM]: encodeCommentTargetParam(target) } +} + +function isCommentTarget(value: unknown): value is CommentTarget { + if (!value || typeof value !== `object`) return false + const target = value as Partial + if (target.kind === `comment`) { + return typeof target.key === `string` + } + if (target.kind !== `timeline`) return false + const timelineTarget = target as Partial< + Extract + > + return ( + typeof timelineTarget.key === `string` && + typeof timelineTarget.collection === `string` && + COMMENT_TARGET_COLLECTIONS.has(timelineTarget.collection) && + (timelineTarget.run_id === undefined || + typeof timelineTarget.run_id === `string`) + ) +} + +export function buildCommentsTimeline( + timelineRows: Array +): { + rows: Array + adjacency: Array +} { + const rows: Array = [] + const adjacency: Array = [] + let previousRenderableRow: EntityTimelineQueryRow | undefined + let pendingCommentAdjacencyIndex: number | null = null + + for (const row of timelineRows) { + if (isAttachmentManifest(row.manifest)) continue + + if (pendingCommentAdjacencyIndex !== null) { + const pendingAdjacency = adjacency[pendingCommentAdjacencyIndex]! + adjacency[pendingCommentAdjacencyIndex] = { + ...pendingAdjacency, + nextRow: row, + } + pendingCommentAdjacencyIndex = null + } + + if (row.comment) { + rows.push(row) + adjacency.push({ + previousRow: previousRenderableRow, + }) + pendingCommentAdjacencyIndex = adjacency.length - 1 + } + + previousRenderableRow = row + } + + return { rows, adjacency } +} /** * The default view: chat / timeline + message composer. @@ -40,6 +135,7 @@ export function ChatView({ entityStopped, isSpawning, tileId, + viewParams, }: ViewProps): React.ReactElement { // While `spawning`, the entity has no inbox yet — `connectUrl` is null // so `useEntityTimeline` doesn't try to subscribe and we render an empty @@ -54,6 +150,7 @@ export function ChatView({ entityStopped={entityStopped} isSpawning={isSpawning} tileId={tileId} + viewParams={viewParams} /> ) } @@ -171,6 +268,84 @@ export function ChatLogView({ ) } +export function CommentsView({ + baseUrl, + entityUrl, + entity, + entityStopped, + isSpawning, + tileId, +}: ViewProps): React.ReactElement { + const connectUrl = isSpawning ? null : entityUrl + const { timelineRows, entities, db, loading, error } = useEntityTimeline( + baseUrl || null, + connectUrl + ) + const navigate = useNavigate() + const { helpers } = useWorkspace() + const canWrite = useEntityPermission(entity, `write`) + const [sentCommentSignal, setSentCommentSignal] = useState(0) + const [selectedCommentTarget, setSelectedCommentTarget] = + useState(null) + const commentsTimeline = useMemo( + () => buildCommentsTimeline(timelineRows), + [timelineRows] + ) + + useEffect(() => { + if (error && !isSpawning) { + void navigate({ to: `/` }) + } + }, [error, navigate, isSpawning]) + + useEffect(() => { + setSelectedCommentTarget(null) + }, [connectUrl]) + + const openFullTimelineTarget = useCallback( + (target: CommentTarget) => { + helpers.setTileView(tileId, `chat`, { + viewParams: commentFocusViewParams(target), + }) + }, + [helpers, tileId] + ) + + return ( + <> + + setSelectedCommentTarget(null)} + onSend={() => setSentCommentSignal((value) => value + 1)} + /> + + ) +} + function GenericChatBody({ baseUrl, entityUrl, @@ -178,6 +353,7 @@ function GenericChatBody({ entityStopped, isSpawning, tileId, + viewParams, }: { baseUrl: string entityUrl: string | null @@ -185,6 +361,7 @@ function GenericChatBody({ entityStopped: boolean isSpawning: boolean tileId: string + viewParams?: ViewProps[`viewParams`] }): React.ReactElement { const { timelineRows, @@ -202,8 +379,11 @@ function GenericChatBody({ const canSignal = permissions.signal const canFork = permissions.fork const navigate = useNavigate() + const { helpers } = useWorkspace() const [sentMessageSignal, setSentMessageSignal] = useState(0) const [stopPending, setStopPending] = useState(false) + const [selectedCommentTarget, setSelectedCommentTarget] = + useState(null) const { data: matchingEntityTypes = [] } = useLiveQuery( (query) => { if (!entityTypesCollection) return undefined @@ -217,6 +397,13 @@ function GenericChatBody({ matchingEntityTypes[0]?.creation_schema, entity.spawn_args ) + // Comments are an opt-in custom collection: only show comment UI on + // entities whose type declared a `comment` schema. On other entity + // types the server would reject the writes anyway, so we just hide + // the surface. + const commentsEnabled = Boolean( + matchingEntityTypes[0]?.custom_collection_schemas?.comment + ) const optimisticInlineInboxKeys = useMemo( () => new Set( @@ -248,6 +435,29 @@ function GenericChatBody({ : timelineRows, [inlinePendingInbox, timelineRows] ) + const showComments = commentsEnabled && viewParams?.comments !== `hidden` + const displayTimelineRows = useMemo>( + () => + showComments + ? timelineRowsWithInlinePending + : timelineRowsWithInlinePending.filter((row) => !row.comment), + [showComments, timelineRowsWithInlinePending] + ) + const focusTarget = useMemo( + () => decodeCommentTargetParam(viewParams?.[COMMENT_FOCUS_PARAM]), + [viewParams] + ) + const clearFocusTarget = useCallback(() => { + if (!viewParams?.[COMMENT_FOCUS_PARAM]) return + const nextParams = { ...viewParams } + delete nextParams[COMMENT_FOCUS_PARAM] + helpers.setTileView(tileId, `chat`, { + viewParams: Object.keys(nextParams).length > 0 ? nextParams : undefined, + }) + }, [helpers, tileId, viewParams]) + useEffect(() => { + if (!showComments) setSelectedCommentTarget(null) + }, [showComments]) const drawerPendingInbox = inlinePendingInbox ? visiblePendingInbox.slice(1) : visiblePendingInbox @@ -308,7 +518,7 @@ function GenericChatBody({ if (!runOffsets) return undefined const map = new Map() let anchor: { rowKey: string; pointer: EventPointer } | null = null - for (const row of timelineRowsWithInlinePending) { + for (const row of displayTimelineRows) { if (row.run && row.run.status === `completed`) { const pointer = runOffsets.get(row.run.key) anchor = pointer ? { rowKey: row.$key, pointer } : null @@ -338,24 +548,17 @@ function GenericChatBody({ } } return map - }, [ - timelineRowsWithInlinePending, - canFork, - db, - forkEntity, - entityUrl, - navigate, - ]) + }, [displayTimelineRows, canFork, db, forkEntity, entityUrl, navigate]) return ( <> setSelectedCommentTarget(null)} drawer={(pending) => ( setMenuOpen(false) /** Wraps a handler so it dispatches and then closes the menu. */ @@ -272,6 +275,15 @@ export function SplitMenu({ void navigator.clipboard.writeText(url.toString()) } + const setChatCommentsVisible = (visible: boolean) => { + const nextParams = { ...(tile.viewParams ?? {}) } + if (visible) delete nextParams.comments + else nextParams.comments = `hidden` + helpers.setTileView(tile.id, tile.viewId, { + viewParams: Object.keys(nextParams).length > 0 ? nextParams : undefined, + }) + } + // The menu and the dialogs are siblings — keeping them in the same // portal subtree caused focus / unmount races (Base UI // tears the menu popup down on close, and any dialog mounted inside @@ -328,6 +340,39 @@ export function SplitMenu({ )} + {showDisplayOptions && ( + <> + + + + Display options + + + + + setChatCommentsVisible(!chatCommentsVisible) + )} + > + + Show comments + + + + + + + )} + helpers.splitTile(tile.id, `right`)}> Split right diff --git a/packages/agents-server-ui/src/hooks/useEntityTimeline.ts b/packages/agents-server-ui/src/hooks/useEntityTimeline.ts index eba430f748..f8422c72f9 100644 --- a/packages/agents-server-ui/src/hooks/useEntityTimeline.ts +++ b/packages/agents-server-ui/src/hooks/useEntityTimeline.ts @@ -4,16 +4,40 @@ import { compareTimelineOrders, createEntityTimelineQuery, normalizeTimelineEntities, + passthrough, } from '@electric-ax/agents-runtime/client' import { coalesce, eq } from '@durable-streams/state/db' import { connectEntityStream } from '../lib/entity-connection' import type { EntityStreamDBWithActions, - EntityTimelineQueryRow, + EntityTimelineQueryRow as RuntimeEntityTimelineQueryRow, IncludesInboxMessage, IncludesEntity, Manifest, } from '@electric-ax/agents-runtime/client' +import type { + CommentRow, + EntityTimelineCommentRow, + EntityTimelineQueryRow, +} from '../lib/comments' + +const TIMELINE_ORDER_FALLBACK = `~` + +/** + * Comments are a custom collection declared by the entity type (see + * horton's and worker's `customCollectionSchemas: { comment: ... }`). + * The UI registers the matching TanStack DB collection here so + * `db.collections.comments` resolves and the runtime can splice it + * into the timeline projection via `createEntityTimelineQuery`'s + * `customSource` option. + */ +const COMMENT_CUSTOM_STATE = { + comments: { + schema: passthrough(), + type: `comment`, + primaryKey: `key`, + }, +} as const type TimelineEntityManifest = | (Manifest & { kind: `child`; id: string; entity_url: string }) @@ -44,6 +68,28 @@ function isTimelineEntityManifest( ) } +/** + * Re-shape the runtime's generic `custom` variant into a UI-friendly + * `comment` variant. Runs synchronously off the same live-query result so + * comments stay perfectly in order with the rest of the timeline. + */ +function projectRow( + row: RuntimeEntityTimelineQueryRow +): EntityTimelineQueryRow { + if (row.custom && row.custom.collection === `comment`) { + const value = row.custom.value as CommentRow + return { + $key: row.$key, + comment: { + ...value, + key: row.custom.key, + order: row.custom.order, + } as EntityTimelineCommentRow, + } as EntityTimelineQueryRow + } + return row as EntityTimelineQueryRow +} + export function useEntityTimeline( baseUrl: string | null, entityUrl: string | null @@ -73,7 +119,11 @@ export function useEntityTimeline( let cancelled = false setLoading(true) - connectEntityStream({ baseUrl, entityUrl }) + connectEntityStream({ + baseUrl, + entityUrl, + customState: COMMENT_CUSTOM_STATE, + }) .then((result) => { if (cancelled) { result.close() @@ -102,10 +152,18 @@ export function useEntityTimeline( } }, [baseUrl, entityUrl]) - const { data: timelineRows = [] } = useLiveQuery( + const { data: rawTimelineRows = [] } = useLiveQuery( (q) => { if (!db) return undefined - return createEntityTimelineQuery(db)(q) + const customSource = q + .from({ comment: (db.collections as any).comments }) + .select(({ comment }: any) => ({ + collection: `comment` as const, + order: coalesce(comment._timeline_order, TIMELINE_ORDER_FALLBACK), + key: comment.key, + value: comment, + })) + return createEntityTimelineQuery(db, { customSource })(q) }, [db] ) @@ -113,8 +171,8 @@ export function useEntityTimeline( (q) => db ? q - .from({ manifest: db.collections.manifests as any }) - .orderBy(({ manifest }: any) => manifest._seq, `asc`) + .from({ manifest: db.collections.manifests }) + .orderBy(({ manifest }) => manifest._seq, `asc`) : undefined, [db] ) @@ -122,19 +180,25 @@ export function useEntityTimeline( (q) => db ? q - .from({ inbox: db.collections.inbox as any }) - .where(({ inbox }: any) => eq(inbox.status, `pending`)) + .from({ inbox: db.collections.inbox }) + .where(({ inbox }) => eq(inbox.status, `pending`)) .orderBy( - ({ inbox }: any) => coalesce(inbox._timeline_order, `~`), + ({ inbox }) => + coalesce(inbox._timeline_order, TIMELINE_ORDER_FALLBACK), `asc` ) - .orderBy(({ inbox }: any) => + .orderBy(({ inbox }) => coalesce(inbox._seq, Number.MAX_SAFE_INTEGER) ) : undefined, [db] ) - const typedTimelineRows = timelineRows as Array + + const timelineRows = useMemo>( + () => + (rawTimelineRows as Array).map(projectRow), + [rawTimelineRows] + ) const pendingInbox = useMemo( () => @@ -178,8 +242,8 @@ export function useEntityTimeline( [pendingInboxRows] ) const generationActive = useMemo( - () => typedTimelineRows.some((row) => row.run?.status === `started`), - [typedTimelineRows] + () => timelineRows.some((row) => row.run?.status === `started`), + [timelineRows] ) const entities = useMemo( () => @@ -206,7 +270,7 @@ export function useEntityTimeline( ) return { - timelineRows: typedTimelineRows, + timelineRows, pendingInbox, entities, generationActive, diff --git a/packages/agents-server-ui/src/lib/ElectricAgentsProvider.tsx b/packages/agents-server-ui/src/lib/ElectricAgentsProvider.tsx index 7e87c51c8b..dfd43b3160 100644 --- a/packages/agents-server-ui/src/lib/ElectricAgentsProvider.tsx +++ b/packages/agents-server-ui/src/lib/ElectricAgentsProvider.tsx @@ -74,6 +74,7 @@ const entitySchema = z.object({ type_revision: z.coerce.number().nullable().optional(), inbox_schemas: z.record(z.unknown()).nullable().optional(), state_schemas: z.record(z.unknown()).nullable().optional(), + custom_collection_schemas: z.record(z.unknown()).nullable().optional(), created_at: z.coerce.number(), updated_at: z.coerce.number(), }) @@ -84,6 +85,7 @@ const entityTypeSchema = z.object({ creation_schema: z.unknown().nullable(), inbox_schemas: z.record(z.unknown()).nullable(), state_schemas: z.record(z.unknown()).nullable(), + custom_collection_schemas: z.record(z.unknown()).nullable().optional(), slash_commands: z .array( z.object({ diff --git a/packages/agents-server-ui/src/lib/comments.test.ts b/packages/agents-server-ui/src/lib/comments.test.ts new file mode 100644 index 0000000000..e0640eed1f --- /dev/null +++ b/packages/agents-server-ui/src/lib/comments.test.ts @@ -0,0 +1,147 @@ +import { afterEach, describe, expect, it, vi } from 'vitest' +import { createCollection, localOnlyCollectionOptions } from '@tanstack/db' +import { compareTimelineOrders } from '@electric-ax/agents-runtime/client' +import { registerActiveServerHeaders } from './auth-fetch' +import { createSendCommentAction } from './comments' +import type { EntityStreamDBWithActions } from '@electric-ax/agents-runtime/client' +import type { + CommentSnapshot, + CommentTarget, + OptimisticComment, +} from './comments' + +function createCommentsDb() { + const comments = createCollection( + localOnlyCollectionOptions({ + id: `test-comments-${Math.random().toString(36).slice(2)}`, + getKey: (comment: OptimisticComment) => comment.key, + }) + ) + return { + db: { + collections: { + comments, + }, + } as unknown as EntityStreamDBWithActions, + comments, + } +} + +describe(`createSendCommentAction`, () => { + afterEach(() => { + vi.restoreAllMocks() + registerActiveServerHeaders(null) + }) + + it(`inserts optimistic comments at increasing pending timeline orders`, async () => { + const fetchMock = vi + .spyOn(globalThis, `fetch`) + .mockResolvedValue(new Response(`{}`, { status: 201 })) + const { db } = createCommentsDb() + const optimistic: Array = [] + const sendComment = createSendCommentAction({ + db, + baseUrl: `http://localhost:4437`, + entityUrl: `/chat/test`, + from: `/principal/user%3Ame`, + onOptimisticComment: (comment) => optimistic.push(comment), + }) + + const firstTx = sendComment({ body: `first` }) + const secondTx = sendComment({ body: `second` }) + await Promise.all([ + firstTx.isPersisted.promise, + secondTx.isPersisted.promise, + ]) + + expect(optimistic).toHaveLength(2) + expect(optimistic[0]?._timeline_order).toMatch(/^~pending:/) + expect(optimistic[1]?._timeline_order).toMatch(/^~pending:/) + expect( + compareTimelineOrders( + optimistic[0]!._timeline_order, + optimistic[1]!._timeline_order + ) + ).toBeLessThan(0) + expect(fetchMock).toHaveBeenCalledTimes(2) + }) + + it(`posts reply metadata with the same key as the optimistic row`, async () => { + const fetchMock = vi + .spyOn(globalThis, `fetch`) + .mockResolvedValue(new Response(`{}`, { status: 201 })) + const { db } = createCommentsDb() + const optimistic: Array = [] + const replyTo: CommentTarget = { + kind: `timeline`, + collection: `run`, + key: `run-1`, + } + const targetSnapshot: CommentSnapshot = { + label: `Assistant response`, + text: `Draft reply`, + collection: `run`, + } + const sendComment = createSendCommentAction({ + db, + baseUrl: `http://localhost:4437`, + entityUrl: `/chat/test`, + from: `/principal/user%3Ame`, + onOptimisticComment: (comment) => optimistic.push(comment), + }) + + const tx = sendComment({ + body: `looks right`, + replyTo, + targetSnapshot, + }) + await tx.isPersisted.promise + + expect(optimistic).toHaveLength(1) + expect(optimistic[0]).toMatchObject({ + body: `looks right`, + from_principal: `/principal/user%3Ame`, + reply_to: replyTo, + target_snapshot: targetSnapshot, + }) + expect(fetchMock).toHaveBeenCalledTimes(1) + const [url, init] = fetchMock.mock.calls[0]! + expect(url).toBe( + `http://localhost:4437/_electric/entities/chat/test/collections/comment` + ) + expect(init?.method).toBe(`POST`) + expect(new Headers(init?.headers).get(`content-type`)).toBe( + `application/json` + ) + const parsedBody = JSON.parse(String(init?.body)) as { + key: string + value: Record + } + expect(parsedBody.key).toBe(optimistic[0]!.key) + expect(parsedBody.value).toMatchObject({ + body: `looks right`, + from_principal: `/principal/user%3Ame`, + reply_to: replyTo, + target_snapshot: targetSnapshot, + }) + expect(typeof parsedBody.value.timestamp).toBe(`string`) + }) + + it(`rejects the persistence promise when the server rejects the comment`, async () => { + vi.spyOn(globalThis, `fetch`).mockResolvedValue( + new Response(JSON.stringify({ message: `No write access` }), { + status: 403, + }) + ) + const { db } = createCommentsDb() + const sendComment = createSendCommentAction({ + db, + baseUrl: `http://localhost:4437`, + entityUrl: `/chat/test`, + }) + + const tx = sendComment({ body: `blocked` }) + + await expect(tx.isPersisted.promise).rejects.toThrow(`No write access`) + }) +}) diff --git a/packages/agents-server-ui/src/lib/comments.ts b/packages/agents-server-ui/src/lib/comments.ts new file mode 100644 index 0000000000..958a98dbfb --- /dev/null +++ b/packages/agents-server-ui/src/lib/comments.ts @@ -0,0 +1,213 @@ +import { createOptimisticAction } from '@tanstack/db' +import { createPendingTimelineOrder } from '@electric-ax/agents-runtime/client' +import { getActivePrincipal, serverFetch } from './auth-fetch' +import { entityApiUrl } from './entity-api' +import type { + EntityStreamDBWithActions, + EntityTimelineQueryRow as RuntimeEntityTimelineQueryRow, +} from '@electric-ax/agents-runtime/client' + +const OPTIMISTIC_COMMENT_ORDER_START = Number.MAX_SAFE_INTEGER - 2_000_000 + +let optimisticCommentOrderIndex = OPTIMISTIC_COMMENT_ORDER_START + +/** + * Wire-level shape of the `comment` custom collection. Comments are a + * custom collection declared by the entity types that accept them + * (horton, worker): each type registers a JSON Schema in + * `custom_collection_schemas.comment`, and the server validates every + * write through the generic `/collections/comment` endpoint against + * that schema. Clients register the matching TanStack DB collection at + * StreamDB construction time (see `useEntityTimeline`). + */ +export type CommentTargetCollection = + | `inbox` + | `run` + | `text` + | `tool_call` + | `wake` + | `signal` + | `manifest` + +export type CommentTarget = + | { + kind: `comment` + key: string + } + | { + kind: `timeline` + collection: CommentTargetCollection + key: string + run_id?: string + } + +export type CommentSnapshot = { + label: string + text?: string + from?: string + timestamp?: string + collection?: string +} + +export type CommentRow = { + key: string + body: string + from_principal: string + timestamp: string + reply_to?: CommentTarget + target_snapshot?: CommentSnapshot + edited_at?: string + deleted_at?: string + deleted_by?: string + _seq?: number + _timeline_order?: string +} + +/** + * Comment shaped for rendering by the timeline (mirrors the row union the + * runtime emits for built-ins like inbox or run, with `order` resolved). + */ +export type EntityTimelineCommentRow = Omit< + CommentRow, + `_seq` | `_timeline_order` +> & { + order: string | number +} + +/** + * Timeline row union for the UI. `useEntityTimeline` projects the runtime's + * generic `custom` variant into a domain-specific `comment` variant for + * comments, so consumers can write `if (row.comment)` instead of inspecting + * `row.custom.collection`. + */ +type RuntimeBuiltInRow = Exclude< + RuntimeEntityTimelineQueryRow, + { custom: { collection: string } } +> +type WithCommentSlot = T & { comment?: undefined } +export type EntityTimelineQueryRow = + | WithCommentSlot + | { + $key: string + inbox?: undefined + run?: undefined + wake?: undefined + signal?: undefined + manifest?: undefined + comment: EntityTimelineCommentRow + } + +export type OptimisticComment = EntityTimelineCommentRow & { + _timeline_order: string +} + +export type SelectedCommentTarget = { + target: CommentTarget + snapshot: CommentSnapshot +} + +type SendCommentInput = { + key: string + body: string + replyTo?: CommentTarget + targetSnapshot?: CommentSnapshot + pendingOrderIndex: number +} + +function nextOptimisticCommentOrderIndex(): number { + optimisticCommentOrderIndex += 1 + if (optimisticCommentOrderIndex >= Number.MAX_SAFE_INTEGER) { + optimisticCommentOrderIndex = OPTIMISTIC_COMMENT_ORDER_START + } + return optimisticCommentOrderIndex +} + +function createClientCommentKey(pendingOrderIndex: number): string { + return `client-comment-${Date.now()}-${pendingOrderIndex}` +} + +function readCommentError(status: number, body: string): Error { + let message = `Failed to post comment (${status})` + if (body) { + try { + const data = JSON.parse(body) as Record + if (data.message) message = String(data.message) + } catch { + message = body + } + } + return new Error(message) +} + +export function createSendCommentAction({ + db, + baseUrl, + entityUrl, + from, + onOptimisticComment, +}: { + db: EntityStreamDBWithActions + baseUrl: string + entityUrl: string + from?: string + onOptimisticComment?: (comment: OptimisticComment) => void +}) { + const action = createOptimisticAction({ + onMutate: ({ key, body, replyTo, targetSnapshot, pendingOrderIndex }) => { + const now = new Date().toISOString() + const comment: OptimisticComment = { + key, + order: createPendingTimelineOrder(pendingOrderIndex), + _timeline_order: createPendingTimelineOrder(pendingOrderIndex), + body, + from_principal: from ?? getActivePrincipal(), + timestamp: now, + ...(replyTo ? { reply_to: replyTo } : {}), + ...(targetSnapshot ? { target_snapshot: targetSnapshot } : {}), + } + onOptimisticComment?.(comment) + ;(db.collections as Record).comments.insert(comment) + }, + mutationFn: async ({ key, body, replyTo, targetSnapshot }) => { + const now = new Date().toISOString() + const value = { + body, + from_principal: from ?? getActivePrincipal(), + timestamp: now, + ...(replyTo ? { reply_to: replyTo } : {}), + ...(targetSnapshot ? { target_snapshot: targetSnapshot } : {}), + } + const res = await serverFetch( + entityApiUrl(baseUrl, entityUrl, `/collections/comment`), + { + method: `POST`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify({ key, value }), + } + ) + if (!res.ok) { + const body = await res.text().catch(() => ``) + throw readCommentError(res.status, body) + } + }, + }) + + return ({ + body, + replyTo, + targetSnapshot, + }: { + body: string + replyTo?: CommentTarget + targetSnapshot?: CommentSnapshot + }) => { + const pendingOrderIndex = nextOptimisticCommentOrderIndex() + return action({ + key: createClientCommentKey(pendingOrderIndex), + body, + replyTo, + targetSnapshot, + pendingOrderIndex, + }) + } +} diff --git a/packages/agents-server-ui/src/lib/workspace/registerViews.ts b/packages/agents-server-ui/src/lib/workspace/registerViews.ts index e887aeb9bb..22c49a3131 100644 --- a/packages/agents-server-ui/src/lib/workspace/registerViews.ts +++ b/packages/agents-server-ui/src/lib/workspace/registerViews.ts @@ -1,7 +1,7 @@ -import { Database, MessageSquare, SquarePen } from 'lucide-react' +import { Database, MessageCircle, MessageSquare, SquarePen } from 'lucide-react' import { registerView } from './viewRegistry' import { NEW_SESSION_VIEW_ID } from './types' -import { ChatView } from '../../components/views/ChatView' +import { ChatView, CommentsView } from '../../components/views/ChatView' import { StateExplorerView } from '../../components/views/StateExplorerView' import { NewSessionView } from '../../components/views/NewSessionView' @@ -23,6 +23,19 @@ registerView({ Component: ChatView, }) +registerView({ + kind: `entity`, + id: `comments`, + label: `Comments`, + icon: MessageCircle, + description: `Comment-only timeline`, + Component: CommentsView, + // Only entities whose type opts in to comments via + // `custom_collection_schemas.comment` see the comments-only view. + isAvailable: (entity) => + Boolean(entity.custom_collection_schemas?.comment), +}) + registerView({ kind: `entity`, id: `state-explorer`, diff --git a/packages/agents-server/drizzle/0015_entity_custom_collection_schemas.sql b/packages/agents-server/drizzle/0015_entity_custom_collection_schemas.sql new file mode 100644 index 0000000000..d0bd4942af --- /dev/null +++ b/packages/agents-server/drizzle/0015_entity_custom_collection_schemas.sql @@ -0,0 +1,2 @@ +ALTER TABLE "entity_types" ADD COLUMN "custom_collection_schemas" jsonb;--> statement-breakpoint +ALTER TABLE "entities" ADD COLUMN "custom_collection_schemas" jsonb; diff --git a/packages/agents-server/drizzle/meta/_journal.json b/packages/agents-server/drizzle/meta/_journal.json index 994885b230..e65a63fcf9 100644 --- a/packages/agents-server/drizzle/meta/_journal.json +++ b/packages/agents-server/drizzle/meta/_journal.json @@ -106,6 +106,13 @@ "when": 1780600000000, "tag": "0014_entity_type_slash_commands", "breakpoints": true + }, + { + "idx": 15, + "version": "7", + "when": 1781000000000, + "tag": "0015_entity_custom_collection_schemas", + "breakpoints": true } ] } diff --git a/packages/agents-server/src/db/schema.ts b/packages/agents-server/src/db/schema.ts index 5c01ada8b7..4e298214b4 100644 --- a/packages/agents-server/src/db/schema.ts +++ b/packages/agents-server/src/db/schema.ts @@ -24,6 +24,7 @@ export const entityTypes = pgTable( creationSchema: jsonb(`creation_schema`), inboxSchemas: jsonb(`inbox_schemas`), stateSchemas: jsonb(`state_schemas`), + customCollectionSchemas: jsonb(`custom_collection_schemas`), slashCommands: jsonb(`slash_commands`), serveEndpoint: text(`serve_endpoint`), defaultDispatchPolicy: jsonb(`default_dispatch_policy`), @@ -56,6 +57,7 @@ export const entities = pgTable( typeRevision: integer(`type_revision`), inboxSchemas: jsonb(`inbox_schemas`), stateSchemas: jsonb(`state_schemas`), + customCollectionSchemas: jsonb(`custom_collection_schemas`), createdAt: bigint(`created_at`, { mode: `number` }).notNull(), updatedAt: bigint(`updated_at`, { mode: `number` }).notNull(), }, diff --git a/packages/agents-server/src/electric-agents-types.ts b/packages/agents-server/src/electric-agents-types.ts index 60ffefec96..2edb166437 100644 --- a/packages/agents-server/src/electric-agents-types.ts +++ b/packages/agents-server/src/electric-agents-types.ts @@ -447,6 +447,7 @@ export interface ElectricAgentsEntity { type_revision?: number inbox_schemas?: Record> state_schemas?: Record> + custom_collection_schemas?: Record> created_by?: string created_at: number updated_at: number @@ -463,6 +464,7 @@ export interface PublicElectricAgentsEntity { spawn_args?: Record sandbox?: EntitySandboxSelection parent?: string + custom_collection_schemas?: Record> created_by?: string created_at: number updated_at: number @@ -488,6 +490,7 @@ export function toPublicEntity( spawn_args: entity.spawn_args, sandbox: entity.sandbox, parent: entity.parent, + custom_collection_schemas: entity.custom_collection_schemas, created_by: entity.created_by, created_at: entity.created_at, updated_at: entity.updated_at, @@ -500,6 +503,7 @@ export interface ElectricAgentsEntityType { creation_schema?: Record inbox_schemas?: Record> state_schemas?: Record> + custom_collection_schemas?: Record> slash_commands?: Array serve_endpoint?: string default_dispatch_policy?: DispatchPolicy @@ -514,6 +518,7 @@ export interface RegisterEntityTypeRequest { creation_schema?: Record inbox_schemas?: Record> state_schemas?: Record> + custom_collection_schemas?: Record> slash_commands?: Array serve_endpoint?: string default_dispatch_policy?: DispatchPolicy diff --git a/packages/agents-server/src/entity-manager.ts b/packages/agents-server/src/entity-manager.ts index 26700b246f..e1fdf89d16 100644 --- a/packages/agents-server/src/entity-manager.ts +++ b/packages/agents-server/src/entity-manager.ts @@ -1,6 +1,7 @@ import { createHash, randomUUID } from 'node:crypto' import fastq from 'fastq' import { + BUILT_IN_COLLECTION_TYPES, COMPOSER_INPUT_MESSAGE_TYPE, assertTags, entityStateSchema, @@ -465,6 +466,7 @@ export class EntityManager { this.validateSchema(req.creation_schema) this.validateSchemaMap(req.inbox_schemas) this.validateSchemaMap(req.state_schemas) + this.validateSchemaMap(req.custom_collection_schemas) this.validateSlashCommands(req.slash_commands) const defaultDispatchPolicy = req.default_dispatch_policy ? this.validateDispatchPolicy(req.default_dispatch_policy, { @@ -480,6 +482,7 @@ export class EntityManager { creation_schema: req.creation_schema, inbox_schemas: req.inbox_schemas, state_schemas: req.state_schemas, + custom_collection_schemas: req.custom_collection_schemas, slash_commands: req.slash_commands, serve_endpoint: req.serve_endpoint, default_dispatch_policy: defaultDispatchPolicy, @@ -734,6 +737,7 @@ export class EntityManager { type_revision: entityType.revision, inbox_schemas: entityType.inbox_schemas, state_schemas: entityType.state_schemas, + custom_collection_schemas: entityType.custom_collection_schemas, created_at: now, created_by: req.created_by ?? parentEntity?.created_by, updated_at: now, @@ -2501,6 +2505,94 @@ export class EntityManager { return /^\/[^/]+\/[^/]+\/attachments\/[^/]+$/.test(path) } + async appendCollectionRow( + entityUrl: string, + collection: string, + req: { key?: string; value: unknown } + ): Promise<{ key: string }> { + if (BUILT_IN_COLLECTION_TYPES.has(collection)) { + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `Collection "${collection}" is reserved by a built-in and cannot be written through the custom collection endpoint`, + 400 + ) + } + if (!/^[a-zA-Z][a-zA-Z0-9_-]{0,127}$/.test(collection)) { + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `Invalid collection name "${collection}"`, + 400 + ) + } + if (req.value === null || typeof req.value !== `object`) { + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `Collection value must be a JSON object`, + 400 + ) + } + + const entity = await this.registry.getEntity(entityUrl) + if (!entity) { + throw new ElectricAgentsError(ErrCodeNotFound, `Entity not found`, 404) + } + if (rejectsNormalWrites(entity.status)) { + throw new ElectricAgentsError( + ErrCodeNotRunning, + `Entity is not accepting writes`, + 409 + ) + } + if (this.isForkWorkLockedEntity(entityUrl)) { + this.assertEntityNotForkWorkLocked(entityUrl) + } + + const { customCollectionSchemas } = await this.getEffectiveSchemas(entity) + const schema = customCollectionSchemas?.[collection] + if (!schema) { + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `Custom collection "${collection}" is not declared on entity type "${entity.type}"`, + 422 + ) + } + const valErr = this.validator.validate(schema, req.value) + if (valErr) { + throw new ElectricAgentsError( + valErr.code, + valErr.message, + 422, + valErr.details + ) + } + + const key = + req.key ?? + `${collection}-${Date.now()}-${Math.random().toString(36).slice(2, 8)}` + const event = { + type: collection, + key, + value: req.value, + headers: { operation: `insert` as const }, + } + const encoded = this.encodeChangeEvent(event) + + try { + await this.streamClient.append(entity.streams.main, encoded) + } catch (err) { + if (this.isClosedStreamError(err)) { + throw new ElectricAgentsError( + ErrCodeNotRunning, + `Entity is stopped`, + 409 + ) + } + throw err + } + + return { key } + } + async createAttachment( entityUrl: string, req: CreateAttachmentRequest @@ -3619,6 +3711,7 @@ export class EntityManager { schemas: { inbox_schemas?: Record> state_schemas?: Record> + custom_collection_schemas?: Record> } ): Promise { if (typeName === `principal`) { @@ -3632,6 +3725,7 @@ export class EntityManager { // Validate each provided schema via validateSchemaSubset. this.validateSchemaMap(schemas.inbox_schemas) this.validateSchemaMap(schemas.state_schemas) + this.validateSchemaMap(schemas.custom_collection_schemas) // Look up current entity type. const existing = await this.registry.getEntityType(typeName) @@ -3643,37 +3737,23 @@ export class EntityManager { ) } - // Check for key overlap (additive only, no overwriting). - if (schemas.inbox_schemas && existing.inbox_schemas) { - for (const key of Object.keys(schemas.inbox_schemas)) { - if (key in existing.inbox_schemas) { - throw new ElectricAgentsError( - ErrCodeSchemaKeyExists, - `Cannot amend existing inbox schema key: ${key}`, - 409 - ) - } - } - } - if (schemas.state_schemas && existing.state_schemas) { - for (const key of Object.keys(schemas.state_schemas)) { - if (key in existing.state_schemas) { - throw new ElectricAgentsError( - ErrCodeSchemaKeyExists, - `Cannot amend existing state schema key: ${key}`, - 409 - ) - } - } - } - - // Merge schemas. - const mergedInbox = schemas.inbox_schemas - ? { ...(existing.inbox_schemas ?? {}), ...schemas.inbox_schemas } - : existing.inbox_schemas - const mergedState = schemas.state_schemas - ? { ...(existing.state_schemas ?? {}), ...schemas.state_schemas } - : existing.state_schemas + // Additive merge per schema map: reject overlapping keys (no + // overwriting), then spread the new entries onto the existing ones. + const mergedInbox = this.mergeAdditiveSchemaMap( + `inbox`, + schemas.inbox_schemas, + existing.inbox_schemas + ) + const mergedState = this.mergeAdditiveSchemaMap( + `state`, + schemas.state_schemas, + existing.state_schemas + ) + const mergedCustomCollection = this.mergeAdditiveSchemaMap( + `custom collection`, + schemas.custom_collection_schemas, + existing.custom_collection_schemas + ) const now = new Date().toISOString() const nextRevision = existing.revision + 1 @@ -3684,6 +3764,7 @@ export class EntityManager { creation_schema: existing.creation_schema, inbox_schemas: mergedInbox, state_schemas: mergedState, + custom_collection_schemas: mergedCustomCollection, slash_commands: existing.slash_commands, serve_endpoint: existing.serve_endpoint, default_dispatch_policy: existing.default_dispatch_policy, @@ -3748,6 +3829,32 @@ export class EntityManager { } } + /** + * Additively merge a schema map: reject any key that already exists on + * `existing` (additive only, no overwriting), then return the spread of + * both. Used by `amendSchemas` to extend `inbox_schemas`, `state_schemas`, + * and `custom_collection_schemas` without overwriting registered keys. + */ + private mergeAdditiveSchemaMap( + label: string, + incoming: Record> | undefined, + existing: Record> | undefined + ): Record> | undefined { + if (!incoming) return existing + if (existing) { + for (const key of Object.keys(incoming)) { + if (key in existing) { + throw new ElectricAgentsError( + ErrCodeSchemaKeyExists, + `Cannot amend existing ${label} schema key: ${key}`, + 409 + ) + } + } + } + return { ...(existing ?? {}), ...incoming } + } + private validateDispatchPolicy( input: unknown, opts: { label: string } @@ -3871,11 +3978,13 @@ export class EntityManager { private async getEffectiveSchemas(entity: ElectricAgentsEntity): Promise<{ inboxSchemas?: Record> stateSchemas?: Record> + customCollectionSchemas?: Record> }> { if (!entity.type) { return { inboxSchemas: entity.inbox_schemas, stateSchemas: entity.state_schemas, + customCollectionSchemas: entity.custom_collection_schemas, } } @@ -3888,6 +3997,12 @@ export class EntityManager { stateSchemas: latestType?.state_schemas ? { ...(entity.state_schemas ?? {}), ...latestType.state_schemas } : entity.state_schemas, + customCollectionSchemas: latestType?.custom_collection_schemas + ? { + ...(entity.custom_collection_schemas ?? {}), + ...latestType.custom_collection_schemas, + } + : entity.custom_collection_schemas, } } diff --git a/packages/agents-server/src/entity-projector.ts b/packages/agents-server/src/entity-projector.ts index f8c9d89295..1017975db8 100644 --- a/packages/agents-server/src/entity-projector.ts +++ b/packages/agents-server/src/entity-projector.ts @@ -46,6 +46,7 @@ interface EntityShapeRow extends Row { type_revision?: number | null inbox_schemas?: Record> | null state_schemas?: Record> | null + custom_collection_schemas?: Record> | null created_at: number updated_at: number } @@ -63,6 +64,7 @@ const ENTITY_SHAPE_COLUMNS = [ `type_revision`, `inbox_schemas`, `state_schemas`, + `custom_collection_schemas`, `created_at`, `updated_at`, ] as const @@ -129,6 +131,7 @@ function toMemberRow(entity: EntityShapeRow): EntityMembershipRow { type_revision: entity.type_revision ?? null, inbox_schemas: entity.inbox_schemas ?? null, state_schemas: entity.state_schemas ?? null, + custom_collection_schemas: entity.custom_collection_schemas ?? null, created_at: entity.created_at, updated_at: entity.updated_at, } diff --git a/packages/agents-server/src/entity-registry.ts b/packages/agents-server/src/entity-registry.ts index c35b62813c..f5463e44ed 100644 --- a/packages/agents-server/src/entity-registry.ts +++ b/packages/agents-server/src/entity-registry.ts @@ -633,6 +633,7 @@ export class PostgresRegistry { creationSchema: et.creation_schema ?? null, inboxSchemas: et.inbox_schemas ?? null, stateSchemas: et.state_schemas ?? null, + customCollectionSchemas: et.custom_collection_schemas ?? null, slashCommands: et.slash_commands ?? null, serveEndpoint: et.serve_endpoint ?? null, defaultDispatchPolicy: et.default_dispatch_policy ?? null, @@ -647,6 +648,7 @@ export class PostgresRegistry { creationSchema: et.creation_schema ?? null, inboxSchemas: et.inbox_schemas ?? null, stateSchemas: et.state_schemas ?? null, + customCollectionSchemas: et.custom_collection_schemas ?? null, slashCommands: et.slash_commands ?? null, serveEndpoint: et.serve_endpoint ?? null, defaultDispatchPolicy: et.default_dispatch_policy ?? null, @@ -670,6 +672,7 @@ export class PostgresRegistry { creationSchema: et.creation_schema ?? null, inboxSchemas: et.inbox_schemas ?? null, stateSchemas: et.state_schemas ?? null, + customCollectionSchemas: et.custom_collection_schemas ?? null, slashCommands: et.slash_commands ?? null, serveEndpoint: et.serve_endpoint ?? null, defaultDispatchPolicy: et.default_dispatch_policy ?? null, @@ -712,6 +715,7 @@ export class PostgresRegistry { creationSchema: et.creation_schema ?? null, inboxSchemas: et.inbox_schemas ?? null, stateSchemas: et.state_schemas ?? null, + customCollectionSchemas: et.custom_collection_schemas ?? null, slashCommands: et.slash_commands ?? null, serveEndpoint: et.serve_endpoint ?? null, defaultDispatchPolicy: et.default_dispatch_policy ?? null, @@ -743,6 +747,7 @@ export class PostgresRegistry { typeRevision: entity.type_revision ?? null, inboxSchemas: entity.inbox_schemas ?? null, stateSchemas: entity.state_schemas ?? null, + customCollectionSchemas: entity.custom_collection_schemas ?? null, createdAt: entity.created_at, updatedAt: entity.updated_at, }) @@ -1833,6 +1838,9 @@ export class PostgresRegistry { state_schemas: row.stateSchemas as | Record> | undefined, + custom_collection_schemas: row.customCollectionSchemas as + | Record> + | undefined, slash_commands: (row.slashCommands as ElectricAgentsEntityType[`slash_commands`]) ?? undefined, @@ -1906,6 +1914,9 @@ export class PostgresRegistry { state_schemas: row.stateSchemas as | Record> | undefined, + custom_collection_schemas: row.customCollectionSchemas as + | Record> + | undefined, created_at: row.createdAt, updated_at: row.updatedAt, } diff --git a/packages/agents-server/src/routing/entities-router.ts b/packages/agents-server/src/routing/entities-router.ts index 31dbc72a75..fa067cced1 100644 --- a/packages/agents-server/src/routing/entities-router.ts +++ b/packages/agents-server/src/routing/entities-router.ts @@ -326,8 +326,17 @@ const eventSourceSubscriptionBodySchema = Type.Object({ reason: Type.Optional(Type.String()), }) +const collectionWriteBodySchema = Type.Object( + { + key: Type.Optional(Type.String()), + value: Type.Record(Type.String(), Type.Unknown()), + }, + { additionalProperties: false } +) + type SpawnBody = Static type SendBody = Static +type CollectionWriteBody = Static type InboxMessageBody = Static type ForkBody = Static type SetTagBody = Static @@ -407,6 +416,13 @@ entitiesRouter.post( withEntityPermission(`write`), sendEntity ) +entitiesRouter.post( + `/:type/:instanceId/collections/:collection`, + withExistingEntity, + withSchema(collectionWriteBodySchema), + withEntityPermission(`write`), + writeCollectionRow +) entitiesRouter.post( `/:type/:instanceId/attachments`, withExistingEntity, @@ -1296,6 +1312,22 @@ async function sendEntity( return status(204) } +async function writeCollectionRow( + request: AgentsRouteRequest, + ctx: TenantContext +): Promise { + const parsed = routeBody(request) + await ctx.entityManager.ensurePrincipal(ctx.principal) + const { entityUrl } = requireExistingEntityRoute(request) + const collection = decodeURIComponent(request.params.collection) + const result = await ctx.entityManager.appendCollectionRow( + entityUrl, + collection, + { key: parsed.key, value: parsed.value } + ) + return json(result, { status: 201 }) +} + async function createAttachment( request: AgentsRouteRequest, ctx: TenantContext diff --git a/packages/agents-server/src/routing/entity-types-router.ts b/packages/agents-server/src/routing/entity-types-router.ts index 76e7a50afe..eac4c91595 100644 --- a/packages/agents-server/src/routing/entity-types-router.ts +++ b/packages/agents-server/src/routing/entity-types-router.ts @@ -87,6 +87,7 @@ const registerEntityTypeBodySchema = Type.Object( creation_schema: Type.Optional(jsonObjectSchema), inbox_schemas: Type.Optional(schemaMapSchema), state_schemas: Type.Optional(schemaMapSchema), + custom_collection_schemas: Type.Optional(schemaMapSchema), slash_commands: Type.Optional(Type.Array(slashCommandSchema)), serve_endpoint: Type.Optional(Type.String()), default_dispatch_policy: Type.Optional(dispatchPolicySchema), @@ -101,6 +102,7 @@ const amendEntityTypeSchemasBodySchema = Type.Object( { inbox_schemas: Type.Optional(schemaMapSchema), state_schemas: Type.Optional(schemaMapSchema), + custom_collection_schemas: Type.Optional(schemaMapSchema), }, { additionalProperties: false } ) @@ -349,6 +351,7 @@ async function amendSchemas( const updated = await ctx.entityManager.amendSchemas(request.params.name, { inbox_schemas: parsed.inbox_schemas, state_schemas: parsed.state_schemas, + custom_collection_schemas: parsed.custom_collection_schemas, }) return json(toPublicEntityType(updated)) } @@ -455,6 +458,7 @@ function normalizeEntityTypeRequest( creation_schema: parsed.creation_schema, inbox_schemas: parsed.inbox_schemas, state_schemas: parsed.state_schemas, + custom_collection_schemas: parsed.custom_collection_schemas, slash_commands: parsed.slash_commands, serve_endpoint: serveEndpoint, default_dispatch_policy: diff --git a/packages/agents-server/src/utils/server-utils.ts b/packages/agents-server/src/utils/server-utils.ts index 658a753904..629c012114 100644 --- a/packages/agents-server/src/utils/server-utils.ts +++ b/packages/agents-server/src/utils/server-utils.ts @@ -135,7 +135,7 @@ export function buildElectricProxyTarget(options: { } else if (table === `entity_types`) { target.searchParams.set( `columns`, - `"tenant_id","name","description","creation_schema","inbox_schemas","state_schemas","slash_commands","serve_endpoint","default_dispatch_policy","revision","created_at","updated_at"` + `"tenant_id","name","description","creation_schema","inbox_schemas","state_schemas","custom_collection_schemas","slash_commands","serve_endpoint","default_dispatch_policy","revision","created_at","updated_at"` ) applyShapeWhere( target, diff --git a/packages/agents-server/test/electric-agents-manager-write-validation.test.ts b/packages/agents-server/test/electric-agents-manager-write-validation.test.ts index 4cab89f72b..0cf84ed42c 100644 --- a/packages/agents-server/test/electric-agents-manager-write-validation.test.ts +++ b/packages/agents-server/test/electric-agents-manager-write-validation.test.ts @@ -38,10 +38,12 @@ function createAttachmentManager({ entityStatus = `running`, readJson = [], streamClient = {}, + customCollectionSchemas, }: { entityStatus?: string readJson?: Array streamClient?: Record + customCollectionSchemas?: Record> } = {}) { return { manager: new EntityManager({ @@ -50,6 +52,7 @@ function createAttachmentManager({ url: `/chat/session-1`, status: entityStatus, streams: { main: `/chat/session-1` }, + custom_collection_schemas: customCollectionSchemas, }), getEntityType: vi.fn(), replaceEntityManifestSource: vi.fn(), @@ -73,6 +76,14 @@ function createAttachmentManager({ } } +// Permissive note schema for the happy-path tests below — every property +// optional so we cover the appendCollectionRow plumbing without the +// schema-shape itself being the subject under test. +const NOTE_SCHEMA = { + type: `object`, + additionalProperties: true, +} as const + function attachmentManifest(value: Record) { return { type: `manifest`, @@ -203,6 +214,181 @@ describe(`ElectricAgentsManager attachments`, () => { }) }) +describe(`ElectricAgentsManager appendCollectionRow`, () => { + it(`appends the row to the entity main stream and returns the key`, async () => { + const append = vi.fn().mockResolvedValue(undefined) + const { manager } = createAttachmentManager({ + streamClient: { append }, + customCollectionSchemas: { note: NOTE_SCHEMA }, + }) + + const result = await manager.appendCollectionRow( + `/chat/session-1`, + `note`, + { key: `n-1`, value: { body: `hello` } } + ) + + expect(result).toEqual({ key: `n-1` }) + expect(append).toHaveBeenCalledTimes(1) + const [streamPath, encoded] = append.mock.calls[0]! + expect(streamPath).toBe(`/chat/session-1`) + const event = JSON.parse(new TextDecoder().decode(encoded)) + expect(event).toEqual({ + type: `note`, + key: `n-1`, + value: { body: `hello` }, + headers: { operation: `insert` }, + }) + }) + + it(`auto-generates a key when none is provided`, async () => { + const append = vi.fn().mockResolvedValue(undefined) + const { manager } = createAttachmentManager({ + streamClient: { append }, + customCollectionSchemas: { note: NOTE_SCHEMA }, + }) + + const result = await manager.appendCollectionRow( + `/chat/session-1`, + `note`, + { value: { body: `hi` } } + ) + + expect(result.key).toMatch(/^note-\d+-[a-z0-9]+$/) + }) + + it(`rejects writes targeting a built-in collection name`, async () => { + const append = vi.fn() + const { manager } = createAttachmentManager({ streamClient: { append } }) + + await expect( + manager.appendCollectionRow(`/chat/session-1`, `inbox`, { + value: { body: `hi` }, + }) + ).rejects.toMatchObject({ + status: 400, + message: `Collection "inbox" is reserved by a built-in and cannot be written through the custom collection endpoint`, + }) + expect(append).not.toHaveBeenCalled() + }) + + it(`rejects invalid collection names`, async () => { + const append = vi.fn() + const { manager } = createAttachmentManager({ streamClient: { append } }) + + await expect( + manager.appendCollectionRow(`/chat/session-1`, `bad name`, { + value: { body: `hi` }, + }) + ).rejects.toMatchObject({ + status: 400, + message: `Invalid collection name "bad name"`, + }) + expect(append).not.toHaveBeenCalled() + }) + + it(`rejects non-object values`, async () => { + const append = vi.fn() + const { manager } = createAttachmentManager({ streamClient: { append } }) + + await expect( + manager.appendCollectionRow(`/chat/session-1`, `note`, { + value: `oops` as unknown as object, + }) + ).rejects.toMatchObject({ + status: 400, + message: `Collection value must be a JSON object`, + }) + expect(append).not.toHaveBeenCalled() + }) + + it(`rejects writes when the entity is not accepting writes`, async () => { + const append = vi.fn() + const { manager } = createAttachmentManager({ + entityStatus: `stopped`, + streamClient: { append }, + }) + + await expect( + manager.appendCollectionRow(`/chat/session-1`, `note`, { + value: { body: `hi` }, + }) + ).rejects.toMatchObject({ + status: 409, + message: `Entity is not accepting writes`, + }) + expect(append).not.toHaveBeenCalled() + }) + + it(`validates the value against a declared custom collection schema`, async () => { + const commentSchema = { + type: `object`, + properties: { + body: { type: `string`, minLength: 1 }, + from_principal: { type: `string` }, + }, + required: [`body`, `from_principal`], + additionalProperties: true, + } + const append = vi.fn().mockResolvedValue(undefined) + const manager = new EntityManager({ + registry: { + getEntity: vi.fn().mockResolvedValue({ + url: `/coder/session-1`, + type: `coder`, + status: `idle`, + streams: { main: `/coder/session-1/main` }, + custom_collection_schemas: { comment: commentSchema }, + }), + getEntityType: vi.fn().mockResolvedValue({ + name: `coder`, + custom_collection_schemas: { comment: commentSchema }, + }), + close: vi.fn(), + } as any, + streamClient: { append } as any, + validator: new SchemaValidator(), + wakeRegistry: { + setTimeoutCallback: vi.fn(), + setDebounceCallback: vi.fn(), + } as any, + }) + + // Happy path: matches the schema. + await manager.appendCollectionRow(`/coder/session-1`, `comment`, { + value: { body: `looks right`, from_principal: `/principal/user%3Ame` }, + }) + expect(append).toHaveBeenCalledTimes(1) + + // Sad path: missing required field. + await expect( + manager.appendCollectionRow(`/coder/session-1`, `comment`, { + value: { from_principal: `/principal/user%3Ame` }, + }) + ).rejects.toMatchObject({ status: 422 }) + expect(append).toHaveBeenCalledTimes(1) + }) + + it(`rejects writes to collections the entity type did not declare`, async () => { + const append = vi.fn() + const { manager } = createAttachmentManager({ + streamClient: { append }, + }) + + // No custom_collection_schemas on entity or type — the collection + // isn't a recognized shape, so the write is rejected. + await expect( + manager.appendCollectionRow(`/chat/session-1`, `anything`, { + value: { wild: { stuff: [1, 2, 3] } }, + }) + ).rejects.toMatchObject({ + status: 422, + message: `Custom collection "anything" is not declared on entity type "undefined"`, + }) + expect(append).not.toHaveBeenCalled() + }) +}) + describe(`ElectricAgentsManager composer input validation`, () => { it(`accepts composer_input without an entity-declared inbox schema`, async () => { const manager = new EntityManager({ diff --git a/packages/agents-server/test/electric-agents-routes.test.ts b/packages/agents-server/test/electric-agents-routes.test.ts index 64df63ad52..1162807cc7 100644 --- a/packages/agents-server/test/electric-agents-routes.test.ts +++ b/packages/agents-server/test/electric-agents-routes.test.ts @@ -959,6 +959,106 @@ describe(`ElectricAgentsRoutes send endpoint`, () => { }) }) +describe(`ElectricAgentsRoutes collection write endpoint`, () => { + it(`delegates a custom collection write to the manager`, async () => { + const manager = { + registry: { + getEntity: vi.fn().mockResolvedValue({ url: `/chat/test` }), + getEntityType: vi.fn(), + }, + ensurePrincipal: vi.fn().mockResolvedValue(undefined), + appendCollectionRow: vi.fn().mockResolvedValue({ key: `auto-1` }), + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entities/chat/test/collections/note`, + { value: { body: `hi` } } + ) + + expect(response.status).toBe(201) + expect(manager.appendCollectionRow).toHaveBeenCalledWith( + `/chat/test`, + `note`, + { key: undefined, value: { body: `hi` } } + ) + expect(await responseJson(response)).toEqual({ key: `auto-1` }) + }) + + it(`returns 404 when the entity is missing`, async () => { + const manager = { + registry: { + getEntity: vi.fn().mockResolvedValue(null), + getEntityType: vi.fn().mockResolvedValue({ name: `chat` }), + }, + appendCollectionRow: vi.fn(), + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entities/chat/missing/collections/note`, + { value: { body: `hi` } } + ) + + expect(response.status).toBe(404) + expect(manager.appendCollectionRow).not.toHaveBeenCalled() + }) + + it(`rejects bodies that are missing the value field`, async () => { + const manager = { + registry: { + getEntity: vi.fn().mockResolvedValue({ url: `/chat/test` }), + getEntityType: vi.fn(), + }, + ensurePrincipal: vi.fn().mockResolvedValue(undefined), + appendCollectionRow: vi.fn(), + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entities/chat/test/collections/note`, + {} + ) + + expect(response.status).toBe(400) + expect(manager.appendCollectionRow).not.toHaveBeenCalled() + }) + + it(`propagates manager-level rejections of reserved collection names`, async () => { + const manager = { + registry: { + getEntity: vi.fn().mockResolvedValue({ url: `/chat/test` }), + getEntityType: vi.fn(), + }, + ensurePrincipal: vi.fn().mockResolvedValue(undefined), + appendCollectionRow: vi + .fn() + .mockRejectedValue( + new ElectricAgentsError( + `INVALID_REQUEST`, + `Collection "inbox" is reserved by a built-in and cannot be written through the custom collection endpoint`, + 400 + ) + ), + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entities/chat/test/collections/inbox`, + { value: { body: `hi` } } + ) + + expect(response.status).toBe(400) + expect(await responseJson(response)).toMatchObject({ + error: { code: `INVALID_REQUEST` }, + }) + }) +}) + describe(`ElectricAgentsRoutes spawn endpoint request validation`, () => { it(`rejects malformed JSON before spawning`, async () => { const manager = { diff --git a/packages/agents/src/agents/comment-schema.ts b/packages/agents/src/agents/comment-schema.ts new file mode 100644 index 0000000000..2c58f2d06b --- /dev/null +++ b/packages/agents/src/agents/comment-schema.ts @@ -0,0 +1,41 @@ +import { z } from 'zod' + +export const commentCollectionSchema = z.looseObject({ + body: z.string().min(1), + from_principal: z.string(), + timestamp: z.string(), + reply_to: z + .union([ + z.strictObject({ + kind: z.literal(`comment`), + key: z.string(), + }), + z.strictObject({ + kind: z.literal(`timeline`), + collection: z.enum([ + `inbox`, + `run`, + `text`, + `tool_call`, + `wake`, + `signal`, + `manifest`, + ]), + key: z.string(), + run_id: z.string().optional(), + }), + ]) + .optional(), + target_snapshot: z + .looseObject({ + label: z.string(), + text: z.string().optional(), + from: z.string().optional(), + timestamp: z.string().optional(), + collection: z.string().optional(), + }) + .optional(), + edited_at: z.string().optional(), + deleted_at: z.string().optional(), + deleted_by: z.string().optional(), +}) diff --git a/packages/agents/src/agents/horton.ts b/packages/agents/src/agents/horton.ts index fcaadd8471..9af041739e 100644 --- a/packages/agents/src/agents/horton.ts +++ b/packages/agents/src/agents/horton.ts @@ -19,6 +19,7 @@ import { createContextSkillLoader, completeWithLowCostModel, } from '@electric-ax/agents-runtime' +import { commentCollectionSchema } from './comment-schema' import type { EntityRegistry, HandlerContext, @@ -760,6 +761,9 @@ export function registerHorton( registry.define(`horton`, { description: `Friendly capable assistant — chat, code, research, dispatch`, creationSchema: hortonCreationSchema, + customCollectionSchemas: { + comment: commentCollectionSchema, + }, permissionGrants: [ { subject_kind: `principal_kind`, diff --git a/packages/agents/src/agents/worker.ts b/packages/agents/src/agents/worker.ts index da833feb01..05da2ce010 100644 --- a/packages/agents/src/agents/worker.ts +++ b/packages/agents/src/agents/worker.ts @@ -1,5 +1,6 @@ import { Type } from '@sinclair/typebox' import { db } from '@electric-ax/agents-runtime' +import { commentCollectionSchema } from './comment-schema' import { createBashTool, braveSearchTool, @@ -302,6 +303,9 @@ export function registerWorker( const { streamFn, modelCatalog } = options registry.define(`worker`, { description: `Internal — generic worker spawned by other agents. Configure via spawn args (systemPrompt + tools + optional sharedDb).`, + customCollectionSchemas: { + comment: commentCollectionSchema, + }, permissionGrants: [ { subject_kind: `principal_kind`,