Skip to content

Commit 7ad68ab

Browse files
committed
feat(webapp): add a new backend for the realtime runs feed
Adds an opt-in backend for realtime run subscriptions (single runs, tag lists, and batches), selected per environment by a feature flag with a global env switch, both defaulting off so nothing changes until enabled. Run changes are signalled over Redis pub/sub; a live subscription wakes, refetches the current rows from a read replica, and re-emits them, resolving tag and batch membership from ClickHouse. Concurrent subscribers watching the same runs, tags, or batch share a single resolve-and-hydrate per short window, so read load scales with distinct filters rather than connection count.
1 parent fa4804e commit 7ad68ab

29 files changed

Lines changed: 3725 additions & 6 deletions
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Add a new backend for the realtime runs feed (single runs, tags, and batches) that scales under high concurrency, available behind a feature flag

apps/webapp/app/entry.server.tsx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import {
2727
registerRunEngineEventBusHandlers,
2828
setupBatchQueueCallbacks,
2929
} from "./v3/runEngineHandlers.server";
30+
import { registerRunChangeNotifierHandlers } from "./services/realtime/runChangeNotifierHandlers.server";
3031
// Touch the sessions replication singleton at entry so it boots deterministically
3132
// on webapp startup. The singleton's initializer wires start (gated on
3233
// `clickhouseFactory.isReady()`) and SIGTERM/SIGINT shutdown — mirrors
@@ -269,6 +270,9 @@ process.on("uncaughtException", (error, origin) => {
269270

270271
singleton("RunEngineEventBusHandlers", registerRunEngineEventBusHandlers);
271272
singleton("SetupBatchQueueCallbacks", setupBatchQueueCallbacks);
273+
// Attach the run-changed notifier delegations to the engine event bus.
274+
// No-ops (registers nothing) unless REALTIME_NOTIFIER_ENABLED=1.
275+
singleton("RunChangeNotifierHandlers", registerRunChangeNotifierHandlers);
272276

273277
// Wrapped in singleton() so Remix's dev-mode CJS reloads don't append
274278
// duplicate copies of the processor — Sentry's processor list lives in

apps/webapp/app/env.server.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,31 @@ const EnvironmentSchema = z
300300
.int()
301301
.default(24 * 60 * 60 * 1000), // 1 day in milliseconds
302302

303+
// Master switch for the notifier-backed realtime feed.
304+
// "0" (default) = the existing realtime path serves everything, publishes are
305+
// no-ops, and no notifier Redis connections are opened (zero-overhead off).
306+
// "1" = run-changed signals are published and the per-org `realtimeBackend`
307+
// feature flag selects the backend per request.
308+
REALTIME_NOTIFIER_ENABLED: z.string().default("0"),
309+
// Backstop wait before a live notifier request refetches the run (ms).
310+
REALTIME_NOTIFIER_LIVE_POLL_TIMEOUT_MS: z.coerce.number().int().default(5_000),
311+
// Hard cap on the tag-list snapshot size served by the notifier feed.
312+
REALTIME_NOTIFIER_MAX_LIST_RESULTS: z.coerce.number().int().default(1_000),
313+
// Short-TTL coalescing cache for the multi-run (tag-list/batch) resolve+hydrate.
314+
// Concurrent same-filter feeds share one ClickHouse resolve + Postgres hydrate
315+
// within this window, so an env-wide wake doesn't fan out into per-feed queries.
316+
// Staleness budget: a newly-matching run is visible within ~ttl + poll interval.
317+
REALTIME_NOTIFIER_RUNSET_CACHE_TTL_MS: z.coerce.number().int().default(1_000),
318+
REALTIME_NOTIFIER_RUNSET_CACHE_MAX_ENTRIES: z.coerce.number().int().default(5_000),
319+
// Cap on the per-handle working-set cache (runId -> updatedAt) the notifier keeps
320+
// for diffing multi-run live polls.
321+
REALTIME_NOTIFIER_WORKING_SET_MAX_ENTRIES: z.coerce.number().int().default(10_000),
322+
// Quantize the tag-list createdAt lower bound to this epoch-aligned bucket (ms) so
323+
// same-tag feeds that pin their window within the same bucket share one resolve+
324+
// hydrate cache entry. Floored, so the window only ever widens by < bucket. 0
325+
// disables bucketing (each feed keeps its exact lower bound).
326+
REALTIME_NOTIFIER_RUNSET_CREATED_AT_BUCKET_MS: z.coerce.number().int().default(60_000),
327+
303328
PUBSUB_REDIS_HOST: z
304329
.string()
305330
.optional()

apps/webapp/app/routes/api.v1.runs.$runId.tags.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
77
import { authenticateApiRequest } from "~/services/apiAuth.server";
88
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
99
import { logger } from "~/services/logger.server";
10+
import { publishRunChanged } from "~/services/realtime/runChangeNotifierInstance.server";
1011
import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";
1112

1213
// Pull the existing tags out of a buffer entry's serialised payload so
@@ -90,6 +91,8 @@ export async function action({ request, params }: ActionFunctionArgs) {
9091
},
9192
data: { runTags: { push: newTags } },
9293
});
94+
// Publish a run-changed notification (no-op unless the notifier is enabled).
95+
publishRunChanged({ runId: taskRun.id, environmentId: env.id });
9396
return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
9497
},
9598
// Buffer-applied patch path. The mutateSnapshot Lua deduplicates

