Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## Upcoming

- Align Prisma WAL/state-protocol events with the published spec by using
`old_value`, validating change/control message shapes on append, and
accepting control messages without generating touch invalidations.
- Use `bun pm pack` in package smoke tests and release docs so release
validation works with the repository's Bun package-manager pin.
- Add stream profiles with built-in `generic` and `state-protocol` support,
including a simplified `profile`-based `/_profile` API for live/touch setup.
- Rename state-protocol touch processing metrics and runtime state to
Expand Down
15 changes: 15 additions & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docs/live-load-tests.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ bun run experiments/loadtests/live/write_path.ts \

- `--steps 1000,5000,10000,20000` and `--step-seconds 60`
- `--producers` and `--batch-events` (client-side throughput tuning)
- `--columns 128` (size of `value` and `oldValue` objects)
- `--columns 128` (size of `value` and `old_value` objects)
- `--coarse-interval-ms` and `--coalesce-window-ms`
- Guardrails:
- `--lag-degrade-offsets` + `--lag-recover-offsets` (hysteresis for coarse-only mode under lag)
Expand Down
10 changes: 5 additions & 5 deletions docs/live.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Touch generation consumes State Protocol records from the base stream.
"type": "public.todos",
"key": "1",
"value": { "id": "1", "tenantId": "t1", "status": "open" },
"oldValue": { "id": "1", "tenantId": "t1", "status": "done" },
"old_value": { "id": "1", "tenantId": "t1", "status": "done" },
"headers": {
"operation": "update",
"txid": "2057",
Expand All @@ -85,7 +85,7 @@ Rules:
2. `type` must be non-empty.
3. `key` must be non-empty.
4. `value` should exist for `insert|update`.
5. `oldValue` is optional but strongly recommended for precise invalidation on
5. `old_value` is optional but strongly recommended for precise invalidation on
updates.
6. Control messages are ignored for touch derivation.

Expand Down Expand Up @@ -487,7 +487,7 @@ keys from those templates. Otherwise it waits on your provided keyset as-is.
}
```

This pattern assumes your change adapter can provide the `oldValue` data needed
This pattern assumes your change adapter can provide the `old_value` data needed
for precise update invalidation. If before-images are not guaranteed, prefer
coarse waits for correctness.

Expand Down Expand Up @@ -659,7 +659,7 @@ before-images.

`touch.onMissingBefore`:

- `coarse` (default): suppress fine invalidation when `oldValue` is missing,
- `coarse` (default): suppress fine invalidation when `old_value` is missing,
but still emit coarse table touches
- `skipBefore`: best-effort after-only fine invalidation
- `error`: treat missing or insufficient before-images as a state-protocol
Expand All @@ -669,7 +669,7 @@ Guidance:

- treat touches as invalidation hints only
- always re-run the real query after `touched`
- provide `oldValue` for updates whenever possible
- provide `old_value` for updates whenever possible
- fine waits are only fully correct when the adapter can supply the before
image fields needed to derive the right watch keys
- if before-images are not guaranteed, prefer coarse waits for correctness
Expand Down
14 changes: 7 additions & 7 deletions docs/prisma-dev-pglite-live.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ Each committed database change should be normalized into a JSON record like:
"type": "public.posts",
"key": "42",
"value": { "id": 42, "userId": "u1", "title": "Hello" },
"oldValue": { "id": 42, "userId": "u0", "title": "Hello" },
"old_value": { "id": 42, "userId": "u0", "title": "Hello" },
"headers": {
"operation": "update",
"txid": "12345",
Expand All @@ -152,15 +152,15 @@ Mapping guidance:
- insert:
- `headers.operation = "insert"`
- `value = row after`
- `oldValue = null` or omitted
- `old_value = null` or omitted
- update:
- `headers.operation = "update"`
- `value = row after`
- `oldValue = row before`
- `old_value = row before`
- delete:
- `headers.operation = "delete"`
- `value = null`
- `oldValue = row before`
- `old_value = row before`

Field meanings:

Expand All @@ -171,16 +171,16 @@ Field meanings:
- primary key string is the simplest choice
- `value`
- after image
- `oldValue`
- `old_value`
- before image
- `headers.timestamp`
- commit or event timestamp in RFC3339
- `headers.txid`
- commit/transaction id if available

## Why `oldValue` Matters
## Why `old_value` Matters

The live system can operate without `oldValue`, but update invalidation becomes
The live system can operate without `old_value`, but update invalidation becomes
less precise.

Recommended policy:
Expand Down
12 changes: 11 additions & 1 deletion docs/profile-state-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ and related live invalidation behavior.
`state-protocol` owns:

- profile validation for the touch config
- append-time validation for State Protocol change and control messages
- canonical change derivation for touch processing
- `/touch/meta`
- `/touch/wait`
Expand All @@ -54,6 +55,15 @@ The core engine only provides the shared plumbing:
- template registry
- notifier integration

Append-time validation follows the published State Protocol message shapes:

- change messages must use `type`, `key`, `headers.operation`, and `value` for
`insert|update`
- control messages must use `headers.control` with optional `headers.offset`
- malformed change/control records are rejected with `400`
- control messages are stored in the stream but do not produce touch
invalidations

## Schema Relationship

Schemas are still optional on `state-protocol` streams.
Expand All @@ -74,7 +84,7 @@ When `state-protocol` is used for WAL change events like:
"type": "public.posts",
"key": "42",
"value": { "id": 42, "title": "Hello" },
"oldValue": null,
"old_value": null,
"headers": {
"operation": "insert",
"timestamp": "2026-03-16T12:00:00.000Z"
Expand Down
4 changes: 2 additions & 2 deletions docs/releasing.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ This produces:
4. Inspect the package contents:

```bash
npm pack --dry-run ./dist/npm/streams-local
npm pack --dry-run ./dist/npm/streams-server
(cd ./dist/npm/streams-local && bun pm pack --dry-run)
(cd ./dist/npm/streams-server && bun pm pack --dry-run)
```

5. Publish the packages:
Expand Down
4 changes: 2 additions & 2 deletions experiments/demo/live_fields_app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ async function appendFieldUpdate(baseUrl: string, stream: string, update: FieldU
field: update.field,
text: update.value,
},
oldValue: {
old_value: {
field: update.field,
text: previousText,
},
Expand Down Expand Up @@ -978,7 +978,7 @@ async function publishFieldUpdate(opts: {
field: opts.field,
text: opts.nextValue,
},
oldValue: {
old_value: {
field: opts.field,
text: opts.previousValue,
},
Expand Down
8 changes: 4 additions & 4 deletions experiments/demo/wal_demo_ingest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type StateProtocolChange = {
type: string;
key: string;
value?: any;
oldValue?: any;
old_value?: any;
headers: {
operation: "insert" | "update" | "delete";
txid?: string;
Expand Down Expand Up @@ -152,7 +152,7 @@ function wal2jsonV1EnvelopeToStateProtocol(env: Wal2JsonV1Envelope): StateProtoc
type,
key: keyBefore,
value: null,
oldValue: before,
old_value: before,
headers: { operation: "delete", txid, timestamp },
});
continue;
Expand All @@ -169,7 +169,7 @@ function wal2jsonV1EnvelopeToStateProtocol(env: Wal2JsonV1Envelope): StateProtoc
type,
key: oldKey,
value: null,
oldValue: before,
old_value: before,
headers: { operation: "delete", txid, timestamp },
});
out.push({
Expand All @@ -185,7 +185,7 @@ function wal2jsonV1EnvelopeToStateProtocol(env: Wal2JsonV1Envelope): StateProtoc
type,
key: oldKey,
value: after,
oldValue: before,
old_value: before,
headers: { operation: "update", txid, timestamp },
});
}
Expand Down
2 changes: 1 addition & 1 deletion experiments/loadtests/live/read_path.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ async function runTouchDriver(args: {
const valueJson = `{\"id\":\"${rng.nextU32()}\",\"userId\":${JSON.stringify(encodeUser(userIdx))},\"orgId\":${JSON.stringify(
encodeOrg(orgIdx)
)},\"status\":${JSON.stringify(encodeStatus(statusIdx))}}`;
const evt = `{\"type\":\"public.posts\",\"key\":\"${rng.nextU32()}\",\"value\":${valueJson},\"oldValue\":${valueJson},\"headers\":{\"operation\":\"update\",\"txid\":\"${rng.nextU32()}\",\"timestamp\":\"${new Date().toISOString()}\"}}`;
const evt = `{\"type\":\"public.posts\",\"key\":\"${rng.nextU32()}\",\"value\":${valueJson},\"old_value\":${valueJson},\"headers\":{\"operation\":\"update\",\"txid\":\"${rng.nextU32()}\",\"timestamp\":\"${new Date().toISOString()}\"}}`;
batch.push(evt);
}
const body = `[${batch.join(",")}]`;
Expand Down
6 changes: 3 additions & 3 deletions experiments/loadtests/live/selective_shedding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ function makeEvent(args: {
txid: bigint;
}): any {
const value = { id: String(args.id), ...args.valueDims };
const oldValue = { id: String(args.id), ...args.oldDims };
const beforeValue = { id: String(args.id), ...args.oldDims };
if (args.op === "insert") {
return {
type: args.entity,
Expand All @@ -341,7 +341,7 @@ function makeEvent(args: {
return {
type: args.entity,
key: String(args.id),
oldValue,
old_value: beforeValue,
headers: {
operation: "delete",
txid: args.txid.toString(),
Expand All @@ -353,7 +353,7 @@ function makeEvent(args: {
type: args.entity,
key: String(args.id),
value,
oldValue,
old_value: beforeValue,
headers: {
operation: "update",
txid: args.txid.toString(),
Expand Down
6 changes: 3 additions & 3 deletions experiments/loadtests/live/write_path.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Options:
--step-seconds <n> (default: 60)
--producers <n> (default: 4)
--batch-events <n> (default: 200)
--columns <n> (default: 128) Value/oldValue columns per row object
--columns <n> (default: 128) Value/old_value columns per row object
--row-space <n> (default: 100000) Rows per entity per producer
--ttl-ms <n> (default: 3600000) Template inactivity TTL for activations
--coarse-interval-ms <n> (default: 100)
Expand Down Expand Up @@ -405,7 +405,7 @@ function makeEventFactory(args: { entity: string; fields: string[]; columns: num
const { id, before } = doDelete();
const key = String(id);
const oldJson = makeRowJson(id, before);
return `{\"type\":${JSON.stringify(args.entity)},\"key\":${JSON.stringify(key)},\"value\":null,\"oldValue\":${oldJson},\"headers\":{\"operation\":\"delete\",\"txid\":${JSON.stringify(
return `{\"type\":${JSON.stringify(args.entity)},\"key\":${JSON.stringify(key)},\"value\":null,\"old_value\":${oldJson},\"headers\":{\"operation\":\"delete\",\"txid\":${JSON.stringify(
String(txid)
)},\"timestamp\":${JSON.stringify(timestamp)}}}`;
}
Expand All @@ -414,7 +414,7 @@ function makeEventFactory(args: { entity: string; fields: string[]; columns: num
const key = String(id);
const valueJson = makeRowJson(id, after);
const oldJson = makeRowJson(id, before);
return `{\"type\":${JSON.stringify(args.entity)},\"key\":${JSON.stringify(key)},\"value\":${valueJson},\"oldValue\":${oldJson},\"headers\":{\"operation\":\"update\",\"txid\":${JSON.stringify(
return `{\"type\":${JSON.stringify(args.entity)},\"key\":${JSON.stringify(key)},\"value\":${valueJson},\"old_value\":${oldJson},\"headers\":{\"operation\":\"update\",\"txid\":${JSON.stringify(
String(txid)
)},\"timestamp\":${JSON.stringify(timestamp)}}}`;
};
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "prisma-streams",
"version": "0.1.4",
"version": "0.1.5",
"private": true,
"description": "Prisma Streams is a Bun + TypeScript implementation of the Durable Streams HTTP protocol, with a local development server for Prisma CLI integration.",
"repository": {
Expand Down Expand Up @@ -73,6 +73,7 @@
"./package.json": "./package.json"
},
"devDependencies": {
"@durable-streams/state": "0.2.5",
"@durable-streams/server-conformance-tests": "^0.2.1",
"bun-types": "^1.3.6",
"fast-check": "^3.14.0",
Expand Down
11 changes: 6 additions & 5 deletions scripts/test-bun-local-package.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ try {
if (localPackageManifest.engines?.bun !== localPackageBunEngine) {
throw new Error(`@prisma/streams-local should publish bun ${localPackageBunEngine}, got ${localPackageManifest.engines?.bun}`);
}
const packOutput = run("npm", ["pack", "--pack-destination", packDir], localPackageDir);
const tarballName = packOutput.split(/\r?\n/).filter(Boolean).at(-1);
if (!tarballName) throw new Error("npm pack did not produce a tarball name");
const tarballPath = join(packDir, tarballName);
const tarballPath = run("bun", ["pm", "pack", "--destination", packDir, "--quiet"], localPackageDir)
.split(/\r?\n/)
.filter(Boolean)
.at(-1);
if (!tarballPath) throw new Error("bun pm pack did not produce a tarball path");

writeFileSync(
join(consumerDir, "package.json"),
Expand Down Expand Up @@ -243,7 +244,7 @@ try {
type: "posts",
key: "post:1",
value: { id: "post:1", title: "hello" },
oldValue: null,
old_value: null,
headers: {
operation: "insert",
timestamp: new Date().toISOString(),
Expand Down
9 changes: 5 additions & 4 deletions scripts/test-bun-server-package.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ try {
mkdirSync(consumerDir, { recursive: true });

const serverPackageDir = join(repoRoot, "dist", "npm", "streams-server");
const packOutput = run("npm", ["pack", "--pack-destination", packDir], serverPackageDir);
const tarballName = packOutput.split(/\r?\n/).filter(Boolean).at(-1);
if (!tarballName) throw new Error("npm pack did not produce a tarball name");
const tarballPath = join(packDir, tarballName);
const tarballPath = run("bun", ["pm", "pack", "--destination", packDir, "--quiet"], serverPackageDir)
.split(/\r?\n/)
.filter(Boolean)
.at(-1);
if (!tarballPath) throw new Error("bun pm pack did not produce a tarball path");

writeFileSync(
join(consumerDir, "package.json"),
Expand Down
Loading
Loading