From 1f2059ef40753e0ae2cd25f6fcef4066f91091be Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 4 Mar 2026 16:50:13 +0200 Subject: [PATCH 1/6] Propagate source through storage APIs. --- .../storage/implementation/MongoChecksums.ts | 4 +- .../storage/implementation/MongoCompactor.ts | 1 + .../implementation/MongoSyncBucketStorage.ts | 14 +- .../test/src/storage_sync.test.ts | 19 +-- .../test/src/change_stream_utils.ts | 34 ++-- .../test/src/CDCStreamTestContext.ts | 21 ++- .../test/src/BinlogStreamUtils.ts | 15 +- .../src/storage/PostgresSyncRulesStorage.ts | 17 +- .../test/src/storage_sync.test.ts | 23 +-- .../test/src/wal_stream_utils.ts | 37 ++-- packages/service-core-tests/src/tests/util.ts | 33 +++- .../service-core/src/storage/ChecksumCache.ts | 20 ++- .../src/storage/SyncRulesBucketStorage.ts | 21 ++- .../src/sync/BucketChecksumState.ts | 24 +-- .../test/src/checksum_cache.test.ts | 159 +++++++++++------- .../test/src/sync/BucketChecksumState.test.ts | 24 +-- packages/service-core/test/src/utils.ts | 9 + 17 files changed, 304 insertions(+), 171 deletions(-) create mode 100644 packages/service-core/test/src/utils.ts diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts index 6e699976e..0eb2c99e3 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts @@ -2,6 +2,7 @@ import * as lib_mongo from '@powersync/lib-service-mongodb'; import { addPartialChecksums, bson, + BucketChecksumRequest, BucketChecksum, ChecksumCache, ChecksumMap, @@ -74,7 +75,7 @@ export class MongoChecksums { * Calculate checksums, utilizing the cache for partial checkums, and querying the remainder from * the database (bucket_state + bucket_data). */ - async getChecksums(checkpoint: InternalOpId, buckets: string[]): Promise { + async getChecksums(checkpoint: InternalOpId, buckets: BucketChecksumRequest[]): Promise { return this.cache.getChecksumMap(checkpoint, buckets); } @@ -298,6 +299,7 @@ export class MongoChecksums { const req = requests.get(bucket); requests.set(bucket, { bucket, + source: req!.source, start: doc.last_op, end: req!.end }); diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index 43c6f6655..d957f17b1 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -662,6 +662,7 @@ export class MongoCompactor { buckets.map((bucket) => { return { bucket, + source: {} as any, end: this.maxOpId }; }) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 6669b6632..1a395ee37 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -372,19 +372,20 @@ export class MongoSyncBucketStorage async *getBucketDataBatch( checkpoint: utils.InternalOpId, - dataBuckets: Map, + dataBuckets: storage.BucketDataRequest[], options?: storage.BucketDataBatchOptions ): AsyncIterable { - if (dataBuckets.size == 0) { + if (dataBuckets.length == 0) { return; } let filters: mongo.Filter[] = []; + const bucketMap = new Map(dataBuckets.map((request) => [request.bucket, request.start])); if (checkpoint == null) { throw new ServiceAssertionError('checkpoint is null'); } const end = checkpoint; - for (let [name, start] of dataBuckets.entries()) { + for (let { bucket: name, start } of dataBuckets) { filters.push({ _id: { $gt: { @@ -477,7 +478,7 @@ export class MongoSyncBucketStorage } if (start == null) { - const startOpId = dataBuckets.get(bucket); + const startOpId = bucketMap.get(bucket); if (startOpId == null) { throw new ServiceAssertionError(`data for unexpected bucket: ${bucket}`); } @@ -519,7 +520,10 @@ export class MongoSyncBucketStorage } } - async getChecksums(checkpoint: utils.InternalOpId, buckets: string[]): Promise { + async getChecksums( + checkpoint: utils.InternalOpId, + buckets: storage.BucketChecksumRequest[] + ): Promise { return this.checksums.getChecksums(checkpoint, buckets); } diff --git a/modules/module-mongodb-storage/test/src/storage_sync.test.ts b/modules/module-mongodb-storage/test/src/storage_sync.test.ts index 261146415..d910ef5b7 100644 --- a/modules/module-mongodb-storage/test/src/storage_sync.test.ts +++ b/modules/module-mongodb-storage/test/src/storage_sync.test.ts @@ -80,10 +80,11 @@ function registerSyncStorageTests(storageConfig: storage.TestStorageConfig, stor const checkpoint = result!.flushed_op; const options: storage.BucketDataBatchOptions = {}; + const batchRequest = (start: bigint): storage.BucketDataRequest[] => [ + { bucket: globalBucket, start, source: {} as any } + ]; - const batch1 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([[globalBucket, 0n]]), options) - ); + const batch1 = await test_utils.fromAsync(bucketStorage.getBucketDataBatch(checkpoint, batchRequest(0n), options)); expect(test_utils.getBatchData(batch1)).toEqual([ { op_id: '1', op: 'PUT', object_id: 'test1', checksum: 2871785649 }, { op_id: '2', op: 'PUT', object_id: 'large1', checksum: 1178768505 } @@ -95,11 +96,7 @@ function registerSyncStorageTests(storageConfig: storage.TestStorageConfig, stor }); const batch2 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch( - checkpoint, - new Map([[globalBucket, BigInt(batch1[0].chunkData.next_after)]]), - options - ) + bucketStorage.getBucketDataBatch(checkpoint, batchRequest(BigInt(batch1[0].chunkData.next_after)), options) ); expect(test_utils.getBatchData(batch2)).toEqual([ { op_id: '3', op: 'PUT', object_id: 'large2', checksum: 1607205872 } @@ -111,11 +108,7 @@ function registerSyncStorageTests(storageConfig: storage.TestStorageConfig, stor }); const batch3 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch( - checkpoint, - new Map([[globalBucket, BigInt(batch2[0].chunkData.next_after)]]), - options - ) + bucketStorage.getBucketDataBatch(checkpoint, batchRequest(BigInt(batch2[0].chunkData.next_after)), options) ); expect(test_utils.getBatchData(batch3)).toEqual([ { op_id: '4', op: 'PUT', object_id: 'test3', checksum: 1359888332 } diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts index 1f8941e6c..f32b893ec 100644 --- a/modules/module-mongodb/test/src/change_stream_utils.ts +++ b/modules/module-mongodb/test/src/change_stream_utils.ts @@ -8,6 +8,7 @@ import { OplogEntry, ProtocolOpId, ReplicationCheckpoint, + storage, STORAGE_VERSION_CONFIG, SyncRulesBucketStorage, TestStorageOptions, @@ -193,12 +194,25 @@ export class ChangeStreamTestContext { return `${this.syncRulesId}#${bucket}`; } + private bucketDataRequest(bucket: string, start: InternalOpId): storage.BucketDataRequest { + return { + bucket: this.resolveBucketName(bucket), + start, + source: {} as any + }; + } + + private bucketChecksumRequest(bucket: string): storage.BucketChecksumRequest { + return { + bucket: this.resolveBucketName(bucket), + source: {} as any + }; + } + async getBucketsDataBatch(buckets: Record, options?: { timeout?: number }) { let checkpoint = await this.getCheckpoint(options); - const map = new Map( - Object.entries(buckets).map(([bucket, opId]) => [this.resolveBucketName(bucket), opId]) - ); - return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, map)); + const requests = Object.entries(buckets).map(([bucket, opId]) => this.bucketDataRequest(bucket, opId)); + return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, requests)); } async getBucketData(bucket: string, start?: ProtocolOpId | InternalOpId | undefined, options?: { timeout?: number }) { @@ -206,31 +220,31 @@ export class ChangeStreamTestContext { if (typeof start == 'string') { start = BigInt(start); } - const resolvedBucket = this.resolveBucketName(bucket); const checkpoint = await this.getCheckpoint(options); - const map = new Map([[resolvedBucket, start]]); + const resolvedBucket = this.resolveBucketName(bucket); + let request: storage.BucketDataRequest = { bucket: resolvedBucket, start, source: {} as any }; let data: OplogEntry[] = []; while (true) { - const batch = this.storage!.getBucketDataBatch(checkpoint, map); + const batch = this.storage!.getBucketDataBatch(checkpoint, [request]); const batches = await test_utils.fromAsync(batch); data = data.concat(batches[0]?.chunkData.data ?? []); if (batches.length == 0 || !batches[0]!.chunkData.has_more) { break; } - map.set(resolvedBucket, BigInt(batches[0]!.chunkData.next_after)); + request = { ...request, start: BigInt(batches[0]!.chunkData.next_after) }; } return data; } async getChecksums(buckets: string[], options?: { timeout?: number }): Promise { let checkpoint = await this.getCheckpoint(options); - const versionedBuckets = buckets.map((bucket) => this.resolveBucketName(bucket)); + const versionedBuckets = buckets.map((bucket) => this.bucketChecksumRequest(bucket)); const checksums = await this.storage!.getChecksums(checkpoint, versionedBuckets); const unversioned: utils.ChecksumMap = new Map(); for (let i = 0; i < buckets.length; i++) { - unversioned.set(buckets[i], checksums.get(versionedBuckets[i])!); + unversioned.set(buckets[i], checksums.get(versionedBuckets[i].bucket)!); } return unversioned; } diff --git a/modules/module-mssql/test/src/CDCStreamTestContext.ts b/modules/module-mssql/test/src/CDCStreamTestContext.ts index 00de56929..1c4335b4e 100644 --- a/modules/module-mssql/test/src/CDCStreamTestContext.ts +++ b/modules/module-mssql/test/src/CDCStreamTestContext.ts @@ -169,10 +169,18 @@ export class CDCStreamTestContext implements AsyncDisposable { return checkpoint; } + private bucketDataRequest(bucket: string, start: InternalOpId): storage.BucketDataRequest { + return { + bucket, + start, + source: {} as any + }; + } + async getBucketsDataBatch(buckets: Record, options?: { timeout?: number }) { let checkpoint = await this.getCheckpoint(options); - const map = new Map(Object.entries(buckets)); - return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, map)); + const requests = Object.entries(buckets).map(([bucket, start]) => this.bucketDataRequest(bucket, start)); + return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, requests)); } /** @@ -184,17 +192,17 @@ export class CDCStreamTestContext implements AsyncDisposable { start = BigInt(start); } const checkpoint = await this.getCheckpoint(options); - const map = new Map([[bucket, start]]); + let request: storage.BucketDataRequest = this.bucketDataRequest(bucket, start); let data: OplogEntry[] = []; while (true) { - const batch = this.storage!.getBucketDataBatch(checkpoint, map); + const batch = this.storage!.getBucketDataBatch(checkpoint, [request]); const batches = await test_utils.fromAsync(batch); data = data.concat(batches[0]?.chunkData.data ?? []); if (batches.length == 0 || !batches[0]!.chunkData.has_more) { break; } - map.set(bucket, BigInt(batches[0]!.chunkData.next_after)); + request = this.bucketDataRequest(bucket, BigInt(batches[0]!.chunkData.next_after)); } return data; } @@ -208,8 +216,7 @@ export class CDCStreamTestContext implements AsyncDisposable { start = BigInt(start); } const { checkpoint } = await this.storage!.getCheckpoint(); - const map = new Map([[bucket, start]]); - const batch = this.storage!.getBucketDataBatch(checkpoint, map); + const batch = this.storage!.getBucketDataBatch(checkpoint, [this.bucketDataRequest(bucket, start)]); const batches = await test_utils.fromAsync(batch); return batches[0]?.chunkData.data ?? []; } diff --git a/modules/module-mysql/test/src/BinlogStreamUtils.ts b/modules/module-mysql/test/src/BinlogStreamUtils.ts index 065ae93a7..e0dc18f2e 100644 --- a/modules/module-mysql/test/src/BinlogStreamUtils.ts +++ b/modules/module-mysql/test/src/BinlogStreamUtils.ts @@ -152,10 +152,18 @@ export class BinlogStreamTestContext { return checkpoint; } + private bucketDataRequest(bucket: string, start: InternalOpId): storage.BucketDataRequest { + return { + bucket, + start, + source: {} as any + }; + } + async getBucketsDataBatch(buckets: Record, options?: { timeout?: number }) { const checkpoint = await this.getCheckpoint(options); - const map = new Map(Object.entries(buckets)); - return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, map)); + const requests = Object.entries(buckets).map(([bucket, start]) => this.bucketDataRequest(bucket, start)); + return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, requests)); } async getBucketData( @@ -168,8 +176,7 @@ export class BinlogStreamTestContext { start = BigInt(start); } const checkpoint = await this.getCheckpoint(options); - const map = new Map([[bucket, start]]); - const batch = this.storage!.getBucketDataBatch(checkpoint, map); + const batch = this.storage!.getBucketDataBatch(checkpoint, [this.bucketDataRequest(bucket, start)]); const batches = await test_utils.fromAsync(batch); return batches[0]?.chunkData.data ?? []; } diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index 0737aef00..19d1b25bc 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -431,10 +431,10 @@ export class PostgresSyncRulesStorage async *getBucketDataBatch( checkpoint: InternalOpId, - dataBuckets: Map, + dataBuckets: storage.BucketDataRequest[], options?: storage.BucketDataBatchOptions ): AsyncIterable { - if (dataBuckets.size == 0) { + if (dataBuckets.length == 0) { return; } @@ -446,10 +446,8 @@ export class PostgresSyncRulesStorage // not match up with chunks. const end = checkpoint ?? BIGINT_MAX; - const filters = Array.from(dataBuckets.entries()).map(([name, start]) => ({ - bucket_name: name, - start: start - })); + const filters = dataBuckets.map((request) => ({ bucket_name: request.bucket, start: request.start })); + const startOpByBucket = new Map(dataBuckets.map((request) => [request.bucket, request.start])); const batchRowLimit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT; const chunkSizeLimitBytes = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES; @@ -549,7 +547,7 @@ export class PostgresSyncRulesStorage } if (start == null) { - const startOpId = dataBuckets.get(bucket_name); + const startOpId = startOpByBucket.get(bucket_name); if (startOpId == null) { throw new framework.ServiceAssertionError(`data for unexpected bucket: ${bucket_name}`); } @@ -604,7 +602,10 @@ export class PostgresSyncRulesStorage } } - async getChecksums(checkpoint: utils.InternalOpId, buckets: string[]): Promise { + async getChecksums( + checkpoint: utils.InternalOpId, + buckets: storage.BucketChecksumRequest[] + ): Promise { return this.checksumCache.getChecksumMap(checkpoint, buckets); } diff --git a/modules/module-postgres-storage/test/src/storage_sync.test.ts b/modules/module-postgres-storage/test/src/storage_sync.test.ts index 11de4070e..2de50900c 100644 --- a/modules/module-postgres-storage/test/src/storage_sync.test.ts +++ b/modules/module-postgres-storage/test/src/storage_sync.test.ts @@ -87,9 +87,12 @@ function registerStorageVersionTests(storageVersion: number) { const checkpoint = result!.flushed_op; const options: storage.BucketDataBatchOptions = {}; + const batchRequest = (start: bigint): storage.BucketDataRequest[] => [ + { bucket: globalBucket, start, source: {} as any } + ]; const batch1 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, new Map([[globalBucket, 0n]]), options) + bucketStorage.getBucketDataBatch(checkpoint, batchRequest(0n), options) ); expect(test_utils.getBatchData(batch1)).toEqual([ { op_id: '1', op: 'PUT', object_id: 'test1', checksum: 2871785649 } @@ -101,11 +104,7 @@ function registerStorageVersionTests(storageVersion: number) { }); const batch2 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch( - checkpoint, - new Map([[globalBucket, BigInt(batch1[0].chunkData.next_after)]]), - options - ) + bucketStorage.getBucketDataBatch(checkpoint, batchRequest(BigInt(batch1[0].chunkData.next_after)), options) ); expect(test_utils.getBatchData(batch2)).toEqual([ { op_id: '2', op: 'PUT', object_id: 'large1', checksum: 1178768505 } @@ -117,11 +116,7 @@ function registerStorageVersionTests(storageVersion: number) { }); const batch3 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch( - checkpoint, - new Map([[globalBucket, BigInt(batch2[0].chunkData.next_after)]]), - options - ) + bucketStorage.getBucketDataBatch(checkpoint, batchRequest(BigInt(batch2[0].chunkData.next_after)), options) ); expect(test_utils.getBatchData(batch3)).toEqual([ { op_id: '3', op: 'PUT', object_id: 'large2', checksum: 1607205872 } @@ -133,11 +128,7 @@ function registerStorageVersionTests(storageVersion: number) { }); const batch4 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch( - checkpoint, - new Map([[globalBucket, BigInt(batch3[0].chunkData.next_after)]]), - options - ) + bucketStorage.getBucketDataBatch(checkpoint, batchRequest(BigInt(batch3[0].chunkData.next_after)), options) ); expect(test_utils.getBatchData(batch4)).toEqual([ { op_id: '4', op: 'PUT', object_id: 'test3', checksum: 1359888332 } diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts index 0b4f88aaa..e250f0c32 100644 --- a/modules/module-postgres/test/src/wal_stream_utils.ts +++ b/modules/module-postgres/test/src/wal_stream_utils.ts @@ -192,12 +192,25 @@ export class WalStreamTestContext implements AsyncDisposable { return `${this.syncRulesId}#${bucket}`; } + private bucketDataRequest(bucket: string, start: InternalOpId): storage.BucketDataRequest { + return { + bucket: this.resolveBucketName(bucket), + start, + source: {} as any + }; + } + + private bucketChecksumRequest(bucket: string): storage.BucketChecksumRequest { + return { + bucket: this.resolveBucketName(bucket), + source: {} as any + }; + } + async getBucketsDataBatch(buckets: Record, options?: { timeout?: number }) { let checkpoint = await this.getCheckpoint(options); - const map = new Map( - Object.entries(buckets).map(([bucket, opId]) => [this.resolveBucketName(bucket), opId]) - ); - return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, map)); + const requests = Object.entries(buckets).map(([bucket, opId]) => this.bucketDataRequest(bucket, opId)); + return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, requests)); } /** @@ -208,31 +221,31 @@ export class WalStreamTestContext implements AsyncDisposable { if (typeof start == 'string') { start = BigInt(start); } - const resolvedBucket = this.resolveBucketName(bucket); const checkpoint = await this.getCheckpoint(options); - const map = new Map([[resolvedBucket, start]]); + const resolvedBucket = this.resolveBucketName(bucket); + let request: storage.BucketDataRequest = { bucket: resolvedBucket, start, source: {} as any }; let data: OplogEntry[] = []; while (true) { - const batch = this.storage!.getBucketDataBatch(checkpoint, map); + const batch = this.storage!.getBucketDataBatch(checkpoint, [request]); const batches = await test_utils.fromAsync(batch); data = data.concat(batches[0]?.chunkData.data ?? []); if (batches.length == 0 || !batches[0]!.chunkData.has_more) { break; } - map.set(resolvedBucket, BigInt(batches[0]!.chunkData.next_after)); + request = { ...request, start: BigInt(batches[0]!.chunkData.next_after) }; } return data; } async getChecksums(buckets: string[], options?: { timeout?: number }) { const checkpoint = await this.getCheckpoint(options); - const versionedBuckets = buckets.map((bucket) => this.resolveBucketName(bucket)); + const versionedBuckets = buckets.map((bucket) => this.bucketChecksumRequest(bucket)); const checksums = await this.storage!.getChecksums(checkpoint, versionedBuckets); const unversioned = new Map(); for (let i = 0; i < buckets.length; i++) { - unversioned.set(buckets[i], checksums.get(versionedBuckets[i])!); + unversioned.set(buckets[i], checksums.get(versionedBuckets[i].bucket)!); } return unversioned; @@ -251,10 +264,8 @@ export class WalStreamTestContext implements AsyncDisposable { if (typeof start == 'string') { start = BigInt(start); } - const resolvedBucket = this.resolveBucketName(bucket); const { checkpoint } = await this.storage!.getCheckpoint(); - const map = new Map([[resolvedBucket, start]]); - const batch = this.storage!.getBucketDataBatch(checkpoint, map); + const batch = this.storage!.getBucketDataBatch(checkpoint, [this.bucketDataRequest(bucket, start)]); const batches = await test_utils.fromAsync(batch); return batches[0]?.chunkData.data ?? []; } diff --git a/packages/service-core-tests/src/tests/util.ts b/packages/service-core-tests/src/tests/util.ts index fcb0147ba..f6dd0163a 100644 --- a/packages/service-core-tests/src/tests/util.ts +++ b/packages/service-core-tests/src/tests/util.ts @@ -1,5 +1,6 @@ import { storage } from '@powersync/service-core'; import { + BucketDataSource, ParameterIndexLookupCreator, SourceTableInterface, SqliteRow, @@ -16,17 +17,37 @@ export function bucketRequest(syncRules: storage.PersistedSyncRulesContent, buck return versionedBuckets ? `${syncRules.id}#${bucketName}` : bucketName; } -export function bucketRequests(syncRules: storage.PersistedSyncRulesContent, bucketNames: string[]): string[] { - return bucketNames.map((bucketName) => bucketRequest(syncRules, bucketName)); -} - export function bucketRequestMap( syncRules: storage.PersistedSyncRulesContent, buckets: Iterable -): Map { - return new Map(Array.from(buckets, ([bucketName, opId]) => [bucketRequest(syncRules, bucketName), opId])); +): storage.BucketDataRequest[] { + return Array.from(buckets, ([bucketName, opId]) => ({ + bucket: bucketRequest(syncRules, bucketName), + start: opId, + source: EMPTY_DATA_SOURCE + })); } +export function bucketRequests( + syncRules: storage.PersistedSyncRulesContent, + bucketNames: string[] +): storage.BucketChecksumRequest[] { + return bucketNames.map((bucketName) => ({ + bucket: bucketRequest(syncRules, bucketName), + source: EMPTY_DATA_SOURCE + })); +} + +const EMPTY_DATA_SOURCE: BucketDataSource = { + uniqueName: 'empty', + bucketParameters: [], + getSourceTables: () => new Set(), + tableSyncsData: () => false, + evaluateRow: () => [], + resolveResultSets: () => {}, + debugWriteOutputTables: () => {} +}; + const EMPTY_LOOKUP_SOURCE: ParameterIndexLookupCreator = { get defaultLookupScope(): ParameterLookupScope { return { diff --git a/packages/service-core/src/storage/ChecksumCache.ts b/packages/service-core/src/storage/ChecksumCache.ts index 2e5ac1c22..4790f3dd4 100644 --- a/packages/service-core/src/storage/ChecksumCache.ts +++ b/packages/service-core/src/storage/ChecksumCache.ts @@ -1,7 +1,9 @@ import { OrderedSet } from '@js-sdsl/ordered-set'; import { LRUCache } from 'lru-cache/min'; +import { BucketDataSource } from '@powersync/service-sync-rules'; import { BucketChecksum } from '../util/protocol-types.js'; import { addBucketChecksums, ChecksumMap, InternalOpId, PartialChecksum } from '../util/utils.js'; +import { BucketChecksumRequest } from './SyncRulesBucketStorage.js'; interface ChecksumFetchContext { fetch(bucket: string): Promise; @@ -10,6 +12,7 @@ interface ChecksumFetchContext { export interface FetchPartialBucketChecksum { bucket: string; + source: BucketDataSource; start?: InternalOpId; end: InternalOpId; } @@ -113,10 +116,10 @@ export class ChecksumCache { this.bucketCheckpoints.clear(); } - async getChecksums(checkpoint: InternalOpId, buckets: string[]): Promise { + async getChecksums(checkpoint: InternalOpId, buckets: BucketChecksumRequest[]): Promise { const checksums = await this.getChecksumMap(checkpoint, buckets); // Return results in the same order as the request - return buckets.map((bucket) => checksums.get(bucket)!); + return buckets.map((bucket) => checksums.get(bucket.bucket)!); } /** @@ -126,7 +129,7 @@ export class ChecksumCache { * * @returns a Map with exactly one entry for each bucket requested */ - async getChecksumMap(checkpoint: InternalOpId, buckets: string[]): Promise { + async getChecksumMap(checkpoint: InternalOpId, buckets: BucketChecksumRequest[]): Promise { // Buckets that don't have a cached checksum for this checkpoint yet let toFetch = new Set(); @@ -164,19 +167,21 @@ export class ChecksumCache { // One promise to await to ensure all fetch requests completed. let settledPromise: Promise[]> | null = null; + const sourceMap = new Map(); + try { // Individual cache fetch promises let cacheFetchPromises: Promise[] = []; for (let bucket of buckets) { - const cacheKey = makeCacheKey(checkpoint, bucket); + const cacheKey = makeCacheKey(checkpoint, bucket.bucket); let status: LRUCache.Status = {}; const p = this.cache.fetch(cacheKey, { context: context, status: status }).then((checksums) => { if (checksums == null) { // Should never happen throw new Error(`Failed to get checksums for ${cacheKey}`); } - finalResults.set(bucket, checksums); + finalResults.set(bucket.bucket, checksums); }); cacheFetchPromises.push(p); if (status.fetch == 'hit' || status.fetch == 'inflight') { @@ -185,7 +190,8 @@ export class ChecksumCache { // In either case, we don't need to fetch a new checksum. } else { // We need a new request for this checksum. - toFetch.add(bucket); + toFetch.add(bucket.bucket); + sourceMap.set(bucket.bucket, bucket.source); } } // We do this directly after creating the promises, otherwise @@ -220,6 +226,7 @@ export class ChecksumCache { // Partial checksum found - make a partial checksum request bucketRequest = { bucket, + source: sourceMap.get(bucket)!, start: cp, end: checkpoint }; @@ -240,6 +247,7 @@ export class ChecksumCache { // No partial checksum found - make a new full checksum request bucketRequest = { bucket, + source: sourceMap.get(bucket)!, end: checkpoint }; add.set(bucket, { diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts index 6543c5bba..9be697f21 100644 --- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts +++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts @@ -1,5 +1,10 @@ import { Logger, ObserverClient } from '@powersync/lib-services-framework'; -import { HydratedSyncRules, ScopedParameterLookup, SqliteJsonRow } from '@powersync/service-sync-rules'; +import { + BucketDataSource, + HydratedSyncRules, + ScopedParameterLookup, + SqliteJsonRow +} from '@powersync/service-sync-rules'; import * as util from '../util/util-index.js'; import { BucketStorageBatch, FlushedResult, SaveUpdate } from './BucketStorageBatch.js'; import { BucketStorageFactory } from './BucketStorageFactory.js'; @@ -103,7 +108,7 @@ export interface SyncRulesBucketStorage */ getBucketDataBatch( checkpoint: util.InternalOpId, - dataBuckets: Map, + dataBuckets: BucketDataRequest[], options?: BucketDataBatchOptions ): AsyncIterable; @@ -115,7 +120,7 @@ export interface SyncRulesBucketStorage * This may be slow, depending on the size of the buckets. * The checksums are cached internally to compensate for this, but does not cover all cases. */ - getChecksums(checkpoint: util.InternalOpId, buckets: string[]): Promise; + getChecksums(checkpoint: util.InternalOpId, buckets: BucketChecksumRequest[]): Promise; /** * Clear checksum cache. Primarily intended for tests. @@ -127,6 +132,16 @@ export interface SyncRulesBucketStorageListener { batchStarted: (batch: BucketStorageBatch) => void; } +export interface BucketDataRequest { + bucket: string; + start: util.InternalOpId; + source: BucketDataSource; +} +export interface BucketChecksumRequest { + bucket: string; + source: BucketDataSource; +} + export interface SyncRuleStatus { checkpoint_lsn: string | null; active: boolean; diff --git a/packages/service-core/src/sync/BucketChecksumState.ts b/packages/service-core/src/sync/BucketChecksumState.ts index b652e9f84..1ade23800 100644 --- a/packages/service-core/src/sync/BucketChecksumState.ts +++ b/packages/service-core/src/sync/BucketChecksumState.ts @@ -138,20 +138,20 @@ export class BucketChecksumState { } // Re-check updated buckets only - let checksumLookups: string[] = []; + let checksumLookups: storage.BucketChecksumRequest[] = []; let newChecksums = new Map(); - for (let bucket of bucketDescriptionMap.keys()) { - if (!updatedBuckets.has(bucket)) { - const existing = this.lastChecksums.get(bucket); + for (let desc of bucketDescriptionMap.values()) { + if (!updatedBuckets.has(desc.bucket)) { + const existing = this.lastChecksums.get(desc.bucket); if (existing == null) { // If this happens, it means updatedBuckets did not correctly include all new buckets - throw new ServiceAssertionError(`Existing checksum not found for bucket ${bucket}`); + throw new ServiceAssertionError(`Existing checksum not found for bucket ${desc.bucket}`); } // Bucket is not specifically updated, and we have a previous checksum - newChecksums.set(bucket, existing); + newChecksums.set(desc.bucket, existing); } else { - checksumLookups.push(bucket); + checksumLookups.push({ bucket: desc.bucket, source: desc.source }); } } @@ -164,7 +164,7 @@ export class BucketChecksumState { checksumMap = newChecksums; } else { // Re-check all buckets - const bucketList = [...bucketDescriptionMap.keys()]; + const bucketList = [...bucketDescriptionMap.values()].map((b) => ({ bucket: b.bucket, source: b.source })); checksumMap = await storage.getChecksums(base.checkpoint, bucketList); } @@ -340,17 +340,17 @@ export class BucketChecksumState { deferredLog(); }, - getFilteredBucketPositions: (buckets?: ResolvedBucket[]): Map => { + getFilteredBucketPositions: (buckets?: ResolvedBucket[]): storage.BucketDataRequest[] => { if (!hasAdvanced) { throw new ServiceAssertionError('Call line.advance() before getFilteredBucketPositions()'); } buckets ??= bucketsToFetch; - const filtered = new Map(); + const filtered: storage.BucketDataRequest[] = []; for (let bucket of buckets) { const state = this.bucketDataPositions.get(bucket.bucket); if (state) { - filtered.set(bucket.bucket, state.start_op_id); + filtered.push({ bucket: bucket.bucket, start: state.start_op_id, source: bucket.source }); } } return filtered; @@ -670,7 +670,7 @@ export interface CheckpointLine { * * @param bucketsToFetch List of buckets to fetch - either this.bucketsToFetch, or a subset of it. Defaults to this.bucketsToFetch. */ - getFilteredBucketPositions(bucketsToFetch?: ResolvedBucket[]): Map; + getFilteredBucketPositions(bucketsToFetch?: ResolvedBucket[]): storage.BucketDataRequest[]; /** * Update the position of bucket data the client has, after it was sent to the client. diff --git a/packages/service-core/test/src/checksum_cache.test.ts b/packages/service-core/test/src/checksum_cache.test.ts index f0b61342c..1ae7d9ba6 100644 --- a/packages/service-core/test/src/checksum_cache.test.ts +++ b/packages/service-core/test/src/checksum_cache.test.ts @@ -1,7 +1,9 @@ import { ChecksumCache, FetchChecksums, FetchPartialBucketChecksum } from '@/storage/ChecksumCache.js'; import { addChecksums, BucketChecksum, InternalOpId, PartialChecksum } from '@/util/util-index.js'; +import { BucketDataSource } from '@powersync/service-sync-rules'; import * as crypto from 'node:crypto'; import { describe, expect, it } from 'vitest'; +import { removeSource } from './utils.js'; /** * Create a deterministic BucketChecksum based on the bucket name and checkpoint for testing purposes. @@ -67,6 +69,12 @@ describe('checksum cache', function () { return new ChecksumCache({ fetchChecksums: fetch }); }; + const DUMMY_SOURCE: BucketDataSource = null as any; + + function removeLookupSources(lookups: FetchPartialBucketChecksum[]) { + return lookups.map((b) => removeSource(b)); + } + it('should handle a sequential lookups (a)', async function () { let lookups: FetchPartialBucketChecksum[][] = []; const cache = factory(async (batch) => { @@ -74,13 +82,13 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - expect(await cache.getChecksums(123n, ['test'])).toEqual([TEST_123]); + expect(await cache.getChecksums(123n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([TEST_123]); - expect(await cache.getChecksums(1234n, ['test'])).toEqual([TEST_1234]); + expect(await cache.getChecksums(1234n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([TEST_1234]); - expect(await cache.getChecksums(123n, ['test2'])).toEqual([TEST2_123]); + expect(await cache.getChecksums(123n, [{ bucket: 'test2', source: DUMMY_SOURCE }])).toEqual([TEST2_123]); - expect(lookups).toEqual([ + expect(lookups.map(removeLookupSources)).toMatchObject([ [{ bucket: 'test', end: 123n }], // This should use the previous lookup [{ bucket: 'test', start: 123n, end: 1234n }], @@ -96,13 +104,13 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - expect(await cache.getChecksums(123n, ['test2'])).toEqual([TEST2_123]); + expect(await cache.getChecksums(123n, [{ bucket: 'test2', source: DUMMY_SOURCE }])).toEqual([TEST2_123]); - expect(await cache.getChecksums(1234n, ['test'])).toEqual([TEST_1234]); + expect(await cache.getChecksums(1234n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([TEST_1234]); - expect(await cache.getChecksums(123n, ['test'])).toEqual([TEST_123]); + expect(await cache.getChecksums(123n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([TEST_123]); - expect(lookups).toEqual([ + expect(lookups.map(removeLookupSources)).toEqual([ // With this order, there is no option for a partial lookup [{ bucket: 'test2', end: 123n }], [{ bucket: 'test', end: 1234n }], @@ -117,16 +125,16 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - const p1 = cache.getChecksums(123n, ['test']); - const p2 = cache.getChecksums(1234n, ['test']); - const p3 = cache.getChecksums(123n, ['test2']); + const p1 = cache.getChecksums(123n, [{ bucket: 'test', source: DUMMY_SOURCE }]); + const p2 = cache.getChecksums(1234n, [{ bucket: 'test', source: DUMMY_SOURCE }]); + const p3 = cache.getChecksums(123n, [{ bucket: 'test2', source: DUMMY_SOURCE }]); expect(await p1).toEqual([TEST_123]); expect(await p2).toEqual([TEST_1234]); expect(await p3).toEqual([TEST2_123]); // Concurrent requests, so we can't do a partial lookup for 123 -> 1234 - expect(lookups).toEqual([ + expect(lookups.map(removeLookupSources)).toEqual([ [{ bucket: 'test', end: 123n }], [{ bucket: 'test', end: 1234n }], [{ bucket: 'test2', end: 123n }] @@ -140,15 +148,15 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - const p1 = cache.getChecksums(123n, ['test']); - const p2 = cache.getChecksums(123n, ['test']); + const p1 = cache.getChecksums(123n, [{ bucket: 'test', source: DUMMY_SOURCE }]); + const p2 = cache.getChecksums(123n, [{ bucket: 'test', source: DUMMY_SOURCE }]); expect(await p1).toEqual([TEST_123]); expect(await p2).toEqual([TEST_123]); // The lookup should be deduplicated, even though it's in progress - expect(lookups).toEqual([[{ bucket: 'test', end: 123n }]]); + expect(lookups.map(removeLookupSources)).toEqual([[{ bucket: 'test', end: 123n }]]); }); it('should handle serial + concurrent lookups', async function () { @@ -158,15 +166,15 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - expect(await cache.getChecksums(123n, ['test'])).toEqual([TEST_123]); + expect(await cache.getChecksums(123n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([TEST_123]); - const p2 = cache.getChecksums(1234n, ['test']); - const p3 = cache.getChecksums(1234n, ['test']); + const p2 = cache.getChecksums(1234n, [{ bucket: 'test', source: DUMMY_SOURCE }]); + const p3 = cache.getChecksums(1234n, [{ bucket: 'test', source: DUMMY_SOURCE }]); expect(await p2).toEqual([TEST_1234]); expect(await p3).toEqual([TEST_1234]); - expect(lookups).toEqual([ + expect(lookups.map(removeLookupSources)).toEqual([ [{ bucket: 'test', end: 123n }], // This lookup is deduplicated [{ bucket: 'test', start: 123n, end: 1234n }] @@ -180,9 +188,14 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - expect(await cache.getChecksums(123n, ['test', 'test2'])).toEqual([TEST_123, TEST2_123]); + expect( + await cache.getChecksums(123n, [ + { bucket: 'test', source: DUMMY_SOURCE }, + { bucket: 'test2', source: DUMMY_SOURCE } + ]) + ).toEqual([TEST_123, TEST2_123]); - expect(lookups).toEqual([ + expect(lookups.map(removeLookupSources)).toEqual([ [ // Both lookups in the same request { bucket: 'test', end: 123n }, @@ -198,10 +211,15 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - expect(await cache.getChecksums(123n, ['test'])).toEqual([TEST_123]); - expect(await cache.getChecksums(123n, ['test', 'test2'])).toEqual([TEST_123, TEST2_123]); + expect(await cache.getChecksums(123n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([TEST_123]); + expect( + await cache.getChecksums(123n, [ + { bucket: 'test', source: DUMMY_SOURCE }, + { bucket: 'test2', source: DUMMY_SOURCE } + ]) + ).toEqual([TEST_123, TEST2_123]); - expect(lookups).toEqual([ + expect(lookups.map(removeLookupSources)).toEqual([ // Request 1 [{ bucket: 'test', end: 123n }], // Request 2 @@ -216,13 +234,19 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - const a = cache.getChecksums(123n, ['test', 'test2']); - const b = cache.getChecksums(123n, ['test2', 'test3']); + const a = cache.getChecksums(123n, [ + { bucket: 'test', source: DUMMY_SOURCE }, + { bucket: 'test2', source: DUMMY_SOURCE } + ]); + const b = cache.getChecksums(123n, [ + { bucket: 'test2', source: DUMMY_SOURCE }, + { bucket: 'test3', source: DUMMY_SOURCE } + ]); expect(await a).toEqual([TEST_123, TEST2_123]); expect(await b).toEqual([TEST2_123, TEST3_123]); - expect(lookups).toEqual([ + expect(lookups.map(removeLookupSources)).toEqual([ // Request A [ { bucket: 'test', end: 123n }, @@ -240,9 +264,9 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - expect(await cache.getChecksums(123n, ['test'])).toEqual([TEST_123]); + expect(await cache.getChecksums(123n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([TEST_123]); - expect(await cache.getChecksums(125n, ['test'])).toEqual([ + expect(await cache.getChecksums(125n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([ { bucket: 'test', checksum: -1865121912, @@ -250,14 +274,14 @@ describe('checksum cache', function () { } ]); - expect(await cache.getChecksums(124n, ['test'])).toEqual([ + expect(await cache.getChecksums(124n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([ { bucket: 'test', checksum: 1887460431, count: 124 } ]); - expect(lookups).toEqual([ + expect(lookups.map(removeLookupSources)).toEqual([ [{ bucket: 'test', end: 123n }], [{ bucket: 'test', start: 123n, end: 125n }], [{ bucket: 'test', start: 123n, end: 124n }] @@ -275,19 +299,31 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - const a = cache.getChecksums(123n, ['test', 'test2']); - const b = cache.getChecksums(123n, ['test2', 'test3']); + const a = cache.getChecksums(123n, [ + { bucket: 'test', source: DUMMY_SOURCE }, + { bucket: 'test2', source: DUMMY_SOURCE } + ]); + const b = cache.getChecksums(123n, [ + { bucket: 'test2', source: DUMMY_SOURCE }, + { bucket: 'test3', source: DUMMY_SOURCE } + ]); await expect(a).rejects.toEqual(TEST_ERROR); await expect(b).rejects.toEqual(TEST_ERROR); - const a2 = cache.getChecksums(123n, ['test', 'test2']); - const b2 = cache.getChecksums(123n, ['test2', 'test3']); + const a2 = cache.getChecksums(123n, [ + { bucket: 'test', source: DUMMY_SOURCE }, + { bucket: 'test2', source: DUMMY_SOURCE } + ]); + const b2 = cache.getChecksums(123n, [ + { bucket: 'test2', source: DUMMY_SOURCE }, + { bucket: 'test3', source: DUMMY_SOURCE } + ]); expect(await a2).toEqual([TEST_123, TEST2_123]); expect(await b2).toEqual([TEST2_123, TEST3_123]); - expect(lookups).toEqual([ + expect(lookups.map(removeLookupSources)).toEqual([ // Request A (fails) [ { bucket: 'test', end: 123n }, @@ -311,11 +347,15 @@ describe('checksum cache', function () { return fetchTestChecksums(batch.filter((b) => b.bucket != 'test')); }); - expect(await cache.getChecksums(123n, ['test'])).toEqual([{ bucket: 'test', checksum: 0, count: 0 }]); - expect(await cache.getChecksums(123n, ['test', 'test2'])).toEqual([ - { bucket: 'test', checksum: 0, count: 0 }, - TEST2_123 + expect(await cache.getChecksums(123n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([ + { bucket: 'test', checksum: 0, count: 0 } ]); + expect( + await cache.getChecksums(123n, [ + { bucket: 'test', source: DUMMY_SOURCE }, + { bucket: 'test2', source: DUMMY_SOURCE } + ]) + ).toEqual([{ bucket: 'test', checksum: 0, count: 0 }, TEST2_123]); }); it('should handle missing checksums (b)', async function () { @@ -325,8 +365,10 @@ describe('checksum cache', function () { return fetchTestChecksums(batch.filter((b) => b.bucket != 'test' || b.end != 123n)); }); - expect(await cache.getChecksums(123n, ['test'])).toEqual([{ bucket: 'test', checksum: 0, count: 0 }]); - expect(await cache.getChecksums(1234n, ['test'])).toEqual([ + expect(await cache.getChecksums(123n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([ + { bucket: 'test', checksum: 0, count: 0 } + ]); + expect(await cache.getChecksums(1234n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([ { bucket: 'test', checksum: 1597020602, @@ -334,7 +376,10 @@ describe('checksum cache', function () { } ]); - expect(lookups).toEqual([[{ bucket: 'test', end: 123n }], [{ bucket: 'test', start: 123n, end: 1234n }]]); + expect(lookups.map(removeLookupSources)).toEqual([ + [{ bucket: 'test', end: 123n }], + [{ bucket: 'test', start: 123n, end: 1234n }] + ]); }); it('should use maxSize', async function () { @@ -347,8 +392,8 @@ describe('checksum cache', function () { maxSize: 2 }); - expect(await cache.getChecksums(123n, ['test'])).toEqual([TEST_123]); - expect(await cache.getChecksums(124n, ['test'])).toEqual([ + expect(await cache.getChecksums(123n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([TEST_123]); + expect(await cache.getChecksums(124n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([ { bucket: 'test', checksum: 1887460431, @@ -356,30 +401,30 @@ describe('checksum cache', function () { } ]); - expect(await cache.getChecksums(125n, ['test'])).toEqual([ + expect(await cache.getChecksums(125n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([ { bucket: 'test', checksum: -1865121912, count: 125 } ]); - expect(await cache.getChecksums(126n, ['test'])).toEqual([ + expect(await cache.getChecksums(126n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([ { bucket: 'test', checksum: -1720007310, count: 126 } ]); - expect(await cache.getChecksums(124n, ['test'])).toEqual([ + expect(await cache.getChecksums(124n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([ { bucket: 'test', checksum: 1887460431, count: 124 } ]); - expect(await cache.getChecksums(123n, ['test'])).toEqual([TEST_123]); + expect(await cache.getChecksums(123n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([TEST_123]); - expect(lookups).toEqual([ + expect(lookups.map(removeLookupSources)).toEqual([ [{ bucket: 'test', end: 123n }], [{ bucket: 'test', start: 123n, end: 124n }], [{ bucket: 'test', start: 124n, end: 125n }], @@ -400,10 +445,10 @@ describe('checksum cache', function () { maxSize: 2 }); - const p3 = cache.getChecksums(123n, ['test3']); - const p4 = cache.getChecksums(123n, ['test4']); - const p1 = cache.getChecksums(123n, ['test']); - const p2 = cache.getChecksums(123n, ['test2']); + const p3 = cache.getChecksums(123n, [{ bucket: 'test3', source: DUMMY_SOURCE }]); + const p4 = cache.getChecksums(123n, [{ bucket: 'test4', source: DUMMY_SOURCE }]); + const p1 = cache.getChecksums(123n, [{ bucket: 'test', source: DUMMY_SOURCE }]); + const p2 = cache.getChecksums(123n, [{ bucket: 'test2', source: DUMMY_SOURCE }]); expect(await p1).toEqual([TEST_123]); expect(await p2).toEqual([TEST2_123]); @@ -417,7 +462,7 @@ describe('checksum cache', function () { ]); // The lookup should be deduplicated, even though it's in progress - expect(lookups).toEqual([ + expect(lookups.map(removeLookupSources)).toEqual([ [{ bucket: 'test3', end: 123n }], [{ bucket: 'test4', end: 123n }], [{ bucket: 'test', end: 123n }], @@ -434,7 +479,7 @@ describe('checksum cache', function () { return fetchTestChecksums(batch); }); - expect(await cache.getChecksums(123n, ['test'])).toEqual([TEST_123]); - expect(await cache.getChecksums(1234n, ['test'])).toEqual([TEST_1234]); + expect(await cache.getChecksums(123n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([TEST_123]); + expect(await cache.getChecksums(1234n, [{ bucket: 'test', source: DUMMY_SOURCE }])).toEqual([TEST_1234]); }); }); diff --git a/packages/service-core/test/src/sync/BucketChecksumState.test.ts b/packages/service-core/test/src/sync/BucketChecksumState.test.ts index a59f6a223..71264e931 100644 --- a/packages/service-core/test/src/sync/BucketChecksumState.test.ts +++ b/packages/service-core/test/src/sync/BucketChecksumState.test.ts @@ -95,6 +95,10 @@ bucket_definitions: const syncRequest: StreamingSyncRequest = {}; const tokenPayload = new JwtPayload({ sub: '' }); + function bucketStarts(requests: { bucket: string; start: InternalOpId }[]) { + return new Map(requests.map((request) => [request.bucket, request.start])); + } + test('global bucket with update', async () => { const storage = new MockBucketChecksumStateStorage(); // Set intial state @@ -129,7 +133,7 @@ bucket_definitions: } ]); // This is the bucket data to be fetched - expect(line.getFilteredBucketPositions()).toEqual(new Map([['1#global[]', 0n]])); + expect(bucketStarts(line.getFilteredBucketPositions())).toEqual(new Map([['1#global[]', 0n]])); // This similuates the bucket data being sent line.advance(); @@ -160,7 +164,7 @@ bucket_definitions: write_checkpoint: undefined } }); - expect(line2.getFilteredBucketPositions()).toEqual(new Map([['1#global[]', 1n]])); + expect(bucketStarts(line2.getFilteredBucketPositions())).toEqual(new Map([['1#global[]', 1n]])); }); test('global bucket with initial state', async () => { @@ -201,7 +205,7 @@ bucket_definitions: } ]); // This is the main difference between this and the previous test - expect(line.getFilteredBucketPositions()).toEqual(new Map([['1#global[]', 1n]])); + expect(bucketStarts(line.getFilteredBucketPositions())).toEqual(new Map([['1#global[]', 1n]])); }); test('multiple static buckets', async () => { @@ -308,7 +312,7 @@ bucket_definitions: priority: 3 } ]); - expect(line.getFilteredBucketPositions()).toEqual(new Map([['1#global[]', 0n]])); + expect(bucketStarts(line.getFilteredBucketPositions())).toEqual(new Map([['1#global[]', 0n]])); }); test('invalidating individual bucket', async () => { @@ -464,7 +468,7 @@ bucket_definitions: ]); // This is the bucket data to be fetched - expect(line.getFilteredBucketPositions()).toEqual( + expect(bucketStarts(line.getFilteredBucketPositions())).toEqual( new Map([ ['2#global[1]', 0n], ['2#global[2]', 0n] @@ -516,7 +520,7 @@ bucket_definitions: } ]); - expect(line2.getFilteredBucketPositions()).toEqual( + expect(bucketStarts(line2.getFilteredBucketPositions())).toEqual( new Map([ ['2#global[1]', 3n], ['2#global[2]', 1n] @@ -588,7 +592,7 @@ bucket_definitions: ]); line.advance(); // This is the bucket data to be fetched - expect(line.getFilteredBucketPositions()).toEqual( + expect(bucketStarts(line.getFilteredBucketPositions())).toEqual( new Map([ ['3#by_project[1]', 0n], ['3#by_project[2]', 0n] @@ -630,7 +634,7 @@ bucket_definitions: write_checkpoint: undefined } }); - expect(line2.getFilteredBucketPositions()).toEqual(new Map([['3#by_project[3]', 0n]])); + expect(bucketStarts(line2.getFilteredBucketPositions())).toEqual(new Map([['3#by_project[3]', 0n]])); }); describe('streams', () => { @@ -1072,9 +1076,9 @@ class MockBucketChecksumStateStorage implements BucketChecksumStateStorage { this.filter?.({ invalidate: true }); } - async getChecksums(checkpoint: InternalOpId, buckets: string[]): Promise { + async getChecksums(_checkpoint: InternalOpId, buckets: { bucket: string }[]): Promise { return new Map( - buckets.map((bucket) => { + buckets.map(({ bucket }) => { const checksum = this.state.get(bucket); return [ bucket, diff --git a/packages/service-core/test/src/utils.ts b/packages/service-core/test/src/utils.ts new file mode 100644 index 000000000..de7ece82c --- /dev/null +++ b/packages/service-core/test/src/utils.ts @@ -0,0 +1,9 @@ +/** + * Removes the source property from an object. + * + * This is for tests where we don't care about this value, and it adds a lot of noise in the output. + */ +export function removeSource(obj: T): Omit { + const { source, ...rest } = obj; + return rest; +} From 0ee2ddf8956a4997f7507ca689cdc0f2def9e036 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 4 Mar 2026 18:08:31 +0200 Subject: [PATCH 2/6] Update tests to use actual sources in most cases. --- .../test/src/storage_compacting.test.ts | 34 ++++----- .../test/src/storage_sync.test.ts | 20 ++++-- .../test/src/change_stream_utils.ts | 53 ++++++-------- .../test/src/CDCStreamTestContext.ts | 37 ++++++---- .../test/src/BinlogStreamUtils.ts | 30 ++++---- .../test/src/storage_compacting.test.ts | 2 +- .../test/src/storage_sync.test.ts | 25 ++++--- .../test/src/wal_stream_utils.ts | 58 +++++++-------- .../src/test-utils/general-utils.ts | 35 ++++++++- .../src/tests/register-compacting-tests.ts | 71 +++++++++---------- .../tests/register-data-storage-data-tests.ts | 45 ++++++------ .../register-data-storage-parameter-tests.ts | 13 ++-- .../src/tests/register-sync-tests.ts | 16 ++--- packages/service-core-tests/src/tests/util.ts | 35 ++------- .../src/routes/endpoints/admin.ts | 1 + .../src/storage/PersistedSyncRulesContent.ts | 5 ++ 16 files changed, 249 insertions(+), 231 deletions(-) diff --git a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts index de96ce918..a4428ce3c 100644 --- a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts +++ b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts @@ -71,17 +71,17 @@ bucket_definitions: signal: null as any }); - const checksumAfter = await bucketStorage.getChecksums( - checkpoint, - bucketRequests(syncRules, ['by_user["u1"]', 'by_user["u2"]']) - ); - expect(checksumAfter.get(bucketRequest(syncRules, 'by_user["u1"]'))).toEqual({ - bucket: bucketRequest(syncRules, 'by_user["u1"]'), + const users = ['u1', 'u2']; + const userRequests = users.map((user) => bucketRequest(syncRules, `by_user["${user}"]`)); + const [u1Request, u2Request] = userRequests; + const checksumAfter = await bucketStorage.getChecksums(checkpoint, userRequests); + expect(checksumAfter.get(u1Request.bucket)).toEqual({ + bucket: u1Request.bucket, checksum: -659469718, count: 1 }); - expect(checksumAfter.get(bucketRequest(syncRules, 'by_user["u2"]'))).toEqual({ - bucket: bucketRequest(syncRules, 'by_user["u2"]'), + expect(checksumAfter.get(u2Request.bucket)).toEqual({ + bucket: u2Request.bucket, checksum: 430217650, count: 1 }); @@ -91,7 +91,7 @@ bucket_definitions: // Populate old sync rules version const { factory } = await setup(); - // Not populate another version (bucket definition name changed) + // Now populate another version (bucket definition name changed) const syncRules = await factory.updateSyncRules( updateSyncRulesFromYaml(` bucket_definitions: @@ -125,17 +125,17 @@ bucket_definitions: }); expect(result2.buckets).toEqual(0); - const checksumAfter = await bucketStorage.getChecksums( - checkpoint, - bucketRequests(syncRules, ['by_user2["u1"]', 'by_user2["u2"]']) - ); - expect(checksumAfter.get(bucketRequest(syncRules, 'by_user2["u1"]'))).toEqual({ - bucket: bucketRequest(syncRules, 'by_user2["u1"]'), + const users = ['u1', 'u2']; + const userRequests = users.map((user) => bucketRequest(syncRules, `by_user2["${user}"]`)); + const [u1Request, u2Request] = userRequests; + const checksumAfter = await bucketStorage.getChecksums(checkpoint, userRequests); + expect(checksumAfter.get(u1Request.bucket)).toEqual({ + bucket: u1Request.bucket, checksum: -659469718, count: 1 }); - expect(checksumAfter.get(bucketRequest(syncRules, 'by_user2["u2"]'))).toEqual({ - bucket: bucketRequest(syncRules, 'by_user2["u2"]'), + expect(checksumAfter.get(u2Request.bucket)).toEqual({ + bucket: u2Request.bucket, checksum: 430217650, count: 1 }); diff --git a/modules/module-mongodb-storage/test/src/storage_sync.test.ts b/modules/module-mongodb-storage/test/src/storage_sync.test.ts index d910ef5b7..b2767fd83 100644 --- a/modules/module-mongodb-storage/test/src/storage_sync.test.ts +++ b/modules/module-mongodb-storage/test/src/storage_sync.test.ts @@ -80,11 +80,9 @@ function registerSyncStorageTests(storageConfig: storage.TestStorageConfig, stor const checkpoint = result!.flushed_op; const options: storage.BucketDataBatchOptions = {}; - const batchRequest = (start: bigint): storage.BucketDataRequest[] => [ - { bucket: globalBucket, start, source: {} as any } - ]; - - const batch1 = await test_utils.fromAsync(bucketStorage.getBucketDataBatch(checkpoint, batchRequest(0n), options)); + const batch1 = await test_utils.fromAsync( + bucketStorage.getBucketDataBatch(checkpoint, [bucketRequest(syncRules, 'global[]', 0n)], options) + ); expect(test_utils.getBatchData(batch1)).toEqual([ { op_id: '1', op: 'PUT', object_id: 'test1', checksum: 2871785649 }, { op_id: '2', op: 'PUT', object_id: 'large1', checksum: 1178768505 } @@ -96,7 +94,11 @@ function registerSyncStorageTests(storageConfig: storage.TestStorageConfig, stor }); const batch2 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, batchRequest(BigInt(batch1[0].chunkData.next_after)), options) + bucketStorage.getBucketDataBatch( + checkpoint, + [bucketRequest(syncRules, 'global[]', batch1[0].chunkData.next_after)], + options + ) ); expect(test_utils.getBatchData(batch2)).toEqual([ { op_id: '3', op: 'PUT', object_id: 'large2', checksum: 1607205872 } @@ -108,7 +110,11 @@ function registerSyncStorageTests(storageConfig: storage.TestStorageConfig, stor }); const batch3 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, batchRequest(BigInt(batch2[0].chunkData.next_after)), options) + bucketStorage.getBucketDataBatch( + checkpoint, + [bucketRequest(syncRules, 'global[]', batch2[0].chunkData.next_after)], + options + ) ); expect(test_utils.getBatchData(batch3)).toEqual([ { op_id: '4', op: 'PUT', object_id: 'test3', checksum: 1359888332 } diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts index f32b893ec..8fc3931fc 100644 --- a/modules/module-mongodb/test/src/change_stream_utils.ts +++ b/modules/module-mongodb/test/src/change_stream_utils.ts @@ -15,7 +15,7 @@ import { updateSyncRulesFromYaml, utils } from '@powersync/service-core'; -import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests'; +import { bucketRequest, METRICS_HELPER, test_utils } from '@powersync/service-core-tests'; import { ChangeStream, ChangeStreamOptions } from '@module/replication/ChangeStream.js'; import { MongoManager } from '@module/replication/MongoManager.js'; @@ -29,6 +29,7 @@ export class ChangeStreamTestContext { private abortController = new AbortController(); private streamPromise?: Promise>; private syncRulesId?: number; + private syncRulesContent?: storage.PersistedSyncRulesContent; public storage?: SyncRulesBucketStorage; /** @@ -105,6 +106,7 @@ export class ChangeStreamTestContext { updateSyncRulesFromYaml(content, { validate: true, storageVersion: this.storageVersion }) ); this.syncRulesId = syncRules.id; + this.syncRulesContent = syncRules; this.storage = this.factory.getInstance(syncRules); return this.storage!; } @@ -116,10 +118,18 @@ export class ChangeStreamTestContext { } this.syncRulesId = syncRules.id; + this.syncRulesContent = syncRules; this.storage = this.factory.getInstance(syncRules); return this.storage!; } + private getSyncRulesContent(): storage.PersistedSyncRulesContent { + if (this.syncRulesContent == null) { + throw new Error('Sync rules not configured - call updateSyncRules() first'); + } + return this.syncRulesContent; + } + get streamer() { if (this.storage == null) { throw new Error('updateSyncRules() first'); @@ -184,35 +194,11 @@ export class ChangeStreamTestContext { return checkpoint; } - private resolveBucketName(bucket: string) { - if (!this.versionedBuckets || /^\d+#/.test(bucket)) { - return bucket; - } - if (this.syncRulesId == null) { - throw new Error('Sync rules not configured - call updateSyncRules() first'); - } - return `${this.syncRulesId}#${bucket}`; - } - - private bucketDataRequest(bucket: string, start: InternalOpId): storage.BucketDataRequest { - return { - bucket: this.resolveBucketName(bucket), - start, - source: {} as any - }; - } - - private bucketChecksumRequest(bucket: string): storage.BucketChecksumRequest { - return { - bucket: this.resolveBucketName(bucket), - source: {} as any - }; - } - async getBucketsDataBatch(buckets: Record, options?: { timeout?: number }) { let checkpoint = await this.getCheckpoint(options); - const requests = Object.entries(buckets).map(([bucket, opId]) => this.bucketDataRequest(bucket, opId)); - return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, requests)); + const syncRules = this.getSyncRulesContent(); + const map = Object.entries(buckets).map(([bucket, start]) => bucketRequest(syncRules, bucket, start)); + return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, map)); } async getBucketData(bucket: string, start?: ProtocolOpId | InternalOpId | undefined, options?: { timeout?: number }) { @@ -220,26 +206,27 @@ export class ChangeStreamTestContext { if (typeof start == 'string') { start = BigInt(start); } + const syncRules = this.getSyncRulesContent(); const checkpoint = await this.getCheckpoint(options); - const resolvedBucket = this.resolveBucketName(bucket); - let request: storage.BucketDataRequest = { bucket: resolvedBucket, start, source: {} as any }; + let map = [bucketRequest(syncRules, bucket, start)]; let data: OplogEntry[] = []; while (true) { - const batch = this.storage!.getBucketDataBatch(checkpoint, [request]); + const batch = this.storage!.getBucketDataBatch(checkpoint, map); const batches = await test_utils.fromAsync(batch); data = data.concat(batches[0]?.chunkData.data ?? []); if (batches.length == 0 || !batches[0]!.chunkData.has_more) { break; } - request = { ...request, start: BigInt(batches[0]!.chunkData.next_after) }; + map = [bucketRequest(syncRules, bucket, BigInt(batches[0]!.chunkData.next_after))]; } return data; } async getChecksums(buckets: string[], options?: { timeout?: number }): Promise { let checkpoint = await this.getCheckpoint(options); - const versionedBuckets = buckets.map((bucket) => this.bucketChecksumRequest(bucket)); + const syncRules = this.getSyncRulesContent(); + const versionedBuckets = buckets.map((bucket) => bucketRequest(syncRules, bucket, 0n)); const checksums = await this.storage!.getChecksums(checkpoint, versionedBuckets); const unversioned: utils.ChecksumMap = new Map(); diff --git a/modules/module-mssql/test/src/CDCStreamTestContext.ts b/modules/module-mssql/test/src/CDCStreamTestContext.ts index 1c4335b4e..69ee25c69 100644 --- a/modules/module-mssql/test/src/CDCStreamTestContext.ts +++ b/modules/module-mssql/test/src/CDCStreamTestContext.ts @@ -9,7 +9,7 @@ import { SyncRulesBucketStorage, updateSyncRulesFromYaml } from '@powersync/service-core'; -import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests'; +import { bucketRequest, METRICS_HELPER, test_utils } from '@powersync/service-core-tests'; import { clearTestDb, getClientCheckpoint, TEST_CONNECTION_OPTIONS } from './util.js'; import { CDCStream, CDCStreamOptions } from '@module/replication/CDCStream.js'; import { MSSQLConnectionManager } from '@module/replication/MSSQLConnectionManager.js'; @@ -26,6 +26,7 @@ export class CDCStreamTestContext implements AsyncDisposable { private _cdcStream?: CDCStream; private abortController = new AbortController(); private streamPromise?: Promise; + private syncRulesContent?: storage.PersistedSyncRulesContent; public storage?: SyncRulesBucketStorage; private snapshotPromise?: Promise; private replicationDone = false; @@ -77,6 +78,7 @@ export class CDCStreamTestContext implements AsyncDisposable { const syncRules = await this.factory.updateSyncRules( updateSyncRulesFromYaml(content, { validate: true, storageVersion: LEGACY_STORAGE_VERSION }) ); + this.syncRulesContent = syncRules; this.storage = this.factory.getInstance(syncRules); return this.storage!; } @@ -87,6 +89,7 @@ export class CDCStreamTestContext implements AsyncDisposable { throw new Error(`Next sync rules not available`); } + this.syncRulesContent = syncRules; this.storage = this.factory.getInstance(syncRules); return this.storage!; } @@ -97,10 +100,18 @@ export class CDCStreamTestContext implements AsyncDisposable { throw new Error(`Active sync rules not available`); } + this.syncRulesContent = syncRules; this.storage = this.factory.getInstance(syncRules); return this.storage!; } + private getSyncRulesContent(): storage.PersistedSyncRulesContent { + if (this.syncRulesContent == null) { + throw new Error('Sync rules not configured - call updateSyncRules() first'); + } + return this.syncRulesContent; + } + get cdcStream() { if (this.storage == null) { throw new Error('updateSyncRules() first'); @@ -169,18 +180,11 @@ export class CDCStreamTestContext implements AsyncDisposable { return checkpoint; } - private bucketDataRequest(bucket: string, start: InternalOpId): storage.BucketDataRequest { - return { - bucket, - start, - source: {} as any - }; - } - async getBucketsDataBatch(buckets: Record, options?: { timeout?: number }) { let checkpoint = await this.getCheckpoint(options); - const requests = Object.entries(buckets).map(([bucket, start]) => this.bucketDataRequest(bucket, start)); - return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, requests)); + const syncRules = this.getSyncRulesContent(); + const map = Object.entries(buckets).map(([bucket, start]) => bucketRequest(syncRules, bucket, start)); + return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, map)); } /** @@ -191,18 +195,19 @@ export class CDCStreamTestContext implements AsyncDisposable { if (typeof start == 'string') { start = BigInt(start); } + const syncRules = this.getSyncRulesContent(); const checkpoint = await this.getCheckpoint(options); - let request: storage.BucketDataRequest = this.bucketDataRequest(bucket, start); + let map = [bucketRequest(syncRules, bucket, start)]; let data: OplogEntry[] = []; while (true) { - const batch = this.storage!.getBucketDataBatch(checkpoint, [request]); + const batch = this.storage!.getBucketDataBatch(checkpoint, map); const batches = await test_utils.fromAsync(batch); data = data.concat(batches[0]?.chunkData.data ?? []); if (batches.length == 0 || !batches[0]!.chunkData.has_more) { break; } - request = this.bucketDataRequest(bucket, BigInt(batches[0]!.chunkData.next_after)); + map = [bucketRequest(syncRules, bucket, BigInt(batches[0]!.chunkData.next_after))]; } return data; } @@ -216,7 +221,9 @@ export class CDCStreamTestContext implements AsyncDisposable { start = BigInt(start); } const { checkpoint } = await this.storage!.getCheckpoint(); - const batch = this.storage!.getBucketDataBatch(checkpoint, [this.bucketDataRequest(bucket, start)]); + const syncRules = this.getSyncRulesContent(); + const map = [bucketRequest(syncRules, bucket, start)]; + const batch = this.storage!.getBucketDataBatch(checkpoint, map); const batches = await test_utils.fromAsync(batch); return batches[0]?.chunkData.data ?? []; } diff --git a/modules/module-mysql/test/src/BinlogStreamUtils.ts b/modules/module-mysql/test/src/BinlogStreamUtils.ts index e0dc18f2e..cd8b7bcb6 100644 --- a/modules/module-mysql/test/src/BinlogStreamUtils.ts +++ b/modules/module-mysql/test/src/BinlogStreamUtils.ts @@ -15,7 +15,7 @@ import { SyncRulesBucketStorage, updateSyncRulesFromYaml } from '@powersync/service-core'; -import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests'; +import { bucketRequest, METRICS_HELPER, test_utils } from '@powersync/service-core-tests'; import mysqlPromise from 'mysql2/promise'; import { clearTestDb, TEST_CONNECTION_OPTIONS } from './util.js'; import timers from 'timers/promises'; @@ -33,6 +33,7 @@ export class BinlogStreamTestContext { private streamPromise?: Promise; public storage?: SyncRulesBucketStorage; private replicationDone = false; + private syncRulesContent?: storage.PersistedSyncRulesContent; static async open(factory: storage.TestStorageFactory, options?: { doNotClear?: boolean }) { const f = await factory({ doNotClear: options?.doNotClear }); @@ -73,6 +74,7 @@ export class BinlogStreamTestContext { const syncRules = await this.factory.updateSyncRules( updateSyncRulesFromYaml(content, { validate: true, storageVersion: LEGACY_STORAGE_VERSION }) ); + this.syncRulesContent = syncRules; this.storage = this.factory.getInstance(syncRules); return this.storage!; } @@ -83,6 +85,7 @@ export class BinlogStreamTestContext { throw new Error(`Next sync rules not available`); } + this.syncRulesContent = syncRules; this.storage = this.factory.getInstance(syncRules); return this.storage!; } @@ -93,11 +96,19 @@ export class BinlogStreamTestContext { throw new Error(`Active sync rules not available`); } + this.syncRulesContent = syncRules; this.storage = this.factory.getInstance(syncRules); this.replicationDone = true; return this.storage!; } + private getSyncRulesContent(): storage.PersistedSyncRulesContent { + if (this.syncRulesContent == null) { + throw new Error('Sync rules not configured - call updateSyncRules() first'); + } + return this.syncRulesContent; + } + get binlogStream(): BinLogStream { if (this.storage == null) { throw new Error('updateSyncRules() first'); @@ -152,18 +163,11 @@ export class BinlogStreamTestContext { return checkpoint; } - private bucketDataRequest(bucket: string, start: InternalOpId): storage.BucketDataRequest { - return { - bucket, - start, - source: {} as any - }; - } - async getBucketsDataBatch(buckets: Record, options?: { timeout?: number }) { const checkpoint = await this.getCheckpoint(options); - const requests = Object.entries(buckets).map(([bucket, start]) => this.bucketDataRequest(bucket, start)); - return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, requests)); + const syncRules = this.getSyncRulesContent(); + const map = Object.entries(buckets).map(([bucket, start]) => bucketRequest(syncRules, bucket, start)); + return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, map)); } async getBucketData( @@ -175,8 +179,10 @@ export class BinlogStreamTestContext { if (typeof start == 'string') { start = BigInt(start); } + const syncRules = this.getSyncRulesContent(); const checkpoint = await this.getCheckpoint(options); - const batch = this.storage!.getBucketDataBatch(checkpoint, [this.bucketDataRequest(bucket, start)]); + const map = [bucketRequest(syncRules, bucket, start)]; + const batch = this.storage!.getBucketDataBatch(checkpoint, map); const batches = await test_utils.fromAsync(batch); return batches[0]?.chunkData.data ?? []; } diff --git a/modules/module-postgres-storage/test/src/storage_compacting.test.ts b/modules/module-postgres-storage/test/src/storage_compacting.test.ts index 6a6bf8143..1f12ff1d0 100644 --- a/modules/module-postgres-storage/test/src/storage_compacting.test.ts +++ b/modules/module-postgres-storage/test/src/storage_compacting.test.ts @@ -41,7 +41,7 @@ bucket_definitions: // Compact with an explicit bucket name — exercises the this.buckets // iteration path, NOT the compactAllBuckets discovery path. await bucketStorage.compact({ - compactBuckets: [bucketRequest(syncRules, 'global[]')], + compactBuckets: [bucketRequest(syncRules, 'global[]').bucket], minBucketChanges: 1 }); diff --git a/modules/module-postgres-storage/test/src/storage_sync.test.ts b/modules/module-postgres-storage/test/src/storage_sync.test.ts index 2de50900c..2596cdabe 100644 --- a/modules/module-postgres-storage/test/src/storage_sync.test.ts +++ b/modules/module-postgres-storage/test/src/storage_sync.test.ts @@ -87,13 +87,8 @@ function registerStorageVersionTests(storageVersion: number) { const checkpoint = result!.flushed_op; const options: storage.BucketDataBatchOptions = {}; - const batchRequest = (start: bigint): storage.BucketDataRequest[] => [ - { bucket: globalBucket, start, source: {} as any } - ]; - const batch1 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, batchRequest(0n), options) - ); + const batch1 = await test_utils.fromAsync(bucketStorage.getBucketDataBatch(checkpoint, [globalBucket], options)); expect(test_utils.getBatchData(batch1)).toEqual([ { op_id: '1', op: 'PUT', object_id: 'test1', checksum: 2871785649 } ]); @@ -104,7 +99,11 @@ function registerStorageVersionTests(storageVersion: number) { }); const batch2 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, batchRequest(BigInt(batch1[0].chunkData.next_after)), options) + bucketStorage.getBucketDataBatch( + checkpoint, + [{ ...globalBucket, start: BigInt(batch1[0].chunkData.next_after) }], + options + ) ); expect(test_utils.getBatchData(batch2)).toEqual([ { op_id: '2', op: 'PUT', object_id: 'large1', checksum: 1178768505 } @@ -116,7 +115,11 @@ function registerStorageVersionTests(storageVersion: number) { }); const batch3 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, batchRequest(BigInt(batch2[0].chunkData.next_after)), options) + bucketStorage.getBucketDataBatch( + checkpoint, + [{ ...globalBucket, start: BigInt(batch2[0].chunkData.next_after) }], + options + ) ); expect(test_utils.getBatchData(batch3)).toEqual([ { op_id: '3', op: 'PUT', object_id: 'large2', checksum: 1607205872 } @@ -128,7 +131,11 @@ function registerStorageVersionTests(storageVersion: number) { }); const batch4 = await test_utils.fromAsync( - bucketStorage.getBucketDataBatch(checkpoint, batchRequest(BigInt(batch3[0].chunkData.next_after)), options) + bucketStorage.getBucketDataBatch( + checkpoint, + [{ ...globalBucket, start: BigInt(batch3[0].chunkData.next_after) }], + options + ) ); expect(test_utils.getBatchData(batch4)).toEqual([ { op_id: '4', op: 'PUT', object_id: 'test3', checksum: 1359888332 } diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts index e250f0c32..94f1d3f35 100644 --- a/modules/module-postgres/test/src/wal_stream_utils.ts +++ b/modules/module-postgres/test/src/wal_stream_utils.ts @@ -15,7 +15,7 @@ import { unsettledPromise, updateSyncRulesFromYaml } from '@powersync/service-core'; -import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests'; +import { bucketRequest, METRICS_HELPER, test_utils } from '@powersync/service-core-tests'; import * as pgwire from '@powersync/service-jpgwire'; import { clearTestDb, getClientCheckpoint, TEST_CONNECTION_OPTIONS } from './util.js'; @@ -23,6 +23,7 @@ export class WalStreamTestContext implements AsyncDisposable { private _walStream?: WalStream; private abortController = new AbortController(); private syncRulesId?: number; + private syncRulesContent?: storage.PersistedSyncRulesContent; public storage?: SyncRulesBucketStorage; private settledReplicationPromise?: Promise>; @@ -97,6 +98,7 @@ export class WalStreamTestContext implements AsyncDisposable { updateSyncRulesFromYaml(content, { validate: true, storageVersion: this.storageVersion }) ); this.syncRulesId = syncRules.id; + this.syncRulesContent = syncRules; this.storage = this.factory.getInstance(syncRules); return this.storage!; } @@ -108,6 +110,7 @@ export class WalStreamTestContext implements AsyncDisposable { } this.syncRulesId = syncRules.id; + this.syncRulesContent = syncRules; this.storage = this.factory.getInstance(syncRules); return this.storage!; } @@ -119,10 +122,18 @@ export class WalStreamTestContext implements AsyncDisposable { } this.syncRulesId = syncRules.id; + this.syncRulesContent = syncRules; this.storage = this.factory.getInstance(syncRules); return this.storage!; } + private getSyncRulesContent(): storage.PersistedSyncRulesContent { + if (this.syncRulesContent == null) { + throw new Error('Sync rules not configured - call updateSyncRules() first'); + } + return this.syncRulesContent; + } + get walStream() { if (this.storage == null) { throw new Error('updateSyncRules() first'); @@ -182,35 +193,11 @@ export class WalStreamTestContext implements AsyncDisposable { return checkpoint; } - private resolveBucketName(bucket: string) { - if (!this.versionedBuckets || /^\d+#/.test(bucket)) { - return bucket; - } - if (this.syncRulesId == null) { - throw new Error('Sync rules not configured - call updateSyncRules() first'); - } - return `${this.syncRulesId}#${bucket}`; - } - - private bucketDataRequest(bucket: string, start: InternalOpId): storage.BucketDataRequest { - return { - bucket: this.resolveBucketName(bucket), - start, - source: {} as any - }; - } - - private bucketChecksumRequest(bucket: string): storage.BucketChecksumRequest { - return { - bucket: this.resolveBucketName(bucket), - source: {} as any - }; - } - async getBucketsDataBatch(buckets: Record, options?: { timeout?: number }) { let checkpoint = await this.getCheckpoint(options); - const requests = Object.entries(buckets).map(([bucket, opId]) => this.bucketDataRequest(bucket, opId)); - return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, requests)); + const syncRules = this.getSyncRulesContent(); + const map = Object.entries(buckets).map(([bucket, start]) => bucketRequest(syncRules, bucket, start)); + return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, map)); } /** @@ -221,26 +208,27 @@ export class WalStreamTestContext implements AsyncDisposable { if (typeof start == 'string') { start = BigInt(start); } + const syncRules = this.getSyncRulesContent(); const checkpoint = await this.getCheckpoint(options); - const resolvedBucket = this.resolveBucketName(bucket); - let request: storage.BucketDataRequest = { bucket: resolvedBucket, start, source: {} as any }; + let map = [bucketRequest(syncRules, bucket, start)]; let data: OplogEntry[] = []; while (true) { - const batch = this.storage!.getBucketDataBatch(checkpoint, [request]); + const batch = this.storage!.getBucketDataBatch(checkpoint, map); const batches = await test_utils.fromAsync(batch); data = data.concat(batches[0]?.chunkData.data ?? []); if (batches.length == 0 || !batches[0]!.chunkData.has_more) { break; } - request = { ...request, start: BigInt(batches[0]!.chunkData.next_after) }; + map = [bucketRequest(syncRules, bucket, BigInt(batches[0]!.chunkData.next_after))]; } return data; } async getChecksums(buckets: string[], options?: { timeout?: number }) { const checkpoint = await this.getCheckpoint(options); - const versionedBuckets = buckets.map((bucket) => this.bucketChecksumRequest(bucket)); + const syncRules = this.getSyncRulesContent(); + const versionedBuckets = buckets.map((bucket) => bucketRequest(syncRules, bucket, 0n)); const checksums = await this.storage!.getChecksums(checkpoint, versionedBuckets); const unversioned = new Map(); @@ -264,8 +252,10 @@ export class WalStreamTestContext implements AsyncDisposable { if (typeof start == 'string') { start = BigInt(start); } + const syncRules = this.getSyncRulesContent(); const { checkpoint } = await this.storage!.getCheckpoint(); - const batch = this.storage!.getBucketDataBatch(checkpoint, [this.bucketDataRequest(bucket, start)]); + const map = [bucketRequest(syncRules, bucket, start)]; + const batch = this.storage!.getBucketDataBatch(checkpoint, map); const batches = await test_utils.fromAsync(batch); return batches[0]?.chunkData.data ?? []; } diff --git a/packages/service-core-tests/src/test-utils/general-utils.ts b/packages/service-core-tests/src/test-utils/general-utils.ts index 9afaf6cc5..755c9e476 100644 --- a/packages/service-core-tests/src/test-utils/general-utils.ts +++ b/packages/service-core-tests/src/test-utils/general-utils.ts @@ -1,4 +1,4 @@ -import { storage, utils } from '@powersync/service-core'; +import { BucketDataRequest, InternalOpId, storage, utils } from '@powersync/service-core'; import { GetQuerierOptions, RequestParameters } from '@powersync/service-sync-rules'; import * as bson from 'bson'; @@ -50,6 +50,39 @@ export function getBatchData( }); } +function isParsedSyncRules( + syncRules: storage.PersistedSyncRulesContent | storage.PersistedSyncRules +): syncRules is storage.PersistedSyncRules { + return (syncRules as storage.PersistedSyncRules).sync_rules !== undefined; +} + +/** + * Bucket names no longer purely depend on the sync rules. + * This converts a bucket name like "global[]" into the actual bucket name, for use in tests. + */ +export function bucketRequest( + syncRules: storage.PersistedSyncRulesContent | storage.PersistedSyncRules, + bucket: string, + start?: InternalOpId | string | number +): BucketDataRequest { + const parsed = isParsedSyncRules(syncRules) ? syncRules : syncRules.parsed(PARSE_OPTIONS); + const hydrationState = parsed.hydrationState; + const parameterStart = bucket.indexOf('['); + const definitionName = bucket.substring(0, parameterStart); + const parameters = bucket.substring(parameterStart); + const source = parsed.sync_rules.config.bucketDataSources.find((b) => b.uniqueName === definitionName); + + if (source == null) { + throw new Error(`Failed to find global bucket ${bucket}`); + } + const bucketName = hydrationState.getBucketSourceScope(source).bucketPrefix + parameters; + return { + bucket: bucketName, + start: BigInt(start ?? 0n), + source: source + }; +} + export function getBatchMeta( batch: utils.SyncBucketData[] | storage.SyncBucketDataChunk[] | storage.SyncBucketDataChunk ) { diff --git a/packages/service-core-tests/src/tests/register-compacting-tests.ts b/packages/service-core-tests/src/tests/register-compacting-tests.ts index 2566dcf01..5b775426b 100644 --- a/packages/service-core-tests/src/tests/register-compacting-tests.ts +++ b/packages/service-core-tests/src/tests/register-compacting-tests.ts @@ -1,7 +1,8 @@ import { addChecksums, storage, updateSyncRulesFromYaml } from '@powersync/service-core'; import { expect, test } from 'vitest'; import * as test_utils from '../test-utils/test-utils-index.js'; -import { bucketRequest, bucketRequestMap, bucketRequests } from './util.js'; +import { bucketRequest } from '../test-utils/test-utils-index.js'; +import { bucketRequestMap, bucketRequests } from './util.js'; export function registerCompactTests(config: storage.TestStorageConfig) { const generateStorageFactory = config.factory; @@ -52,11 +53,11 @@ bucket_definitions: const checkpoint = result!.flushed_op; - const batchBefore = await test_utils.oneFromAsync( - bucketStorage.getBucketDataBatch(checkpoint, bucketRequestMap(syncRules, [['global[]', 0n]])) - ); + const request = bucketRequest(syncRules, 'global[]'); + + const batchBefore = await test_utils.oneFromAsync(bucketStorage.getBucketDataBatch(checkpoint, [request])); const dataBefore = batchBefore.chunkData.data; - const checksumBefore = await bucketStorage.getChecksums(checkpoint, bucketRequests(syncRules, ['global[]'])); + const checksumBefore = await bucketStorage.getChecksums(checkpoint, [request]); expect(dataBefore).toMatchObject([ { @@ -85,13 +86,11 @@ bucket_definitions: minChangeRatio: 0 }); - const batchAfter = await test_utils.oneFromAsync( - bucketStorage.getBucketDataBatch(checkpoint, bucketRequestMap(syncRules, [['global[]', 0n]])) - ); + const batchAfter = await test_utils.oneFromAsync(bucketStorage.getBucketDataBatch(checkpoint, [request])); const dataAfter = batchAfter.chunkData.data; - const checksumAfter = await bucketStorage.getChecksums(checkpoint, bucketRequests(syncRules, ['global[]'])); + const checksumAfter = await bucketStorage.getChecksums(checkpoint, [request]); bucketStorage.clearChecksumCache(); - const checksumAfter2 = await bucketStorage.getChecksums(checkpoint, bucketRequests(syncRules, ['global[]'])); + const checksumAfter2 = await bucketStorage.getChecksums(checkpoint, [request]); expect(batchAfter.targetOp).toEqual(3n); expect(dataAfter).toMatchObject([ @@ -109,12 +108,8 @@ bucket_definitions: } ]); - expect(checksumAfter.get(bucketRequest(syncRules, 'global[]'))).toEqual( - checksumBefore.get(bucketRequest(syncRules, 'global[]')) - ); - expect(checksumAfter2.get(bucketRequest(syncRules, 'global[]'))).toEqual( - checksumBefore.get(bucketRequest(syncRules, 'global[]')) - ); + expect(checksumAfter.get(request.bucket)).toEqual(checksumBefore.get(request.bucket)); + expect(checksumAfter2.get(request.bucket)).toEqual(checksumBefore.get(request.bucket)); test_utils.validateCompactedBucket(dataBefore, dataAfter); }); @@ -172,12 +167,11 @@ bucket_definitions: }); const checkpoint = result!.flushed_op; + const request = bucketRequest(syncRules, 'global[]'); - const batchBefore = await test_utils.oneFromAsync( - bucketStorage.getBucketDataBatch(checkpoint, bucketRequestMap(syncRules, [['global[]', 0n]])) - ); + const batchBefore = await test_utils.oneFromAsync(bucketStorage.getBucketDataBatch(checkpoint, [request])); const dataBefore = batchBefore.chunkData.data; - const checksumBefore = await bucketStorage.getChecksums(checkpoint, bucketRequests(syncRules, ['global[]'])); + const checksumBefore = await bucketStorage.getChecksums(checkpoint, [request]); // op_id sequence depends on the storage implementation expect(dataBefore).toMatchObject([ @@ -207,12 +201,10 @@ bucket_definitions: minChangeRatio: 0 }); - const batchAfter = await test_utils.oneFromAsync( - bucketStorage.getBucketDataBatch(checkpoint, bucketRequestMap(syncRules, [['global[]', 0n]])) - ); + const batchAfter = await test_utils.oneFromAsync(bucketStorage.getBucketDataBatch(checkpoint, [request])); const dataAfter = batchAfter.chunkData.data; bucketStorage.clearChecksumCache(); - const checksumAfter = await bucketStorage.getChecksums(checkpoint, bucketRequests(syncRules, ['global[]'])); + const checksumAfter = await bucketStorage.getChecksums(checkpoint, [request]); expect(batchAfter.targetOp).toBeLessThanOrEqual(checkpoint); expect(dataAfter).toMatchObject([ @@ -229,8 +221,8 @@ bucket_definitions: op: 'PUT' } ]); - expect(checksumAfter.get(bucketRequest(syncRules, 'global[]'))).toEqual({ - ...checksumBefore.get(bucketRequest(syncRules, 'global[]')), + expect(checksumAfter.get(request.bucket)).toEqual({ + ...checksumBefore.get(request.bucket), count: 2 }); @@ -281,7 +273,8 @@ bucket_definitions: }); const checkpoint1 = result!.flushed_op; - const checksumBefore = await bucketStorage.getChecksums(checkpoint1, bucketRequests(syncRules, ['global[]'])); + const request = bucketRequest(syncRules, 'global[]'); + const checksumBefore = await bucketStorage.getChecksums(checkpoint1, [request]); const result2 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ @@ -304,20 +297,18 @@ bucket_definitions: minChangeRatio: 0 }); - const batchAfter = await test_utils.oneFromAsync( - bucketStorage.getBucketDataBatch(checkpoint2, bucketRequestMap(syncRules, [['global[]', 0n]])) - ); + const batchAfter = await test_utils.oneFromAsync(bucketStorage.getBucketDataBatch(checkpoint2, [request])); const dataAfter = batchAfter.chunkData.data; await bucketStorage.clearChecksumCache(); - const checksumAfter = await bucketStorage.getChecksums(checkpoint2, bucketRequests(syncRules, ['global[]'])); + const checksumAfter = await bucketStorage.getChecksums(checkpoint2, [request]); expect(dataAfter).toMatchObject([ { op: 'CLEAR' } ]); - expect(checksumAfter.get(bucketRequest(syncRules, 'global[]'))).toEqual({ - bucket: bucketRequest(syncRules, 'global[]'), + expect(checksumAfter.get(request.bucket)).toEqual({ + bucket: request.bucket, count: 1, checksum: dataAfter[0].checksum }); @@ -517,11 +508,12 @@ bucket_definitions: await batch.commit('2/1'); }); const checkpoint2 = result2!.flushed_op; + const request = bucketRequest(syncRules, 'global[]'); await bucketStorage.clearChecksumCache(); - const checksumAfter = await bucketStorage.getChecksums(checkpoint2, bucketRequests(syncRules, ['global[]'])); - const globalChecksum = checksumAfter.get(bucketRequest(syncRules, 'global[]')); + const checksumAfter = await bucketStorage.getChecksums(checkpoint2, [request]); + const globalChecksum = checksumAfter.get(request.bucket); expect(globalChecksum).toMatchObject({ - bucket: bucketRequest(syncRules, 'global[]'), + bucket: request.bucket, count: 4 }); @@ -586,11 +578,12 @@ bucket_definitions: }); const checkpoint2 = result2!.flushed_op; + const request = bucketRequest(syncRules, 'global[]'); // Check that the checksum was correctly updated with the clear operation after having a cached checksum - const checksumAfter = await bucketStorage.getChecksums(checkpoint2, bucketRequests(syncRules, ['global[]'])); - const globalChecksum = checksumAfter.get(bucketRequest(syncRules, 'global[]')); + const checksumAfter = await bucketStorage.getChecksums(checkpoint2, [request]); + const globalChecksum = checksumAfter.get(request.bucket); expect(globalChecksum).toMatchObject({ - bucket: bucketRequest(syncRules, 'global[]'), + bucket: request.bucket, count: 1 }); // storage-specific checksum - just check that it does not change diff --git a/packages/service-core-tests/src/tests/register-data-storage-data-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-data-tests.ts index 969ef5d9e..489209752 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-data-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-data-tests.ts @@ -9,7 +9,8 @@ import { } from '@powersync/service-core'; import { describe, expect, test } from 'vitest'; import * as test_utils from '../test-utils/test-utils-index.js'; -import { bucketRequest, bucketRequestMap, bucketRequests } from './util.js'; +import { bucketRequest } from '../test-utils/test-utils-index.js'; +import { bucketRequestMap, bucketRequests } from './util.js'; /** * Normalize data from OplogEntries for comparison in tests. @@ -100,7 +101,7 @@ bucket_definitions: ]; expect(checksums).toEqual([ { - bucket: bucketRequest(syncRules, 'global[]'), + bucket: bucketRequest(syncRules, 'global[]').bucket, checksum: (c1 + c2) & 0xffffffff, count: 2 } @@ -172,7 +173,7 @@ bucket_definitions: ]; expect(checksums).toEqual([ { - bucket: bucketRequest(syncRules, 'global[]'), + bucket: bucketRequest(syncRules, 'global[]').bucket, checksum: c1 & 0xffffffff, count: 1 } @@ -249,7 +250,7 @@ bucket_definitions: ]; expect(checksums).toEqual([ { - bucket: bucketRequest(syncRules, 'global[]'), + bucket: bucketRequest(syncRules, 'global[]').bucket, checksum: c1 & 0xffffffff, count: 1 } @@ -316,7 +317,7 @@ bucket_definitions: ]; expect(checksums).toEqual([ { - bucket: bucketRequest(syncRules, 'global[]'), + bucket: bucketRequest(syncRules, 'global[]').bucket, checksum: c1 & 0xffffffff, count: 1 } @@ -554,7 +555,7 @@ bucket_definitions: ]; expect(checksums).toEqual([ { - bucket: bucketRequest(syncRules, 'global[]'), + bucket: bucketRequest(syncRules, 'global[]').bucket, checksum: (c1 + c2) & 0xffffffff, count: 2 } @@ -686,7 +687,7 @@ bucket_definitions: ]; expect(checksums).toEqual([ { - bucket: bucketRequest(syncRules, 'global[]'), + bucket: bucketRequest(syncRules, 'global[]').bucket, checksum: (c1 + c1 + c1 + c2) & 0xffffffff, count: 4 } @@ -1343,8 +1344,8 @@ bucket_definitions: const { batch, syncRules } = await setup({ limit: 5 }); expect(batch.length).toEqual(2); - expect(batch[0].chunkData.bucket).toEqual(bucketRequest(syncRules, 'global1[]')); - expect(batch[1].chunkData.bucket).toEqual(bucketRequest(syncRules, 'global2[]')); + expect(batch[0].chunkData.bucket).toEqual(bucketRequest(syncRules, 'global1[]').bucket); + expect(batch[1].chunkData.bucket).toEqual(bucketRequest(syncRules, 'global2[]').bucket); expect(test_utils.getBatchData(batch[0])).toEqual([ { op_id: '1', op: 'PUT', object_id: 'test1', checksum: 2871785649 } @@ -1374,8 +1375,8 @@ bucket_definitions: const { batch, syncRules } = await setup({ limit: 11 }); expect(batch.length).toEqual(2); - expect(batch[0].chunkData.bucket).toEqual(bucketRequest(syncRules, 'global1[]')); - expect(batch[1].chunkData.bucket).toEqual(bucketRequest(syncRules, 'global2[]')); + expect(batch[0].chunkData.bucket).toEqual(bucketRequest(syncRules, 'global1[]').bucket); + expect(batch[1].chunkData.bucket).toEqual(bucketRequest(syncRules, 'global2[]').bucket); expect(test_utils.getBatchData(batch[0])).toEqual([ { op_id: '1', op: 'PUT', object_id: 'test1', checksum: 2871785649 } @@ -1411,9 +1412,9 @@ bucket_definitions: const { batch, syncRules } = await setup({ limit: 3, chunkLimitBytes: 50 }); expect(batch.length).toEqual(3); - expect(batch[0].chunkData.bucket).toEqual(bucketRequest(syncRules, 'global1[]')); - expect(batch[1].chunkData.bucket).toEqual(bucketRequest(syncRules, 'global2[]')); - expect(batch[2].chunkData.bucket).toEqual(bucketRequest(syncRules, 'global2[]')); + expect(batch[0].chunkData.bucket).toEqual(bucketRequest(syncRules, 'global1[]').bucket); + expect(batch[1].chunkData.bucket).toEqual(bucketRequest(syncRules, 'global2[]').bucket); + expect(batch[2].chunkData.bucket).toEqual(bucketRequest(syncRules, 'global2[]').bucket); expect(test_utils.getBatchData(batch[0])).toEqual([ { op_id: '1', op: 'PUT', object_id: 'test1', checksum: 2871785649 } @@ -1558,11 +1559,15 @@ bucket_definitions: const checksums = [ ...(await bucketStorage.getChecksums(checkpoint, bucketRequests(syncRules, ['global[]']))).values() ]; - expect(checksums).toEqual([{ bucket: bucketRequest(syncRules, 'global[]'), checksum: 1917136889, count: 1 }]); + expect(checksums).toEqual([ + { bucket: bucketRequest(syncRules, 'global[]').bucket, checksum: 1917136889, count: 1 } + ]); const checksums2 = [ ...(await bucketStorage.getChecksums(checkpoint + 1n, bucketRequests(syncRules, ['global[]']))).values() ]; - expect(checksums2).toEqual([{ bucket: bucketRequest(syncRules, 'global[]'), checksum: 1917136889, count: 1 }]); + expect(checksums2).toEqual([ + { bucket: bucketRequest(syncRules, 'global[]').bucket, checksum: 1917136889, count: 1 } + ]); }); testChecksumBatching(config); @@ -1828,10 +1833,10 @@ bucket_definitions: const checksums = [...(await bucketStorage.getChecksums(checkpoint, buckets)).values()]; checksums.sort((a, b) => a.bucket.localeCompare(b.bucket)); expect(checksums).toEqual([ - { bucket: bucketRequest(syncRules, 'user["u1"]'), count: 4, checksum: 346204588 }, - { bucket: bucketRequest(syncRules, 'user["u2"]'), count: 4, checksum: 5261081 }, - { bucket: bucketRequest(syncRules, 'user["u3"]'), count: 4, checksum: 134760718 }, - { bucket: bucketRequest(syncRules, 'user["u4"]'), count: 4, checksum: -302639724 } + { bucket: bucketRequest(syncRules, 'user["u1"]').bucket, count: 4, checksum: 346204588 }, + { bucket: bucketRequest(syncRules, 'user["u2"]').bucket, count: 4, checksum: 5261081 }, + { bucket: bucketRequest(syncRules, 'user["u3"]').bucket, count: 4, checksum: 134760718 }, + { bucket: bucketRequest(syncRules, 'user["u4"]').bucket, count: 4, checksum: -302639724 } ]); }); } diff --git a/packages/service-core-tests/src/tests/register-data-storage-parameter-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-parameter-tests.ts index 3a3d7d1d6..7f4011f35 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-parameter-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-parameter-tests.ts @@ -2,7 +2,8 @@ import { CURRENT_STORAGE_VERSION, JwtPayload, storage, updateSyncRulesFromYaml } import { RequestParameters, ScopedParameterLookup, SqliteJsonRow } from '@powersync/service-sync-rules'; import { expect, test } from 'vitest'; import * as test_utils from '../test-utils/test-utils-index.js'; -import { bucketRequest, parameterLookupScope } from './util.js'; +import { bucketRequest } from '../test-utils/test-utils-index.js'; +import { parameterLookupScope } from './util.js'; /** * @example @@ -383,7 +384,7 @@ bucket_definitions: }); expect(buckets).toEqual([ { - bucket: bucketRequest(syncRules, 'by_workspace["workspace1"]'), + bucket: bucketRequest(syncRules, 'by_workspace["workspace1"]').bucket, priority: 3, definition: 'by_workspace', inclusion_reasons: ['default'] @@ -467,13 +468,13 @@ bucket_definitions: buckets.sort((a, b) => a.bucket.localeCompare(b.bucket)); expect(buckets).toEqual([ { - bucket: bucketRequest(syncRules, 'by_public_workspace["workspace1"]'), + bucket: bucketRequest(syncRules, 'by_public_workspace["workspace1"]').bucket, priority: 3, definition: 'by_public_workspace', inclusion_reasons: ['default'] }, { - bucket: bucketRequest(syncRules, 'by_public_workspace["workspace3"]'), + bucket: bucketRequest(syncRules, 'by_public_workspace["workspace3"]').bucket, priority: 3, definition: 'by_public_workspace', inclusion_reasons: ['default'] @@ -581,8 +582,8 @@ bucket_definitions: buckets.sort(); expect(buckets).toEqual([ - bucketRequest(syncRules, 'by_workspace["workspace1"]'), - bucketRequest(syncRules, 'by_workspace["workspace3"]') + bucketRequest(syncRules, 'by_workspace["workspace1"]').bucket, + bucketRequest(syncRules, 'by_workspace["workspace3"]').bucket ]); }); diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts index efacea5c2..b71a422c5 100644 --- a/packages/service-core-tests/src/tests/register-sync-tests.ts +++ b/packages/service-core-tests/src/tests/register-sync-tests.ts @@ -14,8 +14,7 @@ import * as timers from 'timers/promises'; import { fileURLToPath } from 'url'; import { expect, test } from 'vitest'; import * as test_utils from '../test-utils/test-utils-index.js'; -import { METRICS_HELPER } from '../test-utils/test-utils-index.js'; -import { bucketRequest } from './util.js'; +import { bucketRequest, METRICS_HELPER } from '../test-utils/test-utils-index.js'; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); @@ -836,10 +835,11 @@ bucket_definitions: await batch.commit('0/1'); }); + const { bucket } = bucketRequest(syncRules, 'by_user["user1"]'); const checkpoint2 = await getCheckpointLines(iter); expect( (checkpoint2[0] as StreamingSyncCheckpointDiff).checkpoint_diff?.updated_buckets?.map((b) => b.bucket) - ).toEqual([bucketRequest(syncRules, 'by_user["user1"]')]); + ).toEqual([bucket]); expect(checkpoint2).toMatchSnapshot(); }); @@ -893,10 +893,9 @@ bucket_definitions: iter.return?.(); }); + const { bucket } = bucketRequest(syncRules, 'by_user["user1"]'); const checkpoint1 = await getCheckpointLines(iter); - expect((checkpoint1[0] as StreamingSyncCheckpoint).checkpoint?.buckets?.map((b) => b.bucket)).toEqual([ - bucketRequest(syncRules, 'by_user["user1"]') - ]); + expect((checkpoint1[0] as StreamingSyncCheckpoint).checkpoint?.buckets?.map((b) => b.bucket)).toEqual([bucket]); expect(checkpoint1).toMatchSnapshot(); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { @@ -917,7 +916,7 @@ bucket_definitions: const checkpoint2 = await getCheckpointLines(iter); expect( (checkpoint2[0] as StreamingSyncCheckpointDiff).checkpoint_diff?.updated_buckets?.map((b) => b.bucket) - ).toEqual([bucketRequest(syncRules, 'by_user["user1"]')]); + ).toEqual([bucket]); expect(checkpoint2).toMatchSnapshot(); }); @@ -990,10 +989,11 @@ bucket_definitions: await batch.commit('0/1'); }); + const { bucket } = bucketRequest(syncRules, 'by_user["user1"]'); const checkpoint2 = await getCheckpointLines(iter); expect( (checkpoint2[0] as StreamingSyncCheckpointDiff).checkpoint_diff?.updated_buckets?.map((b) => b.bucket) - ).toEqual([bucketRequest(syncRules, 'by_user["user1"]')]); + ).toEqual([bucket]); expect(checkpoint2).toMatchSnapshot(); }); diff --git a/packages/service-core-tests/src/tests/util.ts b/packages/service-core-tests/src/tests/util.ts index f6dd0163a..8819473fb 100644 --- a/packages/service-core-tests/src/tests/util.ts +++ b/packages/service-core-tests/src/tests/util.ts @@ -1,53 +1,30 @@ import { storage } from '@powersync/service-core'; import { - BucketDataSource, ParameterIndexLookupCreator, SourceTableInterface, SqliteRow, TablePattern } from '@powersync/service-sync-rules'; import { ParameterLookupScope } from '@powersync/service-sync-rules/src/HydrationState.js'; - -export function bucketRequest(syncRules: storage.PersistedSyncRulesContent, bucketName: string): string { - if (/^\d+#/.test(bucketName)) { - return bucketName; - } - - const versionedBuckets = storage.STORAGE_VERSION_CONFIG[syncRules.storageVersion]?.versionedBuckets ?? false; - return versionedBuckets ? `${syncRules.id}#${bucketName}` : bucketName; -} +import { bucketRequest } from '../test-utils/general-utils.js'; export function bucketRequestMap( syncRules: storage.PersistedSyncRulesContent, buckets: Iterable ): storage.BucketDataRequest[] { - return Array.from(buckets, ([bucketName, opId]) => ({ - bucket: bucketRequest(syncRules, bucketName), - start: opId, - source: EMPTY_DATA_SOURCE - })); + return Array.from(buckets, ([bucketName, opId]) => bucketRequest(syncRules, bucketName, opId)); } export function bucketRequests( syncRules: storage.PersistedSyncRulesContent, bucketNames: string[] ): storage.BucketChecksumRequest[] { - return bucketNames.map((bucketName) => ({ - bucket: bucketRequest(syncRules, bucketName), - source: EMPTY_DATA_SOURCE - })); + return bucketNames.map((bucketName) => { + const request = bucketRequest(syncRules, bucketName, 0n); + return { bucket: request.bucket, source: request.source }; + }); } -const EMPTY_DATA_SOURCE: BucketDataSource = { - uniqueName: 'empty', - bucketParameters: [], - getSourceTables: () => new Set(), - tableSyncsData: () => false, - evaluateRow: () => [], - resolveResultSets: () => {}, - debugWriteOutputTables: () => {} -}; - const EMPTY_LOOKUP_SOURCE: ParameterIndexLookupCreator = { get defaultLookupScope(): ParameterLookupScope { return { diff --git a/packages/service-core/src/routes/endpoints/admin.ts b/packages/service-core/src/routes/endpoints/admin.ts index 59eca123a..713ae99ec 100644 --- a/packages/service-core/src/routes/endpoints/admin.ts +++ b/packages/service-core/src/routes/endpoints/admin.ts @@ -176,6 +176,7 @@ class FakeSyncRulesContentForValidation extends storage.PersistedSyncRulesConten ...this.apiHandler.getParseSyncRulesOptions(), schema: this.schema }), + hydrationState: DEFAULT_HYDRATION_STATE, hydratedSyncRules() { return this.sync_rules.config.hydrate({ hydrationState: DEFAULT_HYDRATION_STATE }); } diff --git a/packages/service-core/src/storage/PersistedSyncRulesContent.ts b/packages/service-core/src/storage/PersistedSyncRulesContent.ts index 382decaa2..c9a0a7019 100644 --- a/packages/service-core/src/storage/PersistedSyncRulesContent.ts +++ b/packages/service-core/src/storage/PersistedSyncRulesContent.ts @@ -120,6 +120,7 @@ export abstract class PersistedSyncRulesContent implements PersistedSyncRulesCon id: this.id, slot_name: this.slot_name, sync_rules: config, + hydrationState, hydratedSyncRules: () => { return config.config.hydrate({ hydrationState }); } @@ -140,6 +141,10 @@ export interface PersistedSyncRules { readonly id: number; readonly sync_rules: SyncConfigWithErrors; readonly slot_name: string; + /** + * For testing only. + */ + readonly hydrationState: HydrationState; hydratedSyncRules(): HydratedSyncRules; } From 533c235b9d1375326f3d276bca4f80ec4edc3168 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 4 Mar 2026 18:09:18 +0200 Subject: [PATCH 3/6] Changeset. --- .changeset/new-games-prove.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 .changeset/new-games-prove.md diff --git a/.changeset/new-games-prove.md b/.changeset/new-games-prove.md new file mode 100644 index 000000000..4c7d1f355 --- /dev/null +++ b/.changeset/new-games-prove.md @@ -0,0 +1,13 @@ +--- +'@powersync/service-module-postgres-storage': minor +'@powersync/service-module-mongodb-storage': minor +'@powersync/service-core-tests': minor +'@powersync/service-module-postgres': minor +'@powersync/service-module-mongodb': minor +'@powersync/service-core': minor +'@powersync/service-module-mssql': minor +'@powersync/service-module-mysql': minor +'@powersync/service-sync-rules': minor +--- + +[Internal] Propagate source definitions to storage APIs. From 5c3b12827ea635eb890b9fe210ae8839e95a35a6 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 5 Mar 2026 11:00:39 +0200 Subject: [PATCH 4/6] Fix tests. --- modules/module-postgres/test/src/schema_changes.test.ts | 2 +- modules/module-postgres/test/src/wal_stream.test.ts | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/module-postgres/test/src/schema_changes.test.ts b/modules/module-postgres/test/src/schema_changes.test.ts index 9a19f8967..781ec9f14 100644 --- a/modules/module-postgres/test/src/schema_changes.test.ts +++ b/modules/module-postgres/test/src/schema_changes.test.ts @@ -646,7 +646,7 @@ config: { statement: `UPDATE test_data SET other = ROW(TRUE, 2)::composite;` } ); - const data = await context.getBucketData('1#stream|0[]'); + const data = await context.getBucketData('stream|0[]'); expect(data).toMatchObject([ putOp('test_data', { id: 't1' }), putOp('test_data', { id: 't1', other: '{"foo":1,"bar":2}' }) diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index febb3aa95..5dd6aee65 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -502,7 +502,7 @@ config: await context.initializeReplication(); await pool.query(`INSERT INTO test_data(id, description) VALUES ('t1', '2025-09-10 15:17:14+02')`); - const data = await context.getBucketData('1#stream|0[]'); + const data = await context.getBucketData('stream|0[]'); expect(data).toMatchObject([putOp('test_data', { id: 't1', description: '2025-09-10T13:17:14.000000Z' })]); }); @@ -534,7 +534,7 @@ config: `INSERT INTO test_data(id, description, ts) VALUES ('t2', ROW(TRUE, 2)::composite, '2025-11-17T09:12:00Z')` ); - const data = await context.getBucketData('1#stream|0[]'); + const data = await context.getBucketData('stream|0[]'); expect(data).toMatchObject([ putOp('test_data', { id: 't1', description: '{"foo":1,"bar":1}', ts: '2025-11-17T09:11:00.000000Z' }), putOp('test_data', { id: 't2', description: '{"foo":1,"bar":2}', ts: '2025-11-17T09:12:00.000000Z' }) @@ -561,7 +561,7 @@ config: await context.initializeReplication(); await pool.query(`INSERT INTO test_data(id) VALUES ('t1')`); - const data = await context.getBucketData('1#stream|0[]'); + const data = await context.getBucketData('stream|0[]'); expect(data).toMatchObject([putOp('test_data', { id: 't1' })]); }); From 31d572ac6307f4f3c2d08000fa483ffeea1e2f89 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 5 Mar 2026 12:18:18 +0200 Subject: [PATCH 5/6] Merge changesets. --- .changeset/new-games-prove.md | 2 +- .changeset/smart-jokes-walk.md | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) delete mode 100644 .changeset/smart-jokes-walk.md diff --git a/.changeset/new-games-prove.md b/.changeset/new-games-prove.md index 4c7d1f355..c203adc59 100644 --- a/.changeset/new-games-prove.md +++ b/.changeset/new-games-prove.md @@ -10,4 +10,4 @@ '@powersync/service-sync-rules': minor --- -[Internal] Propagate source definitions to storage APIs. +[Internal] Track and propagate source on buckets and parameter indexes to storage APIs. diff --git a/.changeset/smart-jokes-walk.md b/.changeset/smart-jokes-walk.md deleted file mode 100644 index 07bea5169..000000000 --- a/.changeset/smart-jokes-walk.md +++ /dev/null @@ -1,7 +0,0 @@ ---- -'@powersync/service-core-tests': minor -'@powersync/service-core': minor -'@powersync/service-sync-rules': minor ---- - -[Internal] Track source on buckets and parameter indexes. From 914a9f5dfe099df674a2e803dcef063de53c970d Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 5 Mar 2026 12:20:47 +0200 Subject: [PATCH 6/6] Fix test. --- .../test/src/storage_compacting.test.ts | 4 ++-- .../test/src/storage_compacting.test.ts | 14 ++++++-------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts index a4428ce3c..9f797a5d3 100644 --- a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts +++ b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts @@ -1,7 +1,7 @@ -import { bucketRequest, bucketRequests, register, test_utils } from '@powersync/service-core-tests'; +import { storage, SyncRulesBucketStorage, updateSyncRulesFromYaml } from '@powersync/service-core'; +import { bucketRequest, register, test_utils } from '@powersync/service-core-tests'; import { describe, expect, test } from 'vitest'; import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js'; -import { storage, SyncRulesBucketStorage, updateSyncRulesFromYaml } from '@powersync/service-core'; describe('Mongo Sync Bucket Storage Compact', () => { register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY); diff --git a/modules/module-postgres-storage/test/src/storage_compacting.test.ts b/modules/module-postgres-storage/test/src/storage_compacting.test.ts index 1f12ff1d0..78641e758 100644 --- a/modules/module-postgres-storage/test/src/storage_compacting.test.ts +++ b/modules/module-postgres-storage/test/src/storage_compacting.test.ts @@ -67,7 +67,7 @@ bucket_definitions: `) ); const bucketStorage = factory.getInstance(syncRules); - const bucket = bucketRequest(syncRules, 'global[]'); + const request = bucketRequest(syncRules, 'global[]'); const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.markAllSnapshotDone('1/1'); @@ -99,20 +99,18 @@ bucket_definitions: }); const checkpoint = result!.flushed_op; - const rowsBefore = await test_utils.oneFromAsync( - bucketStorage.getBucketDataBatch(checkpoint, bucketRequestMap(syncRules, [['global[]', 0n]])) - ); + const rowsBefore = await test_utils.oneFromAsync(bucketStorage.getBucketDataBatch(checkpoint, [request])); const dataBefore = test_utils.getBatchData(rowsBefore); const clearToOpId = BigInt(dataBefore[2].op_id); const compactor = new PostgresCompactor(factory.db, bucketStorage.group_id, {}); // Trigger the private method directly - await expect(compactor.clearBucketForTests(bucket, clearToOpId)).rejects.toThrow(/Unexpected PUT operation/); + await expect(compactor.clearBucketForTests(request.bucket, clearToOpId)).rejects.toThrow( + /Unexpected PUT operation/ + ); // The method wraps in a transaction; on assertion error the bucket must remain unchanged. - const rowsAfter = await test_utils.oneFromAsync( - bucketStorage.getBucketDataBatch(checkpoint, bucketRequestMap(syncRules, [['global[]', 0n]])) - ); + const rowsAfter = await test_utils.oneFromAsync(bucketStorage.getBucketDataBatch(checkpoint, [request])); expect(test_utils.getBatchData(rowsAfter)).toEqual(dataBefore); }); });