Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/require-pg-sync-source-urls.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@electric-ax/agents-runtime': patch
'@electric-ax/agents-server': patch
'@electric-ax/agents': patch
---

Require an explicit Electric shape endpoint URL for pg-sync observations. Source identity is now derived from the shape options alone (not per-request metadata) so re-registrations reuse the same bridge and stream, and registration validates the endpoint by fetching the shape log up front, failing with Electric's error instead of retrying silently.
5 changes: 4 additions & 1 deletion packages/agents-runtime/src/observation-sources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ export function canonicalPgSyncOptions(
}

export function sourceRefForPgSync(options: PgSyncOptions): string {
return hashString(JSON.stringify(canonicalPgSyncOptions(options)))
// metadata is per-request context (principal, wakeId, ...) and must not
// change the identity of the observed shape.
const { metadata: _metadata, ...identity } = canonicalPgSyncOptions(options)
return hashString(JSON.stringify(identity))
}

export interface EntityObservationSource extends ObservationSource {
Expand Down
15 changes: 15 additions & 0 deletions packages/agents-runtime/src/process-wake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1748,10 +1748,25 @@ export async function processWake(
wakeId,
}
)
const originalEntry = source.toManifestEntry() as Record<
string,
unknown
>
observedSource = {
...source,
sourceRef: registeredPgSync.sourceRef,
streamUrl: registeredPgSync.streamUrl,
toManifestEntry() {
return {
...originalEntry,
key: `source:pgSync:${registeredPgSync!.sourceRef}`,
sourceRef: registeredPgSync!.sourceRef,
config: {
...((originalEntry.config as Record<string, unknown>) ?? {}),
streamUrl: registeredPgSync!.streamUrl,
},
} as unknown as ReturnType<ObservationSource[`toManifestEntry`]>
},
}
}

