Skip to content

Commit 1aa6373

Browse files
committed
fix(webapp): fix Redis connection leak in realtime streams and broken abort signal propagation
Pool Redis connections for non-blocking ops (ingestData, appendPart, getLastChunkIndex) using a shared singleton instead of new Redis() per request. Use redis.disconnect() for immediate teardown in streamResponse cleanup. Add 15s inactivity timeout fallback. Fix broken request.signal in Remix/Express by wiring Express res.on('close') to an AbortController via httpAsyncStorage. All SSE/streaming routes now use getRequestAbortSignal() which fires reliably on client disconnect, bypassing the Node.js undici GC bug (nodejs/node#55428) that severs the signal chain.
1 parent 0c33de8 commit 1aa6373

File tree

12 files changed

+74
-36
lines changed

12 files changed

+74
-36
lines changed

apps/webapp/CLAUDE.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,17 @@ Use the `chrome-devtools` MCP server to visually verify local dashboard changes.
5959
Routes use Remix flat-file convention with dot-separated segments:
6060
`api.v1.tasks.$taskId.trigger.ts` -> `/api/v1/tasks/:taskId/trigger`
6161

62+
## Abort Signals
63+
64+
**Never use `request.signal`** for detecting client disconnects. It is broken due to a Node.js bug ([nodejs/node#55428](https://github.com/nodejs/node/issues/55428)) where the AbortSignal chain is severed when Remix internally clones the Request object. Instead, use `getRequestAbortSignal()` from `app/services/httpAsyncStorage.server.ts`, which is wired directly to Express `res.on("close")` and fires reliably.
65+
66+
```typescript
67+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
68+
69+
// In route handlers, SSE streams, or any server-side code:
70+
const signal = getRequestAbortSignal();
71+
```
72+
6273
## Environment Variables
6374

6475
Access via `env` export from `app/env.server.ts`. **Never use `process.env` directly.**

apps/webapp/app/presenters/v3/TasksStreamPresenter.server.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { type TaskRunAttempt } from "@trigger.dev/database";
22
import { eventStream } from "remix-utils/sse/server";
33
import { type PrismaClient, prisma } from "~/db.server";
4+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
45
import { logger } from "~/services/logger.server";
56
import { projectPubSub } from "~/v3/services/projectPubSub.server";
67

@@ -63,7 +64,9 @@ export class TasksStreamPresenter {
6364

6465
const subscriber = await projectPubSub.subscribe(`project:${project.id}:*`);
6566

66-
return eventStream(request.signal, (send, close) => {
67+
const signal = getRequestAbortSignal();
68+
69+
return eventStream(signal, (send, close) => {
6770
const safeSend = (args: { event?: string; data: string }) => {
6871
try {
6972
send(args);
@@ -95,7 +98,7 @@ export class TasksStreamPresenter {
9598
});
9699

97100
pinger = setInterval(() => {
98-
if (request.signal.aborted) {
101+
if (signal.aborted) {
99102
return close();
100103
}
101104

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { type ActionFunctionArgs } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { $replica } from "~/db.server";
4+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
45
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
56
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
67
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
@@ -129,7 +130,7 @@ export const loader = createLoaderApiRoute(
129130
run.realtimeStreamsVersion
130131
);
131132

132-
return realtimeStream.streamResponse(request, run.friendlyId, params.streamId, request.signal, {
133+
return realtimeStream.streamResponse(request, run.friendlyId, params.streamId, getRequestAbortSignal(), {
133134
lastEventId,
134135
timeoutInSeconds,
135136
});

apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { json } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { $replica } from "~/db.server";
4+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
45
import {
56
getInputStreamWaitpoint,
67
deleteInputStreamWaitpoint,
@@ -162,7 +163,7 @@ const loader = createLoaderApiRoute(
162163
request,
163164
run.friendlyId,
164165
`$trigger.input:${params.streamId}`,
165-
request.signal,
166+
getRequestAbortSignal(),
166167
{
167168
lastEventId,
168169
timeoutInSeconds,

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { $replica } from "~/db.server";
2121
import { useEnvironment } from "~/hooks/useEnvironment";
2222
import { useOrganization } from "~/hooks/useOrganizations";
2323
import { useProject } from "~/hooks/useProject";
24+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
2425
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
2526
import { requireUserId } from "~/services/session.server";
2627
import { cn } from "~/utils/cn";
@@ -89,7 +90,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
8990
run.realtimeStreamsVersion
9091
);
9192

92-
return realtimeStream.streamResponse(request, run.friendlyId, streamKey, request.signal, {
93+
return realtimeStream.streamResponse(request, run.friendlyId, streamKey, getRequestAbortSignal(), {
9394
lastEventId,
9495
});
9596
};

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.ai-generate-payload.tsx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { z } from "zod";
55
import { env } from "~/env.server";
66
import { findProjectBySlug } from "~/models/project.server";
77
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
8+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
89
import { requireUserId } from "~/services/session.server";
910
import { EnvironmentParamSchema } from "~/utils/pathBuilder";
1011
import { inflate } from "node:zlib";
@@ -92,7 +93,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
9293
const result = streamText({
9394
model: openai(env.AI_RUN_FILTER_MODEL ?? "gpt-5-mini"),
9495
temperature: 1,
95-
abortSignal: request.signal,
96+
abortSignal: getRequestAbortSignal(),
9697
system: systemPrompt,
9798
prompt,
9899
tools: {

apps/webapp/app/services/httpAsyncStorage.server.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export type HttpLocalStorage = {
55
path: string;
66
host: string;
77
method: string;
8+
abortController: AbortController;
89
};
910

1011
const httpLocalStorage = new AsyncLocalStorage<HttpLocalStorage>();
@@ -18,3 +19,15 @@ export function runWithHttpContext<T>(context: HttpLocalStorage, fn: () => T): T
1819
export function getHttpContext(): HttpLocalStorage | undefined {
1920
return httpLocalStorage.getStore();
2021
}
22+
23+
// Fallback signal that is never aborted, safe for tests and non-Express contexts.
24+
const neverAbortedSignal = new AbortController().signal;
25+
26+
/**
27+
* Returns an AbortSignal wired to the Express response's "close" event.
28+
* This bypasses the broken request.signal chain in @remix-run/express
29+
* (caused by Node.js undici GC bug nodejs/node#55428).
30+
*/
31+
export function getRequestAbortSignal(): AbortSignal {
32+
return httpLocalStorage.getStore()?.abortController.signal ?? neverAbortedSignal;
33+
}

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,23 @@ type StreamChunk =
2323
export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
2424
private logger: Logger;
2525
private inactivityTimeoutMs: number;
26+
// Shared connection for short-lived non-blocking operations (XADD, XREVRANGE, EXPIRE).
27+
// Lazily created on first use so we don't open a connection if only streamResponse is called.
28+
private _sharedRedis: Redis | undefined;
2629

2730
constructor(private options: RealtimeStreamsOptions) {
2831
this.logger = options.logger ?? new Logger("RedisRealtimeStreams", options.logLevel ?? "info");
29-
this.inactivityTimeoutMs = options.inactivityTimeoutMs ?? 60000; // Default: 60 seconds
32+
this.inactivityTimeoutMs = options.inactivityTimeoutMs ?? 15000; // Default: 15 seconds
33+
}
34+
35+
private get sharedRedis(): Redis {
36+
if (!this._sharedRedis) {
37+
this._sharedRedis = new Redis({
38+
...this.options.redis,
39+
connectionName: "realtime:shared",
40+
});
41+
}
42+
return this._sharedRedis;
3043
}
3144

3245
async initializeStream(
@@ -43,7 +56,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
4356
signal: AbortSignal,
4457
options?: StreamResponseOptions
4558
): Promise<Response> {
46-
const redis = new Redis(this.options.redis ?? {});
59+
const redis = new Redis({ ...this.options.redis, connectionName: "realtime:streamResponse" });
4760
const streamKey = `stream:${runId}:${streamId}`;
4861
let isCleanedUp = false;
4962

@@ -269,7 +282,10 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
269282
async function cleanup() {
270283
if (isCleanedUp) return;
271284
isCleanedUp = true;
272-
await redis.quit().catch(console.error);
285+
// disconnect() tears down the TCP socket immediately, which causes any
286+
// pending XREAD BLOCK to reject right away instead of waiting for the
287+
// block timeout to elapse. quit() would queue behind the blocking command.
288+
redis.disconnect();
273289
}
274290

275291
signal.addEventListener("abort", cleanup, { once: true });
@@ -290,22 +306,12 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
290306
clientId: string,
291307
resumeFromChunk?: number
292308
): Promise<Response> {
293-
const redis = new Redis(this.options.redis ?? {});
309+
const redis = this.sharedRedis;
294310
const streamKey = `stream:${runId}:${streamId}`;
295311
const startChunk = resumeFromChunk ?? 0;
296312
// Start counting from the resume point, not from 0
297313
let currentChunkIndex = startChunk;
298314

299-
const self = this;
300-
301-
async function cleanup() {
302-
try {
303-
await redis.quit();
304-
} catch (error) {
305-
self.logger.error("[RedisRealtimeStreams][ingestData] Error in cleanup:", { error });
306-
}
307-
}
308-
309315
try {
310316
const textStream = stream.pipeThrough(new TextDecoderStream());
311317
const reader = textStream.getReader();
@@ -361,13 +367,11 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
361367
this.logger.error("[RealtimeStreams][ingestData] Error in ingestData:", { error });
362368

363369
return new Response(null, { status: 500 });
364-
} finally {
365-
await cleanup();
366370
}
367371
}
368372

369373
async appendPart(part: string, partId: string, runId: string, streamId: string): Promise<void> {
370-
const redis = new Redis(this.options.redis ?? {});
374+
const redis = this.sharedRedis;
371375
const streamKey = `stream:${runId}:${streamId}`;
372376

373377
await redis.xadd(
@@ -386,12 +390,10 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
386390

387391
// Set TTL for cleanup when stream is done
388392
await redis.expire(streamKey, env.REALTIME_STREAM_TTL);
389-
390-
await redis.quit();
391393
}
392394

393395
async getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise<number> {
394-
const redis = new Redis(this.options.redis ?? {});
396+
const redis = this.sharedRedis;
395397
const streamKey = `stream:${runId}:${streamId}`;
396398

397399
try {
@@ -460,10 +462,6 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
460462
});
461463
// Return -1 to indicate we don't know what the server has
462464
return -1;
463-
} finally {
464-
await redis.quit().catch((err) => {
465-
this.logger.error("[RedisRealtimeStreams][getLastChunkIndex] Error in cleanup:", { err });
466-
});
467465
}
468466
}
469467

apps/webapp/app/utils/sse.server.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { eventStream } from "remix-utils/sse/server";
22
import { env } from "~/env.server";
3+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
34
import { logger } from "~/services/logger.server";
45

56
type SseProps = {
@@ -22,6 +23,8 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }:
2223
return new Response("SSE disabled", { status: 200 });
2324
}
2425

26+
const signal = getRequestAbortSignal();
27+
2528
let pinger: NodeJS.Timeout | undefined = undefined;
2629
let updater: NodeJS.Timeout | undefined = undefined;
2730
let timeout: NodeJS.Timeout | undefined = undefined;
@@ -32,7 +35,7 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }:
3235
clearTimeout(timeout);
3336
};
3437

35-
return eventStream(request.signal, (send, close) => {
38+
return eventStream(signal, (send, close) => {
3639
const safeSend = (args: { event?: string; data: string }) => {
3740
try {
3841
send(args);
@@ -60,15 +63,15 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }:
6063
};
6164

6265
pinger = setInterval(() => {
63-
if (request.signal.aborted) {
66+
if (signal.aborted) {
6467
return abort();
6568
}
6669

6770
safeSend({ event: "ping", data: new Date().toISOString() });
6871
}, pingInterval);
6972

7073
updater = setInterval(() => {
71-
if (request.signal.aborted) {
74+
if (signal.aborted) {
7275
return abort();
7376
}
7477

apps/webapp/app/utils/sse.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { type LoaderFunctionArgs } from "@remix-run/node";
22
import { type Params } from "@remix-run/router";
33
import { eventStream } from "remix-utils/sse/server";
44
import { setInterval } from "timers/promises";
5+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
56

67
export type SendFunction = Parameters<Parameters<typeof eventStream>[1]>[0];
78

@@ -89,15 +90,17 @@ export function createSSELoader(options: SSEOptions) {
8990
throw new Response("Internal Server Error", { status: 500 });
9091
});
9192

93+
const requestAbortSignal = getRequestAbortSignal();
94+
9295
const combinedSignal = AbortSignal.any([
93-
request.signal,
96+
requestAbortSignal,
9497
timeoutSignal,
9598
internalController.signal,
9699
]);
97100

98101
log("Start");
99102

100-
request.signal.addEventListener(
103+
requestAbortSignal.addEventListener(
101104
"abort",
102105
() => {
103106
log(`request signal aborted`);

0 commit comments

Comments
 (0)