diff --git a/CHANGELOG.md b/CHANGELOG.md index d8a4e7a..10ca806 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ ## Upcoming +- Align Prisma WAL/state-protocol events with the published spec by using + `old_value`, validating change/control message shapes on append, and + accepting control messages without generating touch invalidations. +- Use `bun pm pack` in package smoke tests and release docs so release + validation works with the repository's Bun package-manager pin. - Add stream profiles with built-in `generic` and `state-protocol` support, including a simplified `profile`-based `/_profile` API for live/touch setup. - Rename state-protocol touch processing metrics and runtime state to diff --git a/bun.lock b/bun.lock index f08bcbd..c7850bc 100644 --- a/bun.lock +++ b/bun.lock @@ -13,6 +13,7 @@ }, "devDependencies": { "@durable-streams/server-conformance-tests": "^0.2.1", + "@durable-streams/state": "0.2.5", "bun-types": "^1.3.6", "fast-check": "^3.14.0", "typescript": "^5.9.2", @@ -29,6 +30,8 @@ "@durable-streams/server-conformance-tests": ["@durable-streams/server-conformance-tests@0.2.1", "", { "dependencies": { "@durable-streams/client": "0.2.1", "fast-check": "^4.4.0", "vitest": "^4.0.0" }, "bin": { "server-conformance-tests": "dist/cli.js", "durable-streams-server-conformance": "dist/cli.js", "durable-streams-server-conformance-dev": "bin/conformance-dev.mjs" } }, "sha512-Us94fOweskeGqWNkQJo1sYyPtHuvYR8BYVmpJtaJEWWNkrkXFKcirfIOsC1vdCVJJyTwSXM3ZV8EuP9vpgSNhw=="], + "@durable-streams/state": ["@durable-streams/state@0.2.5", "", { "dependencies": { "@durable-streams/client": "0.2.3", "@standard-schema/spec": "^1.0.0" }, "peerDependencies": { "@tanstack/db": ">=0.5.0" }, "bin": { "intent": "bin/intent.js" } }, "sha512-9vVe2Ww+FxVuNs79f5geS7KZ6kvFXOROC+gCZmIXVugb5Fpah/B/IBl+kTm0gL/4MpjIuAcZtu3JK4rG9TuEeA=="], + "@esbuild/aix-ppc64": ["@esbuild/aix-ppc64@0.27.2", "", { "os": "aix", "cpu": "ppc64" }, "sha512-GZMB+a0mOMZs4MpDbj8RJp4cw+w1WV5NYD6xzgvzUJ5Ek2jerwfO2eADyI6ExDSUED+1X8aMbegahsJi+8mgpw=="], "@esbuild/android-arm": ["@esbuild/android-arm@0.27.2", "", { "os": "android", "cpu": "arm" }, "sha512-DVNI8jlPa7Ujbr1yjU2PfUSRtAUZPG9I1RwW4F4xFB1Imiu2on0ADiI/c3td+KmDtVKNbi+nffGDQMfcIMkwIA=="], @@ -137,6 +140,12 @@ "@standard-schema/spec": ["@standard-schema/spec@1.1.0", "", {}, "sha512-l2aFy5jALhniG5HgqrD6jXLi/rUWrKvqN/qJx6yoJsgKhblVd+iqqU4RCXavm/jPityDo5TCvKMnpjKnOriy0w=="], + "@tanstack/db": ["@tanstack/db@0.6.5", "", { "dependencies": { "@standard-schema/spec": "^1.1.0", "@tanstack/db-ivm": "0.1.18", "@tanstack/pacer-lite": "^0.2.1" }, "peerDependencies": { "typescript": ">=4.7" } }, "sha512-gtCuAo4UtC9SR/kTMu5fVEff6qZ2R1FZi9X7MybtHKA6wve7RePifGG6qBI4OmMB+7juT5/+glNbnqZOrG0/pg=="], + + "@tanstack/db-ivm": ["@tanstack/db-ivm@0.1.18", "", { "dependencies": { "fractional-indexing": "^3.2.0", "sorted-btree": "^1.8.1" }, "peerDependencies": { "typescript": ">=4.7" } }, "sha512-+pZJiRKdoKRM5Epq9T7otD9ZJl82pRFauo7LKuJGrarjVKQ7r+QQlPe3kGdN9LEKSnuNGIWjX9OOY4M8kH4eLw=="], + + "@tanstack/pacer-lite": ["@tanstack/pacer-lite@0.2.1", "", {}, "sha512-3PouiFjR4B6x1c969/Pl4ZIJleof1M0n6fNX8NRiC9Sqv1g06CVDlEaXUR4212ycGFyfq4q+t8Gi37Xy+z34iQ=="], + "@types/chai": ["@types/chai@5.2.3", "", { "dependencies": { "@types/deep-eql": "*", "assertion-error": "^2.0.1" } }, "sha512-Mw558oeA9fFbv65/y4mHtXDs9bPnFMZAL/jxdPFUpOHHIXX91mcgEHbS5Lahr+pwZFR8A7GQleRWeI6cGFC2UA=="], "@types/deep-eql": ["@types/deep-eql@4.0.2", "", {}, "sha512-c9h9dVVMigMPc4bwTvC5dxqtqJZwQPePsWjPlpSOnojbor6pGqdk541lfA7AqFQr5pB1BRdq0juY9db81BwyFw=="], @@ -189,6 +198,8 @@ "fdir": ["fdir@6.5.0", "", { "peerDependencies": { "picomatch": "^3 || ^4" }, "optionalPeers": ["picomatch"] }, "sha512-tIbYtZbucOs0BRGqPJkshJUYdL+SDH7dVM8gjy+ERp3WAUjLEFJE+02kanyHtwjWOnwrKYBiwAmM0p4kLJAnXg=="], + "fractional-indexing": ["fractional-indexing@3.2.0", "", {}, "sha512-PcOxmqwYCW7O2ovKRU8OoQQj2yqTfEB/yeTYk4gPid6dN5ODRfU1hXd9tTVZzax/0NkO7AxpHykvZnT1aYp/BQ=="], + "fsevents": ["fsevents@2.3.3", "", { "os": "darwin" }, "sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw=="], "graceful-fs": ["graceful-fs@4.2.11", "", {}, "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ=="], @@ -227,6 +238,8 @@ "sisteransi": ["sisteransi@1.0.5", "", {}, "sha512-bLGGlR1QxBcynn2d5YmDX4MGjlZvy2MRBDRNHLJ8VI6l6+9FUiyTFNJ0IveOSP0bcXgVDPRcfGqA0pjaqUpfVg=="], + "sorted-btree": ["sorted-btree@1.8.1", "", {}, "sha512-395+XIP+wqNn3USkFSrNz7G3Ss/MXlZEqesxvzCRFwL14h6e8LukDHdLBePn5pwbm5OQ9vGu8mDyz2lLDIqamQ=="], + "source-map-js": ["source-map-js@1.2.1", "", {}, "sha512-UXWMKhLOwVKb728IUtQPXxfYU+usdybtUrK/8uGE8CQMvrhOpwvzDBwj0QhSL7MQc7vIsISBG8VQ8+IDQxpfQA=="], "stackback": ["stackback@0.0.2", "", {}, "sha512-1XMJE5fQo1jGH6Y/7ebnwPOBEkIEnT4QF32d5R1+VXdXveM0IBMJt8zfaxX1P3QhVwrYe+576+jkANtSS2mBbw=="], @@ -253,6 +266,8 @@ "@durable-streams/server-conformance-tests/fast-check": ["fast-check@4.5.3", "", { "dependencies": { "pure-rand": "^7.0.0" } }, "sha512-IE9csY7lnhxBnA8g/WI5eg/hygA6MGWJMSNfFRrBlXUciADEhS1EDB0SIsMSvzubzIlOBbVITSsypCsW717poA=="], + "@durable-streams/state/@durable-streams/client": ["@durable-streams/client@0.2.3", "", { "dependencies": { "@microsoft/fetch-event-source": "^2.0.1", "fastq": "^1.19.1" }, "bin": { "intent": "bin/intent.js" } }, "sha512-609hWTqe8/OXzIFnv+oDdlT57QsCAc3F2c/nAQBcYhSLmmbXk5rHx7rnQSmk9MeGGQ8dsg9UCZf47dTJG3q3ig=="], + "@durable-streams/server-conformance-tests/fast-check/pure-rand": ["pure-rand@7.0.1", "", {}, "sha512-oTUZM/NAZS8p7ANR3SHh30kXB+zK2r2BPcEn/awJIbOvq82WoMN4p62AWWp3Hhw50G0xMsw1mhIBLqHw64EcNQ=="], } } diff --git a/docs/live-load-tests.md b/docs/live-load-tests.md index 702d591..bb406b1 100644 --- a/docs/live-load-tests.md +++ b/docs/live-load-tests.md @@ -76,7 +76,7 @@ bun run experiments/loadtests/live/write_path.ts \ - `--steps 1000,5000,10000,20000` and `--step-seconds 60` - `--producers` and `--batch-events` (client-side throughput tuning) -- `--columns 128` (size of `value` and `oldValue` objects) +- `--columns 128` (size of `value` and `old_value` objects) - `--coarse-interval-ms` and `--coalesce-window-ms` - Guardrails: - `--lag-degrade-offsets` + `--lag-recover-offsets` (hysteresis for coarse-only mode under lag) diff --git a/docs/live.md b/docs/live.md index 84b33c6..b3e3a92 100644 --- a/docs/live.md +++ b/docs/live.md @@ -70,7 +70,7 @@ Touch generation consumes State Protocol records from the base stream. "type": "public.todos", "key": "1", "value": { "id": "1", "tenantId": "t1", "status": "open" }, - "oldValue": { "id": "1", "tenantId": "t1", "status": "done" }, + "old_value": { "id": "1", "tenantId": "t1", "status": "done" }, "headers": { "operation": "update", "txid": "2057", @@ -85,7 +85,7 @@ Rules: 2. `type` must be non-empty. 3. `key` must be non-empty. 4. `value` should exist for `insert|update`. -5. `oldValue` is optional but strongly recommended for precise invalidation on +5. `old_value` is optional but strongly recommended for precise invalidation on updates. 6. Control messages are ignored for touch derivation. @@ -487,7 +487,7 @@ keys from those templates. Otherwise it waits on your provided keyset as-is. } ``` -This pattern assumes your change adapter can provide the `oldValue` data needed +This pattern assumes your change adapter can provide the `old_value` data needed for precise update invalidation. If before-images are not guaranteed, prefer coarse waits for correctness. @@ -659,7 +659,7 @@ before-images. `touch.onMissingBefore`: -- `coarse` (default): suppress fine invalidation when `oldValue` is missing, +- `coarse` (default): suppress fine invalidation when `old_value` is missing, but still emit coarse table touches - `skipBefore`: best-effort after-only fine invalidation - `error`: treat missing or insufficient before-images as a state-protocol @@ -669,7 +669,7 @@ Guidance: - treat touches as invalidation hints only - always re-run the real query after `touched` -- provide `oldValue` for updates whenever possible +- provide `old_value` for updates whenever possible - fine waits are only fully correct when the adapter can supply the before image fields needed to derive the right watch keys - if before-images are not guaranteed, prefer coarse waits for correctness diff --git a/docs/prisma-dev-pglite-live.md b/docs/prisma-dev-pglite-live.md index 4233a81..d9f97a3 100644 --- a/docs/prisma-dev-pglite-live.md +++ b/docs/prisma-dev-pglite-live.md @@ -138,7 +138,7 @@ Each committed database change should be normalized into a JSON record like: "type": "public.posts", "key": "42", "value": { "id": 42, "userId": "u1", "title": "Hello" }, - "oldValue": { "id": 42, "userId": "u0", "title": "Hello" }, + "old_value": { "id": 42, "userId": "u0", "title": "Hello" }, "headers": { "operation": "update", "txid": "12345", @@ -152,15 +152,15 @@ Mapping guidance: - insert: - `headers.operation = "insert"` - `value = row after` - - `oldValue = null` or omitted + - `old_value = null` or omitted - update: - `headers.operation = "update"` - `value = row after` - - `oldValue = row before` + - `old_value = row before` - delete: - `headers.operation = "delete"` - `value = null` - - `oldValue = row before` + - `old_value = row before` Field meanings: @@ -171,16 +171,16 @@ Field meanings: - primary key string is the simplest choice - `value` - after image -- `oldValue` +- `old_value` - before image - `headers.timestamp` - commit or event timestamp in RFC3339 - `headers.txid` - commit/transaction id if available -## Why `oldValue` Matters +## Why `old_value` Matters -The live system can operate without `oldValue`, but update invalidation becomes +The live system can operate without `old_value`, but update invalidation becomes less precise. Recommended policy: diff --git a/docs/profile-state-protocol.md b/docs/profile-state-protocol.md index 6719a2b..e474b3e 100644 --- a/docs/profile-state-protocol.md +++ b/docs/profile-state-protocol.md @@ -40,6 +40,7 @@ and related live invalidation behavior. `state-protocol` owns: - profile validation for the touch config +- append-time validation for State Protocol change and control messages - canonical change derivation for touch processing - `/touch/meta` - `/touch/wait` @@ -54,6 +55,15 @@ The core engine only provides the shared plumbing: - template registry - notifier integration +Append-time validation follows the published State Protocol message shapes: + +- change messages must use `type`, `key`, `headers.operation`, and `value` for + `insert|update` +- control messages must use `headers.control` with optional `headers.offset` +- malformed change/control records are rejected with `400` +- control messages are stored in the stream but do not produce touch + invalidations + ## Schema Relationship Schemas are still optional on `state-protocol` streams. @@ -74,7 +84,7 @@ When `state-protocol` is used for WAL change events like: "type": "public.posts", "key": "42", "value": { "id": 42, "title": "Hello" }, - "oldValue": null, + "old_value": null, "headers": { "operation": "insert", "timestamp": "2026-03-16T12:00:00.000Z" diff --git a/docs/releasing.md b/docs/releasing.md index 6341498..3eb260c 100644 --- a/docs/releasing.md +++ b/docs/releasing.md @@ -78,8 +78,8 @@ This produces: 4. Inspect the package contents: ```bash -npm pack --dry-run ./dist/npm/streams-local -npm pack --dry-run ./dist/npm/streams-server +(cd ./dist/npm/streams-local && bun pm pack --dry-run) +(cd ./dist/npm/streams-server && bun pm pack --dry-run) ``` 5. Publish the packages: diff --git a/experiments/demo/live_fields_app.ts b/experiments/demo/live_fields_app.ts index ee68f0f..4fa6f2b 100644 --- a/experiments/demo/live_fields_app.ts +++ b/experiments/demo/live_fields_app.ts @@ -289,7 +289,7 @@ async function appendFieldUpdate(baseUrl: string, stream: string, update: FieldU field: update.field, text: update.value, }, - oldValue: { + old_value: { field: update.field, text: previousText, }, @@ -978,7 +978,7 @@ async function publishFieldUpdate(opts: { field: opts.field, text: opts.nextValue, }, - oldValue: { + old_value: { field: opts.field, text: opts.previousValue, }, diff --git a/experiments/demo/wal_demo_ingest.ts b/experiments/demo/wal_demo_ingest.ts index f42a9b0..5eb163d 100644 --- a/experiments/demo/wal_demo_ingest.ts +++ b/experiments/demo/wal_demo_ingest.ts @@ -71,7 +71,7 @@ type StateProtocolChange = { type: string; key: string; value?: any; - oldValue?: any; + old_value?: any; headers: { operation: "insert" | "update" | "delete"; txid?: string; @@ -152,7 +152,7 @@ function wal2jsonV1EnvelopeToStateProtocol(env: Wal2JsonV1Envelope): StateProtoc type, key: keyBefore, value: null, - oldValue: before, + old_value: before, headers: { operation: "delete", txid, timestamp }, }); continue; @@ -169,7 +169,7 @@ function wal2jsonV1EnvelopeToStateProtocol(env: Wal2JsonV1Envelope): StateProtoc type, key: oldKey, value: null, - oldValue: before, + old_value: before, headers: { operation: "delete", txid, timestamp }, }); out.push({ @@ -185,7 +185,7 @@ function wal2jsonV1EnvelopeToStateProtocol(env: Wal2JsonV1Envelope): StateProtoc type, key: oldKey, value: after, - oldValue: before, + old_value: before, headers: { operation: "update", txid, timestamp }, }); } diff --git a/experiments/loadtests/live/read_path.ts b/experiments/loadtests/live/read_path.ts index a10908b..cef184a 100644 --- a/experiments/loadtests/live/read_path.ts +++ b/experiments/loadtests/live/read_path.ts @@ -387,7 +387,7 @@ async function runTouchDriver(args: { const valueJson = `{\"id\":\"${rng.nextU32()}\",\"userId\":${JSON.stringify(encodeUser(userIdx))},\"orgId\":${JSON.stringify( encodeOrg(orgIdx) )},\"status\":${JSON.stringify(encodeStatus(statusIdx))}}`; - const evt = `{\"type\":\"public.posts\",\"key\":\"${rng.nextU32()}\",\"value\":${valueJson},\"oldValue\":${valueJson},\"headers\":{\"operation\":\"update\",\"txid\":\"${rng.nextU32()}\",\"timestamp\":\"${new Date().toISOString()}\"}}`; + const evt = `{\"type\":\"public.posts\",\"key\":\"${rng.nextU32()}\",\"value\":${valueJson},\"old_value\":${valueJson},\"headers\":{\"operation\":\"update\",\"txid\":\"${rng.nextU32()}\",\"timestamp\":\"${new Date().toISOString()}\"}}`; batch.push(evt); } const body = `[${batch.join(",")}]`; diff --git a/experiments/loadtests/live/selective_shedding.ts b/experiments/loadtests/live/selective_shedding.ts index 3bc4cdb..ba41a19 100644 --- a/experiments/loadtests/live/selective_shedding.ts +++ b/experiments/loadtests/live/selective_shedding.ts @@ -324,7 +324,7 @@ function makeEvent(args: { txid: bigint; }): any { const value = { id: String(args.id), ...args.valueDims }; - const oldValue = { id: String(args.id), ...args.oldDims }; + const beforeValue = { id: String(args.id), ...args.oldDims }; if (args.op === "insert") { return { type: args.entity, @@ -341,7 +341,7 @@ function makeEvent(args: { return { type: args.entity, key: String(args.id), - oldValue, + old_value: beforeValue, headers: { operation: "delete", txid: args.txid.toString(), @@ -353,7 +353,7 @@ function makeEvent(args: { type: args.entity, key: String(args.id), value, - oldValue, + old_value: beforeValue, headers: { operation: "update", txid: args.txid.toString(), diff --git a/experiments/loadtests/live/write_path.ts b/experiments/loadtests/live/write_path.ts index db56f92..f85e993 100644 --- a/experiments/loadtests/live/write_path.ts +++ b/experiments/loadtests/live/write_path.ts @@ -53,7 +53,7 @@ Options: --step-seconds (default: 60) --producers (default: 4) --batch-events (default: 200) - --columns (default: 128) Value/oldValue columns per row object + --columns (default: 128) Value/old_value columns per row object --row-space (default: 100000) Rows per entity per producer --ttl-ms (default: 3600000) Template inactivity TTL for activations --coarse-interval-ms (default: 100) @@ -405,7 +405,7 @@ function makeEventFactory(args: { entity: string; fields: string[]; columns: num const { id, before } = doDelete(); const key = String(id); const oldJson = makeRowJson(id, before); - return `{\"type\":${JSON.stringify(args.entity)},\"key\":${JSON.stringify(key)},\"value\":null,\"oldValue\":${oldJson},\"headers\":{\"operation\":\"delete\",\"txid\":${JSON.stringify( + return `{\"type\":${JSON.stringify(args.entity)},\"key\":${JSON.stringify(key)},\"value\":null,\"old_value\":${oldJson},\"headers\":{\"operation\":\"delete\",\"txid\":${JSON.stringify( String(txid) )},\"timestamp\":${JSON.stringify(timestamp)}}}`; } @@ -414,7 +414,7 @@ function makeEventFactory(args: { entity: string; fields: string[]; columns: num const key = String(id); const valueJson = makeRowJson(id, after); const oldJson = makeRowJson(id, before); - return `{\"type\":${JSON.stringify(args.entity)},\"key\":${JSON.stringify(key)},\"value\":${valueJson},\"oldValue\":${oldJson},\"headers\":{\"operation\":\"update\",\"txid\":${JSON.stringify( + return `{\"type\":${JSON.stringify(args.entity)},\"key\":${JSON.stringify(key)},\"value\":${valueJson},\"old_value\":${oldJson},\"headers\":{\"operation\":\"update\",\"txid\":${JSON.stringify( String(txid) )},\"timestamp\":${JSON.stringify(timestamp)}}}`; }; diff --git a/package.json b/package.json index faeab02..61641d1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "prisma-streams", - "version": "0.1.4", + "version": "0.1.5", "private": true, "description": "Prisma Streams is a Bun + TypeScript implementation of the Durable Streams HTTP protocol, with a local development server for Prisma CLI integration.", "repository": { @@ -73,6 +73,7 @@ "./package.json": "./package.json" }, "devDependencies": { + "@durable-streams/state": "0.2.5", "@durable-streams/server-conformance-tests": "^0.2.1", "bun-types": "^1.3.6", "fast-check": "^3.14.0", diff --git a/scripts/test-bun-local-package.mjs b/scripts/test-bun-local-package.mjs index f68d65b..4bca0d5 100644 --- a/scripts/test-bun-local-package.mjs +++ b/scripts/test-bun-local-package.mjs @@ -42,10 +42,11 @@ try { if (localPackageManifest.engines?.bun !== localPackageBunEngine) { throw new Error(`@prisma/streams-local should publish bun ${localPackageBunEngine}, got ${localPackageManifest.engines?.bun}`); } - const packOutput = run("npm", ["pack", "--pack-destination", packDir], localPackageDir); - const tarballName = packOutput.split(/\r?\n/).filter(Boolean).at(-1); - if (!tarballName) throw new Error("npm pack did not produce a tarball name"); - const tarballPath = join(packDir, tarballName); + const tarballPath = run("bun", ["pm", "pack", "--destination", packDir, "--quiet"], localPackageDir) + .split(/\r?\n/) + .filter(Boolean) + .at(-1); + if (!tarballPath) throw new Error("bun pm pack did not produce a tarball path"); writeFileSync( join(consumerDir, "package.json"), @@ -243,7 +244,7 @@ try { type: "posts", key: "post:1", value: { id: "post:1", title: "hello" }, - oldValue: null, + old_value: null, headers: { operation: "insert", timestamp: new Date().toISOString(), diff --git a/scripts/test-bun-server-package.mjs b/scripts/test-bun-server-package.mjs index 1ec4a98..5bba620 100644 --- a/scripts/test-bun-server-package.mjs +++ b/scripts/test-bun-server-package.mjs @@ -76,10 +76,11 @@ try { mkdirSync(consumerDir, { recursive: true }); const serverPackageDir = join(repoRoot, "dist", "npm", "streams-server"); - const packOutput = run("npm", ["pack", "--pack-destination", packDir], serverPackageDir); - const tarballName = packOutput.split(/\r?\n/).filter(Boolean).at(-1); - if (!tarballName) throw new Error("npm pack did not produce a tarball name"); - const tarballPath = join(packDir, tarballName); + const tarballPath = run("bun", ["pm", "pack", "--destination", packDir, "--quiet"], serverPackageDir) + .split(/\r?\n/) + .filter(Boolean) + .at(-1); + if (!tarballPath) throw new Error("bun pm pack did not produce a tarball path"); writeFileSync( join(consumerDir, "package.json"), diff --git a/scripts/test-node-local-package.mjs b/scripts/test-node-local-package.mjs index 6be7ecd..d412170 100644 --- a/scripts/test-node-local-package.mjs +++ b/scripts/test-node-local-package.mjs @@ -42,10 +42,11 @@ try { if (localPackageManifest.engines?.bun !== localPackageBunEngine) { throw new Error(`@prisma/streams-local should publish bun ${localPackageBunEngine}, got ${localPackageManifest.engines?.bun}`); } - const packOutput = run("npm", ["pack", "--pack-destination", packDir], localPackageDir); - const tarballName = packOutput.split(/\r?\n/).filter(Boolean).at(-1); - if (!tarballName) throw new Error("npm pack did not produce a tarball name"); - const tarballPath = join(packDir, tarballName); + const tarballPath = run("bun", ["pm", "pack", "--destination", packDir, "--quiet"], localPackageDir) + .split(/\r?\n/) + .filter(Boolean) + .at(-1); + if (!tarballPath) throw new Error("bun pm pack did not produce a tarball path"); writeFileSync( join(consumerDir, "package.json"), @@ -243,7 +244,7 @@ try { type: "posts", key: "post:1", value: { id: "post:1", title: "hello" }, - oldValue: null, + old_value: null, headers: { operation: "insert", timestamp: new Date().toISOString(), diff --git a/src/profiles/stateProtocol.ts b/src/profiles/stateProtocol.ts index bb96ed6..e33675b 100644 --- a/src/profiles/stateProtocol.ts +++ b/src/profiles/stateProtocol.ts @@ -16,6 +16,7 @@ import { isStateProtocolProfile, validateStateProtocolProfileResult, } from "./stateProtocol/validation"; +import { validateStateProtocolRecordResult } from "./stateProtocol/ingest"; const STATE_PROTOCOL_TOUCH_CAPABILITY: StreamTouchCapability = { getTouchConfig(profile) { @@ -97,4 +98,11 @@ export const STATE_PROTOCOL_STREAM_PROFILE_DEFINITION: StreamProfileDefinition = schemaRegistry: null, }); }, + + jsonIngest: { + prepareRecordResult({ profile, value }) { + if (!isStateProtocolProfile(profile)) return Result.err({ message: "invalid state-protocol profile" }); + return validateStateProtocolRecordResult(value); + }, + }, }; diff --git a/src/profiles/stateProtocol/change_event_conformance.typecheck.ts b/src/profiles/stateProtocol/change_event_conformance.typecheck.ts new file mode 100644 index 0000000..b8008cb --- /dev/null +++ b/src/profiles/stateProtocol/change_event_conformance.typecheck.ts @@ -0,0 +1,35 @@ +import type { ChangeEvent, ControlEvent } from "@durable-streams/state"; + +type Assert = T; +type AssertFalse = T; +type IsAssignable = [From] extends [To] ? true : false; + +type PrismaWalChangeEvent = { + type: "public.posts"; + key: "42"; + value: { id: number; title: string }; + old_value: { id: number; title: string }; + headers: { + operation: "update"; + txid: string; + timestamp: string; + }; +}; + +type _PrismaWalChangeEventMatchesStateProtocol = Assert< + IsAssignable> +>; + +type _StateProtocolUsesSnakeCaseBeforeImage = Assert<"old_value" extends keyof ChangeEvent ? true : false>; +type _StateProtocolDoesNotUseCamelCaseBeforeImage = AssertFalse<"oldValue" extends keyof ChangeEvent ? true : false>; + +type PrismaWalControlEvent = { + headers: { + control: "reset"; + offset: string; + }; +}; + +type _PrismaWalControlEventMatchesStateProtocol = Assert< + IsAssignable +>; diff --git a/src/profiles/stateProtocol/changes.ts b/src/profiles/stateProtocol/changes.ts index 14e4d66..7cad8ad 100644 --- a/src/profiles/stateProtocol/changes.ts +++ b/src/profiles/stateProtocol/changes.ts @@ -17,7 +17,7 @@ export function deriveStateProtocolChanges(record: unknown): CanonicalChange[] { if (typeof type !== "string" || type.trim() === "") return []; if (typeof key !== "string" || key.trim() === "") return []; - const before = Object.prototype.hasOwnProperty.call(record, "oldValue") ? (record as any).oldValue : undefined; + const before = Object.prototype.hasOwnProperty.call(record, "old_value") ? (record as any).old_value : undefined; const after = Object.prototype.hasOwnProperty.call(record, "value") ? (record as any).value : undefined; return [{ entity: type, key, op, before, after }]; diff --git a/src/profiles/stateProtocol/ingest.ts b/src/profiles/stateProtocol/ingest.ts new file mode 100644 index 0000000..5efbe34 --- /dev/null +++ b/src/profiles/stateProtocol/ingest.ts @@ -0,0 +1,115 @@ +import { Result } from "better-result"; +import { parseOffsetResult } from "../../offset"; +import type { PreparedJsonRecord } from "../profile"; +import { expectPlainObjectResult, rejectUnknownKeysResult } from "../profile"; + +const CHANGE_KEYS = ["type", "key", "value", "old_value", "headers"] as const; +const CHANGE_HEADER_KEYS = ["operation", "txid", "timestamp"] as const; +const CONTROL_KEYS = ["headers"] as const; +const CONTROL_HEADER_KEYS = ["control", "offset"] as const; + +function isDateTimeString(value: string): boolean { + if (!/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d{1,9})?(?:Z|[+-]\d{2}:\d{2})$/.test(value)) { + return false; + } + return !Number.isNaN(Date.parse(value)); +} + +function nonEmptyStringFieldResult( + value: unknown, + path: string +): Result { + if (typeof value !== "string" || value.trim() === "") { + return Result.err({ message: `${path} must be a non-empty string` }); + } + return Result.ok(value); +} + +function validateChangeRecordResult( + record: Record, + headers: Record +): Result { + const keyCheck = rejectUnknownKeysResult(record, CHANGE_KEYS, "state-protocol record"); + if (Result.isError(keyCheck)) return keyCheck; + + const headerKeyCheck = rejectUnknownKeysResult(headers, CHANGE_HEADER_KEYS, "state-protocol record.headers"); + if (Result.isError(headerKeyCheck)) return headerKeyCheck; + + const typeRes = nonEmptyStringFieldResult(record.type, "state-protocol record.type"); + if (Result.isError(typeRes)) return typeRes; + + const keyRes = nonEmptyStringFieldResult(record.key, "state-protocol record.key"); + if (Result.isError(keyRes)) return keyRes; + + const operation = headers.operation; + if (operation !== "insert" && operation !== "update" && operation !== "delete") { + return Result.err({ message: "state-protocol record.headers.operation must be insert, update, or delete" }); + } + + if ((operation === "insert" || operation === "update") && !Object.prototype.hasOwnProperty.call(record, "value")) { + return Result.err({ message: `state-protocol ${operation} records must include value` }); + } + + if (Object.prototype.hasOwnProperty.call(headers, "txid")) { + const txidRes = nonEmptyStringFieldResult(headers.txid, "state-protocol record.headers.txid"); + if (Result.isError(txidRes)) return txidRes; + } + + if (Object.prototype.hasOwnProperty.call(headers, "timestamp")) { + if (typeof headers.timestamp !== "string" || !isDateTimeString(headers.timestamp)) { + return Result.err({ message: "state-protocol record.headers.timestamp must be a valid RFC 3339 timestamp" }); + } + } + + return Result.ok({ value: record, routingKey: null }); +} + +function validateControlRecordResult( + record: Record, + headers: Record +): Result { + const keyCheck = rejectUnknownKeysResult(record, CONTROL_KEYS, "state-protocol record"); + if (Result.isError(keyCheck)) return keyCheck; + + const headerKeyCheck = rejectUnknownKeysResult(headers, CONTROL_HEADER_KEYS, "state-protocol record.headers"); + if (Result.isError(headerKeyCheck)) return headerKeyCheck; + + const control = headers.control; + if (control !== "snapshot-start" && control !== "snapshot-end" && control !== "reset") { + return Result.err({ message: "state-protocol record.headers.control must be snapshot-start, snapshot-end, or reset" }); + } + + if (Object.prototype.hasOwnProperty.call(headers, "offset")) { + if (typeof headers.offset !== "string") { + return Result.err({ message: "state-protocol record.headers.offset must be a valid stream offset string" }); + } + const offsetRes = parseOffsetResult(headers.offset); + if (Result.isError(offsetRes)) { + return Result.err({ message: "state-protocol record.headers.offset must be a valid stream offset string" }); + } + } + + return Result.ok({ value: record, routingKey: null }); +} + +export function validateStateProtocolRecordResult(value: unknown): Result { + const recordRes = expectPlainObjectResult(value, "state-protocol record"); + if (Result.isError(recordRes)) { + return Result.err({ message: "state-protocol records must be JSON objects" }); + } + + const headersRes = expectPlainObjectResult(recordRes.value.headers, "state-protocol record.headers"); + if (Result.isError(headersRes)) { + return Result.err({ message: "state-protocol record.headers must be an object" }); + } + + const hasControl = Object.prototype.hasOwnProperty.call(headersRes.value, "control"); + const hasOperation = Object.prototype.hasOwnProperty.call(headersRes.value, "operation"); + + if (hasControl && hasOperation) { + return Result.err({ message: "state-protocol record.headers cannot mix control and operation" }); + } + if (hasControl) return validateControlRecordResult(recordRes.value, headersRes.value); + if (hasOperation) return validateChangeRecordResult(recordRes.value, headersRes.value); + return Result.err({ message: "state-protocol record.headers must contain operation or control" }); +} diff --git a/src/touch/processor_worker.ts b/src/touch/processor_worker.ts index fb5d408..21bbaba 100644 --- a/src/touch/processor_worker.ts +++ b/src/touch/processor_worker.ts @@ -314,13 +314,13 @@ async function handleProcess(msg: ProcessRequest): Promise { } else { if (beforeObj === undefined) { if (onMissingBefore === "error") { - failProcess(`missing oldValue for update (entity=${entity}, templateId=${tpl.templateId})`); + failProcess(`missing old_value for update (entity=${entity}, templateId=${tpl.templateId})`); return; } } else { - // oldValue exists but missing fields / unsupported types. + // old_value exists but missing fields / unsupported types. if (onMissingBefore === "error") { - failProcess(`oldValue missing required fields for update (entity=${entity}, templateId=${tpl.templateId})`); + failProcess(`old_value missing required fields for update (entity=${entity}, templateId=${tpl.templateId})`); return; } } diff --git a/src/touch/spec.ts b/src/touch/spec.ts index e7e8eb6..4df0748 100644 --- a/src/touch/spec.ts +++ b/src/touch/spec.ts @@ -38,7 +38,7 @@ export type TouchConfig = { */ touchCoalesceWindowMs?: number; /** - * Policy when an update event is missing `oldValue` (before image). + * Policy when an update event is missing `old_value` (before image). * * - coarse: emit coarse table touches only (safe default) * - skipBefore: compute fine touches from `value` only diff --git a/test/bootstrap_from_r2.test.ts b/test/bootstrap_from_r2.test.ts index 7537f08..a556820 100644 --- a/test/bootstrap_from_r2.test.ts +++ b/test/bootstrap_from_r2.test.ts @@ -63,19 +63,35 @@ describe("bootstrap from R2", () => { body: JSON.stringify({ schema: { type: "object", + required: ["type", "key", "value", "headers"], properties: { - eventTime: { type: "string" }, - x: { type: "number" }, - service: { type: "string" }, + type: { type: "string" }, + key: { type: "string" }, + value: { + type: "object", + properties: { + x: { type: "number" }, + service: { type: "string" }, + duration: { type: "number" }, + }, + required: ["x", "service", "duration"], + }, + headers: { + type: "object", + required: ["operation", "timestamp"], + properties: { + operation: { type: "string" }, + timestamp: { type: "string" }, + }, + }, }, - required: ["x", "service"], }, search: { primaryTimestampField: "eventTime", fields: { eventTime: { kind: "date", - bindings: [{ version: 1, jsonPointer: "/eventTime" }], + bindings: [{ version: 1, jsonPointer: "/headers/timestamp" }], exact: true, column: true, exists: true, @@ -83,7 +99,7 @@ describe("bootstrap from R2", () => { }, service: { kind: "keyword", - bindings: [{ version: 1, jsonPointer: "/service" }], + bindings: [{ version: 1, jsonPointer: "/value/service" }], normalizer: "lowercase_v1", exact: true, prefix: true, @@ -91,7 +107,7 @@ describe("bootstrap from R2", () => { }, duration: { kind: "float", - bindings: [{ version: 1, jsonPointer: "/duration" }], + bindings: [{ version: 1, jsonPointer: "/value/duration" }], exact: true, column: true, exists: true, @@ -136,10 +152,17 @@ describe("bootstrap from R2", () => { method: "POST", headers: { "content-type": "application/json" }, body: JSON.stringify({ - eventTime: `2026-03-25T10:0${i}:00.000Z`, - x: i, - service: i % 2 === 0 ? "api" : "worker", - duration: 100 + i * 10, + type: "public.requests", + key: String(i), + value: { + x: i, + service: i % 2 === 0 ? "api" : "worker", + duration: 100 + i * 10, + }, + headers: { + operation: "insert", + timestamp: `2026-03-25T10:0${i}:00.000Z`, + }, }), }) ); @@ -222,7 +245,7 @@ describe("bootstrap from R2", () => { fields: { eventTime: { kind: "date", - bindings: [{ version: 1, jsonPointer: "/eventTime" }], + bindings: [{ version: 1, jsonPointer: "/headers/timestamp" }], exact: true, column: true, exists: true, @@ -230,7 +253,7 @@ describe("bootstrap from R2", () => { }, service: { kind: "keyword", - bindings: [{ version: 1, jsonPointer: "/service" }], + bindings: [{ version: 1, jsonPointer: "/value/service" }], normalizer: "lowercase_v1", exact: true, prefix: true, @@ -238,7 +261,7 @@ describe("bootstrap from R2", () => { }, duration: { kind: "float", - bindings: [{ version: 1, jsonPointer: "/duration" }], + bindings: [{ version: 1, jsonPointer: "/value/duration" }], exact: true, column: true, exists: true, diff --git a/test/profile_state_protocol.test.ts b/test/profile_state_protocol.test.ts index 15eecb6..26192f7 100644 --- a/test/profile_state_protocol.test.ts +++ b/test/profile_state_protocol.test.ts @@ -3,6 +3,7 @@ import { mkdtempSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { bootstrapFromR2 } from "../src/bootstrap"; +import { tableKeyFor } from "../src/touch/live_keys"; import { createProfileTestApp, fetchJsonApp, makeProfileTestConfig } from "./profile_test_utils"; describe("state-protocol profile", () => { @@ -254,4 +255,180 @@ describe("state-protocol profile", () => { rmSync(root2, { recursive: true, force: true }); } }); + + test("accepts valid control messages and ignores them for touch derivation", async () => { + const root = mkdtempSync(join(tmpdir(), "ds-profile-state-control-")); + const { app } = createProfileTestApp(root); + try { + const stream = "state-control"; + await app.fetch( + new Request(`http://local/v1/stream/${stream}`, { + method: "PUT", + headers: { "content-type": "application/json" }, + }) + ); + + const profileRes = await fetchJsonApp(app, `http://local/v1/stream/${stream}/_profile`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + apiVersion: "durable.streams/profile/v1", + profile: { + kind: "state-protocol", + touch: { + enabled: true, + }, + }, + }), + }); + expect(profileRes.status).toBe(200); + + const metaRes = await fetchJsonApp(app, `http://local/v1/stream/${stream}/touch/meta`, { method: "GET" }); + expect(metaRes.status).toBe(200); + + const appendRes = await app.fetch( + new Request(`http://local/v1/stream/${stream}`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + headers: { + control: "reset", + offset: "-1", + }, + }), + }) + ); + expect(appendRes.status).toBe(204); + + app.deps.touch.notify(stream); + await app.deps.touch.tick(); + + const waitRes = await fetchJsonApp(app, `http://local/v1/stream/${stream}/touch/wait`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + cursor: metaRes.body.cursor, + keys: [tableKeyFor("public.posts")], + timeoutMs: 0, + }), + }); + expect(waitRes.status).toBe(200); + expect(waitRes.body?.touched).toBe(false); + + const readRes = await fetchJsonApp(app, `http://local/v1/stream/${stream}?format=json`, { method: "GET" }); + expect(readRes.status).toBe(200); + expect(readRes.body).toEqual([ + { + headers: { + control: "reset", + offset: "-1", + }, + }, + ]); + } finally { + app.close(); + rmSync(root, { recursive: true, force: true }); + } + }); + + test("rejects malformed state-protocol records on append", async () => { + const root = mkdtempSync(join(tmpdir(), "ds-profile-state-append-validate-")); + const { app } = createProfileTestApp(root); + try { + const stream = "state-append-validate"; + await app.fetch( + new Request(`http://local/v1/stream/${stream}`, { + method: "PUT", + headers: { "content-type": "application/json" }, + }) + ); + + const profileRes = await fetchJsonApp(app, `http://local/v1/stream/${stream}/_profile`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + apiVersion: "durable.streams/profile/v1", + profile: { + kind: "state-protocol", + touch: { + enabled: true, + }, + }, + }), + }); + expect(profileRes.status).toBe(200); + + const cases: Array<{ name: string; body: unknown; message: string }> = [ + { + name: "update missing value", + body: { + type: "public.posts", + key: "42", + headers: { operation: "update" }, + }, + message: "must include value", + }, + { + name: "invalid timestamp", + body: { + type: "public.posts", + key: "42", + value: { id: 42 }, + headers: { operation: "insert", timestamp: "not-a-timestamp" }, + }, + message: "valid RFC 3339 timestamp", + }, + { + name: "empty txid", + body: { + type: "public.posts", + key: "42", + value: { id: 42 }, + headers: { operation: "insert", txid: "" }, + }, + message: "txid must be a non-empty string", + }, + { + name: "invalid control offset", + body: { + headers: { control: "reset", offset: "not-an-offset" }, + }, + message: "valid stream offset string", + }, + { + name: "mixed control and operation", + body: { + headers: { control: "reset", operation: "delete" }, + }, + message: "cannot mix control and operation", + }, + { + name: "control message with extra field", + body: { + type: "public.posts", + headers: { control: "snapshot-start" }, + }, + message: "state-protocol record.type is not supported", + }, + { + name: "non-object payload", + body: "hello", + message: "must be JSON objects", + }, + ]; + + for (const tc of cases) { + const appendRes = await fetchJsonApp(app, `http://local/v1/stream/${stream}`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(tc.body), + }); + expect(appendRes.status, tc.name).toBe(400); + expect(String(appendRes.body?.error?.message ?? ""), tc.name).toContain(tc.message); + } + } finally { + app.close(); + rmSync(root, { recursive: true, force: true }); + } + }); }); diff --git a/test/state_protocol_changes.test.ts b/test/state_protocol_changes.test.ts new file mode 100644 index 0000000..af0a3de --- /dev/null +++ b/test/state_protocol_changes.test.ts @@ -0,0 +1,60 @@ +import { describe, expect, test } from "bun:test"; +import { deriveStateProtocolChanges } from "../src/profiles/stateProtocol/changes"; + +describe("state-protocol change derivation", () => { + test("reads the before image from old_value", () => { + const changes = deriveStateProtocolChanges({ + type: "public.posts", + key: "42", + value: { id: 42, title: "After" }, + old_value: { id: 42, title: "Before" }, + headers: { + operation: "update", + txid: "12345", + timestamp: "2026-03-16T12:00:00.000Z", + }, + }); + + expect(changes).toEqual([ + { + entity: "public.posts", + key: "42", + op: "update", + before: { id: 42, title: "Before" }, + after: { id: 42, title: "After" }, + }, + ]); + }); + + test("ignores the legacy oldValue field", () => { + const changes = deriveStateProtocolChanges({ + type: "public.posts", + key: "42", + value: { id: 42, title: "After" }, + oldValue: { id: 42, title: "Before" }, + headers: { + operation: "update", + }, + }); + + expect(changes).toHaveLength(1); + expect(changes[0]).toEqual({ + entity: "public.posts", + key: "42", + op: "update", + before: undefined, + after: { id: 42, title: "After" }, + }); + }); + + test("ignores control messages for touch derivation", () => { + const changes = deriveStateProtocolChanges({ + headers: { + control: "reset", + offset: "-1", + }, + }); + + expect(changes).toEqual([]); + }); +}); diff --git a/test/touch_memory_journal.test.ts b/test/touch_memory_journal.test.ts index d33d53a..e2312e5 100644 --- a/test/touch_memory_journal.test.ts +++ b/test/touch_memory_journal.test.ts @@ -80,7 +80,7 @@ describe("touch storage=memory (journal cursors)", () => { type: entity, key: "post:1", value: { tenantId: "t1", userId: "456" }, - oldValue: { tenantId: "t1", userId: "123" }, + old_value: { tenantId: "t1", userId: "123" }, headers: { operation: "update" }, }; await fetchJson(`${baseUrl}/v1/stream/${encodeURIComponent(stream)}`, { @@ -233,7 +233,7 @@ describe("touch storage=memory (journal cursors)", () => { type: entity, key: "post:2", value: { tenantId: "t1", userId: "9" }, - oldValue: { tenantId: "t1", userId: "8" }, + old_value: { tenantId: "t1", userId: "8" }, headers: { operation: "update" }, }), }); @@ -428,7 +428,7 @@ describe("touch storage=memory (journal cursors)", () => { type: entity, key: `post:${i + 1}`, value: { userId: i + 1 }, - oldValue: { userId: i + 1000 }, + old_value: { userId: i + 1000 }, headers: { operation: "update" }, })); await fetchJson(`${baseUrl}/v1/stream/${encodeURIComponent(stream)}`, { @@ -532,7 +532,7 @@ describe("touch storage=memory (journal cursors)", () => { type: entity, key: `post:${i + 1}`, value: { userId: i + 1 }, - oldValue: { userId: i + 1000 }, + old_value: { userId: i + 1000 }, headers: { operation: "update" }, })); await fetchJson(`${baseUrl}/v1/stream/${encodeURIComponent(stream)}`, { diff --git a/test/touch_processor.test.ts b/test/touch_processor.test.ts index 3ed182d..fe0e1fb 100644 --- a/test/touch_processor.test.ts +++ b/test/touch_processor.test.ts @@ -121,7 +121,7 @@ describe("live touches (state protocol)", () => { type: entity, key: "post:1", value: { tenantId: "t1", userId: "456" }, - oldValue: { tenantId: "t1", userId: "123" }, + old_value: { tenantId: "t1", userId: "123" }, headers: { operation: "update" }, }), }); @@ -154,7 +154,7 @@ describe("live touches (state protocol)", () => { } }); - test("onMissingBefore=coarse suppresses template invalidation when update is missing oldValue", async () => { + test("onMissingBefore=coarse suppresses template invalidation when update is missing old_value", async () => { const root = mkdtempSync(join(tmpdir(), "ds-live-missing-before-")); let app: ReturnType | null = null; let server: any | null = null; @@ -294,7 +294,7 @@ describe("live touches (state protocol)", () => { type: entity, key: "post:1", value: { tenantId: "t1", userId: "123" }, - oldValue: { tenantId: "t1", userId: "000" }, + old_value: { tenantId: "t1", userId: "000" }, headers: { operation: "update" }, }), }); @@ -335,7 +335,7 @@ describe("live touches (state protocol)", () => { type: entity, key: "post:1", value: { tenantId: "t1", userId: "123" }, - oldValue: { tenantId: "t1", userId: "123" }, + old_value: { tenantId: "t1", userId: "123" }, headers: { operation: "update" }, }), });