From 6ea922c059f013ed68a96d9c3075c83c1183a96d Mon Sep 17 00:00:00 2001 From: Pavel Kudinov Date: Thu, 23 Apr 2026 23:46:40 -0700 Subject: [PATCH] fix electric progressive persisted resume --- .changeset/progressive-persisted-resume.md | 5 + .../electric-db-collection/src/electric.ts | 6 +- .../tests/electric.test.ts | 110 ++++++++++++++++-- 3 files changed, 111 insertions(+), 10 deletions(-) create mode 100644 .changeset/progressive-persisted-resume.md diff --git a/.changeset/progressive-persisted-resume.md b/.changeset/progressive-persisted-resume.md new file mode 100644 index 000000000..d23766fa8 --- /dev/null +++ b/.changeset/progressive-persisted-resume.md @@ -0,0 +1,5 @@ +--- +'@tanstack/electric-db-collection': patch +--- + +Prevent progressive Electric collections from truncating persisted rows when resuming from saved Electric shape metadata. diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 72a9a072e..22d222c15 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -1441,7 +1441,11 @@ function createElectricSync>( let transactionStarted = false const newTxids = new Set() const newSnapshots: Array = [] - let hasReceivedUpToDate = false // Track if we've completed initial sync in progressive mode + // Track if we've completed initial sync in progressive mode. A persisted + // resume starts from an already-committed stream offset, so the next + // up-to-date message must not run the initial atomic swap again. + let hasReceivedUpToDate = + syncMode === `progressive` && canUsePersistedResume // Progressive mode state // Helper to determine if we're buffering the initial sync diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index 30b15887b..bfa567293 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -95,16 +95,22 @@ describe(`Electric Integration`, () => { const createPersistedAdapter = ( collectionMetadata?: Map, + rows: Map = new Map(), ) => ({ - loadSubset: async () => [], - loadCollectionMetadata: async () => - Array.from((collectionMetadata ?? new Map()).entries()).map( - ([key, value]) => ({ - key, - value, - }), + loadSubset: () => + Promise.resolve( + Array.from(rows.entries()).map(([key, value]) => ({ key, value })), + ), + loadCollectionMetadata: () => + Promise.resolve( + Array.from((collectionMetadata ?? new Map()).entries()).map( + ([key, value]) => ({ + key, + value, + }), + ), ), - applyCommittedTx: async (_collectionId: string, tx: any) => { + applyCommittedTx: (_collectionId: string, tx: any) => { for (const mutation of tx.collectionMetadataMutations ?? []) { if (mutation.type === `delete`) { collectionMetadata?.delete(mutation.key) @@ -112,8 +118,19 @@ describe(`Electric Integration`, () => { collectionMetadata?.set(mutation.key, mutation.value) } } + if (tx.truncate) { + rows.clear() + } + for (const mutation of tx.mutations ?? []) { + if (mutation.type === `delete`) { + rows.delete(mutation.key) + } else { + rows.set(mutation.key, mutation.value) + } + } + return Promise.resolve() }, - ensureIndex: async () => {}, + ensureIndex: () => Promise.resolve(), }) beforeEach(() => { @@ -3136,6 +3153,81 @@ describe(`Electric Integration`, () => { ) }) + it(`should keep hydrated persisted rows when progressive resume receives up-to-date without snapshot rows`, async () => { + vi.clearAllMocks() + + const { ShapeStream } = await import(`@electric-sql/client`) + mockFetchSnapshot.mockResolvedValue({ + metadata: {}, + data: [], + }) + + const collectionMetadata = new Map([ + [ + `electric:resume`, + { + kind: `resume`, + offset: `10_0`, + handle: `handle-1`, + shapeId: `{"params":{"table":"test_table"},"url":"http://test-url"}`, + updatedAt: 1, + }, + ], + ]) + const persistedRows = new Map([ + [1, { id: 1, name: `Persisted User` }], + ]) + + const persistedCollection = createCollection( + persistedCollectionOptions({ + ...(electricCollectionOptions({ + id: `persisted-progressive-resume-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + syncMode: `progressive` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + }) as any), + persistence: { + adapter: createPersistedAdapter(collectionMetadata, persistedRows), + }, + }) as any, + ) + + persistedCollection.startSyncImmediate() + await persistedCollection._sync.loadSubset({ limit: 10 }) + + expect(ShapeStream).toHaveBeenCalledWith( + expect.objectContaining({ + offset: `10_0`, + handle: `handle-1`, + }), + ) + expect(stripVirtualProps(persistedCollection.get(1))).toEqual({ + id: 1, + name: `Persisted User`, + }) + + subscriber([ + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(stripVirtualProps(persistedCollection.get(1))).toEqual({ + id: 1, + name: `Persisted User`, + }) + expect(stripVirtualProps(persistedRows.get(1))).toEqual({ + id: 1, + name: `Persisted User`, + }) + }) + it(`should not mix explicit handle with persisted offset`, async () => { vi.clearAllMocks()