diff --git a/docs/releasing.md b/docs/releasing.md index 48c7107..394684e 100644 --- a/docs/releasing.md +++ b/docs/releasing.md @@ -47,6 +47,8 @@ into temporary consumers, and verify: - Node end-to-end usage of `@prisma/streams-local` - Bun end-to-end usage of `@prisma/streams-local`, including the live `/touch/*` path +- stateful local-runtime reopen flows that must read `/_schema` and skip + duplicate first-schema installs when the registry already matches - local package exposure of `GET /v1/server/_details` and `GET /v1/stream/{name}/_routing_keys` - Bun CLI startup for `@prisma/streams-server` diff --git a/docs/schemas.md b/docs/schemas.md index 07782aa..ba85785 100644 --- a/docs/schemas.md +++ b/docs/schemas.md @@ -114,6 +114,10 @@ Important rule: - a search-only update requires an already-installed schema version - if you are installing the first schema for a stream, install `schema` and `search` together in the same `_schema` request +- first-schema installation is not idempotent after data exists; stateful + clients that reopen an existing stream must `GET /_schema` first and skip the + install when the current registry already matches the desired schema/search + configuration Not supported: diff --git a/scripts/test-bun-local-package.mjs b/scripts/test-bun-local-package.mjs index 3b525fe..ef7263c 100644 --- a/scripts/test-bun-local-package.mjs +++ b/scripts/test-bun-local-package.mjs @@ -71,6 +71,17 @@ const server = await startLocalDurableStreamsServer({ const baseUrl = server.exports.http.url; const stream = "state"; +const schemaStream = "schema-reopen"; +const schemaUpdate = { + schema: { + type: "object", + additionalProperties: false, + required: ["repo"], + properties: { + repo: { type: "string" }, + }, + }, +}; async function fetchJson(url, init) { const res = await fetch(url, init); @@ -78,6 +89,27 @@ async function fetchJson(url, init) { return { status: res.status, body: text ? JSON.parse(text) : null }; } +async function ensureSchemaInstalled(baseUrl, stream, update) { + const current = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, { method: "GET" }); + if (current.status !== 200) throw new Error(\`schema get failed: \${current.status}\`); + + const currentSchema = current.body?.schemas?.["1"] ?? null; + const alreadyMatches = + current.body?.currentVersion === 1 && + JSON.stringify(currentSchema) === JSON.stringify(update.schema) && + JSON.stringify(current.body?.routingKey ?? null) === JSON.stringify(update.routingKey ?? null) && + JSON.stringify(current.body?.search ?? null) === JSON.stringify(update.search ?? null); + + if (alreadyMatches) return; + + const install = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(update), + }); + if (install.status !== 200) throw new Error(\`schema install failed: \${install.status}\`); +} + try { const serverDetails = await fetchJson(\`\${baseUrl}/v1/server/_details\`, { method: "GET" }); if (serverDetails.status !== 200) throw new Error(\`/v1/server/_details failed: \${serverDetails.status}\`); @@ -144,6 +176,23 @@ try { } } + { + const res = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, { + method: "PUT", + headers: { "content-type": "application/json" }, + }); + if (res.status !== 201 && res.status !== 200) throw new Error(\`schema stream PUT failed: \${res.status}\`); + + await ensureSchemaInstalled(baseUrl, schemaStream, schemaUpdate); + + const append = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify([{ repo: "alpha/repo" }]), + }); + if (append.status !== 204) throw new Error(\`schema stream append failed: \${append.status}\`); + } + const activate = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/touch/templates/activate\`, { method: "POST", headers: { "content-type": "application/json" }, @@ -213,6 +262,84 @@ try { ); run("bun", ["consumer.mjs"], consumerDir); + + writeFileSync( + join(consumerDir, "consumer-reopen.mjs"), + ` +import { startLocalDurableStreamsServer } from "@prisma/streams-local"; + +const server = await startLocalDurableStreamsServer({ + name: "${localServerName}", + port: 0, + hostname: "127.0.0.1", +}); + +const baseUrl = server.exports.http.url; +const schemaStream = "schema-reopen"; +const schemaUpdate = { + schema: { + type: "object", + additionalProperties: false, + required: ["repo"], + properties: { + repo: { type: "string" }, + }, + }, +}; + +async function fetchJson(url, init) { + const res = await fetch(url, init); + const text = await res.text(); + return { status: res.status, body: text ? JSON.parse(text) : null }; +} + +async function ensureSchemaInstalled(baseUrl, stream, update) { + const current = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, { method: "GET" }); + if (current.status !== 200) throw new Error(\`schema get failed: \${current.status}\`); + + const currentSchema = current.body?.schemas?.["1"] ?? null; + const alreadyMatches = + current.body?.currentVersion === 1 && + JSON.stringify(currentSchema) === JSON.stringify(update.schema) && + JSON.stringify(current.body?.routingKey ?? null) === JSON.stringify(update.routingKey ?? null) && + JSON.stringify(current.body?.search ?? null) === JSON.stringify(update.search ?? null); + + if (alreadyMatches) return; + + const install = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(update), + }); + if (install.status !== 200) throw new Error(\`schema install failed: \${install.status}\`); +} + +try { + await ensureSchemaInstalled(baseUrl, schemaStream, schemaUpdate); + + const append = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify([{ repo: "beta/repo" }]), + }); + if (append.status !== 204) throw new Error(\`schema stream reopen append failed: \${append.status}\`); + + const read = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}?offset=-1&format=json\`, { + method: "GET", + }); + if (read.status !== 200) throw new Error(\`schema stream reopen read failed: \${read.status}\`); + if (JSON.stringify(read.body) !== JSON.stringify([{ repo: "alpha/repo" }, { repo: "beta/repo" }])) { + throw new Error(\`unexpected schema stream reopen read: \${JSON.stringify(read.body)}\`); + } + + console.log(JSON.stringify({ ok: true, reopen: true, url: baseUrl })); +} finally { + await server.close(); +} +` + ); + + run("bun", ["consumer-reopen.mjs"], consumerDir); } finally { rmSync(tmpRoot, { recursive: true, force: true }); } diff --git a/scripts/test-node-local-package.mjs b/scripts/test-node-local-package.mjs index 69a09a4..819927f 100644 --- a/scripts/test-node-local-package.mjs +++ b/scripts/test-node-local-package.mjs @@ -71,6 +71,17 @@ const server = await startLocalDurableStreamsServer({ const baseUrl = server.exports.http.url; const stream = "state"; +const schemaStream = "schema-reopen"; +const schemaUpdate = { + schema: { + type: "object", + additionalProperties: false, + required: ["repo"], + properties: { + repo: { type: "string" }, + }, + }, +}; async function fetchJson(url, init) { const res = await fetch(url, init); @@ -78,6 +89,27 @@ async function fetchJson(url, init) { return { status: res.status, body: text ? JSON.parse(text) : null }; } +async function ensureSchemaInstalled(baseUrl, stream, update) { + const current = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, { method: "GET" }); + if (current.status !== 200) throw new Error(\`schema get failed: \${current.status}\`); + + const currentSchema = current.body?.schemas?.["1"] ?? null; + const alreadyMatches = + current.body?.currentVersion === 1 && + JSON.stringify(currentSchema) === JSON.stringify(update.schema) && + JSON.stringify(current.body?.routingKey ?? null) === JSON.stringify(update.routingKey ?? null) && + JSON.stringify(current.body?.search ?? null) === JSON.stringify(update.search ?? null); + + if (alreadyMatches) return; + + const install = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(update), + }); + if (install.status !== 200) throw new Error(\`schema install failed: \${install.status}\`); +} + try { const serverDetails = await fetchJson(\`\${baseUrl}/v1/server/_details\`, { method: "GET" }); if (serverDetails.status !== 200) throw new Error(\`/v1/server/_details failed: \${serverDetails.status}\`); @@ -144,6 +176,23 @@ try { } } + { + const res = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, { + method: "PUT", + headers: { "content-type": "application/json" }, + }); + if (res.status !== 201 && res.status !== 200) throw new Error(\`schema stream PUT failed: \${res.status}\`); + + await ensureSchemaInstalled(baseUrl, schemaStream, schemaUpdate); + + const append = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify([{ repo: "alpha/repo" }]), + }); + if (append.status !== 204) throw new Error(\`schema stream append failed: \${append.status}\`); + } + const activate = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/touch/templates/activate\`, { method: "POST", headers: { "content-type": "application/json" }, @@ -213,6 +262,84 @@ try { ); run("node", ["consumer.mjs"], consumerDir); + + writeFileSync( + join(consumerDir, "consumer-reopen.mjs"), + ` +import { startLocalDurableStreamsServer } from "@prisma/streams-local"; + +const server = await startLocalDurableStreamsServer({ + name: "${localServerName}", + port: 0, + hostname: "127.0.0.1", +}); + +const baseUrl = server.exports.http.url; +const schemaStream = "schema-reopen"; +const schemaUpdate = { + schema: { + type: "object", + additionalProperties: false, + required: ["repo"], + properties: { + repo: { type: "string" }, + }, + }, +}; + +async function fetchJson(url, init) { + const res = await fetch(url, init); + const text = await res.text(); + return { status: res.status, body: text ? JSON.parse(text) : null }; +} + +async function ensureSchemaInstalled(baseUrl, stream, update) { + const current = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, { method: "GET" }); + if (current.status !== 200) throw new Error(\`schema get failed: \${current.status}\`); + + const currentSchema = current.body?.schemas?.["1"] ?? null; + const alreadyMatches = + current.body?.currentVersion === 1 && + JSON.stringify(currentSchema) === JSON.stringify(update.schema) && + JSON.stringify(current.body?.routingKey ?? null) === JSON.stringify(update.routingKey ?? null) && + JSON.stringify(current.body?.search ?? null) === JSON.stringify(update.search ?? null); + + if (alreadyMatches) return; + + const install = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(stream)}/_schema\`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(update), + }); + if (install.status !== 200) throw new Error(\`schema install failed: \${install.status}\`); +} + +try { + await ensureSchemaInstalled(baseUrl, schemaStream, schemaUpdate); + + const append = await fetch(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}\`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify([{ repo: "beta/repo" }]), + }); + if (append.status !== 204) throw new Error(\`schema stream reopen append failed: \${append.status}\`); + + const read = await fetchJson(\`\${baseUrl}/v1/stream/\${encodeURIComponent(schemaStream)}?offset=-1&format=json\`, { + method: "GET", + }); + if (read.status !== 200) throw new Error(\`schema stream reopen read failed: \${read.status}\`); + if (JSON.stringify(read.body) !== JSON.stringify([{ repo: "alpha/repo" }, { repo: "beta/repo" }])) { + throw new Error(\`unexpected schema stream reopen read: \${JSON.stringify(read.body)}\`); + } + + console.log(JSON.stringify({ ok: true, reopen: true, url: baseUrl })); +} finally { + await server.close(); +} +` + ); + + run("node", ["consumer-reopen.mjs"], consumerDir); } finally { rmSync(tmpRoot, { recursive: true, force: true }); } diff --git a/test/local_server.test.ts b/test/local_server.test.ts index 4b238eb..d5145b5 100644 --- a/test/local_server.test.ts +++ b/test/local_server.test.ts @@ -80,6 +80,78 @@ describe("local durable streams server", () => { }); }); + test("consumer reopen flow can read the current schema registry and skip duplicate schema install", async () => { + await withLocalRoot("schema-reopen", async () => { + const name = "schema-reopen"; + const stream = "local-schema-reopen"; + const desiredSchema = { + type: "object", + additionalProperties: false, + required: ["repo"], + properties: { + repo: { type: "string" }, + }, + }; + + async function ensureInstalledSchema(baseUrl: string): Promise { + const current = await fetch(`${baseUrl}/v1/stream/${encodeURIComponent(stream)}/_schema`, { method: "GET" }); + expect(current.status).toBe(200); + const registry = await current.json(); + const currentSchema = registry?.schemas?.["1"] ?? null; + const alreadyMatches = + registry?.currentVersion === 1 && JSON.stringify(currentSchema) === JSON.stringify(desiredSchema); + if (alreadyMatches) return; + + const install = await fetch(`${baseUrl}/v1/stream/${encodeURIComponent(stream)}/_schema`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ schema: desiredSchema }), + }); + expect(install.status).toBe(200); + } + + const server1 = await startLocalDurableStreamsServer({ name, port: 0 }); + try { + const baseUrl = server1.exports.http.url; + let res = await fetch(`${baseUrl}/v1/stream/${encodeURIComponent(stream)}`, { + method: "PUT", + headers: { "content-type": "application/json" }, + }); + expect([200, 201]).toContain(res.status); + + await ensureInstalledSchema(baseUrl); + + res = await fetch(`${baseUrl}/v1/stream/${encodeURIComponent(stream)}`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify([{ repo: "alpha/repo" }]), + }); + expect(res.status).toBe(204); + } finally { + await server1.close(); + } + + const server2 = await startLocalDurableStreamsServer({ name, port: 0 }); + try { + const baseUrl = server2.exports.http.url; + await ensureInstalledSchema(baseUrl); + + const append = await fetch(`${baseUrl}/v1/stream/${encodeURIComponent(stream)}`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify([{ repo: "beta/repo" }]), + }); + expect(append.status).toBe(204); + + const read = await fetch(`${baseUrl}/v1/stream/${encodeURIComponent(stream)}?offset=-1&format=json`); + expect(read.status).toBe(200); + expect(await read.json()).toEqual([{ repo: "alpha/repo" }, { repo: "beta/repo" }]); + } finally { + await server2.close(); + } + }); + }); + test("does not create segment/cache artifacts in local mode", async () => { await withLocalRoot("files", async () => { const name = "files";