Skip to content

Commit f9588ca

Browse files
committed
fix(webapp): catch change-routing failures instead of crashing the process
The realtime change router invoked its async routing tick fire-and-forget without a catch, so a transient database error during hydration became an unhandled promise rejection, which exits the process. Failures are now caught and logged; affected feeds fall back to their full-resolve backstop.
1 parent 389e709 commit f9588ca

2 files changed

Lines changed: 31 additions & 3 deletions

File tree

apps/webapp/app/services/realtime/envChangeRouter.server.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { type ChangeRecord } from "./runChangeNotifier.server";
22
import { type RealtimeRunRow, serializeRunRow } from "./electricStreamProtocol.server";
3+
import { logger } from "~/services/logger.server";
34

45
/**
56
* EnvChangeRouter — the per-instance routing layer that turns "feeds as predicates over
@@ -182,9 +183,15 @@ export class EnvChangeRouter {
182183
};
183184
this.#envs.set(environmentId, env);
184185
env.unsubscribe = this.options.source.subscribeToEnv(environmentId, (records) => {
185-
// Fire-and-forget; the notifier doesn't await us. Errors fall through to the feeds'
186-
// backstop (a hydrate failure leaves waiters to time out into a full resolve).
187-
void this.#onBatch(environmentId, env, records);
186+
// Fire-and-forget; the notifier doesn't await us. A hydrate failure must be caught
187+
// here (an unhandled rejection exits the process); the matched feeds' waiters stay
188+
// armed and time out into the full-resolve backstop.
189+
this.#onBatch(environmentId, env, records).catch((error) => {
190+
logger.error("[envChangeRouter] failed to route a change batch", {
191+
environmentId,
192+
error,
193+
});
194+
});
188195
});
189196
return env;
190197
}

apps/webapp/test/realtime/envChangeRouter.test.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,27 @@ describe("EnvChangeRouter", () => {
9898
regs.forEach((r) => r.close());
9999
});
100100

101+
it("a hydrate failure doesn't reject out of the source callback; the feed times out", async () => {
102+
const src = fakeSource();
103+
const hydrateSpy = vi.fn<RowHydrator["hydrateByIds"]>(async () => {
104+
throw new Error("replica down");
105+
});
106+
const router = new EnvChangeRouter({
107+
source: src.source,
108+
hydrator: { hydrateByIds: hydrateSpy },
109+
});
110+
const reg = router.register("env_1", { kind: "run", runId: "r1" }, []);
111+
const wait = reg.waitForMatch(undefined, 50);
112+
113+
// Would be an unhandled rejection (process exit) if #onBatch's promise were unguarded.
114+
src.push("env_1", [record("r1")]);
115+
116+
const result = await wait;
117+
expect(result.reason).toBe("timeout");
118+
expect(hydrateSpy).toHaveBeenCalledTimes(1);
119+
reg.close();
120+
});
121+
101122
it("routes a run feed by exact runId", async () => {
102123
const rows = new Map([["r1", row("r1")]]);
103124
const { router, src } = makeRouter(rows);

0 commit comments

Comments
 (0)