Expand Down
8 changes: 4 additions & 4 deletions packages/agents-runtime/src/setup-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -886,10 +886,9 @@ export function createSetupContext(
source.ensureStream.contentType
)
}
const sourceStreamUrl =
source.sourceType === `pgSync` || !source.streamUrl.startsWith(`/`)
? source.streamUrl
: appendPathToUrl(config.serverBaseUrl, source.streamUrl)
const sourceStreamUrl = source.streamUrl.startsWith(`/`)
? appendPathToUrl(config.serverBaseUrl, source.streamUrl)
: source.streamUrl
sourceDb = await wiring.createSourceDb(
sourceStreamUrl,
source.schema,
Expand All @@ -912,6 +911,7 @@ export function createSetupContext(
return {
sourceType: source.sourceType,
sourceRef: source.sourceRef,
streamUrl: source.streamUrl,
db: sourceDb,
events,
}
Expand Down
1 change: 1 addition & 0 deletions packages/agents-runtime/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ export interface SourceWakeConfig {
export interface ObservationHandle {
sourceType: string
sourceRef: string
streamUrl?: string
db?: EntityStreamDB | ObservationStreamDB
events: Array<ChangeEvent>
}
Expand Down
15 changes: 14 additions & 1 deletion packages/agents-runtime/test/process-wake.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1619,6 +1619,7 @@ describe(`processWake`, () => {

it(`pgSync observe registers pgSync source before source DB preload`, async () => {
const source = pgSync({
url: `http://localhost:30000/v1/shape`,
table: `todos`,
where: `priority = $1`,
params: [`high`],
Expand All @@ -1640,7 +1641,7 @@ describe(`processWake`, () => {
expect(mockCreateStreamDB).toHaveBeenCalledWith(
expect.objectContaining({
streamOptions: expect.objectContaining({
url: `/_electric/pg-sync/default/pg-source-1`,
url: `http://localhost:3000/_electric/pg-sync/default/pg-source-1`,
contentType: `application/json`,
}),
state: expect.objectContaining({
Expand Down Expand Up @@ -1670,6 +1671,18 @@ describe(`processWake`, () => {
wakeId: `wake-abc`,
},
})
const wakeCall = fetchMock.mock.calls.find(
([url, opts]) =>
String(url).includes(`/_electric/wake`) &&
!String(url).includes(`wake-abc`) &&
(opts as RequestInit | undefined)?.method === `POST`
)
const wakeBody = JSON.parse(wakeCall![1]!.body as string) as Record<
string,
unknown
>
expect(wakeBody.sourceUrl).toBe(`/_electric/pg-sync/default/pg-source-1`)
expect(wakeBody.manifestKey).toBe(`source:pgSync:pg-source-1`)
expect(fetchMock.mock.invocationCallOrder[pgSyncCallIndex]).toBeLessThan(
mockSourceDbPreload.mock.invocationCallOrder[0]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ describe(`runtime-server-client.registerPgSyncSource`, () => {
})

const options = {
url: `http://localhost:30000/v1/shape`,
table: `todos`,
columns: [`id`, `text`],
where: `priority = $1`,
Expand Down Expand Up @@ -59,7 +60,10 @@ describe(`runtime-server-client.registerPgSyncSource`, () => {
})

await expect(
client.registerPgSyncSource({ table: `todos` })
client.registerPgSyncSource({
url: `http://localhost:30000/v1/shape`,
table: `todos`,
})
).rejects.toThrow(/registerPgSyncSource failed \(400\): bad table/)
})
})
3 changes: 3 additions & 0 deletions packages/agents-server/src/manifest-side-effects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ export function extractManifestSourceUrl(
}

if (manifest.sourceType === `pgSync`) {
if (typeof config?.streamUrl === `string`) {
return config.streamUrl
}
return typeof manifest.sourceRef === `string`
? getPgSyncManifestStreamPath(manifest.sourceRef)
: undefined
Expand Down
163 changes: 123 additions & 40 deletions packages/agents-server/src/pg-sync-bridge-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ import type {
ShapeStreamInterface,
} from '@electric-sql/client'

export const PG_SYNC_ELECTRIC_SHAPE_URL =
process.env.ELECTRIC_AGENTS_PG_SYNC_ELECTRIC_URL ??
`http://localhost:3000/v1/shape`

type PgSyncOperation = `insert` | `update` | `delete`
type WakeEvaluator = (
sourceUrl: string,
Expand All @@ -36,22 +32,26 @@ export type PgSyncResolvedSource = {
}

export interface PgSyncBridgeManagerOptions {
url?: string
retry?: {
initialDelayMs?: number
maxDelayMs?: number
random?: () => number
sleep?: (ms: number) => Promise<void>
}
fetchFn?: typeof fetch
probeTimeoutMs?: number
}

/** Registration was rejected because the source itself is invalid — map to a 4xx. */
export class PgSyncSourceValidationError extends Error {}

const DEFAULT_RETRY_INITIAL_DELAY_MS = 1_000
const DEFAULT_RETRY_MAX_DELAY_MS = 30_000
const DEFAULT_PROBE_TIMEOUT_MS = 10_000

type PgSyncChangeMessage = {
headers: Record<string, unknown> & {
operation?: PgSyncOperation | string
offset?: unknown
key?: unknown
rowKey?: unknown
}
Expand Down Expand Up @@ -126,6 +126,52 @@ export function buildElectricShapeParams(
}
}

/**
* Build the one-shot URL used to validate a shape source at registration
* time. Mirrors the query-param encoding of the Electric TS client: arrays
* are comma-joined, where-clause params become `params[n]`.
*/
export function buildShapeProbeUrl(
sourceUrl: string,
options: PgSyncOptions
): URL {
let url: URL
try {
url = new URL(sourceUrl)
} catch {
throw new PgSyncSourceValidationError(
`pgSync url "${sourceUrl}" is not a valid URL`
)
}
if (url.protocol !== `http:` && url.protocol !== `https:`) {
throw new PgSyncSourceValidationError(
`pgSync url "${sourceUrl}" must be an HTTP(S) Electric shape endpoint, not a database connection string`
)
}
for (const [key, value] of Object.entries(
buildElectricShapeParams(options)
)) {
if (value === undefined || value === null) continue
if (Array.isArray(value)) {
if (key === `params`) {
value.forEach((item, index) =>
url.searchParams.set(`params[${index + 1}]`, String(item))
)
} else {
url.searchParams.set(key, value.join(`,`))
}
} else if (typeof value === `object`) {
for (const [k, v] of Object.entries(value)) {
url.searchParams.set(`${key}[${k}]`, String(v))
}
} else {
url.searchParams.set(key, String(value))
}
}
url.searchParams.set(`offset`, `now`)
return url
}

function jsonSafe(value: unknown): unknown {
if (typeof value === `bigint`) return value.toString()
if (value === null || typeof value !== `object`) return value
Expand Down Expand Up @@ -170,54 +216,42 @@ function rowKeyForMessage(message: PgSyncChangeMessage): string | undefined {

export function pgSyncMessageToDurableEvent(
message: PgSyncChangeMessage,
optionsOrSourceRef: PgSyncOptions | string
_optionsOrSourceRef: PgSyncOptions | string
): {
type: `pg_sync_change`
key: string
value: Record<string, unknown>
headers: { operation: PgSyncOperation; timestamp: string }
headers: Record<string, unknown> & { operation: PgSyncOperation }
} | null {
const operation = message.headers.operation
if (
operation !== `insert` &&
operation !== `update` &&
operation !== `delete`
)
) {
return null
}

const sourceRef =
typeof optionsOrSourceRef === `string`
? optionsOrSourceRef
: sourceRefForPgSync(optionsOrSourceRef)
const rowKey = rowKeyForMessage(message)
const offset = message.headers.offset
if (typeof offset !== `string` || offset.length === 0) return null
const messageKeyPart = offset
const messageKey = `${sourceRef}:${operation}:${messageKeyPart}`
const timestamp = new Date().toISOString()
const oldValue = message.old_value
const safeValue = jsonSafe(message.value)
const safeOldValue = jsonSafe(oldValue)
const safeHeaders = jsonSafe(message.headers)
const key =
message.key ??
(typeof message.headers.key === `string`
? message.headers.key
: undefined) ??
rowKeyForMessage(message)
if (!key) {
return null
}

const safeMessage = jsonSafe(message) as Record<string, unknown>

return {
type: `pg_sync_change`,
key: messageKey,
value: {
key: messageKey,
table:
typeof optionsOrSourceRef === `string`
? undefined
: optionsOrSourceRef.table,
key,
value: safeMessage,
headers: {
...(jsonSafe(message.headers) as Record<string, unknown>),
operation,
...(rowKey !== undefined ? { rowKey } : {}),
...(message.value !== undefined ? { value: safeValue } : {}),
...(oldValue !== undefined ? { oldValue: safeOldValue } : {}),
headers: safeHeaders,
...(typeof offset === `string` ? { offset } : {}),
receivedAt: timestamp,
},
headers: { operation, timestamp },
}
}

Expand Down Expand Up @@ -425,18 +459,20 @@ export class PgSyncBridgeManager implements PgSyncBridgeCoordinator {
private bridges = new Map<string, PgSyncBridge>()
private starting = new Map<string, Promise<void>>()

private readonly url: string
private readonly retry: Required<
NonNullable<PgSyncBridgeManagerOptions[`retry`]>
>
private readonly fetchFn?: typeof fetch
private readonly probeTimeoutMs: number

constructor(
private streamClient: StreamClient,
private evaluateWakes?: WakeEvaluator,
private registry?: PostgresRegistry,
options: PgSyncBridgeManagerOptions = {}
) {
this.url = options.url ?? PG_SYNC_ELECTRIC_SHAPE_URL
this.fetchFn = options.fetchFn
this.probeTimeoutMs = options.probeTimeoutMs ?? DEFAULT_PROBE_TIMEOUT_MS
this.retry = {
initialDelayMs:
options.retry?.initialDelayMs ?? DEFAULT_RETRY_INITIAL_DELAY_MS,
Expand Down Expand Up @@ -478,6 +514,9 @@ export class PgSyncBridgeManager implements PgSyncBridgeCoordinator {
const resolvedSource = this.resolveSource(canonicalOptions)
const sourceRef = sourceRefForPgSync(canonicalOptions)
const streamUrl = getPgSyncStreamPath(sourceRef, this.registry?.tenantId)
if (!this.bridges.has(sourceRef) && !this.starting.has(sourceRef)) {
await this.probeSource(resolvedSource, canonicalOptions)
}
const row = await this.registry?.upsertPgSyncBridge({
sourceRef,
options: canonicalOptions,
Expand Down Expand Up @@ -541,7 +580,51 @@ export class PgSyncBridgeManager implements PgSyncBridgeCoordinator {
}

private resolveSource(options: CanonicalPgSyncConfig): PgSyncResolvedSource {
return { url: options.url ?? this.url }
if (!options.url) {
throw new PgSyncSourceValidationError(
`pgSync source url is required; no server default is configured`
)
}
return { url: options.url }
}

/**
* One-shot fetch of the shape log before a bridge is created, so a bad
* URL or rejected shape fails the registration instead of dying silently
* in the bridge's retry loop.
*/
private async probeSource(
source: PgSyncResolvedSource,
options: CanonicalPgSyncConfig
): Promise<void> {
const probeUrl = buildShapeProbeUrl(source.url, options)
const fetchFn = this.fetchFn ?? globalThis.fetch
let response: Response
try {
response = await fetchFn(probeUrl, {
signal: AbortSignal.timeout(this.probeTimeoutMs),
})
} catch (error) {
throw new PgSyncSourceValidationError(
`pgSync source at ${source.url} is unreachable: ${error instanceof Error ? error.message : String(error)}`
)
}
if (!response.ok) {
const body = (await response.text().catch(() => ``)).slice(0, 500)
throw new PgSyncSourceValidationError(
`pgSync source at ${source.url} rejected the shape request (${response.status})${body ? `: ${body}` : ``}`
)
}
// Electric answers 200 on paths that aren't the shape API (e.g. its
// root), so an ok status alone doesn't prove the URL is right. Real
// shape responses always carry the electric-handle header.
if (!response.headers.get(`electric-handle`)) {
const suggestion = new URL(source.url)
suggestion.pathname = `/v1/shape`
throw new PgSyncSourceValidationError(
`pgSync source at ${source.url} responded but is not a shape log (missing electric-handle header) — the Electric shape API is usually served at ${suggestion.origin}/v1/shape`
)
}
}

async stop(): Promise<void> {
Expand Down
Loading
Loading