Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changeset/custom-collections.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
"@electric-ax/agents-runtime": patch
"@electric-ax/agents-server": patch
"@electric-ax/agents-server-ui": patch
---

Custom collections on entity streams.

Server: entity types declare the custom collections they accept via `custom_collection_schemas: { <name>: <JSON Schema> }` (same shape as `inbox_schemas` / `state_schemas`). `POST /_electric/entities/:type/:id/collections/:name` validates every write against the declared schema before appending; writes to a name the entity type did not declare are rejected with 422, and reserved built-in collection names (`BUILT_IN_COLLECTION_TYPES` exported from `@electric-ax/agents-runtime`) are rejected too so the runtime stays the sole writer of agent-managed collections.

Runtime: `createEntityTimelineQuery` accepts an optional `customSource` query-builder branch. Callers shape their custom-collection rows into the `EntityTimelineCustomRow` envelope (`{ collection, key, order, value }`) and the runtime splices them into the same unionAll/orderBy pipeline as the built-in timeline collections, so consumers don't have to client-side merge a second source. `EntityDefinition.customCollectionSchemas` lets typed entity definitions declare schemas that the runtime forwards through entity-type registration.

UI: session comments are wired on top of the generic mechanism — registered as a `comments` custom collection in the StreamDB customState, projected through the runtime's `customSource` branch, and reshaped back into a `comment` row variant for renderers. Comment surface (bubbles, reply affordances, comments-only tile view) is gated on whether the entity's type opted in to a `comment` custom collection schema.
1 change: 1 addition & 0 deletions packages/agents-runtime/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export { createEntityStreamDB } from './entity-stream-db'
export { passthrough } from './entity-schema'
export { createAgentsClient } from './agents-client'
export {
compareTimelineOrders,
Expand Down
5 changes: 5 additions & 0 deletions packages/agents-runtime/src/create-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,11 @@ export function createRuntimeRouter(
...(definition.inboxSchemas && {
inbox_schemas: mapSchemas(definition.inboxSchemas),
}),
...(definition.customCollectionSchemas && {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can already define custom schemas so we don't need this

custom_collection_schemas: mapSchemas(
definition.customCollectionSchemas
),
}),
...(definition.slashCommands && {
slash_commands: definition.slashCommands,
}),
Expand Down
4 changes: 4 additions & 0 deletions packages/agents-runtime/src/entity-schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,10 @@ export const builtInCollections: EntityCollectionsDefinition = {
export const entityStateSchema: StateSchema<EntityCollectionsDefinition> =
createStateSchema(builtInCollections)

export const BUILT_IN_COLLECTION_TYPES: ReadonlySet<string> = new Set(
Object.values(builtInCollections).map((collection) => collection.type)
)

// ============================================================================
// Management Event Guard
// ============================================================================
Expand Down
98 changes: 77 additions & 21 deletions packages/agents-runtime/src/entity-timeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
import { caseWhen } from '@tanstack/db'
import type {
Collection,
Context,
InitialQueryBuilder,
QueryBuilder,
} from '@tanstack/db'
Expand Down Expand Up @@ -176,8 +177,34 @@ export interface EntityTimelineData {

export type EntityTimelineInboxMode = `processed` | `all`

/**
* Envelope for custom-collection rows projected into the timeline. Each
* custom collection is an entity-type-declared shape (see
* `custom_collection_schemas` on the entity type); the caller pre-shapes
* its rows into `{ collection, key, order, value }` and the runtime
* weaves the source into the same unionAll/orderBy pipeline as the
* built-in timeline collections so there's a single ordered live query.
*/
export interface EntityTimelineCustomRow {
collection: string
key: string
order: TimelineOrder
value: Record<string, unknown>
}

/**
* Pre-shaped query-builder branch a caller supplies to project custom
* collections into the timeline. The builder's selected row shape must
* match `EntityTimelineCustomRow`. Multiple custom collections compose by
* unioning them client-side before passing the result as `customSource`.
*/
export type EntityTimelineCustomSource = QueryBuilder<
Context & { result: EntityTimelineCustomRow; hasResult: true }
>

export interface EntityTimelineQueryOptions {
inboxMode?: EntityTimelineInboxMode
customSource?: EntityTimelineCustomSource
}

export interface EntityTimelineTextChunk {
Expand Down Expand Up @@ -259,6 +286,7 @@ export type EntityTimelineQueryRow =
wake?: undefined
signal?: undefined
manifest?: undefined
custom?: undefined
}
| {
$key: string
Expand All @@ -267,6 +295,7 @@ export type EntityTimelineQueryRow =
wake?: undefined
signal?: undefined
manifest?: undefined
custom?: undefined
}
| {
$key: string
Expand All @@ -275,6 +304,7 @@ export type EntityTimelineQueryRow =
wake: EntityTimelineWakeRow
signal?: undefined
manifest?: undefined
custom?: undefined
}
| {
$key: string
Expand All @@ -283,6 +313,7 @@ export type EntityTimelineQueryRow =
wake?: undefined
signal: EntityTimelineSignalRow
manifest?: undefined
custom?: undefined
}
| {
$key: string
Expand All @@ -291,6 +322,16 @@ export type EntityTimelineQueryRow =
wake?: undefined
signal?: undefined
manifest: ManifestEntry
custom?: undefined
}
| {
$key: string
inbox?: undefined
run?: undefined
wake?: undefined
signal?: undefined
manifest?: undefined
custom: EntityTimelineCustomRow
}

function normalizeTimelineRun(run: IncludesRun): IncludesRun {
Expand Down Expand Up @@ -1376,36 +1417,51 @@ function buildEntityTimelineQuery(
})),
}))

