From c0e7f0852f3aa7d81cff4784e1c83bc4c59925d7 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Thu, 11 Jun 2026 13:40:57 -0600 Subject: [PATCH 1/8] Require pg sync source URL --- packages/agents-runtime/src/setup-context.ts | 7 +- .../agents-runtime/test/process-wake.test.ts | 3 +- .../runtime-server-client-pg-sync.test.ts | 6 +- .../src/pg-sync-bridge-manager.ts | 14 ++-- .../src/routing/pg-sync-router.ts | 6 +- .../test/pg-sync-bridge-manager.test.ts | 64 ++++++++++++------- packages/agents/src/tools/observe-pg-sync.ts | 14 ++-- .../agents/test/observe-pg-sync-tool.test.ts | 34 ++++++++-- 8 files changed, 97 insertions(+), 51 deletions(-) diff --git a/packages/agents-runtime/src/setup-context.ts b/packages/agents-runtime/src/setup-context.ts index ce9edfff48..bf375db8ec 100644 --- a/packages/agents-runtime/src/setup-context.ts +++ b/packages/agents-runtime/src/setup-context.ts @@ -886,10 +886,9 @@ export function createSetupContext( source.ensureStream.contentType ) } - const sourceStreamUrl = - source.sourceType === `pgSync` || !source.streamUrl.startsWith(`/`) - ? source.streamUrl - : appendPathToUrl(config.serverBaseUrl, source.streamUrl) + const sourceStreamUrl = source.streamUrl.startsWith(`/`) + ? appendPathToUrl(config.serverBaseUrl, source.streamUrl) + : source.streamUrl sourceDb = await wiring.createSourceDb( sourceStreamUrl, source.schema, diff --git a/packages/agents-runtime/test/process-wake.test.ts b/packages/agents-runtime/test/process-wake.test.ts index e2ccf280dc..10500600b0 100644 --- a/packages/agents-runtime/test/process-wake.test.ts +++ b/packages/agents-runtime/test/process-wake.test.ts @@ -1619,6 +1619,7 @@ describe(`processWake`, () => { it(`pgSync observe registers pgSync source before source DB preload`, async () => { const source = pgSync({ + url: `http://localhost:30000/v1/shape`, table: `todos`, where: `priority = $1`, params: [`high`], @@ -1640,7 +1641,7 @@ describe(`processWake`, () => { expect(mockCreateStreamDB).toHaveBeenCalledWith( expect.objectContaining({ streamOptions: expect.objectContaining({ - url: `/_electric/pg-sync/default/pg-source-1`, + url: `http://localhost:3000/_electric/pg-sync/default/pg-source-1`, contentType: `application/json`, }), state: expect.objectContaining({ diff --git a/packages/agents-runtime/test/runtime-server-client-pg-sync.test.ts b/packages/agents-runtime/test/runtime-server-client-pg-sync.test.ts index 652d4342fc..a049090a90 100644 --- a/packages/agents-runtime/test/runtime-server-client-pg-sync.test.ts +++ b/packages/agents-runtime/test/runtime-server-client-pg-sync.test.ts @@ -26,6 +26,7 @@ describe(`runtime-server-client.registerPgSyncSource`, () => { }) const options = { + url: `http://localhost:30000/v1/shape`, table: `todos`, columns: [`id`, `text`], where: `priority = $1`, @@ -59,7 +60,10 @@ describe(`runtime-server-client.registerPgSyncSource`, () => { }) await expect( - client.registerPgSyncSource({ table: `todos` }) + client.registerPgSyncSource({ + url: `http://localhost:30000/v1/shape`, + table: `todos`, + }) ).rejects.toThrow(/registerPgSyncSource failed \(400\): bad table/) }) }) diff --git a/packages/agents-server/src/pg-sync-bridge-manager.ts b/packages/agents-server/src/pg-sync-bridge-manager.ts index 73953d473d..c7ddfd1a86 100644 --- a/packages/agents-server/src/pg-sync-bridge-manager.ts +++ b/packages/agents-server/src/pg-sync-bridge-manager.ts @@ -21,10 +21,6 @@ import type { ShapeStreamInterface, } from '@electric-sql/client' -export const PG_SYNC_ELECTRIC_SHAPE_URL = - process.env.ELECTRIC_AGENTS_PG_SYNC_ELECTRIC_URL ?? - `http://localhost:3000/v1/shape` - type PgSyncOperation = `insert` | `update` | `delete` type WakeEvaluator = ( sourceUrl: string, @@ -36,7 +32,6 @@ export type PgSyncResolvedSource = { } export interface PgSyncBridgeManagerOptions { - url?: string retry?: { initialDelayMs?: number maxDelayMs?: number @@ -425,7 +420,6 @@ export class PgSyncBridgeManager implements PgSyncBridgeCoordinator { private bridges = new Map() private starting = new Map>() - private readonly url: string private readonly retry: Required< NonNullable > @@ -436,7 +430,6 @@ export class PgSyncBridgeManager implements PgSyncBridgeCoordinator { private registry?: PostgresRegistry, options: PgSyncBridgeManagerOptions = {} ) { - this.url = options.url ?? PG_SYNC_ELECTRIC_SHAPE_URL this.retry = { initialDelayMs: options.retry?.initialDelayMs ?? DEFAULT_RETRY_INITIAL_DELAY_MS, @@ -541,7 +534,12 @@ export class PgSyncBridgeManager implements PgSyncBridgeCoordinator { } private resolveSource(options: CanonicalPgSyncConfig): PgSyncResolvedSource { - return { url: options.url ?? this.url } + if (!options.url) { + throw new Error( + `pgSync source url is required; no server default is configured` + ) + } + return { url: options.url } } async stop(): Promise { diff --git a/packages/agents-server/src/routing/pg-sync-router.ts b/packages/agents-server/src/routing/pg-sync-router.ts index ed4be80b16..c4f6f3dfd8 100644 --- a/packages/agents-server/src/routing/pg-sync-router.ts +++ b/packages/agents-server/src/routing/pg-sync-router.ts @@ -16,7 +16,7 @@ import type { RouterType } from 'itty-router' import type { TenantContext } from './context.js' const pgSyncOptionsSchema = Type.Object({ - url: Type.Optional(Type.String()), + url: Type.String(), table: Type.String(), columns: Type.Optional(Type.Array(Type.String())), where: Type.Optional(Type.String()), @@ -72,6 +72,10 @@ async function registerPgSync( ): Promise { const { options, metadata } = routeBody(request) + if (options.url.trim() === ``) { + return apiError(400, ErrCodeInvalidRequest, `pgSync url must be non-empty`) + } + if (options.table.trim() === ``) { return apiError( 400, diff --git a/packages/agents-server/test/pg-sync-bridge-manager.test.ts b/packages/agents-server/test/pg-sync-bridge-manager.test.ts index 007d42075d..60e1a5cb76 100644 --- a/packages/agents-server/test/pg-sync-bridge-manager.test.ts +++ b/packages/agents-server/test/pg-sync-bridge-manager.test.ts @@ -7,7 +7,6 @@ import { buildElectricShapeParams, pgSyncMessageToDurableEvent, PgSyncBridgeManager, - PG_SYNC_ELECTRIC_SHAPE_URL, } from '../src/pg-sync-bridge-manager' const { mockState } = vi.hoisted(() => ({ @@ -57,6 +56,8 @@ vi.mock(`@durable-streams/client`, () => ({ }, })) +const SHAPE_URL = `https://electric.example/v1/shape` + beforeEach(() => { mockState.callbacks = [] mockState.constructedOptions = [] @@ -86,7 +87,7 @@ describe(`pg-sync bridge helpers`, () => { }) it(`converts insert/update/delete messages with stable keys`, () => { - const options = { table: `todos` } + const options = { url: SHAPE_URL, table: `todos` } const insert = pgSyncMessageToDurableEvent( { headers: { operation: `insert`, offset: `1_0` }, @@ -116,7 +117,7 @@ describe(`pg-sync bridge helpers`, () => { }) it(`rejects messages without stable string offsets`, () => { - const options = { table: `todos` } + const options = { url: SHAPE_URL, table: `todos` } expect( pgSyncMessageToDurableEvent( @@ -137,7 +138,7 @@ describe(`pg-sync bridge helpers`, () => { }) it(`converts BigInt values to strings so durable events are JSON serializable`, () => { - const options = { table: `entities` } + const options = { url: SHAPE_URL, table: `entities` } const event = pgSyncMessageToDurableEvent( { headers: { operation: `insert`, offset: `12_0` }, @@ -155,6 +156,17 @@ describe(`pg-sync bridge helpers`, () => { }) describe(`PgSyncBridgeManager`, () => { + it(`requires a source URL at registration time`, async () => { + const manager = new PgSyncBridgeManager({ + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any) + + await expect(manager.register({ table: `todos` })).rejects.toThrow( + /pgSync source url is required/ + ) + }) + it(`starts one stream per sourceRef and appends change events`, async () => { const streamClient = { baseUrl: `http://durable`, @@ -162,13 +174,13 @@ describe(`PgSyncBridgeManager`, () => { } const manager = new PgSyncBridgeManager(streamClient as any) - await manager.register({ table: `todos` }) - await manager.register({ table: `todos` }) + await manager.register({ url: SHAPE_URL, table: `todos` }) + await manager.register({ url: SHAPE_URL, table: `todos` }) expect(streamClient.ensure).toHaveBeenCalledTimes(2) expect(mockState.constructedOptions).toHaveLength(1) expect(mockState.constructedOptions[0]).toMatchObject({ - url: PG_SYNC_ELECTRIC_SHAPE_URL, + url: SHAPE_URL, params: { table: `todos` }, offset: `now`, log: `changes_only`, @@ -193,7 +205,7 @@ describe(`PgSyncBridgeManager`, () => { } const manager = new PgSyncBridgeManager(streamClient as any, evaluateWakes) - await manager.register({ table: `todos` }) + await manager.register({ url: SHAPE_URL, table: `todos` }) await mockState.callbacks[0]!([ { headers: { operation: `insert`, offset: `1_0` }, value: { id: 1 } }, { headers: { control: `up-to-date` } }, @@ -216,7 +228,7 @@ describe(`PgSyncBridgeManager`, () => { ensure: vi.fn(async () => undefined), } const manager = new PgSyncBridgeManager(streamClient as any, evaluateWakes) - const options = { table: `todos` } + const options = { url: SHAPE_URL, table: `todos` } const sourceRef = sourceRefForPgSync(options) await manager.register(options) @@ -250,7 +262,7 @@ describe(`PgSyncBridgeManager`, () => { undefined, registry as any ) - const options = { table: `todos` } + const options = { url: SHAPE_URL, table: `todos` } const sourceRef = sourceRefForPgSync(options) const result = await manager.register(options) @@ -279,7 +291,11 @@ describe(`PgSyncBridgeManager`, () => { undefined, registry as any ) - const options = { table: `todos`, params: { b: `2`, a: `1` } } + const options = { + url: SHAPE_URL, + table: `todos`, + params: { b: `2`, a: `1` }, + } const sourceRef = sourceRefForPgSync(options) await manager.register(options) @@ -287,6 +303,7 @@ describe(`PgSyncBridgeManager`, () => { sourceRef, streamUrl: getPgSyncStreamPath(sourceRef), options: { + url: SHAPE_URL, table: `todos`, params: { a: `1`, b: `2` }, replica: `default`, @@ -323,7 +340,7 @@ describe(`PgSyncBridgeManager`, () => { registry as any ) - await manager.register({ table: `todos` }) + await manager.register({ url: SHAPE_URL, table: `todos` }) await mockState.callbacks[0]!([{ headers: { control: `up-to-date` } }]) registry.updatePgSyncBridgeCursor.mockClear() mockState.appendError = new Error(`append failed`) @@ -336,7 +353,7 @@ describe(`PgSyncBridgeManager`, () => { }) it(`startup resumes existing pgSync bridges from stored cursor`, async () => { - const options = { table: `todos` } + const options = { url: SHAPE_URL, table: `todos` } const sourceRef = sourceRefForPgSync(options) const registry = { listPgSyncBridges: vi.fn(async () => [ @@ -370,7 +387,7 @@ describe(`PgSyncBridgeManager`, () => { }) it(`invalid stored shape cursor falls back to bootstrap and clears cursor`, async () => { - const options = { table: `todos` } + const options = { url: SHAPE_URL, table: `todos` } const sourceRef = sourceRefForPgSync(options) const registry = { listPgSyncBridges: vi.fn(async () => [ @@ -404,7 +421,7 @@ describe(`PgSyncBridgeManager`, () => { }) it(`must-refetch clears persisted cursor and restarts bootstrap`, async () => { - const options = { table: `todos` } + const options = { url: SHAPE_URL, table: `todos` } const sourceRef = sourceRefForPgSync(options) const registry = { upsertPgSyncBridge: vi.fn(async (row) => ({ ...row })), @@ -437,7 +454,7 @@ describe(`PgSyncBridgeManager`, () => { baseUrl: `http://durable`, ensure: vi.fn(async () => undefined), } as any) - await manager.register({ table: `todos` }) + await manager.register({ url: SHAPE_URL, table: `todos` }) await mockState.callbacks[0]!([{ headers: { control: `must-refetch` } }]) @@ -488,7 +505,7 @@ describe(`external review red tests`, () => { undefined, registry as any ) - await first.register({ table: `todos` }) + await first.register({ url: SHAPE_URL, table: `todos` }) mockState.streams[0]!.shapeHandle = `shape-a` mockState.streams[0]!.lastOffset = `1_0` await mockState.callbacks[0]!([ @@ -528,7 +545,7 @@ describe(`external review red tests`, () => { registry as any ) - await manager.register({ table: `todos` }) + await manager.register({ url: SHAPE_URL, table: `todos` }) await mockState.callbacks[0]!([{ headers: { control: `up-to-date` } }]) mockState.streams[0]!.shapeHandle = `shape-a` mockState.streams[0]!.lastOffset = `2_0` @@ -547,14 +564,14 @@ describe(`external review red tests`, () => { expect( pgSyncMessageToDurableEvent( { headers: { operation: `insert` }, value: { id: 1 } } as any, - { table: `todos` } + { url: SHAPE_URL, table: `todos` } ) ).toBeNull() }) }) describe(`pg-sync production hardening`, () => { - it(`uses configured URL and forwards request metadata as shape params`, async () => { + it(`uses the source URL from registration options and forwards request metadata as shape params`, async () => { const manager = new PgSyncBridgeManager( { baseUrl: `http://durable`, @@ -563,13 +580,12 @@ describe(`pg-sync production hardening`, () => { undefined, undefined, { - url: `https://electric.example/v1/shape`, retry: { initialDelayMs: 0, maxDelayMs: 0 }, } ) await manager.register( - { table: `todos` }, + { url: SHAPE_URL, table: `todos` }, { tenantId: `tenant-a`, principalKind: `agent`, @@ -585,7 +601,7 @@ describe(`pg-sync production hardening`, () => { ) expect(mockState.constructedOptions[0]).toMatchObject({ - url: `https://electric.example/v1/shape`, + url: SHAPE_URL, params: { table: `todos`, replica: `default`, @@ -624,7 +640,7 @@ describe(`pg-sync production hardening`, () => { } ) - await manager.register({ table: `todos` }) + await manager.register({ url: SHAPE_URL, table: `todos` }) await mockState.callbacks[0]!([{ headers: { control: `up-to-date` } }]) mockState.appendError = new Error(`append failed`) await mockState.callbacks[0]!([ diff --git a/packages/agents/src/tools/observe-pg-sync.ts b/packages/agents/src/tools/observe-pg-sync.ts index 4bb7ee3989..c37da3f791 100644 --- a/packages/agents/src/tools/observe-pg-sync.ts +++ b/packages/agents/src/tools/observe-pg-sync.ts @@ -27,11 +27,9 @@ export function createObservePgSyncTool(ctx: HandlerContext): AgentTool { label: `Observe Postgres Sync`, description: `Observe an Electric Postgres shape stream and wake this agent when matching row changes arrive.`, parameters: Type.Object({ - url: Type.Optional( - Type.String({ - description: `Optional Electric shape endpoint URL. Defaults to the server-configured pg-sync URL.`, - }) - ), + url: Type.String({ + description: `Electric shape endpoint URL.`, + }), table: Type.String({ minLength: 1, pattern: `\\S`, @@ -66,7 +64,7 @@ export function createObservePgSyncTool(ctx: HandlerContext): AgentTool { }), execute: async (_toolCallId, params) => { const args = params as { - url?: string + url: string table: string columns?: string[] where?: string @@ -78,6 +76,10 @@ export function createObservePgSyncTool(ctx: HandlerContext): AgentTool { } } + if (typeof args.url !== `string` || args.url.trim().length === 0) { + throw new Error(`url is required`) + } + if (typeof args.table !== `string` || args.table.trim().length === 0) { throw new Error(`table is required`) } diff --git a/packages/agents/test/observe-pg-sync-tool.test.ts b/packages/agents/test/observe-pg-sync-tool.test.ts index fa7cbd0f87..69db4a2f6d 100644 --- a/packages/agents/test/observe-pg-sync-tool.test.ts +++ b/packages/agents/test/observe-pg-sync-tool.test.ts @@ -10,11 +10,15 @@ function textResult(result: unknown): any { } describe(`observe_pg_sync tool`, () => { - it(`validates required table`, async () => { + it(`validates required url and table`, async () => { const tool = createObservePgSyncTool({ observe: vi.fn() } as any) expect(Value.Check(tool.parameters as any, {})).toBe(false) - await expect(tool.execute(`call`, {})).rejects.toThrow(/table is required/) + expect(Value.Check(tool.parameters as any, { table: `todos` })).toBe(false) + await expect(tool.execute(`call`, {})).rejects.toThrow(/url is required/) + await expect(tool.execute(`call`, { table: `todos` })).rejects.toThrow( + /url is required/ + ) }) it(`rejects invalid ops and unsupported timeoutMs when schema validates`, () => { @@ -39,6 +43,7 @@ describe(`observe_pg_sync tool`, () => { const tool = createObservePgSyncTool({ observe } as any) await tool.execute(`call`, { + url: `http://localhost:30000/v1/shape`, table: `todos`, columns: [`id`, `text`], where: `priority = $1`, @@ -48,6 +53,7 @@ describe(`observe_pg_sync tool`, () => { }) const expectedSource = pgSync({ + url: `http://localhost:30000/v1/shape`, table: `todos`, columns: [`id`, `text`], where: `priority = $1`, @@ -78,7 +84,11 @@ describe(`observe_pg_sync tool`, () => { const tool = createObservePgSyncTool({ observe } as any) const result = textResult( - await tool.execute(`call`, { table: `todos`, wake: { debounceMs: 0 } }) + await tool.execute(`call`, { + url: `http://localhost:30000/v1/shape`, + table: `todos`, + wake: { debounceMs: 0 }, + }) ) expect(observe).toHaveBeenCalledWith(expect.anything(), { @@ -92,9 +102,16 @@ describe(`observe_pg_sync tool`, () => { const tool = createObservePgSyncTool({ observe } as any) const result = textResult( - await tool.execute(`call`, { table: `todos`, wake: { ops: [`delete`] } }) + await tool.execute(`call`, { + url: `http://localhost:30000/v1/shape`, + table: `todos`, + wake: { ops: [`delete`] }, + }) ) - const source = pgSync({ table: `todos` }) + const source = pgSync({ + url: `http://localhost:30000/v1/shape`, + table: `todos`, + }) expect(result).toEqual({ sourceRef: source.sourceRef, @@ -107,7 +124,12 @@ describe(`observe_pg_sync tool`, () => { const observe = vi.fn(async () => {}) const tool = createObservePgSyncTool({ observe } as any) - const result = textResult(await tool.execute(`call`, { table: `todos` })) + const result = textResult( + await tool.execute(`call`, { + url: `http://localhost:30000/v1/shape`, + table: `todos`, + }) + ) expect(observe).toHaveBeenCalledWith(expect.anything(), { wake: { on: `change` }, From 2e2563780af8d6b2fd36355c286c7b3c0f6c2510 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Thu, 11 Jun 2026 14:05:27 -0600 Subject: [PATCH 2/8] Fix pg sync observe source identity --- packages/agents-runtime/src/process-wake.ts | 15 ++++++++ packages/agents-runtime/src/setup-context.ts | 1 + packages/agents-runtime/src/types.ts | 1 + .../agents-runtime/test/process-wake.test.ts | 12 ++++++ packages/agents/src/tools/observe-pg-sync.ts | 12 ++++-- .../agents/test/observe-pg-sync-tool.test.ts | 38 +++++++++++++------ 6 files changed, 64 insertions(+), 15 deletions(-) diff --git a/packages/agents-runtime/src/process-wake.ts b/packages/agents-runtime/src/process-wake.ts index c419a3114a..3549c3191a 100644 --- a/packages/agents-runtime/src/process-wake.ts +++ b/packages/agents-runtime/src/process-wake.ts @@ -1748,10 +1748,25 @@ export async function processWake( wakeId, } ) + const originalEntry = source.toManifestEntry() as Record< + string, + unknown + > observedSource = { ...source, sourceRef: registeredPgSync.sourceRef, streamUrl: registeredPgSync.streamUrl, + toManifestEntry() { + return { + ...originalEntry, + key: `source:pgSync:${registeredPgSync!.sourceRef}`, + sourceRef: registeredPgSync!.sourceRef, + config: { + ...((originalEntry.config as Record) ?? {}), + streamUrl: registeredPgSync!.streamUrl, + }, + } as unknown as ReturnType + }, } } diff --git a/packages/agents-runtime/src/setup-context.ts b/packages/agents-runtime/src/setup-context.ts index bf375db8ec..e4187c3ba5 100644 --- a/packages/agents-runtime/src/setup-context.ts +++ b/packages/agents-runtime/src/setup-context.ts @@ -911,6 +911,7 @@ export function createSetupContext( return { sourceType: source.sourceType, sourceRef: source.sourceRef, + streamUrl: source.streamUrl, db: sourceDb, events, } diff --git a/packages/agents-runtime/src/types.ts b/packages/agents-runtime/src/types.ts index 7f07f57fad..dbda6794a7 100644 --- a/packages/agents-runtime/src/types.ts +++ b/packages/agents-runtime/src/types.ts @@ -618,6 +618,7 @@ export interface SourceWakeConfig { export interface ObservationHandle { sourceType: string sourceRef: string + streamUrl?: string db?: EntityStreamDB | ObservationStreamDB events: Array } diff --git a/packages/agents-runtime/test/process-wake.test.ts b/packages/agents-runtime/test/process-wake.test.ts index 10500600b0..f2f965dff6 100644 --- a/packages/agents-runtime/test/process-wake.test.ts +++ b/packages/agents-runtime/test/process-wake.test.ts @@ -1671,6 +1671,18 @@ describe(`processWake`, () => { wakeId: `wake-abc`, }, }) + const wakeCall = fetchMock.mock.calls.find( + ([url, opts]) => + String(url).includes(`/_electric/wake`) && + !String(url).includes(`wake-abc`) && + (opts as RequestInit | undefined)?.method === `POST` + ) + const wakeBody = JSON.parse(wakeCall![1]!.body as string) as Record< + string, + unknown + > + expect(wakeBody.sourceUrl).toBe(`/_electric/pg-sync/default/pg-source-1`) + expect(wakeBody.manifestKey).toBe(`source:pgSync:pg-source-1`) expect(fetchMock.mock.invocationCallOrder[pgSyncCallIndex]).toBeLessThan( mockSourceDbPreload.mock.invocationCallOrder[0] ) diff --git a/packages/agents/src/tools/observe-pg-sync.ts b/packages/agents/src/tools/observe-pg-sync.ts index c37da3f791..2d212d978f 100644 --- a/packages/agents/src/tools/observe-pg-sync.ts +++ b/packages/agents/src/tools/observe-pg-sync.ts @@ -1,5 +1,9 @@ import { Type } from '@sinclair/typebox' -import { pgSync, type HandlerContext } from '@electric-ax/agents-runtime' +import { + getPgSyncStreamPath, + pgSync, + type HandlerContext, +} from '@electric-ax/agents-runtime' import type { AgentTool } from '@mariozechner/pi-agent-core' function asToolResult(value: unknown) { @@ -100,11 +104,11 @@ export function createObservePgSyncTool(ctx: HandlerContext): AgentTool { : {}), } - await ctx.observe(source, { wake }) + const handle = await ctx.observe(source, { wake }) return asToolResult({ - sourceRef: source.sourceRef, - streamUrl: source.streamUrl, + sourceRef: handle.sourceRef, + streamUrl: handle.streamUrl ?? getPgSyncStreamPath(handle.sourceRef), wake, }) }, diff --git a/packages/agents/test/observe-pg-sync-tool.test.ts b/packages/agents/test/observe-pg-sync-tool.test.ts index 69db4a2f6d..ba01ff0b7e 100644 --- a/packages/agents/test/observe-pg-sync-tool.test.ts +++ b/packages/agents/test/observe-pg-sync-tool.test.ts @@ -39,7 +39,12 @@ describe(`observe_pg_sync tool`, () => { }) it(`calls ctx.observe with pgSync source and wake options`, async () => { - const observe = vi.fn(async () => {}) + const observe = vi.fn(async () => ({ + sourceType: `pgSync`, + sourceRef: `registered-ref`, + streamUrl: `/_electric/pg-sync/default/registered-ref`, + events: [], + })) const tool = createObservePgSyncTool({ observe } as any) await tool.execute(`call`, { @@ -80,7 +85,12 @@ describe(`observe_pg_sync tool`, () => { }) it(`preserves debounceMs: 0`, async () => { - const observe = vi.fn(async () => {}) + const observe = vi.fn(async () => ({ + sourceType: `pgSync`, + sourceRef: `registered-ref`, + streamUrl: `/_electric/pg-sync/default/registered-ref`, + events: [], + })) const tool = createObservePgSyncTool({ observe } as any) const result = textResult( @@ -97,8 +107,13 @@ describe(`observe_pg_sync tool`, () => { expect(result.wake).toEqual({ on: `change`, debounceMs: 0 }) }) - it(`returns sourceRef, streamUrl, and wake`, async () => { - const observe = vi.fn(async () => {}) + it(`returns the observed sourceRef, streamUrl, and wake`, async () => { + const observe = vi.fn(async () => ({ + sourceType: `pgSync`, + sourceRef: `registered-ref`, + streamUrl: `/_electric/pg-sync/default/registered-ref`, + events: [], + })) const tool = createObservePgSyncTool({ observe } as any) const result = textResult( @@ -108,20 +123,21 @@ describe(`observe_pg_sync tool`, () => { wake: { ops: [`delete`] }, }) ) - const source = pgSync({ - url: `http://localhost:30000/v1/shape`, - table: `todos`, - }) expect(result).toEqual({ - sourceRef: source.sourceRef, - streamUrl: source.streamUrl, + sourceRef: `registered-ref`, + streamUrl: `/_electric/pg-sync/default/registered-ref`, wake: { on: `change`, ops: [`delete`] }, }) }) it(`defaults wake when wake.ops is omitted`, async () => { - const observe = vi.fn(async () => {}) + const observe = vi.fn(async () => ({ + sourceType: `pgSync`, + sourceRef: `registered-ref`, + streamUrl: `/_electric/pg-sync/default/registered-ref`, + events: [], + })) const tool = createObservePgSyncTool({ observe } as any) const result = textResult( From 5ea1d005e30736623c6b3fa8c1393d9617209fc6 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Thu, 11 Jun 2026 14:52:08 -0600 Subject: [PATCH 3/8] Handle Electric pg sync lsn offsets --- .../src/pg-sync-bridge-manager.ts | 24 +++++++++++++++++-- .../test/pg-sync-bridge-manager.test.ts | 24 ++++++++++++++++--- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/packages/agents-server/src/pg-sync-bridge-manager.ts b/packages/agents-server/src/pg-sync-bridge-manager.ts index c7ddfd1a86..d73cbc0bff 100644 --- a/packages/agents-server/src/pg-sync-bridge-manager.ts +++ b/packages/agents-server/src/pg-sync-bridge-manager.ts @@ -47,6 +47,8 @@ type PgSyncChangeMessage = { headers: Record & { operation?: PgSyncOperation | string offset?: unknown + lsn?: unknown + op_position?: unknown key?: unknown rowKey?: unknown } @@ -163,6 +165,24 @@ function rowKeyForMessage(message: PgSyncChangeMessage): string | undefined { return candidate === undefined ? undefined : stableJson(candidate) } +function offsetForMessage(message: PgSyncChangeMessage): string | null { + const offset = message.headers.offset + if (typeof offset === `string` && offset.length > 0) { + return offset + } + + const lsn = message.headers.lsn + const opPosition = message.headers.op_position + if ( + (typeof lsn === `string` || typeof lsn === `number`) && + (typeof opPosition === `string` || typeof opPosition === `number`) + ) { + return `${lsn}_${opPosition}` + } + + return null +} + export function pgSyncMessageToDurableEvent( message: PgSyncChangeMessage, optionsOrSourceRef: PgSyncOptions | string @@ -185,8 +205,8 @@ export function pgSyncMessageToDurableEvent( ? optionsOrSourceRef : sourceRefForPgSync(optionsOrSourceRef) const rowKey = rowKeyForMessage(message) - const offset = message.headers.offset - if (typeof offset !== `string` || offset.length === 0) return null + const offset = offsetForMessage(message) + if (!offset) return null const messageKeyPart = offset const messageKey = `${sourceRef}:${operation}:${messageKeyPart}` const timestamp = new Date().toISOString() diff --git a/packages/agents-server/test/pg-sync-bridge-manager.test.ts b/packages/agents-server/test/pg-sync-bridge-manager.test.ts index 60e1a5cb76..b16cfa79f4 100644 --- a/packages/agents-server/test/pg-sync-bridge-manager.test.ts +++ b/packages/agents-server/test/pg-sync-bridge-manager.test.ts @@ -90,7 +90,7 @@ describe(`pg-sync bridge helpers`, () => { const options = { url: SHAPE_URL, table: `todos` } const insert = pgSyncMessageToDurableEvent( { - headers: { operation: `insert`, offset: `1_0` }, + headers: { operation: `insert`, lsn: `1`, op_position: 0 }, value: { id: 1, text: `a` }, } as any, options @@ -116,7 +116,22 @@ describe(`pg-sync bridge helpers`, () => { expect(del.value.rowKey).toBe(`1`) }) - it(`rejects messages without stable string offsets`, () => { + it(`derives an offset from lsn and op_position when Electric omits headers.offset`, () => { + const options = { url: SHAPE_URL, table: `todos` } + + const event = pgSyncMessageToDurableEvent( + { + headers: { operation: `insert`, lsn: `28517568`, op_position: 0 }, + value: { id: 32, text: `testing` }, + } as any, + options + )! + + expect(event.key).toBe(`${sourceRefForPgSync(options)}:insert:28517568_0`) + expect(event.value.offset).toBe(`28517568_0`) + }) + + it(`rejects messages without stable offsets or lsn/op_position`, () => { const options = { url: SHAPE_URL, table: `todos` } expect( @@ -131,7 +146,10 @@ describe(`pg-sync bridge helpers`, () => { ).toBeNull() expect( pgSyncMessageToDurableEvent( - { headers: { operation: `insert` }, value: { id: 1 } } as any, + { + headers: { operation: `insert`, lsn: `28517568` }, + value: { id: 1 }, + } as any, options ) ).toBeNull() From 81597477f94276e8a58c715608949209e7026056 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Thu, 11 Jun 2026 15:04:54 -0600 Subject: [PATCH 4/8] Copy pg sync shape messages directly --- .../src/pg-sync-bridge-manager.ts | 71 +++++-------------- packages/agents-server/src/wake-registry.ts | 2 + .../test/pg-sync-bridge-manager.test.ts | 68 ++++++++++-------- 3 files changed, 61 insertions(+), 80 deletions(-) diff --git a/packages/agents-server/src/pg-sync-bridge-manager.ts b/packages/agents-server/src/pg-sync-bridge-manager.ts index d73cbc0bff..e243ce64e5 100644 --- a/packages/agents-server/src/pg-sync-bridge-manager.ts +++ b/packages/agents-server/src/pg-sync-bridge-manager.ts @@ -46,9 +46,6 @@ const DEFAULT_RETRY_MAX_DELAY_MS = 30_000 type PgSyncChangeMessage = { headers: Record & { operation?: PgSyncOperation | string - offset?: unknown - lsn?: unknown - op_position?: unknown key?: unknown rowKey?: unknown } @@ -165,74 +162,44 @@ function rowKeyForMessage(message: PgSyncChangeMessage): string | undefined { return candidate === undefined ? undefined : stableJson(candidate) } -function offsetForMessage(message: PgSyncChangeMessage): string | null { - const offset = message.headers.offset - if (typeof offset === `string` && offset.length > 0) { - return offset - } - - const lsn = message.headers.lsn - const opPosition = message.headers.op_position - if ( - (typeof lsn === `string` || typeof lsn === `number`) && - (typeof opPosition === `string` || typeof opPosition === `number`) - ) { - return `${lsn}_${opPosition}` - } - - return null -} - export function pgSyncMessageToDurableEvent( message: PgSyncChangeMessage, - optionsOrSourceRef: PgSyncOptions | string + _optionsOrSourceRef: PgSyncOptions | string ): { type: `pg_sync_change` key: string value: Record - headers: { operation: PgSyncOperation; timestamp: string } + headers: Record & { operation: PgSyncOperation } } | null { const operation = message.headers.operation if ( operation !== `insert` && operation !== `update` && operation !== `delete` - ) + ) { return null + } + + const key = + message.key ?? + (typeof message.headers.key === `string` + ? message.headers.key + : undefined) ?? + rowKeyForMessage(message) + if (!key) { + return null + } - const sourceRef = - typeof optionsOrSourceRef === `string` - ? optionsOrSourceRef - : sourceRefForPgSync(optionsOrSourceRef) - const rowKey = rowKeyForMessage(message) - const offset = offsetForMessage(message) - if (!offset) return null - const messageKeyPart = offset - const messageKey = `${sourceRef}:${operation}:${messageKeyPart}` - const timestamp = new Date().toISOString() - const oldValue = message.old_value - const safeValue = jsonSafe(message.value) - const safeOldValue = jsonSafe(oldValue) - const safeHeaders = jsonSafe(message.headers) + const safeMessage = jsonSafe(message) as Record return { type: `pg_sync_change`, - key: messageKey, - value: { - key: messageKey, - table: - typeof optionsOrSourceRef === `string` - ? undefined - : optionsOrSourceRef.table, + key, + value: safeMessage, + headers: { + ...(jsonSafe(message.headers) as Record), operation, - ...(rowKey !== undefined ? { rowKey } : {}), - ...(message.value !== undefined ? { value: safeValue } : {}), - ...(oldValue !== undefined ? { oldValue: safeOldValue } : {}), - headers: safeHeaders, - ...(typeof offset === `string` ? { offset } : {}), - receivedAt: timestamp, }, - headers: { operation, timestamp }, } } diff --git a/packages/agents-server/src/wake-registry.ts b/packages/agents-server/src/wake-registry.ts index 65be089e83..ff8ef6e75b 100644 --- a/packages/agents-server/src/wake-registry.ts +++ b/packages/agents-server/src/wake-registry.ts @@ -967,6 +967,8 @@ export class WakeRegistry { } if (value && `oldValue` in value) { change.oldValue = value.oldValue + } else if (value && `old_value` in value) { + change.oldValue = value.old_value } if (eventType === `inbox`) { diff --git a/packages/agents-server/test/pg-sync-bridge-manager.test.ts b/packages/agents-server/test/pg-sync-bridge-manager.test.ts index b16cfa79f4..7f57de2023 100644 --- a/packages/agents-server/test/pg-sync-bridge-manager.test.ts +++ b/packages/agents-server/test/pg-sync-bridge-manager.test.ts @@ -86,10 +86,11 @@ describe(`pg-sync bridge helpers`, () => { }) }) - it(`converts insert/update/delete messages with stable keys`, () => { + it(`copies shape messages directly while preserving the row key`, () => { const options = { url: SHAPE_URL, table: `todos` } const insert = pgSyncMessageToDurableEvent( { + key: `"public"."todos"/"1"`, headers: { operation: `insert`, lsn: `1`, op_position: 0 }, value: { id: 1, text: `a` }, } as any, @@ -97,58 +98,61 @@ describe(`pg-sync bridge helpers`, () => { )! const update = pgSyncMessageToDurableEvent( { - headers: { operation: `update`, offset: `2_0` }, + key: `"public"."todos"/"1"`, + headers: { operation: `update` }, value: { id: 1, text: `b` }, + old_value: { id: 1, text: `a` }, } as any, options )! const del = pgSyncMessageToDurableEvent( { - headers: { operation: `delete`, offset: `3_0` }, - value: { id: 1 }, + key: `"public"."todos"/"1"`, + headers: { operation: `delete` }, + old_value: { id: 1 }, } as any, options )! - expect(insert.key).toBe(`${sourceRefForPgSync(options)}:insert:1_0`) + expect(insert.key).toBe(`"public"."todos"/"1"`) + expect(insert.value).toMatchObject({ + key: `"public"."todos"/"1"`, + value: { id: 1, text: `a` }, + }) expect(update.headers.operation).toBe(`update`) - expect(del.value.operation).toBe(`delete`) - expect(del.value.rowKey).toBe(`1`) + expect(update.value).toMatchObject({ + key: `"public"."todos"/"1"`, + value: { id: 1, text: `b` }, + old_value: { id: 1, text: `a` }, + }) + expect(del.value).toMatchObject({ + key: `"public"."todos"/"1"`, + old_value: { id: 1 }, + }) }) - it(`derives an offset from lsn and op_position when Electric omits headers.offset`, () => { + it(`falls back to row identity when Electric omits the top-level key`, () => { const options = { url: SHAPE_URL, table: `todos` } const event = pgSyncMessageToDurableEvent( { - headers: { operation: `insert`, lsn: `28517568`, op_position: 0 }, + headers: { operation: `insert` }, value: { id: 32, text: `testing` }, } as any, options )! - expect(event.key).toBe(`${sourceRefForPgSync(options)}:insert:28517568_0`) - expect(event.value.offset).toBe(`28517568_0`) + expect(event.key).toBe(`32`) }) - it(`rejects messages without stable offsets or lsn/op_position`, () => { + it(`rejects messages without a row key`, () => { const options = { url: SHAPE_URL, table: `todos` } expect( pgSyncMessageToDurableEvent( { - key: `shape-key-1`, headers: { operation: `insert` }, - value: { id: 1 }, - } as any, - options - ) - ).toBeNull() - expect( - pgSyncMessageToDurableEvent( - { - headers: { operation: `insert`, lsn: `28517568` }, - value: { id: 1 }, + value: { text: `missing id` }, } as any, options ) @@ -168,7 +172,7 @@ describe(`pg-sync bridge helpers`, () => { expect(JSON.stringify(event)).toContain(`"1"`) expect(event.value.value).toEqual({ id: `1`, nested: { count: `2` } }) - expect(event.value.oldValue).toEqual({ id: `0` }) + expect(event.value.old_value).toEqual({ id: `0` }) expect(event.value.headers).toEqual({ operation: `insert`, offset: `12_0` }) }) }) @@ -210,8 +214,12 @@ describe(`PgSyncBridgeManager`, () => { ]) expect(JSON.parse(mockState.appends[0]!)).toMatchObject({ type: `pg_sync_change`, + key: `1`, headers: { operation: `insert` }, - value: { table: `todos`, operation: `insert`, rowKey: `1` }, + value: { + headers: { operation: `insert` }, + value: { id: 1 }, + }, }) }) @@ -578,13 +586,17 @@ describe(`external review red tests`, () => { }) }) - it(`rejects pg-sync change messages without a stable per-change offset`, () => { + it(`accepts pg-sync change messages without a per-change offset when the row key is present`, () => { expect( pgSyncMessageToDurableEvent( - { headers: { operation: `insert` }, value: { id: 1 } } as any, + { + key: `"public"."todos"/"1"`, + headers: { operation: `insert` }, + value: { id: 1 }, + } as any, { url: SHAPE_URL, table: `todos` } ) - ).toBeNull() + ).toMatchObject({ key: `"public"."todos"/"1"` }) }) }) From 28e7befe9cad520d9ec6238495037d6ae7074bc8 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Thu, 11 Jun 2026 19:18:47 -0600 Subject: [PATCH 5/8] Use registered pg sync stream URLs for wakes --- .../src/manifest-side-effects.ts | 3 +++ .../test/manifest-side-effects.test.ts | 27 ++++++++++++++++--- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/packages/agents-server/src/manifest-side-effects.ts b/packages/agents-server/src/manifest-side-effects.ts index 65c160e04a..1c8b8ab963 100644 --- a/packages/agents-server/src/manifest-side-effects.ts +++ b/packages/agents-server/src/manifest-side-effects.ts @@ -62,6 +62,9 @@ export function extractManifestSourceUrl( } if (manifest.sourceType === `pgSync`) { + if (typeof config?.streamUrl === `string`) { + return config.streamUrl + } return typeof manifest.sourceRef === `string` ? getPgSyncManifestStreamPath(manifest.sourceRef) : undefined diff --git a/packages/agents-server/test/manifest-side-effects.test.ts b/packages/agents-server/test/manifest-side-effects.test.ts index 6a6ccb91c2..4469f7a232 100644 --- a/packages/agents-server/test/manifest-side-effects.test.ts +++ b/packages/agents-server/test/manifest-side-effects.test.ts @@ -51,7 +51,22 @@ describe(`manifest side effects`, () => { }) }) - it(`maps pgSync source manifest sourceRef to pg-sync stream path`, () => { + it(`prefers pgSync streamUrl from manifest config when present`, () => { + const sourceRef = `pg_abc123` + + expect( + extractManifestSourceUrl({ + kind: `source`, + sourceType: `pgSync`, + sourceRef, + config: { + streamUrl: `/_electric/pg-sync/default/${sourceRef}`, + }, + }) + ).toBe(`/_electric/pg-sync/default/${sourceRef}`) + }) + + it(`falls back to pgSync sourceRef-derived stream path`, () => { const sourceRef = `pg_abc123` expect( @@ -71,6 +86,9 @@ describe(`manifest side effects`, () => { kind: `source`, sourceType: `pgSync`, sourceRef, + config: { + streamUrl: `/_electric/pg-sync/default/${sourceRef}`, + }, wake: { on: `change`, ops: [`delete`] }, }, `source:pgSync:${sourceRef}` @@ -78,7 +96,7 @@ describe(`manifest side effects`, () => { expect(registration).toEqual({ subscriberUrl: `/parent/p1`, - sourceUrl: getPgSyncStreamPath(sourceRef), + sourceUrl: `/_electric/pg-sync/default/${sourceRef}`, condition: { on: `change`, ops: [`delete`], @@ -107,6 +125,9 @@ describe(`manifest side effects`, () => { kind: `source`, sourceType: `pgSync`, sourceRef, + config: { + streamUrl: `/_electric/pg-sync/default/${sourceRef}`, + }, wake: { on: `change`, collections: [`pg_sync_change`], @@ -118,7 +139,7 @@ describe(`manifest side effects`, () => { expect(registration).toEqual({ subscriberUrl: `/parent/p1`, - sourceUrl: getPgSyncStreamPath(sourceRef), + sourceUrl: `/_electric/pg-sync/default/${sourceRef}`, condition: { on: `change`, collections: [`pg_sync_change`], From d0c1bf592f2ba7f354ddc8ead2b660f27fe0bdd2 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Thu, 11 Jun 2026 21:07:09 -0600 Subject: [PATCH 6/8] Stabilize pg sync source identity and probe sources at registration Exclude per-request metadata (wakeId, principal, tenant) from the sourceRef hash so re-registrations reuse the same bridge and stream, and fetch the shape log once before registering so bad URLs fail the registration with Electric's error instead of dying silently in the bridge retry loop. Add Horton prompt guidance for the now-required shape URL. Co-Authored-By: Claude Fable 5 --- .../agents-runtime/src/observation-sources.ts | 5 +- .../src/pg-sync-bridge-manager.ts | 90 +++++++++++- .../src/routing/pg-sync-router.ts | 4 + .../test/oss-server-router.test.ts | 8 +- .../test/pg-sync-bridge-manager.test.ts | 133 ++++++++++++++++++ .../agents-server/test/pg-sync-router.test.ts | 91 +++++++++--- packages/agents/src/agents/horton.ts | 9 +- packages/agents/src/tools/observe-pg-sync.ts | 4 +- 8 files changed, 321 insertions(+), 23 deletions(-) diff --git a/packages/agents-runtime/src/observation-sources.ts b/packages/agents-runtime/src/observation-sources.ts index 2bb4f2b649..cb8a9c1c84 100644 --- a/packages/agents-runtime/src/observation-sources.ts +++ b/packages/agents-runtime/src/observation-sources.ts @@ -107,7 +107,10 @@ export function canonicalPgSyncOptions( } export function sourceRefForPgSync(options: PgSyncOptions): string { - return hashString(JSON.stringify(canonicalPgSyncOptions(options))) + // metadata is per-request context (principal, wakeId, ...) and must not + // change the identity of the observed shape. + const { metadata: _metadata, ...identity } = canonicalPgSyncOptions(options) + return hashString(JSON.stringify(identity)) } export interface EntityObservationSource extends ObservationSource { diff --git a/packages/agents-server/src/pg-sync-bridge-manager.ts b/packages/agents-server/src/pg-sync-bridge-manager.ts index e243ce64e5..dfbfe48f19 100644 --- a/packages/agents-server/src/pg-sync-bridge-manager.ts +++ b/packages/agents-server/src/pg-sync-bridge-manager.ts @@ -38,10 +38,16 @@ export interface PgSyncBridgeManagerOptions { random?: () => number sleep?: (ms: number) => Promise } + fetchFn?: typeof fetch + probeTimeoutMs?: number } +/** Registration was rejected because the source itself is invalid — map to a 4xx. */ +export class PgSyncSourceValidationError extends Error {} + const DEFAULT_RETRY_INITIAL_DELAY_MS = 1_000 const DEFAULT_RETRY_MAX_DELAY_MS = 30_000 +const DEFAULT_PROBE_TIMEOUT_MS = 10_000 type PgSyncChangeMessage = { headers: Record & { @@ -120,6 +126,52 @@ export function buildElectricShapeParams( } } +/** + * Build the one-shot URL used to validate a shape source at registration + * time. Mirrors the query-param encoding of the Electric TS client: arrays + * are comma-joined, where-clause params become `params[n]`. + */ +export function buildShapeProbeUrl( + sourceUrl: string, + options: PgSyncOptions +): URL { + let url: URL + try { + url = new URL(sourceUrl) + } catch { + throw new PgSyncSourceValidationError( + `pgSync url "${sourceUrl}" is not a valid URL` + ) + } + if (url.protocol !== `http:` && url.protocol !== `https:`) { + throw new PgSyncSourceValidationError( + `pgSync url "${sourceUrl}" must be an HTTP(S) Electric shape endpoint, not a database connection string` + ) + } + for (const [key, value] of Object.entries( + buildElectricShapeParams(options) + )) { + if (value === undefined || value === null) continue + if (Array.isArray(value)) { + if (key === `params`) { + value.forEach((item, index) => + url.searchParams.set(`params[${index + 1}]`, String(item)) + ) + } else { + url.searchParams.set(key, value.join(`,`)) + } + } else if (typeof value === `object`) { + for (const [k, v] of Object.entries(value)) { + url.searchParams.set(`${key}[${k}]`, String(v)) + } + } else { + url.searchParams.set(key, String(value)) + } + } + url.searchParams.set(`offset`, `now`) + return url +} + function jsonSafe(value: unknown): unknown { if (typeof value === `bigint`) return value.toString() if (value === null || typeof value !== `object`) return value @@ -410,6 +462,8 @@ export class PgSyncBridgeManager implements PgSyncBridgeCoordinator { private readonly retry: Required< NonNullable > + private readonly fetchFn?: typeof fetch + private readonly probeTimeoutMs: number constructor( private streamClient: StreamClient, @@ -417,6 +471,8 @@ export class PgSyncBridgeManager implements PgSyncBridgeCoordinator { private registry?: PostgresRegistry, options: PgSyncBridgeManagerOptions = {} ) { + this.fetchFn = options.fetchFn + this.probeTimeoutMs = options.probeTimeoutMs ?? DEFAULT_PROBE_TIMEOUT_MS this.retry = { initialDelayMs: options.retry?.initialDelayMs ?? DEFAULT_RETRY_INITIAL_DELAY_MS, @@ -458,6 +514,9 @@ export class PgSyncBridgeManager implements PgSyncBridgeCoordinator { const resolvedSource = this.resolveSource(canonicalOptions) const sourceRef = sourceRefForPgSync(canonicalOptions) const streamUrl = getPgSyncStreamPath(sourceRef, this.registry?.tenantId) + if (!this.bridges.has(sourceRef) && !this.starting.has(sourceRef)) { + await this.probeSource(resolvedSource, canonicalOptions) + } const row = await this.registry?.upsertPgSyncBridge({ sourceRef, options: canonicalOptions, @@ -522,13 +581,42 @@ export class PgSyncBridgeManager implements PgSyncBridgeCoordinator { private resolveSource(options: CanonicalPgSyncConfig): PgSyncResolvedSource { if (!options.url) { - throw new Error( + throw new PgSyncSourceValidationError( `pgSync source url is required; no server default is configured` ) } return { url: options.url } } + /** + * One-shot fetch of the shape log before a bridge is created, so a bad + * URL or rejected shape fails the registration instead of dying silently + * in the bridge's retry loop. + */ + private async probeSource( + source: PgSyncResolvedSource, + options: CanonicalPgSyncConfig + ): Promise { + const probeUrl = buildShapeProbeUrl(source.url, options) + const fetchFn = this.fetchFn ?? globalThis.fetch + let response: Response + try { + response = await fetchFn(probeUrl, { + signal: AbortSignal.timeout(this.probeTimeoutMs), + }) + } catch (error) { + throw new PgSyncSourceValidationError( + `pgSync source at ${source.url} is unreachable: ${error instanceof Error ? error.message : String(error)}` + ) + } + if (!response.ok) { + const body = (await response.text().catch(() => ``)).slice(0, 500) + throw new PgSyncSourceValidationError( + `pgSync source at ${source.url} rejected the shape request (${response.status})${body ? `: ${body}` : ``}` + ) + } + } + async stop(): Promise { await Promise.allSettled(this.starting.values()) await Promise.all([...this.bridges.values()].map((bridge) => bridge.stop())) diff --git a/packages/agents-server/src/routing/pg-sync-router.ts b/packages/agents-server/src/routing/pg-sync-router.ts index c4f6f3dfd8..f4cbf56fb0 100644 --- a/packages/agents-server/src/routing/pg-sync-router.ts +++ b/packages/agents-server/src/routing/pg-sync-router.ts @@ -8,6 +8,7 @@ import type { } from '@electric-ax/agents-runtime' import { Type, type Static } from '@sinclair/typebox' import { Router, json } from 'itty-router' +import { PgSyncSourceValidationError } from '../pg-sync-bridge-manager.js' import { apiError } from '../electric-agents-http.js' import { ErrCodeInvalidRequest } from '../electric-agents-types.js' import { routeBody, withSchema } from './schema.js' @@ -108,6 +109,9 @@ async function registerPgSync( return json(result) } catch (error) { + if (error instanceof PgSyncSourceValidationError) { + return apiError(400, ErrCodeInvalidRequest, error.message) + } return apiError( 500, ErrCodeInvalidRequest, diff --git a/packages/agents-server/test/oss-server-router.test.ts b/packages/agents-server/test/oss-server-router.test.ts index 51c733d4f2..149abe7c6c 100644 --- a/packages/agents-server/test/oss-server-router.test.ts +++ b/packages/agents-server/test/oss-server-router.test.ts @@ -115,7 +115,12 @@ describe(`OSS server routing wrapper`, () => { new Request(`http://server/_electric/pg-sync/register`, { method: `POST`, headers: { 'content-type': `application/json` }, - body: JSON.stringify({ options: { table: `entities` } }), + body: JSON.stringify({ + options: { + url: `https://electric.example/v1/shape`, + table: `entities`, + }, + }), }), buildTenantContext({ pgSyncBridgeManager: pgSyncBridgeManager as any }) ) @@ -123,6 +128,7 @@ describe(`OSS server routing wrapper`, () => { expect(registerResponse.status).toBe(200) expect(pgSyncBridgeManager.register).toHaveBeenCalledWith( { + url: `https://electric.example/v1/shape`, table: `entities`, }, { diff --git a/packages/agents-server/test/pg-sync-bridge-manager.test.ts b/packages/agents-server/test/pg-sync-bridge-manager.test.ts index 7f57de2023..72b86bc4e8 100644 --- a/packages/agents-server/test/pg-sync-bridge-manager.test.ts +++ b/packages/agents-server/test/pg-sync-bridge-manager.test.ts @@ -64,6 +64,10 @@ beforeEach(() => { mockState.appends = [] mockState.appendError = null mockState.streams = [] + vi.stubGlobal( + `fetch`, + vi.fn(async () => new Response(`[]`, { status: 200 })) + ) }) describe(`pg-sync bridge helpers`, () => { @@ -649,6 +653,135 @@ describe(`pg-sync production hardening`, () => { }) }) + it(`probes the shape endpoint before starting a new bridge`, async () => { + const fetchFn = vi.fn( + async (_input: RequestInfo | URL) => new Response(`[]`, { status: 200 }) + ) + const manager = new PgSyncBridgeManager( + { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any, + undefined, + undefined, + { retry: { initialDelayMs: 0, maxDelayMs: 0 }, fetchFn } + ) + const options = { + url: SHAPE_URL, + table: `todos`, + columns: [`id`, `text`], + where: `done = $1`, + params: [`false`], + } + + await manager.register(options) + + expect(fetchFn).toHaveBeenCalledTimes(1) + const probeUrl = new URL(String(fetchFn.mock.calls[0]![0])) + expect(probeUrl.origin + probeUrl.pathname).toBe(SHAPE_URL) + expect(probeUrl.searchParams.get(`table`)).toBe(`todos`) + expect(probeUrl.searchParams.get(`columns`)).toBe(`id,text`) + expect(probeUrl.searchParams.get(`where`)).toBe(`done = $1`) + expect(probeUrl.searchParams.get(`params[1]`)).toBe(`false`) + expect(probeUrl.searchParams.get(`offset`)).toBe(`now`) + + await manager.register(options) + expect(fetchFn).toHaveBeenCalledTimes(1) + }) + + it(`fails registration when the shape endpoint rejects the probe`, async () => { + const ensure = vi.fn(async () => undefined) + const upsertPgSyncBridge = vi.fn(async () => undefined) + const fetchFn = vi.fn( + async () => new Response(`{"message":"table not found"}`, { status: 400 }) + ) + const manager = new PgSyncBridgeManager( + { baseUrl: `http://durable`, ensure } as any, + undefined, + { upsertPgSyncBridge } as any, + { retry: { initialDelayMs: 0, maxDelayMs: 0 }, fetchFn } + ) + + await expect( + manager.register({ url: SHAPE_URL, table: `missing` }) + ).rejects.toThrow(/400.*table not found/s) + expect(mockState.constructedOptions).toHaveLength(0) + expect(upsertPgSyncBridge).not.toHaveBeenCalled() + expect(ensure).not.toHaveBeenCalled() + }) + + it(`fails registration when the shape endpoint is unreachable`, async () => { + const fetchFn = vi.fn(async () => { + throw new Error(`ECONNREFUSED`) + }) + const manager = new PgSyncBridgeManager( + { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any, + undefined, + undefined, + { retry: { initialDelayMs: 0, maxDelayMs: 0 }, fetchFn } + ) + + await expect( + manager.register({ url: SHAPE_URL, table: `todos` }) + ).rejects.toThrow(/unreachable.*ECONNREFUSED/s) + expect(mockState.constructedOptions).toHaveLength(0) + }) + + it(`fails registration when the url is not an HTTP(S) endpoint`, async () => { + const fetchFn = vi.fn(async () => new Response(`[]`, { status: 200 })) + const manager = new PgSyncBridgeManager( + { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any, + undefined, + undefined, + { retry: { initialDelayMs: 0, maxDelayMs: 0 }, fetchFn } + ) + + await expect( + manager.register({ + url: `postgres://app:secret@localhost:5432/app`, + table: `todos`, + }) + ).rejects.toThrow(/HTTP/) + expect(fetchFn).not.toHaveBeenCalled() + }) + + it(`keeps the sourceRef stable across registrations with different request metadata`, async () => { + const ensure = vi.fn(async () => undefined) + const manager = new PgSyncBridgeManager( + { baseUrl: `http://durable`, ensure } as any, + undefined, + undefined, + { retry: { initialDelayMs: 0, maxDelayMs: 0 } } + ) + const options = { url: SHAPE_URL, table: `todos` } + const metadata = { + tenantId: `tenant-a`, + principalKind: `agent`, + principalId: `horton`, + entityUrl: `/horton/abc`, + runtimeConsumerId: `runner-1`, + } + + const first = await manager.register(options, { + ...metadata, + wakeId: `wake-1`, + }) + const second = await manager.register(options, { + ...metadata, + wakeId: `wake-2`, + }) + + expect(first.sourceRef).toBe(sourceRefForPgSync(options)) + expect(second).toEqual(first) + expect(mockState.constructedOptions).toHaveLength(1) + }) + it(`backs off before recovery retries`, async () => { const sleeps: number[] = [] const manager = new PgSyncBridgeManager( diff --git a/packages/agents-server/test/pg-sync-router.test.ts b/packages/agents-server/test/pg-sync-router.test.ts index 585f1eae87..17a85209e4 100644 --- a/packages/agents-server/test/pg-sync-router.test.ts +++ b/packages/agents-server/test/pg-sync-router.test.ts @@ -4,6 +4,7 @@ import { sourceRefForPgSync, } from '@electric-ax/agents-runtime' import { globalRouter } from '../src/routing/global-router' +import { PgSyncSourceValidationError } from '../src/pg-sync-bridge-manager' import type { TenantContext } from '../src/routing/context' function request(method: string, path: string, body?: unknown): Request { @@ -50,13 +51,15 @@ function buildContext(overrides: Partial = {}): TenantContext { describe(`pg-sync routes`, () => { it(`registers a pg-sync source and returns its stream path`, async () => { const ctx = buildContext() - const expectedSourceRef = sourceRefForPgSync({ table: `todos` }) + const options = { + url: `https://electric.example/v1/shape`, + table: `todos`, + } + const expectedSourceRef = sourceRefForPgSync(options) const expectedStreamUrl = getPgSyncStreamPath(expectedSourceRef) const response = await globalRouter.fetch( - request(`POST`, `/_electric/pg-sync/register`, { - options: { table: `todos` }, - }), + request(`POST`, `/_electric/pg-sync/register`, { options }), ctx ) @@ -65,18 +68,13 @@ describe(`pg-sync routes`, () => { sourceRef: expectedSourceRef, streamUrl: expectedStreamUrl, }) - expect(ctx.pgSyncBridgeManager!.register).toHaveBeenCalledWith( - { - table: `todos`, - }, - { - tenantId: `tenant-test`, - principalKind: `user`, - principalId: `owner@example.com`, - principalKey: `user:owner@example.com`, - principalUrl: `/principal/user%3Aowner%40example.com`, - } - ) + expect(ctx.pgSyncBridgeManager!.register).toHaveBeenCalledWith(options, { + tenantId: `tenant-test`, + principalKind: `user`, + principalId: `owner@example.com`, + principalKey: `user:owner@example.com`, + principalUrl: `/principal/user%3Aowner%40example.com`, + }) }) it(`rejects an empty table`, async () => { @@ -84,7 +82,7 @@ describe(`pg-sync routes`, () => { const response = await globalRouter.fetch( request(`POST`, `/_electric/pg-sync/register`, { - options: { table: ` ` }, + options: { url: `https://electric.example/v1/shape`, table: ` ` }, }), ctx ) @@ -93,9 +91,68 @@ describe(`pg-sync routes`, () => { expect(ctx.pgSyncBridgeManager!.register).not.toHaveBeenCalled() }) + it(`rejects a missing or empty url`, async () => { + const ctx = buildContext() + + const missing = await globalRouter.fetch( + request(`POST`, `/_electric/pg-sync/register`, { + options: { table: `todos` }, + }), + ctx + ) + const empty = await globalRouter.fetch( + request(`POST`, `/_electric/pg-sync/register`, { + options: { url: ` `, table: `todos` }, + }), + ctx + ) + + expect(missing.status).toBe(400) + expect(empty.status).toBe(400) + expect(ctx.pgSyncBridgeManager!.register).not.toHaveBeenCalled() + }) + + it(`returns 400 when source validation fails and 500 for other errors`, async () => { + const invalidSource = buildContext({ + pgSyncBridgeManager: { + register: vi.fn(async () => { + throw new PgSyncSourceValidationError( + `pgSync source at https://nope.example rejected the shape request (404)` + ) + }), + stop: vi.fn(async () => undefined), + } as any, + }) + const internalFailure = buildContext({ + pgSyncBridgeManager: { + register: vi.fn(async () => { + throw new Error(`registry unavailable`) + }), + stop: vi.fn(async () => undefined), + } as any, + }) + const options = { url: `https://nope.example/v1/shape`, table: `todos` } + + const invalidResponse = await globalRouter.fetch( + request(`POST`, `/_electric/pg-sync/register`, { options }), + invalidSource + ) + const failureResponse = await globalRouter.fetch( + request(`POST`, `/_electric/pg-sync/register`, { options }), + internalFailure + ) + + expect(invalidResponse.status).toBe(400) + await expect(invalidResponse.text()).resolves.toContain( + `rejected the shape request (404)` + ) + expect(failureResponse.status).toBe(500) + }) + it(`computes the same sourceRef as the runtime pgSync helper`, async () => { const ctx = buildContext() const options = { + url: `https://electric.example/v1/shape`, table: `todos`, where: `priority = 'high'`, params: { b: `2`, a: `1` }, diff --git a/packages/agents/src/agents/horton.ts b/packages/agents/src/agents/horton.ts index 7c95f88b54..c74ed293f9 100644 --- a/packages/agents/src/agents/horton.ts +++ b/packages/agents/src/agents/horton.ts @@ -285,7 +285,7 @@ When a user opens with a greeting ("hi", "hello", "hey", etc.) or a broad statem - fetch_url: fetch and convert a URL to markdown - spawn_worker: dispatch a subagent for an isolated task - fork: spawn a child session that inherits this conversation's history up to the latest completed response. Same parent-ownership model as spawn_worker — when the fork's next run finishes, you'll wake with its response. -- observe_pg_sync: observe an Electric Postgres sync stream and wake on matching changes +- observe_pg_sync: observe an Electric Postgres sync stream and wake on matching changes (see "Observing Postgres tables") - send: send a message to an Electric Agent/entity. To schedule future work for yourself, call send with self: true and afterMs. ${eventSourceTools}${titleTool}${scheduleTools}${docsTools}${skillsTools} @@ -295,6 +295,13 @@ ${eventSourceTools}${titleTool}${scheduleTools}${docsTools}${skillsTools} - Use absolute paths or paths relative to the current working directory. ${modelGuidance}${docsGuidance}${skillsGuidance}${onboardingGuidance}${docsUrlGuidance} +# Observing Postgres tables +observe_pg_sync subscribes you to row changes in a Postgres table via an Electric shape stream: +- The \`url\` parameter is the HTTP(S) URL of an Electric shape endpoint (e.g. \`http://localhost:3000/v1/shape\`). It is NOT a \`postgres://\` connection string and there is no default — if the user hasn't given you the endpoint URL, ask for it. Never guess or invent one. +- Registration validates the endpoint by fetching the shape log first. If it fails, the error includes Electric's response — use it to correct the table name, where clause, or URL, or relay it to the user. +- Use \`where\` and \`columns\` to narrow the shape so you only wake on changes you care about; use \`wake.ops\` to filter by operation and \`wake.debounceMs\` to batch bursts. +- The observation persists across wakes — register it once, don't re-register on every wake. + # Risky actions Pause and confirm with the user before: - Destructive operations (deleting files, rm -rf, dropping data, force-pushing) diff --git a/packages/agents/src/tools/observe-pg-sync.ts b/packages/agents/src/tools/observe-pg-sync.ts index 2d212d978f..fd157ca75f 100644 --- a/packages/agents/src/tools/observe-pg-sync.ts +++ b/packages/agents/src/tools/observe-pg-sync.ts @@ -29,10 +29,10 @@ export function createObservePgSyncTool(ctx: HandlerContext): AgentTool { return { name: `observe_pg_sync`, label: `Observe Postgres Sync`, - description: `Observe an Electric Postgres shape stream and wake this agent when matching row changes arrive.`, + description: `Observe an Electric Postgres shape stream and wake this agent when matching row changes arrive. Requires the HTTP(S) URL of an Electric shape endpoint — ask the user for it if you don't know it. Registration validates the endpoint up front and fails with Electric's error if the shape can't be fetched.`, parameters: Type.Object({ url: Type.String({ - description: `Electric shape endpoint URL.`, + description: `HTTP(S) URL of the Electric shape endpoint, e.g. http://localhost:3000/v1/shape. Not a postgres:// connection string. Never guess this — ask the user if it hasn't been provided.`, }), table: Type.String({ minLength: 1, From d572d13c5988ed750a5de8c0f5bb29a7e9d0690a Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Thu, 11 Jun 2026 21:10:25 -0600 Subject: [PATCH 7/8] Add changeset for pg sync source URL changes Co-Authored-By: Claude Fable 5 --- .changeset/require-pg-sync-source-urls.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/require-pg-sync-source-urls.md diff --git a/.changeset/require-pg-sync-source-urls.md b/.changeset/require-pg-sync-source-urls.md new file mode 100644 index 0000000000..e22c50a723 --- /dev/null +++ b/.changeset/require-pg-sync-source-urls.md @@ -0,0 +1,7 @@ +--- +'@electric-ax/agents-runtime': patch +'@electric-ax/agents-server': patch +'@electric-ax/agents': patch +--- + +Require an explicit Electric shape endpoint URL for pg-sync observations. Source identity is now derived from the shape options alone (not per-request metadata) so re-registrations reuse the same bridge and stream, and registration validates the endpoint by fetching the shape log up front, failing with Electric's error instead of retrying silently. From cd8c46750a87570b256df39d2b1b05b6a0ff19b1 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Thu, 11 Jun 2026 22:03:59 -0600 Subject: [PATCH 8/8] Require shape log headers in pg sync probe Electric returns 200 on non-shape paths like its root, so an ok status alone can't validate a source URL. The probe now requires the electric-handle header and suggests the /v1/shape path when missing. Co-Authored-By: Claude Fable 5 --- .../src/pg-sync-bridge-manager.ts | 10 +++++ .../test/pg-sync-bridge-manager.test.ts | 42 +++++++++++++++++-- 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/packages/agents-server/src/pg-sync-bridge-manager.ts b/packages/agents-server/src/pg-sync-bridge-manager.ts index dfbfe48f19..07a8b95322 100644 --- a/packages/agents-server/src/pg-sync-bridge-manager.ts +++ b/packages/agents-server/src/pg-sync-bridge-manager.ts @@ -615,6 +615,16 @@ export class PgSyncBridgeManager implements PgSyncBridgeCoordinator { `pgSync source at ${source.url} rejected the shape request (${response.status})${body ? `: ${body}` : ``}` ) } + // Electric answers 200 on paths that aren't the shape API (e.g. its + // root), so an ok status alone doesn't prove the URL is right. Real + // shape responses always carry the electric-handle header. + if (!response.headers.get(`electric-handle`)) { + const suggestion = new URL(source.url) + suggestion.pathname = `/v1/shape` + throw new PgSyncSourceValidationError( + `pgSync source at ${source.url} responded but is not a shape log (missing electric-handle header) — the Electric shape API is usually served at ${suggestion.origin}/v1/shape` + ) + } } async stop(): Promise { diff --git a/packages/agents-server/test/pg-sync-bridge-manager.test.ts b/packages/agents-server/test/pg-sync-bridge-manager.test.ts index 72b86bc4e8..6d644fa6ea 100644 --- a/packages/agents-server/test/pg-sync-bridge-manager.test.ts +++ b/packages/agents-server/test/pg-sync-bridge-manager.test.ts @@ -66,7 +66,13 @@ beforeEach(() => { mockState.streams = [] vi.stubGlobal( `fetch`, - vi.fn(async () => new Response(`[]`, { status: 200 })) + vi.fn( + async () => + new Response(`[]`, { + status: 200, + headers: { 'electric-handle': `handle-1` }, + }) + ) ) }) @@ -655,7 +661,11 @@ describe(`pg-sync production hardening`, () => { it(`probes the shape endpoint before starting a new bridge`, async () => { const fetchFn = vi.fn( - async (_input: RequestInfo | URL) => new Response(`[]`, { status: 200 }) + async (_input: RequestInfo | URL) => + new Response(`[]`, { + status: 200, + headers: { 'electric-handle': `handle-1` }, + }) ) const manager = new PgSyncBridgeManager( { @@ -730,8 +740,34 @@ describe(`pg-sync production hardening`, () => { expect(mockState.constructedOptions).toHaveLength(0) }) + it(`fails registration when the endpoint responds 200 but is not a shape log`, async () => { + // Electric answers 200 with an empty body on its root path, so a bare + // host URL passes an ok-check while the shape API lives at /v1/shape. + const fetchFn = vi.fn(async () => new Response(``, { status: 200 })) + const manager = new PgSyncBridgeManager( + { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any, + undefined, + undefined, + { retry: { initialDelayMs: 0, maxDelayMs: 0 }, fetchFn } + ) + + await expect( + manager.register({ url: `http://localhost:30000`, table: `todos` }) + ).rejects.toThrow(/not a shape log.*\/v1\/shape/s) + expect(mockState.constructedOptions).toHaveLength(0) + }) + it(`fails registration when the url is not an HTTP(S) endpoint`, async () => { - const fetchFn = vi.fn(async () => new Response(`[]`, { status: 200 })) + const fetchFn = vi.fn( + async () => + new Response(`[]`, { + status: 200, + headers: { 'electric-handle': `handle-1` }, + }) + ) const manager = new PgSyncBridgeManager( { baseUrl: `http://durable`,