diff --git a/.changeset/require-pg-sync-source-urls.md b/.changeset/require-pg-sync-source-urls.md new file mode 100644 index 0000000000..e22c50a723 --- /dev/null +++ b/.changeset/require-pg-sync-source-urls.md @@ -0,0 +1,7 @@ +--- +'@electric-ax/agents-runtime': patch +'@electric-ax/agents-server': patch +'@electric-ax/agents': patch +--- + +Require an explicit Electric shape endpoint URL for pg-sync observations. Source identity is now derived from the shape options alone (not per-request metadata) so re-registrations reuse the same bridge and stream, and registration validates the endpoint by fetching the shape log up front, failing with Electric's error instead of retrying silently. diff --git a/packages/agents-runtime/src/observation-sources.ts b/packages/agents-runtime/src/observation-sources.ts index 2bb4f2b649..cb8a9c1c84 100644 --- a/packages/agents-runtime/src/observation-sources.ts +++ b/packages/agents-runtime/src/observation-sources.ts @@ -107,7 +107,10 @@ export function canonicalPgSyncOptions( } export function sourceRefForPgSync(options: PgSyncOptions): string { - return hashString(JSON.stringify(canonicalPgSyncOptions(options))) + // metadata is per-request context (principal, wakeId, ...) and must not + // change the identity of the observed shape. + const { metadata: _metadata, ...identity } = canonicalPgSyncOptions(options) + return hashString(JSON.stringify(identity)) } export interface EntityObservationSource extends ObservationSource { diff --git a/packages/agents-runtime/src/process-wake.ts b/packages/agents-runtime/src/process-wake.ts index c419a3114a..3549c3191a 100644 --- a/packages/agents-runtime/src/process-wake.ts +++ b/packages/agents-runtime/src/process-wake.ts @@ -1748,10 +1748,25 @@ export async function processWake( wakeId, } ) + const originalEntry = source.toManifestEntry() as Record< + string, + unknown + > observedSource = { ...source, sourceRef: registeredPgSync.sourceRef, streamUrl: registeredPgSync.streamUrl, + toManifestEntry() { + return { + ...originalEntry, + key: `source:pgSync:${registeredPgSync!.sourceRef}`, + sourceRef: registeredPgSync!.sourceRef, + config: { + ...((originalEntry.config as Record) ?? {}), + streamUrl: registeredPgSync!.streamUrl, + }, + } as unknown as ReturnType + }, } } diff --git a/packages/agents-runtime/src/setup-context.ts b/packages/agents-runtime/src/setup-context.ts index ce9edfff48..e4187c3ba5 100644 --- a/packages/agents-runtime/src/setup-context.ts +++ b/packages/agents-runtime/src/setup-context.ts @@ -886,10 +886,9 @@ export function createSetupContext( source.ensureStream.contentType ) } - const sourceStreamUrl = - source.sourceType === `pgSync` || !source.streamUrl.startsWith(`/`) - ? source.streamUrl - : appendPathToUrl(config.serverBaseUrl, source.streamUrl) + const sourceStreamUrl = source.streamUrl.startsWith(`/`) + ? appendPathToUrl(config.serverBaseUrl, source.streamUrl) + : source.streamUrl sourceDb = await wiring.createSourceDb( sourceStreamUrl, source.schema, @@ -912,6 +911,7 @@ export function createSetupContext( return { sourceType: source.sourceType, sourceRef: source.sourceRef, + streamUrl: source.streamUrl, db: sourceDb, events, } diff --git a/packages/agents-runtime/src/types.ts b/packages/agents-runtime/src/types.ts index 7f07f57fad..dbda6794a7 100644 --- a/packages/agents-runtime/src/types.ts +++ b/packages/agents-runtime/src/types.ts @@ -618,6 +618,7 @@ export interface SourceWakeConfig { export interface ObservationHandle { sourceType: string sourceRef: string + streamUrl?: string db?: EntityStreamDB | ObservationStreamDB events: Array } diff --git a/packages/agents-runtime/test/process-wake.test.ts b/packages/agents-runtime/test/process-wake.test.ts index e2ccf280dc..f2f965dff6 100644 --- a/packages/agents-runtime/test/process-wake.test.ts +++ b/packages/agents-runtime/test/process-wake.test.ts @@ -1619,6 +1619,7 @@ describe(`processWake`, () => { it(`pgSync observe registers pgSync source before source DB preload`, async () => { const source = pgSync({ + url: `http://localhost:30000/v1/shape`, table: `todos`, where: `priority = $1`, params: [`high`], @@ -1640,7 +1641,7 @@ describe(`processWake`, () => { expect(mockCreateStreamDB).toHaveBeenCalledWith( expect.objectContaining({ streamOptions: expect.objectContaining({ - url: `/_electric/pg-sync/default/pg-source-1`, + url: `http://localhost:3000/_electric/pg-sync/default/pg-source-1`, contentType: `application/json`, }), state: expect.objectContaining({ @@ -1670,6 +1671,18 @@ describe(`processWake`, () => { wakeId: `wake-abc`, }, }) + const wakeCall = fetchMock.mock.calls.find( + ([url, opts]) => + String(url).includes(`/_electric/wake`) && + !String(url).includes(`wake-abc`) && + (opts as RequestInit | undefined)?.method === `POST` + ) + const wakeBody = JSON.parse(wakeCall![1]!.body as string) as Record< + string, + unknown + > + expect(wakeBody.sourceUrl).toBe(`/_electric/pg-sync/default/pg-source-1`) + expect(wakeBody.manifestKey).toBe(`source:pgSync:pg-source-1`) expect(fetchMock.mock.invocationCallOrder[pgSyncCallIndex]).toBeLessThan( mockSourceDbPreload.mock.invocationCallOrder[0] ) diff --git a/packages/agents-runtime/test/runtime-server-client-pg-sync.test.ts b/packages/agents-runtime/test/runtime-server-client-pg-sync.test.ts index 652d4342fc..a049090a90 100644 --- a/packages/agents-runtime/test/runtime-server-client-pg-sync.test.ts +++ b/packages/agents-runtime/test/runtime-server-client-pg-sync.test.ts @@ -26,6 +26,7 @@ describe(`runtime-server-client.registerPgSyncSource`, () => { }) const options = { + url: `http://localhost:30000/v1/shape`, table: `todos`, columns: [`id`, `text`], where: `priority = $1`, @@ -59,7 +60,10 @@ describe(`runtime-server-client.registerPgSyncSource`, () => { }) await expect( - client.registerPgSyncSource({ table: `todos` }) + client.registerPgSyncSource({ + url: `http://localhost:30000/v1/shape`, + table: `todos`, + }) ).rejects.toThrow(/registerPgSyncSource failed \(400\): bad table/) }) }) diff --git a/packages/agents-server/src/manifest-side-effects.ts b/packages/agents-server/src/manifest-side-effects.ts index 65c160e04a..1c8b8ab963 100644 --- a/packages/agents-server/src/manifest-side-effects.ts +++ b/packages/agents-server/src/manifest-side-effects.ts @@ -62,6 +62,9 @@ export function extractManifestSourceUrl( } if (manifest.sourceType === `pgSync`) { + if (typeof config?.streamUrl === `string`) { + return config.streamUrl + } return typeof manifest.sourceRef === `string` ? getPgSyncManifestStreamPath(manifest.sourceRef) : undefined diff --git a/packages/agents-server/src/pg-sync-bridge-manager.ts b/packages/agents-server/src/pg-sync-bridge-manager.ts index 73953d473d..07a8b95322 100644 --- a/packages/agents-server/src/pg-sync-bridge-manager.ts +++ b/packages/agents-server/src/pg-sync-bridge-manager.ts @@ -21,10 +21,6 @@ import type { ShapeStreamInterface, } from '@electric-sql/client' -export const PG_SYNC_ELECTRIC_SHAPE_URL = - process.env.ELECTRIC_AGENTS_PG_SYNC_ELECTRIC_URL ?? - `http://localhost:3000/v1/shape` - type PgSyncOperation = `insert` | `update` | `delete` type WakeEvaluator = ( sourceUrl: string, @@ -36,22 +32,26 @@ export type PgSyncResolvedSource = { } export interface PgSyncBridgeManagerOptions { - url?: string retry?: { initialDelayMs?: number maxDelayMs?: number random?: () => number sleep?: (ms: number) => Promise } + fetchFn?: typeof fetch + probeTimeoutMs?: number } +/** Registration was rejected because the source itself is invalid — map to a 4xx. */ +export class PgSyncSourceValidationError extends Error {} + const DEFAULT_RETRY_INITIAL_DELAY_MS = 1_000 const DEFAULT_RETRY_MAX_DELAY_MS = 30_000 +const DEFAULT_PROBE_TIMEOUT_MS = 10_000 type PgSyncChangeMessage = { headers: Record & { operation?: PgSyncOperation | string - offset?: unknown key?: unknown rowKey?: unknown } @@ -126,6 +126,52 @@ export function buildElectricShapeParams( } } +/** + * Build the one-shot URL used to validate a shape source at registration + * time. Mirrors the query-param encoding of the Electric TS client: arrays + * are comma-joined, where-clause params become `params[n]`. + */ +export function buildShapeProbeUrl( + sourceUrl: string, + options: PgSyncOptions +): URL { + let url: URL + try { + url = new URL(sourceUrl) + } catch { + throw new PgSyncSourceValidationError( + `pgSync url "${sourceUrl}" is not a valid URL` + ) + } + if (url.protocol !== `http:` && url.protocol !== `https:`) { + throw new PgSyncSourceValidationError( + `pgSync url "${sourceUrl}" must be an HTTP(S) Electric shape endpoint, not a database connection string` + ) + } + for (const [key, value] of Object.entries( + buildElectricShapeParams(options) + )) { + if (value === undefined || value === null) continue + if (Array.isArray(value)) { + if (key === `params`) { + value.forEach((item, index) => + url.searchParams.set(`params[${index + 1}]`, String(item)) + ) + } else { + url.searchParams.set(key, value.join(`,`)) + } + } else if (typeof value === `object`) { + for (const [k, v] of Object.entries(value)) { + url.searchParams.set(`${key}[${k}]`, String(v)) + } + } else { + url.searchParams.set(key, String(value)) + } + } + url.searchParams.set(`offset`, `now`) + return url +} + function jsonSafe(value: unknown): unknown { if (typeof value === `bigint`) return value.toString() if (value === null || typeof value !== `object`) return value @@ -170,54 +216,42 @@ function rowKeyForMessage(message: PgSyncChangeMessage): string | undefined { export function pgSyncMessageToDurableEvent( message: PgSyncChangeMessage, - optionsOrSourceRef: PgSyncOptions | string + _optionsOrSourceRef: PgSyncOptions | string ): { type: `pg_sync_change` key: string value: Record - headers: { operation: PgSyncOperation; timestamp: string } + headers: Record & { operation: PgSyncOperation } } | null { const operation = message.headers.operation if ( operation !== `insert` && operation !== `update` && operation !== `delete` - ) + ) { return null + } - const sourceRef = - typeof optionsOrSourceRef === `string` - ? optionsOrSourceRef - : sourceRefForPgSync(optionsOrSourceRef) - const rowKey = rowKeyForMessage(message) - const offset = message.headers.offset - if (typeof offset !== `string` || offset.length === 0) return null - const messageKeyPart = offset - const messageKey = `${sourceRef}:${operation}:${messageKeyPart}` - const timestamp = new Date().toISOString() - const oldValue = message.old_value - const safeValue = jsonSafe(message.value) - const safeOldValue = jsonSafe(oldValue) - const safeHeaders = jsonSafe(message.headers) + const key = + message.key ?? + (typeof message.headers.key === `string` + ? message.headers.key + : undefined) ?? + rowKeyForMessage(message) + if (!key) { + return null + } + + const safeMessage = jsonSafe(message) as Record return { type: `pg_sync_change`, - key: messageKey, - value: { - key: messageKey, - table: - typeof optionsOrSourceRef === `string` - ? undefined - : optionsOrSourceRef.table, + key, + value: safeMessage, + headers: { + ...(jsonSafe(message.headers) as Record), operation, - ...(rowKey !== undefined ? { rowKey } : {}), - ...(message.value !== undefined ? { value: safeValue } : {}), - ...(oldValue !== undefined ? { oldValue: safeOldValue } : {}), - headers: safeHeaders, - ...(typeof offset === `string` ? { offset } : {}), - receivedAt: timestamp, }, - headers: { operation, timestamp }, } } @@ -425,10 +459,11 @@ export class PgSyncBridgeManager implements PgSyncBridgeCoordinator { private bridges = new Map() private starting = new Map>() - private readonly url: string private readonly retry: Required< NonNullable > + private readonly fetchFn?: typeof fetch + private readonly probeTimeoutMs: number constructor( private streamClient: StreamClient, @@ -436,7 +471,8 @@ export class PgSyncBridgeManager implements PgSyncBridgeCoordinator { private registry?: PostgresRegistry, options: PgSyncBridgeManagerOptions = {} ) { - this.url = options.url ?? PG_SYNC_ELECTRIC_SHAPE_URL + this.fetchFn = options.fetchFn + this.probeTimeoutMs = options.probeTimeoutMs ?? DEFAULT_PROBE_TIMEOUT_MS this.retry = { initialDelayMs: options.retry?.initialDelayMs ?? DEFAULT_RETRY_INITIAL_DELAY_MS, @@ -478,6 +514,9 @@ export class PgSyncBridgeManager implements PgSyncBridgeCoordinator { const resolvedSource = this.resolveSource(canonicalOptions) const sourceRef = sourceRefForPgSync(canonicalOptions) const streamUrl = getPgSyncStreamPath(sourceRef, this.registry?.tenantId) + if (!this.bridges.has(sourceRef) && !this.starting.has(sourceRef)) { + await this.probeSource(resolvedSource, canonicalOptions) + } const row = await this.registry?.upsertPgSyncBridge({ sourceRef, options: canonicalOptions, @@ -541,7 +580,51 @@ export class PgSyncBridgeManager implements PgSyncBridgeCoordinator { } private resolveSource(options: CanonicalPgSyncConfig): PgSyncResolvedSource { - return { url: options.url ?? this.url } + if (!options.url) { + throw new PgSyncSourceValidationError( + `pgSync source url is required; no server default is configured` + ) + } + return { url: options.url } + } + + /** + * One-shot fetch of the shape log before a bridge is created, so a bad + * URL or rejected shape fails the registration instead of dying silently + * in the bridge's retry loop. + */ + private async probeSource( + source: PgSyncResolvedSource, + options: CanonicalPgSyncConfig + ): Promise { + const probeUrl = buildShapeProbeUrl(source.url, options) + const fetchFn = this.fetchFn ?? globalThis.fetch + let response: Response + try { + response = await fetchFn(probeUrl, { + signal: AbortSignal.timeout(this.probeTimeoutMs), + }) + } catch (error) { + throw new PgSyncSourceValidationError( + `pgSync source at ${source.url} is unreachable: ${error instanceof Error ? error.message : String(error)}` + ) + } + if (!response.ok) { + const body = (await response.text().catch(() => ``)).slice(0, 500) + throw new PgSyncSourceValidationError( + `pgSync source at ${source.url} rejected the shape request (${response.status})${body ? `: ${body}` : ``}` + ) + } + // Electric answers 200 on paths that aren't the shape API (e.g. its + // root), so an ok status alone doesn't prove the URL is right. Real + // shape responses always carry the electric-handle header. + if (!response.headers.get(`electric-handle`)) { + const suggestion = new URL(source.url) + suggestion.pathname = `/v1/shape` + throw new PgSyncSourceValidationError( + `pgSync source at ${source.url} responded but is not a shape log (missing electric-handle header) — the Electric shape API is usually served at ${suggestion.origin}/v1/shape` + ) + } } async stop(): Promise { diff --git a/packages/agents-server/src/routing/pg-sync-router.ts b/packages/agents-server/src/routing/pg-sync-router.ts index ed4be80b16..f4cbf56fb0 100644 --- a/packages/agents-server/src/routing/pg-sync-router.ts +++ b/packages/agents-server/src/routing/pg-sync-router.ts @@ -8,6 +8,7 @@ import type { } from '@electric-ax/agents-runtime' import { Type, type Static } from '@sinclair/typebox' import { Router, json } from 'itty-router' +import { PgSyncSourceValidationError } from '../pg-sync-bridge-manager.js' import { apiError } from '../electric-agents-http.js' import { ErrCodeInvalidRequest } from '../electric-agents-types.js' import { routeBody, withSchema } from './schema.js' @@ -16,7 +17,7 @@ import type { RouterType } from 'itty-router' import type { TenantContext } from './context.js' const pgSyncOptionsSchema = Type.Object({ - url: Type.Optional(Type.String()), + url: Type.String(), table: Type.String(), columns: Type.Optional(Type.Array(Type.String())), where: Type.Optional(Type.String()), @@ -72,6 +73,10 @@ async function registerPgSync( ): Promise { const { options, metadata } = routeBody(request) + if (options.url.trim() === ``) { + return apiError(400, ErrCodeInvalidRequest, `pgSync url must be non-empty`) + } + if (options.table.trim() === ``) { return apiError( 400, @@ -104,6 +109,9 @@ async function registerPgSync( return json(result) } catch (error) { + if (error instanceof PgSyncSourceValidationError) { + return apiError(400, ErrCodeInvalidRequest, error.message) + } return apiError( 500, ErrCodeInvalidRequest, diff --git a/packages/agents-server/src/wake-registry.ts b/packages/agents-server/src/wake-registry.ts index 65be089e83..ff8ef6e75b 100644 --- a/packages/agents-server/src/wake-registry.ts +++ b/packages/agents-server/src/wake-registry.ts @@ -967,6 +967,8 @@ export class WakeRegistry { } if (value && `oldValue` in value) { change.oldValue = value.oldValue + } else if (value && `old_value` in value) { + change.oldValue = value.old_value } if (eventType === `inbox`) { diff --git a/packages/agents-server/test/manifest-side-effects.test.ts b/packages/agents-server/test/manifest-side-effects.test.ts index 6a6ccb91c2..4469f7a232 100644 --- a/packages/agents-server/test/manifest-side-effects.test.ts +++ b/packages/agents-server/test/manifest-side-effects.test.ts @@ -51,7 +51,22 @@ describe(`manifest side effects`, () => { }) }) - it(`maps pgSync source manifest sourceRef to pg-sync stream path`, () => { + it(`prefers pgSync streamUrl from manifest config when present`, () => { + const sourceRef = `pg_abc123` + + expect( + extractManifestSourceUrl({ + kind: `source`, + sourceType: `pgSync`, + sourceRef, + config: { + streamUrl: `/_electric/pg-sync/default/${sourceRef}`, + }, + }) + ).toBe(`/_electric/pg-sync/default/${sourceRef}`) + }) + + it(`falls back to pgSync sourceRef-derived stream path`, () => { const sourceRef = `pg_abc123` expect( @@ -71,6 +86,9 @@ describe(`manifest side effects`, () => { kind: `source`, sourceType: `pgSync`, sourceRef, + config: { + streamUrl: `/_electric/pg-sync/default/${sourceRef}`, + }, wake: { on: `change`, ops: [`delete`] }, }, `source:pgSync:${sourceRef}` @@ -78,7 +96,7 @@ describe(`manifest side effects`, () => { expect(registration).toEqual({ subscriberUrl: `/parent/p1`, - sourceUrl: getPgSyncStreamPath(sourceRef), + sourceUrl: `/_electric/pg-sync/default/${sourceRef}`, condition: { on: `change`, ops: [`delete`], @@ -107,6 +125,9 @@ describe(`manifest side effects`, () => { kind: `source`, sourceType: `pgSync`, sourceRef, + config: { + streamUrl: `/_electric/pg-sync/default/${sourceRef}`, + }, wake: { on: `change`, collections: [`pg_sync_change`], @@ -118,7 +139,7 @@ describe(`manifest side effects`, () => { expect(registration).toEqual({ subscriberUrl: `/parent/p1`, - sourceUrl: getPgSyncStreamPath(sourceRef), + sourceUrl: `/_electric/pg-sync/default/${sourceRef}`, condition: { on: `change`, collections: [`pg_sync_change`], diff --git a/packages/agents-server/test/oss-server-router.test.ts b/packages/agents-server/test/oss-server-router.test.ts index 51c733d4f2..149abe7c6c 100644 --- a/packages/agents-server/test/oss-server-router.test.ts +++ b/packages/agents-server/test/oss-server-router.test.ts @@ -115,7 +115,12 @@ describe(`OSS server routing wrapper`, () => { new Request(`http://server/_electric/pg-sync/register`, { method: `POST`, headers: { 'content-type': `application/json` }, - body: JSON.stringify({ options: { table: `entities` } }), + body: JSON.stringify({ + options: { + url: `https://electric.example/v1/shape`, + table: `entities`, + }, + }), }), buildTenantContext({ pgSyncBridgeManager: pgSyncBridgeManager as any }) ) @@ -123,6 +128,7 @@ describe(`OSS server routing wrapper`, () => { expect(registerResponse.status).toBe(200) expect(pgSyncBridgeManager.register).toHaveBeenCalledWith( { + url: `https://electric.example/v1/shape`, table: `entities`, }, { diff --git a/packages/agents-server/test/pg-sync-bridge-manager.test.ts b/packages/agents-server/test/pg-sync-bridge-manager.test.ts index 007d42075d..6d644fa6ea 100644 --- a/packages/agents-server/test/pg-sync-bridge-manager.test.ts +++ b/packages/agents-server/test/pg-sync-bridge-manager.test.ts @@ -7,7 +7,6 @@ import { buildElectricShapeParams, pgSyncMessageToDurableEvent, PgSyncBridgeManager, - PG_SYNC_ELECTRIC_SHAPE_URL, } from '../src/pg-sync-bridge-manager' const { mockState } = vi.hoisted(() => ({ @@ -57,12 +56,24 @@ vi.mock(`@durable-streams/client`, () => ({ }, })) +const SHAPE_URL = `https://electric.example/v1/shape` + beforeEach(() => { mockState.callbacks = [] mockState.constructedOptions = [] mockState.appends = [] mockState.appendError = null mockState.streams = [] + vi.stubGlobal( + `fetch`, + vi.fn( + async () => + new Response(`[]`, { + status: 200, + headers: { 'electric-handle': `handle-1` }, + }) + ) + ) }) describe(`pg-sync bridge helpers`, () => { @@ -85,59 +96,81 @@ describe(`pg-sync bridge helpers`, () => { }) }) - it(`converts insert/update/delete messages with stable keys`, () => { - const options = { table: `todos` } + it(`copies shape messages directly while preserving the row key`, () => { + const options = { url: SHAPE_URL, table: `todos` } const insert = pgSyncMessageToDurableEvent( { - headers: { operation: `insert`, offset: `1_0` }, + key: `"public"."todos"/"1"`, + headers: { operation: `insert`, lsn: `1`, op_position: 0 }, value: { id: 1, text: `a` }, } as any, options )! const update = pgSyncMessageToDurableEvent( { - headers: { operation: `update`, offset: `2_0` }, + key: `"public"."todos"/"1"`, + headers: { operation: `update` }, value: { id: 1, text: `b` }, + old_value: { id: 1, text: `a` }, } as any, options )! const del = pgSyncMessageToDurableEvent( { - headers: { operation: `delete`, offset: `3_0` }, - value: { id: 1 }, + key: `"public"."todos"/"1"`, + headers: { operation: `delete` }, + old_value: { id: 1 }, } as any, options )! - expect(insert.key).toBe(`${sourceRefForPgSync(options)}:insert:1_0`) + expect(insert.key).toBe(`"public"."todos"/"1"`) + expect(insert.value).toMatchObject({ + key: `"public"."todos"/"1"`, + value: { id: 1, text: `a` }, + }) expect(update.headers.operation).toBe(`update`) - expect(del.value.operation).toBe(`delete`) - expect(del.value.rowKey).toBe(`1`) + expect(update.value).toMatchObject({ + key: `"public"."todos"/"1"`, + value: { id: 1, text: `b` }, + old_value: { id: 1, text: `a` }, + }) + expect(del.value).toMatchObject({ + key: `"public"."todos"/"1"`, + old_value: { id: 1 }, + }) + }) + + it(`falls back to row identity when Electric omits the top-level key`, () => { + const options = { url: SHAPE_URL, table: `todos` } + + const event = pgSyncMessageToDurableEvent( + { + headers: { operation: `insert` }, + value: { id: 32, text: `testing` }, + } as any, + options + )! + + expect(event.key).toBe(`32`) }) - it(`rejects messages without stable string offsets`, () => { - const options = { table: `todos` } + it(`rejects messages without a row key`, () => { + const options = { url: SHAPE_URL, table: `todos` } expect( pgSyncMessageToDurableEvent( { - key: `shape-key-1`, headers: { operation: `insert` }, - value: { id: 1 }, + value: { text: `missing id` }, } as any, options ) ).toBeNull() - expect( - pgSyncMessageToDurableEvent( - { headers: { operation: `insert` }, value: { id: 1 } } as any, - options - ) - ).toBeNull() }) it(`converts BigInt values to strings so durable events are JSON serializable`, () => { - const options = { table: `entities` } + const options = { url: SHAPE_URL, table: `entities` } const event = pgSyncMessageToDurableEvent( { headers: { operation: `insert`, offset: `12_0` }, @@ -149,12 +182,23 @@ describe(`pg-sync bridge helpers`, () => { expect(JSON.stringify(event)).toContain(`"1"`) expect(event.value.value).toEqual({ id: `1`, nested: { count: `2` } }) - expect(event.value.oldValue).toEqual({ id: `0` }) + expect(event.value.old_value).toEqual({ id: `0` }) expect(event.value.headers).toEqual({ operation: `insert`, offset: `12_0` }) }) }) describe(`PgSyncBridgeManager`, () => { + it(`requires a source URL at registration time`, async () => { + const manager = new PgSyncBridgeManager({ + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any) + + await expect(manager.register({ table: `todos` })).rejects.toThrow( + /pgSync source url is required/ + ) + }) + it(`starts one stream per sourceRef and appends change events`, async () => { const streamClient = { baseUrl: `http://durable`, @@ -162,13 +206,13 @@ describe(`PgSyncBridgeManager`, () => { } const manager = new PgSyncBridgeManager(streamClient as any) - await manager.register({ table: `todos` }) - await manager.register({ table: `todos` }) + await manager.register({ url: SHAPE_URL, table: `todos` }) + await manager.register({ url: SHAPE_URL, table: `todos` }) expect(streamClient.ensure).toHaveBeenCalledTimes(2) expect(mockState.constructedOptions).toHaveLength(1) expect(mockState.constructedOptions[0]).toMatchObject({ - url: PG_SYNC_ELECTRIC_SHAPE_URL, + url: SHAPE_URL, params: { table: `todos` }, offset: `now`, log: `changes_only`, @@ -180,8 +224,12 @@ describe(`PgSyncBridgeManager`, () => { ]) expect(JSON.parse(mockState.appends[0]!)).toMatchObject({ type: `pg_sync_change`, + key: `1`, headers: { operation: `insert` }, - value: { table: `todos`, operation: `insert`, rowKey: `1` }, + value: { + headers: { operation: `insert` }, + value: { id: 1 }, + }, }) }) @@ -193,7 +241,7 @@ describe(`PgSyncBridgeManager`, () => { } const manager = new PgSyncBridgeManager(streamClient as any, evaluateWakes) - await manager.register({ table: `todos` }) + await manager.register({ url: SHAPE_URL, table: `todos` }) await mockState.callbacks[0]!([ { headers: { operation: `insert`, offset: `1_0` }, value: { id: 1 } }, { headers: { control: `up-to-date` } }, @@ -216,7 +264,7 @@ describe(`PgSyncBridgeManager`, () => { ensure: vi.fn(async () => undefined), } const manager = new PgSyncBridgeManager(streamClient as any, evaluateWakes) - const options = { table: `todos` } + const options = { url: SHAPE_URL, table: `todos` } const sourceRef = sourceRefForPgSync(options) await manager.register(options) @@ -250,7 +298,7 @@ describe(`PgSyncBridgeManager`, () => { undefined, registry as any ) - const options = { table: `todos` } + const options = { url: SHAPE_URL, table: `todos` } const sourceRef = sourceRefForPgSync(options) const result = await manager.register(options) @@ -279,7 +327,11 @@ describe(`PgSyncBridgeManager`, () => { undefined, registry as any ) - const options = { table: `todos`, params: { b: `2`, a: `1` } } + const options = { + url: SHAPE_URL, + table: `todos`, + params: { b: `2`, a: `1` }, + } const sourceRef = sourceRefForPgSync(options) await manager.register(options) @@ -287,6 +339,7 @@ describe(`PgSyncBridgeManager`, () => { sourceRef, streamUrl: getPgSyncStreamPath(sourceRef), options: { + url: SHAPE_URL, table: `todos`, params: { a: `1`, b: `2` }, replica: `default`, @@ -323,7 +376,7 @@ describe(`PgSyncBridgeManager`, () => { registry as any ) - await manager.register({ table: `todos` }) + await manager.register({ url: SHAPE_URL, table: `todos` }) await mockState.callbacks[0]!([{ headers: { control: `up-to-date` } }]) registry.updatePgSyncBridgeCursor.mockClear() mockState.appendError = new Error(`append failed`) @@ -336,7 +389,7 @@ describe(`PgSyncBridgeManager`, () => { }) it(`startup resumes existing pgSync bridges from stored cursor`, async () => { - const options = { table: `todos` } + const options = { url: SHAPE_URL, table: `todos` } const sourceRef = sourceRefForPgSync(options) const registry = { listPgSyncBridges: vi.fn(async () => [ @@ -370,7 +423,7 @@ describe(`PgSyncBridgeManager`, () => { }) it(`invalid stored shape cursor falls back to bootstrap and clears cursor`, async () => { - const options = { table: `todos` } + const options = { url: SHAPE_URL, table: `todos` } const sourceRef = sourceRefForPgSync(options) const registry = { listPgSyncBridges: vi.fn(async () => [ @@ -404,7 +457,7 @@ describe(`PgSyncBridgeManager`, () => { }) it(`must-refetch clears persisted cursor and restarts bootstrap`, async () => { - const options = { table: `todos` } + const options = { url: SHAPE_URL, table: `todos` } const sourceRef = sourceRefForPgSync(options) const registry = { upsertPgSyncBridge: vi.fn(async (row) => ({ ...row })), @@ -437,7 +490,7 @@ describe(`PgSyncBridgeManager`, () => { baseUrl: `http://durable`, ensure: vi.fn(async () => undefined), } as any) - await manager.register({ table: `todos` }) + await manager.register({ url: SHAPE_URL, table: `todos` }) await mockState.callbacks[0]!([{ headers: { control: `must-refetch` } }]) @@ -488,7 +541,7 @@ describe(`external review red tests`, () => { undefined, registry as any ) - await first.register({ table: `todos` }) + await first.register({ url: SHAPE_URL, table: `todos` }) mockState.streams[0]!.shapeHandle = `shape-a` mockState.streams[0]!.lastOffset = `1_0` await mockState.callbacks[0]!([ @@ -528,7 +581,7 @@ describe(`external review red tests`, () => { registry as any ) - await manager.register({ table: `todos` }) + await manager.register({ url: SHAPE_URL, table: `todos` }) await mockState.callbacks[0]!([{ headers: { control: `up-to-date` } }]) mockState.streams[0]!.shapeHandle = `shape-a` mockState.streams[0]!.lastOffset = `2_0` @@ -543,18 +596,22 @@ describe(`external review red tests`, () => { }) }) - it(`rejects pg-sync change messages without a stable per-change offset`, () => { + it(`accepts pg-sync change messages without a per-change offset when the row key is present`, () => { expect( pgSyncMessageToDurableEvent( - { headers: { operation: `insert` }, value: { id: 1 } } as any, - { table: `todos` } + { + key: `"public"."todos"/"1"`, + headers: { operation: `insert` }, + value: { id: 1 }, + } as any, + { url: SHAPE_URL, table: `todos` } ) - ).toBeNull() + ).toMatchObject({ key: `"public"."todos"/"1"` }) }) }) describe(`pg-sync production hardening`, () => { - it(`uses configured URL and forwards request metadata as shape params`, async () => { + it(`uses the source URL from registration options and forwards request metadata as shape params`, async () => { const manager = new PgSyncBridgeManager( { baseUrl: `http://durable`, @@ -563,13 +620,12 @@ describe(`pg-sync production hardening`, () => { undefined, undefined, { - url: `https://electric.example/v1/shape`, retry: { initialDelayMs: 0, maxDelayMs: 0 }, } ) await manager.register( - { table: `todos` }, + { url: SHAPE_URL, table: `todos` }, { tenantId: `tenant-a`, principalKind: `agent`, @@ -585,7 +641,7 @@ describe(`pg-sync production hardening`, () => { ) expect(mockState.constructedOptions[0]).toMatchObject({ - url: `https://electric.example/v1/shape`, + url: SHAPE_URL, params: { table: `todos`, replica: `default`, @@ -603,6 +659,165 @@ describe(`pg-sync production hardening`, () => { }) }) + it(`probes the shape endpoint before starting a new bridge`, async () => { + const fetchFn = vi.fn( + async (_input: RequestInfo | URL) => + new Response(`[]`, { + status: 200, + headers: { 'electric-handle': `handle-1` }, + }) + ) + const manager = new PgSyncBridgeManager( + { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any, + undefined, + undefined, + { retry: { initialDelayMs: 0, maxDelayMs: 0 }, fetchFn } + ) + const options = { + url: SHAPE_URL, + table: `todos`, + columns: [`id`, `text`], + where: `done = $1`, + params: [`false`], + } + + await manager.register(options) + + expect(fetchFn).toHaveBeenCalledTimes(1) + const probeUrl = new URL(String(fetchFn.mock.calls[0]![0])) + expect(probeUrl.origin + probeUrl.pathname).toBe(SHAPE_URL) + expect(probeUrl.searchParams.get(`table`)).toBe(`todos`) + expect(probeUrl.searchParams.get(`columns`)).toBe(`id,text`) + expect(probeUrl.searchParams.get(`where`)).toBe(`done = $1`) + expect(probeUrl.searchParams.get(`params[1]`)).toBe(`false`) + expect(probeUrl.searchParams.get(`offset`)).toBe(`now`) + + await manager.register(options) + expect(fetchFn).toHaveBeenCalledTimes(1) + }) + + it(`fails registration when the shape endpoint rejects the probe`, async () => { + const ensure = vi.fn(async () => undefined) + const upsertPgSyncBridge = vi.fn(async () => undefined) + const fetchFn = vi.fn( + async () => new Response(`{"message":"table not found"}`, { status: 400 }) + ) + const manager = new PgSyncBridgeManager( + { baseUrl: `http://durable`, ensure } as any, + undefined, + { upsertPgSyncBridge } as any, + { retry: { initialDelayMs: 0, maxDelayMs: 0 }, fetchFn } + ) + + await expect( + manager.register({ url: SHAPE_URL, table: `missing` }) + ).rejects.toThrow(/400.*table not found/s) + expect(mockState.constructedOptions).toHaveLength(0) + expect(upsertPgSyncBridge).not.toHaveBeenCalled() + expect(ensure).not.toHaveBeenCalled() + }) + + it(`fails registration when the shape endpoint is unreachable`, async () => { + const fetchFn = vi.fn(async () => { + throw new Error(`ECONNREFUSED`) + }) + const manager = new PgSyncBridgeManager( + { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any, + undefined, + undefined, + { retry: { initialDelayMs: 0, maxDelayMs: 0 }, fetchFn } + ) + + await expect( + manager.register({ url: SHAPE_URL, table: `todos` }) + ).rejects.toThrow(/unreachable.*ECONNREFUSED/s) + expect(mockState.constructedOptions).toHaveLength(0) + }) + + it(`fails registration when the endpoint responds 200 but is not a shape log`, async () => { + // Electric answers 200 with an empty body on its root path, so a bare + // host URL passes an ok-check while the shape API lives at /v1/shape. + const fetchFn = vi.fn(async () => new Response(``, { status: 200 })) + const manager = new PgSyncBridgeManager( + { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any, + undefined, + undefined, + { retry: { initialDelayMs: 0, maxDelayMs: 0 }, fetchFn } + ) + + await expect( + manager.register({ url: `http://localhost:30000`, table: `todos` }) + ).rejects.toThrow(/not a shape log.*\/v1\/shape/s) + expect(mockState.constructedOptions).toHaveLength(0) + }) + + it(`fails registration when the url is not an HTTP(S) endpoint`, async () => { + const fetchFn = vi.fn( + async () => + new Response(`[]`, { + status: 200, + headers: { 'electric-handle': `handle-1` }, + }) + ) + const manager = new PgSyncBridgeManager( + { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any, + undefined, + undefined, + { retry: { initialDelayMs: 0, maxDelayMs: 0 }, fetchFn } + ) + + await expect( + manager.register({ + url: `postgres://app:secret@localhost:5432/app`, + table: `todos`, + }) + ).rejects.toThrow(/HTTP/) + expect(fetchFn).not.toHaveBeenCalled() + }) + + it(`keeps the sourceRef stable across registrations with different request metadata`, async () => { + const ensure = vi.fn(async () => undefined) + const manager = new PgSyncBridgeManager( + { baseUrl: `http://durable`, ensure } as any, + undefined, + undefined, + { retry: { initialDelayMs: 0, maxDelayMs: 0 } } + ) + const options = { url: SHAPE_URL, table: `todos` } + const metadata = { + tenantId: `tenant-a`, + principalKind: `agent`, + principalId: `horton`, + entityUrl: `/horton/abc`, + runtimeConsumerId: `runner-1`, + } + + const first = await manager.register(options, { + ...metadata, + wakeId: `wake-1`, + }) + const second = await manager.register(options, { + ...metadata, + wakeId: `wake-2`, + }) + + expect(first.sourceRef).toBe(sourceRefForPgSync(options)) + expect(second).toEqual(first) + expect(mockState.constructedOptions).toHaveLength(1) + }) + it(`backs off before recovery retries`, async () => { const sleeps: number[] = [] const manager = new PgSyncBridgeManager( @@ -624,7 +839,7 @@ describe(`pg-sync production hardening`, () => { } ) - await manager.register({ table: `todos` }) + await manager.register({ url: SHAPE_URL, table: `todos` }) await mockState.callbacks[0]!([{ headers: { control: `up-to-date` } }]) mockState.appendError = new Error(`append failed`) await mockState.callbacks[0]!([ diff --git a/packages/agents-server/test/pg-sync-router.test.ts b/packages/agents-server/test/pg-sync-router.test.ts index 585f1eae87..17a85209e4 100644 --- a/packages/agents-server/test/pg-sync-router.test.ts +++ b/packages/agents-server/test/pg-sync-router.test.ts @@ -4,6 +4,7 @@ import { sourceRefForPgSync, } from '@electric-ax/agents-runtime' import { globalRouter } from '../src/routing/global-router' +import { PgSyncSourceValidationError } from '../src/pg-sync-bridge-manager' import type { TenantContext } from '../src/routing/context' function request(method: string, path: string, body?: unknown): Request { @@ -50,13 +51,15 @@ function buildContext(overrides: Partial = {}): TenantContext { describe(`pg-sync routes`, () => { it(`registers a pg-sync source and returns its stream path`, async () => { const ctx = buildContext() - const expectedSourceRef = sourceRefForPgSync({ table: `todos` }) + const options = { + url: `https://electric.example/v1/shape`, + table: `todos`, + } + const expectedSourceRef = sourceRefForPgSync(options) const expectedStreamUrl = getPgSyncStreamPath(expectedSourceRef) const response = await globalRouter.fetch( - request(`POST`, `/_electric/pg-sync/register`, { - options: { table: `todos` }, - }), + request(`POST`, `/_electric/pg-sync/register`, { options }), ctx ) @@ -65,18 +68,13 @@ describe(`pg-sync routes`, () => { sourceRef: expectedSourceRef, streamUrl: expectedStreamUrl, }) - expect(ctx.pgSyncBridgeManager!.register).toHaveBeenCalledWith( - { - table: `todos`, - }, - { - tenantId: `tenant-test`, - principalKind: `user`, - principalId: `owner@example.com`, - principalKey: `user:owner@example.com`, - principalUrl: `/principal/user%3Aowner%40example.com`, - } - ) + expect(ctx.pgSyncBridgeManager!.register).toHaveBeenCalledWith(options, { + tenantId: `tenant-test`, + principalKind: `user`, + principalId: `owner@example.com`, + principalKey: `user:owner@example.com`, + principalUrl: `/principal/user%3Aowner%40example.com`, + }) }) it(`rejects an empty table`, async () => { @@ -84,7 +82,7 @@ describe(`pg-sync routes`, () => { const response = await globalRouter.fetch( request(`POST`, `/_electric/pg-sync/register`, { - options: { table: ` ` }, + options: { url: `https://electric.example/v1/shape`, table: ` ` }, }), ctx ) @@ -93,9 +91,68 @@ describe(`pg-sync routes`, () => { expect(ctx.pgSyncBridgeManager!.register).not.toHaveBeenCalled() }) + it(`rejects a missing or empty url`, async () => { + const ctx = buildContext() + + const missing = await globalRouter.fetch( + request(`POST`, `/_electric/pg-sync/register`, { + options: { table: `todos` }, + }), + ctx + ) + const empty = await globalRouter.fetch( + request(`POST`, `/_electric/pg-sync/register`, { + options: { url: ` `, table: `todos` }, + }), + ctx + ) + + expect(missing.status).toBe(400) + expect(empty.status).toBe(400) + expect(ctx.pgSyncBridgeManager!.register).not.toHaveBeenCalled() + }) + + it(`returns 400 when source validation fails and 500 for other errors`, async () => { + const invalidSource = buildContext({ + pgSyncBridgeManager: { + register: vi.fn(async () => { + throw new PgSyncSourceValidationError( + `pgSync source at https://nope.example rejected the shape request (404)` + ) + }), + stop: vi.fn(async () => undefined), + } as any, + }) + const internalFailure = buildContext({ + pgSyncBridgeManager: { + register: vi.fn(async () => { + throw new Error(`registry unavailable`) + }), + stop: vi.fn(async () => undefined), + } as any, + }) + const options = { url: `https://nope.example/v1/shape`, table: `todos` } + + const invalidResponse = await globalRouter.fetch( + request(`POST`, `/_electric/pg-sync/register`, { options }), + invalidSource + ) + const failureResponse = await globalRouter.fetch( + request(`POST`, `/_electric/pg-sync/register`, { options }), + internalFailure + ) + + expect(invalidResponse.status).toBe(400) + await expect(invalidResponse.text()).resolves.toContain( + `rejected the shape request (404)` + ) + expect(failureResponse.status).toBe(500) + }) + it(`computes the same sourceRef as the runtime pgSync helper`, async () => { const ctx = buildContext() const options = { + url: `https://electric.example/v1/shape`, table: `todos`, where: `priority = 'high'`, params: { b: `2`, a: `1` }, diff --git a/packages/agents/src/agents/horton.ts b/packages/agents/src/agents/horton.ts index 7c95f88b54..c74ed293f9 100644 --- a/packages/agents/src/agents/horton.ts +++ b/packages/agents/src/agents/horton.ts @@ -285,7 +285,7 @@ When a user opens with a greeting ("hi", "hello", "hey", etc.) or a broad statem - fetch_url: fetch and convert a URL to markdown - spawn_worker: dispatch a subagent for an isolated task - fork: spawn a child session that inherits this conversation's history up to the latest completed response. Same parent-ownership model as spawn_worker — when the fork's next run finishes, you'll wake with its response. -- observe_pg_sync: observe an Electric Postgres sync stream and wake on matching changes +- observe_pg_sync: observe an Electric Postgres sync stream and wake on matching changes (see "Observing Postgres tables") - send: send a message to an Electric Agent/entity. To schedule future work for yourself, call send with self: true and afterMs. ${eventSourceTools}${titleTool}${scheduleTools}${docsTools}${skillsTools} @@ -295,6 +295,13 @@ ${eventSourceTools}${titleTool}${scheduleTools}${docsTools}${skillsTools} - Use absolute paths or paths relative to the current working directory. ${modelGuidance}${docsGuidance}${skillsGuidance}${onboardingGuidance}${docsUrlGuidance} +# Observing Postgres tables +observe_pg_sync subscribes you to row changes in a Postgres table via an Electric shape stream: +- The \`url\` parameter is the HTTP(S) URL of an Electric shape endpoint (e.g. \`http://localhost:3000/v1/shape\`). It is NOT a \`postgres://\` connection string and there is no default — if the user hasn't given you the endpoint URL, ask for it. Never guess or invent one. +- Registration validates the endpoint by fetching the shape log first. If it fails, the error includes Electric's response — use it to correct the table name, where clause, or URL, or relay it to the user. +- Use \`where\` and \`columns\` to narrow the shape so you only wake on changes you care about; use \`wake.ops\` to filter by operation and \`wake.debounceMs\` to batch bursts. +- The observation persists across wakes — register it once, don't re-register on every wake. + # Risky actions Pause and confirm with the user before: - Destructive operations (deleting files, rm -rf, dropping data, force-pushing) diff --git a/packages/agents/src/tools/observe-pg-sync.ts b/packages/agents/src/tools/observe-pg-sync.ts index 4bb7ee3989..fd157ca75f 100644 --- a/packages/agents/src/tools/observe-pg-sync.ts +++ b/packages/agents/src/tools/observe-pg-sync.ts @@ -1,5 +1,9 @@ import { Type } from '@sinclair/typebox' -import { pgSync, type HandlerContext } from '@electric-ax/agents-runtime' +import { + getPgSyncStreamPath, + pgSync, + type HandlerContext, +} from '@electric-ax/agents-runtime' import type { AgentTool } from '@mariozechner/pi-agent-core' function asToolResult(value: unknown) { @@ -25,13 +29,11 @@ export function createObservePgSyncTool(ctx: HandlerContext): AgentTool { return { name: `observe_pg_sync`, label: `Observe Postgres Sync`, - description: `Observe an Electric Postgres shape stream and wake this agent when matching row changes arrive.`, + description: `Observe an Electric Postgres shape stream and wake this agent when matching row changes arrive. Requires the HTTP(S) URL of an Electric shape endpoint — ask the user for it if you don't know it. Registration validates the endpoint up front and fails with Electric's error if the shape can't be fetched.`, parameters: Type.Object({ - url: Type.Optional( - Type.String({ - description: `Optional Electric shape endpoint URL. Defaults to the server-configured pg-sync URL.`, - }) - ), + url: Type.String({ + description: `HTTP(S) URL of the Electric shape endpoint, e.g. http://localhost:3000/v1/shape. Not a postgres:// connection string. Never guess this — ask the user if it hasn't been provided.`, + }), table: Type.String({ minLength: 1, pattern: `\\S`, @@ -66,7 +68,7 @@ export function createObservePgSyncTool(ctx: HandlerContext): AgentTool { }), execute: async (_toolCallId, params) => { const args = params as { - url?: string + url: string table: string columns?: string[] where?: string @@ -78,6 +80,10 @@ export function createObservePgSyncTool(ctx: HandlerContext): AgentTool { } } + if (typeof args.url !== `string` || args.url.trim().length === 0) { + throw new Error(`url is required`) + } + if (typeof args.table !== `string` || args.table.trim().length === 0) { throw new Error(`table is required`) } @@ -98,11 +104,11 @@ export function createObservePgSyncTool(ctx: HandlerContext): AgentTool { : {}), } - await ctx.observe(source, { wake }) + const handle = await ctx.observe(source, { wake }) return asToolResult({ - sourceRef: source.sourceRef, - streamUrl: source.streamUrl, + sourceRef: handle.sourceRef, + streamUrl: handle.streamUrl ?? getPgSyncStreamPath(handle.sourceRef), wake, }) }, diff --git a/packages/agents/test/observe-pg-sync-tool.test.ts b/packages/agents/test/observe-pg-sync-tool.test.ts index fa7cbd0f87..ba01ff0b7e 100644 --- a/packages/agents/test/observe-pg-sync-tool.test.ts +++ b/packages/agents/test/observe-pg-sync-tool.test.ts @@ -10,11 +10,15 @@ function textResult(result: unknown): any { } describe(`observe_pg_sync tool`, () => { - it(`validates required table`, async () => { + it(`validates required url and table`, async () => { const tool = createObservePgSyncTool({ observe: vi.fn() } as any) expect(Value.Check(tool.parameters as any, {})).toBe(false) - await expect(tool.execute(`call`, {})).rejects.toThrow(/table is required/) + expect(Value.Check(tool.parameters as any, { table: `todos` })).toBe(false) + await expect(tool.execute(`call`, {})).rejects.toThrow(/url is required/) + await expect(tool.execute(`call`, { table: `todos` })).rejects.toThrow( + /url is required/ + ) }) it(`rejects invalid ops and unsupported timeoutMs when schema validates`, () => { @@ -35,10 +39,16 @@ describe(`observe_pg_sync tool`, () => { }) it(`calls ctx.observe with pgSync source and wake options`, async () => { - const observe = vi.fn(async () => {}) + const observe = vi.fn(async () => ({ + sourceType: `pgSync`, + sourceRef: `registered-ref`, + streamUrl: `/_electric/pg-sync/default/registered-ref`, + events: [], + })) const tool = createObservePgSyncTool({ observe } as any) await tool.execute(`call`, { + url: `http://localhost:30000/v1/shape`, table: `todos`, columns: [`id`, `text`], where: `priority = $1`, @@ -48,6 +58,7 @@ describe(`observe_pg_sync tool`, () => { }) const expectedSource = pgSync({ + url: `http://localhost:30000/v1/shape`, table: `todos`, columns: [`id`, `text`], where: `priority = $1`, @@ -74,11 +85,20 @@ describe(`observe_pg_sync tool`, () => { }) it(`preserves debounceMs: 0`, async () => { - const observe = vi.fn(async () => {}) + const observe = vi.fn(async () => ({ + sourceType: `pgSync`, + sourceRef: `registered-ref`, + streamUrl: `/_electric/pg-sync/default/registered-ref`, + events: [], + })) const tool = createObservePgSyncTool({ observe } as any) const result = textResult( - await tool.execute(`call`, { table: `todos`, wake: { debounceMs: 0 } }) + await tool.execute(`call`, { + url: `http://localhost:30000/v1/shape`, + table: `todos`, + wake: { debounceMs: 0 }, + }) ) expect(observe).toHaveBeenCalledWith(expect.anything(), { @@ -87,27 +107,45 @@ describe(`observe_pg_sync tool`, () => { expect(result.wake).toEqual({ on: `change`, debounceMs: 0 }) }) - it(`returns sourceRef, streamUrl, and wake`, async () => { - const observe = vi.fn(async () => {}) + it(`returns the observed sourceRef, streamUrl, and wake`, async () => { + const observe = vi.fn(async () => ({ + sourceType: `pgSync`, + sourceRef: `registered-ref`, + streamUrl: `/_electric/pg-sync/default/registered-ref`, + events: [], + })) const tool = createObservePgSyncTool({ observe } as any) const result = textResult( - await tool.execute(`call`, { table: `todos`, wake: { ops: [`delete`] } }) + await tool.execute(`call`, { + url: `http://localhost:30000/v1/shape`, + table: `todos`, + wake: { ops: [`delete`] }, + }) ) - const source = pgSync({ table: `todos` }) expect(result).toEqual({ - sourceRef: source.sourceRef, - streamUrl: source.streamUrl, + sourceRef: `registered-ref`, + streamUrl: `/_electric/pg-sync/default/registered-ref`, wake: { on: `change`, ops: [`delete`] }, }) }) it(`defaults wake when wake.ops is omitted`, async () => { - const observe = vi.fn(async () => {}) + const observe = vi.fn(async () => ({ + sourceType: `pgSync`, + sourceRef: `registered-ref`, + streamUrl: `/_electric/pg-sync/default/registered-ref`, + events: [], + })) const tool = createObservePgSyncTool({ observe } as any) - const result = textResult(await tool.execute(`call`, { table: `todos` })) + const result = textResult( + await tool.execute(`call`, { + url: `http://localhost:30000/v1/shape`, + table: `todos`, + }) + ) expect(observe).toHaveBeenCalledWith(expect.anything(), { wake: { on: `change` },