Skip to content

Commit 389e709

Browse files
committed
fix(webapp): stop concurrent batch subscribers sharing a realtime working set
Batch feeds derived their stream handle from the batchId, so two subscribers to the same batch on one instance shared a working-set cache entry, and one could permanently suppress the other's deltas, including terminal status updates. Handles are now minted per connection with a collision-resistant suffix (the sequence number plus a random UUID fragment), and working-set entries are keyed by environment id plus handle, so a client-echoed handle can never read or overwrite another environment's working set. Also: mid-run metadata updates now carry the batchId so batch feeds fast-wake instead of waiting for the backstop, and the run-engine metadata handler publishes after the write lands so hydration sees the new row.
1 parent 3f83f49 commit 389e709

5 files changed

Lines changed: 59 additions & 20 deletions

File tree

apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ const { action } = createActionApiRoute(
187187
if (pgResult) {
188188
// Reflect metadata.set() on a live feed before the next lifecycle event. Publish the
189189
// internal id (the router keys single-run feeds by it, not the friendly id from the URL).
190-
publishChangeRecord({ runId: pgResult.runId, envId: env.id });
190+
publishChangeRecord({ runId: pgResult.runId, envId: env.id, batchId: pgResult.batchId });
191191
return json({ metadata: pgResult.metadata }, { status: 200 });
192192
}
193193

apps/webapp/app/services/metadata/updateMetadata.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ export class UpdateMetadataService {
308308
},
309309
select: {
310310
id: true,
311+
batchId: true,
311312
completedAt: true,
312313
status: true,
313314
metadata: true,
@@ -355,8 +356,9 @@ export class UpdateMetadataService {
355356

356357
return {
357358
metadata: newMetadata,
358-
// Internal id, so callers can publish realtime records keyed how the router indexes feeds.
359+
// Internal id + batchId, so callers can publish realtime records keyed how the router indexes feeds.
359360
runId: taskRun.id,
361+
batchId: taskRun.batchId,
360362
};
361363
}
362364

apps/webapp/app/services/realtime/notifierRealtimeClient.server.ts

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -364,15 +364,11 @@ export class NotifierRealtimeClient implements RealtimeStreamClient {
364364
clientVersion?: string,
365365
signal?: AbortSignal
366366
): Promise<Response> {
367-
const { offset, isLive, skipColumns } = this.#parseStreamRequest(url, requestOptions);
367+
const { offset, handle, isLive, skipColumns } = this.#parseStreamRequest(url, requestOptions);
368368

369-
// The batch set is fully defined by batchId (the route resolves it from the
370-
// friendlyId on every request), so the handle is derived and stable and there's
371-
// no createdAt window to pin.
372-
const handle = `batch-${batchId}`;
373369
const filter: RunSetFilter = { batchId };
374370

375-
if (offset !== INITIAL_OFFSET && isLive) {
371+
if (offset !== INITIAL_OFFSET && handle && isLive) {
376372
return this.#runSetLiveResponse(
377373
environment,
378374
filter,
@@ -385,11 +381,13 @@ export class NotifierRealtimeClient implements RealtimeStreamClient {
385381
);
386382
}
387383

388-
// Initial snapshot + non-live catch-up.
384+
// Initial snapshot + non-live catch-up. The handle must be per-connection, never
385+
// derived from the batchId: working sets are keyed by handle, and a shared handle
386+
// lets one subscriber's emit permanently suppress the same row for another.
389387
return this.#runSetSnapshotResponse(
390388
environment,
391389
filter,
392-
handle,
390+
handle ?? this.#mintBatchHandle(batchId),
393391
skipColumns,
394392
apiVersion,
395393
clientVersion
@@ -519,7 +517,7 @@ export class NotifierRealtimeClient implements RealtimeStreamClient {
519517
seen.set(row.id, updatedAtMs);
520518
maxUpdatedAt = Math.max(maxUpdatedAt, updatedAtMs);
521519
}
522-
this.#workingSetCache.set(handle, seen);
520+
this.#workingSetCache.set(this.#workingSetKey(environment.id, handle), seen);
523521

524522
return this.#buildResponse(buildRowsBody(changes, skipColumns), apiVersion, clientVersion, {
525523
offset: encodeOffset(maxUpdatedAt, this.#nextSeq()),
@@ -556,7 +554,8 @@ export class NotifierRealtimeClient implements RealtimeStreamClient {
556554

557555
// Working set we diff against: seeded from the cache (or the offset floor on a
558556
// miss) and advanced on each refetch within this held request.
559-
let prevSeen = this.#workingSetCache.get(handle);
557+
const workingSetKey = this.#workingSetKey(environment.id, handle);
558+
let prevSeen = this.#workingSetCache.get(workingSetKey);
560559

561560
const emitFromSerialized = (changes: SerializedRowChange[], maxUpdatedAt: number): Response => {
562561
const seq = this.#nextSeq();
@@ -614,7 +613,7 @@ export class NotifierRealtimeClient implements RealtimeStreamClient {
614613
// Merge (not replace): the router only surfaced the changed subset, so keep the
615614
// rest of the working set intact. The backstop full-resolve rebuilds it.
616615
const merged = this.#mergeWorkingSet(prevSeen, touched);
617-
this.#workingSetCache.set(handle, merged);
616+
this.#workingSetCache.set(workingSetKey, merged);
618617
prevSeen = merged;
619618

620619
if (changes.length > 0) {
@@ -636,7 +635,7 @@ export class NotifierRealtimeClient implements RealtimeStreamClient {
636635
prevSeen,
637636
offsetFloorMs
638637
);
639-
this.#workingSetCache.set(handle, touched);
638+
this.#workingSetCache.set(workingSetKey, touched);
640639
prevSeen = touched;
641640

642641
if (changes.length > 0) {
@@ -881,7 +880,23 @@ export class NotifierRealtimeClient implements RealtimeStreamClient {
881880
#mintListHandle(createdAtFilterMs: number): string {
882881
// Pins the createdAt threshold in the opaque handle so live polls reuse the
883882
// same lower bound even on a working-set cache miss.
884-
return `runs_${Math.trunc(createdAtFilterMs)}_${this.#nextSeq()}`;
883+
return `runs_${Math.trunc(createdAtFilterMs)}_${this.#mintUniqueSuffix()}`;
884+
}
885+
886+
#mintBatchHandle(batchId: string): string {
887+
return `batch_${batchId}_${this.#mintUniqueSuffix()}`;
888+
}
889+
890+
#mintUniqueSuffix(): string {
891+
// The seq alone isn't unique across instances/restarts; behind a non-sticky ALB a
892+
// collision would land two connections on one working-set cache entry.
893+
return `${this.#nextSeq()}_${randomUUID().slice(0, 8)}`;
894+
}
895+
896+
#workingSetKey(environmentId: string, handle: string): string {
897+
// The handle is client-echoed; env-prefix the key so a foreign handle can never
898+
// read or overwrite another tenant's working set.
899+
return `${environmentId}:${handle}`;
885900
}
886901

887902
#filterMsFromHandle(handle: string): number | undefined {

apps/webapp/app/v3/runEngineHandlers.server.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -571,12 +571,11 @@ export function registerRunEngineEventBusHandlers() {
571571

572572
const { environment, runTags, batchId } = result;
573573

574-
// Realtime run-changed publish: a full record (env + tags + batchId all from the one
575-
// read above), so tag/batch feeds route by index instead of hydrate-to-classify.
576-
publishChangeRecord({ runId: run.id, envId: environment.id, tags: runTags, batchId });
577-
578574
try {
579575
await updateMetadataService.call(run.id, run.metadata, environment);
576+
// Realtime run-changed publish, after the write so the router's hydrate sees the new
577+
// row. A full record (env + tags + batchId), so feeds route by index.
578+
publishChangeRecord({ runId: run.id, envId: environment.id, tags: runTags, batchId });
580579
} catch (e) {
581580
if (e instanceof MetadataTooLargeError) {
582581
logger.warn("[runMetadataUpdated] Failed to update metadata, too large", {

apps/webapp/test/realtime/notifierRunSetCache.test.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,29 @@ describe("NotifierRealtimeClient run-set resolve coalescing + cache", () => {
127127
await snapshot(client, "batch_1");
128128
expect(results).toEqual(["miss", "hit"]);
129129
});
130+
131+
it("mints a distinct batch handle per connection and echoes a client-provided one", async () => {
132+
const { client } = makeClient();
133+
// Two subscribers to the SAME batch must never share a handle (the working-set
134+
// cache is keyed by it; sharing lets one suppress the other's deltas forever).
135+
const res1 = await snapshot(client, "batch_1");
136+
const res2 = await snapshot(client, "batch_1");
137+
const h1 = res1.headers.get("electric-handle");
138+
const h2 = res2.headers.get("electric-handle");
139+
expect(h1).toBeTruthy();
140+
expect(h1).not.toBe(h2);
141+
142+
// Catch-up under an existing handle keeps it.
143+
const res3 = await client.streamBatch(
144+
`http://localhost:3030/realtime/v1/batches/batch_1?offset=123_1&handle=${h1}`,
145+
ENV,
146+
"batch_1",
147+
CURRENT_API_VERSION,
148+
undefined,
149+
"1.0.0"
150+
);
151+
expect(res3.headers.get("electric-handle")).toBe(h1);
152+
});
130153
});
131154

132155
describe("NotifierRealtimeClient resolve admission gate (mass-reconnect stampede)", () => {
@@ -304,7 +327,7 @@ describe("NotifierRealtimeClient review fixes", () => {
304327
livePollTimeoutMs: 50,
305328
});
306329
const res = await client.streamBatch(
307-
"http://localhost:3030/realtime/v1/batches/batch_1?offset=123_1&live=true",
330+
"http://localhost:3030/realtime/v1/batches/batch_1?offset=123_1&live=true&handle=batch_batch_1_7_abc",
308331
ENV,
309332
"batch_1",
310333
CURRENT_API_VERSION,

0 commit comments

Comments
 (0)