Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/progressive-persisted-resume.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@tanstack/electric-db-collection': patch
---

Prevent progressive Electric collections from truncating persisted rows when resuming from saved Electric shape metadata.
6 changes: 5 additions & 1 deletion packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1441,7 +1441,11 @@ function createElectricSync<T extends Row<unknown>>(
let transactionStarted = false
const newTxids = new Set<Txid>()
const newSnapshots: Array<PostgresSnapshot> = []
let hasReceivedUpToDate = false // Track if we've completed initial sync in progressive mode
// Track if we've completed initial sync in progressive mode. A persisted
// resume starts from an already-committed stream offset, so the next
// up-to-date message must not run the initial atomic swap again.
let hasReceivedUpToDate =
syncMode === `progressive` && canUsePersistedResume

// Progressive mode state
// Helper to determine if we're buffering the initial sync
Expand Down
110 changes: 101 additions & 9 deletions packages/electric-db-collection/tests/electric.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,25 +95,42 @@ describe(`Electric Integration`, () => {

const createPersistedAdapter = (
collectionMetadata?: Map<string, unknown>,
rows: Map<string | number, Row> = new Map(),
) => ({
loadSubset: async () => [],
loadCollectionMetadata: async () =>
Array.from((collectionMetadata ?? new Map()).entries()).map(
([key, value]) => ({
key,
value,
}),
loadSubset: () =>
Promise.resolve(
Array.from(rows.entries()).map(([key, value]) => ({ key, value })),
),
loadCollectionMetadata: () =>
Promise.resolve(
Array.from((collectionMetadata ?? new Map()).entries()).map(
([key, value]) => ({
key,
value,
}),
),
),
applyCommittedTx: async (_collectionId: string, tx: any) => {
applyCommittedTx: (_collectionId: string, tx: any) => {
for (const mutation of tx.collectionMetadataMutations ?? []) {
if (mutation.type === `delete`) {
collectionMetadata?.delete(mutation.key)
} else {
collectionMetadata?.set(mutation.key, mutation.value)
}
}
if (tx.truncate) {
rows.clear()
}
for (const mutation of tx.mutations ?? []) {
if (mutation.type === `delete`) {
rows.delete(mutation.key)
} else {
rows.set(mutation.key, mutation.value)
}
}
return Promise.resolve()
},
ensureIndex: async () => {},
ensureIndex: () => Promise.resolve(),
})

beforeEach(() => {
Expand Down Expand Up @@ -3136,6 +3153,81 @@ describe(`Electric Integration`, () => {
)
})

it(`should keep hydrated persisted rows when progressive resume receives up-to-date without snapshot rows`, async () => {
vi.clearAllMocks()

const { ShapeStream } = await import(`@electric-sql/client`)
mockFetchSnapshot.mockResolvedValue({
metadata: {},
data: [],
})

const collectionMetadata = new Map<string, unknown>([
[
`electric:resume`,
{
kind: `resume`,
offset: `10_0`,
handle: `handle-1`,
shapeId: `{"params":{"table":"test_table"},"url":"http://test-url"}`,
updatedAt: 1,
},
],
])
const persistedRows = new Map<string | number, Row>([
[1, { id: 1, name: `Persisted User` }],
])

const persistedCollection = createCollection(
persistedCollectionOptions({
...(electricCollectionOptions({
id: `persisted-progressive-resume-test`,
shapeOptions: {
url: `http://test-url`,
params: {
table: `test_table`,
},
},
syncMode: `progressive` as const,
getKey: (item: Row) => item.id as number,
startSync: true,
}) as any),
persistence: {
adapter: createPersistedAdapter(collectionMetadata, persistedRows),
},
}) as any,
)

persistedCollection.startSyncImmediate()
await persistedCollection._sync.loadSubset({ limit: 10 })

expect(ShapeStream).toHaveBeenCalledWith(
expect.objectContaining({
offset: `10_0`,
handle: `handle-1`,
}),
)
expect(stripVirtualProps(persistedCollection.get(1))).toEqual({
id: 1,
name: `Persisted User`,
})

subscriber([
{
headers: { control: `up-to-date` },
},
])

expect(stripVirtualProps(persistedCollection.get(1))).toEqual({
id: 1,
name: `Persisted User`,
})
expect(stripVirtualProps(persistedRows.get(1))).toEqual({
id: 1,
name: `Persisted User`,
})
})

it(`should not mix explicit handle with persisted offset`, async () => {
vi.clearAllMocks()

Expand Down