apps/webapp/app/routes/realtime.v1.batches.$batchId.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { z } from "zod";
22
import { $replica } from "~/db.server";
33
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
4-
import { realtimeClient } from "~/services/realtimeClientGlobal.server";
4+
import { resolveRealtimeStreamClient } from "~/services/realtime/resolveRealtimeStreamClient.server";
55
import { anyResource, createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
66

77
const ParamsSchema = z.object({
@@ -33,7 +33,11 @@ export const loader = createLoaderApiRoute(
3333
},
3434
},
3535
async ({ authentication, request, resource: batchRun, apiVersion }) => {
36-
return realtimeClient.streamBatch(
36+
// Pick the Electric proxy or the notifier-backed batch feed
37+
// per org (defaults to Electric). Both implement streamBatch.
38+
const client = await resolveRealtimeStreamClient(authentication.environment);
39+
40+
return client.streamBatch(
3741
request.url,
3842
authentication.environment,
3943
batchRun.id,

apps/webapp/app/routes/realtime.v1.runs.$runId.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { json } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { $replica } from "~/db.server";
44
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
5-
import { realtimeClient } from "~/services/realtimeClientGlobal.server";
5+
import { resolveRealtimeStreamClient } from "~/services/realtime/resolveRealtimeStreamClient.server";
66
import {
77
anyResource,
88
createLoaderApiRoute,
@@ -48,7 +48,12 @@ export const loader = createLoaderApiRoute(
4848
},
4949
},
5050
async ({ authentication, request, resource: run, apiVersion }) => {
51-
return realtimeClient.streamRun(
51+
// Pick the Electric proxy or the notifier-backed shim per org (defaults to
52+
// Electric; controlled by REALTIME_NOTIFIER_ENABLED + the realtimeBackend
53+
// feature flag). Both implement the same streamRun contract.
54+
const client = await resolveRealtimeStreamClient(authentication.environment);
55+
56+
return client.streamRun(
5257
request.url,
5358
authentication.environment,
5459
run.id,

apps/webapp/app/routes/realtime.v1.runs.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { z } from "zod";
22
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
3-
import { realtimeClient } from "~/services/realtimeClientGlobal.server";
3+
import { resolveRealtimeStreamClient } from "~/services/realtime/resolveRealtimeStreamClient.server";
44
import {
55
anyResource,
66
createLoaderApiRoute,
@@ -39,7 +39,11 @@ export const loader = createLoaderApiRoute(
3939
},
4040
},
4141
async ({ searchParams, authentication, request, apiVersion }) => {
42-
return realtimeClient.streamRuns(
42+
// Pick the Electric proxy or the notifier-backed tag-list feed per org
43+
// (defaults to Electric). Both implement streamRuns.
44+
const client = await resolveRealtimeStreamClient(authentication.environment);
45+
46+
return client.streamRuns(
4347
request.url,
4448
authentication.environment,
4549
searchParams,
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Tiny in-process bounded TTL cache shared by the realtime feeds.
3+
*
4+
* Entries expire after `ttlMs`. An expired entry is evicted when read (`get`); on
5+
* write, if the cache is at `maxEntries`, expired entries are swept and, if it's
6+
* still full (pathologically all live), the oldest insertion is dropped. Node is
7+
* single-threaded so no locking is needed. Used where a miss is cheap and
8+
* correctness-safe (read-through hydration, per-handle working sets, per-org flag
9+
* resolution).
10+
*
11+
* A stored value of `undefined` cannot be distinguished from a miss; callers that
12+
* need to cache "absence" should store an explicit sentinel (e.g. `null`).
13+
*/
14+
export class BoundedTtlCache<V> {
  /** Insertion-ordered store; its iteration order doubles as the eviction order. */
  readonly #entries = new Map<string, { value: V; expiresAt: number }>();

  /**
   * @param ttlMs Lifetime of an entry in milliseconds, counted from its latest `set`.
   * @param maxEntries Size at which writing a new key triggers eviction.
   */
  constructor(
    private readonly ttlMs: number,
    private readonly maxEntries: number
  ) {}

  /**
   * Looks up a live entry.
   *
   * @returns The stored value, or `undefined` when the key is absent or its entry
   * has expired. An expired entry is removed as a side effect of the read.
   */
  get(key: string): V | undefined {
    const entry = this.#entries.get(key);
    if (!entry) {
      return undefined;
    }
    if (entry.expiresAt > Date.now()) {
      return entry.value;
    }
    // Evict on read so expired entries don't linger until the next at-capacity
    // sweep — important for read-heavy / low-churn caches (per-handle working sets).
    this.#entries.delete(key);
    return undefined;
  }

  /**
   * Stores `value` under `key` with a fresh TTL.
   *
   * Overwriting an existing key never evicts another entry. Writing a new key
   * into a full cache first sweeps expired entries and, if the cache is still
   * full, drops the least recently written entry.
   */
  set(key: string, value: V): void {
    // Drop any previous entry for this key before the capacity check. An
    // overwrite does not grow the map, so it must not push out an unrelated live
    // entry; re-inserting also moves the key to the end of the iteration order,
    // keeping "first key" equal to "earliest expiry".
    this.#entries.delete(key);

    if (this.#entries.size >= this.maxEntries) {
      const now = Date.now();
      // Deleting from a Map while iterating it is well-defined in JavaScript.
      for (const [candidate, entry] of this.#entries) {
        if (entry.expiresAt <= now) {
          this.#entries.delete(candidate);
        }
      }
      if (this.#entries.size >= this.maxEntries) {
        // Still full of live entries: evict the oldest write.
        const oldest = this.#entries.keys().next().value;
        if (oldest !== undefined) {
          this.#entries.delete(oldest);
        }
      }
    }

    this.#entries.set(key, { value, expiresAt: Date.now() + this.ttlMs });
  }

  /** Number of entries currently held, including expired ones not yet evicted. */
  get size(): number {
    return this.#entries.size;
  }
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import { type ClickHouse } from "@internal/clickhouse";
2+
import { type PrismaClientOrTransaction } from "~/db.server";
3+
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
4+
import { type RunListFilter, type RunListResolver } from "./runReader.server";
5+
6+
/** Dependencies supplied to `ClickHouseRunListResolver` at construction. */
export type ClickHouseRunListResolverOptions = {
7+
/** Resolves the per-organization ClickHouse client (multi-tenant routing). */
8+
getClickhouse: (organizationId: string) => Promise<ClickHouse>;
9+
/** Prisma client (or transaction) passed to the `RunsRepository` built for each resolve. */
prisma: PrismaClientOrTransaction;
10+
};
11+
12+
/**
13+
* Resolves a realtime run-list filter (tags and/or batch) into matching run ids via ClickHouse
14+
* `listRunIds`. Tag matching is contains-ANY (OR), the same
15+
* semantics the dashboard runs list uses. Filter-only: ids only, hydrated from
16+
* Postgres by id afterward. This keeps the realtime tag feed off the Postgres
17+
* `runTags` GIN index entirely.
18+
*
19+
* (Multi-tag subscribeToRunsWithTag is therefore OR, not the AND that Electric's
20+
* `runTags @> ARRAY[...]` shape used. Restoring AND is a follow-up: add a
21+
* `hasAll` mode to the ClickHouse runs filter and use it here.)
22+
*/
23+
export class ClickHouseRunListResolver implements RunListResolver {
  constructor(private readonly options: ClickHouseRunListResolverOptions) {}

  /**
   * Returns the ids of the runs matching `filter`, as reported by
   * `RunsRepository.listRunIds` against the organization's ClickHouse client.
   *
   * An empty or missing tag list is forwarded as "no tag filter", and the
   * `createdAtAfter` bound (if any) is passed as epoch milliseconds.
   */
  async resolveMatchingRunIds(filter: RunListFilter): Promise<string[]> {
    const clickhouse = await this.options.getClickhouse(filter.organizationId);
    const runsRepository = new RunsRepository({ clickhouse, prisma: this.options.prisma });

    const tagFilter = filter.tags?.length ? filter.tags : undefined;
    const createdAfterMs = filter.createdAtAfter?.getTime();

    return runsRepository.listRunIds({
      organizationId: filter.organizationId,
      projectId: filter.projectId,
      environmentId: filter.environmentId,
      tags: tagFilter,
      batchId: filter.batchId,
      from: createdAfterMs,
      page: { size: filter.limit },
    });
  }
}

0 commit comments

Comments
 (0)