diff --git a/.server-changes/realtime-redis-connection-leak.md b/.server-changes/realtime-redis-connection-leak.md new file mode 100644 index 00000000000..e27b200174e --- /dev/null +++ b/.server-changes/realtime-redis-connection-leak.md @@ -0,0 +1,10 @@ +--- +area: webapp +type: fix +--- + +Fix Redis connection leak in realtime streams and broken abort signal propagation. + +**Redis connections**: Non-blocking methods (ingestData, appendPart, getLastChunkIndex) now share a single Redis connection instead of creating one per request. streamResponse still uses dedicated connections (required for XREAD BLOCK) but now tears them down immediately via disconnect() instead of graceful quit(), with a 15s inactivity fallback. + +**Abort signal**: request.signal is broken in Remix/Express due to a Node.js undici GC bug (nodejs/node#55428) that severs the signal chain when Remix clones the Request internally. Added getRequestAbortSignal() wired to Express res.on("close") via httpAsyncStorage, which fires reliably on client disconnect. All SSE/streaming routes updated to use it. diff --git a/apps/webapp/CLAUDE.md b/apps/webapp/CLAUDE.md index dff3ca4eb85..a4de6ab57b7 100644 --- a/apps/webapp/CLAUDE.md +++ b/apps/webapp/CLAUDE.md @@ -59,6 +59,17 @@ Use the `chrome-devtools` MCP server to visually verify local dashboard changes. Routes use Remix flat-file convention with dot-separated segments: `api.v1.tasks.$taskId.trigger.ts` -> `/api/v1/tasks/:taskId/trigger` +## Abort Signals + +**Never use `request.signal`** for detecting client disconnects. It is broken due to a Node.js bug ([nodejs/node#55428](https://github.com/nodejs/node/issues/55428)) where the AbortSignal chain is severed when Remix internally clones the Request object. Instead, use `getRequestAbortSignal()` from `app/services/httpAsyncStorage.server.ts`, which is wired directly to Express `res.on("close")` and fires reliably. + +```typescript +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; + +// In route handlers, SSE streams, or any server-side code: +const signal = getRequestAbortSignal(); +``` + ## Environment Variables Access via `env` export from `app/env.server.ts`. **Never use `process.env` directly.** diff --git a/apps/webapp/app/presenters/v3/TasksStreamPresenter.server.ts b/apps/webapp/app/presenters/v3/TasksStreamPresenter.server.ts index d690b3d083f..17a5bda620a 100644 --- a/apps/webapp/app/presenters/v3/TasksStreamPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/TasksStreamPresenter.server.ts @@ -1,6 +1,7 @@ import { type TaskRunAttempt } from "@trigger.dev/database"; import { eventStream } from "remix-utils/sse/server"; import { type PrismaClient, prisma } from "~/db.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { logger } from "~/services/logger.server"; import { projectPubSub } from "~/v3/services/projectPubSub.server"; @@ -63,7 +64,9 @@ export class TasksStreamPresenter { const subscriber = await projectPubSub.subscribe(`project:${project.id}:*`); - return eventStream(request.signal, (send, close) => { + const signal = getRequestAbortSignal(); + + return eventStream(signal, (send, close) => { const safeSend = (args: { event?: string; data: string }) => { try { send(args); @@ -95,7 +98,7 @@ export class TasksStreamPresenter { }); pinger = setInterval(() => { - if (request.signal.aborted) { + if (signal.aborted) { return close(); } diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts index 822c10a8101..aabd83bc9bb 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts @@ -1,6 +1,7 @@ import { type ActionFunctionArgs } from "@remix-run/server-runtime"; import { z } from "zod"; import { $replica } from "~/db.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; @@ -129,7 +130,7 @@ export const loader = createLoaderApiRoute( run.realtimeStreamsVersion ); - return realtimeStream.streamResponse(request, run.friendlyId, params.streamId, request.signal, { + return realtimeStream.streamResponse(request, run.friendlyId, params.streamId, getRequestAbortSignal(), { lastEventId, timeoutInSeconds, }); diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts index 98c348a023a..b16b1ca7922 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts @@ -1,6 +1,7 @@ import { json } from "@remix-run/server-runtime"; import { z } from "zod"; import { $replica } from "~/db.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { getInputStreamWaitpoint, deleteInputStreamWaitpoint, @@ -162,7 +163,7 @@ const loader = createLoaderApiRoute( request, run.friendlyId, `$trigger.input:${params.streamId}`, - request.signal, + getRequestAbortSignal(), { lastEventId, timeoutInSeconds, diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx index b5763bb4e9c..1295adb7842 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx @@ -21,6 +21,7 @@ import { $replica } from "~/db.server"; import { useEnvironment } from "~/hooks/useEnvironment"; import { useOrganization } from "~/hooks/useOrganizations"; import { useProject } from "~/hooks/useProject"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; import { requireUserId } from "~/services/session.server"; import { cn } from "~/utils/cn"; @@ -89,7 +90,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { run.realtimeStreamsVersion ); - return realtimeStream.streamResponse(request, run.friendlyId, streamKey, request.signal, { + return realtimeStream.streamResponse(request, run.friendlyId, streamKey, getRequestAbortSignal(), { lastEventId, }); }; diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.ai-generate-payload.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.ai-generate-payload.tsx index 2cfbe6a10b8..a4a5b8900b6 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.ai-generate-payload.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.ai-generate-payload.tsx @@ -5,6 +5,7 @@ import { z } from "zod"; import { env } from "~/env.server"; import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { requireUserId } from "~/services/session.server"; import { EnvironmentParamSchema } from "~/utils/pathBuilder"; import { inflate } from "node:zlib"; @@ -92,7 +93,7 @@ export async function action({ request, params }: ActionFunctionArgs) { const result = streamText({ model: openai(env.AI_RUN_FILTER_MODEL ?? "gpt-5-mini"), temperature: 1, - abortSignal: request.signal, + abortSignal: getRequestAbortSignal(), system: systemPrompt, prompt, tools: { diff --git a/apps/webapp/app/services/httpAsyncStorage.server.ts b/apps/webapp/app/services/httpAsyncStorage.server.ts index 7b709e4bf19..24b5c23f871 100644 --- a/apps/webapp/app/services/httpAsyncStorage.server.ts +++ b/apps/webapp/app/services/httpAsyncStorage.server.ts @@ -5,6 +5,7 @@ export type HttpLocalStorage = { path: string; host: string; method: string; + abortController: AbortController; }; const httpLocalStorage = new AsyncLocalStorage(); @@ -18,3 +19,15 @@ export function runWithHttpContext(context: HttpLocalStorage, fn: () => T): T export function getHttpContext(): HttpLocalStorage | undefined { return httpLocalStorage.getStore(); } + +// Fallback signal that is never aborted, safe for tests and non-Express contexts. +const neverAbortedSignal = new AbortController().signal; + +/** + * Returns an AbortSignal wired to the Express response's "close" event. + * This bypasses the broken request.signal chain in @remix-run/express + * (caused by Node.js undici GC bug nodejs/node#55428). + */ +export function getRequestAbortSignal(): AbortSignal { + return httpLocalStorage.getStore()?.abortController.signal ?? neverAbortedSignal; +} diff --git a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts index e742f770a99..99ad10c8ee4 100644 --- a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts @@ -7,7 +7,7 @@ export type RealtimeStreamsOptions = { redis: RedisOptions | undefined; logger?: Logger; logLevel?: LogLevel; - inactivityTimeoutMs?: number; // Close stream after this many ms of no new data (default: 60000) + inactivityTimeoutMs?: number; // Close stream after this many ms of no new data (default: 15000) }; // Legacy constant for backward compatibility (no longer written, but still recognized when reading) @@ -23,10 +23,23 @@ type StreamChunk = export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { private logger: Logger; private inactivityTimeoutMs: number; + // Shared connection for short-lived non-blocking operations (XADD, XREVRANGE, EXPIRE). + // Lazily created on first use so we don't open a connection if only streamResponse is called. + private _sharedRedis: Redis | undefined; constructor(private options: RealtimeStreamsOptions) { this.logger = options.logger ?? new Logger("RedisRealtimeStreams", options.logLevel ?? "info"); - this.inactivityTimeoutMs = options.inactivityTimeoutMs ?? 60000; // Default: 60 seconds + this.inactivityTimeoutMs = options.inactivityTimeoutMs ?? 15000; // Default: 15 seconds + } + + private get sharedRedis(): Redis { + if (!this._sharedRedis) { + this._sharedRedis = new Redis({ + ...this.options.redis, + connectionName: "realtime:shared", + }); + } + return this._sharedRedis; } async initializeStream( @@ -43,7 +56,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { signal: AbortSignal, options?: StreamResponseOptions ): Promise { - const redis = new Redis(this.options.redis ?? {}); + const redis = new Redis({ ...this.options.redis, connectionName: "realtime:streamResponse" }); const streamKey = `stream:${runId}:${streamId}`; let isCleanedUp = false; @@ -269,7 +282,10 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { async function cleanup() { if (isCleanedUp) return; isCleanedUp = true; - await redis.quit().catch(console.error); + // disconnect() tears down the TCP socket immediately, which causes any + // pending XREAD BLOCK to reject right away instead of waiting for the + // block timeout to elapse. quit() would queue behind the blocking command. + redis.disconnect(); } signal.addEventListener("abort", cleanup, { once: true }); @@ -290,22 +306,12 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { clientId: string, resumeFromChunk?: number ): Promise { - const redis = new Redis(this.options.redis ?? {}); + const redis = this.sharedRedis; const streamKey = `stream:${runId}:${streamId}`; const startChunk = resumeFromChunk ?? 0; // Start counting from the resume point, not from 0 let currentChunkIndex = startChunk; - const self = this; - - async function cleanup() { - try { - await redis.quit(); - } catch (error) { - self.logger.error("[RedisRealtimeStreams][ingestData] Error in cleanup:", { error }); - } - } - try { const textStream = stream.pipeThrough(new TextDecoderStream()); const reader = textStream.getReader(); @@ -361,13 +367,11 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { this.logger.error("[RealtimeStreams][ingestData] Error in ingestData:", { error }); return new Response(null, { status: 500 }); - } finally { - await cleanup(); } } async appendPart(part: string, partId: string, runId: string, streamId: string): Promise { - const redis = new Redis(this.options.redis ?? {}); + const redis = this.sharedRedis; const streamKey = `stream:${runId}:${streamId}`; await redis.xadd( @@ -386,12 +390,10 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { // Set TTL for cleanup when stream is done await redis.expire(streamKey, env.REALTIME_STREAM_TTL); - - await redis.quit(); } async getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise { - const redis = new Redis(this.options.redis ?? {}); + const redis = this.sharedRedis; const streamKey = `stream:${runId}:${streamId}`; try { @@ -460,10 +462,6 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { }); // Return -1 to indicate we don't know what the server has return -1; - } finally { - await redis.quit().catch((err) => { - this.logger.error("[RedisRealtimeStreams][getLastChunkIndex] Error in cleanup:", { err }); - }); } } diff --git a/apps/webapp/app/utils/sse.server.ts b/apps/webapp/app/utils/sse.server.ts index 56e7b191af7..c8ecce4a859 100644 --- a/apps/webapp/app/utils/sse.server.ts +++ b/apps/webapp/app/utils/sse.server.ts @@ -1,5 +1,6 @@ import { eventStream } from "remix-utils/sse/server"; import { env } from "~/env.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { logger } from "~/services/logger.server"; type SseProps = { @@ -22,6 +23,8 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }: return new Response("SSE disabled", { status: 200 }); } + const signal = getRequestAbortSignal(); + let pinger: NodeJS.Timeout | undefined = undefined; let updater: NodeJS.Timeout | undefined = undefined; let timeout: NodeJS.Timeout | undefined = undefined; @@ -32,7 +35,7 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }: clearTimeout(timeout); }; - return eventStream(request.signal, (send, close) => { + return eventStream(signal, (send, close) => { const safeSend = (args: { event?: string; data: string }) => { try { send(args); @@ -60,7 +63,7 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }: }; pinger = setInterval(() => { - if (request.signal.aborted) { + if (signal.aborted) { return abort(); } @@ -68,7 +71,7 @@ export function sse({ request, pingInterval = 1000, updateInterval = 348, run }: }, pingInterval); updater = setInterval(() => { - if (request.signal.aborted) { + if (signal.aborted) { return abort(); } diff --git a/apps/webapp/app/utils/sse.ts b/apps/webapp/app/utils/sse.ts index 8f396c092e9..f48cc9e31f9 100644 --- a/apps/webapp/app/utils/sse.ts +++ b/apps/webapp/app/utils/sse.ts @@ -2,6 +2,7 @@ import { type LoaderFunctionArgs } from "@remix-run/node"; import { type Params } from "@remix-run/router"; import { eventStream } from "remix-utils/sse/server"; import { setInterval } from "timers/promises"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; export type SendFunction = Parameters[1]>[0]; @@ -89,15 +90,17 @@ export function createSSELoader(options: SSEOptions) { throw new Response("Internal Server Error", { status: 500 }); }); + const requestAbortSignal = getRequestAbortSignal(); + const combinedSignal = AbortSignal.any([ - request.signal, + requestAbortSignal, timeoutSignal, internalController.signal, ]); log("Start"); - request.signal.addEventListener( + requestAbortSignal.addEventListener( "abort", () => { log(`request signal aborted`); diff --git a/apps/webapp/remix.config.js b/apps/webapp/remix.config.js index ae2f18cd72e..a4ad1bd228e 100644 --- a/apps/webapp/remix.config.js +++ b/apps/webapp/remix.config.js @@ -30,6 +30,7 @@ module.exports = { "redlock", "parse-duration", "uncrypto", + "std-env", ], browserNodeBuiltinsPolyfill: { modules: { diff --git a/apps/webapp/server.ts b/apps/webapp/server.ts index b2cc9387332..e266c6985c8 100644 --- a/apps/webapp/server.ts +++ b/apps/webapp/server.ts @@ -145,9 +145,11 @@ if (ENABLE_CLUSTER && cluster.isPrimary) { app.use((req, res, next) => { // Generate a unique request ID for each request const requestId = nanoid(); + const abortController = new AbortController(); + res.on("close", () => abortController.abort()); runWithHttpContext( - { requestId, path: req.url, host: req.hostname, method: req.method }, + { requestId, path: req.url, host: req.hostname, method: req.method, abortController }, next ); });