diff --git a/.changeset/generic-externally-writable-collections.md b/.changeset/generic-externally-writable-collections.md new file mode 100644 index 0000000000..76fe6c8be0 --- /dev/null +++ b/.changeset/generic-externally-writable-collections.md @@ -0,0 +1,8 @@ +--- +'@electric-ax/agents-runtime': patch +'@electric-ax/agents-server': patch +'@electric-ax/agents-server-ui': patch +'@electric-ax/agents': patch +--- + +Add generic externally-writable custom collections for agent entity state: collections opt in via `externallyWritable`, writes go through an authenticated schema-validated endpoint that stamps the principal into a read-only `_principal` column, and `createEntityTimelineQuery` can project them into the timeline via `customSources`. Comments are reimplemented as one such collection, gated per agent type through a reserved `comments/v1` contract that the UI keys its comment affordances on. diff --git a/packages/agents-runtime/src/client.ts b/packages/agents-runtime/src/client.ts index c25001f662..dfdabe8ccd 100644 --- a/packages/agents-runtime/src/client.ts +++ b/packages/agents-runtime/src/client.ts @@ -10,6 +10,7 @@ export { getEntityState, normalizeEntityTimelineData, normalizeTimelineEntities, + TIMELINE_ORDER_FALLBACK, } from './entity-timeline' export { canonicalPgSyncOptions, @@ -82,6 +83,7 @@ export type { export type { EntityTimelineContentItem, EntityTimelineData, + EntityTimelineCustomSource, EntityTimelineInboxMode, EntityTimelineQueryOptions, EntityTimelineQueryRow, @@ -96,4 +98,9 @@ export type { IncludesInboxMessage, IncludesRun, } from './entity-timeline' +export { COMMENTS_CONTRACT, commentsCollection } from './comments-collection' +export type { + CommentSnapshotValue as CommentSnapshot, + CommentTargetValue as CommentTarget, +} from './comments-collection' export type { EntityTimelineEntry } from './use-chat' diff --git a/packages/agents-runtime/src/comments-collection.ts b/packages/agents-runtime/src/comments-collection.ts new file mode 100644 index 0000000000..e03fdf1b53 --- /dev/null +++ b/packages/agents-runtime/src/comments-collection.ts @@ -0,0 +1,91 @@ +import { z } from 'zod' +import type { CollectionDefinition } from './types' + +export type CommentTargetValue = + | { kind: `comment`; key: string } + | { + kind: `timeline` + collection: + | `inbox` + | `run` + | `text` + | `tool_call` + | `wake` + | `signal` + | `manifest` + key: string + run_id?: string + } + +export type CommentSnapshotValue = { + label: string + text?: string + from?: string + timestamp?: string + collection?: string +} + +export type CommentValue = { + key?: string + body: string + timestamp: string + reply_to?: CommentTargetValue + target_snapshot?: CommentSnapshotValue + edited_at?: string + deleted_at?: string + deleted_by?: string +} + +const commentTargetSchema = z.union([ + z.object({ kind: z.literal(`comment`), key: z.string() }), + z.object({ + kind: z.literal(`timeline`), + collection: z.enum([ + `inbox`, + `run`, + `text`, + `tool_call`, + `wake`, + `signal`, + `manifest`, + ]), + key: z.string(), + run_id: z.string().optional(), + }), +]) + +const commentSnapshotSchema = z.object({ + label: z.string(), + text: z.string().optional(), + from: z.string().optional(), + timestamp: z.string().optional(), + collection: z.string().optional(), +}) + +export const commentSchema = z.object({ + key: z.string().optional(), + body: z.string(), + timestamp: z.string(), + reply_to: commentTargetSchema.optional(), + target_snapshot: commentSnapshotSchema.optional(), + edited_at: z.string().optional(), + deleted_at: z.string().optional(), + deleted_by: z.string().optional(), +}) + +/** + * Contract identifier for the canonical comments collection. The server + * reserves the `comments` collection name for this contract, and the UI + * only surfaces comment affordances for entity types whose registration + * advertises it — so an agent's unrelated `comments` state can never be + * mistaken for platform comments. + */ +export const COMMENTS_CONTRACT = `comments/v1` + +export const commentsCollection: CollectionDefinition = { + schema: commentSchema, + type: `state:comments`, + primaryKey: `key`, + externallyWritable: true, + contract: COMMENTS_CONTRACT, +} diff --git a/packages/agents-runtime/src/create-handler.ts b/packages/agents-runtime/src/create-handler.ts index c4008862d4..3a18c56a44 100644 --- a/packages/agents-runtime/src/create-handler.ts +++ b/packages/agents-runtime/src/create-handler.ts @@ -17,6 +17,7 @@ import type { IncomingMessage, ServerResponse } from 'node:http' import type { WebhookSignatureVerifierConfig } from './webhook-signature' import type { AgentTool, + AnyEntityDefinition, EntityStreamDBWithActions, HeadersProvider, ProcessWakeConfig, @@ -207,6 +208,119 @@ export interface RuntimeDebugState { export type RuntimeHandlerConfig = RuntimeRouterConfig export type RuntimeHandlerResult = RuntimeHandler +const JSON_SCHEMA_KEYWORDS = [ + `type`, + `properties`, + `items`, + `enum`, + `oneOf`, + `anyOf`, + `allOf`, + `additionalProperties`, +] as const + +function stripSchemaKeyword( + jsonSchema: Record +): Record { + const { $schema: _schema, ...rest } = jsonSchema + return rest +} + +function toJsonSchema(schema: unknown): Record { + if (!schema || typeof schema !== `object` || Array.isArray(schema)) { + return {} + } + + const standardSchema = schema as { + [`~standard`]?: { + jsonSchema?: { + input?: () => unknown + } + } + toJSONSchema?: () => Record + } + + const standardJsonSchema = standardSchema[`~standard`]?.jsonSchema?.input?.() + if (standardJsonSchema) { + return stripSchemaKeyword(standardJsonSchema as Record) + } + + if (typeof standardSchema.toJSONSchema === `function`) { + return stripSchemaKeyword(standardSchema.toJSONSchema()) + } + + if (`~standard` in standardSchema) { + return {} + } + + const jsonSchemaLike = schema as Record + if (JSON_SCHEMA_KEYWORDS.some((keyword) => keyword in jsonSchemaLike)) { + return stripSchemaKeyword(jsonSchemaLike) + } + + return zodToJsonSchema(schema as any, { target: `jsonSchema7` }) +} + +function mapSchemas( + schemas: Record +): Record> { + return Object.fromEntries( + Object.entries(schemas).map(([k, v]) => [k, toJsonSchema(v)]) + ) +} + +export function buildEntityTypeRegistrationBody( + name: string, + definition: AnyEntityDefinition +): Record { + const stateEntries = definition.state ? Object.entries(definition.state) : [] + + const stateSchemas = Object.fromEntries( + stateEntries.map(([collectionName, def]) => [ + def.type ?? `state:${collectionName}`, + toJsonSchema(def.schema ?? passthrough()), + ]) + ) + + const externallyWritableCollections: Record< + string, + { type: string; contract?: string } + > = {} + for (const [collectionName, def] of stateEntries) { + if (!def.externallyWritable) continue + externallyWritableCollections[collectionName] = { + type: def.type ?? `state:${collectionName}`, + ...(def.contract && { contract: def.contract }), + } + } + + const body: Record = { + name, + description: definition.description ?? `${name} entity`, + ...(definition.creationSchema && { + creation_schema: toJsonSchema(definition.creationSchema), + }), + ...(definition.inboxSchemas && { + inbox_schemas: mapSchemas(definition.inboxSchemas), + }), + ...(definition.slashCommands && { + slash_commands: definition.slashCommands, + }), + state_schemas: { + ...DEFAULT_STATE_SCHEMAS, + ...stateSchemas, + ...(definition.stateSchemas ? mapSchemas(definition.stateSchemas) : {}), + }, + ...(Object.keys(externallyWritableCollections).length > 0 && { + externally_writable_collections: externallyWritableCollections, + }), + ...(definition.permissionGrants && { + permission_grants: definition.permissionGrants, + }), + } + return body +} + export function createRuntimeRouter( config: RuntimeRouterConfig ): RuntimeRouter { @@ -413,60 +527,6 @@ export function createRuntimeRouter( return handleWebhookRequest(request) } - const stripSchemaKeyword = ( - jsonSchema: Record - ): Record => { - const { $schema: _schema, ...rest } = jsonSchema - return rest - } - - const JSON_SCHEMA_KEYWORDS = [ - `type`, - `properties`, - `items`, - `enum`, - `oneOf`, - `anyOf`, - `allOf`, - `additionalProperties`, - ] as const - - const toJsonSchema = (schema: unknown): Record => { - if (!schema || typeof schema !== `object` || Array.isArray(schema)) { - return {} - } - - const standardSchema = schema as { - [`~standard`]?: { - jsonSchema?: { - input?: () => unknown - } - } - toJSONSchema?: () => Record - } - - const standardJsonSchema = - standardSchema[`~standard`]?.jsonSchema?.input?.() - if (standardJsonSchema) { - return stripSchemaKeyword(standardJsonSchema as Record) - } - - if (typeof standardSchema.toJSONSchema === `function`) { - return stripSchemaKeyword(standardSchema.toJSONSchema()) - } - - if (`~standard` in standardSchema) { - return {} - } - - const jsonSchemaLike = schema as Record - if (JSON_SCHEMA_KEYWORDS.some((keyword) => keyword in jsonSchemaLike)) { - return stripSchemaKeyword(jsonSchemaLike) - } - - return zodToJsonSchema(schema as any, { target: `jsonSchema7` }) - } - const registerTypes = async (): Promise => { const types = getRegisteredTypes() const registered: Array = [] @@ -474,49 +534,11 @@ export function createRuntimeRouter( const totalStart = performance.now() const effectiveConcurrency = Math.max(1, registrationConcurrency ?? 8) - const mapSchemas = ( - schemas: Record - ): Record> => - Object.fromEntries( - Object.entries(schemas).map(([k, v]) => [k, toJsonSchema(v)]) - ) - await forEachWithConcurrency(types, effectiveConcurrency, async (entry) => { const registrationStart = performance.now() const { name, definition } = entry - const stateSchemas = definition.state - ? Object.fromEntries( - Object.entries(definition.state).map(([collectionName, def]) => [ - def.type ?? `state:${collectionName}`, - toJsonSchema(def.schema ?? passthrough()), - ]) - ) - : {} - - const body: Record = { - name, - description: definition.description ?? `${name} entity`, - ...(definition.creationSchema && { - creation_schema: toJsonSchema(definition.creationSchema), - }), - ...(definition.inboxSchemas && { - inbox_schemas: mapSchemas(definition.inboxSchemas), - }), - ...(definition.slashCommands && { - slash_commands: definition.slashCommands, - }), - state_schemas: { - ...DEFAULT_STATE_SCHEMAS, - ...stateSchemas, - ...(definition.stateSchemas - ? mapSchemas(definition.stateSchemas) - : {}), - }, - ...(definition.permissionGrants && { - permission_grants: definition.permissionGrants, - }), - } + const body = buildEntityTypeRegistrationBody(name, definition) const defaultDispatchPolicy = defaultDispatchPolicyForType?.(name) diff --git a/packages/agents-runtime/src/entity-stream-db.ts b/packages/agents-runtime/src/entity-stream-db.ts index a1b7cc7d02..37a87b11df 100644 --- a/packages/agents-runtime/src/entity-stream-db.ts +++ b/packages/agents-runtime/src/entity-stream-db.ts @@ -10,6 +10,7 @@ import { getStreamDBCollectionId, } from '@durable-streams/state/db' import { builtInCollections, passthrough } from './entity-schema' +import type { StandardSchemaV1 } from '@standard-schema/spec' import { formatPointerOrderToken, type EventPointer } from './event-pointer' import type { ChangeEvent, @@ -105,6 +106,52 @@ type EntityStreamDBOptions = { const WRITE_TXID_TIMEOUT_MS = 20_000 +/** + * Virtual column the authenticated principal (from the change-event header) is + * materialized into for externally writable collections. Like `_timeline_order`, + * it is stripped before client write-back. + */ +export const PRINCIPAL_COLUMN = `_principal` + +// Wrap a Standard Schema so that named virtual columns (e.g. `_timeline_order`, +// `_principal`) survive the validation step. TanStack DB calls the schema's +// validate() on every insert/update and uses result.value as the stored row, +// so any key not explicitly passed through by the schema is dropped. We +// extract the virtual fields before validation and re-attach them after. +function wrapSchemaWithVirtualColumns( + inner: StandardSchemaV1, + virtualColumns: Array +): StandardSchemaV1 { + return { + '~standard': { + version: 1 as const, + vendor: `electric-agents`, + validate: ( + value: unknown + ): StandardSchemaV1.Result | Promise> => { + if (typeof value !== `object` || value === null) { + return inner[`~standard`].validate(value) + } + const record = value as Record + const saved: Record = {} + for (const col of virtualColumns) { + if (col in record) saved[col] = record[col] + } + const reattach = ( + result: StandardSchemaV1.Result + ): StandardSchemaV1.Result => { + if (`issues` in result && result.issues) return result + return { value: Object.assign({}, result.value, saved) as T } + } + const result = inner[`~standard`].validate(value) + return result instanceof Promise + ? result.then(reattach) + : reattach(result) + }, + }, + } +} + /** * Create a StreamDB connected to a Electric Agents entity stream. * @@ -127,10 +174,26 @@ export function createEntityStreamDB( // Convert entity-level CollectionDefinition (with optional JSON schema) to // stream-db CollectionDefinition (with Standard Schema validator + type + primaryKey) const streamCustomState: Record = {} + const externallyWritableCollections = new Set() if (customState) { for (const [name, def] of Object.entries(customState)) { + if (def.externallyWritable) { + externallyWritableCollections.add(name) + } + + // When virtual columns are projected onto the row, wrap the user schema + // to preserve those fields through TanStack DB's schema validation. + const baseSchema = def.schema ?? passthrough() + const virtualColumns = [ + `_timeline_order`, + ...(def.externallyWritable ? [PRINCIPAL_COLUMN] : []), + ] + const schema = def.schema + ? wrapSchemaWithVirtualColumns(baseSchema, virtualColumns) + : baseSchema + streamCustomState[name] = { - schema: def.schema ?? passthrough(), + schema, type: def.type ?? `state:${name}`, primaryKey: def.primaryKey ?? `key`, } @@ -188,6 +251,7 @@ export function createEntityStreamDB( const clone = { ...row } delete clone._seq delete clone._timeline_order + delete clone[PRINCIPAL_COLUMN] return clone } @@ -354,6 +418,13 @@ export function createEntityStreamDB( orders.set(item.key, order) } ;(item.value as Record)._timeline_order = order + if (externallyWritableCollections.has(collectionName)) { + const principal = (item.headers as Record).principal + if (principal !== undefined) { + ;(item.value as Record)[PRINCIPAL_COLUMN] = + principal + } + } }) // After processing the batch, advance the anchor for next time. // `batch.offset` is the `Stream-Next-Offset` for this batch — @@ -727,6 +798,13 @@ export function createEntityStreamDB( const order = orders?.get(event.key) ?? formatPointerOrderToken(pointer) orders?.set(event.key, order) ;(event.value as Record)._timeline_order = order + if (externallyWritableCollections.has(collectionName)) { + const principal = (event.headers as Record).principal + if (principal !== undefined) { + ;(event.value as Record)[PRINCIPAL_COLUMN] = + principal + } + } } const transaction = createWriteTransaction({ debugOrigin: `apply-event:${event.type}:${event.headers.operation}`, diff --git a/packages/agents-runtime/src/entity-timeline.ts b/packages/agents-runtime/src/entity-timeline.ts index 116818b246..e21c29e440 100644 --- a/packages/agents-runtime/src/entity-timeline.ts +++ b/packages/agents-runtime/src/entity-timeline.ts @@ -26,6 +26,8 @@ import { formatPointerOrderToken, type EventPointer } from './event-pointer' import type { ChildStatusEntry, MessageReceived, Signal } from './entity-schema' import type { ManifestEntry, Wake, WakeMessage } from './types' +export const TIMELINE_ORDER_FALLBACK = `~` + export type EntityTimelineState = | `pending` | `queued` @@ -198,8 +200,23 @@ export interface EntityTimelineData { export type EntityTimelineInboxMode = `processed` | `all` +/** + * A consumer-provided source unioned into the timeline query under its own + * row key. The projection must include `order` (timeline order token) and + * `key`; all other fields are passed through to the timeline row. + */ +export type EntityTimelineCustomSource = ( + q: InitialQueryBuilder +) => QueryBuilder + export interface EntityTimelineQueryOptions { inboxMode?: EntityTimelineInboxMode + /** + * Additional sources merged into the timeline, keyed by row name. Names + * must not collide with the built-in sources (`inbox`, `run`, `wake`, + * `signal`, `manifest`). + */ + customSources?: Record } export interface EntityTimelineTextChunk { @@ -511,7 +528,7 @@ function readTimelineOrder(row: object): string | undefined { } export function createPendingTimelineOrder(index: number): string { - return `~pending:${index.toString().padStart(12, `0`)}` + return `${TIMELINE_ORDER_FALLBACK}pending:${index.toString().padStart(12, `0`)}` } function toSeqOrderToken(seq: number): string { @@ -1513,47 +1530,46 @@ function buildEntityTimelineQuery( })), })) - return q - .unionAll({ - inbox: inboxSource, - run: runSource, - wake: wakeSource, - signal: signalSource, - error: errorSource, - manifest: db.collections.manifests, - }) - .orderBy(({ inbox, run, wake, signal, error, manifest }) => - coalesce( - inbox.order, - run.order, - wake.order, - signal.order, - error.order, - manifest._timeline_order, - `~` + const sources: Record = { + inbox: inboxSource, + run: runSource, + wake: wakeSource, + signal: signalSource, + error: errorSource, + manifest: db.collections.manifests, + } + for (const [name, buildSource] of Object.entries(opts.customSources ?? {})) { + if (name in sources) { + throw new Error( + `customSources name "${name}" collides with a built-in timeline source` ) + } + sources[name] = buildSource(q) + } + const sourceNames = Object.keys(sources) + // The manifests collection joins the union raw, so its order lives on + // `_timeline_order` rather than a projected `order` field. + const orderRef = (refs: any, name: string) => + name === `manifest` ? refs.manifest._timeline_order : refs[name].order + const coalesceAll = (exprs: Array) => + coalesce(...(exprs as [any, ...Array])) + + return q + .unionAll(sources) + .orderBy((refs: any) => + coalesceAll([ + ...sourceNames.map((name) => orderRef(refs, name)), + TIMELINE_ORDER_FALLBACK, + ]) ) - .orderBy(({ inbox, run, wake, signal, error, manifest }) => - coalesce( - caseWhen(inbox.key, `inbox`), - caseWhen(run.key, `run`), - caseWhen(wake.key, `wake`), - caseWhen(signal.key, `signal`), - caseWhen(error.key, `error`), - caseWhen(manifest.key, `manifest`), - `` - ) + .orderBy((refs: any) => + coalesceAll([ + ...sourceNames.map((name) => caseWhen(refs[name].key, name)), + ``, + ]) ) - .orderBy(({ inbox, run, wake, signal, error, manifest }) => - coalesce( - inbox.key, - run.key, - wake.key, - signal.key, - error.key, - manifest.key, - `` - ) + .orderBy((refs: any) => + coalesceAll([...sourceNames.map((name) => refs[name].key), ``]) ) } diff --git a/packages/agents-runtime/src/index.ts b/packages/agents-runtime/src/index.ts index 7942ae81e0..832ecfe849 100644 --- a/packages/agents-runtime/src/index.ts +++ b/packages/agents-runtime/src/index.ts @@ -398,3 +398,14 @@ export { STREAM_TOKEN_PREFIX, } from './event-pointer' export type { EventPointer } from './event-pointer' + +export { + COMMENTS_CONTRACT, + commentSchema, + commentsCollection, +} from './comments-collection' +export type { + CommentTargetValue, + CommentSnapshotValue, + CommentValue, +} from './comments-collection' diff --git a/packages/agents-runtime/src/types.ts b/packages/agents-runtime/src/types.ts index ec366ab670..32a4d841c5 100644 --- a/packages/agents-runtime/src/types.ts +++ b/packages/agents-runtime/src/types.ts @@ -639,6 +639,18 @@ export interface CollectionDefinition< type?: string /** Primary key field name. Defaults to `"key"`. */ primaryKey?: string + /** + * Opt-in for externally writable via the HTTP router: `POST /:type/:instanceId/collections/:name`. + * Absent/false ⇒ agent-only; the endpoint rejects writes. `true` ⇒ externally writable, + * with the authenticated principal materialized into the `_principal` virtual column. + */ + externallyWritable?: boolean + /** + * Well-known contract this collection implements (e.g. `comments/v1`). + * Forwarded in the type registration so clients can recognize the + * collection by capability instead of by name. + */ + contract?: string } export interface EntityTypeEntry< diff --git a/packages/agents-runtime/test/comments-collection.test.ts b/packages/agents-runtime/test/comments-collection.test.ts new file mode 100644 index 0000000000..866d3081a2 --- /dev/null +++ b/packages/agents-runtime/test/comments-collection.test.ts @@ -0,0 +1,28 @@ +import { describe, it, expect } from 'vitest' +import { commentSchema, commentsCollection } from '../src/comments-collection' + +describe(`commentsCollection`, () => { + it(`parses a valid comment with a timeline reply_to target`, () => { + const result = commentSchema.parse({ + key: `c-1`, + body: `LGTM`, + timestamp: `2024-01-01T00:00:00Z`, + reply_to: { + kind: `timeline`, + collection: `run`, + key: `run-42`, + run_id: `run-42`, + }, + }) + expect(result.body).toBe(`LGTM`) + expect(result.reply_to).toMatchObject({ + kind: `timeline`, + collection: `run`, + }) + }) + + it(`is externally writable and declares the comments contract`, () => { + expect(commentsCollection.externallyWritable).toBe(true) + expect(commentsCollection.contract).toBe(`comments/v1`) + }) +}) diff --git a/packages/agents-runtime/test/create-handler-externally-writable.test.ts b/packages/agents-runtime/test/create-handler-externally-writable.test.ts new file mode 100644 index 0000000000..9e8fba224e --- /dev/null +++ b/packages/agents-runtime/test/create-handler-externally-writable.test.ts @@ -0,0 +1,48 @@ +import { describe, it, expect } from 'vitest' +import { z } from 'zod' +import { + COMMENTS_CONTRACT, + commentsCollection, +} from '../src/comments-collection' +import { buildEntityTypeRegistrationBody } from '../src/create-handler' + +describe(`buildEntityTypeRegistrationBody`, () => { + it(`emits externally_writable_collections for externally writable state collections only`, () => { + const body = buildEntityTypeRegistrationBody(`chat`, { + description: `chat`, + handler: async () => {}, + state: { + feedback: { + schema: z.object({ key: z.string().optional(), body: z.string() }), + externallyWritable: true, + }, + scratch: { + schema: z.object({ key: z.string().optional(), note: z.string() }), + }, + }, + } as any) + expect(body.externally_writable_collections).toEqual({ + feedback: { type: `state:feedback` }, + }) + }) + + it(`forwards the collection contract when declared`, () => { + const body = buildEntityTypeRegistrationBody(`chat`, { + description: `chat`, + handler: async () => {}, + state: { comments: commentsCollection }, + } as any) + expect(body.externally_writable_collections).toEqual({ + comments: { type: `state:comments`, contract: COMMENTS_CONTRACT }, + }) + }) + + it(`omits externally_writable_collections when no collection opts in`, () => { + const body = buildEntityTypeRegistrationBody(`chat`, { + description: `chat`, + handler: async () => {}, + state: { scratch: { schema: z.object({ note: z.string() }) } }, + } as any) + expect(body.externally_writable_collections).toBeUndefined() + }) +}) diff --git a/packages/agents-runtime/test/entity-stream-db-principal.test.ts b/packages/agents-runtime/test/entity-stream-db-principal.test.ts new file mode 100644 index 0000000000..087ae02e75 --- /dev/null +++ b/packages/agents-runtime/test/entity-stream-db-principal.test.ts @@ -0,0 +1,107 @@ +import { describe, it, expect } from 'vitest' +import { z } from 'zod' +import { createEntityStreamDB } from '../src/entity-stream-db' +import type { ChangeEvent } from '@durable-streams/state' + +function principalHeader() { + return { url: `/principal/user%3Aalice`, kind: `user`, id: `alice` } +} + +describe(`entity-stream-db principal virtual column`, () => { + it(`projects headers.principal onto the configured column for externally writable collections`, () => { + const db = createEntityStreamDB(`/chat/sess-1`, { + comments: { + schema: z.object({ key: z.string().optional(), body: z.string() }), + externallyWritable: true, + }, + }) + db.utils.applyEvent({ + type: `state:comments`, + key: `c1`, + headers: { operation: `insert`, principal: principalHeader() }, + value: { body: `hi` }, + } as any) + const row = db.collections.comments.get(`c1`) as Record + expect(row.body).toBe(`hi`) + expect(row._principal).toEqual(principalHeader()) + }) + + it(`projects headers.principal onto _principal when externallyWritable: true`, () => { + const db = createEntityStreamDB(`/chat/sess-3`, { + notes: { + schema: z.object({ key: z.string().optional(), body: z.string() }), + externallyWritable: true, + }, + }) + db.utils.applyEvent({ + type: `state:notes`, + key: `n2`, + headers: { operation: `insert`, principal: principalHeader() }, + value: { body: `hello` }, + } as any) + const row = db.collections.notes.get(`n2`) as Record + expect(row._principal).toEqual(principalHeader()) + }) + + it(`does not add a principal column when the collection is not writable`, () => { + const db = createEntityStreamDB(`/chat/sess-2`, { + notes: { + schema: z.object({ key: z.string().optional(), body: z.string() }), + }, + }) + db.utils.applyEvent({ + type: `state:notes`, + key: `n1`, + headers: { operation: `insert`, principal: principalHeader() }, + value: { body: `hi` }, + } as any) + const row = db.collections.notes.get(`n1`) as Record + expect(row._principal).toBeUndefined() + }) + + it(`strips the principal column from outgoing ChangeEvent value on insert`, async () => { + const captured: Array = [] + let awaitTxIdResolve: (() => void) | undefined + const db = createEntityStreamDB( + `/chat/sess-4`, + { + comments: { + schema: z.object({ key: z.string().optional(), body: z.string() }), + externallyWritable: true, + }, + }, + undefined, + { + writeEvent: (ev) => captured.push(ev), + flushWrites: async () => {}, + } + ) + + // Stub awaitTxId so the action's mutationFn resolves immediately + ;(db.utils as any).awaitTxId = (_txid: string) => + new Promise((r) => { + awaitTxIdResolve = r + }) + + // Trigger an insert that carries a _principal field (simulating a row + // materialized with the principal virtual column writing back to the server) + const actionPromise = (db.actions as any).comments_insert({ + row: { key: `c1`, body: `hello`, _principal: principalHeader() }, + }) + + // writeEvent is called synchronously inside persistMutationsNow before + // flushWrites resolves, so captured should be populated already + await Promise.resolve() + + expect(captured).toHaveLength(1) + const ev = captured[0]! as ChangeEvent & { value: Record } + expect(ev.value._principal).toBeUndefined() + expect(ev.value._seq).toBeUndefined() + expect(ev.value._timeline_order).toBeUndefined() + expect(ev.value.body).toBe(`hello`) + + // Resolve the awaitTxId so the action promise doesn't hang + awaitTxIdResolve?.() + await actionPromise + }) +}) diff --git a/packages/agents-runtime/test/entity-timeline.test.ts b/packages/agents-runtime/test/entity-timeline.test.ts index 6528346bbc..0b15577c6a 100644 --- a/packages/agents-runtime/test/entity-timeline.test.ts +++ b/packages/agents-runtime/test/entity-timeline.test.ts @@ -1864,6 +1864,67 @@ describe(`entity includes query`, () => { ) }) + it(`unions customSources into the timeline by order`, async () => { + const { collections, sync } = createEntityCollections() + let extraOffset = 1000 + const annotations = createSyncCollection(`test-annotations`, () => + offset(extraOffset++) + ) + const liveQuery = createLiveQueryCollection({ + query: createEntityTimelineQuery({ collections } as any, { + customSources: { + annotation: (q) => + q + .from({ annotation: annotations.collection }) + .select(({ annotation }: any) => ({ + order: annotation._timeline_order, + key: annotation.key, + note: annotation.note, + })), + }, + }), + startSync: true, + }) + await liveQuery.preload() + + sync.inbox.insert({ + key: `msg-1`, + _timeline_order: order(1), + from: `user`, + payload: `start`, + timestamp: `2026-04-15T18:00:00.000Z`, + status: `processed`, + }) + annotations.insert({ + key: `note-1`, + _timeline_order: order(2), + note: `between`, + }) + sync.wakes.insert({ + key: `wake-1`, + _timeline_order: order(3), + timestamp: `2026-04-15T18:00:10.000Z`, + source: `/chat/test`, + timeout: false, + changes: [], + }) + await new Promise((r) => setTimeout(r, 50)) + + const rows = Array.from((liveQuery as any).entries()).map( + ([, v]: any) => v + ) + expect( + rows.map((row: any) => + row.inbox + ? `inbox:${row.inbox.key}` + : row.annotation + ? `annotation:${row.annotation.key}` + : `wake:${row.wake?.key}` + ) + ).toEqual([`inbox:msg-1`, `annotation:note-1`, `wake:wake-1`]) + expect(rows[1]?.annotation?.note).toBe(`between`) + }) + it(`projects related entities from one manifest row per related entity`, () => { const timeline = buildEntityTimelineData({ collections: { diff --git a/packages/agents-server-ui/src/components/AgentResponse.tsx b/packages/agents-server-ui/src/components/AgentResponse.tsx index a98dc51b21..1d05ccc0ca 100644 --- a/packages/agents-server-ui/src/components/AgentResponse.tsx +++ b/packages/agents-server-ui/src/components/AgentResponse.tsx @@ -1,4 +1,4 @@ -import { Check, Copy, GitFork } from 'lucide-react' +import { Check, Copy, GitFork, Reply } from 'lucide-react' import { memo, useEffect, @@ -386,6 +386,8 @@ export const AgentResponseLive = memo(function AgentResponseLive({ timestamp, renderWidth = 0, forkFromHere, + onReply, + onReplyToToolCall, onSearchTextChange, }: { rowKey: string @@ -394,6 +396,8 @@ export const AgentResponseLive = memo(function AgentResponseLive({ timestamp?: number | null renderWidth?: number forkFromHere?: ForkFromHereAction + onReply?: () => void + onReplyToToolCall?: (item: EntityTimelineToolCallItem) => void onSearchTextChange?: (rowKey: string, text: string) => void }): React.ReactElement { const { data: items = [] } = useLiveQuery( @@ -519,6 +523,11 @@ export const AgentResponseLive = memo(function AgentResponseLive({ onReplyToToolCall(item.toolCall!) + : undefined + } /> ) })} @@ -583,6 +592,7 @@ export const AgentResponseLive = memo(function AgentResponseLive({ copied={copied} onCopy={() => void copyResponseText()} forkFromHere={done ? forkFromHere : undefined} + onReply={onReply} /> @@ -594,20 +604,37 @@ function ResponseMetaActions({ copied, onCopy, forkFromHere, + onReply, }: { showCopy: boolean copied: boolean onCopy: () => void forkFromHere?: ForkFromHereAction + onReply?: () => void }): React.ReactElement | null { const showFork = forkFromHere !== undefined - if (!showCopy && !showFork) return null + if (!showCopy && !showFork && !onReply) return null const forkDisabled = forkFromHere?.disabled === true || !forkFromHere?.onFork const forkLabel = forkDisabled ? `Fork permission required` : `Fork from here` return ( + {onReply && ( + + + + + + )} {showFork && ( @@ -654,12 +681,14 @@ export const AgentResponse = memo(function AgentResponse({ timestamp, renderWidth = 0, forkFromHere, + onReply, }: { section: AgentResponseSection isStreaming: boolean timestamp?: number | null renderWidth?: number forkFromHere?: ForkFromHereAction + onReply?: () => void }): React.ReactElement { const canCache = !isStreaming && section.done === true const [copied, setCopied] = useState(false) @@ -803,6 +832,7 @@ export const AgentResponse = memo(function AgentResponse({ copied={copied} onCopy={() => void copyResponseText()} forkFromHere={section.done ? forkFromHere : undefined} + onReply={onReply} /> diff --git a/packages/agents-server-ui/src/components/CommentBubble.module.css b/packages/agents-server-ui/src/components/CommentBubble.module.css new file mode 100644 index 0000000000..9beebc9298 --- /dev/null +++ b/packages/agents-server-ui/src/components/CommentBubble.module.css @@ -0,0 +1,176 @@ +.root { + display: flex; + width: 100%; + box-sizing: border-box; +} + +.root[data-own='true'] { + justify-content: flex-end; +} + +.root[data-own='false'] { + justify-content: flex-start; +} + +.column { + display: grid; + gap: 4px; + max-width: min(72%, 640px); +} + +.root[data-own='true'] .column { + justify-items: end; +} + +.root[data-own='false'] .column { + justify-items: start; +} + +.message { + display: grid; + gap: 4px; + width: fit-content; + max-width: 100%; +} + +.root[data-own='true'] .message { + justify-self: end; +} + +.root[data-own='false'] .message { + justify-self: start; +} + +.bubble { + min-width: 0; + padding: 9px 12px; + border: 1px solid #000; + border-radius: var(--ds-radius-4); + background: #000; + color: #fff; + box-shadow: var(--ds-shadow-1); +} + +.root[data-single-line='true'] .bubble { + border-radius: var(--ds-radius-5); + padding-block: 7px; +} + +:global(html[data-theme='dark']) .bubble { + border-color: #fff; + background: #fff; + color: #000; +} + +.body, +.deletedBody { + white-space: pre-wrap; + overflow-wrap: anywhere; + font-size: var(--ds-chat-text); + line-height: var(--ds-chat-text-lh); +} + +.deletedBody { + color: inherit; + opacity: 0.64; + font-style: italic; +} + +.preview { + display: flex; + align-items: flex-start; + gap: 2px; + max-width: 100%; + margin: 0; + padding: 0; + border: 0; + background: transparent; + color: var(--ds-text-3); + font: inherit; + text-align: left; +} + +.previewButton { + border-radius: var(--ds-radius-2); + cursor: pointer; +} + +.previewButton:hover .previewContent { + border-left-color: var(--ds-text-3); +} + +.previewButton:focus-visible { + outline: 2px solid var(--ds-focus-ring); + outline-offset: 2px; +} + +.previewIcon { + flex: 0 0 auto; + margin-top: 1px; + color: var(--ds-text-4); +} + +.previewContent { + display: grid; + gap: 2px; + min-width: 0; + padding-left: 6px; + border-left: 2px solid var(--ds-gray-a8); +} + +.previewLabel, +.previewText { + min-width: 0; + overflow: hidden; + text-overflow: ellipsis; +} + +.previewLabel { + white-space: nowrap; + font-size: var(--ds-text-xs); + font-weight: 600; + line-height: var(--ds-text-xs-lh); +} + +.previewText { + display: -webkit-box; + -webkit-box-orient: vertical; + -webkit-line-clamp: 2; + font-size: var(--ds-text-xs); + line-height: var(--ds-text-xs-lh); +} + +.meta { + display: inline-flex; + align-items: center; + gap: 6px; + width: 100%; + box-sizing: border-box; + padding-inline: 8px; +} + +.meta > :not(.metaActions) { + opacity: 0.5; +} + +.metaActions { + margin-left: auto; + display: inline-flex; + align-items: center; + gap: 2px; +} + +.metaActionButton { + color: var(--ds-text-4); + opacity: 0.7; +} + +.metaActionButton:hover { + opacity: 1; +} + +@media (max-width: 700px) { + .column { + max-width: min(88%, 640px); + } +} diff --git a/packages/agents-server-ui/src/components/CommentBubble.tsx b/packages/agents-server-ui/src/components/CommentBubble.tsx new file mode 100644 index 0000000000..6ffecede52 --- /dev/null +++ b/packages/agents-server-ui/src/components/CommentBubble.tsx @@ -0,0 +1,145 @@ +import { memo } from 'react' +import { Reply } from 'lucide-react' +import type { + CommentSnapshot, + CommentTarget, +} from '@electric-ax/agents-runtime/client' +import type { EntityTimelineCommentRow } from '../lib/comments' +import { Icon, IconButton, Text, Tooltip } from '../ui' +import { formatSender, principalKeyFromInput } from '../lib/principals' +import { TimeText } from './TimeText' +import type { ElectricUser } from '../lib/ElectricAgentsProvider' +import styles from './CommentBubble.module.css' + +export const CommentBubble = memo(function CommentBubble({ + comment, + currentPrincipal, + usersById, + showMeta = true, + onReply, + onTargetClick, +}: { + comment: EntityTimelineCommentRow + currentPrincipal?: string + usersById?: Map + showMeta?: boolean + onReply?: (comment: EntityTimelineCommentRow) => void + onTargetClick?: (target: CommentTarget) => void +}): React.ReactElement { + const isOwn = + principalKeyFromInput(comment.from) === + principalKeyFromInput(currentPrincipal) + const sender = formatSender(comment.from, { + currentPrincipal, + usersById, + }) + const timestamp = Date.parse(comment.timestamp) + const deleted = Boolean(comment.deleted_at) + const singleLine = !deleted && !/[\r\n]/.test(comment.body) + + return ( +
+
+ {comment.target_snapshot && ( + onTargetClick(comment.reply_to!) + : undefined + } + /> + )} +
+
+
+ {deleted ? `Comment deleted` : comment.body} +
+
+ {showMeta && ( +
+ + {sender.label} + + {Number.isFinite(timestamp) && ( + <> + + - + + + + )} + {comment.edited_at && !deleted && ( + <> + + - + + + edited + + + )} + {onReply && !deleted && ( + + + onReply(comment)} + > + + + + + )} +
+ )} +
+
+
+ ) +}) + +function ReplyPreview({ + snapshot, + onClick, +}: { + snapshot: CommentSnapshot + onClick?: () => void +}): React.ReactElement { + const content = ( + <> + +
+
{snapshot.label}
+ {snapshot.text && ( +
{snapshot.text}
+ )} +
+ + ) + + if (onClick) { + return ( + + ) + } + + return
{content}
+} diff --git a/packages/agents-server-ui/src/components/EntityTimeline.module.css b/packages/agents-server-ui/src/components/EntityTimeline.module.css index ca9e59c46d..3e38026550 100644 --- a/packages/agents-server-ui/src/components/EntityTimeline.module.css +++ b/packages/agents-server-ui/src/components/EntityTimeline.module.css @@ -191,6 +191,20 @@ box-sizing: border-box; } +.virtualRow[data-highlighted='true'] { + animation: targetPulse 1600ms ease-out; +} + +@keyframes targetPulse { + 0%, + 65% { + background: color-mix(in oklab, var(--ds-accent-a4) 70%, transparent); + } + 100% { + background: transparent; + } +} + /* Jump-to-bottom affordance — centered over the chat column so it relates to the conversation rather than the viewport edge. It stays mounted and toggles visibility with opacity/translate for a soft diff --git a/packages/agents-server-ui/src/components/EntityTimeline.tsx b/packages/agents-server-ui/src/components/EntityTimeline.tsx index d625c2c801..c55c1382be 100644 --- a/packages/agents-server-ui/src/components/EntityTimeline.tsx +++ b/packages/agents-server-ui/src/components/EntityTimeline.tsx @@ -23,6 +23,7 @@ import { FileJson, GitBranch, Radio, + Reply, } from 'lucide-react' import { loadTimelineRowHeights, @@ -46,6 +47,7 @@ import { Icon, IconButton, ScrollArea, Stack, Text, Tooltip } from '../ui' import { UserMessage } from './UserMessage' import type { ForkFromHereAction, UserMessageAttachment } from './UserMessage' import { AgentResponseLive } from './AgentResponse' +import { CommentBubble } from './CommentBubble' import { InlineEventCard } from './InlineEventCard' import { InlineStatusBadge } from './InlineStatusBadge' import { @@ -57,20 +59,27 @@ import { formatChatTimestamp, } from '../lib/formatTime' import { readTextPayload } from '../lib/sendMessage' +import { principalKeyFromInput } from '../lib/principals' import styles from './EntityTimeline.module.css' import type { ElectricUser } from '../lib/ElectricAgentsProvider' import type { + SelectedCommentTarget, + TimelineRow, + TimelineRowAdjacency, +} from '../lib/comments' +import type { + CommentTarget, EntityTimelineSection, - EntityTimelineQueryRow, EntityTimelineRunItem, EntityTimelineRunRow, + EntityTimelineToolCallItem, IncludesEntity, Manifest, } from '@electric-ax/agents-runtime/client' import type { ErrorInfo, ReactNode } from 'react' import type { PaneFindAdapter, PaneFindMatch } from '../hooks/usePaneFind' -type RenderTimelineRow = EntityTimelineQueryRow +type RenderTimelineRow = TimelineRow type WakeSection = Extract function renderRowKey(row: RenderTimelineRow): string { @@ -212,7 +221,8 @@ class TimelineRowErrorBoundary extends Component< */ function estimateRowHeight( row: RenderTimelineRow | undefined, - contentWidth: number + contentWidth: number, + nextRow?: RenderTimelineRow ): number { if (!row) return 120 @@ -227,12 +237,16 @@ function estimateRowHeight( 1, Math.ceil(readInboxText(row.inbox.payload).length / charsPerLine) ) - return Math.max(64, 48 + lines * lineHeight) + timelineRowGap(row) + return Math.max(64, 48 + lines * lineHeight) + timelineRowGap(row, nextRow) + } + if (row.comment) { + const lines = Math.max(1, Math.ceil(row.comment.body.length / charsPerLine)) + return Math.max(58, 42 + lines * lineHeight) + timelineRowGap(row, nextRow) } if (row.wake || row.signal || row.manifest) { - return 76 + timelineRowGap(row) + return 76 + timelineRowGap(row, nextRow) } - return 120 + timelineRowGap(row) + return 120 + timelineRowGap(row, nextRow) } const BOTTOM_PIN_THRESHOLD = 8 @@ -242,10 +256,37 @@ const MANIFEST_ROW_GAP = 10 const ROW_SETTLE_MS = 500 type EntityStatus = NonNullable -function timelineRowGap(row: RenderTimelineRow): number { +function timelineRowGap( + row: RenderTimelineRow, + nextRow?: RenderTimelineRow +): number { + if (shouldCollapseCommentMeta(row, nextRow)) return 6 return row.manifest || row.wake || row.signal ? MANIFEST_ROW_GAP : ROW_GAP } +function isPlainCommentRow(row: RenderTimelineRow | undefined): boolean { + const comment = row?.comment + if (!comment) return false + return !comment.deleted_at && !comment.reply_to && !comment.target_snapshot +} + +function shouldCollapseCommentMeta( + row: RenderTimelineRow | undefined, + nextRow: RenderTimelineRow | undefined +): boolean { + if (!isPlainCommentRow(row) || !isPlainCommentRow(nextRow)) return false + const principal = principalKeyFromInput(row?.comment?.from) + if (!principal) return false + return principal === principalKeyFromInput(nextRow?.comment?.from) +} + +function shouldShowCommentMeta( + row: RenderTimelineRow, + nextRow: RenderTimelineRow | undefined +): boolean { + return !shouldCollapseCommentMeta(row, nextRow) +} + type TimelinePaneFindMatch = PaneFindMatch & { rowKey: string rowIndex: number @@ -256,6 +297,7 @@ function timelineRowSearchText( row: RenderTimelineRow, runSearchTextByKey: Map ): string { + if (row.comment) return row.comment.body if (row.inbox) return readInboxText(row.inbox.payload) if (row.wake) { return wakeSectionText({ @@ -271,6 +313,7 @@ function timelineRowSearchText( } function timelineRowLabel(row: RenderTimelineRow): string { + if (row.comment) return `Comment` if (row.inbox?.from_agent) return `Agent message` if (row.inbox) return `User message` if (row.wake) return `Wake` @@ -280,6 +323,160 @@ function timelineRowLabel(row: RenderTimelineRow): string { return `Agent response` } +function truncateCommentPreview(text: string, maxLength = 280): string { + const compact = text.replace(/\s+/g, ` `).trim() + return compact.length <= maxLength + ? compact + : `${compact.slice(0, maxLength - 3)}...` +} + +function createReplyTargetForRow( + row: RenderTimelineRow, + runSearchTextByKey: Map +): SelectedCommentTarget | null { + if (row.comment) { + return { + target: { kind: `comment`, key: row.comment.key }, + snapshot: { + label: `Comment`, + text: truncateCommentPreview(row.comment.body), + from: row.comment.from, + timestamp: row.comment.timestamp, + collection: `comment`, + }, + } + } + + if (row.inbox) { + return { + target: { kind: `timeline`, collection: `inbox`, key: row.inbox.key }, + snapshot: { + label: row.inbox.from_agent ? `Agent message` : `User message`, + text: truncateCommentPreview(readInboxText(row.inbox.payload)), + from: row.inbox.from, + timestamp: row.inbox.timestamp, + collection: `inbox`, + }, + } + } + + if (row.run) { + return { + target: { kind: `timeline`, collection: `run`, key: row.run.key }, + snapshot: { + label: `Assistant response`, + text: truncateCommentPreview( + runSearchTextByKey.get(row.$key) ?? runSearchTextFromSnapshot(row.run) + ), + collection: `run`, + }, + } + } + + if (row.wake) { + return { + target: { kind: `timeline`, collection: `wake`, key: row.wake.key }, + snapshot: { + label: `Wake`, + text: truncateCommentPreview(stringifyPayload(row.wake.payload)), + timestamp: row.wake.payload.timestamp, + collection: `wake`, + }, + } + } + + if (row.signal) { + return { + target: { kind: `timeline`, collection: `signal`, key: row.signal.key }, + snapshot: { + label: `Signal`, + text: truncateCommentPreview(signalSearchText(row.signal)), + timestamp: row.signal.timestamp, + collection: `signal`, + }, + } + } + + if (row.manifest) { + return { + target: { + kind: `timeline`, + collection: `manifest`, + key: row.manifest.key, + }, + snapshot: { + label: manifestKindLabel(row.manifest), + text: truncateCommentPreview(manifestSearchText(row.manifest)), + collection: `manifest`, + }, + } + } + + return null +} + +function createReplyTargetForToolCall( + row: RenderTimelineRow, + toolCall: EntityTimelineToolCallItem +): SelectedCommentTarget { + const runId = row.run?.key ?? toolCall.run_id + return { + target: { + kind: `timeline`, + collection: `tool_call`, + key: toolCall.key, + ...(runId ? { run_id: runId } : {}), + }, + snapshot: { + label: `Tool call`, + text: truncateCommentPreview( + [ + toolCall.tool_name, + stringifySearchPayload(toolCall.args), + stringifySearchPayload(toolCall.result), + stringifySearchPayload(toolCall.error), + ] + .filter((text) => text.length > 0) + .join(` `) + ), + collection: `tool_call`, + }, + } +} + +function timelineRowMatchesCommentTarget( + row: RenderTimelineRow, + target: CommentTarget +): boolean { + if (target.kind === `comment`) { + return row.comment?.key === target.key + } + + switch (target.collection) { + case `inbox`: + return row.inbox?.key === target.key + case `run`: + return row.run?.key === target.key + case `wake`: + return row.wake?.key === target.key + case `signal`: + return row.signal?.key === target.key + case `manifest`: + return row.manifest?.key === target.key + case `text`: + case `tool_call`: { + const run = row.run + if (!run) return false + if (target.run_id && run.key === target.run_id) return true + return run.items.toArray.some((item) => + target.collection === `text` + ? item.text?.key === target.key + : item.toolCall?.key === target.key + ) + } + } +} + function firstSelfSendWakeChange( section: WakeSection, entityUrl?: string | null @@ -338,9 +535,11 @@ function wakeSectionText( function WakeTimelineRow({ section, entityUrl, + onReply, }: { section: WakeSection entityUrl?: string | null + onReply?: () => void }): React.ReactElement { const reason = wakeReason(section, entityUrl) const details = wakeDetails(section, entityUrl) @@ -352,7 +551,13 @@ function WakeTimelineRow({ icon={Radio} title="woke" summary={`${reason} · ${formatChatTimestamp(section.timestamp)}`} + actions={ + onReply ? ( + + ) : undefined + } defaultExpanded={false} + collapsible headerSurface >
@@ -377,9 +582,11 @@ function WakeTimelineRow({ function AgentInboxMessageRow({ inbox, entityUrl, + onReply, }: { inbox: NonNullable entityUrl?: string | null + onReply?: () => void }): React.ReactElement { const parsed = Date.parse(inbox.timestamp) const timestamp = Number.isFinite(parsed) ? parsed : Date.now() @@ -400,7 +607,16 @@ function AgentInboxMessageRow({ icon={Radio} title={isSelfSend ? `sent to itself` : `agent message`} summary={`${isSelfSend ? `self-send` : fromAgent} · ${formatChatTimestamp(timestamp)}`} + actions={ + onReply ? ( + + ) : undefined + } defaultExpanded={false} + collapsible headerSurface >
@@ -421,8 +637,10 @@ function AgentInboxMessageRow({ function SignalTimelineRow({ signal, + onReply, }: { signal: NonNullable + onReply?: () => void }): React.ReactElement { return (
@@ -430,6 +648,11 @@ function SignalTimelineRow({ icon={CircleStop} title={`signal ${signal.signal}`} summary={signalSummary(signal)} + actions={ + onReply ? ( + + ) : undefined + } headerSurface />
@@ -593,11 +816,13 @@ function ManifestTimelineRow({ manifest, entityUrl, entityStatus, + onReply, }: { manifest: Manifest entityUrl: string | null tileId: string | null entityStatus?: EntityStatus + onReply?: () => void }): React.ReactElement { const workspace = useOptionalWorkspace() const navigate = useNavigate() @@ -667,10 +892,17 @@ function ManifestTimelineRow({ ) : null + const replyAction = onReply ? ( + + ) : null const actions = - statusBadge || openAction ? ( + statusBadge || openAction || replyAction ? ( <> {statusBadge} + {replyAction} {openAction} ) : undefined @@ -910,6 +1142,32 @@ function titleCase(value: string): string { .join(` `) } +function TimelineReplyAction({ + label, + onReply, +}: { + label: string + onReply?: () => void +}): React.ReactElement | null { + if (!onReply) return null + return ( + + + + + + ) +} + function stableEntityUrlKey(urls: Iterable): string { return Array.from(new Set(urls)).sort().join(`\0`) } @@ -920,6 +1178,8 @@ function entityUrlsFromKey(key: string): Array { const TimelineRow = memo(function TimelineRow({ row, + previousRow, + nextRow, responseTimestamp, isInitialUserMessage, entityStopped, @@ -936,8 +1196,13 @@ const TimelineRow = memo(function TimelineRow({ onStopGeneration, onForkFromHere, onRunSearchTextChange, + onReplyToRow, + onReplyToToolCall, + onCommentTargetClick, }: { row: RenderTimelineRow + previousRow?: RenderTimelineRow + nextRow?: RenderTimelineRow responseTimestamp: number | null isInitialUserMessage: boolean entityStopped: boolean @@ -957,10 +1222,34 @@ const TimelineRow = memo(function TimelineRow({ * we just invoke. */ onForkFromHere?: ForkFromHereAction onRunSearchTextChange: (rowKey: string, text: string) => void + onReplyToRow?: () => void + onReplyToToolCall?: (toolCall: EntityTimelineToolCallItem) => void + onCommentTargetClick?: (target: CommentTarget) => void }): React.ReactElement { + void previousRow + + if (row.comment) { + return ( + onReplyToRow() : undefined} + onTargetClick={onCommentTargetClick} + /> + ) + } + if (row.inbox) { if (row.inbox.from_agent) { - return + return ( + + ) } const timestamp = Date.parse(row.inbox.timestamp) return ( @@ -980,6 +1269,7 @@ const TimelineRow = memo(function TimelineRow({ } stopPending={stopPending} onStop={onStopGeneration} + onReply={onReplyToRow} /> ) } @@ -993,12 +1283,13 @@ const TimelineRow = memo(function TimelineRow({ timestamp: Date.parse(row.wake.payload.timestamp), }} entityUrl={entityUrl} + onReply={onReplyToRow} /> ) } if (row.signal) { - return + return } if (row.error) { @@ -1016,6 +1307,7 @@ const TimelineRow = memo(function TimelineRow({ ? entityStatusByUrl.get(getManifestEntityUrl(row.manifest)!) : undefined } + onReply={onReplyToRow} /> ) } @@ -1029,12 +1321,15 @@ const TimelineRow = memo(function TimelineRow({ renderWidth={renderWidth} forkFromHere={onForkFromHere} onSearchTextChange={onRunSearchTextChange} + onReply={onReplyToRow} + onReplyToToolCall={onReplyToToolCall} /> ) }) export function EntityTimeline({ rows, + rowAdjacency, loading, error, entityStopped, @@ -1047,8 +1342,13 @@ export function EntityTimeline({ stopPending = false, onStopGeneration, forkFromHereByRunKey, + onReplyToRow, + focusTarget, + onFocusTargetHandled, + onCommentTargetClick, }: { - rows: Array + rows: Array + rowAdjacency?: Array loading: boolean error: string | null entityStopped: boolean @@ -1067,6 +1367,10 @@ export function EntityTimeline({ * the fork pointer and runs the fork → navigate flow. */ forkFromHereByRunKey?: Map + onReplyToRow?: (target: SelectedCommentTarget) => void + focusTarget?: CommentTarget | null + onFocusTargetHandled?: () => void + onCommentTargetClick?: (target: CommentTarget) => void }): React.ReactElement { const { entitiesCollection, runnersCollection, usersCollection } = useElectricAgents() @@ -1156,6 +1460,9 @@ export function EntityTimeline({ const spawnMarkerRef = useRef(null) const [showJumpToBottom, setShowJumpToBottom] = useState(false) const [showTopDivider, setShowTopDivider] = useState(false) + const [highlightedRowKey, setHighlightedRowKey] = useState( + null + ) const [runSearchTextByKey, setRunSearchTextByKey] = useState( () => new Map() ) @@ -1163,6 +1470,7 @@ export function EntityTimeline({ const lastMeasureAtRef = useRef(new Map()) const settledKeysRef = useRef(new Set()) const settleCheckTimerRef = useRef | null>(null) + const highlightTimerRef = useRef | null>(null) const handledScrollSignalRef = useRef(scrollToBottomSignal) const previousStreamingAgentKeyRef = useRef(null) const textColumnWidth = Math.max(0, contentWidth - CHAT_SURFACE_GUTTER) @@ -1332,7 +1640,14 @@ export function EntityTimeline({ estimateSize: (index) => cachedSizeMapRef.current.get( displayRows[index] ? renderRowKey(displayRows[index]!) : `` - ) ?? estimateRowHeight(displayRows[index], textColumnWidth), + ) ?? + estimateRowHeight( + displayRows[index], + textColumnWidth, + displayRows[index] + ? (rowAdjacency?.[index]?.nextRow ?? displayRows[index + 1]) + : undefined + ), getItemKey: (index) => displayRows[index] ? renderRowKey(displayRows[index]!) : index, gap: 0, @@ -1341,6 +1656,50 @@ export function EntityTimeline({ enabled: displayRows.length > 0, }) + const revealCommentTarget = useCallback( + (target: CommentTarget): boolean => { + const targetIndex = displayRows.findIndex((row) => + timelineRowMatchesCommentTarget(row, target) + ) + if (targetIndex < 0) return false + + const row = displayRows[targetIndex] + if (!row) return false + + const rowKey = renderRowKey(row) + isNearBottom.current = false + setShowJumpToBottom(true) + rowVirtualizer.scrollToIndex(targetIndex, { align: `center` }) + setHighlightedRowKey(rowKey) + + if (highlightTimerRef.current !== null) { + clearTimeout(highlightTimerRef.current) + } + highlightTimerRef.current = setTimeout(() => { + highlightTimerRef.current = null + setHighlightedRowKey((current) => (current === rowKey ? null : current)) + }, 1600) + + return true + }, + [displayRows, rowVirtualizer] + ) + + const handleCommentTargetClick = useCallback( + (target: CommentTarget) => { + if (revealCommentTarget(target)) return + onCommentTargetClick?.(target) + }, + [onCommentTargetClick, revealCommentTarget] + ) + + useEffect(() => { + if (!focusTarget) return + if (revealCommentTarget(focusTarget)) { + onFocusTargetHandled?.() + } + }, [focusTarget, onFocusTargetHandled, revealCommentTarget]) + const paneFindAdapter = useMemo(() => { const getHighlightRoot = (match: PaneFindMatch): HTMLElement | null => { if (!contentElement || !isTimelineFindMatch(match)) return null @@ -1628,6 +1987,9 @@ export function EntityTimeline({ if (settleCheckTimerRef.current !== null) { clearTimeout(settleCheckTimerRef.current) } + if (highlightTimerRef.current !== null) { + clearTimeout(highlightTimerRef.current) + } }, [] ) @@ -1727,6 +2089,12 @@ export function EntityTimeline({ {rowVirtualizer.getVirtualItems().map((virtualRow) => { const row = displayRows[virtualRow.index] const rowKey = renderRowKey(row) + const previousRow = + rowAdjacency?.[virtualRow.index]?.previousRow ?? + displayRows[virtualRow.index - 1] + const nextRow = + rowAdjacency?.[virtualRow.index]?.nextRow ?? + displayRows[virtualRow.index + 1] // Stable row key. The previous implementation appended // `:${contentWidth}` to force remount on every column-width @@ -1744,15 +2112,20 @@ export function EntityTimeline({ data-index={virtualRow.index} data-item-key={rowKey} data-pane-find-row-key={rowKey} + data-highlighted={ + highlightedRowKey === rowKey ? `true` : undefined + } className={styles.virtualRow} style={{ transform: `translateY(${virtualRow.start}px)`, - paddingBottom: timelineRowGap(row), + paddingBottom: timelineRowGap(row, nextRow), }} > { + const target = createReplyTargetForRow( + row, + runSearchTextByKey + ) + if (target) onReplyToRow(target) + } + : undefined + } + onReplyToToolCall={ + onReplyToRow && row.run + ? (toolCall) => + onReplyToRow( + createReplyTargetForToolCall(row, toolCall) + ) + : undefined + } />
diff --git a/packages/agents-server-ui/src/components/InlineEventCard.test.tsx b/packages/agents-server-ui/src/components/InlineEventCard.test.tsx new file mode 100644 index 0000000000..959ff16a53 --- /dev/null +++ b/packages/agents-server-ui/src/components/InlineEventCard.test.tsx @@ -0,0 +1,39 @@ +import { describe, expect, it } from 'vitest' +import { renderToStaticMarkup } from 'react-dom/server' +import { Wrench } from 'lucide-react' +import { InlineEventCard } from './InlineEventCard' + +function hasNestedButton(markup: string): boolean { + let buttonDepth = 0 + const buttonTag = /<\/?button\b[^>]*>/g + for (const match of markup.matchAll(buttonTag)) { + const tag = match[0] + if (tag.startsWith(` 0) return true + buttonDepth += 1 + } + return false +} + +describe(`InlineEventCard`, () => { + it(`keeps header actions outside the expandable header button`, () => { + const markup = renderToStaticMarkup( + Reply} + collapsible + defaultExpanded + > +
result
+
+ ) + + expect(hasNestedButton(markup)).toBe(false) + expect(markup).toContain(`aria-label="Collapse details"`) + }) +}) diff --git a/packages/agents-server-ui/src/components/InlineEventCard.tsx b/packages/agents-server-ui/src/components/InlineEventCard.tsx index 2956052305..9d7691ff8c 100644 --- a/packages/agents-server-ui/src/components/InlineEventCard.tsx +++ b/packages/agents-server-ui/src/components/InlineEventCard.tsx @@ -32,7 +32,17 @@ export function InlineEventCard({ const [expanded, setExpanded] = useState(defaultExpanded) const showBody = children !== undefined && (!expandable || expanded) const headerOnly = children === undefined - const headerContent = ( + const toggle = () => setExpanded((value) => !value) + const toggleIcon = expandable ? ( + + ) : null + const headerLeadContent = ( <> {summary ? {summary} : null} {badge} + + ) + const headerContent = ( + <> + {headerLeadContent} {actions ? ( {actions} ) : null} - {expandable ? ( - - ) : null} + {toggleIcon} ) @@ -67,10 +74,36 @@ export function InlineEventCard({ className={toolBlock.card} data-header-surface={headerOnly || headerSurface ? `true` : undefined} > - {expandable ? ( + {expandable && actions ? ( + + + {actions} + + + ) : expandable ? (
+ ) : isCommentMode && commentTarget ? ( +
+
+ + {replyPreviewLabel} + + {replyPreviewText && ( + + {replyPreviewText} + + )} +
+ +
) : null } attachments={ - imageAttachmentsEnabled ? ( + imageAttachmentsEnabled && !isCommentMode ? ( + + + )} ) } + +function formatReplyBannerLabel(target: SelectedCommentTarget | null): string { + const label = target?.snapshot.label.trim() + if (!label) return `Reply` + return `Reply to ${label.charAt(0).toLowerCase()}${label.slice(1)}` +} diff --git a/packages/agents-server-ui/src/components/ToolCallView.module.css b/packages/agents-server-ui/src/components/ToolCallView.module.css index 6424f7f4a4..69df58a083 100644 --- a/packages/agents-server-ui/src/components/ToolCallView.module.css +++ b/packages/agents-server-ui/src/components/ToolCallView.module.css @@ -18,6 +18,19 @@ white-space: pre-wrap; } +.actionButton { + flex-shrink: 0; + width: 22px; + height: 22px; + border-radius: 4px; + color: var(--ds-gray-11); +} + +.actionButton:hover { + background: var(--ds-gray-a5); + color: var(--ds-gray-12); +} + .diffBlock { padding: 0; white-space: pre; diff --git a/packages/agents-server-ui/src/components/ToolCallView.tsx b/packages/agents-server-ui/src/components/ToolCallView.tsx index 30ce7c7f55..d736e58c8c 100644 --- a/packages/agents-server-ui/src/components/ToolCallView.tsx +++ b/packages/agents-server-ui/src/components/ToolCallView.tsx @@ -1,6 +1,6 @@ -import { Wrench } from 'lucide-react' +import { Reply, Wrench } from 'lucide-react' import type { EntityTimelineContentItem } from '@electric-ax/agents-runtime/client' -import { Badge, Stack, Text } from '../ui' +import { Badge, Icon, IconButton, Stack, Text, Tooltip } from '../ui' import type { BadgeTone } from '../ui' import { InlineEventCard } from './InlineEventCard' import { InlineStatusBadge } from './InlineStatusBadge' @@ -257,9 +257,28 @@ function ToolBody({ item }: { item: ToolCallItem }): React.ReactElement { export function ToolCallView({ item, + onReply, }: { item: ToolCallItem + onReply?: () => void }): React.ReactElement { + const replyAction = onReply ? ( + + + + + + ) : undefined + // send_message: same container style but always expanded with the message text if (item.toolName === `send_message` && typeof item.args.text === `string`) { const badge = statusBadge(item) @@ -270,6 +289,7 @@ export function ToolCallView({ title="send_message" titleFont="mono" collapsible={false} + actions={replyAction} badge={ badge ? ( @@ -299,6 +319,8 @@ export function ToolCallView({ titleFont="mono" summary={summary} defaultExpanded={shouldDefaultExpand} + collapsible + actions={replyAction} badge={ badge ? ( {badge.label} diff --git a/packages/agents-server-ui/src/components/UserMessage.module.css b/packages/agents-server-ui/src/components/UserMessage.module.css index 2dc522a98f..c4f357f3fb 100644 --- a/packages/agents-server-ui/src/components/UserMessage.module.css +++ b/packages/agents-server-ui/src/components/UserMessage.module.css @@ -77,6 +77,26 @@ } } +.meta { + width: 100%; +} + +.metaActions { + margin-left: auto; + display: inline-flex; + align-items: center; + gap: 2px; +} + +.metaActionButton { + color: var(--ds-text-4); + opacity: 0.7; +} + +.metaActionButton:hover { + opacity: 1; +} + .body { font-size: var(--ds-chat-text); line-height: var(--ds-chat-text-lh); @@ -220,9 +240,12 @@ } .meta { - opacity: 0.4; /* Match the bubble's 12px horizontal padding so the timestamp/sender * row aligns with the agent text column rather than with the wider * bubble background. */ padding-inline: 12px; } + +.meta > :not(.metaActions) { + opacity: 0.4; +} diff --git a/packages/agents-server-ui/src/components/UserMessage.tsx b/packages/agents-server-ui/src/components/UserMessage.tsx index d9bce06bd0..7d85b5d688 100644 --- a/packages/agents-server-ui/src/components/UserMessage.tsx +++ b/packages/agents-server-ui/src/components/UserMessage.tsx @@ -4,10 +4,11 @@ import { Download, File as FileIcon, Image as ImageIcon, + Reply, Square, } from 'lucide-react' import type { EntityTimelineSection } from '@electric-ax/agents-runtime/client' -import { Icon, Stack, Text } from '../ui' +import { Icon, IconButton, Stack, Text, Tooltip } from '../ui' import { downloadAttachment, formatAttachmentSize } from '../lib/attachments' import { streamdownComponents, @@ -19,7 +20,7 @@ import { } from './AttachmentImagePreviewDialog' import { TimeText } from './TimeText' import styles from './UserMessage.module.css' -import { principalKeyFromInput } from '../lib/principals' +import { formatSender } from '../lib/principals' import type { ElectricUser } from '../lib/ElectricAgentsProvider' type UserMessageSection = Extract< @@ -49,6 +50,7 @@ export const UserMessage = memo(function UserMessage({ currentPrincipal, usersById, onStop, + onReply, }: { section: UserMessageSection attachments?: Array @@ -57,6 +59,7 @@ export const UserMessage = memo(function UserMessage({ currentPrincipal?: string usersById?: Map onStop?: () => void + onReply?: () => void }): React.ReactElement { const sender = formatSender(section.from, { currentPrincipal, usersById }) @@ -112,6 +115,23 @@ export const UserMessage = memo(function UserMessage({ )} + {onReply && ( + + + + + + + + )} ) @@ -220,43 +240,3 @@ function AttachmentPreview({ ) } - -function formatSender( - from: string | null | undefined, - options: { - currentPrincipal?: string - usersById?: Map - } = {} -): { - label: string - title?: string -} { - const key = principalKeyFromInput(from) - if (!key) return { label: from || `user` } - if (key === principalKeyFromInput(options.currentPrincipal)) { - return { label: `Me`, title: key } - } - const colon = key.indexOf(`:`) - if (colon <= 0) return { label: key, title: key } - const kind = key.slice(0, colon) - const id = key.slice(colon + 1) - if (kind === `user`) { - const user = options.usersById?.get(id) - const label = userDisplayName(user) - if (label) return { label, title: key } - } - return { - label: `${kind}:${formatPrincipalId(id)}`, - title: key, - } -} - -function formatPrincipalId(id: string): string { - if (id.length <= 18) return id - return `${id.slice(0, 8)}…${id.slice(-6)}` -} - -function userDisplayName(user: ElectricUser | undefined): string | null { - if (!user) return null - return user.display_name || user.email || null -} diff --git a/packages/agents-server-ui/src/components/toolBlock.module.css b/packages/agents-server-ui/src/components/toolBlock.module.css index dca3ea9c04..4ef2aea5cd 100644 --- a/packages/agents-server-ui/src/components/toolBlock.module.css +++ b/packages/agents-server-ui/src/components/toolBlock.module.css @@ -92,6 +92,59 @@ background: var(--ds-accent-a2); } +.headerWithActions { + padding: 0; + gap: 0; +} + +.headerContentToggle { + display: flex; + align-items: center; + gap: 8px; + min-width: 0; + flex: 1 1 auto; + align-self: stretch; + padding: 7px 0 7px 10px; + background: none; + border: none; + color: inherit; + font: inherit; + text-align: left; + cursor: pointer; + outline: none; + transition: background 0.08s ease; +} + +.headerContentToggle:hover, +.headerChevronButton:hover { + background: var(--ds-bg-hover); +} + +.headerContentToggle:focus-visible, +.headerChevronButton:focus-visible { + background: var(--ds-accent-a2); +} + +.headerChevronButton { + display: inline-flex; + align-items: center; + justify-content: center; + width: 32px; + align-self: stretch; + flex-shrink: 0; + padding: 0; + background: none; + border: none; + color: inherit; + cursor: pointer; + outline: none; + transition: background 0.08s ease; +} + +.headerChevronButton .toggleArrow { + margin-left: 0; +} + /* Chevron lives in a fixed-size 16px slot so the row height doesn't jiggle when expand/collapse swaps glyphs. */ .toggleArrow { @@ -130,6 +183,11 @@ margin-left: 0; } +.headerWithActions .headerActions { + margin-left: 0; + margin-right: 4px; +} + /* Tool name is the only mono token in the row — it's an identifier, so reading it as code helps. Summary + everything else stays in the body font for legibility at small sizes. */ diff --git a/packages/agents-server-ui/src/components/views/ChatView.tsx b/packages/agents-server-ui/src/components/views/ChatView.tsx index 0f64c6c239..ff0c9ad9ad 100644 --- a/packages/agents-server-ui/src/components/views/ChatView.tsx +++ b/packages/agents-server-ui/src/components/views/ChatView.tsx @@ -2,29 +2,35 @@ import { useCallback, useEffect, useMemo, useState } from 'react' import { useNavigate } from '@tanstack/react-router' import { eq, useLiveQuery } from '@tanstack/react-db' import { useEntityTimeline } from '../../hooks/useEntityTimeline' +import { useForkFromHere } from '../../hooks/useForkFromHere' import { EntityTimeline } from '../EntityTimeline' import { MessageInput } from '../MessageInput' import { EntityContextDrawer } from '../EntityContextDrawer' import { useElectricAgents } from '../../lib/ElectricAgentsProvider' +import { useWorkspace } from '../../hooks/useWorkspace' import { schemaModelSupportsImageInput } from '../../lib/modelCapabilities' +import { + buildCommentsTimeline, + COMMENT_FOCUS_PARAM, + commentFocusViewParams, + decodeCommentTargetParam, +} from '../../lib/comments' +import type { SelectedCommentTarget, TimelineRow } from '../../lib/comments' import { useEntityPermission, useEntityPermissions, type EntityPermission, } from '../../hooks/useEntityPermission' import type { ViewProps } from '../../lib/workspace/viewRegistry' -import type { EntityTimelineQueryRow } from '@electric-ax/agents-runtime/client' -import type { EventPointer } from '@electric-ax/agents-runtime' +import type { CommentTarget } from '@electric-ax/agents-runtime/client' import type { OptimisticInboxMessage } from '../../lib/sendMessage' import type { SlashCommandRow } from '@electric-ax/agents-runtime/client' -import type { ForkFromHereAction } from '../UserMessage' const CHAT_VIEW_PERMISSIONS: ReadonlyArray = [ `write`, `signal`, `fork`, ] - /** * The default view: chat / timeline + message composer. * @@ -40,6 +46,7 @@ export function ChatView({ entityStopped, isSpawning, tileId, + viewParams, }: ViewProps): React.ReactElement { // While `spawning`, the entity has no inbox yet — `connectUrl` is null // so `useEntityTimeline` doesn't try to subscribe and we render an empty @@ -54,6 +61,7 @@ export function ChatView({ entityStopped={entityStopped} isSpawning={isSpawning} tileId={tileId} + viewParams={viewParams} /> ) } @@ -74,7 +82,6 @@ export function ChatLogView({ const connectUrl = isSpawning ? null : entityUrl const { timelineRows, pendingInbox, entities, db, loading, error } = useEntityTimeline(baseUrl || null, connectUrl) - const { forkEntity } = useElectricAgents() const canFork = useEntityPermission(entity, `fork`) const navigate = useNavigate() const processedInboxKeys = useMemo( @@ -101,14 +108,14 @@ export function ChatLogView({ pendingInboxByKey, processedInboxKeys, ]) - const visibleRows = useMemo>(() => { + const visibleRows = useMemo>(() => { if (!projectedPendingMessage) return timelineRows return [ ...timelineRows, { $key: `pending-inbox:${projectedPendingMessage.key}`, inbox: projectedPendingMessage, - } as EntityTimelineQueryRow, + } as TimelineRow, ] }, [projectedPendingMessage, timelineRows]) @@ -118,41 +125,12 @@ export function ChatLogView({ } }, [error, navigate, isSpawning]) - const forkFromHereByRunKey = useMemo(() => { - if (!forkEntity || !connectUrl || !db) return undefined - const runOffsets = db.collections.runs.__electricRowOffsets - if (!runOffsets) return undefined - const map = new Map() - let anchor: { rowKey: string; pointer: EventPointer } | null = null - for (const row of visibleRows) { - if (row.run && row.run.status === `completed`) { - const pointer = runOffsets.get(row.run.key) - anchor = pointer ? { rowKey: row.$key, pointer } : null - } - if (row.inbox && anchor) { - const capturedAnchor = anchor.pointer - const capturedRunKey = anchor.rowKey - map.set( - capturedRunKey, - canFork - ? { - onFork: () => { - void forkEntity(connectUrl, { pointer: capturedAnchor }) - .then((res) => - navigate({ - to: `/entity/$`, - params: { _splat: res.url.replace(/^\//, ``) }, - }) - ) - .catch(() => {}) - }, - } - : { disabled: true } - ) - } - } - return map - }, [visibleRows, canFork, db, forkEntity, connectUrl, navigate]) + const forkFromHereByRunKey = useForkFromHere({ + rows: visibleRows, + db, + entityUrl: connectUrl, + canFork, + }) return ( (null) + const commentsTimeline = useMemo( + () => buildCommentsTimeline(timelineRows), + [timelineRows] + ) + + useEffect(() => { + if (error && !isSpawning) { + void navigate({ to: `/` }) + } + }, [error, navigate, isSpawning]) + + useEffect(() => { + setSelectedCommentTarget(null) + }, [connectUrl]) + + const openFullTimelineTarget = useCallback( + (target: CommentTarget) => { + helpers.setTileView(tileId, `chat`, { + viewParams: commentFocusViewParams(target), + }) + }, + [helpers, tileId] + ) + + return ( + <> + + setSelectedCommentTarget(null)} + onSend={() => setSentCommentSignal((value) => value + 1)} + /> + + ) +} + function GenericChatBody({ baseUrl, entityUrl, @@ -178,6 +234,7 @@ function GenericChatBody({ entityStopped, isSpawning, tileId, + viewParams, }: { baseUrl: string entityUrl: string | null @@ -185,6 +242,7 @@ function GenericChatBody({ entityStopped: boolean isSpawning: boolean tileId: string + viewParams?: ViewProps[`viewParams`] }): React.ReactElement { const { timelineRows, @@ -194,16 +252,22 @@ function GenericChatBody({ db, loading, error, - } = useEntityTimeline(baseUrl || null, entityUrl) - const { signalEntity, forkEntity, entityTypesCollection } = - useElectricAgents() + commentsEnabled, + } = useEntityTimeline(baseUrl || null, entityUrl, { + comments: viewParams?.comments !== `hidden`, + }) + const showComments = commentsEnabled && viewParams?.comments !== `hidden` + const { signalEntity, entityTypesCollection } = useElectricAgents() const permissions = useEntityPermissions(entity, CHAT_VIEW_PERMISSIONS) const canWrite = permissions.write const canSignal = permissions.signal const canFork = permissions.fork const navigate = useNavigate() + const { helpers } = useWorkspace() const [sentMessageSignal, setSentMessageSignal] = useState(0) const [stopPending, setStopPending] = useState(false) + const [selectedCommentTarget, setSelectedCommentTarget] = + useState(null) const { data: matchingEntityTypes = [] } = useLiveQuery( (query) => { if (!entityTypesCollection) return undefined @@ -235,7 +299,7 @@ function GenericChatBody({ ) const inlinePendingInbox = !entityStopped && !generationActive ? visiblePendingInbox[0] : undefined - const timelineRowsWithInlinePending = useMemo>( + const timelineRowsWithInlinePending = useMemo>( () => inlinePendingInbox ? [ @@ -243,11 +307,26 @@ function GenericChatBody({ { $key: `pending-inbox:${inlinePendingInbox.key}`, inbox: inlinePendingInbox, - } as EntityTimelineQueryRow, + } as TimelineRow, ] : timelineRows, [inlinePendingInbox, timelineRows] ) + const focusTarget = useMemo( + () => decodeCommentTargetParam(viewParams?.[COMMENT_FOCUS_PARAM]), + [viewParams] + ) + const clearFocusTarget = useCallback(() => { + if (!viewParams?.[COMMENT_FOCUS_PARAM]) return + const nextParams = { ...viewParams } + delete nextParams[COMMENT_FOCUS_PARAM] + helpers.setTileView(tileId, `chat`, { + viewParams: Object.keys(nextParams).length > 0 ? nextParams : undefined, + }) + }, [helpers, tileId, viewParams]) + useEffect(() => { + if (!showComments) setSelectedCommentTarget(null) + }, [showComments]) const drawerPendingInbox = inlinePendingInbox ? visiblePendingInbox.slice(1) : visiblePendingInbox @@ -296,56 +375,12 @@ function GenericChatBody({ }) }, [canSignal, entityUrl, generationActive, signalEntity, stopPending]) - // "Fork from here" anchor map. For each completed `runs` row that is - // followed by a user-message inbox row, the run pointer identifies - // "fork up to and including this response, drop everything after." - // Completed runs without a following prompt (usually the current end - // of the conversation) get no entry, preserving the old "historic - // prompt" affordance while moving it to the response footer. - const forkFromHereByRunKey = useMemo(() => { - if (!forkEntity || !entityUrl || !db) return undefined - const runOffsets = db.collections.runs.__electricRowOffsets - if (!runOffsets) return undefined - const map = new Map() - let anchor: { rowKey: string; pointer: EventPointer } | null = null - for (const row of timelineRowsWithInlinePending) { - if (row.run && row.run.status === `completed`) { - const pointer = runOffsets.get(row.run.key) - anchor = pointer ? { rowKey: row.$key, pointer } : null - } - if (row.inbox && anchor) { - const capturedAnchor = anchor.pointer - const capturedRunKey = anchor.rowKey - map.set( - capturedRunKey, - canFork - ? { - onFork: () => { - // forkEntity surfaces failures via a danger toast before - // rejecting, so the caller just needs to swallow the rejection. - void forkEntity(entityUrl, { pointer: capturedAnchor }) - .then((res) => - navigate({ - to: `/entity/$`, - params: { _splat: res.url.replace(/^\//, ``) }, - }) - ) - .catch(() => {}) - }, - } - : { disabled: true } - ) - } - } - return map - }, [ - timelineRowsWithInlinePending, - canFork, + const forkFromHereByRunKey = useForkFromHere({ + rows: timelineRowsWithInlinePending, db, - forkEntity, entityUrl, - navigate, - ]) + canFork, + }) return ( <> @@ -355,7 +390,7 @@ function GenericChatBody({ error={error} entityStopped={entityStopped} baseUrl={baseUrl} - cacheKey={`${baseUrl}${entityUrl ?? ``}`} + cacheKey={`${baseUrl}${entityUrl ?? ``}:comments:${showComments ? `shown` : `hidden`}`} tileId={tileId} entityUrl={entityUrl} entities={entities} @@ -363,6 +398,9 @@ function GenericChatBody({ onStopGeneration={stopGeneration} stopPending={stopPending} forkFromHereByRunKey={forkFromHereByRunKey} + onReplyToRow={showComments ? setSelectedCommentTarget : undefined} + focusTarget={focusTarget} + onFocusTargetHandled={clearFocusTarget} /> setSelectedCommentTarget(null)} drawer={(pending) => ( setMenuOpen(false) /** Wraps a handler so it dispatches and then closes the menu. */ @@ -272,6 +281,15 @@ export function SplitMenu({ void navigator.clipboard.writeText(url.toString()) } + const setChatCommentsVisible = (visible: boolean) => { + const nextParams = { ...(tile.viewParams ?? {}) } + if (visible) delete nextParams.comments + else nextParams.comments = `hidden` + helpers.setTileView(tile.id, tile.viewId, { + viewParams: Object.keys(nextParams).length > 0 ? nextParams : undefined, + }) + } + // The menu and the dialogs are siblings — keeping them in the same // portal subtree caused focus / unmount races (Base UI // tears the menu popup down on close, and any dialog mounted inside @@ -328,6 +346,39 @@ export function SplitMenu({ )} + {showDisplayOptions && ( + <> + + + + Display options + + + + + setChatCommentsVisible(!chatCommentsVisible) + )} + > + + Show comments + + + + + + + )} + helpers.splitTile(tile.id, `right`)}> Split right diff --git a/packages/agents-server-ui/src/hooks/useEntityTimeline.ts b/packages/agents-server-ui/src/hooks/useEntityTimeline.ts index eba430f748..8834485356 100644 --- a/packages/agents-server-ui/src/hooks/useEntityTimeline.ts +++ b/packages/agents-server-ui/src/hooks/useEntityTimeline.ts @@ -4,12 +4,14 @@ import { compareTimelineOrders, createEntityTimelineQuery, normalizeTimelineEntities, + TIMELINE_ORDER_FALLBACK, } from '@electric-ax/agents-runtime/client' import { coalesce, eq } from '@durable-streams/state/db' import { connectEntityStream } from '../lib/entity-connection' +import { createCommentsTimelineSource } from '../lib/comments' +import type { TimelineRow } from '../lib/comments' import type { EntityStreamDBWithActions, - EntityTimelineQueryRow, IncludesInboxMessage, IncludesEntity, Manifest, @@ -46,15 +48,25 @@ function isTimelineEntityManifest( export function useEntityTimeline( baseUrl: string | null, - entityUrl: string | null + entityUrl: string | null, + opts?: { + /** Merge the `comments` collection into the timeline. Defaults to true. */ + comments?: boolean + } ): { - timelineRows: Array + timelineRows: Array pendingInbox: Array entities: Array generationActive: boolean db: EntityStreamDBWithActions | null loading: boolean error: string | null + /** + * True when the entity's type declares the comments collection — the + * stream connection only registers `db.collections.comments` for types + * whose registration advertises the comments contract. + */ + commentsEnabled: boolean } { const [db, setDb] = useState(null) const [loading, setLoading] = useState(false) @@ -102,12 +114,20 @@ export function useEntityTimeline( } }, [baseUrl, entityUrl]) + const commentsEnabled = Boolean( + db && (db.collections as Record).comments + ) + const includeComments = commentsEnabled && (opts?.comments ?? true) const { data: timelineRows = [] } = useLiveQuery( (q) => { if (!db) return undefined - return createEntityTimelineQuery(db)(q) + return createEntityTimelineQuery(db, { + ...(includeComments && { + customSources: { comment: createCommentsTimelineSource(db) }, + }), + })(q) }, - [db] + [db, includeComments] ) const { data: manifests = [] } = useLiveQuery( (q) => @@ -125,7 +145,8 @@ export function useEntityTimeline( .from({ inbox: db.collections.inbox as any }) .where(({ inbox }: any) => eq(inbox.status, `pending`)) .orderBy( - ({ inbox }: any) => coalesce(inbox._timeline_order, `~`), + ({ inbox }: any) => + coalesce(inbox._timeline_order, TIMELINE_ORDER_FALLBACK), `asc` ) .orderBy(({ inbox }: any) => @@ -134,7 +155,7 @@ export function useEntityTimeline( : undefined, [db] ) - const typedTimelineRows = timelineRows as Array + const typedTimelineRows = timelineRows as Array const pendingInbox = useMemo( () => @@ -213,5 +234,6 @@ export function useEntityTimeline( db, loading, error, + commentsEnabled, } } diff --git a/packages/agents-server-ui/src/hooks/useForkFromHere.ts b/packages/agents-server-ui/src/hooks/useForkFromHere.ts new file mode 100644 index 0000000000..ac5240b105 --- /dev/null +++ b/packages/agents-server-ui/src/hooks/useForkFromHere.ts @@ -0,0 +1,67 @@ +import { useMemo } from 'react' +import { useNavigate } from '@tanstack/react-router' +import { useElectricAgents } from '../lib/ElectricAgentsProvider' +import type { EventPointer } from '@electric-ax/agents-runtime' +import type { EntityStreamDBWithActions } from '@electric-ax/agents-runtime/client' +import type { TimelineRow } from '../lib/comments' +import type { ForkFromHereAction } from '../components/UserMessage' + +/** + * "Fork from here" anchor map. For each completed `runs` row that is + * followed by a user-message inbox row, the run pointer identifies + * "fork up to and including this response, drop everything after." + * Completed runs without a following prompt (usually the current end + * of the conversation) get no entry, preserving the old "historic + * prompt" affordance while moving it to the response footer. + */ +export function useForkFromHere({ + rows, + db, + entityUrl, + canFork, +}: { + rows: Array + db: EntityStreamDBWithActions | null + entityUrl: string | null + canFork: boolean +}): Map | undefined { + const { forkEntity } = useElectricAgents() + const navigate = useNavigate() + return useMemo(() => { + if (!forkEntity || !entityUrl || !db) return undefined + const runOffsets = db.collections.runs.__electricRowOffsets + if (!runOffsets) return undefined + const map = new Map() + let anchor: { rowKey: string; pointer: EventPointer } | null = null + for (const row of rows) { + if (row.run && row.run.status === `completed`) { + const pointer = runOffsets.get(row.run.key) + anchor = pointer ? { rowKey: row.$key, pointer } : null + } + if (row.inbox && anchor) { + const capturedAnchor = anchor.pointer + const capturedRunKey = anchor.rowKey + map.set( + capturedRunKey, + canFork + ? { + onFork: () => { + // forkEntity surfaces failures via a danger toast before + // rejecting, so the caller just needs to swallow the rejection. + void forkEntity(entityUrl, { pointer: capturedAnchor }) + .then((res) => + navigate({ + to: `/entity/$`, + params: { _splat: res.url.replace(/^\//, ``) }, + }) + ) + .catch(() => {}) + }, + } + : { disabled: true } + ) + } + } + return map + }, [rows, canFork, db, forkEntity, entityUrl, navigate]) +} diff --git a/packages/agents-server-ui/src/lib/ElectricAgentsProvider.tsx b/packages/agents-server-ui/src/lib/ElectricAgentsProvider.tsx index 7b745ad582..255bc6e69c 100644 --- a/packages/agents-server-ui/src/lib/ElectricAgentsProvider.tsx +++ b/packages/agents-server-ui/src/lib/ElectricAgentsProvider.tsx @@ -6,6 +6,7 @@ import { appendPathToUrl } from '@electric-ax/agents-runtime/client' import type { EventPointer } from '@electric-ax/agents-runtime' import type { ReactNode } from 'react' import { serverFetch } from './auth-fetch' +import { registerWritableCollectionsLookup } from './comments-capability' import { entityApiUrl, entitySpawnApiUrl } from './entity-api' import { showToast } from './toast' @@ -104,6 +105,12 @@ const entityTypeSchema = z.object({ .nullable() .optional(), serve_endpoint: z.string().nullable(), + externally_writable_collections: z + .record( + z.object({ type: z.string(), contract: z.string().optional() }).partial() + ) + .nullable() + .optional(), created_at: z.string(), updated_at: z.string(), }) @@ -839,6 +846,20 @@ export function ElectricAgentsProvider({ }) }, [baseUrl]) + // Expose a synchronous type → writable-collections lookup for non-React + // gates (the view registry's `isAvailable`). + useEffect(() => { + if (!baseUrl) { + registerWritableCollectionsLookup(null) + return + } + const { entityTypes } = getOrCreateAppCollections(baseUrl) + registerWritableCollectionsLookup( + (typeName) => entityTypes.get(typeName)?.externally_writable_collections + ) + return () => registerWritableCollectionsLookup(null) + }, [baseUrl]) + const state = useMemo(() => { if (!baseUrl) { return { diff --git a/packages/agents-server-ui/src/lib/comments-capability.ts b/packages/agents-server-ui/src/lib/comments-capability.ts new file mode 100644 index 0000000000..93a861a5b0 --- /dev/null +++ b/packages/agents-server-ui/src/lib/comments-capability.ts @@ -0,0 +1,45 @@ +import { COMMENTS_CONTRACT } from '@electric-ax/agents-runtime/client' + +/** + * Shape of an entity type's `externally_writable_collections` registration + * as seen by the client (entity GET response / synced `entity_types` rows). + */ +export type ExternallyWritableCollections = + | Record + | null + | undefined + +/** + * True when the map advertises the canonical comments contract. Keyed on + * both the reserved `comments` name and the contract marker so an agent's + * unrelated writable collection can never light up the comment UI. + */ +export function supportsComments( + collections: ExternallyWritableCollections +): boolean { + return collections?.comments?.contract === COMMENTS_CONTRACT +} + +type WritableCollectionsLookup = ( + typeName: string +) => ExternallyWritableCollections + +let lookup: WritableCollectionsLookup | null = null + +/** + * Registered by `ElectricAgentsProvider` (backed by the synced + * `entity_types` collection) so non-React callers — the view registry's + * `isAvailable` gate — can resolve a type's writable collections. + */ +export function registerWritableCollectionsLookup( + fn: WritableCollectionsLookup | null +): void { + lookup = fn +} + +export function entityTypeSupportsComments( + typeName: string | null | undefined +): boolean { + if (!typeName || !lookup) return false + return supportsComments(lookup(typeName)) +} diff --git a/packages/agents-server-ui/src/lib/comments.test.ts b/packages/agents-server-ui/src/lib/comments.test.ts new file mode 100644 index 0000000000..d81ef03409 --- /dev/null +++ b/packages/agents-server-ui/src/lib/comments.test.ts @@ -0,0 +1,307 @@ +import { afterEach, describe, expect, it, vi } from 'vitest' +import { createCollection, localOnlyCollectionOptions } from '@tanstack/db' +import { compareTimelineOrders } from '@electric-ax/agents-runtime/client' +import { createLiveQueryCollection } from '@durable-streams/state/db' +import { registerActiveServerHeaders } from './auth-fetch' +import { + buildCommentsTimeline, + commentFocusViewParams, + createCommentsTimelineSource, + createSendCommentAction, + decodeCommentTargetParam, +} from './comments' +import type { + CommentSnapshot, + CommentTarget, + EntityStreamDBWithActions, +} from '@electric-ax/agents-runtime/client' +import type { OptimisticComment, TimelineRow } from './comments' + +function createCommentsDb() { + const comments = createCollection( + localOnlyCollectionOptions({ + id: `test-comments-${Math.random().toString(36).slice(2)}`, + getKey: (comment: OptimisticComment) => comment.key, + }) + ) + return { + db: { + collections: { + comments, + }, + } as unknown as EntityStreamDBWithActions, + comments, + } +} + +describe(`createCommentsTimelineSource`, () => { + it(`projects author from _principal, falling back to the optimistic from`, async () => { + const { db, comments } = createCommentsDb() + const liveQuery = createLiveQueryCollection({ + query: createCommentsTimelineSource(db), + startSync: true, + }) + await liveQuery.preload() + + comments.insert({ + key: `c-synced`, + _timeline_order: `00000002`, + body: `hello`, + timestamp: `2026-04-15T18:00:00.000Z`, + _principal: { url: `/principal/user%3Ajane`, kind: `user`, id: `jane` }, + } as any) + comments.insert({ + key: `c-optimistic`, + _timeline_order: `~pending:000000000001`, + body: `mine`, + from: `/principal/user%3Ame`, + } as any) + await new Promise((r) => setTimeout(r, 50)) + + const rows = new Map(liveQuery.toArray.map((row: any) => [row.key, row])) + expect(rows.get(`c-synced`)).toMatchObject({ + order: `00000002`, + body: `hello`, + from: `/principal/user%3Ajane`, + }) + expect(rows.get(`c-optimistic`)).toMatchObject({ + from: `/principal/user%3Ame`, + }) + }) +}) + +describe(`createSendCommentAction`, () => { + afterEach(() => { + vi.restoreAllMocks() + registerActiveServerHeaders(null) + }) + + it(`inserts optimistic comments at increasing pending timeline orders`, async () => { + const fetchMock = vi + .spyOn(globalThis, `fetch`) + .mockResolvedValue(new Response(`{}`, { status: 201 })) + const { db } = createCommentsDb() + const optimistic: Array = [] + const sendComment = createSendCommentAction({ + db, + baseUrl: `http://localhost:4437`, + entityUrl: `/chat/test`, + from: `/principal/user%3Ame`, + onOptimisticComment: (comment) => optimistic.push(comment), + }) + + const firstTx = sendComment({ body: `first` }) + const secondTx = sendComment({ body: `second` }) + await Promise.all([ + firstTx.isPersisted.promise, + secondTx.isPersisted.promise, + ]) + + expect(optimistic).toHaveLength(2) + expect(optimistic[0]?._principal?.url).toBe(`/principal/user%3Ame`) + expect(optimistic[0]?._timeline_order).toMatch(/^~pending:/) + expect(optimistic[1]?._timeline_order).toMatch(/^~pending:/) + expect( + compareTimelineOrders( + optimistic[0]!._timeline_order, + optimistic[1]!._timeline_order + ) + ).toBeLessThan(0) + expect(fetchMock).toHaveBeenCalledTimes(2) + }) + + it(`posts reply metadata with the same key as the optimistic row`, async () => { + const fetchMock = vi + .spyOn(globalThis, `fetch`) + .mockResolvedValue(new Response(`{}`, { status: 201 })) + const { db } = createCommentsDb() + const optimistic: Array = [] + const replyTo: CommentTarget = { + kind: `timeline`, + collection: `run`, + key: `run-1`, + } + const targetSnapshot: CommentSnapshot = { + label: `Assistant response`, + text: `Draft reply`, + collection: `run`, + } + const sendComment = createSendCommentAction({ + db, + baseUrl: `http://localhost:4437`, + entityUrl: `/chat/test`, + from: `/principal/user%3Ame`, + onOptimisticComment: (comment) => optimistic.push(comment), + }) + + const tx = sendComment({ + body: `looks right`, + replyTo, + targetSnapshot, + }) + await tx.isPersisted.promise + + expect(optimistic).toHaveLength(1) + expect(optimistic[0]).toMatchObject({ + body: `looks right`, + from: `/principal/user%3Ame`, + reply_to: replyTo, + target_snapshot: targetSnapshot, + }) + expect(fetchMock).toHaveBeenCalledTimes(1) + const [url, init] = fetchMock.mock.calls[0]! + expect(url).toBe( + `http://localhost:4437/_electric/entities/chat/test/collections/comments` + ) + expect(init?.method).toBe(`POST`) + expect(new Headers(init?.headers).get(`content-type`)).toBe( + `application/json` + ) + const parsed = JSON.parse(String(init?.body)) + expect(parsed.operation).toBe(`insert`) + expect(parsed.key).toBe(optimistic[0]!.key) + expect(parsed.value).toMatchObject({ + body: `looks right`, + reply_to: replyTo, + target_snapshot: targetSnapshot, + }) + expect(parsed.value).not.toHaveProperty(`from_principal`) + }) + + it(`rejects the persistence promise when the server rejects the comment`, async () => { + vi.spyOn(globalThis, `fetch`).mockResolvedValue( + new Response(JSON.stringify({ message: `No write access` }), { + status: 403, + }) + ) + const { db } = createCommentsDb() + const sendComment = createSendCommentAction({ + db, + baseUrl: `http://localhost:4437`, + entityUrl: `/chat/test`, + }) + + const tx = sendComment({ body: `blocked` }) + + await expect(tx.isPersisted.promise).rejects.toThrow(`No write access`) + }) +}) + +function commentRow( + key: string, + fromPrincipal = `/principal/user%3Ame` +): TimelineRow { + return { + $key: `comment:${key}`, + comment: { + key, + order: key, + body: key, + from: fromPrincipal, + timestamp: `2026-04-15T18:00:00.000Z`, + }, + } as TimelineRow +} + +function wakeRow(key: string): TimelineRow { + return { + $key: `wake:${key}`, + wake: { + key, + order: key, + payload: { + type: `wake`, + timestamp: `2026-04-15T18:00:00.000Z`, + source: `/chat/test`, + timeout: false, + changes: [], + }, + }, + } as TimelineRow +} + +function attachmentRow(key: string): TimelineRow { + return { + $key: `manifest:${key}`, + manifest: { + key, + kind: `attachment`, + id: key, + streamPath: `/chat/test/attachments/${key}`, + status: `complete`, + subject: { type: `inbox`, key: `msg-1` }, + mimeType: `text/plain`, + byteLength: 12, + createdAt: `2026-04-15T18:00:00.000Z`, + }, + } as TimelineRow +} + +describe(`buildCommentsTimeline`, () => { + it(`keeps comments in stream order while using full-timeline adjacency`, () => { + const first = commentRow(`first`) + const wake = wakeRow(`wake-1`) + const second = commentRow(`second`) + const third = commentRow(`third`) + const attachment = attachmentRow(`att-1`) + const fourth = commentRow(`fourth`) + + const timeline = buildCommentsTimeline([ + first, + wake, + second, + third, + attachment, + fourth, + ]) + + expect(timeline.rows.map((row) => row.comment?.key)).toEqual([ + `first`, + `second`, + `third`, + `fourth`, + ]) + expect(timeline.adjacency[0]).toEqual({ + previousRow: undefined, + nextRow: wake, + }) + expect(timeline.adjacency[1]).toEqual({ + previousRow: wake, + nextRow: third, + }) + expect(timeline.adjacency[2]).toEqual({ + previousRow: second, + nextRow: fourth, + }) + expect(timeline.adjacency[3]).toEqual({ + previousRow: third, + }) + }) +}) + +describe(`comment focus view params`, () => { + it(`round-trips timeline targets for comments-view navigation`, () => { + const target: CommentTarget = { + kind: `timeline`, + collection: `tool_call`, + key: `tool-call-1`, + run_id: `run-1`, + } + + const params = commentFocusViewParams(target) + + expect(decodeCommentTargetParam(params.focus)).toEqual(target) + }) + + it(`rejects invalid encoded target collections`, () => { + const encoded = encodeURIComponent( + JSON.stringify({ + kind: `timeline`, + collection: `unknown`, + key: `thing-1`, + }) + ) + + expect(decodeCommentTargetParam(encoded)).toBeNull() + }) +}) diff --git a/packages/agents-server-ui/src/lib/comments.ts b/packages/agents-server-ui/src/lib/comments.ts new file mode 100644 index 0000000000..768cb9919e --- /dev/null +++ b/packages/agents-server-ui/src/lib/comments.ts @@ -0,0 +1,294 @@ +import { createOptimisticAction } from '@tanstack/db' +import { coalesce } from '@durable-streams/state/db' +import { + createPendingTimelineOrder, + TIMELINE_ORDER_FALLBACK, +} from '@electric-ax/agents-runtime/client' +import { getActivePrincipal, serverFetch } from './auth-fetch' +import { isAttachmentManifest } from './attachments' +import { entityApiUrl } from './entity-api' +import type { + CommentSnapshot, + CommentTarget, + EntityStreamDBWithActions, + EntityTimelineCustomSource, + EntityTimelineQueryRow, +} from '@electric-ax/agents-runtime/client' + +/** + * Comments are a UI-level concern: the runtime timeline query knows nothing + * about them. `useEntityTimeline` merges them in by passing + * `createCommentsTimelineSource` as a custom timeline source. + */ +export type EntityTimelineCommentRow = { + key: string + order: string + body: string + from: string + timestamp: string + reply_to?: CommentTarget + target_snapshot?: CommentSnapshot + edited_at?: string + deleted_at?: string + deleted_by?: string +} + +export type CommentTimelineRow = { + $key: string + comment: EntityTimelineCommentRow + inbox?: undefined + run?: undefined + wake?: undefined + signal?: undefined + error?: undefined + manifest?: undefined +} + +/** Timeline row as consumed by UI views: runtime rows plus merged comment rows. */ +export type TimelineRow = + | (EntityTimelineQueryRow & { comment?: undefined }) + | CommentTimelineRow + +/** + * Timeline source for the `comments` collection, passed to the runtime's + * `createEntityTimelineQuery` via `customSources`. The author resolves from + * the `_principal` virtual column (server-stamped, spoof-proof), falling back + * to the optimistic row's `from`. + */ +export function createCommentsTimelineSource( + db: EntityStreamDBWithActions +): EntityTimelineCustomSource { + const comments = (db.collections as Record).comments + return (q) => + q.from({ comment: comments }).select(({ comment }: any) => ({ + order: coalesce(comment._timeline_order, TIMELINE_ORDER_FALLBACK), + key: comment.key, + body: comment.body, + from: coalesce(comment._principal?.url, comment.from, ``), + timestamp: coalesce(comment.timestamp, ``), + reply_to: comment.reply_to, + target_snapshot: comment.target_snapshot, + edited_at: comment.edited_at, + deleted_at: comment.deleted_at, + deleted_by: comment.deleted_by, + })) +} + +const OPTIMISTIC_COMMENT_ORDER_START = Number.MAX_SAFE_INTEGER - 2_000_000 + +let optimisticCommentOrderIndex = OPTIMISTIC_COMMENT_ORDER_START + +export type OptimisticComment = EntityTimelineCommentRow & { + _timeline_order: string + _principal?: { url: string } +} + +export type SelectedCommentTarget = { + target: CommentTarget + snapshot: CommentSnapshot +} + +type SendCommentInput = { + key: string + body: string + replyTo?: CommentTarget + targetSnapshot?: CommentSnapshot + pendingOrderIndex: number +} + +function nextOptimisticCommentOrderIndex(): number { + optimisticCommentOrderIndex += 1 + if (optimisticCommentOrderIndex >= Number.MAX_SAFE_INTEGER) { + optimisticCommentOrderIndex = OPTIMISTIC_COMMENT_ORDER_START + } + return optimisticCommentOrderIndex +} + +function createClientCommentKey(pendingOrderIndex: number): string { + return `client-comment-${Date.now()}-${pendingOrderIndex}` +} + +function readCommentError(status: number, body: string): Error { + let message = `Failed to post comment (${status})` + if (body) { + try { + const data = JSON.parse(body) as Record + if (data.message) message = String(data.message) + } catch { + message = body + } + } + return new Error(message) +} + +export function createSendCommentAction({ + db, + baseUrl, + entityUrl, + from, + onOptimisticComment, +}: { + db: EntityStreamDBWithActions + baseUrl: string + entityUrl: string + from?: string + onOptimisticComment?: (comment: OptimisticComment) => void +}) { + const action = createOptimisticAction({ + onMutate: ({ key, body, replyTo, targetSnapshot, pendingOrderIndex }) => { + const now = new Date().toISOString() + const principalUrl = from ?? getActivePrincipal() + const comment: OptimisticComment = { + key, + order: createPendingTimelineOrder(pendingOrderIndex), + _timeline_order: createPendingTimelineOrder(pendingOrderIndex), + body, + from: principalUrl, + _principal: { url: principalUrl }, + timestamp: now, + ...(replyTo ? { reply_to: replyTo } : {}), + ...(targetSnapshot ? { target_snapshot: targetSnapshot } : {}), + } + onOptimisticComment?.(comment) + db.collections.comments.insert(comment) + }, + mutationFn: async ({ key, body, replyTo, targetSnapshot }) => { + const now = new Date().toISOString() + const value = { + body, + timestamp: now, + ...(replyTo ? { reply_to: replyTo } : {}), + ...(targetSnapshot ? { target_snapshot: targetSnapshot } : {}), + } + const res = await serverFetch( + entityApiUrl(baseUrl, entityUrl, `/collections/comments`), + { + method: `POST`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify({ operation: `insert`, key, value }), + } + ) + if (!res.ok) { + const body = await res.text().catch(() => ``) + throw readCommentError(res.status, body) + } + }, + }) + + return ({ + body, + replyTo, + targetSnapshot, + }: { + body: string + replyTo?: CommentTarget + targetSnapshot?: CommentSnapshot + }) => { + const pendingOrderIndex = nextOptimisticCommentOrderIndex() + return action({ + key: createClientCommentKey(pendingOrderIndex), + body, + replyTo, + targetSnapshot, + pendingOrderIndex, + }) + } +} + +export const COMMENT_FOCUS_PARAM = `focus` + +const COMMENT_TARGET_COLLECTIONS = new Set([ + `inbox`, + `run`, + `text`, + `tool_call`, + `wake`, + `signal`, + `manifest`, +]) + +export function encodeCommentTargetParam(target: CommentTarget): string { + return encodeURIComponent(JSON.stringify(target)) +} + +export function decodeCommentTargetParam( + value: string | undefined +): CommentTarget | null { + if (!value) return null + try { + const decoded = JSON.parse(decodeURIComponent(value)) as unknown + if (!isCommentTarget(decoded)) return null + return decoded + } catch { + return null + } +} + +export function commentFocusViewParams( + target: CommentTarget +): Record { + return { [COMMENT_FOCUS_PARAM]: encodeCommentTargetParam(target) } +} + +function isCommentTarget(value: unknown): value is CommentTarget { + if (!value || typeof value !== `object`) return false + const target = value as Partial + if (target.kind === `comment`) { + return typeof target.key === `string` + } + if (target.kind !== `timeline`) return false + const timelineTarget = target as Partial< + Extract + > + return ( + typeof timelineTarget.key === `string` && + typeof timelineTarget.collection === `string` && + COMMENT_TARGET_COLLECTIONS.has(timelineTarget.collection) && + (timelineTarget.run_id === undefined || + typeof timelineTarget.run_id === `string`) + ) +} + +export type TimelineRowAdjacency = { + previousRow?: TimelineRow + nextRow?: TimelineRow +} + +/** + * Comment-only timeline: keeps just the comment rows, recording each one's + * neighboring renderable rows so the view can show surrounding context. + */ +export function buildCommentsTimeline(timelineRows: Array): { + rows: Array + adjacency: Array +} { + const rows: Array = [] + const adjacency: Array = [] + let previousRenderableRow: TimelineRow | undefined + let pendingCommentAdjacencyIndex: number | null = null + + for (const row of timelineRows) { + if (isAttachmentManifest(row.manifest)) continue + + if (pendingCommentAdjacencyIndex !== null) { + const pendingAdjacency = adjacency[pendingCommentAdjacencyIndex]! + adjacency[pendingCommentAdjacencyIndex] = { + ...pendingAdjacency, + nextRow: row, + } + pendingCommentAdjacencyIndex = null + } + + if (row.comment) { + rows.push(row) + adjacency.push({ + previousRow: previousRenderableRow, + }) + pendingCommentAdjacencyIndex = adjacency.length - 1 + } + + previousRenderableRow = row + } + + return { rows, adjacency } +} diff --git a/packages/agents-server-ui/src/lib/entity-connection.custom-state.test.ts b/packages/agents-server-ui/src/lib/entity-connection.custom-state.test.ts new file mode 100644 index 0000000000..77e7b70d7d --- /dev/null +++ b/packages/agents-server-ui/src/lib/entity-connection.custom-state.test.ts @@ -0,0 +1,26 @@ +import { describe, expect, it } from 'vitest' +import { COMMENTS_CONTRACT } from '@electric-ax/agents-runtime/client' +import { uiCustomStateForEntity } from './entity-connection' + +describe(`uiCustomStateForEntity`, () => { + it(`registers comments when the type advertises the comments contract`, () => { + const customState = uiCustomStateForEntity({ + comments: { type: `state:comments`, contract: COMMENTS_CONTRACT }, + }) + expect(customState.comments).toBeDefined() + expect(customState.comments!.type).toBe(`state:comments`) + expect(customState.comments!.externallyWritable).toBe(true) + }) + + it(`registers nothing when the type declares no writable collections`, () => { + expect(uiCustomStateForEntity(undefined)).toEqual({}) + expect(uiCustomStateForEntity(null)).toEqual({}) + expect(uiCustomStateForEntity({})).toEqual({}) + }) + + it(`ignores a comments entry without the canonical contract`, () => { + expect( + uiCustomStateForEntity({ comments: { type: `state:comments` } }) + ).toEqual({}) + }) +}) diff --git a/packages/agents-server-ui/src/lib/entity-connection.test.ts b/packages/agents-server-ui/src/lib/entity-connection.test.ts index 701d26f308..5b5762b69b 100644 --- a/packages/agents-server-ui/src/lib/entity-connection.test.ts +++ b/packages/agents-server-ui/src/lib/entity-connection.test.ts @@ -8,14 +8,24 @@ vi.mock(`./auth-fetch`, () => ({ serverFetch: fetchMock, })) +const createEntityStreamDBMock = vi.fn((..._args: Array) => ({ + preload: preloadMock, + close: closeMock, + collections: {}, +})) + vi.mock(`@electric-ax/agents-runtime/client`, () => ({ appendPathToUrl: (baseUrl: string, path: string) => `${baseUrl.replace(/\/+$/, ``)}${path}`, - createEntityStreamDB: vi.fn(() => ({ - preload: preloadMock, - close: closeMock, - collections: {}, - })), + COMMENTS_CONTRACT: `comments/v1`, + commentsCollection: { + schema: {}, + type: `state:comments`, + primaryKey: `key`, + externallyWritable: true, + contract: `comments/v1`, + }, + createEntityStreamDB: createEntityStreamDBMock, })) describe(`connectEntityStream`, () => { @@ -53,4 +63,44 @@ describe(`connectEntityStream`, () => { ) expect(connection.db).toBeTruthy() }) + + it(`registers the comments collection only when the type advertises the contract`, async () => { + fetchMock.mockResolvedValueOnce( + new Response( + JSON.stringify({ + url: `/horton/abc`, + externally_writable_collections: { + comments: { type: `state:comments`, contract: `comments/v1` }, + }, + }), + { status: 200 } + ) + ) + preloadMock.mockResolvedValue(undefined) + + const { connectEntityStream } = await import(`./entity-connection`) + await connectEntityStream({ + baseUrl: `http://server`, + entityUrl: `/horton/abc`, + }) + + const customState = createEntityStreamDBMock.mock.calls.at(-1)![1] as any + expect(customState.comments).toMatchObject({ type: `state:comments` }) + }) + + it(`registers no comments collection when the type does not declare it`, async () => { + fetchMock.mockResolvedValueOnce( + new Response(JSON.stringify({ url: `/worker/abc` }), { status: 200 }) + ) + preloadMock.mockResolvedValue(undefined) + + const { connectEntityStream } = await import(`./entity-connection`) + await connectEntityStream({ + baseUrl: `http://server`, + entityUrl: `/worker/abc`, + }) + + const customState = createEntityStreamDBMock.mock.calls.at(-1)![1] as any + expect(customState.comments).toBeUndefined() + }) }) diff --git a/packages/agents-server-ui/src/lib/entity-connection.ts b/packages/agents-server-ui/src/lib/entity-connection.ts index 8e4b5a9032..35df0e3d2d 100644 --- a/packages/agents-server-ui/src/lib/entity-connection.ts +++ b/packages/agents-server-ui/src/lib/entity-connection.ts @@ -4,10 +4,14 @@ import { DurableStream } from '@durable-streams/client' import type { StreamOptions } from '@durable-streams/client' import { appendPathToUrl, + commentsCollection, createEntityStreamDB, type EntityStreamDBWithActions, } from '@electric-ax/agents-runtime/client' +import { supportsComments } from './comments-capability' +import type { ExternallyWritableCollections } from './comments-capability' + function getMainStreamPath(entityUrl: string): string { return `${entityUrl}/main` } @@ -20,6 +24,20 @@ function getMainStreamPath(entityUrl: string): string { */ export type UICustomState = Record +/** + * Collections the UI registers on the entity stream when the entity's + * type advertises the matching contract in its + * `externally_writable_collections` registration. `db.collections.comments` + * is therefore only defined for types that declared comments — its absence + * is what gates the comment affordances. Callers may overlay their own + * customState on top; explicitly-passed entries take precedence. + */ +export function uiCustomStateForEntity( + collections: ExternallyWritableCollections +): Record { + return supportsComments(collections) ? { comments: commentsCollection } : {} +} + let activeBaseUrl: string | null = null const ENTITY_METADATA_RETRY_DELAYS_MS = [250, 500, 1000, 2000] @@ -253,7 +271,9 @@ async function connectEntityStreamFresh(opts: { entityUrl, signal, }) - await res.body?.cancel() + const metadata = (await res.json().catch(() => null)) as { + externally_writable_collections?: ExternallyWritableCollections + } | null throwIfAborted(signal) const streamUrl = appendPathToUrl(baseUrl, getMainStreamPath(entityUrl)) const stream: EntityStreamHandle = isReactNativeRuntime() @@ -263,12 +283,13 @@ async function connectEntityStreamFresh(opts: { contentType: `application/json`, fetch: serverFetch, }) as unknown as EntityStreamHandle) - const db = createEntityStreamDB( - streamUrl, - customState as unknown as Parameters[1], - undefined, - { stream } - ) + const mergedCustomState: Parameters[1] = { + ...uiCustomStateForEntity(metadata?.externally_writable_collections), + ...(customState ?? {}), + } + const db = createEntityStreamDB(streamUrl, mergedCustomState, undefined, { + stream, + }) try { await preloadWithAbort(db, signal) } catch (err) { diff --git a/packages/agents-server-ui/src/lib/principals.ts b/packages/agents-server-ui/src/lib/principals.ts index b4995e73a3..4d37d67279 100644 --- a/packages/agents-server-ui/src/lib/principals.ts +++ b/packages/agents-server-ui/src/lib/principals.ts @@ -1,3 +1,5 @@ +import type { ElectricUser } from './ElectricAgentsProvider' + export function principalUrlFromKey(principalKey: string): string { const trimmed = principalKey.trim() return trimmed.startsWith(`/principal/`) @@ -39,3 +41,48 @@ export function userIdFromPrincipal( const key = principalKeyFromInput(value) return key?.startsWith(`user:`) ? key.slice(`user:`.length) : null } + +/** + * Display label for a message/comment sender principal: "Me" for the current + * principal, the user's display name when known, otherwise `kind:id` with a + * truncated id. `title` always carries the full principal key for tooltips. + */ +export function formatSender( + from: string | null | undefined, + options: { + currentPrincipal?: string + usersById?: Map + } = {} +): { + label: string + title?: string +} { + const key = principalKeyFromInput(from) + if (!key) return { label: from || `user` } + if (key === principalKeyFromInput(options.currentPrincipal)) { + return { label: `Me`, title: key } + } + const colon = key.indexOf(`:`) + if (colon <= 0) return { label: key, title: key } + const kind = key.slice(0, colon) + const id = key.slice(colon + 1) + if (kind === `user`) { + const user = options.usersById?.get(id) + const label = userDisplayName(user) + if (label) return { label, title: key } + } + return { + label: `${kind}:${formatPrincipalId(id)}`, + title: key, + } +} + +export function userDisplayName(user: ElectricUser | undefined): string | null { + if (!user) return null + return user.display_name || user.email || null +} + +function formatPrincipalId(id: string): string { + if (id.length <= 18) return id + return `${id.slice(0, 8)}…${id.slice(-6)}` +} diff --git a/packages/agents-server-ui/src/lib/workspace/registerViews.ts b/packages/agents-server-ui/src/lib/workspace/registerViews.ts index e887aeb9bb..20f29596ca 100644 --- a/packages/agents-server-ui/src/lib/workspace/registerViews.ts +++ b/packages/agents-server-ui/src/lib/workspace/registerViews.ts @@ -1,7 +1,8 @@ -import { Database, MessageSquare, SquarePen } from 'lucide-react' +import { Database, MessageCircle, MessageSquare, SquarePen } from 'lucide-react' +import { entityTypeSupportsComments } from '../comments-capability' import { registerView } from './viewRegistry' import { NEW_SESSION_VIEW_ID } from './types' -import { ChatView } from '../../components/views/ChatView' +import { ChatView, CommentsView } from '../../components/views/ChatView' import { StateExplorerView } from '../../components/views/StateExplorerView' import { NewSessionView } from '../../components/views/NewSessionView' @@ -23,6 +24,18 @@ registerView({ Component: ChatView, }) +registerView({ + kind: `entity`, + id: `comments`, + label: `Comments`, + icon: MessageCircle, + description: `Comment-only timeline`, + // Only entity types whose registration declares the comments collection + // get the comment-only view (and the rest of the comment affordances). + isAvailable: (entity) => entityTypeSupportsComments(entity.type), + Component: CommentsView, +}) + registerView({ kind: `entity`, id: `state-explorer`, diff --git a/packages/agents-server/drizzle/0016_entity_type_externally_writable_collections.sql b/packages/agents-server/drizzle/0016_entity_type_externally_writable_collections.sql new file mode 100644 index 0000000000..63871f3427 --- /dev/null +++ b/packages/agents-server/drizzle/0016_entity_type_externally_writable_collections.sql @@ -0,0 +1 @@ +ALTER TABLE "entity_types" ADD COLUMN "externally_writable_collections" jsonb; diff --git a/packages/agents-server/drizzle/meta/_journal.json b/packages/agents-server/drizzle/meta/_journal.json index 925f30d355..cd05bd62e3 100644 --- a/packages/agents-server/drizzle/meta/_journal.json +++ b/packages/agents-server/drizzle/meta/_journal.json @@ -113,6 +113,13 @@ "when": 1779728400000, "tag": "0015_pg_sync_bridges", "breakpoints": true + }, + { + "idx": 16, + "version": "7", + "when": 1781200000000, + "tag": "0016_entity_type_externally_writable_collections", + "breakpoints": true } ] } diff --git a/packages/agents-server/src/db/schema.ts b/packages/agents-server/src/db/schema.ts index 683b649e74..f65c77b1d5 100644 --- a/packages/agents-server/src/db/schema.ts +++ b/packages/agents-server/src/db/schema.ts @@ -14,6 +14,7 @@ import { timestamp, unique, } from 'drizzle-orm/pg-core' +import type { ExternallyWritableCollectionConfig } from '../electric-agents-types.js' export const entityTypes = pgTable( `entity_types`, @@ -24,6 +25,9 @@ export const entityTypes = pgTable( creationSchema: jsonb(`creation_schema`), inboxSchemas: jsonb(`inbox_schemas`), stateSchemas: jsonb(`state_schemas`), + externallyWritableCollections: jsonb( + `externally_writable_collections` + ).$type>(), slashCommands: jsonb(`slash_commands`), serveEndpoint: text(`serve_endpoint`), defaultDispatchPolicy: jsonb(`default_dispatch_policy`), diff --git a/packages/agents-server/src/electric-agents-types.ts b/packages/agents-server/src/electric-agents-types.ts index 60ffefec96..aca2e1de40 100644 --- a/packages/agents-server/src/electric-agents-types.ts +++ b/packages/agents-server/src/electric-agents-types.ts @@ -494,12 +494,24 @@ export function toPublicEntity( } } +/** Per-collection config making an entity-state collection externally writable via the router. */ +export interface ExternallyWritableCollectionConfig { + /** Durable-stream event type for this collection, e.g. `state:comments`. */ + type: string + /** Well-known contract this collection implements, e.g. `comments/v1`. */ + contract?: string +} + export interface ElectricAgentsEntityType { name: string description: string creation_schema?: Record inbox_schemas?: Record> state_schemas?: Record> + externally_writable_collections?: Record< + string, + ExternallyWritableCollectionConfig + > slash_commands?: Array serve_endpoint?: string default_dispatch_policy?: DispatchPolicy @@ -514,6 +526,10 @@ export interface RegisterEntityTypeRequest { creation_schema?: Record inbox_schemas?: Record> state_schemas?: Record> + externally_writable_collections?: Record< + string, + ExternallyWritableCollectionConfig + > slash_commands?: Array serve_endpoint?: string default_dispatch_policy?: DispatchPolicy diff --git a/packages/agents-server/src/entity-manager.ts b/packages/agents-server/src/entity-manager.ts index fbbb189028..945be61237 100644 --- a/packages/agents-server/src/entity-manager.ts +++ b/packages/agents-server/src/entity-manager.ts @@ -71,6 +71,7 @@ import type { SignalRequest, SignalResponse, TypedSpawnRequest, + ExternallyWritableCollectionConfig, } from './electric-agents-types.js' import type { EntityBridgeCoordinator } from './entity-bridge-manager.js' import type { Principal } from './principal.js' @@ -131,6 +132,23 @@ export interface CreateAttachmentRequest { meta?: Record } +export interface WriteCollectionPrincipal { + url: string + kind: string + id: string +} + +export interface WriteCollectionRequest { + operation: `insert` | `update` | `delete` + key?: string + value?: Record + principal: WriteCollectionPrincipal +} + +export interface WriteCollectionResult { + key: string +} + export interface ReadAttachmentResult { attachment: ManifestAttachmentEntry bytes: Uint8Array @@ -488,6 +506,7 @@ export class EntityManager { creation_schema: req.creation_schema, inbox_schemas: req.inbox_schemas, state_schemas: req.state_schemas, + externally_writable_collections: req.externally_writable_collections, slash_commands: req.slash_commands, serve_endpoint: req.serve_endpoint, default_dispatch_policy: defaultDispatchPolicy, @@ -2434,6 +2453,97 @@ export class EntityManager { } } + async writeCollection( + entityUrl: string, + collection: string, + req: WriteCollectionRequest + ): Promise { + const entity = await this.registry.getEntity(entityUrl) + if (!entity) { + throw new ElectricAgentsError(ErrCodeNotFound, `Entity not found`, 404) + } + + const { externallyWritableCollections } = + await this.getEffectiveSchemas(entity) + const config = externallyWritableCollections?.[collection] + if (!config) { + throw new ElectricAgentsError( + ErrCodeUnauthorized, + `Collection "${collection}" is not writable`, + 403 + ) + } + + if (rejectsNormalWrites(entity.status)) { + throw new ElectricAgentsError( + ErrCodeNotRunning, + `Entity is not accepting writes`, + 409 + ) + } + if (this.isForkWorkLockedEntity(entityUrl)) { + this.assertEntityNotForkWorkLocked(entityUrl) + } + + if ( + req.operation !== `delete` && + (req.value === undefined || req.value === null) + ) { + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `value is required for ${req.operation}`, + 400 + ) + } + if (req.operation !== `insert` && !req.key) { + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `key is required for ${req.operation}`, + 400 + ) + } + + const key = req.key ?? `${collection}-${randomUUID()}` + + const event: Record = { + type: config.type, + key, + headers: { + operation: req.operation, + timestamp: new Date().toISOString(), + principal: req.principal, + }, + } + if (req.operation !== `delete`) { + event.value = req.value + } + + const validationError = await this.validateWriteEvent(entity, event) + if (validationError) { + throw new ElectricAgentsError( + validationError.code, + validationError.message, + validationError.status + ) + } + + const encoded = this.encodeChangeEvent(event) + try { + await this.streamClient.append(entity.streams.main, encoded) + } catch (err) { + if (this.isClosedStreamError(err)) { + throw new ElectricAgentsError( + ErrCodeNotRunning, + `Entity is stopped`, + 409 + ) + } + throw err + } + + return { key } + } + async updateInboxMessage( entityUrl: string, key: string, @@ -3876,11 +3986,16 @@ export class EntityManager { private async getEffectiveSchemas(entity: ElectricAgentsEntity): Promise<{ inboxSchemas?: Record> stateSchemas?: Record> + externallyWritableCollections?: Record< + string, + ExternallyWritableCollectionConfig + > }> { if (!entity.type) { return { inboxSchemas: entity.inbox_schemas, stateSchemas: entity.state_schemas, + externallyWritableCollections: undefined, } } @@ -3893,6 +4008,8 @@ export class EntityManager { stateSchemas: latestType?.state_schemas ? { ...(entity.state_schemas ?? {}), ...latestType.state_schemas } : entity.state_schemas, + externallyWritableCollections: + latestType?.externally_writable_collections, } } diff --git a/packages/agents-server/src/entity-registry.ts b/packages/agents-server/src/entity-registry.ts index 28722afd13..37d17195e9 100644 --- a/packages/agents-server/src/entity-registry.ts +++ b/packages/agents-server/src/entity-registry.ts @@ -43,6 +43,7 @@ import type { EntityTypePermission, EntityTypePermissionGrant, PermissionSubjectKind, + ExternallyWritableCollectionConfig, } from './electric-agents-types.js' import type { EntityTags, PgSyncOptions } from '@electric-ax/agents-runtime' import type { Principal } from './principal.js' @@ -654,6 +655,8 @@ export class PostgresRegistry { creationSchema: et.creation_schema ?? null, inboxSchemas: et.inbox_schemas ?? null, stateSchemas: et.state_schemas ?? null, + externallyWritableCollections: + et.externally_writable_collections ?? null, slashCommands: et.slash_commands ?? null, serveEndpoint: et.serve_endpoint ?? null, defaultDispatchPolicy: et.default_dispatch_policy ?? null, @@ -668,6 +671,8 @@ export class PostgresRegistry { creationSchema: et.creation_schema ?? null, inboxSchemas: et.inbox_schemas ?? null, stateSchemas: et.state_schemas ?? null, + externallyWritableCollections: + et.externally_writable_collections ?? null, slashCommands: et.slash_commands ?? null, serveEndpoint: et.serve_endpoint ?? null, defaultDispatchPolicy: et.default_dispatch_policy ?? null, @@ -691,6 +696,8 @@ export class PostgresRegistry { creationSchema: et.creation_schema ?? null, inboxSchemas: et.inbox_schemas ?? null, stateSchemas: et.state_schemas ?? null, + externallyWritableCollections: + et.externally_writable_collections ?? null, slashCommands: et.slash_commands ?? null, serveEndpoint: et.serve_endpoint ?? null, defaultDispatchPolicy: et.default_dispatch_policy ?? null, @@ -733,6 +740,8 @@ export class PostgresRegistry { creationSchema: et.creation_schema ?? null, inboxSchemas: et.inbox_schemas ?? null, stateSchemas: et.state_schemas ?? null, + externallyWritableCollections: + et.externally_writable_collections ?? null, slashCommands: et.slash_commands ?? null, serveEndpoint: et.serve_endpoint ?? null, defaultDispatchPolicy: et.default_dispatch_policy ?? null, @@ -1957,6 +1966,11 @@ export class PostgresRegistry { state_schemas: row.stateSchemas as | Record> | undefined, + externally_writable_collections: + (row.externallyWritableCollections as Record< + string, + ExternallyWritableCollectionConfig + > | null) ?? undefined, slash_commands: (row.slashCommands as ElectricAgentsEntityType[`slash_commands`]) ?? undefined, diff --git a/packages/agents-server/src/routing/entities-router.ts b/packages/agents-server/src/routing/entities-router.ts index 7a6189643b..1bbce19f8a 100644 --- a/packages/agents-server/src/routing/entities-router.ts +++ b/packages/agents-server/src/routing/entities-router.ts @@ -149,6 +149,19 @@ const spawnBodySchema = Type.Object({ ), }) +const writeCollectionBodySchema = Type.Object( + { + operation: Type.Union([ + Type.Literal(`insert`), + Type.Literal(`update`), + Type.Literal(`delete`), + ]), + key: Type.Optional(Type.String()), + value: Type.Optional(Type.Record(Type.String(), Type.Unknown())), + }, + { additionalProperties: false } +) + const sendBodySchema = Type.Object({ payload: Type.Optional(Type.Unknown()), key: Type.Optional(Type.String()), @@ -328,6 +341,7 @@ const eventSourceSubscriptionBodySchema = Type.Object({ }) type SpawnBody = Static +type WriteCollectionBody = Static type SendBody = Static type InboxMessageBody = Static type ForkBody = Static @@ -408,6 +422,13 @@ entitiesRouter.post( withEntityPermission(`write`), sendEntity ) +entitiesRouter.post( + `/:type/:instanceId/collections/:collection`, + withExistingEntity, + withSchema(writeCollectionBodySchema), + withEntityPermission(`write`), + writeCollection +) entitiesRouter.post( `/:type/:instanceId/attachments`, withExistingEntity, @@ -1308,6 +1329,31 @@ async function sendEntity( return json(result) } +async function writeCollection( + request: AgentsRouteRequest, + ctx: TenantContext +): Promise { + const parsed = routeBody(request) + await ctx.entityManager.ensurePrincipal(ctx.principal) + const { entityUrl } = requireExistingEntityRoute(request) + const collection = request.params.collection + const result = await ctx.entityManager.writeCollection( + entityUrl, + collection, + { + operation: parsed.operation, + key: parsed.key, + value: parsed.value, + principal: { + url: ctx.principal.url, + kind: ctx.principal.kind, + id: ctx.principal.id, + }, + } + ) + return json(result, { status: parsed.operation === `insert` ? 201 : 200 }) +} + async function createAttachment( request: AgentsRouteRequest, ctx: TenantContext @@ -1473,8 +1519,21 @@ async function spawnEntity( ) } -function getEntity(request: AgentsRouteRequest): Response { - return json(toPublicEntity(requireExistingEntityRoute(request).entity)) +async function getEntity( + request: AgentsRouteRequest, + ctx: TenantContext +): Promise { + const { entity } = requireExistingEntityRoute(request) + const entityType = entity.type + ? await ctx.entityManager.registry.getEntityType(entity.type) + : null + return json({ + ...toPublicEntity(entity), + ...(entityType?.externally_writable_collections && { + externally_writable_collections: + entityType.externally_writable_collections, + }), + }) } function headEntity(): Response { diff --git a/packages/agents-server/src/routing/entity-types-router.ts b/packages/agents-server/src/routing/entity-types-router.ts index 76e7a50afe..61266a660c 100644 --- a/packages/agents-server/src/routing/entity-types-router.ts +++ b/packages/agents-server/src/routing/entity-types-router.ts @@ -4,6 +4,7 @@ import { Type, type Static } from '@sinclair/typebox' import { Router, json, status } from 'itty-router' +import { COMMENTS_CONTRACT } from '@electric-ax/agents-runtime' import { dispatchPolicySchema } from '../dispatch-policy-schema.js' import { ElectricAgentsError } from '../entity-manager.js' import { @@ -45,6 +46,20 @@ type PublicEntityTypeResponse = ElectricAgentsEntityType & { const jsonObjectSchema = Type.Record(Type.String(), Type.Unknown()) const schemaMapSchema = Type.Record(Type.String(), jsonObjectSchema) +// `principalColumn` is accepted and ignored: older runtimes still send it +// (the column is fixed to `_principal` now), and rejecting it would break +// registration during version skew. +const externallyWritableCollectionsSchema = Type.Record( + Type.String(), + Type.Object( + { + type: Type.String(), + contract: Type.Optional(Type.String()), + principalColumn: Type.Optional(Type.String()), + }, + { additionalProperties: false } + ) +) const slashCommandArgumentSchema = Type.Object( { name: Type.String(), @@ -93,6 +108,9 @@ const registerEntityTypeBodySchema = Type.Object( permission_grants: Type.Optional( Type.Array(typePermissionGrantInputSchema) ), + externally_writable_collections: Type.Optional( + externallyWritableCollectionsSchema + ), }, { additionalProperties: false } ) @@ -445,9 +463,37 @@ function parseExpiresAt(value: string | undefined): Date | undefined { return expiresAt } +/** + * The `comments` collection name is reserved for the canonical comments + * contract: the UI keys its comment affordances on it, so a divergent + * collection registered under that name (or the contract mounted under + * another name) would break that assumption silently. + */ +function validateExternallyWritableCollections( + collections: RegisterEntityTypeRequest[`externally_writable_collections`] +): void { + for (const [name, config] of Object.entries(collections ?? {})) { + if (name === `comments` && config.contract !== COMMENTS_CONTRACT) { + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `The externally-writable collection name "comments" is reserved for the "${COMMENTS_CONTRACT}" contract`, + 400 + ) + } + if (config.contract === COMMENTS_CONTRACT && name !== `comments`) { + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `The "${COMMENTS_CONTRACT}" contract must be registered under the collection name "comments"`, + 400 + ) + } + } +} + function normalizeEntityTypeRequest( parsed: RegisterEntityTypeBody | RegisterEntityTypeRequest ): RegisterEntityTypeRequest { + validateExternallyWritableCollections(parsed.externally_writable_collections) const serveEndpoint = rewriteLoopbackWebhookUrl(parsed.serve_endpoint) return { name: parsed.name ?? ``, @@ -465,6 +511,7 @@ function normalizeEntityTypeRequest( } as RegisterEntityTypeRequest[`default_dispatch_policy`]) : undefined), permission_grants: parsed.permission_grants, + externally_writable_collections: parsed.externally_writable_collections, } } diff --git a/packages/agents-server/test/electric-agents-manager-write-validation.test.ts b/packages/agents-server/test/electric-agents-manager-write-validation.test.ts index 4cab89f72b..72a75ee3ca 100644 --- a/packages/agents-server/test/electric-agents-manager-write-validation.test.ts +++ b/packages/agents-server/test/electric-agents-manager-write-validation.test.ts @@ -364,6 +364,155 @@ describe(`ElectricAgentsManager event source subscriptions`, () => { }) }) +function decodeAppendEvent(bytes: Uint8Array): Record { + return JSON.parse(new TextDecoder().decode(bytes)) as Record +} + +describe(`ElectricAgentsManager.writeCollection`, () => { + const principal = { + url: `/principal/user%3Aalice`, + kind: `user`, + id: `alice`, + } + + it(`stamps the principal header and appends a generic collection insert`, async () => { + const append = vi.fn() + const { manager } = createAttachmentManager({ streamClient: { append } }) + manager.registry.getEntity = vi.fn().mockResolvedValue({ + url: `/chat/session-1`, + type: `chat`, + status: `running`, + streams: { main: `/chat/session-1` }, + }) + manager.registry.getEntityType = vi.fn().mockResolvedValue({ + name: `chat`, + state_schemas: { 'state:comments': {} }, + externally_writable_collections: { + comments: { type: `state:comments` }, + }, + }) + + const result = await manager.writeCollection( + `/chat/session-1`, + `comments`, + { + operation: `insert`, + key: `c1`, + value: { body: `hi` }, + principal, + } + ) + + expect(result).toEqual({ key: `c1` }) + const event = decodeAppendEvent(append.mock.calls[0]?.[1]) + expect(event).toMatchObject({ + type: `state:comments`, + key: `c1`, + headers: { operation: `insert`, principal }, + value: { body: `hi` }, + }) + expect( + (event.value as Record).from_principal + ).toBeUndefined() + }) + + it(`rejects writes to a collection that is not writable`, async () => { + const append = vi.fn() + const { manager } = createAttachmentManager({ streamClient: { append } }) + manager.registry.getEntity = vi.fn().mockResolvedValue({ + url: `/chat/session-1`, + type: `chat`, + status: `running`, + streams: { main: `/chat/session-1` }, + }) + manager.registry.getEntityType = vi.fn().mockResolvedValue({ + name: `chat`, + state_schemas: { 'state:notes': {} }, + externally_writable_collections: {}, + }) + + await expect( + manager.writeCollection(`/chat/session-1`, `notes`, { + operation: `insert`, + value: { note: `x` }, + principal, + }) + ).rejects.toMatchObject({ status: 403 }) + expect(append).not.toHaveBeenCalled() + }) + + it(`rejects values that fail the collection schema with 422`, async () => { + const append = vi.fn() + const { manager } = createAttachmentManager({ streamClient: { append } }) + manager.registry.getEntity = vi.fn().mockResolvedValue({ + url: `/chat/session-1`, + type: `chat`, + status: `running`, + streams: { main: `/chat/session-1` }, + }) + manager.registry.getEntityType = vi.fn().mockResolvedValue({ + name: `chat`, + state_schemas: { + 'state:comments': { + type: `object`, + properties: { body: { type: `string` } }, + required: [`body`], + additionalProperties: false, + }, + }, + externally_writable_collections: { + comments: { type: `state:comments` }, + }, + }) + + await expect( + manager.writeCollection(`/chat/session-1`, `comments`, { + operation: `insert`, + key: `c1`, + value: { body: 42 }, + principal, + }) + ).rejects.toMatchObject({ status: 422 }) + expect(append).not.toHaveBeenCalled() + + await manager.writeCollection(`/chat/session-1`, `comments`, { + operation: `insert`, + key: `c2`, + value: { body: `valid` }, + principal, + }) + expect(append).toHaveBeenCalledTimes(1) + }) + + it(`rejects writes to a stopped entity with 409`, async () => { + const append = vi.fn() + const { manager } = createAttachmentManager({ streamClient: { append } }) + manager.registry.getEntity = vi.fn().mockResolvedValue({ + url: `/chat/session-1`, + type: `chat`, + status: `stopped`, + streams: { main: `/chat/session-1` }, + }) + manager.registry.getEntityType = vi.fn().mockResolvedValue({ + name: `chat`, + state_schemas: { 'state:comments': {} }, + externally_writable_collections: { + comments: { type: `state:comments` }, + }, + }) + + await expect( + manager.writeCollection(`/chat/session-1`, `comments`, { + operation: `insert`, + key: `c1`, + value: { body: `hi` }, + principal, + }) + ).rejects.toMatchObject({ status: 409 }) + expect(append).not.toHaveBeenCalled() + }) +}) + function createManifestManager(calls: Array) { return new EntityManager({ registry: { diff --git a/packages/agents-server/test/electric-agents-routes.test.ts b/packages/agents-server/test/electric-agents-routes.test.ts index b5a8f9ef57..020de60f8e 100644 --- a/packages/agents-server/test/electric-agents-routes.test.ts +++ b/packages/agents-server/test/electric-agents-routes.test.ts @@ -1498,3 +1498,123 @@ describe(`ElectricAgentsRoutes fork endpoint`, () => { expect(manager.forkSubtree).not.toHaveBeenCalled() }) }) + +describe(`ElectricAgentsRoutes collections endpoint`, () => { + it(`routes a collection write to the manager with the authenticated principal`, async () => { + const manager = { + registry: { + getEntity: vi.fn().mockResolvedValue({ url: `/chat/test` }), + getEntityType: vi.fn(), + }, + ensurePrincipal: vi.fn().mockResolvedValue(undefined), + writeCollection: vi.fn().mockResolvedValue({ key: `c1` }), + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entities/chat/test/collections/comments`, + { operation: `insert`, key: `c1`, value: { body: `hi` } } + ) + + expect(response.status).toBe(201) + expect(await responseJson(response)).toEqual({ key: `c1` }) + expect(manager.writeCollection).toHaveBeenCalledWith( + `/chat/test`, + `comments`, + expect.objectContaining({ + operation: `insert`, + key: `c1`, + value: { body: `hi` }, + principal: expect.objectContaining({ url: expect.any(String) }), + }) + ) + }) +}) + +describe(`ElectricAgentsRoutes entity-type registration`, () => { + it(`persists externally_writable_collections on entity type registration`, async () => { + const registerEntityType = vi.fn().mockResolvedValue({ + name: `chat`, + description: `chat`, + revision: 1, + created_at: `t`, + updated_at: `t`, + externally_writable_collections: { + comments: { type: `state:comments`, contract: `comments/v1` }, + }, + }) + const manager = { + registry: { getEntityType: vi.fn() }, + registerEntityType, + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entity-types`, + { + name: `chat`, + description: `chat`, + externally_writable_collections: { + comments: { type: `state:comments`, contract: `comments/v1` }, + }, + } + ) + + expect(response.status).toBe(201) + expect(registerEntityType).toHaveBeenCalledWith( + expect.objectContaining({ + externally_writable_collections: { + comments: { type: `state:comments`, contract: `comments/v1` }, + }, + }) + ) + }) + + it(`rejects a writable "comments" collection without the canonical contract`, async () => { + const manager = { + registry: { getEntityType: vi.fn() }, + registerEntityType: vi.fn(), + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entity-types`, + { + name: `chat`, + description: `chat`, + externally_writable_collections: { + comments: { type: `state:comments` }, + }, + } + ) + + expect(response.status).toBe(400) + expect(manager.registerEntityType).not.toHaveBeenCalled() + }) + + it(`rejects the comments contract registered under another collection name`, async () => { + const manager = { + registry: { getEntityType: vi.fn() }, + registerEntityType: vi.fn(), + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entity-types`, + { + name: `chat`, + description: `chat`, + externally_writable_collections: { + feedback: { type: `state:feedback`, contract: `comments/v1` }, + }, + } + ) + + expect(response.status).toBe(400) + expect(manager.registerEntityType).not.toHaveBeenCalled() + }) +}) diff --git a/packages/agents-server/test/entity-type-registry.test.ts b/packages/agents-server/test/entity-type-registry.test.ts index 3fd1813343..1128053668 100644 --- a/packages/agents-server/test/entity-type-registry.test.ts +++ b/packages/agents-server/test/entity-type-registry.test.ts @@ -41,6 +41,22 @@ describe(`PostgresRegistry entity type registration`, () => { await client?.end() }, 120_000) + it(`persists and retrieves externally_writable_collections round-trip`, async () => { + const registry = new PostgresRegistry(db, `tenant-a`) + const externallyWritableCollections = { + comments: { type: `state:comments`, contract: `comments/v1` }, + } + await registry.createEntityType( + entityType({ + externally_writable_collections: externallyWritableCollections, + }) + ) + const result = await registry.getEntityType(`horton`) + expect(result?.externally_writable_collections).toEqual( + externallyWritableCollections + ) + }) + it(`upserts entity types against the tenant-scoped primary key`, async () => { const tenantA = new PostgresRegistry(db, `tenant-a`) const tenantB = new PostgresRegistry(db, `tenant-b`) diff --git a/packages/agents-server/test/test-backend.ts b/packages/agents-server/test/test-backend.ts index 382ac22101..49f767fa63 100644 --- a/packages/agents-server/test/test-backend.ts +++ b/packages/agents-server/test/test-backend.ts @@ -130,6 +130,7 @@ async function ensureExpectedSchema(postgresUrl: string): Promise { hasSharedStateLinks, hasEntityBridgePrincipal, hasLegacyEntitiesMetadata, + hasEntityTypeExternallyWritableCollections, ] = await Promise.all([ hasColumn(postgresUrl, `entities`, `tags`), hasColumn(postgresUrl, `entities`, `tags_index`), @@ -140,6 +141,7 @@ async function ensureExpectedSchema(postgresUrl: string): Promise { hasTable(postgresUrl, `shared_state_links`), hasColumn(postgresUrl, `entity_bridges`, `principal_url`), hasColumn(postgresUrl, `entities`, `metadata`), + hasColumn(postgresUrl, `entity_types`, `externally_writable_collections`), ]) return ( @@ -151,7 +153,8 @@ async function ensureExpectedSchema(postgresUrl: string): Promise { hasEntityEffectivePermissions && hasSharedStateLinks && hasEntityBridgePrincipal && - !hasLegacyEntitiesMetadata + !hasLegacyEntitiesMetadata && + hasEntityTypeExternallyWritableCollections ) } diff --git a/packages/agents/src/agents/horton.ts b/packages/agents/src/agents/horton.ts index 7c95f88b54..fa4c2b4119 100644 --- a/packages/agents/src/agents/horton.ts +++ b/packages/agents/src/agents/horton.ts @@ -20,6 +20,7 @@ import { buildSkillSlashCommands, createContextSkillLoader, completeWithLowCostModel, + commentsCollection, } from '@electric-ax/agents-runtime' import type { EntityRegistry, @@ -793,6 +794,9 @@ export function registerHorton( permission: `manage`, }, ], + state: { + comments: commentsCollection, + }, slashCommands: buildSkillSlashCommands(skillsRegistry), handler: assistantHandler, }) diff --git a/packages/agents/src/agents/worker.ts b/packages/agents/src/agents/worker.ts index da833feb01..ddb4a5bfc7 100644 --- a/packages/agents/src/agents/worker.ts +++ b/packages/agents/src/agents/worker.ts @@ -1,5 +1,5 @@ import { Type } from '@sinclair/typebox' -import { db } from '@electric-ax/agents-runtime' +import { db, commentsCollection } from '@electric-ax/agents-runtime' import { createBashTool, braveSearchTool, @@ -314,6 +314,9 @@ export function registerWorker( permission: `manage`, }, ], + state: { + comments: commentsCollection, + }, async handler(ctx) { const args = parseWorkerArgs(ctx.args) const readSet = new Set() diff --git a/packages/agents/test/comments-collection-registration.test.ts b/packages/agents/test/comments-collection-registration.test.ts new file mode 100644 index 0000000000..85a85fd9a8 --- /dev/null +++ b/packages/agents/test/comments-collection-registration.test.ts @@ -0,0 +1,39 @@ +import { describe, it, expect } from 'vitest' +import { createEntityRegistry } from '@electric-ax/agents-runtime' +import { registerHorton } from '../src/agents/horton' +import { registerWorker } from '../src/agents/worker' +import type { BuiltinModelCatalog } from '../src/model-catalog' + +const modelCatalog: BuiltinModelCatalog = { + defaultChoice: { + provider: `anthropic` as const, + id: `claude-sonnet-4-6`, + label: `Anthropic Claude Sonnet 4.6`, + value: `anthropic:claude-sonnet-4-6`, + reasoning: true, + input: [`text`, `image`], + }, + choices: [ + { + provider: `anthropic` as const, + id: `claude-sonnet-4-6`, + label: `Anthropic Claude Sonnet 4.6`, + value: `anthropic:claude-sonnet-4-6`, + reasoning: true, + input: [`text`, `image`], + }, + ], +} + +describe(`comments collection registration`, () => { + it(`declares comments as an externally-writable state collection on horton and worker`, () => { + const registry = createEntityRegistry() + registerHorton(registry, { workingDirectory: `/tmp`, modelCatalog }) + registerWorker(registry, { workingDirectory: `/tmp`, modelCatalog }) + + for (const name of [`horton`, `worker`]) { + const def = registry.get(name)?.definition as any + expect(def.state?.comments?.externallyWritable).toBe(true) + } + }) +})