From 819c7d338d35cd0169b3e5b663fcc0627bb7a3c5 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Thu, 11 Jun 2026 08:51:10 -0600 Subject: [PATCH 1/3] Rename agent event sources to webhook sources --- ...ratovolt-webhook-source-rename-followup.md | 142 ++++++++++++++ packages/agents-runtime/CHANGELOG.md | 2 +- .../agents-runtime/src/context-factory.ts | 14 +- packages/agents-runtime/src/create-handler.ts | 18 +- packages/agents-runtime/src/index.ts | 44 ++--- packages/agents-runtime/src/process-wake.ts | 30 +-- .../src/runtime-server-client.ts | 60 +++--- packages/agents-runtime/src/tools.ts | 2 +- .../{event-sources.ts => webhook-sources.ts} | 178 +++++++++--------- packages/agents-runtime/src/types.ts | 18 +- .../{event-sources.ts => webhook-sources.ts} | 170 ++++++++--------- .../test/context-factory.test.ts | 12 +- .../test/helpers/context-test-helpers.ts | 6 +- .../agents-runtime/test/process-wake.test.ts | 24 +-- ...time-server-client-update-metadata.test.ts | 36 ++-- .../test/use-context-default.test.ts | 10 +- .../test/use-context-memoization.test.ts | 2 +- ...s.test.ts => webhook-source-tools.test.ts} | 96 +++++----- ...ources.test.ts => webhook-sources.test.ts} | 68 +++---- packages/agents-server/CHANGELOG.md | 2 +- packages/agents-server/src/entity-manager.ts | 14 +- packages/agents-server/src/index.ts | 12 +- packages/agents-server/src/routing/context.ts | 22 +-- .../src/routing/entities-router.ts | 56 +++--- .../src/routing/internal-router.ts | 14 +- packages/agents-server/src/server.ts | 16 +- ...ic-agents-manager-write-validation.test.ts | 12 +- ...ebhook-source-subscriptions-route.test.ts} | 114 +++++------ packages/agents/CHANGELOG.md | 2 +- packages/agents/src/agents/horton.ts | 14 +- packages/agents/src/bootstrap.ts | 4 +- .../agents/test/horton-system-prompt.test.ts | 16 +- .../test/horton-tool-composition.test.ts | 38 ++-- 33 files changed, 705 insertions(+), 563 deletions(-) create mode 100644 docs/stratovolt-webhook-source-rename-followup.md rename packages/agents-runtime/src/tools/{event-sources.ts => webhook-sources.ts} (52%) rename packages/agents-runtime/src/{event-sources.ts => webhook-sources.ts} (74%) rename packages/agents-runtime/test/{event-source-tools.test.ts => webhook-source-tools.test.ts} (63%) rename packages/agents-runtime/test/{event-sources.test.ts => webhook-sources.test.ts} (77%) rename packages/agents-server/test/{event-source-subscriptions-route.test.ts => webhook-source-subscriptions-route.test.ts} (60%) diff --git a/docs/stratovolt-webhook-source-rename-followup.md b/docs/stratovolt-webhook-source-rename-followup.md new file mode 100644 index 0000000000..c3f8eacdb1 --- /dev/null +++ b/docs/stratovolt-webhook-source-rename-followup.md @@ -0,0 +1,142 @@ +# Stratovolt follow-up: webhook source rename + +This Electric PR makes a breaking rename from the overly generic **event source** terminology to **webhook source** terminology for agent-visible webhook subscriptions. + +After this PR is merged, Stratovolt/cloud needs a companion update before it can consume the new agents runtime/server APIs. + +## Why this is needed + +Stratovolt currently mirrors agent-visible webhook ingress metadata into the cloud agents server as **event source** contracts. The Electric API/types/tools are being renamed to **webhook source** equivalents, including: + +- `EventSource*` → `WebhookSource*` +- `EventSourceCatalog` → `WebhookSourceCatalog` +- `sourceKey` → `webhookKey` +- `listEventSources()` → `listWebhookSources()` +- `getEventSource()` → `getWebhookSource()` +- `eventSources` tenant context option → `webhookSources` +- `ensureEventSourceWakeSource` → `ensureWebhookSourceWakeSource` +- `/_electric/event-sources` → `/_electric/webhook-sources` +- `/event-source-subscriptions/:id` → `/webhook-source-subscriptions/:id` +- `event-source:` manifest keys → `webhook-source:` +- `config.eventSource` → `config.webhookSource` +- `event_source_wake` → `webhook_source_wake` + +We are intentionally **not** adding backwards compatibility aliases in Electric for the old names/routes/manifests. + +## Stratovolt areas to update + +The following references were found in `/Users/kylemathews/programs/stratovolt` and should be updated in the companion PR. + +### Cloud agents server + +Primary integration points: + +- `packages/cloud-agents-server/src/event-source-registry.ts` + - Rename file/class if desired, e.g. `webhook-source-registry.ts` / `WebhookSourceRegistry`. + - Update imported agents types: + - `EventSourceBucket` → `WebhookSourceBucket` + - `EventSourceCatalog` → `WebhookSourceCatalog` + - `EventSourceContract` → `WebhookSourceContract` + - Update public methods: + - `listEventSources()` → `listWebhookSources()` + - `getEventSource()` → `getWebhookSource()` + - Update contract field projection: + - `sourceKey` → `webhookKey` + - Update logs from “event source shape” to “webhook source shape”. + +- `packages/cloud-agents-server/src/app.ts` + - Tenant context currently passes: + - `eventSources: options.eventSourceRegistry.forService(...)` + - `ensureEventSourceWakeSource: ...` + - Update to: + - `webhookSources: options.webhookSourceRegistry.forService(...)` + - `ensureWebhookSourceWakeSource: ...` + +- `packages/cloud-agents-server/src/main.ts` + - Rename registry import/instantiation if the registry file/class is renamed. + - Update config option names if changed. + +- `packages/cloud-agents-server/src/config.ts` + - Consider renaming: + - `eventSourceShapeUrl` → `webhookSourceShapeUrl` + - env var `AGENTS_EVENT_SOURCE_SHAPE_URL` → `AGENTS_WEBHOOK_SOURCE_SHAPE_URL` + +- `infra/cloud-agents-server/server.ts` + - Update environment variable wiring: + - `AGENTS_EVENT_SOURCE_SHAPE_URL` → `AGENTS_WEBHOOK_SOURCE_SHAPE_URL` + +- `scripts/generate-dev-env.ts` + - Update generated dev env var: + - `AGENTS_EVENT_SOURCE_SHAPE_URL` → `AGENTS_WEBHOOK_SOURCE_SHAPE_URL` + +### Admin API / contract + +The Admin API exposes the webhook ingress catalog shape consumed by the cloud agents server. + +- `packages/admin-api-contract/src/schemas/services.ts` + - Rename schema/types if we want public contract naming to match: + - `WebhookIngressEventSourceFilterConditionSchema` + - `WebhookIngressEventSourceFilterSchema` + - `WebhookIngressEventSourceFilter` + - Update contract field names from `sourceKey` to `webhookKey` if the API surface should match Electric. + +- `packages/admin-api-contract/src/schemas/index.ts` + - Update exports for renamed schemas/types. + +- `packages/admin-api/src/lib/webhook-ingresses.ts` + - Currently projects database `source_key` to `sourceKey`. + - Update output to `webhookKey` if the shape/API is renamed. + +- `packages/admin-api/src/routes/shapes/admin.ts` + - Shape alias currently uses `agent-event-sources`. + - Consider renaming to `agent-webhook-sources` and updating the description. + - If renaming the shape alias, update all consumers/tests/env vars accordingly. + +- `migrations/049-add-agent-event-source-fields.sql` + - Existing migration name/comments mention event source terminology. + - Usually do **not** rewrite applied migrations, but future migrations/docs/comments should use webhook source terminology. + +### Dashboard + +User-facing dashboard copy and form state still use “event source” language. + +- `packages/dashboard/src/routes/_dashboard/projects/$projectId/envs/$envId/svc/streams/$serviceId/webhooks/$endpointKey/edit.tsx` + - Rename form field/state if API changes: + - `sourceKey` → `webhookKey` + - Update copy: “event-source tools” → “webhook-source tools”. + +- `packages/dashboard/src/components/webhooks/flow/BucketRouteSidePanel.tsx` + - Update copy: “What agents see when they discover this bucket as an event source.” + +- `packages/dashboard/src/components/webhooks/flow/AgentSourcePanel.tsx` + - Update copy: “agent event-source discovery”. + +- `packages/dashboard/src/components/webhooks/bucketTemplateFormState.ts` + - Update comment reference from `electric/packages/agents-runtime/src/event-sources.ts` to `webhook-sources.ts`. + +### Tests + +Update tests after renaming the cloud/admin/dashboard surfaces: + +- `packages/cloud-agents-server/test/event-source-registry.test.ts` +- `packages/cloud-agents-server/test/app.test.ts` +- `packages/admin-api/test/routes/public/shapes.test.ts` +- `packages/admin-api/test/orpc/procedures/services/streams/webhook-ingresses.test.ts` + +## Suggested approach + +1. Wait until the Electric webhook-source rename PR is merged. +2. Update Stratovolt dependencies / workspace link to the merged Electric packages. +3. Rename the cloud registry and tenant context wiring to the new webhook-source API. +4. Decide whether the Admin API shape alias should also break from `agent-event-sources` to `agent-webhook-sources`. + - If yes, update env vars and tests together. + - If no, keep the shape alias as a cloud-internal compatibility detail but project rows into the new `WebhookSourceContract` shape for agents-server. +5. Update dashboard copy and form/API field names as appropriate. +6. Run targeted tests for: + - cloud agents server registry/app + - admin API webhook ingress shape + - dashboard webhook ingress editing + +## Notes + +Electric intentionally does not provide old route/type/manifest aliases for this rename. If Stratovolt has existing persisted dynamic subscriptions using `event-source:` manifest keys, they should be considered disposable and recreated after rollout. diff --git a/packages/agents-runtime/CHANGELOG.md b/packages/agents-runtime/CHANGELOG.md index 58de0dcff8..8038aa8cce 100644 --- a/packages/agents-runtime/CHANGELOG.md +++ b/packages/agents-runtime/CHANGELOG.md @@ -179,7 +179,7 @@ output? }` summed across the run's steps at section-build time, and ### Patch Changes -- 833a1cb: Add agent event source contracts and dynamic event source subscription tools. Agents can list active, agent-visible webhook-backed event sources, subscribe entities to resolved bucket streams with explicit lifetimes, and persist those subscriptions as manifest-backed wake registrations. Bucket params are validated against the advertised `paramsSchema` before a subscription is accepted. Horton now receives these tools through the built-in agents runtime by default. Runtime-managed event source wakes now hydrate matching webhook rows into the agent trigger message so tool-created subscriptions include the event payload that caused the wake. +- 833a1cb: Add agent webhook source contracts and dynamic webhook source subscription tools. Agents can list active, agent-visible webhook-backed webhook sources, subscribe entities to resolved bucket streams with explicit lifetimes, and persist those subscriptions as manifest-backed wake registrations. Bucket params are validated against the advertised `paramsSchema` before a subscription is accepted. Horton now receives these tools through the built-in agents runtime by default. Runtime-managed webhook source wakes now hydrate matching webhook rows into the agent trigger message so tool-created subscriptions include the event payload that caused the wake. - 833a1cb: Add `webhook(endpointKey, { bucket })` observation sources for webhook ingress streams, including deterministic stream path generation, event schema, default wake registration, and observe-time stream creation before preload. ## 0.3.3 diff --git a/packages/agents-runtime/src/context-factory.ts b/packages/agents-runtime/src/context-factory.ts index fb387b9ab8..52c4af7947 100644 --- a/packages/agents-runtime/src/context-factory.ts +++ b/packages/agents-runtime/src/context-factory.ts @@ -16,7 +16,7 @@ import { createContextTools } from './tools/context-tools' import { CACHE_TIERS } from './types' import { composeToolsWithProviders } from './tool-providers' import { validateSlashCommandDefinitions } from './composer-input' -import type { HydratedEventSourceWake } from './event-sources' +import type { HydratedWebhookSourceWake } from './webhook-sources' import type { ChangeEvent } from '@durable-streams/state' import type { Sandbox } from './sandbox/types' import type { @@ -96,7 +96,7 @@ export interface HandlerContextConfig { payload?: unknown }) => void | Promise ) => void - hydratedEventSourceWake?: HydratedEventSourceWake | null + hydratedWebhookSourceWake?: HydratedWebhookSourceWake | null doObserve: ( source: ObservationSource, wake?: Wake @@ -193,7 +193,7 @@ function getTriggerMessageText( wakeEvent: WakeEvent, events: Array, wakeOffset: string, - hydratedEventSourceWake?: HydratedEventSourceWake | null + hydratedWebhookSourceWake?: HydratedWebhookSourceWake | null ): string { if (wakeEvent.type === `inbox`) { let latestPayload: unknown = wakeEvent.payload @@ -227,8 +227,8 @@ function getTriggerMessageText( } if (wakeEvent.type === `wake` && typeof wakeEvent.source === `string`) { - if (hydratedEventSourceWake) { - return asMessageText(hydratedEventSourceWake) + if (hydratedWebhookSourceWake) { + return asMessageText(hydratedWebhookSourceWake) } const cronPayload = getCronScheduleTriggerPayload(db, wakeEvent.source) @@ -731,7 +731,7 @@ export function createHandlerContext( config.wakeEvent, config.events, config.wakeOffset, - config.hydratedEventSourceWake + config.hydratedWebhookSourceWake ) const effectiveInput = input ?? messageText @@ -770,7 +770,7 @@ export function createHandlerContext( const latestMessageRole = messages.at(-1)?.role const runInput = input !== undefined || - config.hydratedEventSourceWake != null || + config.hydratedWebhookSourceWake != null || latestMessageRole !== `user` ? effectiveInput : undefined diff --git a/packages/agents-runtime/src/create-handler.ts b/packages/agents-runtime/src/create-handler.ts index c4008862d4..6824f4e8ac 100644 --- a/packages/agents-runtime/src/create-handler.ts +++ b/packages/agents-runtime/src/create-handler.ts @@ -26,10 +26,10 @@ import type { import type { ChangeEvent } from '@durable-streams/state' import type { DispatchPolicy } from './runtime-server-client' import type { - EventSourceContract, - EventSourceSubscription, - EventSourceSubscriptionInput, -} from './event-sources' + WebhookSourceContract, + WebhookSourceSubscription, + WebhookSourceSubscriptionInput, +} from './webhook-sources' export interface RuntimeRouterConfig { /** Base URL of the durable streams server (e.g. http://localhost:4200) */ @@ -91,11 +91,11 @@ export interface RuntimeRouterConfig { messageType?: string }) => Promise<{ txid: string }> deleteSchedule: (opts: { id: string }) => Promise<{ txid: string }> - listEventSources: () => Promise> - subscribeToEventSource: ( - opts: EventSourceSubscriptionInput - ) => Promise<{ txid: string; subscription: EventSourceSubscription }> - unsubscribeFromEventSource: (opts: { + listWebhookSources: () => Promise> + subscribeToWebhookSource: ( + opts: WebhookSourceSubscriptionInput + ) => Promise<{ txid: string; subscription: WebhookSourceSubscription }> + unsubscribeFromWebhookSource: (opts: { id: string }) => Promise<{ txid: string }> }) => Array | Promise> diff --git a/packages/agents-runtime/src/index.ts b/packages/agents-runtime/src/index.ts index 5e65e45711..946b12fb18 100644 --- a/packages/agents-runtime/src/index.ts +++ b/packages/agents-runtime/src/index.ts @@ -259,30 +259,30 @@ export type { SendEntityMessageOptions, } from './runtime-server-client' export { - buildEventSourceManifestEntry, - buildHydratedEventSourceWake, - buildEventSourceSubscriptionId, - defaultEventSourceSubscriptionLifetime, - eventSourceWakeInfoFromManifests, - eventSourceSubscriptionManifestKey, - renderEventSourceBucketPath, - resolveEventSourceSubscription, -} from './event-sources' + buildWebhookSourceManifestEntry, + buildHydratedWebhookSourceWake, + buildWebhookSourceSubscriptionId, + defaultWebhookSourceSubscriptionLifetime, + webhookSourceWakeInfoFromManifests, + webhookSourceSubscriptionManifestKey, + renderWebhookSourceBucketPath, + resolveWebhookSourceSubscription, +} from './webhook-sources' export type { - EventSourceBucket, - EventSourceContract, - EventSourceFilter, - EventSourceFilterCondition, - EventSourceStatus, - EventSourceSubscription, - EventSourceSubscriptionInput, - EventSourceType, - EventSourceWakeChange, - EventSourceWakeInfo, - HydratedEventSourceWake, - ResolvedEventSourceSubscription, + WebhookSourceBucket, + WebhookSourceContract, + WebhookSourceFilter, + WebhookSourceFilterCondition, + WebhookSourceStatus, + WebhookSourceSubscription, + WebhookSourceSubscriptionInput, + WebhookSourceType, + WebhookSourceWakeChange, + WebhookSourceWakeInfo, + HydratedWebhookSourceWake, + ResolvedWebhookSourceSubscription, SubscriptionLifetime, -} from './event-sources' +} from './webhook-sources' export { createAgentsClient } from './agents-client' export type { AgentsClient, AgentsClientConfig } from './agents-client' diff --git a/packages/agents-runtime/src/process-wake.ts b/packages/agents-runtime/src/process-wake.ts index c419a3114a..9cac59ecc1 100644 --- a/packages/agents-runtime/src/process-wake.ts +++ b/packages/agents-runtime/src/process-wake.ts @@ -18,11 +18,11 @@ import { appendPathToUrl } from './url' import { manifestChildKey } from './manifest-helpers' import { ModelProviderError } from './model-provider-error' import { - buildHydratedEventSourceWake, - eventSourceWakeInfoFromManifests, -} from './event-sources' + buildHydratedWebhookSourceWake, + webhookSourceWakeInfoFromManifests, +} from './webhook-sources' import { webhookObservationCollections } from './observation-sources' -import type { HydratedEventSourceWake } from './event-sources' +import type { HydratedWebhookSourceWake } from './webhook-sources' import { SandboxError } from './sandbox/types' import type { Sandbox } from './sandbox/types' import type { @@ -1641,9 +1641,9 @@ export async function processWake( }) setupCtx.restorePersistedSharedStateHandles() - const hydrateCurrentEventSourceWake = - async (): Promise => { - const info = eventSourceWakeInfoFromManifests({ + const hydrateCurrentWebhookSourceWake = + async (): Promise => { + const info = webhookSourceWakeInfoFromManifests({ wakeEvent: currentWakeEvent, manifests: db.collections.manifests.toArray, }) @@ -1661,10 +1661,10 @@ export async function processWake( ) const rows = (sourceDb.collections.events?.toArray ?? []) as unknown as Array - return buildHydratedEventSourceWake(info, rows) + return buildHydratedWebhookSourceWake(info, rows) } catch (error) { log.warn( - `failed to hydrate event source wake source=${info.sourceUrl}: ${error instanceof Error ? error.message : String(error)}` + `failed to hydrate webhook source wake source=${info.sourceUrl}: ${error instanceof Error ? error.message : String(error)}` ) return null } @@ -2077,14 +2077,14 @@ export async function processWake( entityUrl, ...opts, }), - listEventSources: () => serverClient.listEventSources(), - subscribeToEventSource: (opts) => - serverClient.subscribeToEventSource({ + listWebhookSources: () => serverClient.listWebhookSources(), + subscribeToWebhookSource: (opts) => + serverClient.subscribeToWebhookSource({ entityUrl, ...opts, }), - unsubscribeFromEventSource: (opts) => - serverClient.unsubscribeFromEventSource({ + unsubscribeFromWebhookSource: (opts) => + serverClient.unsubscribeFromWebhookSource({ entityUrl, ...opts, }), @@ -2129,7 +2129,7 @@ export async function processWake( registerSignalHandler: (handler) => { activeSignalHandler = handler }, - hydratedEventSourceWake: await hydrateCurrentEventSourceWake(), + hydratedWebhookSourceWake: await hydrateCurrentWebhookSourceWake(), doObserve, doSpawn, doFork, diff --git a/packages/agents-runtime/src/runtime-server-client.ts b/packages/agents-runtime/src/runtime-server-client.ts index 871840a2fd..4fb8dbb828 100644 --- a/packages/agents-runtime/src/runtime-server-client.ts +++ b/packages/agents-runtime/src/runtime-server-client.ts @@ -4,7 +4,7 @@ import type { } from './observation-sources' import type { EntityTags, TagOperation } from './tags' import { appendPathToUrl } from './url' -import { buildEventSourceSubscriptionId } from './event-sources' +import { buildWebhookSourceSubscriptionId } from './webhook-sources' import type { AttachmentCreateInput, ClaimTokenHeader, @@ -13,10 +13,10 @@ import type { } from './types' import type { EntitySignal } from './entity-schema' import type { - EventSourceContract, - EventSourceSubscription, - EventSourceSubscriptionInput, -} from './event-sources' + WebhookSourceContract, + WebhookSourceSubscription, + WebhookSourceSubscriptionInput, +} from './webhook-sources' export type { EntitySignal } from './entity-schema' const ELECTRIC_PRINCIPAL_HEADER = `electric-principal` @@ -183,11 +183,11 @@ export interface RuntimeServerClient { streamUrl: string sourceRef: string }> - listEventSources: () => Promise> - subscribeToEventSource: ( - options: EventSourceSubscriptionInput & { entityUrl: string } - ) => Promise<{ txid: string; subscription: EventSourceSubscription }> - unsubscribeFromEventSource: (options: { + listWebhookSources: () => Promise> + subscribeToWebhookSource: ( + options: WebhookSourceSubscriptionInput & { entityUrl: string } + ) => Promise<{ txid: string; subscription: WebhookSourceSubscription }> + unsubscribeFromWebhookSource: (options: { entityUrl: string id: string }) => Promise<{ txid: string }> @@ -711,39 +711,39 @@ export function createRuntimeServerClient( return (await response.json()) as { streamUrl: string; sourceRef: string } } - const listEventSources = async (): Promise> => { - const response = await request(`/_electric/event-sources`, { + const listWebhookSources = async (): Promise> => { + const response = await request(`/_electric/webhook-sources`, { method: `GET`, }) if (!response.ok) { throw new Error( - `listEventSources failed (${response.status}): ${await readErrorText(response)}` + `listWebhookSources failed (${response.status}): ${await readErrorText(response)}` ) } const data = (await response.json()) as { - eventSources?: Array + webhookSources?: Array } - return data.eventSources ?? [] + return data.webhookSources ?? [] } - const subscribeToEventSource = async ( - options: EventSourceSubscriptionInput & { entityUrl: string } - ): Promise<{ txid: string; subscription: EventSourceSubscription }> => { + const subscribeToWebhookSource = async ( + options: WebhookSourceSubscriptionInput & { entityUrl: string } + ): Promise<{ txid: string; subscription: WebhookSourceSubscription }> => { const id = options.id ?? - buildEventSourceSubscriptionId({ - sourceKey: options.sourceKey, + buildWebhookSourceSubscriptionId({ + webhookKey: options.webhookKey, bucketKey: options.bucketKey, params: options.params, filterKey: options.filterKey, }) const response = await request( - `${entityRpcPath(options.entityUrl)}/event-source-subscriptions/${encodeURIComponent(id)}`, + `${entityRpcPath(options.entityUrl)}/webhook-source-subscriptions/${encodeURIComponent(id)}`, { method: `PUT`, headers: { 'content-type': `application/json` }, body: JSON.stringify({ - sourceKey: options.sourceKey, + webhookKey: options.webhookKey, bucketKey: options.bucketKey, params: options.params, filterKey: options.filterKey, @@ -754,26 +754,26 @@ export function createRuntimeServerClient( ) if (!response.ok) { throw new Error( - `subscribeToEventSource failed (${response.status}): ${await readErrorText(response)}` + `subscribeToWebhookSource failed (${response.status}): ${await readErrorText(response)}` ) } return (await response.json()) as { txid: string - subscription: EventSourceSubscription + subscription: WebhookSourceSubscription } } - const unsubscribeFromEventSource = async (options: { + const unsubscribeFromWebhookSource = async (options: { entityUrl: string id: string }): Promise<{ txid: string }> => { const response = await request( - `${entityRpcPath(options.entityUrl)}/event-source-subscriptions/${encodeURIComponent(options.id)}`, + `${entityRpcPath(options.entityUrl)}/webhook-source-subscriptions/${encodeURIComponent(options.id)}`, { method: `DELETE` } ) if (!response.ok) { throw new Error( - `unsubscribeFromEventSource failed (${response.status}): ${await readErrorText(response)}` + `unsubscribeFromWebhookSource failed (${response.status}): ${await readErrorText(response)}` ) } return (await response.json()) as { txid: string } @@ -932,9 +932,9 @@ export function createRuntimeServerClient( ensureCronStream, ensureEntitiesMembershipStream, registerPgSyncSource, - listEventSources, - subscribeToEventSource, - unsubscribeFromEventSource, + listWebhookSources, + subscribeToWebhookSource, + unsubscribeFromWebhookSource, upsertCronSchedule, upsertFutureSendSchedule, deleteSchedule, diff --git a/packages/agents-runtime/src/tools.ts b/packages/agents-runtime/src/tools.ts index aa0f8741f8..ae3801ab21 100644 --- a/packages/agents-runtime/src/tools.ts +++ b/packages/agents-runtime/src/tools.ts @@ -5,5 +5,5 @@ export { createEditTool } from './tools/edit' export { braveSearchTool } from './tools/brave-search' export { createFetchUrlTool } from './tools/fetch-url' export { createScheduleTools } from './tools/schedules' -export { createEventSourceTools } from './tools/event-sources' +export { createWebhookSourceTools } from './tools/webhook-sources' export { createSendTool } from './tools/send' diff --git a/packages/agents-runtime/src/tools/event-sources.ts b/packages/agents-runtime/src/tools/webhook-sources.ts similarity index 52% rename from packages/agents-runtime/src/tools/event-sources.ts rename to packages/agents-runtime/src/tools/webhook-sources.ts index 1a626e3398..9b345e7d7c 100644 --- a/packages/agents-runtime/src/tools/event-sources.ts +++ b/packages/agents-runtime/src/tools/webhook-sources.ts @@ -1,18 +1,18 @@ import { Type } from '@sinclair/typebox' import { runtimeLog } from '../log' import { - buildEventSourceSubscriptionId, - defaultEventSourceSubscriptionLifetime, - eventSourceSubscriptionManifestKey, -} from '../event-sources' + buildWebhookSourceSubscriptionId, + defaultWebhookSourceSubscriptionLifetime, + webhookSourceSubscriptionManifestKey, +} from '../webhook-sources' import type { AgentTool } from '@mariozechner/pi-agent-core' import type { EntityStreamDBWithActions } from '../types' import type { - EventSourceContract, - EventSourceSubscription, - EventSourceSubscriptionInput, + WebhookSourceContract, + WebhookSourceSubscription, + WebhookSourceSubscriptionInput, SubscriptionLifetime, -} from '../event-sources' +} from '../webhook-sources' type ToolResult = { content: Array<{ type: `text`; text: string }> @@ -40,7 +40,7 @@ function formatForLog(value: unknown): string { } } -function withEventSourceToolLogging( +function withWebhookSourceToolLogging( entityUrl: string, toolName: string, execute: ( @@ -76,51 +76,51 @@ function isRecord(value: unknown): value is Record { return typeof value === `object` && value !== null && !Array.isArray(value) } -function isEventSourceManifest(value: unknown): boolean { +function isWebhookSourceManifest(value: unknown): boolean { if (!isRecord(value)) return false if (value.kind !== `source` || value.sourceType !== `webhook`) return false const config = value.config return ( isRecord(config) && - isRecord(config.eventSource) && - typeof config.eventSource.id === `string` + isRecord(config.webhookSource) && + typeof config.webhookSource.id === `string` ) } -function getEventSourceSubscriptions( +function getWebhookSourceSubscriptions( entityUrl: string, db: EntityStreamDBWithActions -): Array { - const entries: Array = [] +): Array { + const entries: Array = [] for (const entry of db.collections.manifests.toArray) { - if (!isEventSourceManifest(entry)) continue + if (!isWebhookSourceManifest(entry)) continue const manifest = entry as Record const config = manifest.config as Record - const eventSource = config.eventSource as Record - const id = String(eventSource.id) - const lifetime = isRecord(eventSource.lifetime) - ? (eventSource.lifetime as SubscriptionLifetime) - : defaultEventSourceSubscriptionLifetime() + const webhookSource = config.webhookSource as Record + const id = String(webhookSource.id) + const lifetime = isRecord(webhookSource.lifetime) + ? (webhookSource.lifetime as SubscriptionLifetime) + : defaultWebhookSourceSubscriptionLifetime() entries.push({ id, entityUrl, - sourceKey: - typeof eventSource.sourceKey === `string` - ? eventSource.sourceKey + webhookKey: + typeof webhookSource.webhookKey === `string` + ? webhookSource.webhookKey : String(config.endpointKey ?? ``), - ...(typeof eventSource.bucketKey === `string` - ? { bucketKey: eventSource.bucketKey } + ...(typeof webhookSource.bucketKey === `string` + ? { bucketKey: webhookSource.bucketKey } : {}), - params: isRecord(eventSource.params) ? eventSource.params : {}, - ...(typeof eventSource.filterKey === `string` - ? { filterKey: eventSource.filterKey } + params: isRecord(webhookSource.params) ? webhookSource.params : {}, + ...(typeof webhookSource.filterKey === `string` + ? { filterKey: webhookSource.filterKey } : {}), - filterApplied: eventSource.filterApplied === true, + filterApplied: webhookSource.filterApplied === true, contractRevision: - typeof eventSource.contractRevision === `number` - ? eventSource.contractRevision + typeof webhookSource.contractRevision === `number` + ? webhookSource.contractRevision : 0, sourceUrl: typeof config.streamUrl === `string` @@ -130,20 +130,20 @@ function getEventSourceSubscriptions( manifestKey: typeof manifest.key === `string` ? manifest.key - : eventSourceSubscriptionManifestKey(id), + : webhookSourceSubscriptionManifestKey(id), lifetime, - ...(typeof eventSource.reason === `string` - ? { reason: eventSource.reason } + ...(typeof webhookSource.reason === `string` + ? { reason: webhookSource.reason } : {}), createdBy: - eventSource.createdBy === `handler` || - eventSource.createdBy === `user` || - eventSource.createdBy === `system` - ? eventSource.createdBy + webhookSource.createdBy === `handler` || + webhookSource.createdBy === `user` || + webhookSource.createdBy === `system` + ? webhookSource.createdBy : `tool`, createdAt: - typeof eventSource.createdAt === `string` - ? eventSource.createdAt + typeof webhookSource.createdAt === `string` + ? webhookSource.createdAt : new Date(0).toISOString(), }) } @@ -151,12 +151,12 @@ function getEventSourceSubscriptions( return entries.sort((left, right) => left.id.localeCompare(right.id)) } -function getEventSourceSubscription( +function getWebhookSourceSubscription( entityUrl: string, db: EntityStreamDBWithActions, id: string -): EventSourceSubscription | undefined { - return getEventSourceSubscriptions(entityUrl, db).find( +): WebhookSourceSubscription | undefined { + return getWebhookSourceSubscriptions(entityUrl, db).find( (entry) => entry.id === id ) } @@ -172,65 +172,65 @@ const lifetimeSchema = Type.Union([ Type.Object({ kind: Type.Literal(`manual`) }), ]) -export function createEventSourceTools(opts: { +export function createWebhookSourceTools(opts: { entityUrl: string db: EntityStreamDBWithActions - listEventSources: () => Promise> - subscribeToEventSource: ( - opts: EventSourceSubscriptionInput - ) => Promise<{ txid: string; subscription: EventSourceSubscription }> - unsubscribeFromEventSource: (opts: { + listWebhookSources: () => Promise> + subscribeToWebhookSource: ( + opts: WebhookSourceSubscriptionInput + ) => Promise<{ txid: string; subscription: WebhookSourceSubscription }> + unsubscribeFromWebhookSource: (opts: { id: string }) => Promise<{ txid: string }> }): Array { const { db, entityUrl, - listEventSources, - subscribeToEventSource, - unsubscribeFromEventSource, + listWebhookSources, + subscribeToWebhookSource, + unsubscribeFromWebhookSource, } = opts const listSourcesTool: AgentTool = { - name: `list_event_sources`, - label: `List Event Sources`, - description: `List external event feeds you can subscribe to, such as webhook integrations from GitHub, Stripe, email, CI, or other services. Sources may expose named buckets and optional filters; use paramsSchema to choose sourceKey, bucketKey, params, and filterKey for subscribe_event_source.`, + name: `list_webhook_sources`, + label: `List Webhook Sources`, + description: `List external webhook feeds you can subscribe to, such as GitHub, Stripe, email, CI, or other webhook integrations. Webhook sources may expose named buckets and optional filters; use paramsSchema to choose webhookKey, bucketKey, params, and filterKey for subscribe_webhook_source.`, parameters: Type.Object({}), - execute: withEventSourceToolLogging( + execute: withWebhookSourceToolLogging( entityUrl, - `list_event_sources`, - async () => asToolResult(await listEventSources()) + `list_webhook_sources`, + async () => asToolResult(await listWebhookSources()) ), } const listSubscriptionsTool: AgentTool = { - name: `list_event_source_subscriptions`, - label: `List Event Subscriptions`, - description: `List your active event source subscriptions: external feeds and buckets that are currently configured to wake you when matching events arrive.`, + name: `list_webhook_source_subscriptions`, + label: `List Webhook Source Subscriptions`, + description: `List your active webhook source subscriptions: external feeds and buckets that are currently configured to wake you when matching events arrive.`, parameters: Type.Object({}), - execute: withEventSourceToolLogging( + execute: withWebhookSourceToolLogging( entityUrl, - `list_event_source_subscriptions`, - async () => asToolResult(getEventSourceSubscriptions(entityUrl, db)) + `list_webhook_source_subscriptions`, + async () => asToolResult(getWebhookSourceSubscriptions(entityUrl, db)) ), } const subscribeTool: AgentTool = { - name: `subscribe_event_source`, - label: `Subscribe Event Source`, - description: `Subscribe to a discoverable external event feed or one of its buckets so matching future events wake you with the matching event data in your next message. Use filterKey only when list_event_sources advertises a named filter you want; filters are advisory until server-side source filters are enabled.`, + name: `subscribe_webhook_source`, + label: `Subscribe Webhook Source`, + description: `Subscribe to a discoverable external webhook feed or one of its buckets so matching future webhooks wake you with the matching webhook data in your next message. Use filterKey only when list_webhook_sources advertises a named filter you want; filters are advisory until server-side webhook filters are enabled.`, parameters: Type.Object({ id: Type.Optional( Type.String({ - description: `Optional stable subscription id. Defaults to a deterministic id from sourceKey, bucketKey, params, and filterKey.`, + description: `Optional stable subscription id. Defaults to a deterministic id from webhookKey, bucketKey, params, and filterKey.`, }) ), - sourceKey: Type.String({ - description: `Event source key from list_event_sources`, + webhookKey: Type.String({ + description: `Webhook source key from list_webhook_sources`, }), bucketKey: Type.Optional( Type.String({ - description: `Bucket key from list_event_sources. Omit to subscribe to the source root stream.`, + description: `Bucket key from list_webhook_sources. Omit to subscribe to the source root stream.`, }) ), params: Type.Optional( @@ -250,56 +250,56 @@ export function createEventSourceTools(opts: { }) ), }), - execute: withEventSourceToolLogging( + execute: withWebhookSourceToolLogging( entityUrl, - `subscribe_event_source`, + `subscribe_webhook_source`, async (_toolCallId, params) => { - const parsed = params as EventSourceSubscriptionInput + const parsed = params as WebhookSourceSubscriptionInput const id = parsed.id ?? - buildEventSourceSubscriptionId({ - sourceKey: parsed.sourceKey, + buildWebhookSourceSubscriptionId({ + webhookKey: parsed.webhookKey, bucketKey: parsed.bucketKey, params: parsed.params, filterKey: parsed.filterKey, }) - const { txid, subscription } = await subscribeToEventSource({ + const { txid, subscription } = await subscribeToWebhookSource({ ...parsed, id, - lifetime: parsed.lifetime ?? defaultEventSourceSubscriptionLifetime(), + lifetime: parsed.lifetime ?? defaultWebhookSourceSubscriptionLifetime(), }) await db.utils.awaitTxId(txid, 10_000) return asToolResult( - getEventSourceSubscription(entityUrl, db, id) ?? subscription + getWebhookSourceSubscription(entityUrl, db, id) ?? subscription ) } ), } const unsubscribeTool: AgentTool = { - name: `unsubscribe_event_source`, - label: `Unsubscribe Event Source`, - description: `Stop being woken by an event source subscription.`, + name: `unsubscribe_webhook_source`, + label: `Unsubscribe Webhook Source`, + description: `Stop being woken by a webhook source subscription.`, parameters: Type.Object({ id: Type.String({ description: `Subscription id` }), }), - execute: withEventSourceToolLogging( + execute: withWebhookSourceToolLogging( entityUrl, - `unsubscribe_event_source`, + `unsubscribe_webhook_source`, async (_toolCallId, params) => { const { id } = params as { id: string } - const existing = getEventSourceSubscription(entityUrl, db, id) + const existing = getWebhookSourceSubscription(entityUrl, db, id) if (!existing) { return asToolResult( - `No event source subscription found for id "${id}"` + `No webhook source subscription found for id "${id}"` ) } - const { txid } = await unsubscribeFromEventSource({ id }) + const { txid } = await unsubscribeFromWebhookSource({ id }) await db.utils.awaitTxId(txid, 10_000) return asToolResult({ deleted: true, id, - key: eventSourceSubscriptionManifestKey(id), + key: webhookSourceSubscriptionManifestKey(id), }) } ), diff --git a/packages/agents-runtime/src/types.ts b/packages/agents-runtime/src/types.ts index 7f07f57fad..82ceb2be0c 100644 --- a/packages/agents-runtime/src/types.ts +++ b/packages/agents-runtime/src/types.ts @@ -53,10 +53,10 @@ import type { SlashCommandHelpers, } from './composer-input' import type { - EventSourceContract, - EventSourceSubscription, - EventSourceSubscriptionInput, -} from './event-sources' + WebhookSourceContract, + WebhookSourceSubscription, + WebhookSourceSubscriptionInput, +} from './webhook-sources' import type { EntityTags, TagOperation } from './tags' export type EntityStreamDB = RuntimeEntityStreamDB @@ -734,11 +734,11 @@ export interface ProcessWakeConfig { messageType?: string }) => Promise<{ txid: string }> deleteSchedule: (opts: { id: string }) => Promise<{ txid: string }> - listEventSources: () => Promise> - subscribeToEventSource: ( - opts: EventSourceSubscriptionInput - ) => Promise<{ txid: string; subscription: EventSourceSubscription }> - unsubscribeFromEventSource: (opts: { + listWebhookSources: () => Promise> + subscribeToWebhookSource: ( + opts: WebhookSourceSubscriptionInput + ) => Promise<{ txid: string; subscription: WebhookSourceSubscription }> + unsubscribeFromWebhookSource: (opts: { id: string }) => Promise<{ txid: string }> }) => Array | Promise> diff --git a/packages/agents-runtime/src/event-sources.ts b/packages/agents-runtime/src/webhook-sources.ts similarity index 74% rename from packages/agents-runtime/src/event-sources.ts rename to packages/agents-runtime/src/webhook-sources.ts index ed79e07301..b1908f98a5 100644 --- a/packages/agents-runtime/src/event-sources.ts +++ b/packages/agents-runtime/src/webhook-sources.ts @@ -3,15 +3,15 @@ import { getWebhookStreamPath } from './observation-sources' import type { ErrorObject, ValidateFunction } from 'ajv' import type { WebhookEventRow } from './observation-sources' -export type EventSourceType = `webhook` -export type EventSourceStatus = `active` | `disabled` | `revoked` +export type WebhookSourceType = `webhook` +export type WebhookSourceStatus = `active` | `disabled` | `revoked` export type SubscriptionLifetime = | { kind: `until_entity_stopped` } | { kind: `expires_at`; at: string } | { kind: `manual` } -export type EventSourceFilterCondition = { +export type WebhookSourceFilterCondition = { collections?: Array ops?: Array<`insert` | `update` | `delete`> where?: { @@ -20,40 +20,40 @@ export type EventSourceFilterCondition = { } } -export type EventSourceFilter = { +export type WebhookSourceFilter = { key: string label: string description?: string - condition?: EventSourceFilterCondition + condition?: WebhookSourceFilterCondition } -export type EventSourceBucket = { +export type WebhookSourceBucket = { key: string label: string description?: string pathTemplate: string paramsSchema: Record eventTypes?: Array - filters?: Array + filters?: Array } -export type EventSourceContract = { +export type WebhookSourceContract = { serviceId?: string - sourceKey: string - sourceType: EventSourceType + webhookKey: string + sourceType: WebhookSourceType endpointKey: string - status: EventSourceStatus + status: WebhookSourceStatus label: string description?: string agentVisible: boolean - buckets: Array + buckets: Array updatedAt?: string revision: number } -export type EventSourceSubscriptionInput = { +export type WebhookSourceSubscriptionInput = { id?: string - sourceKey: string + webhookKey: string bucketKey?: string params?: Record filterKey?: string @@ -61,17 +61,17 @@ export type EventSourceSubscriptionInput = { reason?: string } -export type EventSourceSubscription = { +export type WebhookSourceSubscription = { id: string entityUrl: string - sourceKey: string + webhookKey: string bucketKey?: string params: Record filterKey?: string filterApplied: boolean contractRevision: number sourceUrl: string - sourceType: EventSourceType + sourceType: WebhookSourceType manifestKey: string lifetime: SubscriptionLifetime reason?: string @@ -79,39 +79,39 @@ export type EventSourceSubscription = { createdAt: string } -export type ResolvedEventSourceSubscription = { - subscription: EventSourceSubscription - contract: EventSourceContract - bucket?: EventSourceBucket +export type ResolvedWebhookSourceSubscription = { + subscription: WebhookSourceSubscription + contract: WebhookSourceContract + bucket?: WebhookSourceBucket bucketPath?: string } -export type EventSourceWakeChange = { +export type WebhookSourceWakeChange = { collection: string kind: `insert` | `update` | `delete` key: string } -export type EventSourceWakeInfo = { +export type WebhookSourceWakeInfo = { sourceUrl: string - sourceType: EventSourceType + sourceType: WebhookSourceType endpointKey: string - sourceKey: string + webhookKey: string subscriptionId: string bucket?: string bucketKey?: string params: Record filterKey?: string reason?: string - changes: Array + changes: Array } -export type HydratedEventSourceWake = { - type: `event_source_wake` +export type HydratedWebhookSourceWake = { + type: `webhook_source_wake` source: string - sourceType: EventSourceType + sourceType: WebhookSourceType endpointKey: string - sourceKey: string + webhookKey: string subscription: { id: string bucketKey?: string @@ -120,7 +120,7 @@ export type HydratedEventSourceWake = { reason?: string } bucket: string | null - changes: Array + changes: Array events: Array missingEventKeys?: Array } @@ -132,30 +132,30 @@ const paramsSchemaCache = new WeakMap< ValidateFunction >() -export function defaultEventSourceSubscriptionLifetime(): SubscriptionLifetime { +export function defaultWebhookSourceSubscriptionLifetime(): SubscriptionLifetime { return { ...DEFAULT_LIFETIME } } -export function eventSourceSubscriptionManifestKey(id: string): string { - return `event-source:${id}` +export function webhookSourceSubscriptionManifestKey(id: string): string { + return `webhook-source:${id}` } -export function buildEventSourceSubscriptionId(input: { - sourceKey: string +export function buildWebhookSourceSubscriptionId(input: { + webhookKey: string bucketKey?: string params?: Record filterKey?: string }): string { const prefix = normalizeIdentifierPart( - [input.sourceKey, input.bucketKey ?? `root`, input.filterKey] + [input.webhookKey, input.bucketKey ?? `root`, input.filterKey] .filter(Boolean) .join(`-`) ) return `${prefix}-${hashString(stableJson(input))}` } -export function renderEventSourceBucketPath( - bucket: EventSourceBucket, +export function renderWebhookSourceBucketPath( + bucket: WebhookSourceBucket, params: Record = {} ): string { const rendered = bucket.pathTemplate.replace( @@ -179,20 +179,20 @@ export function renderEventSourceBucketPath( return rendered } -export function resolveEventSourceSubscription(input: { - contract: EventSourceContract +export function resolveWebhookSourceSubscription(input: { + contract: WebhookSourceContract entityUrl: string - request: EventSourceSubscriptionInput - createdBy?: EventSourceSubscription[`createdBy`] + request: WebhookSourceSubscriptionInput + createdBy?: WebhookSourceSubscription[`createdBy`] createdAt?: string -}): ResolvedEventSourceSubscription { +}): ResolvedWebhookSourceSubscription { const { contract, request } = input if (!contract.agentVisible || contract.status !== `active`) { - throw new Error(`Event source "${contract.sourceKey}" is not active`) + throw new Error(`Webhook source "${contract.webhookKey}" is not active`) } - if (request.sourceKey !== contract.sourceKey) { + if (request.webhookKey !== contract.webhookKey) { throw new Error( - `Event source key mismatch: expected ${contract.sourceKey}, got ${request.sourceKey}` + `Webhook key mismatch: expected ${contract.webhookKey}, got ${request.webhookKey}` ) } @@ -202,7 +202,7 @@ export function resolveEventSourceSubscription(input: { : undefined if (request.bucketKey && !bucket) { throw new Error( - `Unknown bucket "${request.bucketKey}" for event source "${contract.sourceKey}"` + `Unknown bucket "${request.bucketKey}" for webhook source "${contract.webhookKey}"` ) } @@ -211,7 +211,7 @@ export function resolveEventSourceSubscription(input: { !bucket?.filters?.some((f) => f.key === request.filterKey) ) { throw new Error( - `Unknown filter "${request.filterKey}" for event source "${contract.sourceKey}"` + `Unknown filter "${request.filterKey}" for webhook source "${contract.webhookKey}"` ) } @@ -220,18 +220,18 @@ export function resolveEventSourceSubscription(input: { } const bucketPath = bucket - ? renderEventSourceBucketPath(bucket, params) + ? renderWebhookSourceBucketPath(bucket, params) : undefined const sourceUrl = getWebhookStreamPath(contract.endpointKey, bucketPath) const id = request.id ?? - buildEventSourceSubscriptionId({ - sourceKey: request.sourceKey, + buildWebhookSourceSubscriptionId({ + webhookKey: request.webhookKey, bucketKey: request.bucketKey, params, filterKey: request.filterKey, }) - const manifestKey = eventSourceSubscriptionManifestKey(id) + const manifestKey = webhookSourceSubscriptionManifestKey(id) return { contract, @@ -240,7 +240,7 @@ export function resolveEventSourceSubscription(input: { subscription: { id, entityUrl: input.entityUrl, - sourceKey: contract.sourceKey, + webhookKey: contract.webhookKey, ...(request.bucketKey ? { bucketKey: request.bucketKey } : {}), params, ...(request.filterKey ? { filterKey: request.filterKey } : {}), @@ -249,7 +249,7 @@ export function resolveEventSourceSubscription(input: { sourceUrl, sourceType: contract.sourceType, manifestKey, - lifetime: request.lifetime ?? defaultEventSourceSubscriptionLifetime(), + lifetime: request.lifetime ?? defaultWebhookSourceSubscriptionLifetime(), ...(request.reason ? { reason: request.reason } : {}), createdBy: input.createdBy ?? `tool`, createdAt: input.createdAt ?? new Date().toISOString(), @@ -258,7 +258,7 @@ export function resolveEventSourceSubscription(input: { } function validateBucketParams( - bucket: EventSourceBucket, + bucket: WebhookSourceBucket, params: Record ): void { const schema = bucket.paramsSchema @@ -302,8 +302,8 @@ function formatParamsSchemaError(error: ErrorObject): string { return `${path} ${error.message ?? `is invalid`}` } -export function buildEventSourceManifestEntry( - resolved: ResolvedEventSourceSubscription +export function buildWebhookSourceManifestEntry( + resolved: ResolvedWebhookSourceSubscription ): Record { const { subscription, contract, bucketPath } = resolved return { @@ -317,9 +317,9 @@ export function buildEventSourceManifestEntry( endpointKey: contract.endpointKey, streamUrl: subscription.sourceUrl, ...(bucketPath ? { bucket: bucketPath } : {}), - eventSource: { + webhookSource: { id: subscription.id, - sourceKey: subscription.sourceKey, + webhookKey: subscription.webhookKey, ...(subscription.bucketKey ? { bucketKey: subscription.bucketKey } : {}), @@ -343,14 +343,14 @@ export function buildEventSourceManifestEntry( } } -export function eventSourceWakeInfoFromManifests(input: { +export function webhookSourceWakeInfoFromManifests(input: { wakeEvent: { type?: unknown source?: unknown payload?: unknown } manifests: Array -}): EventSourceWakeInfo | null { +}): WebhookSourceWakeInfo | null { const wakePayload = asRecord(input.wakeEvent.payload) const sourceUrl = typeof wakePayload?.source === `string` @@ -360,7 +360,7 @@ export function eventSourceWakeInfoFromManifests(input: { : null if (input.wakeEvent.type !== `wake` || !sourceUrl) return null - const changes = normalizeEventSourceWakeChanges(wakePayload?.changes) + const changes = normalizeWebhookSourceWakeChanges(wakePayload?.changes) if (!changes.some((change) => change.collection === `webhook_event`)) { return null } @@ -373,31 +373,31 @@ export function eventSourceWakeInfoFromManifests(input: { } const config = asRecord(manifest.config) - const eventSource = asRecord(config?.eventSource) - if (!config || !eventSource) continue + const webhookSource = asRecord(config?.webhookSource) + if (!config || !webhookSource) continue if (config.streamUrl !== sourceUrl) continue const endpointKey = stringFrom(config.endpointKey) - const sourceKey = stringFrom(eventSource.sourceKey) - const subscriptionId = stringFrom(eventSource.id) - if (!endpointKey || !sourceKey || !subscriptionId) continue + const webhookKey = stringFrom(webhookSource.webhookKey) + const subscriptionId = stringFrom(webhookSource.id) + if (!endpointKey || !webhookKey || !subscriptionId) continue return { sourceUrl, sourceType: `webhook`, endpointKey, - sourceKey, + webhookKey, subscriptionId, ...(typeof config.bucket === `string` ? { bucket: config.bucket } : {}), - ...(typeof eventSource.bucketKey === `string` - ? { bucketKey: eventSource.bucketKey } + ...(typeof webhookSource.bucketKey === `string` + ? { bucketKey: webhookSource.bucketKey } : {}), - params: asRecord(eventSource.params) ?? {}, - ...(typeof eventSource.filterKey === `string` - ? { filterKey: eventSource.filterKey } + params: asRecord(webhookSource.params) ?? {}, + ...(typeof webhookSource.filterKey === `string` + ? { filterKey: webhookSource.filterKey } : {}), - ...(typeof eventSource.reason === `string` - ? { reason: eventSource.reason } + ...(typeof webhookSource.reason === `string` + ? { reason: webhookSource.reason } : {}), changes, } @@ -406,10 +406,10 @@ export function eventSourceWakeInfoFromManifests(input: { return null } -export function buildHydratedEventSourceWake( - info: EventSourceWakeInfo, +export function buildHydratedWebhookSourceWake( + info: WebhookSourceWakeInfo, events: Array -): HydratedEventSourceWake { +): HydratedWebhookSourceWake { const eventKeys = new Set( info.changes .filter((change) => change.collection === `webhook_event`) @@ -424,11 +424,11 @@ export function buildHydratedEventSourceWake( const missingEventKeys = [...eventKeys].filter((key) => !matchedKeys.has(key)) return { - type: `event_source_wake`, + type: `webhook_source_wake`, source: info.sourceUrl, sourceType: info.sourceType, endpointKey: info.endpointKey, - sourceKey: info.sourceKey, + webhookKey: info.webhookKey, subscription: { id: info.subscriptionId, ...(info.bucketKey ? { bucketKey: info.bucketKey } : {}), @@ -463,7 +463,7 @@ function normalizeIdentifierPart(input: string): string { .toLowerCase() .replace(/[^a-z0-9._-]+/g, `-`) .replace(/^-+|-+$/g, ``) - return normalized.length > 0 ? normalized.slice(0, 80) : `event-source` + return normalized.length > 0 ? normalized.slice(0, 80) : `webhook-source` } function asRecord(value: unknown): Record | null { @@ -476,12 +476,12 @@ function stringFrom(value: unknown): string | null { return typeof value === `string` && value.length > 0 ? value : null } -function normalizeEventSourceWakeChanges( +function normalizeWebhookSourceWakeChanges( value: unknown -): Array { +): Array { if (!Array.isArray(value)) return [] - return value.flatMap((entry): Array => { + return value.flatMap((entry): Array => { const record = asRecord(entry) if (!record) return [] const collection = stringFrom(record.collection) diff --git a/packages/agents-runtime/test/context-factory.test.ts b/packages/agents-runtime/test/context-factory.test.ts index eba3ba632f..231fcb0891 100644 --- a/packages/agents-runtime/test/context-factory.test.ts +++ b/packages/agents-runtime/test/context-factory.test.ts @@ -353,15 +353,15 @@ describe(`createHandlerContext`, () => { expect(receivedMessage).toBe(`load xyz skills`) }) - it(`uses hydrated event source wake data as the trigger message`, async () => { + it(`uses hydrated webhook source wake data as the trigger message`, async () => { const db = createMockDb([]) let receivedMessage = `` - const hydratedEventSourceWake = { - type: `event_source_wake` as const, + const hydratedWebhookSourceWake = { + type: `webhook_source_wake` as const, source: `/_webhooks/github-repo/prs/42`, sourceType: `webhook` as const, endpointKey: `github-repo`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, subscription: { id: `watch-pr-42`, bucketKey: `pull_request`, @@ -454,7 +454,7 @@ describe(`createHandlerContext`, () => { ], }, }, - hydratedEventSourceWake, + hydratedWebhookSourceWake, doObserve: vi.fn(), doSpawn: vi.fn(), doFork: vi.fn(), @@ -478,7 +478,7 @@ describe(`createHandlerContext`, () => { await ctx.agent.run() expect(JSON.parse(receivedMessage)).toMatchObject({ - type: `event_source_wake`, + type: `webhook_source_wake`, source: `/_webhooks/github-repo/prs/42`, events: [ { diff --git a/packages/agents-runtime/test/helpers/context-test-helpers.ts b/packages/agents-runtime/test/helpers/context-test-helpers.ts index 2aa60e55fd..0555003991 100644 --- a/packages/agents-runtime/test/helpers/context-test-helpers.ts +++ b/packages/agents-runtime/test/helpers/context-test-helpers.ts @@ -16,7 +16,7 @@ import type { WakeEvent, WakeSession, } from '../../src/types' -import type { HydratedEventSourceWake } from '../../src/event-sources' +import type { HydratedWebhookSourceWake } from '../../src/webhook-sources' import type { Sandbox } from '../../src/sandbox/types' // Minimal sandbox stub for tests that exercise HandlerContext shape but @@ -302,7 +302,7 @@ export function createTestHandlerContext( db?: ReturnType writeEvent?: (event: ChangeEvent) => void wakeEvent?: WakeEvent - hydratedEventSourceWake?: HydratedEventSourceWake | null + hydratedWebhookSourceWake?: HydratedWebhookSourceWake | null prepareAgentRun?: () => Promise } = {} ) { @@ -333,7 +333,7 @@ export function createTestHandlerContext( eventCount: 1, payload: `hi`, }, - hydratedEventSourceWake: opts.hydratedEventSourceWake, + hydratedWebhookSourceWake: opts.hydratedWebhookSourceWake, prepareAgentRun: opts.prepareAgentRun, doObserve: vi.fn(), doSpawn: vi.fn(), diff --git a/packages/agents-runtime/test/process-wake.test.ts b/packages/agents-runtime/test/process-wake.test.ts index e2ccf280dc..ce7b85a05c 100644 --- a/packages/agents-runtime/test/process-wake.test.ts +++ b/packages/agents-runtime/test/process-wake.test.ts @@ -3,10 +3,10 @@ import { createTransaction } from '@durable-streams/state/db' import { createAssistantMessageEventStream } from '@mariozechner/pi-ai' import { getCronSourceRef } from '../src/cron-utils' import { - buildEventSourceManifestEntry, - resolveEventSourceSubscription, - type EventSourceContract, -} from '../src/event-sources' + buildWebhookSourceManifestEntry, + resolveWebhookSourceSubscription, + type WebhookSourceContract, +} from '../src/webhook-sources' import { manifestSourceKey } from '../src/manifest-helpers' import { db, pgSync } from '../src/observation-sources' import { processWake } from '../src/process-wake' @@ -397,8 +397,8 @@ const sharedFindingsSchema = { }, } -const githubEventSourceContract: EventSourceContract = { - sourceKey: `github-repo`, +const githubWebhookSourceContract: WebhookSourceContract = { + webhookKey: `github-repo`, sourceType: `webhook`, endpointKey: `github-repo`, status: `active`, @@ -1764,21 +1764,21 @@ describe(`processWake`, () => { ) }) - it(`hydrates dynamic event source wake rows into the agent trigger message`, async () => { + it(`hydrates dynamic webhook source wake rows into the agent trigger message`, async () => { const sourceUrl = `/_webhooks/github-repo/prs/42` - const resolved = resolveEventSourceSubscription({ - contract: githubEventSourceContract, + const resolved = resolveWebhookSourceSubscription({ + contract: githubWebhookSourceContract, entityUrl: `http://localhost:3000/test-agent/agent-1`, request: { id: `watch-pr-42`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, bucketKey: `pull_request`, params: { number: 42 }, reason: `Watch PR comments`, }, createdAt: `2026-05-23T00:00:00.000Z`, }) - mockInitialManifests.current = [buildEventSourceManifestEntry(resolved)] + mockInitialManifests.current = [buildWebhookSourceManifestEntry(resolved)] mockSourceEvents.current = [ { key: `event-42`, @@ -1861,7 +1861,7 @@ describe(`processWake`, () => { }) ) expect(JSON.parse(receivedMessage)).toMatchObject({ - type: `event_source_wake`, + type: `webhook_source_wake`, source: sourceUrl, subscription: { id: `watch-pr-42`, diff --git a/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts b/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts index 731087a8f0..7965bb133c 100644 --- a/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts +++ b/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts @@ -266,15 +266,15 @@ describe(`runtime-server-client.deleteTag`, () => { }) }) -describe(`runtime-server-client event sources`, () => { - it(`lists event sources from the runtime server`, async () => { +describe(`runtime-server-client webhook sources`, () => { + it(`lists webhook sources from the runtime server`, async () => { const fakeFetch = vi.fn( async () => new Response( JSON.stringify({ - eventSources: [ + webhookSources: [ { - sourceKey: `github-repo`, + webhookKey: `github-repo`, sourceType: `webhook`, endpointKey: `github-repo`, status: `active`, @@ -296,28 +296,28 @@ describe(`runtime-server-client event sources`, () => { fetch: fakeFetch, }) - await expect(client.listEventSources()).resolves.toMatchObject([ - { sourceKey: `github-repo` }, + await expect(client.listWebhookSources()).resolves.toMatchObject([ + { webhookKey: `github-repo` }, ]) expect(fakeFetch).toHaveBeenCalledWith( - `http://test.example/t/tenant-a/v1/_electric/event-sources`, + `http://test.example/t/tenant-a/v1/_electric/webhook-sources`, expect.objectContaining({ method: `GET` }) ) }) - it(`subscribes to event sources with a deterministic id and JSON body`, async () => { + it(`subscribes to webhook sources with a deterministic id and JSON body`, async () => { const calls: Array<{ url: string; init?: RequestInit }> = [] const subscription = { id: `github-repo-pull-request-1kwxl2f`, entityUrl: `/coder/session-1`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, bucketKey: `pull_request`, params: { number: 123 }, filterApplied: false, contractRevision: 1, sourceUrl: `/_webhooks/github-repo/prs/123`, sourceType: `webhook`, - manifestKey: `event-source:github-repo-pull-request-1kwxl2f`, + manifestKey: `webhook-source:github-repo-pull-request-1kwxl2f`, lifetime: { kind: `until_entity_stopped` }, createdBy: `tool`, createdAt: `2026-05-23T00:00:00.000Z`, @@ -335,9 +335,9 @@ describe(`runtime-server-client event sources`, () => { }) await expect( - client.subscribeToEventSource({ + client.subscribeToWebhookSource({ entityUrl: `/coder/session-1`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, bucketKey: `pull_request`, params: { number: 123 }, reason: `Watch PR feedback`, @@ -346,18 +346,18 @@ describe(`runtime-server-client event sources`, () => { expect(calls).toHaveLength(1) expect(calls[0]!.url).toMatch( - /^http:\/\/test\.example\/_electric\/entities\/coder\/session-1\/event-source-subscriptions\/github-repo-pull_request-/ + /^http:\/\/test\.example\/_electric\/entities\/coder\/session-1\/webhook-source-subscriptions\/github-repo-pull_request-/ ) expect(calls[0]!.init?.method).toBe(`PUT`) expect(JSON.parse(calls[0]!.init!.body as string)).toEqual({ - sourceKey: `github-repo`, + webhookKey: `github-repo`, bucketKey: `pull_request`, params: { number: 123 }, reason: `Watch PR feedback`, }) }) - it(`surfaces event source subscription failures`, async () => { + it(`surfaces webhook source subscription failures`, async () => { const fakeFetch = vi.fn( async () => new Response(`invalid params`, { @@ -370,11 +370,11 @@ describe(`runtime-server-client event sources`, () => { }) await expect( - client.subscribeToEventSource({ + client.subscribeToWebhookSource({ entityUrl: `/coder/session-1`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, }) - ).rejects.toThrow(/subscribeToEventSource failed \(400\): invalid params/) + ).rejects.toThrow(/subscribeToWebhookSource failed \(400\): invalid params/) }) }) diff --git a/packages/agents-runtime/test/use-context-default.test.ts b/packages/agents-runtime/test/use-context-default.test.ts index b8d273c6bd..867fa319b3 100644 --- a/packages/agents-runtime/test/use-context-default.test.ts +++ b/packages/agents-runtime/test/use-context-default.test.ts @@ -76,7 +76,7 @@ describe(`zero-config default path`, () => { ) }) - it(`uses hydrated event source wake input even when context ends with a wake user message`, async () => { + it(`uses hydrated webhook source wake input even when context ends with a wake user message`, async () => { const source = `/_webhooks/github-repo/prs/54` const wakePayload = { source, @@ -106,12 +106,12 @@ describe(`zero-config default path`, () => { eventCount: 1, payload: wakePayload, }, - hydratedEventSourceWake: { - type: `event_source_wake`, + hydratedWebhookSourceWake: { + type: `webhook_source_wake`, source, sourceType: `webhook`, endpointKey: `github-repo`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, subscription: { id: `watch-pr-54`, bucketKey: `pull_request`, @@ -168,7 +168,7 @@ describe(`zero-config default path`, () => { content: expect.stringContaining(`webhook_event`), }) expect(JSON.parse(capturedInputs[0] ?? ``)).toMatchObject({ - type: `event_source_wake`, + type: `webhook_source_wake`, source, events: [ { diff --git a/packages/agents-runtime/test/use-context-memoization.test.ts b/packages/agents-runtime/test/use-context-memoization.test.ts index 95fed76502..01a2cb820f 100644 --- a/packages/agents-runtime/test/use-context-memoization.test.ts +++ b/packages/agents-runtime/test/use-context-memoization.test.ts @@ -63,7 +63,7 @@ describe(`useContext memoization`, () => { expect(v2).toHaveBeenCalledTimes(1) }) - it(`reordering structurally identical source keys does not trigger a new registration`, async () => { + it(`reordering structurally identical webhook keys does not trigger a new registration`, async () => { const { ctx } = createTestHandlerContext({}) const a = vi.fn(() => `a`) const b = vi.fn(() => `b`) diff --git a/packages/agents-runtime/test/event-source-tools.test.ts b/packages/agents-runtime/test/webhook-source-tools.test.ts similarity index 63% rename from packages/agents-runtime/test/event-source-tools.test.ts rename to packages/agents-runtime/test/webhook-source-tools.test.ts index badcf78653..0b47f8c37d 100644 --- a/packages/agents-runtime/test/event-source-tools.test.ts +++ b/packages/agents-runtime/test/webhook-source-tools.test.ts @@ -1,37 +1,37 @@ import { describe, expect, it, vi } from 'vitest' -import { createEventSourceTools } from '../src/tools/event-sources' +import { createWebhookSourceTools } from '../src/tools/webhook-sources' import type { - EventSourceContract, - EventSourceSubscription, -} from '../src/event-sources' + WebhookSourceContract, + WebhookSourceSubscription, +} from '../src/webhook-sources' -describe(`event source tools`, () => { - it(`lists discoverable event sources`, async () => { - const tools = createEventSourceTools({ +describe(`webhook source tools`, () => { + it(`lists discoverable webhook sources`, async () => { + const tools = createWebhookSourceTools({ entityUrl: `/coder/session-1`, db: dbWithManifests([]), - listEventSources: vi.fn(async () => [githubContract]), - subscribeToEventSource: vi.fn(), - unsubscribeFromEventSource: vi.fn(), + listWebhookSources: vi.fn(async () => [githubContract]), + subscribeToWebhookSource: vi.fn(), + unsubscribeFromWebhookSource: vi.fn(), }) - const result = await executeTool(tools, `list_event_sources`, {}) + const result = await executeTool(tools, `list_webhook_sources`, {}) expect(JSON.parse(result.content[0]!.text)).toEqual([githubContract]) }) it(`lists subscriptions from webhook source manifests`, async () => { - const tools = createEventSourceTools({ + const tools = createWebhookSourceTools({ entityUrl: `/coder/session-1`, - db: dbWithManifests([eventSourceManifest]), - listEventSources: vi.fn(async () => []), - subscribeToEventSource: vi.fn(), - unsubscribeFromEventSource: vi.fn(), + db: dbWithManifests([webhookSourceManifest]), + listWebhookSources: vi.fn(async () => []), + subscribeToWebhookSource: vi.fn(), + unsubscribeFromWebhookSource: vi.fn(), }) const result = await executeTool( tools, - `list_event_source_subscriptions`, + `list_webhook_source_subscriptions`, {} ) @@ -39,7 +39,7 @@ describe(`event source tools`, () => { { id: `watch-pr-123`, entityUrl: `/coder/session-1`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, bucketKey: `pull_request`, params: { number: 123 }, sourceUrl: `/_webhooks/github-repo/prs/123`, @@ -49,7 +49,7 @@ describe(`event source tools`, () => { it(`subscribes and waits for the returned txid`, async () => { const awaitTxId = vi.fn(async () => {}) - const subscribeToEventSource = vi.fn(async (opts) => ({ + const subscribeToWebhookSource = vi.fn(async (opts) => ({ txid: `tx-1`, subscription: { ...subscription, @@ -57,25 +57,25 @@ describe(`event source tools`, () => { lifetime: opts.lifetime, }, })) - const tools = createEventSourceTools({ + const tools = createWebhookSourceTools({ entityUrl: `/coder/session-1`, db: dbWithManifests([], awaitTxId), - listEventSources: vi.fn(async () => []), - subscribeToEventSource, - unsubscribeFromEventSource: vi.fn(), + listWebhookSources: vi.fn(async () => []), + subscribeToWebhookSource, + unsubscribeFromWebhookSource: vi.fn(), }) - const result = await executeTool(tools, `subscribe_event_source`, { - sourceKey: `github-repo`, + const result = await executeTool(tools, `subscribe_webhook_source`, { + webhookKey: `github-repo`, bucketKey: `pull_request`, params: { number: 123 }, reason: `Watch PR feedback`, }) - expect(subscribeToEventSource).toHaveBeenCalledWith( + expect(subscribeToWebhookSource).toHaveBeenCalledWith( expect.objectContaining({ id: expect.stringMatching(/^github-repo-pull_request-/), - sourceKey: `github-repo`, + webhookKey: `github-repo`, bucketKey: `pull_request`, params: { number: 123 }, lifetime: { kind: `until_entity_stopped` }, @@ -84,34 +84,34 @@ describe(`event source tools`, () => { ) expect(awaitTxId).toHaveBeenCalledWith(`tx-1`, 10_000) expect(JSON.parse(result.content[0]!.text)).toMatchObject({ - sourceKey: `github-repo`, + webhookKey: `github-repo`, sourceUrl: `/_webhooks/github-repo/prs/123`, }) }) - it(`unsubscribes existing event source subscriptions`, async () => { + it(`unsubscribes existing webhook source subscriptions`, async () => { const awaitTxId = vi.fn(async () => {}) - const unsubscribeFromEventSource = vi.fn(async () => ({ txid: `tx-2` })) - const tools = createEventSourceTools({ + const unsubscribeFromWebhookSource = vi.fn(async () => ({ txid: `tx-2` })) + const tools = createWebhookSourceTools({ entityUrl: `/coder/session-1`, - db: dbWithManifests([eventSourceManifest], awaitTxId), - listEventSources: vi.fn(async () => []), - subscribeToEventSource: vi.fn(), - unsubscribeFromEventSource, + db: dbWithManifests([webhookSourceManifest], awaitTxId), + listWebhookSources: vi.fn(async () => []), + subscribeToWebhookSource: vi.fn(), + unsubscribeFromWebhookSource, }) - const result = await executeTool(tools, `unsubscribe_event_source`, { + const result = await executeTool(tools, `unsubscribe_webhook_source`, { id: `watch-pr-123`, }) - expect(unsubscribeFromEventSource).toHaveBeenCalledWith({ + expect(unsubscribeFromWebhookSource).toHaveBeenCalledWith({ id: `watch-pr-123`, }) expect(awaitTxId).toHaveBeenCalledWith(`tx-2`, 10_000) expect(JSON.parse(result.content[0]!.text)).toEqual({ deleted: true, id: `watch-pr-123`, - key: `event-source:watch-pr-123`, + key: `webhook-source:watch-pr-123`, }) }) }) @@ -133,7 +133,7 @@ function dbWithManifests( } async function executeTool( - tools: ReturnType, + tools: ReturnType, name: string, params: Record ) { @@ -142,8 +142,8 @@ async function executeTool( return await (tool.execute as any)(`call-1`, params) } -const githubContract: EventSourceContract = { - sourceKey: `github-repo`, +const githubContract: WebhookSourceContract = { + webhookKey: `github-repo`, sourceType: `webhook`, endpointKey: `github-repo`, status: `active`, @@ -164,25 +164,25 @@ const githubContract: EventSourceContract = { ], } -const subscription: EventSourceSubscription = { +const subscription: WebhookSourceSubscription = { id: `watch-pr-123`, entityUrl: `/coder/session-1`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, bucketKey: `pull_request`, params: { number: 123 }, filterApplied: false, contractRevision: 1, sourceUrl: `/_webhooks/github-repo/prs/123`, sourceType: `webhook`, - manifestKey: `event-source:watch-pr-123`, + manifestKey: `webhook-source:watch-pr-123`, lifetime: { kind: `until_entity_stopped` }, reason: `Watch PR feedback`, createdBy: `tool`, createdAt: `2026-05-23T00:00:00.000Z`, } -const eventSourceManifest = { - key: `event-source:watch-pr-123`, +const webhookSourceManifest = { + key: `webhook-source:watch-pr-123`, kind: `source`, sourceType: `webhook`, sourceRef: `github-repo/prs/123`, @@ -190,9 +190,9 @@ const eventSourceManifest = { endpointKey: `github-repo`, streamUrl: `/_webhooks/github-repo/prs/123`, bucket: `prs/123`, - eventSource: { + webhookSource: { id: `watch-pr-123`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, bucketKey: `pull_request`, params: { number: 123 }, filterApplied: false, diff --git a/packages/agents-runtime/test/event-sources.test.ts b/packages/agents-runtime/test/webhook-sources.test.ts similarity index 77% rename from packages/agents-runtime/test/event-sources.test.ts rename to packages/agents-runtime/test/webhook-sources.test.ts index ba715d6e59..57ecfe9e42 100644 --- a/packages/agents-runtime/test/event-sources.test.ts +++ b/packages/agents-runtime/test/webhook-sources.test.ts @@ -1,20 +1,20 @@ import { describe, expect, it } from 'vitest' import { - buildEventSourceManifestEntry, - buildHydratedEventSourceWake, - buildEventSourceSubscriptionId, - eventSourceWakeInfoFromManifests, - renderEventSourceBucketPath, - resolveEventSourceSubscription, - type EventSourceContract, - type EventSourceWakeInfo, -} from '../src/event-sources' + buildWebhookSourceManifestEntry, + buildHydratedWebhookSourceWake, + buildWebhookSourceSubscriptionId, + webhookSourceWakeInfoFromManifests, + renderWebhookSourceBucketPath, + resolveWebhookSourceSubscription, + type WebhookSourceContract, + type WebhookSourceWakeInfo, +} from '../src/webhook-sources' import type { WebhookEventRow } from '../src/observation-sources' -describe(`event source helpers`, () => { +describe(`webhook source helpers`, () => { it(`renders bucket template paths from params`, () => { expect( - renderEventSourceBucketPath( + renderWebhookSourceBucketPath( { key: `pull_request`, label: `Pull request`, @@ -27,11 +27,11 @@ describe(`event source helpers`, () => { }) it(`resolves webhook subscriptions into durable stream manifest entries`, () => { - const resolved = resolveEventSourceSubscription({ + const resolved = resolveWebhookSourceSubscription({ contract: githubContract, entityUrl: `/coder/session-1`, request: { - sourceKey: `github-repo`, + webhookKey: `github-repo`, bucketKey: `pull_request`, params: { number: 123 }, lifetime: { kind: `until_entity_stopped` }, @@ -42,7 +42,7 @@ describe(`event source helpers`, () => { expect(resolved.subscription).toMatchObject({ entityUrl: `/coder/session-1`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, bucketKey: `pull_request`, sourceUrl: `/_webhooks/github-repo/prs/123`, sourceType: `webhook`, @@ -52,7 +52,7 @@ describe(`event source helpers`, () => { reason: `Watch PR comments`, }) - expect(buildEventSourceManifestEntry(resolved)).toMatchObject({ + expect(buildWebhookSourceManifestEntry(resolved)).toMatchObject({ key: resolved.subscription.manifestKey, kind: `source`, sourceType: `webhook`, @@ -61,8 +61,8 @@ describe(`event source helpers`, () => { endpointKey: `github-repo`, streamUrl: `/_webhooks/github-repo/prs/123`, bucket: `prs/123`, - eventSource: { - sourceKey: `github-repo`, + webhookSource: { + webhookKey: `github-repo`, bucketKey: `pull_request`, params: { number: 123 }, filterApplied: false, @@ -79,11 +79,11 @@ describe(`event source helpers`, () => { it(`rejects bucket params that do not match paramsSchema`, () => { expect(() => - resolveEventSourceSubscription({ + resolveWebhookSourceSubscription({ contract: githubContract, entityUrl: `/coder/session-1`, request: { - sourceKey: `github-repo`, + webhookKey: `github-repo`, bucketKey: `pull_request`, params: { number: `123` }, }, @@ -92,27 +92,27 @@ describe(`event source helpers`, () => { }) it(`builds deterministic subscription ids`, () => { - const left = buildEventSourceSubscriptionId({ - sourceKey: `github-repo`, + const left = buildWebhookSourceSubscriptionId({ + webhookKey: `github-repo`, bucketKey: `pull_request`, params: { number: 123 }, }) - const right = buildEventSourceSubscriptionId({ + const right = buildWebhookSourceSubscriptionId({ params: { number: 123 }, bucketKey: `pull_request`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, }) expect(left).toBe(right) }) - it(`hydrates event source wake changes with matching webhook rows`, () => { - const resolved = resolveEventSourceSubscription({ + it(`hydrates webhook source wake changes with matching webhook rows`, () => { + const resolved = resolveWebhookSourceSubscription({ contract: githubContract, entityUrl: `/coder/session-1`, request: { id: `watch-pr-123`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, bucketKey: `pull_request`, params: { number: 123 }, lifetime: { kind: `until_entity_stopped` }, @@ -120,9 +120,9 @@ describe(`event source helpers`, () => { }, createdAt: `2026-05-23T00:00:00.000Z`, }) - const manifest = buildEventSourceManifestEntry(resolved) + const manifest = buildWebhookSourceManifestEntry(resolved) - const info = eventSourceWakeInfoFromManifests({ + const info = webhookSourceWakeInfoFromManifests({ manifests: [manifest], wakeEvent: { type: `wake`, @@ -145,7 +145,7 @@ describe(`event source helpers`, () => { sourceUrl: `/_webhooks/github-repo/prs/123`, sourceType: `webhook`, endpointKey: `github-repo`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, subscriptionId: `watch-pr-123`, bucket: `prs/123`, bucketKey: `pull_request`, @@ -161,16 +161,16 @@ describe(`event source helpers`, () => { }, }, }) - const hydrated = buildHydratedEventSourceWake(info as EventSourceWakeInfo, [ + const hydrated = buildHydratedWebhookSourceWake(info as WebhookSourceWakeInfo, [ webhookEvent({ key: `event-0` }), event, ]) expect(hydrated).toMatchObject({ - type: `event_source_wake`, + type: `webhook_source_wake`, source: `/_webhooks/github-repo/prs/123`, endpointKey: `github-repo`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, bucket: `prs/123`, subscription: { id: `watch-pr-123`, @@ -212,9 +212,9 @@ function webhookEvent( } } -const githubContract: EventSourceContract = { +const githubContract: WebhookSourceContract = { serviceId: `svc-agent-1`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, sourceType: `webhook`, endpointKey: `github-repo`, status: `active`, diff --git a/packages/agents-server/CHANGELOG.md b/packages/agents-server/CHANGELOG.md index 09ae28d2dc..ba2975b9da 100644 --- a/packages/agents-server/CHANGELOG.md +++ b/packages/agents-server/CHANGELOG.md @@ -154,7 +154,7 @@ ### Patch Changes -- 833a1cb: Add agent event source contracts and dynamic event source subscription tools. Agents can list active, agent-visible webhook-backed event sources, subscribe entities to resolved bucket streams with explicit lifetimes, and persist those subscriptions as manifest-backed wake registrations. Bucket params are validated against the advertised `paramsSchema` before a subscription is accepted. Horton now receives these tools through the built-in agents runtime by default. Runtime-managed event source wakes now hydrate matching webhook rows into the agent trigger message so tool-created subscriptions include the event payload that caused the wake. +- 833a1cb: Add agent webhook source contracts and dynamic webhook source subscription tools. Agents can list active, agent-visible webhook-backed webhook sources, subscribe entities to resolved bucket streams with explicit lifetimes, and persist those subscriptions as manifest-backed wake registrations. Bucket params are validated against the advertised `paramsSchema` before a subscription is accepted. Horton now receives these tools through the built-in agents runtime by default. Runtime-managed webhook source wakes now hydrate matching webhook rows into the agent trigger message so tool-created subscriptions include the event payload that caused the wake. - Updated dependencies [833a1cb] - Updated dependencies [1349a55] - Updated dependencies [833a1cb] diff --git a/packages/agents-server/src/entity-manager.ts b/packages/agents-server/src/entity-manager.ts index fbbb189028..9183e26991 100644 --- a/packages/agents-server/src/entity-manager.ts +++ b/packages/agents-server/src/entity-manager.ts @@ -7,7 +7,7 @@ import { getCronStreamPath, getSharedStateStreamPath, getNextCronFireAt, - eventSourceSubscriptionManifestKey, + webhookSourceSubscriptionManifestKey, manifestChildKey, manifestSharedStateKey, manifestSourceKey, @@ -56,7 +56,7 @@ import type { queueAsPromised } from 'fastq' import type { SchedulerClient } from './scheduler.js' import type { WakeEvalResult, WakeRegistry } from './wake-registry.js' import type { WakeMessage } from '@electric-ax/agents-runtime' -import type { EventSourceSubscription } from '@electric-ax/agents-runtime' +import type { WebhookSourceSubscription } from '@electric-ax/agents-runtime' import type { PostgresRegistry } from './entity-registry.js' import type { SchemaValidator } from './electric-agents/schema-validator.js' import type { StreamClient } from './stream-client.js' @@ -3023,13 +3023,13 @@ export class EntityManager { return { txid } } - async upsertEventSourceSubscription( + async upsertWebhookSourceSubscription( entityUrl: string, req: { - subscription: EventSourceSubscription + subscription: WebhookSourceSubscription manifest: Record } - ): Promise<{ txid: string; subscription: EventSourceSubscription }> { + ): Promise<{ txid: string; subscription: WebhookSourceSubscription }> { const manifestKey = req.subscription.manifestKey const txid = randomUUID() await this.writeManifestEntry( @@ -3065,11 +3065,11 @@ export class EntityManager { return { txid, subscription: req.subscription } } - async deleteEventSourceSubscription( + async deleteWebhookSourceSubscription( entityUrl: string, req: { id: string } ): Promise<{ txid: string }> { - const manifestKey = eventSourceSubscriptionManifestKey(req.id) + const manifestKey = webhookSourceSubscriptionManifestKey(req.id) const txid = randomUUID() await this.writeManifestEntry(entityUrl, manifestKey, `delete`, undefined, { txid, diff --git a/packages/agents-server/src/index.ts b/packages/agents-server/src/index.ts index e411f78771..2dda597414 100644 --- a/packages/agents-server/src/index.ts +++ b/packages/agents-server/src/index.ts @@ -65,17 +65,17 @@ export type { AuthorizeRequest, } from './electric-agents-types.js' export type { - EventSourceBucket, - EventSourceContract, - EventSourceFilter, - EventSourceSubscription, - EventSourceSubscriptionInput, + WebhookSourceBucket, + WebhookSourceContract, + WebhookSourceFilter, + WebhookSourceSubscription, + WebhookSourceSubscriptionInput, SubscriptionLifetime, } from '@electric-ax/agents-runtime' export type { Principal, PrincipalKind } from './principal.js' export { globalRouter } from './routing/global-router.js' export type { GlobalRoutes } from './routing/global-router.js' -export type { EventSourceCatalog, TenantContext } from './routing/context.js' +export type { WebhookSourceCatalog, TenantContext } from './routing/context.js' export { streamRootDurableStreamsRoutingAdapter, pathPrefixedSingleTenantDurableStreamsRoutingAdapter, diff --git a/packages/agents-server/src/routing/context.ts b/packages/agents-server/src/routing/context.ts index bb58be4ffd..73a2e6f3cd 100644 --- a/packages/agents-server/src/routing/context.ts +++ b/packages/agents-server/src/routing/context.ts @@ -1,6 +1,6 @@ import type { Agent } from 'undici' import type { - EventSourceContract, + WebhookSourceContract, WebhookSignatureVerifierConfig, } from '@electric-ax/agents-runtime' import type { DrizzleDB } from '../db/index.js' @@ -15,16 +15,16 @@ import type { DurableStreamsBearerProvider } from '../stream-client.js' import type { WebhookSigner } from '../webhook-signing.js' import type { AuthorizeRequest } from '../electric-agents-types.js' -export interface EventSourceCatalog { - listEventSources: () => - | Array - | Promise> - getEventSource: ( - sourceKey: string +export interface WebhookSourceCatalog { + listWebhookSources: () => + | Array + | Promise> + getWebhookSource: ( + webhookKey: string ) => - | EventSourceContract + | WebhookSourceContract | undefined - | Promise + | Promise } /** @@ -56,8 +56,8 @@ export interface TenantContext { runtime: ElectricAgentsTenantRuntime entityBridgeManager: EntityBridgeCoordinator pgSyncBridgeManager?: PgSyncBridgeCoordinator - eventSources?: EventSourceCatalog - ensureEventSourceWakeSource?: (sourceUrl: string) => Promise | void + webhookSources?: WebhookSourceCatalog + ensureWebhookSourceWakeSource?: (sourceUrl: string) => Promise | void authorizeRequest?: AuthorizeRequest isShuttingDown: () => boolean } diff --git a/packages/agents-server/src/routing/entities-router.ts b/packages/agents-server/src/routing/entities-router.ts index 7a6189643b..cb1ef841fc 100644 --- a/packages/agents-server/src/routing/entities-router.ts +++ b/packages/agents-server/src/routing/entities-router.ts @@ -4,8 +4,8 @@ import { Type, type Static } from '@sinclair/typebox' import { - buildEventSourceManifestEntry, - resolveEventSourceSubscription, + buildWebhookSourceManifestEntry, + resolveWebhookSourceSubscription, } from '@electric-ax/agents-runtime' import { Router, json, status } from 'itty-router' import { apiError } from '../electric-agents-http.js' @@ -45,7 +45,7 @@ import type { import type { JsonRouteRequest } from './schema.js' import type { RouterType } from 'itty-router' import type { TenantContext } from './context.js' -import type { EventSourceSubscriptionInput } from '@electric-ax/agents-runtime' +import type { WebhookSourceSubscriptionInput } from '@electric-ax/agents-runtime' interface AgentsRouteRequest extends JsonRouteRequest { entityRoute?: ExistingEntityRoute @@ -318,8 +318,8 @@ const subscriptionLifetimeSchema = Type.Union([ Type.Object({ kind: Type.Literal(`manual`) }), ]) -const eventSourceSubscriptionBodySchema = Type.Object({ - sourceKey: Type.String(), +const webhookSourceSubscriptionBodySchema = Type.Object({ + webhookKey: Type.String(), bucketKey: Type.Optional(Type.String()), params: Type.Optional(Type.Record(Type.String(), Type.Unknown())), filterKey: Type.Optional(Type.String()), @@ -334,8 +334,8 @@ type ForkBody = Static type SetTagBody = Static type SignalBody = Static type ScheduleBody = Static -type EventSourceSubscriptionBody = Static< - typeof eventSourceSubscriptionBodySchema +type WebhookSourceSubscriptionBody = Static< + typeof webhookSourceSubscriptionBodySchema > type EntityPermissionGrantInput = Static< typeof entityPermissionGrantInputSchema @@ -473,17 +473,17 @@ entitiesRouter.delete( deleteSchedule ) entitiesRouter.put( - `/:type/:instanceId/event-source-subscriptions/:subscriptionId`, + `/:type/:instanceId/webhook-source-subscriptions/:subscriptionId`, withExistingEntity, - withSchema(eventSourceSubscriptionBodySchema), + withSchema(webhookSourceSubscriptionBodySchema), withEntityPermission(`write`), - upsertEventSourceSubscription + upsertWebhookSourceSubscription ) entitiesRouter.delete( - `/:type/:instanceId/event-source-subscriptions/:subscriptionId`, + `/:type/:instanceId/webhook-source-subscriptions/:subscriptionId`, withExistingEntity, withEntityPermission(`write`), - deleteEventSourceSubscription + deleteWebhookSourceSubscription ) entitiesRouter.get( `/:type/:instanceId/grants`, @@ -997,33 +997,33 @@ async function deleteSchedule( return json(result) } -async function upsertEventSourceSubscription( +async function upsertWebhookSourceSubscription( request: AgentsRouteRequest, ctx: TenantContext ): Promise { const principalMutationError = rejectPrincipalEntityMutation( request, - `subscribed to event sources` + `subscribed to webhook sources` ) if (principalMutationError) return principalMutationError - const catalog = ctx.eventSources + const catalog = ctx.webhookSources if (!catalog) { return apiError( 404, ErrCodeNotFound, - `No event source catalog is configured` + `No webhook source catalog is configured` ) } const { entityUrl } = requireExistingEntityRoute(request) - const parsed = routeBody(request) - const source = await catalog.getEventSource(parsed.sourceKey) + const parsed = routeBody(request) + const source = await catalog.getWebhookSource(parsed.webhookKey) if (!source) { return apiError( 404, ErrCodeNotFound, - `Event source "${parsed.sourceKey}" not found` + `Webhook source "${parsed.webhookKey}" not found` ) } @@ -1038,13 +1038,13 @@ async function upsertEventSourceSubscription( } } - let resolved: ReturnType + let resolved: ReturnType try { - resolved = resolveEventSourceSubscription({ + resolved = resolveWebhookSourceSubscription({ contract: source, entityUrl, request: { - ...(parsed as EventSourceSubscriptionInput), + ...(parsed as WebhookSourceSubscriptionInput), id: decodeURIComponent(request.params.subscriptionId), }, createdBy: `tool`, @@ -1057,30 +1057,30 @@ async function upsertEventSourceSubscription( ) } - await ctx.ensureEventSourceWakeSource?.(resolved.subscription.sourceUrl) + await ctx.ensureWebhookSourceWakeSource?.(resolved.subscription.sourceUrl) - const result = await ctx.entityManager.upsertEventSourceSubscription( + const result = await ctx.entityManager.upsertWebhookSourceSubscription( entityUrl, { subscription: resolved.subscription, - manifest: buildEventSourceManifestEntry(resolved), + manifest: buildWebhookSourceManifestEntry(resolved), } ) return json(result) } -async function deleteEventSourceSubscription( +async function deleteWebhookSourceSubscription( request: AgentsRouteRequest, ctx: TenantContext ): Promise { const principalMutationError = rejectPrincipalEntityMutation( request, - `unsubscribed from event sources` + `unsubscribed from webhook sources` ) if (principalMutationError) return principalMutationError const { entityUrl } = requireExistingEntityRoute(request) - const result = await ctx.entityManager.deleteEventSourceSubscription( + const result = await ctx.entityManager.deleteWebhookSourceSubscription( entityUrl, { id: decodeURIComponent(request.params.subscriptionId), diff --git a/packages/agents-server/src/routing/internal-router.ts b/packages/agents-server/src/routing/internal-router.ts index fe544d3b10..fed402b9f9 100644 --- a/packages/agents-server/src/routing/internal-router.ts +++ b/packages/agents-server/src/routing/internal-router.ts @@ -38,7 +38,7 @@ import { routeBody, validateOptionalJsonBody, withSchema } from './schema.js' import { withLeadingSlash } from './tenant-stream-paths.js' import type { IRequest, RouterType } from 'itty-router' import type { - EventSourceContract, + WebhookSourceContract, WebhookSignatureVerifierConfig, } from '@electric-ax/agents-runtime' import type { TenantContext } from './context.js' @@ -121,7 +121,7 @@ export const internalRouter: InternalRoutes = Router< }) internalRouter.get(`/health`, () => json({ status: `ok` })) -internalRouter.get(`/event-sources`, listEventSources) +internalRouter.get(`/webhook-sources`, listWebhookSources) internalRouter.post( `/wake`, withSchema(wakeRegistrationBodySchema), @@ -366,17 +366,17 @@ async function registerWake( return status(204) } -async function listEventSources( +async function listWebhookSources( _request: IRequest, ctx: TenantContext ): Promise { - const eventSources = ctx.eventSources - ? await ctx.eventSources.listEventSources() + const webhookSources = ctx.webhookSources + ? await ctx.webhookSources.listWebhookSources() : [] - return json({ eventSources: eventSources.filter(isAgentVisibleEventSource) }) + return json({ webhookSources: webhookSources.filter(isAgentVisibleWebhookSource) }) } -function isAgentVisibleEventSource(source: EventSourceContract): boolean { +function isAgentVisibleWebhookSource(source: WebhookSourceContract): boolean { return source.agentVisible === true && source.status === `active` } diff --git a/packages/agents-server/src/server.ts b/packages/agents-server/src/server.ts index 6a09239308..8e265dca4d 100644 --- a/packages/agents-server/src/server.ts +++ b/packages/agents-server/src/server.ts @@ -35,7 +35,7 @@ import type { Principal } from './principal.js' import type { EntityBridgeCoordinator } from './entity-bridge-manager.js' import type { DurableStreamsRoutingAdapter } from './routing/durable-streams-routing-adapter.js' import type { OssServerContext } from './routing/oss-server-router.js' -import type { EventSourceCatalog } from './routing/context.js' +import type { WebhookSourceCatalog } from './routing/context.js' import type { PgSyncBridgeManagerOptions } from './pg-sync-bridge-manager.js' import type { StartedStandaloneAgentsRuntime } from './standalone-runtime.js' import type { DurableStreamsBearerProvider } from './stream-client.js' @@ -71,8 +71,8 @@ export interface ElectricAgentsServerOptions { ) => Promise | Principal | null authorizeRequest?: AuthorizeRequest allowDevPrincipalFallback?: boolean - eventSources?: EventSourceCatalog - ensureEventSourceWakeSource?: (sourceUrl: string) => Promise | void + webhookSources?: WebhookSourceCatalog + ensureWebhookSourceWakeSource?: (sourceUrl: string) => Promise | void pgSync?: PgSyncBridgeManagerOptions /** * Disabled by default. When set to a positive interval, periodically @@ -450,13 +450,13 @@ export class ElectricAgentsServer { runtime: this.standaloneRuntime.runtime, entityBridgeManager: this.entityBridgeManager, pgSyncBridgeManager: this.standaloneRuntime.runtime.pgSyncBridgeManager, - ...(this.options.eventSources - ? { eventSources: this.options.eventSources } + ...(this.options.webhookSources + ? { webhookSources: this.options.webhookSources } : {}), - ...(this.options.ensureEventSourceWakeSource + ...(this.options.ensureWebhookSourceWakeSource ? { - ensureEventSourceWakeSource: - this.options.ensureEventSourceWakeSource, + ensureWebhookSourceWakeSource: + this.options.ensureWebhookSourceWakeSource, } : {}), ...(this.options.authorizeRequest diff --git a/packages/agents-server/test/electric-agents-manager-write-validation.test.ts b/packages/agents-server/test/electric-agents-manager-write-validation.test.ts index 4cab89f72b..7f3f114529 100644 --- a/packages/agents-server/test/electric-agents-manager-write-validation.test.ts +++ b/packages/agents-server/test/electric-agents-manager-write-validation.test.ts @@ -311,29 +311,29 @@ describe(`ElectricAgentsManager composer input validation`, () => { }) }) -describe(`ElectricAgentsManager event source subscriptions`, () => { +describe(`ElectricAgentsManager webhook source subscriptions`, () => { it(`persists the manifest before registering wake side effects`, async () => { const calls: Array = [] const manager = createManifestManager(calls) - await manager.upsertEventSourceSubscription(`/coder/session-1`, { + await manager.upsertWebhookSourceSubscription(`/coder/session-1`, { subscription: { id: `watch-pr-123`, entityUrl: `/coder/session-1`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, bucketKey: `pull_request`, params: { number: 123 }, filterApplied: false, contractRevision: 1, sourceUrl: `/_webhooks/github-repo/prs/123`, sourceType: `webhook`, - manifestKey: `event-source:watch-pr-123`, + manifestKey: `webhook-source:watch-pr-123`, lifetime: { kind: `until_entity_stopped` }, createdBy: `tool`, createdAt: `2026-05-23T00:00:00.000Z`, }, manifest: { - key: `event-source:watch-pr-123`, + key: `webhook-source:watch-pr-123`, kind: `source`, sourceType: `webhook`, sourceRef: `github-repo/prs/123`, @@ -356,7 +356,7 @@ describe(`ElectricAgentsManager event source subscriptions`, () => { const calls: Array = [] const manager = createManifestManager(calls) - await manager.deleteEventSourceSubscription(`/coder/session-1`, { + await manager.deleteWebhookSourceSubscription(`/coder/session-1`, { id: `watch-pr-123`, }) diff --git a/packages/agents-server/test/event-source-subscriptions-route.test.ts b/packages/agents-server/test/webhook-source-subscriptions-route.test.ts similarity index 60% rename from packages/agents-server/test/event-source-subscriptions-route.test.ts rename to packages/agents-server/test/webhook-source-subscriptions-route.test.ts index 7a477f90f3..076dd0b7af 100644 --- a/packages/agents-server/test/event-source-subscriptions-route.test.ts +++ b/packages/agents-server/test/webhook-source-subscriptions-route.test.ts @@ -1,18 +1,18 @@ import { describe, expect, it, vi } from 'vitest' import { internalRouter } from '../src/routing/internal-router' import type { - EventSourceContract, - EventSourceSubscription, + WebhookSourceContract, + WebhookSourceSubscription, TenantContext, } from '../src/index' -describe(`event source subscription routes`, () => { +describe(`webhook source subscription routes`, () => { it(`creates a manifest-backed webhook wake subscription`, async () => { - const upsertEventSourceSubscription = vi.fn( + const upsertWebhookSourceSubscription = vi.fn( async ( _entityUrl: string, req: { - subscription: EventSourceSubscription + subscription: WebhookSourceSubscription manifest: Record } ) => ({ @@ -20,20 +20,20 @@ describe(`event source subscription routes`, () => { subscription: req.subscription, }) ) - const ensureEventSourceWakeSource = vi.fn(async () => {}) + const ensureWebhookSourceWakeSource = vi.fn(async () => {}) const ctx = tenantContext({ - upsertEventSourceSubscription, - ensureEventSourceWakeSource, + upsertWebhookSourceSubscription, + ensureWebhookSourceWakeSource, }) const response = await internalRouter.fetch( new Request( - `http://agents.test/_electric/entities/coder/session-1/event-source-subscriptions/watch-pr-123`, + `http://agents.test/_electric/entities/coder/session-1/webhook-source-subscriptions/watch-pr-123`, { method: `PUT`, headers: { 'content-type': `application/json` }, body: JSON.stringify({ - sourceKey: `github-repo`, + webhookKey: `github-repo`, bucketKey: `pull_request`, params: { number: 123 }, lifetime: { kind: `until_entity_stopped` }, @@ -50,20 +50,20 @@ describe(`event source subscription routes`, () => { subscription: { id: `watch-pr-123`, entityUrl: `/coder/session-1`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, sourceUrl: `/_webhooks/github-repo/prs/123`, - manifestKey: `event-source:watch-pr-123`, + manifestKey: `webhook-source:watch-pr-123`, filterApplied: false, }, }) - expect(ensureEventSourceWakeSource).toHaveBeenCalledWith( + expect(ensureWebhookSourceWakeSource).toHaveBeenCalledWith( `/_webhooks/github-repo/prs/123` ) - expect(upsertEventSourceSubscription).toHaveBeenCalledWith( + expect(upsertWebhookSourceSubscription).toHaveBeenCalledWith( `/coder/session-1`, expect.objectContaining({ manifest: expect.objectContaining({ - key: `event-source:watch-pr-123`, + key: `webhook-source:watch-pr-123`, sourceType: `webhook`, config: expect.objectContaining({ endpointKey: `github-repo`, @@ -74,72 +74,72 @@ describe(`event source subscription routes`, () => { ) }) - it(`lists configured event source contracts`, async () => { + it(`lists configured webhook source contracts`, async () => { const ctx = tenantContext() const response = await internalRouter.fetch( - new Request(`http://agents.test/_electric/event-sources`), + new Request(`http://agents.test/_electric/webhook-sources`), ctx ) expect(response?.status).toBe(200) await expect(response!.json()).resolves.toEqual({ - eventSources: [githubContract], + webhookSources: [githubContract], }) }) - it(`hides disabled and agent-invisible event sources from discovery`, async () => { - const hiddenContract: EventSourceContract = { + it(`hides disabled and agent-invisible webhook sources from discovery`, async () => { + const hiddenContract: WebhookSourceContract = { ...githubContract, - sourceKey: `hidden-repo`, + webhookKey: `hidden-repo`, agentVisible: false, } - const disabledContract: EventSourceContract = { + const disabledContract: WebhookSourceContract = { ...githubContract, - sourceKey: `disabled-repo`, + webhookKey: `disabled-repo`, status: `disabled`, } const ctx = tenantContext({ - eventSources: { - listEventSources: () => [ + webhookSources: { + listWebhookSources: () => [ githubContract, hiddenContract, disabledContract, ], - getEventSource: (sourceKey: string) => + getWebhookSource: (webhookKey: string) => [githubContract, hiddenContract, disabledContract].find( - (source) => source.sourceKey === sourceKey + (source) => source.webhookKey === webhookKey ), }, }) const response = await internalRouter.fetch( - new Request(`http://agents.test/_electric/event-sources`), + new Request(`http://agents.test/_electric/webhook-sources`), ctx ) expect(response?.status).toBe(200) await expect(response!.json()).resolves.toEqual({ - eventSources: [githubContract], + webhookSources: [githubContract], }) }) it(`rejects subscriptions whose params do not match the bucket schema`, async () => { - const upsertEventSourceSubscription = vi.fn() - const ensureEventSourceWakeSource = vi.fn(async () => {}) + const upsertWebhookSourceSubscription = vi.fn() + const ensureWebhookSourceWakeSource = vi.fn(async () => {}) const ctx = tenantContext({ - upsertEventSourceSubscription, - ensureEventSourceWakeSource, + upsertWebhookSourceSubscription, + ensureWebhookSourceWakeSource, }) const response = await internalRouter.fetch( new Request( - `http://agents.test/_electric/entities/coder/session-1/event-source-subscriptions/watch-pr-bad`, + `http://agents.test/_electric/entities/coder/session-1/webhook-source-subscriptions/watch-pr-bad`, { method: `PUT`, headers: { 'content-type': `application/json` }, body: JSON.stringify({ - sourceKey: `github-repo`, + webhookKey: `github-repo`, bucketKey: `pull_request`, params: { number: `123` }, }), @@ -154,17 +154,17 @@ describe(`event source subscription routes`, () => { message: expect.stringMatching(/paramsSchema.*number/), }, }) - expect(ensureEventSourceWakeSource).not.toHaveBeenCalled() - expect(upsertEventSourceSubscription).not.toHaveBeenCalled() + expect(ensureWebhookSourceWakeSource).not.toHaveBeenCalled() + expect(upsertWebhookSourceSubscription).not.toHaveBeenCalled() }) }) function tenantContext( overrides: { - upsertEventSourceSubscription?: unknown - deleteEventSourceSubscription?: unknown - ensureEventSourceWakeSource?: TenantContext[`ensureEventSourceWakeSource`] - eventSources?: TenantContext[`eventSources`] + upsertWebhookSourceSubscription?: unknown + deleteWebhookSourceSubscription?: unknown + ensureWebhookSourceWakeSource?: TenantContext[`ensureWebhookSourceWakeSource`] + webhookSources?: TenantContext[`webhookSources`] } = {} ): TenantContext { const registry = { @@ -195,41 +195,41 @@ function tenantContext( pgDb: {} as never, entityManager: { registry, - upsertEventSourceSubscription: vi.fn(async () => ({ + upsertWebhookSourceSubscription: vi.fn(async () => ({ txid: `tx-1`, })), - deleteEventSourceSubscription: vi.fn(async () => ({ txid: `tx-1` })), - ...(overrides.upsertEventSourceSubscription + deleteWebhookSourceSubscription: vi.fn(async () => ({ txid: `tx-1` })), + ...(overrides.upsertWebhookSourceSubscription ? { - upsertEventSourceSubscription: - overrides.upsertEventSourceSubscription, + upsertWebhookSourceSubscription: + overrides.upsertWebhookSourceSubscription, } : {}), - ...(overrides.deleteEventSourceSubscription + ...(overrides.deleteWebhookSourceSubscription ? { - deleteEventSourceSubscription: - overrides.deleteEventSourceSubscription, + deleteWebhookSourceSubscription: + overrides.deleteWebhookSourceSubscription, } : {}), } as never, streamClient: {} as never, runtime: {} as never, entityBridgeManager: {} as never, - eventSources: overrides.eventSources ?? { - listEventSources: () => [githubContract], - getEventSource: (sourceKey: string) => - sourceKey === githubContract.sourceKey ? githubContract : undefined, + webhookSources: overrides.webhookSources ?? { + listWebhookSources: () => [githubContract], + getWebhookSource: (webhookKey: string) => + webhookKey === githubContract.webhookKey ? githubContract : undefined, }, - ...(overrides.ensureEventSourceWakeSource - ? { ensureEventSourceWakeSource: overrides.ensureEventSourceWakeSource } + ...(overrides.ensureWebhookSourceWakeSource + ? { ensureWebhookSourceWakeSource: overrides.ensureWebhookSourceWakeSource } : {}), isShuttingDown: () => false, } } -const githubContract: EventSourceContract = { +const githubContract: WebhookSourceContract = { serviceId: `svc-agent-1`, - sourceKey: `github-repo`, + webhookKey: `github-repo`, sourceType: `webhook`, endpointKey: `github-repo`, status: `active`, diff --git a/packages/agents/CHANGELOG.md b/packages/agents/CHANGELOG.md index 1e88f14dd7..f65e029c64 100644 --- a/packages/agents/CHANGELOG.md +++ b/packages/agents/CHANGELOG.md @@ -171,7 +171,7 @@ mcp.servers` block. ### Patch Changes -- 833a1cb: Add agent event source contracts and dynamic event source subscription tools. Agents can list active, agent-visible webhook-backed event sources, subscribe entities to resolved bucket streams with explicit lifetimes, and persist those subscriptions as manifest-backed wake registrations. Bucket params are validated against the advertised `paramsSchema` before a subscription is accepted. Horton now receives these tools through the built-in agents runtime by default. Runtime-managed event source wakes now hydrate matching webhook rows into the agent trigger message so tool-created subscriptions include the event payload that caused the wake. +- 833a1cb: Add agent webhook source contracts and dynamic webhook source subscription tools. Agents can list active, agent-visible webhook-backed webhook sources, subscribe entities to resolved bucket streams with explicit lifetimes, and persist those subscriptions as manifest-backed wake registrations. Bucket params are validated against the advertised `paramsSchema` before a subscription is accepted. Horton now receives these tools through the built-in agents runtime by default. Runtime-managed webhook source wakes now hydrate matching webhook rows into the agent trigger message so tool-created subscriptions include the event payload that caused the wake. - Updated dependencies [833a1cb] - Updated dependencies [833a1cb] - @electric-ax/agents-runtime@0.3.4 diff --git a/packages/agents/src/agents/horton.ts b/packages/agents/src/agents/horton.ts index 7c95f88b54..337c7d0683 100644 --- a/packages/agents/src/agents/horton.ts +++ b/packages/agents/src/agents/horton.ts @@ -202,7 +202,7 @@ export function buildHortonSystemPrompt( workingDirectory: string, opts: { hasDocsSupport?: boolean - hasEventSourceTools?: boolean + hasWebhookSourceTools?: boolean hasScheduleTools?: boolean hasSkills?: boolean docsUrl?: string @@ -213,8 +213,8 @@ export function buildHortonSystemPrompt( const docsTools = opts.hasDocsSupport ? `\n- search_electric_agents_docs: hybrid search over the built-in Electric Agents docs index` : `` - const eventSourceTools = opts.hasEventSourceTools - ? `\n- list_event_sources: list external webhook/event feeds you can subscribe to, including available buckets and parameters\n- subscribe_event_source: subscribe yourself to one of those feeds or buckets so matching future events wake you\n- list_event_source_subscriptions: list your active event source subscriptions\n- unsubscribe_event_source: remove one of your event source subscriptions by id` + const webhookSourceTools = opts.hasWebhookSourceTools + ? `\n- list_webhook_sources: list external webhook feeds you can subscribe to, including available buckets and parameters\n- subscribe_webhook_source: subscribe yourself to one of those feeds or buckets so matching future webhooks wake you\n- list_webhook_source_subscriptions: list your active webhook source subscriptions\n- unsubscribe_webhook_source: remove one of your webhook source subscriptions by id` : `` const titleTool = `\n- set_title: set or rename this chat session's UI title` const scheduleTools = opts.hasScheduleTools @@ -287,7 +287,7 @@ When a user opens with a greeting ("hi", "hello", "hey", etc.) or a broad statem - fork: spawn a child session that inherits this conversation's history up to the latest completed response. Same parent-ownership model as spawn_worker — when the fork's next run finishes, you'll wake with its response. - observe_pg_sync: observe an Electric Postgres sync stream and wake on matching changes - send: send a message to an Electric Agent/entity. To schedule future work for yourself, call send with self: true and afterMs. -${eventSourceTools}${titleTool}${scheduleTools}${docsTools}${skillsTools} +${webhookSourceTools}${titleTool}${scheduleTools}${docsTools}${skillsTools} # Working with files - Prefer edit over write when modifying existing files. @@ -556,8 +556,8 @@ function createAssistantHandler(options: { ...loadedSkills.tools, ...mcp.tools(), ] - const hasEventSourceTools = tools.some( - (tool) => getToolName(tool) === `list_event_sources` + const hasWebhookSourceTools = tools.some( + (tool) => getToolName(tool) === `list_webhook_sources` ) const hasScheduleTools = tools.some( (tool) => getToolName(tool) === `upsert_cron_schedule` @@ -693,7 +693,7 @@ function createAssistantHandler(options: { docsUrl, modelProvider: modelConfig.provider, modelId: String(modelConfig.model), - hasEventSourceTools, + hasWebhookSourceTools, hasScheduleTools, }), ...modelConfig, diff --git a/packages/agents/src/bootstrap.ts b/packages/agents/src/bootstrap.ts index 236224b542..1179789613 100644 --- a/packages/agents/src/bootstrap.ts +++ b/packages/agents/src/bootstrap.ts @@ -9,7 +9,7 @@ import { createRuntimeHandler, } from '@electric-ax/agents-runtime' import { - createEventSourceTools, + createWebhookSourceTools, createScheduleTools, } from '@electric-ax/agents-runtime/tools' import { @@ -97,7 +97,7 @@ export function createBuiltinElectricTools( ): BuiltinElectricToolsFactory { return async (context) => { const builtinTools = [ - ...createEventSourceTools(context), + ...createWebhookSourceTools(context), ...createScheduleTools({ ...context, db: context.db as any }), ] const customTools = custom ? await custom(context) : [] diff --git a/packages/agents/test/horton-system-prompt.test.ts b/packages/agents/test/horton-system-prompt.test.ts index d582b18962..e901db8d55 100644 --- a/packages/agents/test/horton-system-prompt.test.ts +++ b/packages/agents/test/horton-system-prompt.test.ts @@ -22,21 +22,21 @@ describe(`buildHortonSystemPrompt`, () => { expect(prompt).not.toContain(`# Electric Agents documentation`) }) - it(`describes event source tools when they are available`, () => { + it(`describes webhook source tools when they are available`, () => { const prompt = buildHortonSystemPrompt(`/tmp/test`, { - hasEventSourceTools: true, + hasWebhookSourceTools: true, }) - expect(prompt).toContain(`list_event_sources`) - expect(prompt).toContain(`subscribe_event_source`) - expect(prompt).toContain(`external webhook/event feeds`) + expect(prompt).toContain(`list_webhook_sources`) + expect(prompt).toContain(`subscribe_webhook_source`) + expect(prompt).toContain(`external webhook feeds`) expect(prompt).toContain(`subscribe yourself`) }) - it(`omits event source tools when they are unavailable`, () => { + it(`omits webhook source tools when they are unavailable`, () => { const prompt = buildHortonSystemPrompt(`/tmp/test`) - expect(prompt).not.toContain(`list_event_sources`) - expect(prompt).not.toContain(`subscribe_event_source`) + expect(prompt).not.toContain(`list_webhook_sources`) + expect(prompt).not.toContain(`subscribe_webhook_source`) }) it(`includes docs URL guidance alongside local docs support`, () => { diff --git a/packages/agents/test/horton-tool-composition.test.ts b/packages/agents/test/horton-tool-composition.test.ts index 24f3f2c8f8..23edb301a5 100644 --- a/packages/agents/test/horton-tool-composition.test.ts +++ b/packages/agents/test/horton-tool-composition.test.ts @@ -95,25 +95,25 @@ function createElectricToolsContext() { upsertCronSchedule: vi.fn(async () => ({ txid: `tx-cron` })), upsertFutureSendSchedule: vi.fn(async () => ({ txid: `tx-future` })), deleteSchedule: vi.fn(async () => ({ txid: `tx-delete` })), - listEventSources: vi.fn(async () => []), - subscribeToEventSource: vi.fn(async () => ({ + listWebhookSources: vi.fn(async () => []), + subscribeToWebhookSource: vi.fn(async () => ({ txid: `tx-subscribe`, subscription: { id: `subscription`, entityUrl: `/horton/smoke/main`, - sourceKey: `github`, + webhookKey: `github`, params: {}, filterApplied: false, contractRevision: 1, sourceUrl: `/_webhooks/github`, sourceType: `webhook`, - manifestKey: `event-source:subscription`, + manifestKey: `webhook-source:subscription`, lifetime: { kind: `until_entity_stopped` }, createdBy: `tool`, createdAt: new Date(0).toISOString(), }, })), - unsubscribeFromEventSource: vi.fn(async () => ({ txid: `tx-unsubscribe` })), + unsubscribeFromWebhookSource: vi.fn(async () => ({ txid: `tx-unsubscribe` })), } as any } @@ -219,7 +219,7 @@ describe(`horton tool composition`, () => { ) }) - it(`adds event source and schedule tools through the built-in electric tool factory`, async () => { + it(`adds webhook source and schedule tools through the built-in electric tool factory`, async () => { const tools = await createBuiltinElectricTools()( createElectricToolsContext() ) @@ -227,28 +227,28 @@ describe(`horton tool composition`, () => { expect(names).toEqual( expect.arrayContaining([ - `list_event_sources`, - `subscribe_event_source`, - `list_event_source_subscriptions`, - `unsubscribe_event_source`, + `list_webhook_sources`, + `subscribe_webhook_source`, + `list_webhook_source_subscriptions`, + `unsubscribe_webhook_source`, `upsert_cron_schedule`, `delete_schedule`, `list_schedules`, ]) ) expect( - tools.find((tool) => tool.name === `list_event_sources`)?.description - ).toContain(`external event feeds`) + tools.find((tool) => tool.name === `list_webhook_sources`)?.description + ).toContain(`external webhook feeds`) expect( - tools.find((tool) => tool.name === `list_event_sources`)?.description + tools.find((tool) => tool.name === `list_webhook_sources`)?.description ).not.toContain(`this entity`) expect( - tools.find((tool) => tool.name === `list_event_source_subscriptions`) + tools.find((tool) => tool.name === `list_webhook_source_subscriptions`) ?.description ).not.toContain(`manifest-backed`) }) - it(`includes event source and schedule electric tools in Horton and describes them in the prompt`, async () => { + it(`includes webhook source and schedule electric tools in Horton and describes them in the prompt`, async () => { const electricTools = await createBuiltinElectricTools()( createElectricToolsContext() ) @@ -257,13 +257,13 @@ describe(`horton tool composition`, () => { .filter((t) => !isMcpToolsSentinel(t)) .map((t) => (t as { name: string }).name) - expect(names).toContain(`list_event_sources`) - expect(names).toContain(`subscribe_event_source`) + expect(names).toContain(`list_webhook_sources`) + expect(names).toContain(`subscribe_webhook_source`) expect(names).toContain(`upsert_cron_schedule`) expect(names).toContain(`delete_schedule`) expect(names).toContain(`list_schedules`) - expect(cfg.systemPrompt).toContain(`list_event_sources`) - expect(cfg.systemPrompt).toContain(`subscribe_event_source`) + expect(cfg.systemPrompt).toContain(`list_webhook_sources`) + expect(cfg.systemPrompt).toContain(`subscribe_webhook_source`) expect(cfg.systemPrompt).toContain(`upsert_cron_schedule`) expect(cfg.systemPrompt).toContain(`delete_schedule`) expect(cfg.systemPrompt).toContain(`list_schedules`) From bb59c0f8d62410aa88a8bbd4f8375866046f9642 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Thu, 11 Jun 2026 08:52:19 -0600 Subject: [PATCH 2/3] Add changeset for webhook source rename --- .changeset/rename-webhook-sources.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/rename-webhook-sources.md diff --git a/.changeset/rename-webhook-sources.md b/.changeset/rename-webhook-sources.md new file mode 100644 index 0000000000..109368c557 --- /dev/null +++ b/.changeset/rename-webhook-sources.md @@ -0,0 +1,7 @@ +--- +'@electric-ax/agents-runtime': patch +'@electric-ax/agents-server': patch +'@electric-ax/agents': patch +--- + +Rename agent-facing webhook subscription APIs from generic event source terminology to webhook source terminology. This is a breaking rename for the experimental webhook-source tools, runtime/server types, routes, manifest metadata, and wake payload names. From 318c7942d72c8c316865d298953540886e1f6900 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Thu, 11 Jun 2026 09:04:32 -0600 Subject: [PATCH 3/3] Format webhook source rename changes --- packages/agents-runtime/src/runtime-server-client.ts | 4 +++- packages/agents-runtime/src/tools/webhook-sources.ts | 3 ++- packages/agents-runtime/test/webhook-sources.test.ts | 8 ++++---- packages/agents-server/src/routing/internal-router.ts | 4 +++- .../test/webhook-source-subscriptions-route.test.ts | 5 ++++- packages/agents/test/horton-tool-composition.test.ts | 4 +++- 6 files changed, 19 insertions(+), 9 deletions(-) diff --git a/packages/agents-runtime/src/runtime-server-client.ts b/packages/agents-runtime/src/runtime-server-client.ts index 4fb8dbb828..c2fd03155e 100644 --- a/packages/agents-runtime/src/runtime-server-client.ts +++ b/packages/agents-runtime/src/runtime-server-client.ts @@ -711,7 +711,9 @@ export function createRuntimeServerClient( return (await response.json()) as { streamUrl: string; sourceRef: string } } - const listWebhookSources = async (): Promise> => { + const listWebhookSources = async (): Promise< + Array + > => { const response = await request(`/_electric/webhook-sources`, { method: `GET`, }) diff --git a/packages/agents-runtime/src/tools/webhook-sources.ts b/packages/agents-runtime/src/tools/webhook-sources.ts index 9b345e7d7c..bcfbf98c6c 100644 --- a/packages/agents-runtime/src/tools/webhook-sources.ts +++ b/packages/agents-runtime/src/tools/webhook-sources.ts @@ -266,7 +266,8 @@ export function createWebhookSourceTools(opts: { const { txid, subscription } = await subscribeToWebhookSource({ ...parsed, id, - lifetime: parsed.lifetime ?? defaultWebhookSourceSubscriptionLifetime(), + lifetime: + parsed.lifetime ?? defaultWebhookSourceSubscriptionLifetime(), }) await db.utils.awaitTxId(txid, 10_000) return asToolResult( diff --git a/packages/agents-runtime/test/webhook-sources.test.ts b/packages/agents-runtime/test/webhook-sources.test.ts index 57ecfe9e42..a436d174e3 100644 --- a/packages/agents-runtime/test/webhook-sources.test.ts +++ b/packages/agents-runtime/test/webhook-sources.test.ts @@ -161,10 +161,10 @@ describe(`webhook source helpers`, () => { }, }, }) - const hydrated = buildHydratedWebhookSourceWake(info as WebhookSourceWakeInfo, [ - webhookEvent({ key: `event-0` }), - event, - ]) + const hydrated = buildHydratedWebhookSourceWake( + info as WebhookSourceWakeInfo, + [webhookEvent({ key: `event-0` }), event] + ) expect(hydrated).toMatchObject({ type: `webhook_source_wake`, diff --git a/packages/agents-server/src/routing/internal-router.ts b/packages/agents-server/src/routing/internal-router.ts index fed402b9f9..4e187e6510 100644 --- a/packages/agents-server/src/routing/internal-router.ts +++ b/packages/agents-server/src/routing/internal-router.ts @@ -373,7 +373,9 @@ async function listWebhookSources( const webhookSources = ctx.webhookSources ? await ctx.webhookSources.listWebhookSources() : [] - return json({ webhookSources: webhookSources.filter(isAgentVisibleWebhookSource) }) + return json({ + webhookSources: webhookSources.filter(isAgentVisibleWebhookSource), + }) } function isAgentVisibleWebhookSource(source: WebhookSourceContract): boolean { diff --git a/packages/agents-server/test/webhook-source-subscriptions-route.test.ts b/packages/agents-server/test/webhook-source-subscriptions-route.test.ts index 076dd0b7af..d962cebbd7 100644 --- a/packages/agents-server/test/webhook-source-subscriptions-route.test.ts +++ b/packages/agents-server/test/webhook-source-subscriptions-route.test.ts @@ -221,7 +221,10 @@ function tenantContext( webhookKey === githubContract.webhookKey ? githubContract : undefined, }, ...(overrides.ensureWebhookSourceWakeSource - ? { ensureWebhookSourceWakeSource: overrides.ensureWebhookSourceWakeSource } + ? { + ensureWebhookSourceWakeSource: + overrides.ensureWebhookSourceWakeSource, + } : {}), isShuttingDown: () => false, } diff --git a/packages/agents/test/horton-tool-composition.test.ts b/packages/agents/test/horton-tool-composition.test.ts index 23edb301a5..293a3c2b29 100644 --- a/packages/agents/test/horton-tool-composition.test.ts +++ b/packages/agents/test/horton-tool-composition.test.ts @@ -113,7 +113,9 @@ function createElectricToolsContext() { createdAt: new Date(0).toISOString(), }, })), - unsubscribeFromWebhookSource: vi.fn(async () => ({ txid: `tx-unsubscribe` })), + unsubscribeFromWebhookSource: vi.fn(async () => ({ + txid: `tx-unsubscribe`, + })), } as any }