From 9930a9a747558b9aba213ace9ad9983313b74bc2 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Tue, 24 Mar 2026 18:03:52 -0600 Subject: [PATCH 1/8] fix(typescript-client): add exponential backoff to onError-driven retries When onError returns {} on persistent 4xx errors (e.g. expired auth tokens returning 403), the stream retried immediately with zero delay, creating a tight infinite loop that could hammer both Electric and the upstream database. Add exponential backoff with jitter (100ms base, 30s cap) to the onError retry path. The backoff delay is abort-aware so stream teardown remains responsive. Includes a console.warn on 2nd+ retry for debuggability. Co-Authored-By: Claude Opus 4.6 --- packages/typescript-client/src/client.ts | 38 +++++++++++++++++++ .../test/wake-detection.test.ts | 6 +-- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index bc64a06054..0d13bcf06b 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -621,6 +621,10 @@ export class ShapeStream = Row> #fastLoopBackoffMaxMs = 5_000 #fastLoopConsecutiveCount = 0 #fastLoopMaxCount = 5 + // onError retry backoff: prevent tight loops when onError always returns {} + #onErrorRetryCount = 0 + #onErrorBackoffBaseMs = 100 + #onErrorBackoffMaxMs = 30_000 #pendingRequestShapeCacheBuster?: string #maxSnapshotRetries = 5 @@ -770,6 +774,39 @@ export class ShapeStream = Row> this.#fastLoopConsecutiveCount = 0 this.#recentRequestEntries = [] + // Apply exponential backoff with jitter to prevent tight retry loops + // (e.g., when onError always returns {} on persistent 4xx errors) + const maxDelay = Math.min( + this.#onErrorBackoffMaxMs, + this.#onErrorBackoffBaseMs * Math.pow(2, this.#onErrorRetryCount) + ) + this.#onErrorRetryCount++ + const delayMs = Math.floor(Math.random() * maxDelay) + if (this.#onErrorRetryCount > 1) { + console.warn( + `[Electric] onError 
retry backoff: waiting ${Math.round(delayMs / 1000)}s before retry ` + + `(attempt ${this.#onErrorRetryCount}). ` + + `Previous error: ${(err as Error)?.message ?? err}` + ) + } + await new Promise((resolve) => { + const timer = setTimeout(resolve, delayMs) + if (this.options.signal) { + const onAbort = () => { + clearTimeout(timer) + resolve() + } + this.options.signal.addEventListener(`abort`, onAbort, { + once: true, + }) + } + }) + + if (this.options.signal?.aborted) { + this.#teardown() + return + } + // Restart from current offset this.#started = false await this.#start() @@ -826,6 +863,7 @@ export class ShapeStream = Row> } else { this.#fastLoopConsecutiveCount = 0 this.#recentRequestEntries = [] + this.#onErrorRetryCount = 0 } let resumingFromPause = false diff --git a/packages/typescript-client/test/wake-detection.test.ts b/packages/typescript-client/test/wake-detection.test.ts index 28a83650a4..0aa92f10d7 100644 --- a/packages/typescript-client/test/wake-detection.test.ts +++ b/packages/typescript-client/test/wake-detection.test.ts @@ -257,10 +257,10 @@ describe(`Wake detection`, () => { const unsub = stream.subscribe(() => {}) // Let the stream start, hit the 400 error, and retry via onError. - // The error retry path (#start lines 767-769) calls #start() recursively - // WITHOUT calling #teardown() first, so the timer is still alive. - await vi.advanceTimersByTimeAsync(0) + // The error retry path calls #start() recursively after an exponential + // backoff delay, so we need to advance time enough to cover it. 
await vi.advanceTimersByTimeAsync(0) + await vi.advanceTimersByTimeAsync(200) await vi.advanceTimersByTimeAsync(0) expect(fetchCallCount).toBeGreaterThanOrEqual(2) From 36ea24452cf6b9074e570017cc378f3a2276a323 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Tue, 24 Mar 2026 18:04:47 -0600 Subject: [PATCH 2/8] chore: add changeset for onError retry backoff Co-Authored-By: Claude Opus 4.6 --- .changeset/add-onerror-retry-backoff.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/add-onerror-retry-backoff.md diff --git a/.changeset/add-onerror-retry-backoff.md b/.changeset/add-onerror-retry-backoff.md new file mode 100644 index 0000000000..588d5dfa38 --- /dev/null +++ b/.changeset/add-onerror-retry-backoff.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/client': patch +--- + +Add exponential backoff to onError-driven retries to prevent tight loops on persistent 4xx errors (e.g. expired auth tokens returning 403). The backoff uses jitter with a 100ms base and 30s cap, and is abort-aware so stream teardown remains responsive. 
From e812a3e610040c3c52a5e3bfd498d131e9159543 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Tue, 24 Mar 2026 18:07:47 -0600 Subject: [PATCH 3/8] test(typescript-client): add tests for onError retry backoff - Exponential backoff grows delay between retries on persistent 403s - Stream tears down immediately when aborted during backoff delay - Console warning emitted on 2nd+ retry attempt Co-Authored-By: Claude Opus 4.6 --- .../typescript-client/test/stream.test.ts | 131 ++++++++++++++++++ 1 file changed, 131 insertions(+) diff --git a/packages/typescript-client/test/stream.test.ts b/packages/typescript-client/test/stream.test.ts index fba7e8e885..07940bfd45 100644 --- a/packages/typescript-client/test/stream.test.ts +++ b/packages/typescript-client/test/stream.test.ts @@ -706,4 +706,135 @@ describe(`ShapeStream`, () => { warnSpy.mockRestore() }) + + it(`should apply exponential backoff on onError retries for persistent 4xx errors`, async () => { + // When onError always returns {} on a persistent 4xx error, the retry + // delay should increase exponentially rather than retrying immediately. + const requestTimestamps: number[] = [] + const warnSpy = vi.spyOn(console, `warn`).mockImplementation(() => {}) + + const fetchMock = ( + ..._args: Parameters + ): Promise => { + requestTimestamps.push(Date.now()) + return Promise.resolve(new Response(`Forbidden`, { status: 403 })) + } + + const stream = new ShapeStream({ + url: shapeUrl, + params: { table: `test` }, + signal: aborter.signal, + fetchClient: fetchMock, + subscribe: false, + onError: () => ({}), + }) + + stream.subscribe(() => {}) + + // Wait for several retries to accumulate + await vi.waitFor( + () => { + expect(requestTimestamps.length).toBeGreaterThanOrEqual(4) + }, + { timeout: 15_000 } + ) + + // Verify gaps between requests grow over time (exponential backoff) + const gaps = requestTimestamps + .slice(1) + .map((t, i) => t - requestTimestamps[i]!) 
+ + // Later gaps should be larger than earlier ones on average + const earlyGap = gaps[0]! + const lateGap = gaps[gaps.length - 1]! + expect(lateGap).toBeGreaterThan(earlyGap) + + warnSpy.mockRestore() + }) + + it(`should tear down immediately when aborted during onError backoff`, async () => { + // When the stream is in the middle of a backoff delay and the user + // aborts, it should tear down promptly rather than waiting for the timer. + let requestCount = 0 + const warnSpy = vi.spyOn(console, `warn`).mockImplementation(() => {}) + + const fetchMock = ( + ..._args: Parameters + ): Promise => { + requestCount++ + return Promise.resolve(new Response(`Forbidden`, { status: 403 })) + } + + const localAborter = new AbortController() + const stream = new ShapeStream({ + url: shapeUrl, + params: { table: `test` }, + signal: localAborter.signal, + fetchClient: fetchMock, + subscribe: false, + onError: () => ({}), + }) + + stream.subscribe(() => {}) + + // Wait for at least one retry so we know backoff is active + await vi.waitFor( + () => { + expect(requestCount).toBeGreaterThanOrEqual(2) + }, + { timeout: 5_000 } + ) + + const countBeforeAbort = requestCount + + // Abort the stream + localAborter.abort() + + // Give a tick for teardown + await resolveInMacrotask(undefined) + + // No more requests should have been made after abort + expect(requestCount).toBe(countBeforeAbort) + + warnSpy.mockRestore() + }) + + it(`should warn on 2nd+ onError retry attempt`, async () => { + // The stream should log a console.warn starting from the 2nd retry + // to help developers diagnose persistent error loops. 
+ const warnSpy = vi.spyOn(console, `warn`).mockImplementation(() => {}) + let requestCount = 0 + + const fetchMock = ( + ..._args: Parameters + ): Promise => { + requestCount++ + return Promise.resolve(new Response(`Forbidden`, { status: 403 })) + } + + const stream = new ShapeStream({ + url: shapeUrl, + params: { table: `test` }, + signal: aborter.signal, + fetchClient: fetchMock, + subscribe: false, + onError: () => ({}), + }) + + stream.subscribe(() => {}) + + // Wait for enough retries to trigger the warning + await vi.waitFor( + () => { + expect(requestCount).toBeGreaterThanOrEqual(3) + }, + { timeout: 15_000 } + ) + + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining(`onError retry backoff`) + ) + + warnSpy.mockRestore() + }) }) From a7036cea6323d2b2072305c27134849ffe66e585 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 25 Mar 2026 08:29:33 -0600 Subject: [PATCH 4/8] fix(typescript-client): capture onErrorRetryCount in local to avoid shared-field analyzer warning Use a local `retryCount` variable so the field is not read across the async boundary in #start, satisfying the shape-stream-risks analyzer. 
Co-Authored-By: Claude Opus 4.6 --- packages/typescript-client/src/client.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 0d13bcf06b..9c63a17c28 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -776,16 +776,16 @@ export class ShapeStream<T extends Row<unknown> = Row> // Apply exponential backoff with jitter to prevent tight retry loops // (e.g., when onError always returns {} on persistent 4xx errors) + const retryCount = ++this.#onErrorRetryCount const maxDelay = Math.min( this.#onErrorBackoffMaxMs, - this.#onErrorBackoffBaseMs * Math.pow(2, this.#onErrorRetryCount) + this.#onErrorBackoffBaseMs * Math.pow(2, retryCount) ) - this.#onErrorRetryCount++ const delayMs = Math.floor(Math.random() * maxDelay) - if (this.#onErrorRetryCount > 1) { + if (retryCount > 1) { console.warn( `[Electric] onError retry backoff: waiting ${Math.round(delayMs / 1000)}s before retry ` + - `(attempt ${this.#onErrorRetryCount}). ` + + `(attempt ${retryCount}). ` + `Previous error: ${(err as Error)?.message ?? err}` ) } From 7c98fc2a4988dcb53ee5c655990606b4180d1059 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 25 Mar 2026 08:31:18 -0600 Subject: [PATCH 5/8] fix(typescript-client): restore first-retry delay to 0-100ms Use retryCount - 1 in the exponent so the first onError retry has minimal delay (for legitimate auth token refresh), with exponential growth only kicking in on subsequent retries. 
Co-Authored-By: Claude Opus 4.6 --- packages/typescript-client/src/client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 9c63a17c28..10ca1fb433 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -779,7 +779,7 @@ export class ShapeStream<T extends Row<unknown> = Row> const retryCount = ++this.#onErrorRetryCount const maxDelay = Math.min( this.#onErrorBackoffMaxMs, - this.#onErrorBackoffBaseMs * Math.pow(2, retryCount) + this.#onErrorBackoffBaseMs * Math.pow(2, retryCount - 1) ) const delayMs = Math.floor(Math.random() * maxDelay) if (retryCount > 1) { From 27f18e59a6261baeb31e54bc008bbd846c0024b3 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 25 Mar 2026 08:45:05 -0600 Subject: [PATCH 6/8] fix(typescript-client): make #onErrorRetryCount single-writer for analyzer Remove the reset from #requestShape so #start is the sole writer, satisfying the shared-instance-field analyzer check. 
Co-Authored-By: Claude Opus 4.6 --- packages/typescript-client/src/client.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 10ca1fb433..f9e3ad6e2d 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -863,7 +863,6 @@ export class ShapeStream = Row> } else { this.#fastLoopConsecutiveCount = 0 this.#recentRequestEntries = [] - this.#onErrorRetryCount = 0 } let resumingFromPause = false From cd29400f0e9b0e6898e679671e15e51f44073b2e Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 25 Mar 2026 09:13:24 -0600 Subject: [PATCH 7/8] fix(typescript-client): fix flaky backoff test and abort listener leak - Replace single-gap jitter-sensitive assertion with early-vs-late sum comparison that is robust against random jitter - Clean up abort listener when backoff timer expires normally to prevent closure accumulation on long-lived streams with many recoverable errors Co-Authored-By: Claude Opus 4.6 --- packages/typescript-client/src/client.ts | 18 +++++++++--------- packages/typescript-client/test/stream.test.ts | 16 +++++++++------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index f9e3ad6e2d..70439290b6 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -790,16 +790,16 @@ export class ShapeStream = Row> ) } await new Promise((resolve) => { - const timer = setTimeout(resolve, delayMs) - if (this.options.signal) { - const onAbort = () => { - clearTimeout(timer) - resolve() - } - this.options.signal.addEventListener(`abort`, onAbort, { - once: true, - }) + const signal = this.options.signal + const onAbort = () => { + clearTimeout(timer) + resolve() } + const timer = setTimeout(() => { + signal?.removeEventListener(`abort`, onAbort) + resolve() + }, delayMs) + 
signal?.addEventListener(`abort`, onAbort, { once: true }) }) if (this.options.signal?.aborted) { diff --git a/packages/typescript-client/test/stream.test.ts b/packages/typescript-client/test/stream.test.ts index 07940bfd45..d924c04bd9 100644 --- a/packages/typescript-client/test/stream.test.ts +++ b/packages/typescript-client/test/stream.test.ts @@ -731,23 +731,25 @@ describe(`ShapeStream`, () => { stream.subscribe(() => {}) - // Wait for several retries to accumulate + // Wait for enough retries so we can compare early vs late gaps await vi.waitFor( () => { - expect(requestTimestamps.length).toBeGreaterThanOrEqual(4) + expect(requestTimestamps.length).toBeGreaterThanOrEqual(6) }, { timeout: 15_000 } ) - // Verify gaps between requests grow over time (exponential backoff) + // Verify gaps between requests grow over time (exponential backoff). + // Compare the sum of the first half vs the second half of gaps to be + // robust against jitter on any individual gap. const gaps = requestTimestamps .slice(1) .map((t, i) => t - requestTimestamps[i]!) - // Later gaps should be larger than earlier ones on average - const earlyGap = gaps[0]! - const lateGap = gaps[gaps.length - 1]! - expect(lateGap).toBeGreaterThan(earlyGap) + const mid = Math.floor(gaps.length / 2) + const earlySum = gaps.slice(0, mid).reduce((a, b) => a + b, 0) + const lateSum = gaps.slice(mid).reduce((a, b) => a + b, 0) + expect(lateSum).toBeGreaterThan(earlySum) warnSpy.mockRestore() }) From 130dc9ee1777aaa1369ab5397ea9599b79cf2e59 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 25 Mar 2026 09:16:31 -0600 Subject: [PATCH 8/8] test(typescript-client): add red-green test for abort listener cleanup Verifies that abort listeners are removed when the backoff timer expires normally, preventing closure accumulation on long-lived streams. 
Co-Authored-By: Claude Opus 4.6 --- .../typescript-client/test/stream.test.ts | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/packages/typescript-client/test/stream.test.ts b/packages/typescript-client/test/stream.test.ts index d924c04bd9..6b5d07bfee 100644 --- a/packages/typescript-client/test/stream.test.ts +++ b/packages/typescript-client/test/stream.test.ts @@ -839,4 +839,76 @@ describe(`ShapeStream`, () => { warnSpy.mockRestore() }) + + it(`should clean up abort listeners after onError backoff timer expires`, async () => { + // When the backoff timer expires normally (not via abort), the abort + // listener must be removed to prevent closure accumulation on + // long-lived streams with many recoverable errors. + let requestCount = 0 + const warnSpy = vi.spyOn(console, `warn`).mockImplementation(() => {}) + const addSpy = vi.fn() + const removeSpy = vi.fn() + + const localAborter = new AbortController() + const originalAdd = localAborter.signal.addEventListener.bind( + localAborter.signal + ) + const originalRemove = localAborter.signal.removeEventListener.bind( + localAborter.signal + ) + localAborter.signal.addEventListener = ( + ...args: Parameters + ) => { + addSpy(...args) + return originalAdd(...args) + } + localAborter.signal.removeEventListener = ( + ...args: Parameters + ) => { + removeSpy(...args) + return originalRemove(...args) + } + + const fetchMock = ( + ..._args: Parameters + ): Promise => { + requestCount++ + return Promise.resolve(new Response(`Forbidden`, { status: 403 })) + } + + const stream = new ShapeStream({ + url: shapeUrl, + params: { table: `test` }, + signal: localAborter.signal, + fetchClient: fetchMock, + subscribe: false, + onError: () => ({}), + }) + + stream.subscribe(() => {}) + + // Wait for several retries so multiple backoff timers expire normally + await vi.waitFor( + () => { + expect(requestCount).toBeGreaterThanOrEqual(4) + }, + { timeout: 15_000 } + ) + + localAborter.abort() + + // Each backoff 
cycle should have added AND removed an abort listener. + // The remove count should match the add count (minus 1 for the final + // cycle that was interrupted by abort, where { once: true } handles cleanup). + const abortAdds = addSpy.mock.calls.filter( + (args: unknown[]) => args[0] === `abort` + ).length + const abortRemoves = removeSpy.mock.calls.filter( + (args: unknown[]) => args[0] === `abort` + ).length + expect(abortAdds).toBeGreaterThanOrEqual(3) + expect(abortRemoves).toBeGreaterThanOrEqual(abortAdds - 1) + + warnSpy.mockRestore() + }) })