const baseBranches = {
inbox: inboxSource,
run: runSource,
wake: wakeSource,
signal: signalSource,
manifest: db.collections.manifests,
}
const branches = opts.customSource
? { ...baseBranches, custom: opts.customSource }
: baseBranches

return q
.unionAll({
inbox: inboxSource,
run: runSource,
wake: wakeSource,
signal: signalSource,
manifest: db.collections.manifests,
})
.orderBy(({ inbox, run, wake, signal, manifest }) =>
.unionAll(branches)
.orderBy((args) =>
coalesce(
inbox.order,
run.order,
wake.order,
signal.order,
manifest._timeline_order,
args.inbox.order,
args.run.order,
args.wake.order,
args.signal.order,
args.manifest._timeline_order,
`custom` in args ? args.custom.order : undefined,
`~`
)
)
.orderBy(({ inbox, run, wake, signal, manifest }) =>
.orderBy((args) =>
coalesce(
caseWhen(inbox.key, `inbox`),
caseWhen(run.key, `run`),
caseWhen(wake.key, `wake`),
caseWhen(signal.key, `signal`),
caseWhen(manifest.key, `manifest`),
caseWhen(args.inbox.key, `inbox`),
caseWhen(args.run.key, `run`),
caseWhen(args.wake.key, `wake`),
caseWhen(args.signal.key, `signal`),
caseWhen(args.manifest.key, `manifest`),
`custom` in args ? caseWhen(args.custom.key, `custom`) : undefined,
``
)
)
.orderBy(({ inbox, run, wake, signal, manifest }) =>
coalesce(inbox.key, run.key, wake.key, signal.key, manifest.key, ``)
.orderBy((args) =>
coalesce(
args.inbox.key,
args.run.key,
args.wake.key,
args.signal.key,
args.manifest.key,
`custom` in args ? args.custom.key : undefined,
``
)
)
}

Expand Down
3 changes: 3 additions & 0 deletions packages/agents-runtime/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ export type {
} from './types'

export {
BUILT_IN_COLLECTION_TYPES,
ENTITY_COLLECTIONS,
builtInCollections,
entityStateSchema,
Expand Down Expand Up @@ -181,6 +182,8 @@ export {
export type {
EntityTimelineData,
EntityTimelineContentItem,
EntityTimelineCustomRow,
EntityTimelineCustomSource,
EntityTimelineInboxMode,
EntityTimelineQueryOptions,
EntityTimelineQueryRow,
Expand Down
4 changes: 4 additions & 0 deletions packages/agents-runtime/src/tags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ export const entityMembershipRowSchema = z.object({
type_revision: z.number().int().nullable().optional(),
inbox_schemas: z.record(z.string(), z.unknown()).nullable().optional(),
state_schemas: z.record(z.string(), z.unknown()).nullable().optional(),
custom_collection_schemas: z
.record(z.string(), z.unknown())
.nullable()
.optional(),
created_at: z.number(),
updated_at: z.number(),
})
Expand Down
4 changes: 4 additions & 0 deletions packages/agents-runtime/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,10 @@ export interface EntityDefinition<
creationSchema?: TCreationSchema
inboxSchemas?: Record<string, StandardJSONSchemaV1>
stateSchemas?: Record<string, StandardJSONSchemaV1>
customCollectionSchemas?: Record<
string,
StandardSchemaV1 | Readonly<Record<string, unknown>>
>
permissionGrants?: ReadonlyArray<EntityTypePermissionGrantDefinition>
slashCommands?: Array<SlashCommandDefinition>

Expand Down
Loading
Loading