From ed306c0020fd2dca2140447278c12aa9ad753ede Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Fri, 10 Apr 2026 22:07:54 -0600 Subject: [PATCH 1/7] fix(typescript-client): quarantine aborted responses --- packages/typescript-client/SPEC.md | 77 +++++++++-- packages/typescript-client/src/client.ts | 55 +++++++- packages/typescript-client/src/fetch.ts | 22 +++- .../typescript-client/test/stream.test.ts | 120 ++++++++++++++++++ 4 files changed, 260 insertions(+), 14 deletions(-) diff --git a/packages/typescript-client/SPEC.md b/packages/typescript-client/SPEC.md index 2e8836eaf4..3635f7712e 100644 --- a/packages/typescript-client/SPEC.md +++ b/packages/typescript-client/SPEC.md @@ -86,6 +86,38 @@ CDN, or proxy) — not from the server itself. `packages/sync-service/lib/electric/shape_cache/shape_status/shape_db/connection.ex` (`shapes_handle_idx`). +## Client Transport Assumptions + +Properties of the fetch/abort layer that must hold before a network result is +allowed to enter the state machine as a `response`, `messages`, or `sseClose` +event. + +### T0: Aborted requests are quarantined + +Once a request's `AbortSignal` is aborted, that request is no longer allowed to +deliver success metadata or message batches into the state machine, even if the +underlying runtime later resolves the fetch successfully. + +This is a client-side requirement, not a server assumption. Some runtimes and +transport stacks can surface a late successful response after the caller has +already aborted the request, especially around pause/resume, refresh/reconnect, +or desktop-app lifecycle edges. If that late response is processed, it can race +with a newer request generation and violate the state machine's preconditions, +e.g. by delivering a `response` event after the stream has already transitioned +into `ErrorState`. + +Operationally: + +- Aborted requests must be converted into an abort outcome before returning from + the fetch wrapper chain +- `#onInitialResponse`, `#onMessages`, and SSE close handling must only run for + the currently active, non-aborted request generation + +**Enforcement**: runtime checks in `createFetchWithBackoff` and +`createFetchWithConsumedMessages`, plus regression test +`should ignore successful responses that arrive after a paused request was aborted` +in `test/stream.test.ts`. + ## Invariants Properties that must hold after every state transition. Checked automatically by @@ -292,6 +324,23 @@ back to Live, SSE state resets to defaults. **Enforcement**: Dedicated test (`SSE state is preserved through LiveState self-transitions`). +### C9: Aborted requests must not emit state-machine events + +The state machine may ignore `response/messages/sseClose` while in `ErrorState` +or `PausedState` (C3), but aborted requests must not rely on that behavior for +correctness. A request aborted by pause/resume, system wake, visibility change, +or explicit refresh is part of an old request generation and must be discarded +before it can emit a late `response` event. + +Without this constraint, a late success from an aborted request can be processed +after a newer request has already failed and moved the stream into `ErrorState`, +producing `"Response was ignored by state \"error\""` warnings and silently +dropping fresh data until another restart. + +**Enforcement**: Dedicated regression test +(`should ignore successful responses that arrive after a paused request was aborted`) +plus runtime abort checks in the fetch wrapper chain. + ## Shape notification semantics The `Shape` class (`shape.ts`) wraps a `ShapeStream` and notifies subscribers @@ -368,6 +417,7 @@ observing an intermediate empty-rows notification. The | C6 | - | - | yes | | C7 | - | yes | yes | | C8 | - | - | yes | +| C9 | - | - | yes | ### Code -> Doc: Is each test derived from the spec? @@ -408,6 +458,14 @@ change the next request URL via state advancement or an explicit cache buster. This is enforced by the path-specific guards listed below. Live requests (`live=true`) legitimately reuse URLs. +### Invariant: aborted-request quarantine + +Any request generation that has been aborted must terminate as an abort before +it can feed metadata or messages into the state machine. This guard sits below +the state machine itself: it preserves the assumption that every delivered +`response/messages/sseClose` event belongs to the currently active request +generation. + ### Invariant: unconditional 409 cache buster Every code path that handles a 409 response must unconditionally call @@ -437,15 +495,16 @@ Six sites in `client.ts` recurse or loop to issue a new fetch: ### Guard mechanisms -| Guard | Scope | How it works | -| ----------------------------- | ----------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `#checkFastLoop` | Non-live `#requestShape` only | Detects N requests at same offset within a time window. First: clears caches + resets. Persistent: exponential backoff → throws FetchError(502). | -| `maxStaleCacheRetries` | Stale response path (L3) | State machine counts stale retries. After 3 consecutive stale responses, clears expired entry and attempts one self-healing retry. Throws FetchError(502) if self-healing also fails. | -| `#expiredShapeRecoveryKey` | Self-healing (L3 extension) | Records shape key after first self-healing attempt. Second exhaustion on same key skips self-healing → FetchError(502). Cleared on up-to-date. | -| `#maxSnapshotRetries` | Snapshot 409 path (L6) | Counts consecutive snapshot 409s. Unconditional cache buster on every retry. Throws FetchError(502) after 5. Runtime-enforced by `Shape #fetchSnapshotWithRetry 409 loop PBT` in `test/pbt-micro.test.ts`. | -| `#maxConsecutiveErrorRetries` | `#start` onError retry (L5) | Counts consecutive error retries. Sends error to subscribers and tears down after 50. Reset on successful message batch. | -| Pause lock | `#requestShape` entry | Returns immediately if paused. Prevents fetches during snapshots. | -| Up-to-date exit | `#requestShape` entry | Returns if `!subscribe` and `isUpToDate`. Breaks loop for one-shot syncs. | +| Guard | Scope | How it works | +| ----------------------------- | ----------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `#checkFastLoop` | Non-live `#requestShape` only | Detects N requests at same offset within a time window. First: clears caches + resets. Persistent: exponential backoff → throws FetchError(502). | +| `maxStaleCacheRetries` | Stale response path (L3) | State machine counts stale retries. After 3 consecutive stale responses, clears expired entry and attempts one self-healing retry. Throws FetchError(502) if self-healing also fails. | +| `#expiredShapeRecoveryKey` | Self-healing (L3 extension) | Records shape key after first self-healing attempt. Second exhaustion on same key skips self-healing → FetchError(502). Cleared on up-to-date. | +| `#maxSnapshotRetries` | Snapshot 409 path (L6) | Counts consecutive snapshot 409s. Unconditional cache buster on every retry. Throws FetchError(502) after 5. Runtime-enforced by `Shape #fetchSnapshotWithRetry 409 loop PBT` in `test/pbt-micro.test.ts`. | +| `#maxConsecutiveErrorRetries` | `#start` onError retry (L5) | Counts consecutive error retries. Sends error to subscribers and tears down after 50. Reset on successful message batch. | +| Abort-aware fetch wrappers | All request paths | `createFetchWithBackoff` and `createFetchWithConsumedMessages` re-check `signal.aborted` after fetch resolution and after body consumption, converting late successes into aborts before state-machine delivery. | +| Pause lock | `#requestShape` entry | Returns immediately if paused. Prevents fetches during snapshots. | +| Up-to-date exit | `#requestShape` entry | Returns if `!subscribe` and `isUpToDate`. Breaks loop for one-shot syncs. | ### Coverage gaps diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 7f8e052fe6..2206a3b36c 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -739,8 +739,20 @@ export class ShapeStream = Row> await this.#requestShape() } catch (err) { this.#error = err + const previousState = this.#syncState if (err instanceof Error) { this.#syncState = this.#syncState.toErrorState(err) + if (!(previousState instanceof ErrorState)) { + console.warn( + `[Electric] Entered error state. ` + + `${this.#formatStateDiagnostics(this.#syncState, { + previousState: previousState.kind, + errorName: err.name, + errorMessage: err.message, + })}`, + new Error(`stack trace`) + ) + } } // Check if onError handler wants to retry @@ -775,7 +787,8 @@ export class ShapeStream = Row> console.warn( `[Electric] onError retry loop exhausted after ${this.#maxConsecutiveErrorRetries} consecutive retries. ` + `The error was never resolved by the onError handler. ` + - `Error: ${err instanceof Error ? err.message : String(err)}`, + `Error: ${err instanceof Error ? err.message : String(err)}. ` + + `${this.#formatStateDiagnostics(this.#syncState)}`, new Error(`stack trace`) ) if (err instanceof Error) { @@ -824,6 +837,40 @@ export class ShapeStream = Row> this.#unsubscribeFromWakeDetection?.() } + #formatStateDiagnostics( + state: ShapeStreamState = this.#syncState, + extra: Record = {} + ): string { + const currentUrl = this.#currentFetchUrl?.toString() + const shapeKey = this.#currentFetchUrl + ? canonicalShapeKey(this.#currentFetchUrl) + : undefined + const expiredHandle = shapeKey + ? expiredShapesCache.getExpiredHandle(shapeKey) + : null + + const fields: Record = + { + state: state.kind, + handle: state.handle, + offset: state.offset, + cursor: state.liveCacheBuster, + replayCursor: state.replayCursor, + paused: this.#pauseLock.isPaused, + connected: this.#connected, + started: this.#started, + currentUrl, + shapeKey, + expiredHandle, + ...extra, + } + + return Object.entries(fields) + .filter(([, value]) => value !== undefined && value !== null) + .map(([key, value]) => `${key}=${JSON.stringify(value)}`) + .join(` `) + } + async #requestShape(requestShapeCacheBuster?: string): Promise { // ErrorState should never reach the request loop — re-throw so // #start's catch block can route it through onError properly. @@ -1358,7 +1405,11 @@ export class ShapeStream = Row> console.warn( `[Electric] Response was ignored by state "${this.#syncState.kind}". ` + `The response body will be skipped. ` + - `This may indicate a proxy/CDN caching issue or a client state machine bug.`, + `This may indicate a proxy/CDN caching issue or a client state machine bug. ` + + `${this.#formatStateDiagnostics(this.#syncState, { + responseHandle: shapeHandle, + responseStatus: status, + })}`, new Error(`stack trace`) ) return false diff --git a/packages/typescript-client/src/fetch.ts b/packages/typescript-client/src/fetch.ts index 0c6b02096d..5fe1ce1dbd 100644 --- a/packages/typescript-client/src/fetch.ts +++ b/packages/typescript-client/src/fetch.ts @@ -24,6 +24,20 @@ import { // want to retry const HTTP_RETRY_STATUS_CODES = [429] +/** + * Throws a FetchBackoffAbortError if the signal has been aborted. + * + * Used to guard against late-resolving fetches: a custom fetch client + * (or upstream wrapper) may resolve normally after its abort signal + * fires. Without this check, the late response can flow into the + * stream after its state machine has already moved on. + */ +function throwIfAborted(signal: AbortSignal | null | undefined): void { + if (signal?.aborted) { + throw new FetchBackoffAbortError() + } +} + export interface BackoffOptions { /** * Initial delay before retrying in milliseconds @@ -99,6 +113,7 @@ export function createFetchWithBackoff( while (true) { try { const result = await fetchClient(...args) + throwIfAborted(options?.signal) if (result.ok) { return result } @@ -172,18 +187,19 @@ const NO_BODY_STATUS_CODES = [201, 204, 205] export function createFetchWithConsumedMessages(fetchClient: typeof fetch) { return async (...args: Parameters): Promise => { const url = args[0] + const signal = args[1]?.signal const res = await fetchClient(...args) try { + throwIfAborted(signal) if (res.status < 200 || NO_BODY_STATUS_CODES.includes(res.status)) { return res } const text = await res.text() + throwIfAborted(signal) return new Response(text, res) } catch (err) { - if (args[1]?.signal?.aborted) { - throw new FetchBackoffAbortError() - } + throwIfAborted(signal) throw new FetchError( res.status, diff --git a/packages/typescript-client/test/stream.test.ts b/packages/typescript-client/test/stream.test.ts index 772bb63ecb..08a54aba93 100644 --- a/packages/typescript-client/test/stream.test.ts +++ b/packages/typescript-client/test/stream.test.ts @@ -712,6 +712,126 @@ describe(`ShapeStream`, () => { warnSpy.mockRestore() }) + it(`should ignore successful responses that arrive after a paused request was aborted`, async () => { + const warnSpy = vi.spyOn(console, `warn`).mockImplementation(() => {}) + + let streamRequestCount = 0 + let resolveAbortedRequest: ((response: Response) => void) | null = null + let resolveResumedRequest: ((response: Response) => void) | null = null + let subscriberError: Error | null = null + + const fetchMock = vi.fn((input: RequestInfo | URL): Promise => { + const url = input.toString() + const isSnapshotRequest = url.includes(`subset__limit=`) + + if (isSnapshotRequest) { + return Promise.resolve( + new Response( + JSON.stringify({ + metadata: { + snapshot_mark: 1, + xmin: `0`, + xmax: `0`, + xip_list: [], + database_lsn: `0`, + }, + data: [], + }), + { + status: 200, + headers: { + 'electric-offset': `0_0`, + 'electric-handle': `snapshot-handle`, + 'electric-schema': `{}`, + }, + } + ) + ) + } + + streamRequestCount++ + + // The first request is the one that gets aborted by the snapshot + // pause. We deliberately ignore the abort signal and resolve later + // to simulate a custom fetch client (or upstream wrapper) returning + // a late success after the stream has already moved on. + if (streamRequestCount === 1) { + return new Promise((resolve) => { + resolveAbortedRequest = resolve + }) + } + + if (streamRequestCount === 2) { + return new Promise((resolve) => { + resolveResumedRequest = resolve + }) + } + + return Promise.resolve(Response.error()) + }) + + const stream = new ShapeStream({ + url: shapeUrl, + params: { table: `test` }, + signal: aborter.signal, + fetchClient: fetchMock, + log: `changes_only`, + onError: () => {}, + }) + + stream.subscribe( + () => {}, + (error) => { + subscriberError = error + } + ) + + await vi.waitFor(() => { + expect(streamRequestCount).toBe(1) + }) + + await stream.requestSnapshot({ limit: 1 }) + + await vi.waitFor(() => { + expect(streamRequestCount).toBe(2) + }) + + resolveResumedRequest!( + new Response(`Bad Request`, { + status: 400, + statusText: `Bad Request`, + }) + ) + + await vi.waitFor(() => { + expect(subscriberError).not.toBeNull() + }) + + resolveAbortedRequest!( + new Response( + JSON.stringify([{ headers: { control: `up-to-date` }, offset: `0_0` }]), + { + status: 200, + headers: { + 'electric-handle': `late-handle`, + 'electric-offset': `0_0`, + 'electric-schema': `{}`, + 'electric-up-to-date': ``, + }, + } + ) + ) + + await new Promise((resolve) => setTimeout(resolve, 50)) + + expect(warnSpy).not.toHaveBeenCalledWith( + expect.stringContaining(`Response was ignored by state "error"`), + expect.any(Error) + ) + + warnSpy.mockRestore() + }) + it(`onError retry loop should be bounded for persistent errors`, async () => { // Regression: onError always returning retry for a persistent error // caused an unbounded retry loop. The consecutive error retry limit From 59d8b89a943aeb94e7f785d09323f1441d85e2bf Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Sat, 11 Apr 2026 09:46:32 -0600 Subject: [PATCH 2/7] feat(typescript-client): add opt-in stream diagnostics --- packages/typescript-client/SPEC.md | 21 ++ .../skills/electric-debugging/SKILL.md | 8 + packages/typescript-client/src/client.ts | 297 +++++++++++++++++- .../typescript-client/test/stream.test.ts | 93 ++++++ 4 files changed, 403 insertions(+), 16 deletions(-) diff --git a/packages/typescript-client/SPEC.md b/packages/typescript-client/SPEC.md index 3635f7712e..178da3d301 100644 --- a/packages/typescript-client/SPEC.md +++ b/packages/typescript-client/SPEC.md @@ -118,6 +118,27 @@ Operationally: `should ignore successful responses that arrive after a paused request was aborted` in `test/stream.test.ts`. +## Operational Diagnostics + +Client-side diagnostics controls that exist to make field failures observable +without changing the state machine's behavior. + +### D0: Diagnostics are observational only + +Verbose diagnostics may be enabled at stream construction time via client-side +storage, for example: + +- `localStorage.setItem('electric.debug', 'true')` +- `localStorage.setItem('debug', 'electric*')` + +When enabled, the client may emit detailed request/response/state logs, but +those diagnostics must not alter fetch sequencing, state transitions, retry +eligibility, or message delivery semantics. + +**Enforcement**: diagnostics are implemented as logging-only hooks in +`client.ts`, and the request/state behavior remains covered by the existing +state-machine tests. + ## Invariants Properties that must hold after every state transition. Checked automatically by diff --git a/packages/typescript-client/skills/electric-debugging/SKILL.md b/packages/typescript-client/skills/electric-debugging/SKILL.md index 0e34793ac8..a3daefa6d9 100644 --- a/packages/typescript-client/skills/electric-debugging/SKILL.md +++ b/packages/typescript-client/skills/electric-debugging/SKILL.md @@ -28,6 +28,14 @@ This skill builds on electric-shapes and electric-proxy-auth. Read those first. ## Setup +For field debugging, you can enable verbose client diagnostics and refresh: + +```js +localStorage.setItem('electric.debug', 'true') +// or, for debug-package compatibility: +localStorage.setItem('debug', 'electric*') +``` + Enable debug logging to see retry and state machine behavior: ```ts diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 2206a3b36c..446b015988 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -95,11 +95,124 @@ const RESERVED_PARAMS: Set = new Set([ ]) const TROUBLESHOOTING_URL = `https://electric-sql.com/docs/guides/troubleshooting` +const SHAPE_STREAM_DEBUG_LOCAL_STORAGE_KEY = `electric.debug` +const DEBUG_NAMESPACES_LOCAL_STORAGE_KEY = `debug` +const SHAPE_STREAM_DEBUG_NAMESPACES = [ + `electric`, + `electric:*`, + `electric:shape-stream`, + `electric:shape-stream:*`, + `@electric-sql/client`, + `@electric-sql/client:*`, +] as const + +type DiagnosticValue = string | number | boolean | null | undefined + +type ShapeStreamDiagnosticsConfig = { + enabled: boolean + source?: string +} function createCacheBuster(): string { return `${Date.now()}-${Math.random().toString(36).substring(2, 9)}` } +function readLocalStorageItem(key: string): string | null { + if (typeof localStorage === `undefined`) return null + try { + return localStorage.getItem(key) + } catch { + return null + } +} + +function isTruthyFlag(value: string | null): boolean { + if (value === null) return false + + const normalized = value.trim().toLowerCase() + return ( + normalized !== `` && + normalized !== `0` && + normalized !== `false` && + normalized !== `off` && + normalized !== `no` + ) +} + +function escapeRegExp(value: string): string { + return value.replace(/[.*+?^${}()|[\]\\]/g, `\\$&`) +} + +function matchesDebugNamespace(pattern: string, namespace: string): boolean { + if (pattern === `*`) return true + + const regex = new RegExp(`^${escapeRegExp(pattern).replace(/\\\*/g, `.*`)}$`) + return regex.test(namespace) +} + +function isShapeStreamDebugNamespaceEnabled(value: string): boolean { + const patterns = value + .split(/[\s,]+/) + .map((part) => part.trim()) + .filter(Boolean) + + if (patterns.length === 0) return false + + const negativePatterns = patterns + .filter((pattern) => pattern.startsWith(`-`)) + .map((pattern) => pattern.slice(1)) + const positivePatterns = patterns.filter( + (pattern) => !pattern.startsWith(`-`) + ) + + const matchesAnyNamespace = (pattern: string) => + SHAPE_STREAM_DEBUG_NAMESPACES.some((namespace) => + matchesDebugNamespace(pattern, namespace) + ) + + if (negativePatterns.some(matchesAnyNamespace)) { + return false + } + + return positivePatterns.some(matchesAnyNamespace) +} + +function getShapeStreamDiagnosticsConfig(): ShapeStreamDiagnosticsConfig { + const explicitFlag = readLocalStorageItem( + SHAPE_STREAM_DEBUG_LOCAL_STORAGE_KEY + ) + if (isTruthyFlag(explicitFlag)) { + return { + enabled: true, + source: `localStorage[${JSON.stringify(SHAPE_STREAM_DEBUG_LOCAL_STORAGE_KEY)}]=${JSON.stringify(explicitFlag)}`, + } + } + + const debugNamespaces = readLocalStorageItem( + DEBUG_NAMESPACES_LOCAL_STORAGE_KEY + ) + if (debugNamespaces && isShapeStreamDebugNamespaceEnabled(debugNamespaces)) { + return { + enabled: true, + source: `localStorage[${JSON.stringify(DEBUG_NAMESPACES_LOCAL_STORAGE_KEY)}]=${JSON.stringify(debugNamespaces)}`, + } + } + + return { enabled: false } +} + +function describeDiagnosticValue(value: unknown): string | undefined { + if (value === undefined || value === null) return undefined + if (typeof value === `string`) return value + if (value instanceof Error) return `${value.name}: ${value.message}` + + try { + return JSON.stringify(value) + } catch { + return String(value) + } +} + type Replica = `full` | `default` export type LogMode = `changes_only` | `full` @@ -561,6 +674,13 @@ export function canonicalShapeKey(url: URL): string { * // Later... * aborter.abort() * ``` + * + * To enable verbose client diagnostics after a refresh: + * ``` + * localStorage.setItem('electric.debug', 'true') + * // or, for `debug` package compatibility: + * localStorage.setItem('debug', 'electric*') + * ``` */ export class ShapeStream = Row> @@ -628,10 +748,16 @@ export class ShapeStream = Row> #pendingSelfHealCheck: { shapeKey: string; staleHandle: string } | null = null #consecutiveErrorRetries = 0 #maxConsecutiveErrorRetries = 50 + readonly #debugEnabled: boolean + readonly #debugSource?: string + #requestSequence = 0 constructor(options: ShapeStreamOptions>) { this.options = { subscribe: true, ...options } validateOptions(this.options) + const diagnosticsConfig = getShapeStreamDiagnosticsConfig() + this.#debugEnabled = diagnosticsConfig.enabled + this.#debugSource = diagnosticsConfig.source this.#syncState = createInitialState({ offset: this.options.offset ?? `-1`, handle: this.options.handle, @@ -692,6 +818,7 @@ export class ShapeStream = Row> const backOffOpts = { ...(options.backoffOptions ?? BackoffDefaults), + debug: this.#debugEnabled || options.backoffOptions?.debug === true, onFailedAttempt: () => { this.#connected = false options.backoffOptions?.onFailedAttempt?.() @@ -709,6 +836,13 @@ export class ShapeStream = Row> this.#fetchClient = createFetchWithConsumedMessages(this.#sseFetchClient) this.#subscribeToVisibilityChanges() + + this.#debugLog(`diagnostics-enabled`, { + source: this.#debugSource, + logMode: this.#mode, + subscribe: this.options.subscribe, + liveSse: !!(this.options.liveSse ?? this.options.experimentalLiveSse), + }) } get shapeHandle() { @@ -734,11 +868,16 @@ export class ShapeStream = Row> async #start(): Promise { this.#started = true this.#subscribeToWakeDetection() + this.#debugLog(`stream:start`) try { await this.#requestShape() } catch (err) { this.#error = err + this.#debugLog(`stream:error-caught`, { + errorName: err instanceof Error ? err.name : undefined, + errorMessage: err instanceof Error ? err.message : String(err), + }) const previousState = this.#syncState if (err instanceof Error) { this.#syncState = this.#syncState.toErrorState(err) @@ -781,6 +920,15 @@ export class ShapeStream = Row> // Bound the onError retry loop to prevent unbounded retries this.#consecutiveErrorRetries++ + this.#debugLog(`onError:retry`, { + consecutiveErrorRetries: this.#consecutiveErrorRetries, + retryParamKeys: retryOpts.params + ? Object.keys(retryOpts.params).join(`,`) + : undefined, + retryHeaderKeys: retryOpts.headers + ? Object.keys(retryOpts.headers).join(`,`) + : undefined, + }) if ( this.#consecutiveErrorRetries > this.#maxConsecutiveErrorRetries ) { @@ -812,6 +960,10 @@ export class ShapeStream = Row> } // onError returned void, meaning it doesn't want to retry // This is an unrecoverable error, notify subscribers + this.#debugLog(`onError:stop`, { + retryRequested: false, + retryable: isRetryable, + }) if (err instanceof Error) { this.#sendErrorToSubscribers(err) } @@ -821,6 +973,9 @@ export class ShapeStream = Row> // No onError handler provided, this is an unrecoverable error // Notify subscribers and throw + this.#debugLog(`stream:throw`, { + hasOnErrorHandler: false, + }) if (err instanceof Error) { this.#sendErrorToSubscribers(err) } @@ -832,6 +987,7 @@ export class ShapeStream = Row> } #teardown() { + this.#debugLog(`stream:teardown`) this.#connected = false this.#tickPromiseRejecter?.() this.#unsubscribeFromWakeDetection?.() @@ -839,7 +995,7 @@ export class ShapeStream = Row> #formatStateDiagnostics( state: ShapeStreamState = this.#syncState, - extra: Record = {} + extra: Record = {} ): string { const currentUrl = this.#currentFetchUrl?.toString() const shapeKey = this.#currentFetchUrl @@ -871,6 +1027,17 @@ export class ShapeStream = Row> .join(` `) } + #debugLog(event: string, extra: Record = {}) { + if (!this.#debugEnabled) return + + console.debug( + `[Electric] Debug ${this.#formatStateDiagnostics(this.#syncState, { + event, + ...extra, + })}` + ) + } + async #requestShape(requestShapeCacheBuster?: string): Promise { // ErrorState should never reach the request loop — re-throw so // #start's catch block can route it through onError properly. @@ -885,6 +1052,9 @@ export class ShapeStream = Row> if (activeCacheBuster) { this.#pendingRequestShapeCacheBuster = activeCacheBuster } + this.#debugLog(`request:skipped-paused`, { + requestCacheBuster: activeCacheBuster, + }) return } @@ -892,6 +1062,10 @@ export class ShapeStream = Row> !this.options.subscribe && (this.options.signal?.aborted || this.#syncState.isUpToDate) ) { + this.#debugLog(`request:stopped`, { + signalAborted: this.options.signal?.aborted, + isUpToDate: this.#syncState.isUpToDate, + }) return } @@ -907,6 +1081,7 @@ export class ShapeStream = Row> if (this.#syncState instanceof PausedState) { resumingFromPause = true this.#syncState = this.#syncState.resume() + this.#debugLog(`request:resumed`) } const { url, signal } = this.options @@ -914,6 +1089,7 @@ export class ShapeStream = Row> url, resumingFromPause ) + const requestId = ++this.#requestSequence if (activeCacheBuster) { fetchUrl.searchParams.set(CACHE_BUSTER_QUERY_PARAM, activeCacheBuster) @@ -932,6 +1108,10 @@ export class ShapeStream = Row> if (activeCacheBuster) { this.#pendingRequestShapeCacheBuster = activeCacheBuster } + this.#debugLog(`request:cancelled-before-dispatch`, { + requestId, + fetchUrl: fetchUrl.toString(), + }) this.#requestAbortController = undefined return } @@ -940,6 +1120,7 @@ export class ShapeStream = Row> try { await this.#fetchShape({ + requestId, fetchUrl, requestAbortController, headers: requestHeaders, @@ -956,10 +1137,18 @@ export class ShapeStream = Row> (e instanceof FetchError || e instanceof FetchBackoffAbortError) && isRestartAbort ) { + this.#debugLog(`request:restart-after-abort`, { + requestId, + abortReason: describeDiagnosticValue(abortReason), + }) return this.#requestShape() } if (e instanceof FetchBackoffAbortError) { + this.#debugLog(`request:aborted`, { + requestId, + abortReason: describeDiagnosticValue(abortReason), + }) return // interrupted } @@ -969,12 +1158,20 @@ export class ShapeStream = Row> // #staleCacheBuster set to bypass CDN cache on next request. // 2. Self-healing: stale retries exhausted, expired entry cleared, // stream reset — retry without expired_handle param. + this.#debugLog(`request:stale-cache-retry`, { + requestId, + errorMessage: e.message, + }) return this.#requestShape() } if (!(e instanceof FetchError)) throw e // should never happen if (e.status == 409) { + this.#debugLog(`request:must-refetch`, { + requestId, + responseStatus: e.status, + }) // Upon receiving a 409, start from scratch with the newly // provided shape handle (if present). An unconditional cache // buster ensures the retry URL is always unique regardless of @@ -1009,6 +1206,12 @@ export class ShapeStream = Row> // additional user input, such as 400s or failures to read the // body of a response, so we exit the loop and let #start handle it // Note: We don't notify subscribers here because onError might recover + this.#debugLog(`request:failed`, { + requestId, + errorName: e.name, + errorMessage: e.message, + responseStatus: e.status, + }) throw e } } finally { @@ -1264,6 +1467,9 @@ export class ShapeStream = Row> // If user provided a signal, listen to it and pass on the reason for the abort if (signal) { const abortListener = () => { + this.#debugLog(`request:abort-signal`, { + abortReason: describeDiagnosticValue(signal.reason), + }) this.#requestAbortController?.abort(signal.reason) } @@ -1271,6 +1477,9 @@ export class ShapeStream = Row> if (signal.aborted) { // If the signal is already aborted, abort the request immediately + this.#debugLog(`request:abort-signal-already-aborted`, { + abortReason: describeDiagnosticValue(signal.reason), + }) this.#requestAbortController?.abort(signal.reason) } @@ -1284,9 +1493,16 @@ export class ShapeStream = Row> * or `false` if the response was ignored (stale) and the body should be skipped. * Throws on stale-retry (to trigger a retry with cache buster). */ - async #onInitialResponse(response: Response): Promise { + async #onInitialResponse( + response: Response, + requestId?: number + ): Promise { const { headers, status } = response const shapeHandle = headers.get(SHAPE_HANDLE_HEADER) + const responseOffset = headers.get( + CHUNK_LAST_OFFSET_HEADER + ) as Offset | null + const responseCursor = headers.get(LIVE_CACHE_BUSTER_HEADER) const shapeKey = this.#currentFetchUrl ? canonicalShapeKey(this.#currentFetchUrl) : null @@ -1316,8 +1532,8 @@ export class ShapeStream = Row> const transition = this.#syncState.handleResponseMetadata({ status, responseHandle: shapeHandle, - responseOffset: headers.get(CHUNK_LAST_OFFSET_HEADER) as Offset | null, - responseCursor: headers.get(LIVE_CACHE_BUSTER_HEADER), + responseOffset, + responseCursor, responseSchema: getSchemaFromHeaders(headers), expiredHandle, now: Date.now(), @@ -1326,6 +1542,15 @@ export class ShapeStream = Row> }) this.#syncState = transition.state + this.#debugLog(`response:headers`, { + requestId, + action: transition.action, + responseStatus: status, + responseHandle: shapeHandle, + responseOffset, + responseCursor, + expiredHandle, + }) // Clear recovery guard on 204 (no-content), since the empty body means // #onMessages won't run to clear it via the up-to-date path. @@ -1418,7 +1643,11 @@ export class ShapeStream = Row> return true } - async #onMessages(batch: Array>, isSseMessage = false) { + async #onMessages( + batch: Array>, + isSseMessage = false, + requestId?: number + ) { if (!Array.isArray(batch)) { console.warn( `[Electric] #onMessages called with non-array argument (${typeof batch}). ` + @@ -1470,6 +1699,15 @@ export class ShapeStream = Row> return true // Always process control messages }) + this.#debugLog(`messages:batch`, { + requestId, + transport: isSseMessage ? `sse` : `long-poll`, + batchSize: batch.length, + publishedCount: messagesToProcess.length, + hasUpToDateMessage, + upToDateOffset, + }) + await this.#publish(messagesToProcess) } @@ -1481,6 +1719,7 @@ export class ShapeStream = Row> * @returns A promise that resolves when the request is complete (i.e. the long poll receives a response or the SSE connection is closed). */ async #fetchShape(opts: { + requestId: number fetchUrl: URL requestAbortController: AbortController headers: Record @@ -1502,13 +1741,19 @@ export class ShapeStream = Row> } const useSse = this.options.liveSse ?? this.options.experimentalLiveSse - if ( - this.#syncState.shouldUseSse({ - liveSseEnabled: !!useSse, - isRefreshing: this.#isRefreshing, - resumingFromPause: !!opts.resumingFromPause, - }) - ) { + const shouldUseSse = this.#syncState.shouldUseSse({ + liveSseEnabled: !!useSse, + isRefreshing: this.#isRefreshing, + resumingFromPause: !!opts.resumingFromPause, + }) + this.#debugLog(`request:dispatch`, { + requestId: opts.requestId, + transport: shouldUseSse ? `sse` : `long-poll`, + fetchUrl: opts.fetchUrl.toString(), + resumingFromPause: !!opts.resumingFromPause, + isRefreshing: this.#isRefreshing, + }) + if (shouldUseSse) { opts.fetchUrl.searchParams.set(EXPERIMENTAL_LIVE_SSE_QUERY_PARAM, `true`) opts.fetchUrl.searchParams.set(LIVE_SSE_QUERY_PARAM, `true`) return this.#requestShapeSSE(opts) @@ -1518,6 +1763,7 @@ export class ShapeStream = Row> } async #requestShapeLongPoll(opts: { + requestId: number fetchUrl: URL requestAbortController: AbortController headers: Record @@ -1529,7 +1775,10 @@ export class ShapeStream = Row> }) this.#connected = true - const shouldProcessBody = await this.#onInitialResponse(response) + const shouldProcessBody = await this.#onInitialResponse( + response, + opts.requestId + ) if (!shouldProcessBody) return const schema = this.#syncState.schema! // we know that it is not undefined because it is set by `this.#onInitialResponse` @@ -1550,10 +1799,11 @@ export class ShapeStream = Row> ) } - await this.#onMessages(batch) + await this.#onMessages(batch, false, opts.requestId) } async #requestShapeSSE(opts: { + requestId: number fetchUrl: URL requestAbortController: AbortController headers: Record @@ -1578,7 +1828,10 @@ export class ShapeStream = Row> fetch, onopen: async (response: Response) => { this.#connected = true - const shouldProcessBody = await this.#onInitialResponse(response) + const shouldProcessBody = await this.#onInitialResponse( + response, + opts.requestId + ) if (!shouldProcessBody) { ignoredStaleResponse = true throw new Error(`stale response ignored`) @@ -1597,7 +1850,7 @@ export class ShapeStream = Row> if (isUpToDateMessage(message)) { // Flush the buffer on up-to-date message. // Ensures that we only process complete batches of operations. - this.#onMessages(buffer, true) + this.#onMessages(buffer, true, opts.requestId) buffer = [] } } @@ -1645,6 +1898,13 @@ export class ShapeStream = Row> maxShortConnections: this.#maxShortSseConnections, }) this.#syncState = transition.state + this.#debugLog(`sse:closed`, { + requestId: opts.requestId, + connectionDuration, + wasAborted, + fellBackToLongPolling: transition.fellBackToLongPolling, + wasShortConnection: transition.wasShortConnection, + }) if (transition.fellBackToLongPolling) { console.warn( @@ -1747,6 +2007,7 @@ export class ShapeStream = Row> */ async forceDisconnectAndRefresh(): Promise { this.#refreshCount++ + this.#debugLog(`stream:force-refresh`) try { if ( this.#syncState.isUpToDate && @@ -1802,8 +2063,10 @@ export class ShapeStream = Row> if (this.#hasBrowserVisibilityAPI()) { const visibilityHandler = () => { if (document.hidden) { + this.#debugLog(`pause:visibility`, { hidden: true }) this.#pauseLock.acquire(`visibility`) } else { + this.#debugLog(`pause:visibility`, { hidden: false }) this.#pauseLock.release(`visibility`) } } @@ -1846,6 +2109,7 @@ export class ShapeStream = Row> if (elapsed > INTERVAL_MS + WAKE_THRESHOLD_MS) { if (!this.#pauseLock.isPaused && this.#requestAbortController) { + this.#debugLog(`wake:detected`, { elapsedMs: elapsed }) this.#refreshCount++ this.#requestAbortController.abort(SYSTEM_WAKE) // Wake handler is synchronous (setInterval callback) so we can't @@ -1876,6 +2140,7 @@ export class ShapeStream = Row> * shape handle */ #reset(handle?: string) { + this.#debugLog(`stream:reset`, { nextHandle: handle }) this.#syncState = this.#syncState.markMustRefetch(handle) this.#connected = false // releaseAllMatching intentionally doesn't fire onReleased — every caller diff --git a/packages/typescript-client/test/stream.test.ts b/packages/typescript-client/test/stream.test.ts index 08a54aba93..ee7746c325 100644 --- a/packages/typescript-client/test/stream.test.ts +++ b/packages/typescript-client/test/stream.test.ts @@ -128,6 +128,99 @@ describe(`ShapeStream`, () => { await startedStreaming }) + it(`should enable verbose diagnostics with localStorage electric.debug`, async () => { + localStorage.setItem(`electric.debug`, `true`) + const debugSpy = vi.spyOn(console, `debug`).mockImplementation(() => {}) + + const fetchMock = vi.fn(() => + Promise.resolve( + new Response( + JSON.stringify([ + { headers: { control: `up-to-date` }, offset: `0_0` }, + ]), + { + status: 200, + headers: { + 'electric-handle': `test-handle`, + 'electric-offset': `0_0`, + 'electric-schema': `{}`, + }, + } + ) + ) + ) + + const stream = new ShapeStream({ + url: shapeUrl, + params: { table: `foo` }, + signal: aborter.signal, + fetchClient: fetchMock, + subscribe: false, + }) + + stream.subscribe(() => {}) + + await vi.waitFor(() => { + expect(stream.isUpToDate).toBe(true) + }) + + expect(debugSpy).toHaveBeenCalledWith( + expect.stringContaining(`event="diagnostics-enabled"`) + ) + expect(debugSpy).toHaveBeenCalledWith( + expect.stringContaining(`event="request:dispatch"`) + ) + expect(debugSpy).toHaveBeenCalledWith( + expect.stringContaining(`event="messages:batch"`) + ) + + debugSpy.mockRestore() + }) + + it(`should enable verbose diagnostics with localStorage debug namespaces`, async () => { + localStorage.setItem(`debug`, `electric*`) + const debugSpy = vi.spyOn(console, `debug`).mockImplementation(() => {}) + + const fetchMock = vi.fn(() => + Promise.resolve( + new Response( + JSON.stringify([ + { headers: { control: `up-to-date` }, offset: `0_0` }, + ]), + { + status: 200, + headers: { + 'electric-handle': `test-handle`, + 'electric-offset': `0_0`, + 'electric-schema': `{}`, + }, + } + ) + ) + ) + + const stream = new ShapeStream({ + url: shapeUrl, + params: { table: `foo` }, + signal: aborter.signal, + fetchClient: fetchMock, + subscribe: false, + }) + + stream.subscribe(() => {}) + + await vi.waitFor(() => { + expect(stream.isUpToDate).toBe(true) + }) + + expect(debugSpy).toHaveBeenCalledWith( + expect.stringContaining(`event="diagnostics-enabled"`) + ) + expect(debugSpy).toHaveBeenCalledWith(expect.stringContaining(`electric*`)) + + debugSpy.mockRestore() + }) + it(`should correctly serialize objects into query params`, async () => { const eventTarget = new EventTarget() const requestedUrls: Array = [] From 688e96b8a840f2c0741ca58ac63383c9f3ed44f6 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Sat, 11 Apr 2026 16:05:33 -0600 Subject: [PATCH 3/7] fix(typescript-client): rate-limit verbose diagnostics --- .../skills/electric-debugging/SKILL.md | 5 +++ packages/typescript-client/src/client.ts | 44 +++++++++++++++++++ .../typescript-client/test/stream.test.ts | 11 +++++ 3 files changed, 60 insertions(+) diff --git a/packages/typescript-client/skills/electric-debugging/SKILL.md b/packages/typescript-client/skills/electric-debugging/SKILL.md index a3daefa6d9..87a29aa966 100644 --- a/packages/typescript-client/skills/electric-debugging/SKILL.md +++ b/packages/typescript-client/skills/electric-debugging/SKILL.md @@ -36,6 +36,11 @@ localStorage.setItem('electric.debug', 'true') localStorage.setItem('debug', 'electric*') ``` +When enabled, the client prints one visible `console.info` line confirming that +diagnostics are active. Detailed per-request diagnostics are emitted at +`console.debug` / `Verbose` level and are rate-limited to avoid overwhelming a +tight-looping runtime. + Enable debug logging to see retry and state machine behavior: ```ts diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 446b015988..3fd7ad0c43 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -105,6 +105,8 @@ const SHAPE_STREAM_DEBUG_NAMESPACES = [ `@electric-sql/client`, `@electric-sql/client:*`, ] as const +const SHAPE_STREAM_DEBUG_MAX_LOGS_PER_WINDOW = 50 +const SHAPE_STREAM_DEBUG_WINDOW_MS = 1_000 type DiagnosticValue = string | number | boolean | null | undefined @@ -751,6 +753,9 @@ export class ShapeStream = Row> readonly #debugEnabled: boolean readonly #debugSource?: string #requestSequence = 0 + #debugWindowStartedAt = 0 + #debugLogsInWindow = 0 + #debugLogsSuppressed = 0 constructor(options: ShapeStreamOptions>) { this.options = { subscribe: true, ...options } @@ -837,6 +842,7 @@ export class ShapeStream = Row> this.#subscribeToVisibilityChanges() + this.#announceDiagnosticsMode() this.#debugLog(`diagnostics-enabled`, { source: this.#debugSource, logMode: this.#mode, @@ -1030,6 +1036,23 @@ export class ShapeStream = Row> #debugLog(event: string, extra: Record = {}) { if (!this.#debugEnabled) return + const now = Date.now() + + if ( + this.#debugWindowStartedAt === 0 || + now - this.#debugWindowStartedAt >= SHAPE_STREAM_DEBUG_WINDOW_MS + ) { + this.#flushSuppressedDebugLogs() + this.#debugWindowStartedAt = now + this.#debugLogsInWindow = 0 + } + + if (this.#debugLogsInWindow >= SHAPE_STREAM_DEBUG_MAX_LOGS_PER_WINDOW) { + this.#debugLogsSuppressed++ + return + } + + this.#debugLogsInWindow++ console.debug( `[Electric] Debug ${this.#formatStateDiagnostics(this.#syncState, { event, @@ -1038,6 +1061,27 @@ export class ShapeStream = Row> ) } + #announceDiagnosticsMode() { + if (!this.#debugEnabled) return + + console.info( + `[Electric] ShapeStream diagnostics enabled` + + (this.#debugSource ? ` from ${this.#debugSource}.` : `.`) + + ` Detailed per-request logs use console.debug / Verbose level in DevTools. ` + + `Verbose logs are rate-limited to ${SHAPE_STREAM_DEBUG_MAX_LOGS_PER_WINDOW} per ${SHAPE_STREAM_DEBUG_WINDOW_MS}ms to avoid overwhelming the runtime.` + ) + } + + #flushSuppressedDebugLogs() { + if (!this.#debugEnabled || this.#debugLogsSuppressed === 0) return + + console.info( + `[Electric] ShapeStream diagnostics suppressed ${this.#debugLogsSuppressed} verbose logs in the last ${SHAPE_STREAM_DEBUG_WINDOW_MS}ms. ` + + `The stream is likely in a tight loop or repeated error path.` + ) + this.#debugLogsSuppressed = 0 + } + async #requestShape(requestShapeCacheBuster?: string): Promise { // ErrorState should never reach the request loop — re-throw so // #start's catch block can route it through onError properly. diff --git a/packages/typescript-client/test/stream.test.ts b/packages/typescript-client/test/stream.test.ts index ee7746c325..243ac71884 100644 --- a/packages/typescript-client/test/stream.test.ts +++ b/packages/typescript-client/test/stream.test.ts @@ -131,6 +131,7 @@ describe(`ShapeStream`, () => { it(`should enable verbose diagnostics with localStorage electric.debug`, async () => { localStorage.setItem(`electric.debug`, `true`) const debugSpy = vi.spyOn(console, `debug`).mockImplementation(() => {}) + const infoSpy = vi.spyOn(console, `info`).mockImplementation(() => {}) const fetchMock = vi.fn(() => Promise.resolve( @@ -173,13 +174,19 @@ describe(`ShapeStream`, () => { expect(debugSpy).toHaveBeenCalledWith( expect.stringContaining(`event="messages:batch"`) ) + expect(infoSpy).toHaveBeenCalledWith( + expect.stringContaining(`ShapeStream diagnostics enabled`) + ) + expect(infoSpy).toHaveBeenCalledWith(expect.stringContaining(`Verbose`)) debugSpy.mockRestore() + infoSpy.mockRestore() }) it(`should enable verbose diagnostics with localStorage debug namespaces`, async () => { localStorage.setItem(`debug`, `electric*`) const debugSpy = vi.spyOn(console, `debug`).mockImplementation(() => {}) + const infoSpy = vi.spyOn(console, `info`).mockImplementation(() => {}) const fetchMock = vi.fn(() => Promise.resolve( @@ -217,8 +224,12 @@ describe(`ShapeStream`, () => { expect.stringContaining(`event="diagnostics-enabled"`) ) expect(debugSpy).toHaveBeenCalledWith(expect.stringContaining(`electric*`)) + expect(infoSpy).toHaveBeenCalledWith( + expect.stringContaining(`ShapeStream diagnostics enabled`) + ) debugSpy.mockRestore() + infoSpy.mockRestore() }) it(`should correctly serialize objects into query params`, async () => { From 1ceb6a48aabc0ce2d2fddf2fd9e57c10013e6825 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Sat, 11 Apr 2026 16:14:34 -0600 Subject: [PATCH 4/7] fix(typescript-client): throttle repeated stream warnings --- packages/typescript-client/src/client.ts | 47 ++++++++++++++++--- .../typescript-client/test/stream.test.ts | 4 ++ 2 files changed, 45 insertions(+), 6 deletions(-) diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 3fd7ad0c43..507af52239 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -107,6 +107,8 @@ const SHAPE_STREAM_DEBUG_NAMESPACES = [ ] as const const SHAPE_STREAM_DEBUG_MAX_LOGS_PER_WINDOW = 50 const SHAPE_STREAM_DEBUG_WINDOW_MS = 1_000 +const SHAPE_STREAM_WARNING_MAX_LOGS_PER_WINDOW = 3 +const SHAPE_STREAM_WARNING_WINDOW_MS = 5_000 type DiagnosticValue = string | number | boolean | null | undefined @@ -756,6 +758,10 @@ export class ShapeStream = Row> #debugWindowStartedAt = 0 #debugLogsInWindow = 0 #debugLogsSuppressed = 0 + #warningWindows = new Map< + string, + { startedAt: number; emitted: number; suppressed: number } + >() constructor(options: ShapeStreamOptions>) { this.options = { subscribe: true, ...options } @@ -888,14 +894,14 @@ export class ShapeStream = Row> if (err instanceof Error) { this.#syncState = this.#syncState.toErrorState(err) if (!(previousState instanceof ErrorState)) { - console.warn( + this.#warnWithRateLimit( + `entered-error-state`, `[Electric] Entered error state. ` + `${this.#formatStateDiagnostics(this.#syncState, { previousState: previousState.kind, errorName: err.name, errorMessage: err.message, - })}`, - new Error(`stack trace`) + })}` ) } } @@ -1082,6 +1088,35 @@ export class ShapeStream = Row> this.#debugLogsSuppressed = 0 } + #warnWithRateLimit(key: string, message: string) { + const now = Date.now() + const window = this.#warningWindows.get(key) + + if (!window || now - window.startedAt >= SHAPE_STREAM_WARNING_WINDOW_MS) { + if (window && window.suppressed > 0) { + console.warn( + `[Electric] Suppressed ${window.suppressed} repeated "${key}" warnings in the last ${SHAPE_STREAM_WARNING_WINDOW_MS}ms.` + ) + } + + this.#warningWindows.set(key, { + startedAt: now, + emitted: 1, + suppressed: 0, + }) + console.warn(message, new Error(`stack trace`)) + return + } + + if (window.emitted < SHAPE_STREAM_WARNING_MAX_LOGS_PER_WINDOW) { + window.emitted++ + console.warn(message, new Error(`stack trace`)) + return + } + + window.suppressed++ + } + async #requestShape(requestShapeCacheBuster?: string): Promise { // ErrorState should never reach the request loop — re-throw so // #start's catch block can route it through onError properly. @@ -1671,15 +1706,15 @@ export class ShapeStream = Row> } if (transition.action === `ignored`) { - console.warn( + this.#warnWithRateLimit( + `ignored-response-${this.#syncState.kind}`, `[Electric] Response was ignored by state "${this.#syncState.kind}". ` + `The response body will be skipped. ` + `This may indicate a proxy/CDN caching issue or a client state machine bug. ` + `${this.#formatStateDiagnostics(this.#syncState, { responseHandle: shapeHandle, responseStatus: status, - })}`, - new Error(`stack trace`) + })}` ) return false } diff --git a/packages/typescript-client/test/stream.test.ts b/packages/typescript-client/test/stream.test.ts index 243ac71884..b7498a8e9c 100644 --- a/packages/typescript-client/test/stream.test.ts +++ b/packages/typescript-client/test/stream.test.ts @@ -941,6 +941,7 @@ describe(`ShapeStream`, () => { // caused an unbounded retry loop. The consecutive error retry limit // ensures the loop terminates. let requestCount = 0 + const warnSpy = vi.spyOn(console, `warn`).mockImplementation(() => {}) // First request succeeds → LiveState. All subsequent → persistent 400. const fetchMock = vi.fn(async () => { @@ -1000,6 +1001,9 @@ describe(`ShapeStream`, () => { expect(subscriberError).not.toBeNull() // 1 initial success + ~51 retries (limit fires at >50) expect(requestCount).toBeLessThan(100) + expect(warnSpy.mock.calls.length).toBeLessThanOrEqual(5) + + warnSpy.mockRestore() }) it(`onError retry counter resets after successful data`, async () => { From 66a7e34d2a47b50cb114ef68f2f3b7ba8099f808 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Sat, 11 Apr 2026 16:36:36 -0600 Subject: [PATCH 5/7] fix(typescript-client): reduce onError retry churn --- packages/typescript-client/src/client.ts | 9 ++++- .../test/model-based.test.ts | 8 +++- .../typescript-client/test/stream.test.ts | 38 +++++++++++-------- 3 files changed, 36 insertions(+), 19 deletions(-) diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 507af52239..0b377bc7ba 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -751,7 +751,7 @@ export class ShapeStream = Row> #expiredShapeRecoveryKey: string | null = null #pendingSelfHealCheck: { shapeKey: string; staleHandle: string } | null = null #consecutiveErrorRetries = 0 - #maxConsecutiveErrorRetries = 50 + #maxConsecutiveErrorRetries = 3 readonly #debugEnabled: boolean readonly #debugSource?: string #requestSequence = 0 @@ -959,6 +959,13 @@ export class ShapeStream = Row> } // Clear the error since we're retrying + console.log( + `[Electric] onError requested retry. Restarting stream from current offset. ` + + `${this.#formatStateDiagnostics(this.#syncState, { + consecutiveErrorRetries: this.#consecutiveErrorRetries, + })}`, + err + ) this.#error = null if (this.#syncState instanceof ErrorState) { this.#syncState = this.#syncState.retry() diff --git a/packages/typescript-client/test/model-based.test.ts b/packages/typescript-client/test/model-based.test.ts index 19f2cc6ac3..0be963d790 100644 --- a/packages/typescript-client/test/model-based.test.ts +++ b/packages/typescript-client/test/model-based.test.ts @@ -544,7 +544,7 @@ interface StreamModel { terminated: boolean } -const MAX_CONSECUTIVE_ERROR_RETRIES = 50 +const MAX_CONSECUTIVE_ERROR_RETRIES = 3 const MAX_URL_LENGTH = 2000 /** @@ -679,7 +679,7 @@ async function runRetryableErrorResponse( } } -/** 400 Bad Request — counter increments, may terminate at >50 */ +/** 400 Bad Request — counter increments, may terminate at >3 */ class Respond400Cmd implements fc.AsyncCommand { check(m: Readonly): boolean { return !m.terminated @@ -943,6 +943,9 @@ class Respond404Cmd implements fc.AsyncCommand { /** * 409 with the same handle as the current one. + * The retry URL must still change via cache-buster, and the next + * successful response must carry a fresh handle to avoid simulating + * a stale cached replay forever. */ class Respond409SameHandleCmd implements fc.AsyncCommand @@ -953,6 +956,7 @@ class Respond409SameHandleCmd async run(_m: StreamModel, r: StreamReal): Promise { const prevUrl = r.gate.lastUrl await r.respond(make409(r.currentHandle)) + r.currentHandle = `handle-recovered-${nextSeq()}` assert409ProducedUniqueUrl(r, prevUrl) } toString(): string { diff --git a/packages/typescript-client/test/stream.test.ts b/packages/typescript-client/test/stream.test.ts index b7498a8e9c..a9ac2a8a16 100644 --- a/packages/typescript-client/test/stream.test.ts +++ b/packages/typescript-client/test/stream.test.ts @@ -942,6 +942,7 @@ describe(`ShapeStream`, () => { // ensures the loop terminates. let requestCount = 0 const warnSpy = vi.spyOn(console, `warn`).mockImplementation(() => {}) + const logSpy = vi.spyOn(console, `log`).mockImplementation(() => {}) // First request succeeds → LiveState. All subsequent → persistent 400. const fetchMock = vi.fn(async () => { @@ -999,11 +1000,16 @@ describe(`ShapeStream`, () => { expect(lastError).not.toBeNull() expect(subscriberError).not.toBeNull() - // 1 initial success + ~51 retries (limit fires at >50) - expect(requestCount).toBeLessThan(100) + // 1 initial success + 4 failing requests (limit fires at >3) + expect(requestCount).toBeLessThan(10) expect(warnSpy.mock.calls.length).toBeLessThanOrEqual(5) + expect(logSpy).toHaveBeenCalledWith( + expect.stringContaining(`onError requested retry. Restarting stream`), + expect.any(Error) + ) warnSpy.mockRestore() + logSpy.mockRestore() }) it(`onError retry counter resets after successful data`, async () => { @@ -1024,7 +1030,7 @@ describe(`ShapeStream`, () => { requestCount++ - if (phase.current === `errors1` && requestCount <= 30) { + if (phase.current === `errors1` && requestCount <= 2) { return new Response(`Bad Request`, { status: 400, statusText: `Bad Request`, @@ -1053,7 +1059,7 @@ describe(`ShapeStream`, () => { } ) } - if (phase.current === `errors2` && requestCount <= 30) { + if (phase.current === `errors2` && requestCount <= 2) { return new Response(`Bad Request`, { status: 400, statusText: `Bad Request`, @@ -1105,9 +1111,9 @@ describe(`ShapeStream`, () => { { timeout: 10_000 } ) - // Stream survived 60 total errors (2 bursts of 30) because the + // Stream survived 4 total errors (2 bursts of 2) because the // counter reset between bursts. Without the reset, the cumulative - // count would hit 50 and kill the stream during the first burst. + // count would hit 4 and kill the stream during the second burst. expect(subscriberError).toBeNull() }) @@ -1118,8 +1124,8 @@ describe(`ShapeStream`, () => { let requestCount = 0 // Pattern: initial 200 → LiveState, then alternating bursts of - // 10 errors and 204 successes, repeated 6 times. - // Total errors: 60 (exceeds the 50 cap if counter doesn't reset). + // 2 errors and 204 successes, repeated 4 times. + // Total errors: 8 (exceeds the cap of 3 if the counter doesn't reset). const fetchMock = vi.fn(async () => { await new Promise((r) => setTimeout(r, 0)) requestCount++ @@ -1143,9 +1149,9 @@ describe(`ShapeStream`, () => { ) } - // After initial success: cycle through 10 errors then 1 x 204, repeat - const cyclePos = (requestCount - 2) % 11 // 0-9 = errors, 10 = 204 - if (cyclePos < 10) { + // After initial success: cycle through 2 errors then 1 x 204, repeat + const cyclePos = (requestCount - 2) % 3 // 0-1 = errors, 2 = 204 + if (cyclePos < 2) { return new Response(`Bad Request`, { status: 400, statusText: `Bad Request`, @@ -1181,18 +1187,18 @@ describe(`ShapeStream`, () => { } ) - // Wait long enough for 60+ errors across 6 cycles + // Wait long enough for 8+ errors across 4 cycles await vi.waitFor( () => { - expect(requestCount).toBeGreaterThan(60) + expect(requestCount).toBeGreaterThan(10) }, { timeout: 10_000 } ) aborter.abort() - // If counter resets on 204, the stream survives 60+ total errors. - // If it doesn't, the counter hits 50 and tears down the stream. + // If counter resets on 204, the stream survives 8+ total errors. + // If it doesn't, the counter hits 4 and tears down the stream. expect(subscriberError).toBeNull() }) @@ -1264,6 +1270,6 @@ describe(`ShapeStream`, () => { // If the counter resets on accepted headers (before parse), this // assertion fails because the stream loops forever. expect(subscriberError).not.toBeNull() - expect(requestCount).toBeLessThan(200) + expect(requestCount).toBeLessThan(10) }) }) From 3b03b86d35d1ed0da2253aa165df639b9d978262 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Sat, 11 Apr 2026 17:07:00 -0600 Subject: [PATCH 6/7] feat(typescript-client): add generation diagnostics --- packages/typescript-client/src/client.ts | 235 ++++++++++++++++++++--- 1 file changed, 208 insertions(+), 27 deletions(-) diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 0b377bc7ba..01afdc24e5 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -108,10 +108,21 @@ const SHAPE_STREAM_DEBUG_NAMESPACES = [ const SHAPE_STREAM_DEBUG_MAX_LOGS_PER_WINDOW = 50 const SHAPE_STREAM_DEBUG_WINDOW_MS = 1_000 const SHAPE_STREAM_WARNING_MAX_LOGS_PER_WINDOW = 3 +const SHAPE_STREAM_IGNORED_RESPONSE_WARNING_MAX_LOGS_PER_WINDOW = 1 const SHAPE_STREAM_WARNING_WINDOW_MS = 5_000 type DiagnosticValue = string | number | boolean | null | undefined +type ErrorDiagnosticContext = { + generationId: number + requestId?: number + transport?: `long-poll` | `sse` + errorName: string + errorMessage: string + responseStatus?: number + capturedAt: number +} + type ShapeStreamDiagnosticsConfig = { enabled: boolean source?: string @@ -754,7 +765,11 @@ export class ShapeStream = Row> #maxConsecutiveErrorRetries = 3 readonly #debugEnabled: boolean readonly #debugSource?: string + #streamGenerationSequence = 0 + #activeGenerationId = 0 #requestSequence = 0 + #lastErrorContext: ErrorDiagnosticContext | null = null + #pendingErrorContext: ErrorDiagnosticContext | null = null #debugWindowStartedAt = 0 #debugLogsInWindow = 0 #debugLogsSuppressed = 0 @@ -878,18 +893,54 @@ export class ShapeStream = Row> } async #start(): Promise { + const generationId = ++this.#streamGenerationSequence + this.#activeGenerationId = generationId + this.#pendingErrorContext = null this.#started = true this.#subscribeToWakeDetection() - this.#debugLog(`stream:start`) + this.#debugLog(`stream:start`, { generationId }) try { - await this.#requestShape() + await this.#requestShape(generationId) } catch (err) { this.#error = err + const errorRequestId = + this.#pendingErrorContext === null + ? this.#lastErrorContext?.requestId + : (this.#pendingErrorContext as ErrorDiagnosticContext).requestId + const errorTransport = + this.#pendingErrorContext === null + ? this.#lastErrorContext?.transport + : (this.#pendingErrorContext as ErrorDiagnosticContext).transport + this.#lastErrorContext = + err instanceof Error + ? (this.#pendingErrorContext ?? { + generationId, + errorName: err.name, + errorMessage: err.message, + capturedAt: Date.now(), + }) + : null this.#debugLog(`stream:error-caught`, { + generationId, + errorRequestId, + errorTransport, + isActiveGeneration: generationId === this.#activeGenerationId, errorName: err instanceof Error ? err.name : undefined, errorMessage: err instanceof Error ? err.message : String(err), }) + if (generationId !== this.#activeGenerationId) { + this.#warnWithRateLimit( + `stale-generation-error`, + `[Electric] A stale request generation surfaced an error after a newer generation was already active. ` + + `This may indicate overlapping request generations or late async work escaping quarantine. ` + + `${this.#formatStateDiagnostics(this.#syncState, { + errorGeneration: generationId, + errorRequestId, + errorTransport, + })}` + ) + } const previousState = this.#syncState if (err instanceof Error) { this.#syncState = this.#syncState.toErrorState(err) @@ -898,6 +949,11 @@ export class ShapeStream = Row> `entered-error-state`, `[Electric] Entered error state. ` + `${this.#formatStateDiagnostics(this.#syncState, { + errorGeneration: generationId, + errorRequestId, + errorTransport, + isActiveGenerationError: + generationId === this.#activeGenerationId, previousState: previousState.kind, errorName: err.name, errorMessage: err.message, @@ -933,6 +989,7 @@ export class ShapeStream = Row> // Bound the onError retry loop to prevent unbounded retries this.#consecutiveErrorRetries++ this.#debugLog(`onError:retry`, { + generationId, consecutiveErrorRetries: this.#consecutiveErrorRetries, retryParamKeys: retryOpts.params ? Object.keys(retryOpts.params).join(`,`) @@ -962,6 +1019,8 @@ export class ShapeStream = Row> console.log( `[Electric] onError requested retry. Restarting stream from current offset. ` + `${this.#formatStateDiagnostics(this.#syncState, { + errorGeneration: generationId, + errorRequestId, consecutiveErrorRetries: this.#consecutiveErrorRetries, })}`, err @@ -1031,12 +1090,16 @@ export class ShapeStream = Row> offset: state.offset, cursor: state.liveCacheBuster, replayCursor: state.replayCursor, + activeGeneration: this.#activeGenerationId, paused: this.#pauseLock.isPaused, connected: this.#connected, started: this.#started, currentUrl, shapeKey, expiredHandle, + lastErrorGeneration: this.#lastErrorContext?.generationId, + lastErrorRequestId: this.#lastErrorContext?.requestId, + lastErrorName: this.#lastErrorContext?.errorName, ...extra, } @@ -1046,6 +1109,45 @@ export class ShapeStream = Row> .join(` `) } + #recordPendingErrorContext( + generationId: number, + error: Error, + extra: { + requestId?: number + transport?: `long-poll` | `sse` + responseStatus?: number + } = {} + ) { + this.#pendingErrorContext = { + generationId, + requestId: extra.requestId, + transport: extra.transport, + errorName: error.name, + errorMessage: error.message, + responseStatus: extra.responseStatus, + capturedAt: Date.now(), + } + } + + #debugGenerationMismatch( + eventType: string, + extra: { + generationId: number + requestId?: number + transport?: `long-poll` | `sse` + } + ) { + if (extra.generationId === this.#activeGenerationId) return + + this.#debugLog(`generation:mismatch`, { + eventType, + eventGeneration: extra.generationId, + requestId: extra.requestId, + transport: extra.transport, + isActiveGeneration: false, + }) + } + #debugLog(event: string, extra: Record = {}) { if (!this.#debugEnabled) return @@ -1095,7 +1197,11 @@ export class ShapeStream = Row> this.#debugLogsSuppressed = 0 } - #warnWithRateLimit(key: string, message: string) { + #warnWithRateLimit( + key: string, + message: string, + maxLogsPerWindow = SHAPE_STREAM_WARNING_MAX_LOGS_PER_WINDOW + ) { const now = Date.now() const window = this.#warningWindows.get(key) @@ -1115,7 +1221,7 @@ export class ShapeStream = Row> return } - if (window.emitted < SHAPE_STREAM_WARNING_MAX_LOGS_PER_WINDOW) { + if (window.emitted < maxLogsPerWindow) { window.emitted++ console.warn(message, new Error(`stack trace`)) return @@ -1124,7 +1230,10 @@ export class ShapeStream = Row> window.suppressed++ } - async #requestShape(requestShapeCacheBuster?: string): Promise { + async #requestShape( + generationId: number, + requestShapeCacheBuster?: string + ): Promise { // ErrorState should never reach the request loop — re-throw so // #start's catch block can route it through onError properly. if (this.#syncState instanceof ErrorState) { @@ -1139,6 +1248,7 @@ export class ShapeStream = Row> this.#pendingRequestShapeCacheBuster = activeCacheBuster } this.#debugLog(`request:skipped-paused`, { + generationId, requestCacheBuster: activeCacheBuster, }) return @@ -1149,6 +1259,7 @@ export class ShapeStream = Row> (this.options.signal?.aborted || this.#syncState.isUpToDate) ) { this.#debugLog(`request:stopped`, { + generationId, signalAborted: this.options.signal?.aborted, isUpToDate: this.#syncState.isUpToDate, }) @@ -1167,7 +1278,7 @@ export class ShapeStream = Row> if (this.#syncState instanceof PausedState) { resumingFromPause = true this.#syncState = this.#syncState.resume() - this.#debugLog(`request:resumed`) + this.#debugLog(`request:resumed`, { generationId }) } const { url, signal } = this.options @@ -1195,6 +1306,7 @@ export class ShapeStream = Row> this.#pendingRequestShapeCacheBuster = activeCacheBuster } this.#debugLog(`request:cancelled-before-dispatch`, { + generationId, requestId, fetchUrl: fetchUrl.toString(), }) @@ -1206,6 +1318,7 @@ export class ShapeStream = Row> try { await this.#fetchShape({ + generationId, requestId, fetchUrl, requestAbortController, @@ -1224,14 +1337,16 @@ export class ShapeStream = Row> isRestartAbort ) { this.#debugLog(`request:restart-after-abort`, { + generationId, requestId, abortReason: describeDiagnosticValue(abortReason), }) - return this.#requestShape() + return this.#requestShape(generationId) } if (e instanceof FetchBackoffAbortError) { this.#debugLog(`request:aborted`, { + generationId, requestId, abortReason: describeDiagnosticValue(abortReason), }) @@ -1245,16 +1360,23 @@ export class ShapeStream = Row> // 2. Self-healing: stale retries exhausted, expired entry cleared, // stream reset — retry without expired_handle param. this.#debugLog(`request:stale-cache-retry`, { + generationId, requestId, errorMessage: e.message, }) - return this.#requestShape() + return this.#requestShape(generationId) } - if (!(e instanceof FetchError)) throw e // should never happen + if (!(e instanceof FetchError)) { + if (e instanceof Error) { + this.#recordPendingErrorContext(generationId, e, { requestId }) + } + throw e // should never happen + } if (e.status == 409) { this.#debugLog(`request:must-refetch`, { + generationId, requestId, responseStatus: e.status, }) @@ -1286,13 +1408,18 @@ export class ShapeStream = Row> // body to avoid delivering stale data rows to subscribers. await this.#publish([{ headers: { control: `must-refetch` } }]) - return this.#requestShape(nextRequestShapeCacheBuster) + return this.#requestShape(generationId, nextRequestShapeCacheBuster) } else { // errors that have reached this point are not actionable without // additional user input, such as 400s or failures to read the // body of a response, so we exit the loop and let #start handle it // Note: We don't notify subscribers here because onError might recover + this.#recordPendingErrorContext(generationId, e, { + requestId, + responseStatus: e.status, + }) this.#debugLog(`request:failed`, { + generationId, requestId, errorName: e.name, errorMessage: e.message, @@ -1308,7 +1435,7 @@ export class ShapeStream = Row> } this.#tickPromiseResolver?.() - return this.#requestShape() + return this.#requestShape(generationId) } /** @@ -1581,7 +1708,11 @@ export class ShapeStream = Row> */ async #onInitialResponse( response: Response, - requestId?: number + opts?: { + generationId: number + requestId: number + transport: `long-poll` | `sse` + } ): Promise { const { headers, status } = response const shapeHandle = headers.get(SHAPE_HANDLE_HEADER) @@ -1629,7 +1760,14 @@ export class ShapeStream = Row> this.#syncState = transition.state this.#debugLog(`response:headers`, { - requestId, + generationId: opts?.generationId, + requestId: opts?.requestId, + transport: opts?.transport, + eventGeneration: opts?.generationId, + isActiveGeneration: + opts?.generationId === undefined + ? undefined + : opts.generationId === this.#activeGenerationId, action: transition.action, responseStatus: status, responseHandle: shapeHandle, @@ -1637,6 +1775,9 @@ export class ShapeStream = Row> responseCursor, expiredHandle, }) + if (opts) { + this.#debugGenerationMismatch(`response:headers`, opts) + } // Clear recovery guard on 204 (no-content), since the empty body means // #onMessages won't run to clear it via the up-to-date path. @@ -1713,15 +1854,35 @@ export class ShapeStream = Row> } if (transition.action === `ignored`) { + this.#debugLog(`response:ignored`, { + generationId: opts?.generationId, + requestId: opts?.requestId, + transport: opts?.transport, + eventGeneration: opts?.generationId, + isActiveGeneration: + opts?.generationId === undefined + ? undefined + : opts.generationId === this.#activeGenerationId, + responseHandle: shapeHandle, + responseStatus: status, + }) this.#warnWithRateLimit( `ignored-response-${this.#syncState.kind}`, `[Electric] Response was ignored by state "${this.#syncState.kind}". ` + `The response body will be skipped. ` + `This may indicate a proxy/CDN caching issue or a client state machine bug. ` + `${this.#formatStateDiagnostics(this.#syncState, { + eventGeneration: opts?.generationId, + requestId: opts?.requestId, + transport: opts?.transport, + isActiveGeneration: + opts?.generationId === undefined + ? undefined + : opts.generationId === this.#activeGenerationId, responseHandle: shapeHandle, responseStatus: status, - })}` + })}`, + SHAPE_STREAM_IGNORED_RESPONSE_WARNING_MAX_LOGS_PER_WINDOW ) return false } @@ -1732,7 +1893,11 @@ export class ShapeStream = Row> async #onMessages( batch: Array>, isSseMessage = false, - requestId?: number + opts?: { + generationId: number + requestId: number + transport: `long-poll` | `sse` + } ) { if (!Array.isArray(batch)) { console.warn( @@ -1786,13 +1951,22 @@ export class ShapeStream = Row> }) this.#debugLog(`messages:batch`, { - requestId, - transport: isSseMessage ? `sse` : `long-poll`, + generationId: opts?.generationId, + requestId: opts?.requestId, + transport: opts?.transport ?? (isSseMessage ? `sse` : `long-poll`), + eventGeneration: opts?.generationId, + isActiveGeneration: + opts?.generationId === undefined + ? undefined + : opts.generationId === this.#activeGenerationId, batchSize: batch.length, publishedCount: messagesToProcess.length, hasUpToDateMessage, upToDateOffset, }) + if (opts) { + this.#debugGenerationMismatch(`messages:batch`, opts) + } await this.#publish(messagesToProcess) } @@ -1805,6 +1979,7 @@ export class ShapeStream = Row> * @returns A promise that resolves when the request is complete (i.e. the long poll receives a response or the SSE connection is closed). */ async #fetchShape(opts: { + generationId: number requestId: number fetchUrl: URL requestAbortController: AbortController @@ -1832,9 +2007,11 @@ export class ShapeStream = Row> isRefreshing: this.#isRefreshing, resumingFromPause: !!opts.resumingFromPause, }) + const transport = shouldUseSse ? `sse` : `long-poll` this.#debugLog(`request:dispatch`, { + generationId: opts.generationId, requestId: opts.requestId, - transport: shouldUseSse ? `sse` : `long-poll`, + transport, fetchUrl: opts.fetchUrl.toString(), resumingFromPause: !!opts.resumingFromPause, isRefreshing: this.#isRefreshing, @@ -1842,14 +2019,16 @@ export class ShapeStream = Row> if (shouldUseSse) { opts.fetchUrl.searchParams.set(EXPERIMENTAL_LIVE_SSE_QUERY_PARAM, `true`) opts.fetchUrl.searchParams.set(LIVE_SSE_QUERY_PARAM, `true`) - return this.#requestShapeSSE(opts) + return this.#requestShapeSSE({ ...opts, transport: `sse` }) } - return this.#requestShapeLongPoll(opts) + return this.#requestShapeLongPoll({ ...opts, transport: `long-poll` }) } async #requestShapeLongPoll(opts: { + generationId: number requestId: number + transport: `long-poll` fetchUrl: URL requestAbortController: AbortController headers: Record @@ -1861,10 +2040,7 @@ export class ShapeStream = Row> }) this.#connected = true - const shouldProcessBody = await this.#onInitialResponse( - response, - opts.requestId - ) + const shouldProcessBody = await this.#onInitialResponse(response, opts) if (!shouldProcessBody) return const schema = this.#syncState.schema! // we know that it is not undefined because it is set by `this.#onInitialResponse` @@ -1885,11 +2061,13 @@ export class ShapeStream = Row> ) } - await this.#onMessages(batch, false, opts.requestId) + await this.#onMessages(batch, false, opts) } async #requestShapeSSE(opts: { + generationId: number requestId: number + transport: `sse` fetchUrl: URL requestAbortController: AbortController headers: Record @@ -1916,7 +2094,7 @@ export class ShapeStream = Row> this.#connected = true const shouldProcessBody = await this.#onInitialResponse( response, - opts.requestId + opts ) if (!shouldProcessBody) { ignoredStaleResponse = true @@ -1936,7 +2114,7 @@ export class ShapeStream = Row> if (isUpToDateMessage(message)) { // Flush the buffer on up-to-date message. // Ensures that we only process complete batches of operations. - this.#onMessages(buffer, true, opts.requestId) + this.#onMessages(buffer, true, opts) buffer = [] } } @@ -1985,12 +2163,15 @@ export class ShapeStream = Row> }) this.#syncState = transition.state this.#debugLog(`sse:closed`, { + generationId: opts.generationId, requestId: opts.requestId, + transport: opts.transport, connectionDuration, wasAborted, fellBackToLongPolling: transition.fellBackToLongPolling, wasShortConnection: transition.wasShortConnection, }) + this.#debugGenerationMismatch(`sse:closed`, opts) if (transition.fellBackToLongPolling) { console.warn( From 4b36ca6f10a0bc2b556a3bb2a3d4d78254774261 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Sat, 11 Apr 2026 18:40:30 -0600 Subject: [PATCH 7/7] feat(typescript-client): log snapshot retry diagnostics --- packages/typescript-client/src/client.ts | 78 ++++++++++++++++++++++-- 1 file changed, 74 insertions(+), 4 deletions(-) diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 01afdc24e5..b15c2021d1 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -228,6 +228,17 @@ function describeDiagnosticValue(value: unknown): string | undefined { } } +function summarizeDiagnosticValue( + value: unknown, + maxLength = 240 +): string | undefined { + const described = describeDiagnosticValue(value) + if (!described) return described + return described.length > maxLength + ? `${described.slice(0, maxLength - 3)}...` + : described +} + type Replica = `full` | `default` export type LogMode = `changes_only` | `full` @@ -1395,7 +1406,9 @@ export class ShapeStream = Row> if (!newShapeHandle) { console.warn( `[Electric] Received 409 response without a shape handle header. ` + - `This likely indicates a proxy or CDN stripping required headers.`, + `This can happen if a proxy/CDN strips required headers or if the server emitted ` + + `a must-refetch response without a replacement handle. ` + + `stream="main" requestUrl=${JSON.stringify(fetchUrl.toString())}`, new Error(`stack trace`) ) } @@ -2449,8 +2462,13 @@ export class ShapeStream = Row> } const snapshotReason = `snapshot-${++this.#snapshotCounter}` + const snapshotSummary = summarizeDiagnosticValue(opts) this.#pauseLock.acquire(snapshotReason) + this.#debugLog(`snapshot:pause-acquired`, { + snapshotReason, + snapshotSummary, + }) // Warn if the snapshot holds the pause lock for too long — this likely // indicates a hung fetch or leaked lock. Visibility pauses are @@ -2459,7 +2477,8 @@ export class ShapeStream = Row> console.warn( `[Electric] Snapshot "${snapshotReason}" has held the pause lock for 30s — ` + `possible hung request or leaked lock. ` + - `Current holders: ${[...new Set([snapshotReason])].join(`, `)}`, + `Current holders: ${[...new Set([snapshotReason])].join(`, `)} ` + + `subset=${JSON.stringify(snapshotSummary)}`, new Error(`stack trace`) ) }, 30_000) @@ -2468,6 +2487,14 @@ export class ShapeStream = Row> const { metadata, data, responseOffset, responseHandle } = await this.fetchSnapshot(opts) + this.#debugLog(`snapshot:completed`, { + snapshotReason, + snapshotSummary, + responseHandle, + responseOffset, + rowCount: data.length, + }) + const dataWithEndBoundary = (data as Array>).concat([ { headers: { control: `snapshot-end`, ...metadata } }, { headers: { control: `subset-end`, ...opts } }, @@ -2508,9 +2535,24 @@ export class ShapeStream = Row> metadata, data, } + } catch (e) { + this.#debugLog(`snapshot:error`, { + snapshotReason, + snapshotSummary, + errorName: e instanceof Error ? e.name : undefined, + errorMessage: + e instanceof Error + ? summarizeDiagnosticValue(e.message, 320) + : undefined, + }) + throw e } finally { clearTimeout(snapshotWarnTimer) this.#pauseLock.release(snapshotReason) + this.#debugLog(`snapshot:pause-released`, { + snapshotReason, + snapshotSummary, + }) } } @@ -2546,6 +2588,7 @@ export class ShapeStream = Row> }> { const method = opts.method ?? this.options.subsetMethod ?? `GET` const usePost = method === `POST` + const snapshotSummary = summarizeDiagnosticValue(opts) let fetchUrl: URL let fetchOptions: RequestInit @@ -2575,6 +2618,14 @@ export class ShapeStream = Row> // Capture handle before fetch to avoid race conditions if it changes during the request const usedHandle = this.#syncState.handle + this.#debugLog(`snapshot:fetch:start`, { + retryCount, + method, + fetchUrl: fetchUrl.toString(), + usedHandle, + cacheBuster, + snapshotSummary, + }) let response: Response try { @@ -2586,6 +2637,15 @@ export class ShapeStream = Row> // clear the pause lock and break requestSnapshot's pause/resume logic. if (e instanceof FetchError && e.status === 409) { const nextRetryCount = retryCount + 1 + const nextHandle = e.headers[SHAPE_HANDLE_HEADER] + this.#debugLog(`snapshot:fetch:409`, { + retryCount, + nextRetryCount, + fetchUrl: fetchUrl.toString(), + usedHandle, + nextHandle, + snapshotSummary, + }) if (nextRetryCount > this.#maxSnapshotRetries) { throw new FetchError( 502, @@ -2606,13 +2666,15 @@ export class ShapeStream = Row> // For snapshot 409s, only update the handle — don't reset offset/schema/etc. // The main stream is paused and should not be disturbed. - const nextHandle = e.headers[SHAPE_HANDLE_HEADER] if (nextHandle) { this.#syncState = this.#syncState.withHandle(nextHandle) } else { console.warn( `[Electric] Received 409 response without a shape handle header. ` + - `This likely indicates a proxy or CDN stripping required headers.`, + `This can happen if a proxy/CDN strips required headers or if the server emitted ` + + `a must-refetch response without a replacement handle. ` + + `stream="snapshot" retryCount=${nextRetryCount} requestUrl=${JSON.stringify(fetchUrl.toString())} ` + + `snapshot=${JSON.stringify(snapshotSummary)}`, new Error(`stack trace`) ) } @@ -2648,6 +2710,14 @@ export class ShapeStream = Row> const responseOffset = (response.headers.get(CHUNK_LAST_OFFSET_HEADER) as Offset) || null const responseHandle = response.headers.get(SHAPE_HANDLE_HEADER) + this.#debugLog(`snapshot:fetch:success`, { + retryCount, + fetchUrl: fetchUrl.toString(), + responseHandle, + responseOffset, + rowCount: data.length, + snapshotSummary, + }) return { metadata, data, responseOffset, responseHandle } }