Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changeset/new-games-prove.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core-tests': minor
'@powersync/service-module-postgres': minor
'@powersync/service-module-mongodb': minor
'@powersync/service-core': minor
'@powersync/service-module-mssql': minor
'@powersync/service-module-mysql': minor
'@powersync/service-sync-rules': minor
---

[Internal] Track and propagate source on buckets and parameter indexes to storage APIs.
7 changes: 0 additions & 7 deletions .changeset/smart-jokes-walk.md

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as lib_mongo from '@powersync/lib-service-mongodb';
import {
addPartialChecksums,
bson,
BucketChecksumRequest,
BucketChecksum,
ChecksumCache,
ChecksumMap,
Expand Down Expand Up @@ -74,7 +75,7 @@ export class MongoChecksums {
* Calculate checksums, utilizing the cache for partial checkums, and querying the remainder from
* the database (bucket_state + bucket_data).
*/
async getChecksums(checkpoint: InternalOpId, buckets: string[]): Promise<ChecksumMap> {
async getChecksums(checkpoint: InternalOpId, buckets: BucketChecksumRequest[]): Promise<ChecksumMap> {
return this.cache.getChecksumMap(checkpoint, buckets);
}

Expand Down Expand Up @@ -298,6 +299,7 @@ export class MongoChecksums {
const req = requests.get(bucket);
requests.set(bucket, {
bucket,
source: req!.source,
start: doc.last_op,
end: req!.end
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ export class MongoCompactor {
buckets.map((bucket) => {
return {
bucket,
source: {} as any,
end: this.maxOpId
};
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,19 +372,20 @@ export class MongoSyncBucketStorage

async *getBucketDataBatch(
checkpoint: utils.InternalOpId,
dataBuckets: Map<string, InternalOpId>,
dataBuckets: storage.BucketDataRequest[],
options?: storage.BucketDataBatchOptions
): AsyncIterable<storage.SyncBucketDataChunk> {
if (dataBuckets.size == 0) {
if (dataBuckets.length == 0) {
return;
}
let filters: mongo.Filter<BucketDataDocument>[] = [];
const bucketMap = new Map(dataBuckets.map((request) => [request.bucket, request.start]));

if (checkpoint == null) {
throw new ServiceAssertionError('checkpoint is null');
}
const end = checkpoint;
for (let [name, start] of dataBuckets.entries()) {
for (let { bucket: name, start } of dataBuckets) {
filters.push({
_id: {
$gt: {
Expand Down Expand Up @@ -477,7 +478,7 @@ export class MongoSyncBucketStorage
}

if (start == null) {
const startOpId = dataBuckets.get(bucket);
const startOpId = bucketMap.get(bucket);
if (startOpId == null) {
throw new ServiceAssertionError(`data for unexpected bucket: ${bucket}`);
}
Expand Down Expand Up @@ -519,7 +520,10 @@ export class MongoSyncBucketStorage
}
}

async getChecksums(checkpoint: utils.InternalOpId, buckets: string[]): Promise<utils.ChecksumMap> {
async getChecksums(
checkpoint: utils.InternalOpId,
buckets: storage.BucketChecksumRequest[]
): Promise<utils.ChecksumMap> {
return this.checksums.getChecksums(checkpoint, buckets);
}

Expand Down
38 changes: 19 additions & 19 deletions modules/module-mongodb-storage/test/src/storage_compacting.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { bucketRequest, bucketRequests, register, test_utils } from '@powersync/service-core-tests';
import { storage, SyncRulesBucketStorage, updateSyncRulesFromYaml } from '@powersync/service-core';
import { bucketRequest, register, test_utils } from '@powersync/service-core-tests';
import { describe, expect, test } from 'vitest';
import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js';
import { storage, SyncRulesBucketStorage, updateSyncRulesFromYaml } from '@powersync/service-core';

describe('Mongo Sync Bucket Storage Compact', () => {
register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY);
Expand Down Expand Up @@ -71,17 +71,17 @@ bucket_definitions:
signal: null as any
});

const checksumAfter = await bucketStorage.getChecksums(
checkpoint,
bucketRequests(syncRules, ['by_user["u1"]', 'by_user["u2"]'])
);
expect(checksumAfter.get(bucketRequest(syncRules, 'by_user["u1"]'))).toEqual({
bucket: bucketRequest(syncRules, 'by_user["u1"]'),
const users = ['u1', 'u2'];
const userRequests = users.map((user) => bucketRequest(syncRules, `by_user["${user}"]`));
const [u1Request, u2Request] = userRequests;
const checksumAfter = await bucketStorage.getChecksums(checkpoint, userRequests);
expect(checksumAfter.get(u1Request.bucket)).toEqual({
bucket: u1Request.bucket,
checksum: -659469718,
count: 1
});
expect(checksumAfter.get(bucketRequest(syncRules, 'by_user["u2"]'))).toEqual({
bucket: bucketRequest(syncRules, 'by_user["u2"]'),
expect(checksumAfter.get(u2Request.bucket)).toEqual({
bucket: u2Request.bucket,
checksum: 430217650,
count: 1
});
Expand All @@ -91,7 +91,7 @@ bucket_definitions:
// Populate old sync rules version
const { factory } = await setup();

// Not populate another version (bucket definition name changed)
// Now populate another version (bucket definition name changed)
const syncRules = await factory.updateSyncRules(
updateSyncRulesFromYaml(`
bucket_definitions:
Expand Down Expand Up @@ -125,17 +125,17 @@ bucket_definitions:
});
expect(result2.buckets).toEqual(0);

const checksumAfter = await bucketStorage.getChecksums(
checkpoint,
bucketRequests(syncRules, ['by_user2["u1"]', 'by_user2["u2"]'])
);
expect(checksumAfter.get(bucketRequest(syncRules, 'by_user2["u1"]'))).toEqual({
bucket: bucketRequest(syncRules, 'by_user2["u1"]'),
const users = ['u1', 'u2'];
const userRequests = users.map((user) => bucketRequest(syncRules, `by_user2["${user}"]`));
const [u1Request, u2Request] = userRequests;
const checksumAfter = await bucketStorage.getChecksums(checkpoint, userRequests);
expect(checksumAfter.get(u1Request.bucket)).toEqual({
bucket: u1Request.bucket,
checksum: -659469718,
count: 1
});
expect(checksumAfter.get(bucketRequest(syncRules, 'by_user2["u2"]'))).toEqual({
bucket: bucketRequest(syncRules, 'by_user2["u2"]'),
expect(checksumAfter.get(u2Request.bucket)).toEqual({
bucket: u2Request.bucket,
checksum: 430217650,
count: 1
});
Expand Down
7 changes: 3 additions & 4 deletions modules/module-mongodb-storage/test/src/storage_sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,8 @@ function registerSyncStorageTests(storageConfig: storage.TestStorageConfig, stor
const checkpoint = result!.flushed_op;

const options: storage.BucketDataBatchOptions = {};

const batch1 = await test_utils.fromAsync(
bucketStorage.getBucketDataBatch(checkpoint, new Map([[globalBucket, 0n]]), options)
bucketStorage.getBucketDataBatch(checkpoint, [bucketRequest(syncRules, 'global[]', 0n)], options)
);
expect(test_utils.getBatchData(batch1)).toEqual([
{ op_id: '1', op: 'PUT', object_id: 'test1', checksum: 2871785649 },
Expand All @@ -97,7 +96,7 @@ function registerSyncStorageTests(storageConfig: storage.TestStorageConfig, stor
const batch2 = await test_utils.fromAsync(
bucketStorage.getBucketDataBatch(
checkpoint,
new Map([[globalBucket, BigInt(batch1[0].chunkData.next_after)]]),
[bucketRequest(syncRules, 'global[]', batch1[0].chunkData.next_after)],
options
)
);
Expand All @@ -113,7 +112,7 @@ function registerSyncStorageTests(storageConfig: storage.TestStorageConfig, stor
const batch3 = await test_utils.fromAsync(
bucketStorage.getBucketDataBatch(
checkpoint,
new Map([[globalBucket, BigInt(batch2[0].chunkData.next_after)]]),
[bucketRequest(syncRules, 'global[]', batch2[0].chunkData.next_after)],
options
)
);
Expand Down
39 changes: 20 additions & 19 deletions modules/module-mongodb/test/src/change_stream_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import {
OplogEntry,
ProtocolOpId,
ReplicationCheckpoint,
storage,
STORAGE_VERSION_CONFIG,
SyncRulesBucketStorage,
TestStorageOptions,
updateSyncRulesFromYaml,
utils
} from '@powersync/service-core';
import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests';
import { bucketRequest, METRICS_HELPER, test_utils } from '@powersync/service-core-tests';

import { ChangeStream, ChangeStreamOptions } from '@module/replication/ChangeStream.js';
import { MongoManager } from '@module/replication/MongoManager.js';
Expand All @@ -28,6 +29,7 @@ export class ChangeStreamTestContext {
private abortController = new AbortController();
private streamPromise?: Promise<PromiseSettledResult<void>>;
private syncRulesId?: number;
private syncRulesContent?: storage.PersistedSyncRulesContent;
public storage?: SyncRulesBucketStorage;

/**
Expand Down Expand Up @@ -104,6 +106,7 @@ export class ChangeStreamTestContext {
updateSyncRulesFromYaml(content, { validate: true, storageVersion: this.storageVersion })
);
this.syncRulesId = syncRules.id;
this.syncRulesContent = syncRules;
this.storage = this.factory.getInstance(syncRules);
return this.storage!;
}
Expand All @@ -115,10 +118,18 @@ export class ChangeStreamTestContext {
}

this.syncRulesId = syncRules.id;
this.syncRulesContent = syncRules;
this.storage = this.factory.getInstance(syncRules);
return this.storage!;
}

private getSyncRulesContent(): storage.PersistedSyncRulesContent {
if (this.syncRulesContent == null) {
throw new Error('Sync rules not configured - call updateSyncRules() first');
}
return this.syncRulesContent;
}

get streamer() {
if (this.storage == null) {
throw new Error('updateSyncRules() first');
Expand Down Expand Up @@ -183,21 +194,10 @@ export class ChangeStreamTestContext {
return checkpoint;
}

private resolveBucketName(bucket: string) {
if (!this.versionedBuckets || /^\d+#/.test(bucket)) {
return bucket;
}
if (this.syncRulesId == null) {
throw new Error('Sync rules not configured - call updateSyncRules() first');
}
return `${this.syncRulesId}#${bucket}`;
}

async getBucketsDataBatch(buckets: Record<string, InternalOpId>, options?: { timeout?: number }) {
let checkpoint = await this.getCheckpoint(options);
const map = new Map<string, InternalOpId>(
Object.entries(buckets).map(([bucket, opId]) => [this.resolveBucketName(bucket), opId])
);
const syncRules = this.getSyncRulesContent();
const map = Object.entries(buckets).map(([bucket, start]) => bucketRequest(syncRules, bucket, start));
return test_utils.fromAsync(this.storage!.getBucketDataBatch(checkpoint, map));
}

Expand All @@ -206,9 +206,9 @@ export class ChangeStreamTestContext {
if (typeof start == 'string') {
start = BigInt(start);
}
const resolvedBucket = this.resolveBucketName(bucket);
const syncRules = this.getSyncRulesContent();
const checkpoint = await this.getCheckpoint(options);
const map = new Map<string, InternalOpId>([[resolvedBucket, start]]);
let map = [bucketRequest(syncRules, bucket, start)];
let data: OplogEntry[] = [];
while (true) {
const batch = this.storage!.getBucketDataBatch(checkpoint, map);
Expand All @@ -218,19 +218,20 @@ export class ChangeStreamTestContext {
if (batches.length == 0 || !batches[0]!.chunkData.has_more) {
break;
}
map.set(resolvedBucket, BigInt(batches[0]!.chunkData.next_after));
map = [bucketRequest(syncRules, bucket, BigInt(batches[0]!.chunkData.next_after))];
}
return data;
}

async getChecksums(buckets: string[], options?: { timeout?: number }): Promise<utils.ChecksumMap> {
let checkpoint = await this.getCheckpoint(options);
const versionedBuckets = buckets.map((bucket) => this.resolveBucketName(bucket));
const syncRules = this.getSyncRulesContent();
const versionedBuckets = buckets.map((bucket) => bucketRequest(syncRules, bucket, 0n));
const checksums = await this.storage!.getChecksums(checkpoint, versionedBuckets);

const unversioned: utils.ChecksumMap = new Map();
for (let i = 0; i < buckets.length; i++) {
unversioned.set(buckets[i], checksums.get(versionedBuckets[i])!);
unversioned.set(buckets[i], checksums.get(versionedBuckets[i].bucket)!);
}
return unversioned;
}
Expand Down
Loading