diff --git a/.changeset/session-primitive.md b/.changeset/session-primitive.md new file mode 100644 index 00000000000..ccfd3b51807 --- /dev/null +++ b/.changeset/session-primitive.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Add `SessionId` friendly ID generator and schemas for the new durable Session primitive. Exported from `@trigger.dev/core/v3/isomorphic` alongside `RunId`, `BatchId`, etc. diff --git a/.server-changes/session-primitive.md b/.server-changes/session-primitive.md new file mode 100644 index 00000000000..80516a5c6a6 --- /dev/null +++ b/.server-changes/session-primitive.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Add `Session` primitive — a durable, typed, bidirectional I/O primitive that outlives a single run, intended for agent/chat use cases. Ships the Postgres schema (`Session` table), control-plane CRUD routes (`POST/GET/PATCH /api/v1/sessions`, `POST /api/v1/sessions/:session/close` — polymorphic on friendlyId or externalId), `sessions` JWT scope, ClickHouse `sessions_v1` table, and `SessionsReplicationService` (logical replication from Postgres `Session` → ClickHouse `sessions_v1`). Run-scoped realtime streams (`streams.pipe`/`streams.input`) are unchanged and do **not** create Session rows. diff --git a/apps/webapp/app/entry.server.tsx b/apps/webapp/app/entry.server.tsx index 4ee4f252a32..107036eb29a 100644 --- a/apps/webapp/app/entry.server.tsx +++ b/apps/webapp/app/entry.server.tsx @@ -23,6 +23,34 @@ import { registerRunEngineEventBusHandlers, setupBatchQueueCallbacks, } from "./v3/runEngineHandlers.server"; +import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server"; +import { signalsEmitter } from "./services/signals.server"; + +// Start the sessions replication service (subscribes to the logical replication +// slot, runs leader election, flushes to ClickHouse). 
Done at entry level so it +// runs deterministically on webapp boot rather than lazily via a singleton +// reference elsewhere in the module graph. +if (sessionsReplicationInstance && env.SESSION_REPLICATION_ENABLED === "1") { + sessionsReplicationInstance + .start() + .then(() => { + console.log("🗃️ Sessions replication service started"); + }) + .catch((error) => { + console.error("🗃️ Sessions replication service failed to start", { + error, + }); + }); + + signalsEmitter.on( + "SIGTERM", + sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance) + ); + signalsEmitter.on( + "SIGINT", + sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance) + ); +} const ABORT_DELAY = 30000; diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 8d72b2e51b2..2b00567c111 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1213,6 +1213,38 @@ const EnvironmentSchema = z RUN_REPLICATION_DISABLE_PAYLOAD_INSERT: z.string().default("0"), RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING: z.string().default("0"), + // Session replication (Postgres → ClickHouse sessions_v1). Shares Redis + // with the runs replicator for leader locking but has its own slot and + // publication so the two consume independently. 
+ SESSION_REPLICATION_CLICKHOUSE_URL: z.string().optional(), + SESSION_REPLICATION_ENABLED: z.string().default("0"), + SESSION_REPLICATION_SLOT_NAME: z.string().default("sessions_to_clickhouse_v1"), + SESSION_REPLICATION_PUBLICATION_NAME: z + .string() + .default("sessions_to_clickhouse_v1_publication"), + SESSION_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(1), + SESSION_REPLICATION_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000), + SESSION_REPLICATION_FLUSH_BATCH_SIZE: z.coerce.number().int().default(100), + SESSION_REPLICATION_LEADER_LOCK_TIMEOUT_MS: z.coerce.number().int().default(30_000), + SESSION_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS: z.coerce.number().int().default(10_000), + SESSION_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000), + SESSION_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500), + SESSION_REPLICATION_ACK_INTERVAL_SECONDS: z.coerce.number().int().default(10), + SESSION_REPLICATION_LOG_LEVEL: z + .enum(["log", "error", "warn", "info", "debug"]) + .default("info"), + SESSION_REPLICATION_CLICKHOUSE_LOG_LEVEL: z + .enum(["log", "error", "warn", "info", "debug"]) + .default("info"), + SESSION_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"), + SESSION_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("0"), + SESSION_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(), + SESSION_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10), + SESSION_REPLICATION_INSERT_STRATEGY: z.enum(["insert", "insert_async"]).default("insert"), + SESSION_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3), + SESSION_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100), + SESSION_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000), + // Clickhouse CLICKHOUSE_URL: z.string(), CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"), diff --git 
a/apps/webapp/app/routes/api.v1.sessions.$session.close.ts b/apps/webapp/app/routes/api.v1.sessions.$session.close.ts new file mode 100644 index 00000000000..047477e47ae --- /dev/null +++ b/apps/webapp/app/routes/api.v1.sessions.$session.close.ts @@ -0,0 +1,61 @@ +import { json } from "@remix-run/server-runtime"; +import { + CloseSessionRequestBody, + type RetrieveSessionResponseBody, +} from "@trigger.dev/core/v3"; +import { z } from "zod"; +import { prisma } from "~/db.server"; +import { + resolveSessionByIdOrExternalId, + serializeSession, +} from "~/services/realtime/sessions.server"; +import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; + +const ParamsSchema = z.object({ + session: z.string(), +}); + +const { action } = createActionApiRoute( + { + params: ParamsSchema, + body: CloseSessionRequestBody, + maxContentLength: 1024, + method: "POST", + allowJWT: true, + corsStrategy: "all", + authorization: { + action: "admin", + resource: (params) => ({ sessions: params.session }), + superScopes: ["admin:sessions", "admin:all", "admin"], + }, + }, + async ({ authentication, params, body }) => { + const existing = await resolveSessionByIdOrExternalId( + prisma, + authentication.environment.id, + params.session + ); + + if (!existing) { + return json({ error: "Session not found" }, { status: 404 }); + } + + // Idempotent: if already closed, return the current row without clobbering + // the original closedAt / closedReason. + if (existing.closedAt) { + return json(serializeSession(existing)); + } + + const updated = await prisma.session.update({ + where: { id: existing.id }, + data: { + closedAt: new Date(), + closedReason: body.reason ?? 
null, + }, + }); + + return json(serializeSession(updated)); + } +); + +export { action }; diff --git a/apps/webapp/app/routes/api.v1.sessions.$session.ts b/apps/webapp/app/routes/api.v1.sessions.$session.ts new file mode 100644 index 00000000000..fa061116993 --- /dev/null +++ b/apps/webapp/app/routes/api.v1.sessions.$session.ts @@ -0,0 +1,104 @@ +import { json } from "@remix-run/server-runtime"; +import { + type RetrieveSessionResponseBody, + UpdateSessionRequestBody, +} from "@trigger.dev/core/v3"; +import { Prisma } from "@trigger.dev/database"; +import { z } from "zod"; +import { $replica, prisma } from "~/db.server"; +import { + resolveSessionByIdOrExternalId, + serializeSession, +} from "~/services/realtime/sessions.server"; +import { + createActionApiRoute, + createLoaderApiRoute, +} from "~/services/routeBuilders/apiBuilder.server"; + +const ParamsSchema = z.object({ + session: z.string(), +}); + +export const loader = createLoaderApiRoute( + { + params: ParamsSchema, + allowJWT: true, + corsStrategy: "all", + findResource: async (params, auth) => { + return resolveSessionByIdOrExternalId($replica, auth.environment.id, params.session); + }, + authorization: { + action: "read", + resource: (session) => ({ sessions: [session.friendlyId, session.externalId ?? 
""] }), + superScopes: ["read:sessions", "read:all", "admin"], + }, + }, + async ({ resource: session }) => { + return json(serializeSession(session)); + } +); + +const { action } = createActionApiRoute( + { + params: ParamsSchema, + body: UpdateSessionRequestBody, + maxContentLength: 1024 * 32, + method: "PATCH", + allowJWT: true, + corsStrategy: "all", + authorization: { + action: "admin", + resource: (params) => ({ sessions: params.session }), + superScopes: ["admin:sessions", "admin:all", "admin"], + }, + }, + async ({ authentication, params, body }) => { + const existing = await resolveSessionByIdOrExternalId( + prisma, + authentication.environment.id, + params.session + ); + + if (!existing) { + return json({ error: "Session not found" }, { status: 404 }); + } + + try { + const updated = await prisma.session.update({ + where: { id: existing.id }, + data: { + ...(body.tags !== undefined ? { tags: body.tags } : {}), + ...(body.metadata !== undefined + ? { + metadata: + body.metadata === null + ? Prisma.JsonNull + : (body.metadata as Prisma.InputJsonValue), + } + : {}), + ...(body.externalId !== undefined ? { externalId: body.externalId } : {}), + }, + }); + + return json(serializeSession(updated)); + } catch (error) { + // A duplicate externalId in the same environment violates the + // `(runtimeEnvironmentId, externalId)` unique constraint. Surface that + // as a 409 rather than a generic 500. + if ( + error instanceof Prisma.PrismaClientKnownRequestError && + error.code === "P2002" && + Array.isArray((error.meta as { target?: string[] })?.target) && + ((error.meta as { target?: string[] }).target ?? 
[]).includes("externalId") + ) { + return json( + { error: "A session with this externalId already exists in this environment" }, + { status: 409 } + ); + } + throw error; + } + } +); + +export { action }; diff --git a/apps/webapp/app/routes/api.v1.sessions.ts b/apps/webapp/app/routes/api.v1.sessions.ts new file mode 100644 index 00000000000..34bfc497afe --- /dev/null +++ b/apps/webapp/app/routes/api.v1.sessions.ts @@ -0,0 +1,165 @@ +import { json } from "@remix-run/server-runtime"; +import { + CreateSessionRequestBody, + type CreatedSessionResponseBody, + ListSessionsQueryParams, + type ListSessionsResponseBody, + type SessionStatus, +} from "@trigger.dev/core/v3"; +import { SessionId } from "@trigger.dev/core/v3/isomorphic"; +import type { Prisma, Session } from "@trigger.dev/database"; +import { $replica, prisma, type PrismaClient } from "~/db.server"; +import { clickhouseClient } from "~/services/clickhouseInstance.server"; +import { logger } from "~/services/logger.server"; +import { serializeSession } from "~/services/realtime/sessions.server"; +import { SessionsRepository } from "~/services/sessionsRepository/sessionsRepository.server"; +import { + createActionApiRoute, + createLoaderApiRoute, +} from "~/services/routeBuilders/apiBuilder.server"; +import { ServiceValidationError } from "~/v3/services/common.server"; + +function asArray(value: T | T[] | undefined): T[] | undefined { + if (value === undefined) return undefined; + return Array.isArray(value) ? 
value : [value]; +} + +export const loader = createLoaderApiRoute( + { + searchParams: ListSessionsQueryParams, + allowJWT: true, + corsStrategy: "all", + authorization: { + action: "read", + resource: (_, __, searchParams) => ({ tasks: searchParams["filter[taskIdentifier]"] }), + superScopes: ["read:sessions", "read:all", "admin"], + }, + findResource: async () => 1, + }, + async ({ searchParams, authentication }) => { + const repository = new SessionsRepository({ + clickhouse: clickhouseClient, + prisma: $replica as PrismaClient, + }); + + // `page[after]` is the forward cursor, `page[before]` is the backward + // cursor. The repository internally keys off `{cursor, direction}`. + const cursor = searchParams["page[after]"] ?? searchParams["page[before]"]; + const direction = searchParams["page[before]"] ? "backward" : "forward"; + + const { sessions: rows, pagination } = await repository.listSessions({ + organizationId: authentication.environment.organizationId, + projectId: authentication.environment.projectId, + environmentId: authentication.environment.id, + types: asArray(searchParams["filter[type]"]), + tags: asArray(searchParams["filter[tags]"]), + taskIdentifiers: asArray(searchParams["filter[taskIdentifier]"]), + externalId: searchParams["filter[externalId]"], + statuses: asArray(searchParams["filter[status]"]) as SessionStatus[] | undefined, + period: searchParams["filter[createdAt][period]"], + from: searchParams["filter[createdAt][from]"], + to: searchParams["filter[createdAt][to]"], + page: { + size: searchParams["page[size]"], + cursor, + direction, + }, + }); + + return json({ + data: rows.map((session) => + serializeSession({ + ...session, + // Columns the list query doesn't select — filled so `serializeSession` + // can operate on a narrowed payload without type errors. 
+ projectId: authentication.environment.projectId, + environmentType: authentication.environment.type, + organizationId: authentication.environment.organizationId, + } as Session) + ), + pagination: { + ...(pagination.nextCursor ? { next: pagination.nextCursor } : {}), + ...(pagination.previousCursor ? { previous: pagination.previousCursor } : {}), + }, + }); + } +); + +const { action } = createActionApiRoute( + { + body: CreateSessionRequestBody, + method: "POST", + maxContentLength: 1024 * 32, // 32KB — metadata is the only thing that grows + }, + async ({ authentication, body }) => { + try { + let session: Session; + let isCached = false; + + if (body.externalId) { + // Atomic upsert — two concurrent POSTs with the same externalId both + // converge to the same row without either hitting a 500 from the + // unique constraint. Derive isCached from the upsert result: if the + // row pre-existed, the returned id won't match the one we just + // generated. Saves a round-trip and is race-free. + const { id, friendlyId } = SessionId.generate(); + const externalId = body.externalId; + + session = await prisma.session.upsert({ + where: { + runtimeEnvironmentId_externalId: { + runtimeEnvironmentId: authentication.environment.id, + externalId, + }, + }, + create: { + id, + friendlyId, + externalId, + type: body.type, + taskIdentifier: body.taskIdentifier ?? null, + tags: body.tags ?? [], + metadata: body.metadata as Prisma.InputJsonValue | undefined, + expiresAt: body.expiresAt ?? null, + projectId: authentication.environment.projectId, + runtimeEnvironmentId: authentication.environment.id, + environmentType: authentication.environment.type, + organizationId: authentication.environment.organizationId, + }, + update: {}, + }); + isCached = session.id !== id; + } else { + const { id, friendlyId } = SessionId.generate(); + session = await prisma.session.create({ + data: { + id, + friendlyId, + type: body.type, + taskIdentifier: body.taskIdentifier ?? 
null, + tags: body.tags ?? [], + metadata: body.metadata as Prisma.InputJsonValue | undefined, + expiresAt: body.expiresAt ?? null, + projectId: authentication.environment.projectId, + runtimeEnvironmentId: authentication.environment.id, + environmentType: authentication.environment.type, + organizationId: authentication.environment.organizationId, + }, + }); + } + + return json( + { ...serializeSession(session), isCached }, + { status: isCached ? 200 : 201 } + ); + } catch (error) { + if (error instanceof ServiceValidationError) { + return json({ error: error.message }, { status: 422 }); + } + logger.error("Failed to create session", { error }); + return json({ error: "Something went wrong" }, { status: 500 }); + } + } +); + +export { action }; diff --git a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts new file mode 100644 index 00000000000..a1e158d8821 --- /dev/null +++ b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts @@ -0,0 +1,88 @@ +import { json } from "@remix-run/server-runtime"; +import { tryCatch } from "@trigger.dev/core/utils"; +import { nanoid } from "nanoid"; +import { z } from "zod"; +import { $replica } from "~/db.server"; +import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server"; +import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server"; +import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; +import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { ServiceValidationError } from "~/v3/services/common.server"; + +const ParamsSchema = z.object({ + session: z.string(), + io: z.enum(["out", "in"]), +}); + +// POST: server-side append of a single record to a session channel. Mirrors +// the existing /realtime/v1/streams/:runId/:target/:streamId/append route, +// scoped to a Session primitive. 
+// S2 enforces a 1 MiB per-record limit (metered as +// `8 + 2*H + Σ(header name+value) + body`). We cap the raw HTTP body at +// 512 KiB so the JSON wrapper (`{"data":"...","id":"..."}`), string +// escaping, and any future per-record header additions all stay comfortably +// below S2's ceiling. See https://s2.dev/docs/limits. +const MAX_APPEND_BODY_BYTES = 1024 * 512; + +const { action } = createActionApiRoute( + { + params: ParamsSchema, + method: "POST", + maxContentLength: MAX_APPEND_BODY_BYTES, + allowJWT: true, + corsStrategy: "all", + authorization: { + action: "write", + resource: (params) => ({ sessions: params.session }), + superScopes: ["write:sessions", "write:all", "admin"], + }, + }, + async ({ request, params, authentication }) => { + const session = await resolveSessionByIdOrExternalId( + $replica, + authentication.environment.id, + params.session + ); + + if (!session) { + return new Response("Session not found", { status: 404 }); + } + + if (session.closedAt) { + return json( + { ok: false, error: "Cannot append to a closed session" }, + { status: 400 } + ); + } + + const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2"); + + if (!(realtimeStream instanceof S2RealtimeStreams)) { + return json( + { ok: false, error: "Session channels require the S2 realtime backend" }, + { status: 501 } + ); + } + + const part = await request.text(); + const partId = request.headers.get("X-Part-Id") ?? nanoid(7); + + const [appendError] = await tryCatch( + realtimeStream.appendPartToSessionStream(part, partId, session.friendlyId, params.io) + ); + + if (appendError) { + if (appendError instanceof ServiceValidationError) { + return json( + { ok: false, error: appendError.message }, + { status: appendError.status ?? 
422 } + ); + } + return json({ ok: false, error: appendError.message }, { status: 500 }); + } + + return json({ ok: true }, { status: 200 }); + } +); + +export { action }; diff --git a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts new file mode 100644 index 00000000000..ebc13511818 --- /dev/null +++ b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts @@ -0,0 +1,140 @@ +import { json } from "@remix-run/server-runtime"; +import { z } from "zod"; +import { $replica } from "~/db.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; +import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server"; +import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server"; +import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; +import { + createActionApiRoute, + createLoaderApiRoute, +} from "~/services/routeBuilders/apiBuilder.server"; + +const ParamsSchema = z.object({ + session: z.string(), + io: z.enum(["out", "in"]), +}); + +// PUT: initialize the S2 channel for this (session, io) pair — returns S2 +// credentials in response headers so the caller can write/read directly +// against S2. GET is handled by the loader below. 
+const { action } = createActionApiRoute( + { + params: ParamsSchema, + allowJWT: true, + corsStrategy: "all", + authorization: { + action: "write", + resource: (params) => ({ sessions: params.session }), + superScopes: ["write:sessions", "write:all", "admin"], + }, + }, + async ({ request, params, authentication }) => { + if (request.method !== "PUT") { + return new Response("Method not allowed", { status: 405 }); + } + + const session = await resolveSessionByIdOrExternalId( + $replica, + authentication.environment.id, + params.session + ); + + if (!session) { + return new Response("Session not found", { status: 404 }); + } + + if (session.closedAt) { + return new Response("Cannot initialize a channel on a closed session", { + status: 400, + }); + } + + const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2"); + + if (!(realtimeStream instanceof S2RealtimeStreams)) { + return new Response("Session channels require the S2 realtime backend", { + status: 501, + }); + } + + const { responseHeaders } = await realtimeStream.initializeSessionStream( + session.friendlyId, + params.io + ); + + return json({ version: "v2" }, { status: 202, headers: responseHeaders }); + } +); + +// GET: SSE subscribe to a session channel. HEAD returns the last chunk index +// for resume semantics, mirroring the existing run-stream route. +const loader = createLoaderApiRoute( + { + params: ParamsSchema, + allowJWT: true, + corsStrategy: "all", + findResource: async (params, auth) => { + return resolveSessionByIdOrExternalId($replica, auth.environment.id, params.session); + }, + authorization: { + action: "read", + resource: (session) => { + const ids = session.externalId + ? 
[session.friendlyId, session.externalId] + : [session.friendlyId]; + return { sessions: ids }; + }, + superScopes: ["read:sessions", "read:all", "admin"], + }, + }, + async ({ params, request, resource: session, authentication }) => { + const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2"); + + if (!(realtimeStream instanceof S2RealtimeStreams)) { + return new Response("Session channels require the S2 realtime backend", { + status: 501, + }); + } + + if (request.method === "HEAD") { + // No last-chunk-index on the S2 backend (clients resume via Last-Event-ID + // on the SSE stream directly). Return 200 with a zero index for + // compatibility with the run-stream shape. + return new Response(null, { + status: 200, + headers: { "X-Last-Chunk-Index": "0" }, + }); + } + + const lastEventId = request.headers.get("Last-Event-ID") ?? undefined; + + const timeoutInSecondsRaw = request.headers.get("Timeout-Seconds"); + let timeoutInSeconds: number | undefined; + if (timeoutInSecondsRaw) { + // `Number()` rejects `"10abc"` as NaN; `parseInt` would silently accept + // the trailing garbage and bypass the bounds checks below. 
+ const parsed = Number(timeoutInSecondsRaw); + if (!Number.isFinite(parsed) || !Number.isInteger(parsed)) { + return new Response("Invalid timeout seconds", { status: 400 }); + } + if (parsed < 1) { + return new Response("Timeout seconds must be greater than 0", { status: 400 }); + } + if (parsed > 600) { + return new Response("Timeout seconds must be less than 600", { status: 400 }); + } + timeoutInSeconds = parsed; + } + + return realtimeStream.streamResponseFromSessionStream( + request, + session.friendlyId, + params.io, + getRequestAbortSignal(), + { lastEventId, timeoutInSeconds } + ); + } +); + +export { action, loader }; diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts index facb6dd664f..deefbc20773 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts @@ -13,9 +13,17 @@ const ParamsSchema = z.object({ streamId: z.string(), }); +// S2 enforces a 1 MiB per-record limit (metered as +// `8 + 2*H + Σ(header name+value) + body`). Cap the raw HTTP body at +// 512 KiB so the JSON wrapper, string escaping, and any future per-record +// header additions all stay well under S2's ceiling. +// See https://s2.dev/docs/limits. 
+const MAX_APPEND_BODY_BYTES = 1024 * 512; + const { action } = createActionApiRoute( { params: ParamsSchema, + maxContentLength: MAX_APPEND_BODY_BYTES, }, async ({ request, params, authentication }) => { const run = await $replica.taskRun.findFirst({ diff --git a/apps/webapp/app/services/authorization.server.ts b/apps/webapp/app/services/authorization.server.ts index 0406c02438e..786cc161ed9 100644 --- a/apps/webapp/app/services/authorization.server.ts +++ b/apps/webapp/app/services/authorization.server.ts @@ -1,6 +1,6 @@ export type AuthorizationAction = "read" | "write" | string; // Add more actions as needed -const ResourceTypes = ["tasks", "tags", "runs", "batch", "waitpoints", "deployments", "inputStreams", "query", "prompts"] as const; +const ResourceTypes = ["tasks", "tags", "runs", "batch", "waitpoints", "deployments", "inputStreams", "query", "prompts", "sessions"] as const; export type AuthorizationResources = { [key in (typeof ResourceTypes)[number]]?: string | string[]; diff --git a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts index 4a7acb60606..8e74661fe70 100644 --- a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts @@ -88,9 +88,42 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { return `${this.streamPrefix}/runs/${runId}/${streamId}`; } + /** + * Build an S2 stream name for a `Session`-primitive channel, addressed by + * the session's `friendlyId` and the I/O direction. Used by the session + * realtime routes to route traffic to `sessions/{friendlyId}/{out|in}`. 
+ */ + public toSessionStreamName(friendlyId: string, io: "out" | "in"): string { + return `${this.streamPrefix}/sessions/${friendlyId}/${io}`; + } + async initializeStream( runId: string, streamId: string + ): Promise<{ responseHeaders?: Record }> { + return this.#initializeStreamByName( + this.toStreamName(runId, streamId), + `/runs/${runId}/${streamId}` + ); + } + + /** + * Initialize an S2 stream by `(sessionFriendlyId, io)` — mirrors + * {@link initializeStream} but addresses the new `sessions/*` key format. + */ + async initializeSessionStream( + friendlyId: string, + io: "out" | "in" + ): Promise<{ responseHeaders?: Record }> { + return this.#initializeStreamByName( + this.toSessionStreamName(friendlyId, io), + `/sessions/${friendlyId}/${io}` + ); + } + + async #initializeStreamByName( + prefixedName: string, + relativeName: string ): Promise<{ responseHeaders?: Record }> { const accessToken = this.skipAccessTokens ? this.token @@ -99,9 +132,7 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { return { responseHeaders: { "X-S2-Access-Token": accessToken, - "X-S2-Stream-Name": this.skipAccessTokens - ? this.toStreamName(runId, streamId) - : `/runs/${runId}/${streamId}`, + "X-S2-Stream-Name": this.skipAccessTokens ? prefixedName : relativeName, "X-S2-Basin": this.basin, "X-S2-Flush-Interval-Ms": this.flushIntervalMs.toString(), "X-S2-Max-Retries": this.maxRetries.toString(), @@ -121,8 +152,22 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { } async appendPart(part: string, partId: string, runId: string, streamId: string): Promise { - const s2Stream = this.toStreamName(runId, streamId); + return this.#appendPartByName(part, partId, this.toStreamName(runId, streamId)); + } + + /** + * Append a single record to a `Session`-primitive channel. 
+ */ + async appendPartToSessionStream( + part: string, + partId: string, + friendlyId: string, + io: "out" | "in" + ): Promise { + return this.#appendPartByName(part, partId, this.toSessionStreamName(friendlyId, io)); + } + async #appendPartByName(part: string, partId: string, s2Stream: string): Promise { this.logger.debug(`S2 appending to stream`, { part, stream: s2Stream }); const result = await this.s2Append(s2Stream, { @@ -227,7 +272,28 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { signal: AbortSignal, options?: StreamResponseOptions ): Promise { - const s2Stream = this.toStreamName(runId, streamId); + return this.#streamResponseByName(this.toStreamName(runId, streamId), signal, options); + } + + /** + * Serve SSE from a `Session`-primitive channel addressed by + * `(friendlyId, io)`. + */ + async streamResponseFromSessionStream( + request: Request, + friendlyId: string, + io: "out" | "in", + signal: AbortSignal, + options?: StreamResponseOptions + ): Promise { + return this.#streamResponseByName(this.toSessionStreamName(friendlyId, io), signal, options); + } + + async #streamResponseByName( + s2Stream: string, + signal: AbortSignal, + options?: StreamResponseOptions + ): Promise { const startSeq = this.parseLastEventId(options?.lastEventId); this.logger.info(`S2 streaming records from stream`, { stream: s2Stream, startSeq }); diff --git a/apps/webapp/app/services/realtime/sessions.server.ts b/apps/webapp/app/services/realtime/sessions.server.ts new file mode 100644 index 00000000000..5ed67d5a691 --- /dev/null +++ b/apps/webapp/app/services/realtime/sessions.server.ts @@ -0,0 +1,57 @@ +import type { PrismaClient, Session } from "@trigger.dev/database"; +import type { SessionItem } from "@trigger.dev/core/v3"; + +/** + * Prefix that {@link SessionId.generate} attaches to every Session friendlyId. 
+ * Used to distinguish friendlyId lookups (`session_abc...`) from externalId + * lookups on the public `GET /api/v1/sessions/:session` route. + */ +const SESSION_FRIENDLY_ID_PREFIX = "session_"; + +/** + * Resolve a session from a URL path parameter that may contain either a + * friendlyId (`session_abc...`) or a user-supplied externalId. + * + * Disambiguated by prefix: values starting with `session_` are treated as + * friendlyIds, anything else is looked up against `externalId` scoped to + * the caller's environment. + */ +export async function resolveSessionByIdOrExternalId( + prisma: Pick, + runtimeEnvironmentId: string, + idOrExternalId: string +): Promise { + if (idOrExternalId.startsWith(SESSION_FRIENDLY_ID_PREFIX)) { + return prisma.session.findFirst({ + where: { friendlyId: idOrExternalId, runtimeEnvironmentId }, + }); + } + + // `findFirst` rather than `findUnique` per the repo rule — `findUnique`'s + // implicit DataLoader has open correctness bugs in Prisma 6.x that bite + // hot-path lookups exactly like this one. + return prisma.session.findFirst({ + where: { runtimeEnvironmentId, externalId: idOrExternalId }, + }); +} + +/** + * Convert a Prisma `Session` row to the public {@link SessionItem} wire format. + * Strips internal columns (project/environment/organization ids) and narrows + * the `metadata` JSON to a record. + */ +export function serializeSession(session: Session): SessionItem { + return { + id: session.friendlyId, + externalId: session.externalId, + type: session.type, + taskIdentifier: session.taskIdentifier, + tags: session.tags, + metadata: (session.metadata ?? 
null) as SessionItem["metadata"], + closedAt: session.closedAt, + closedReason: session.closedReason, + expiresAt: session.expiresAt, + createdAt: session.createdAt, + updatedAt: session.updatedAt, + }; +} diff --git a/apps/webapp/app/services/sessionsReplicationInstance.server.ts b/apps/webapp/app/services/sessionsReplicationInstance.server.ts new file mode 100644 index 00000000000..c6ed1b6b088 --- /dev/null +++ b/apps/webapp/app/services/sessionsReplicationInstance.server.ts @@ -0,0 +1,72 @@ +import { ClickHouse } from "@internal/clickhouse"; +import invariant from "tiny-invariant"; +import { env } from "~/env.server"; +import { singleton } from "~/utils/singleton"; +import { meter, provider } from "~/v3/tracer.server"; +import { SessionsReplicationService } from "./sessionsReplicationService.server"; + +export const sessionsReplicationInstance = singleton( + "sessionsReplicationInstance", + initializeSessionsReplicationInstance +); + +function initializeSessionsReplicationInstance() { + const { DATABASE_URL } = process.env; + invariant(typeof DATABASE_URL === "string", "DATABASE_URL env var not set"); + + if (!env.SESSION_REPLICATION_CLICKHOUSE_URL) { + console.log("🗃️ Sessions replication service not enabled"); + return; + } + + console.log("🗃️ Sessions replication service enabled"); + + const clickhouse = new ClickHouse({ + url: env.SESSION_REPLICATION_CLICKHOUSE_URL, + name: "sessions-replication", + keepAlive: { + enabled: env.SESSION_REPLICATION_KEEP_ALIVE_ENABLED === "1", + idleSocketTtl: env.SESSION_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, + }, + logLevel: env.SESSION_REPLICATION_CLICKHOUSE_LOG_LEVEL, + compression: { + request: true, + }, + maxOpenConnections: env.SESSION_REPLICATION_MAX_OPEN_CONNECTIONS, + }); + + const service = new SessionsReplicationService({ + clickhouse: clickhouse, + pgConnectionUrl: DATABASE_URL, + serviceName: "sessions-replication", + slotName: env.SESSION_REPLICATION_SLOT_NAME, + publicationName: 
env.SESSION_REPLICATION_PUBLICATION_NAME, + redisOptions: { + keyPrefix: "sessions-replication:", + port: env.RUN_REPLICATION_REDIS_PORT ?? undefined, + host: env.RUN_REPLICATION_REDIS_HOST ?? undefined, + username: env.RUN_REPLICATION_REDIS_USERNAME ?? undefined, + password: env.RUN_REPLICATION_REDIS_PASSWORD ?? undefined, + enableAutoPipelining: true, + ...(env.RUN_REPLICATION_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + }, + maxFlushConcurrency: env.SESSION_REPLICATION_MAX_FLUSH_CONCURRENCY, + flushIntervalMs: env.SESSION_REPLICATION_FLUSH_INTERVAL_MS, + flushBatchSize: env.SESSION_REPLICATION_FLUSH_BATCH_SIZE, + leaderLockTimeoutMs: env.SESSION_REPLICATION_LEADER_LOCK_TIMEOUT_MS, + leaderLockExtendIntervalMs: env.SESSION_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS, + leaderLockAcquireAdditionalTimeMs: env.SESSION_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS, + leaderLockRetryIntervalMs: env.SESSION_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS, + ackIntervalSeconds: env.SESSION_REPLICATION_ACK_INTERVAL_SECONDS, + logLevel: env.SESSION_REPLICATION_LOG_LEVEL, + waitForAsyncInsert: env.SESSION_REPLICATION_WAIT_FOR_ASYNC_INSERT === "1", + tracer: provider.getTracer("sessions-replication-service"), + meter, + insertMaxRetries: env.SESSION_REPLICATION_INSERT_MAX_RETRIES, + insertBaseDelayMs: env.SESSION_REPLICATION_INSERT_BASE_DELAY_MS, + insertMaxDelayMs: env.SESSION_REPLICATION_INSERT_MAX_DELAY_MS, + insertStrategy: env.SESSION_REPLICATION_INSERT_STRATEGY, + }); + + return service; +} diff --git a/apps/webapp/app/services/sessionsReplicationService.server.ts b/apps/webapp/app/services/sessionsReplicationService.server.ts new file mode 100644 index 00000000000..f7f384faffc --- /dev/null +++ b/apps/webapp/app/services/sessionsReplicationService.server.ts @@ -0,0 +1,763 @@ +import type { ClickHouse, SessionInsertArray } from "@internal/clickhouse"; +import { getSessionField } from "@internal/clickhouse"; +import { type RedisOptions } from "@internal/redis"; +import { 
+ LogicalReplicationClient, + type MessageDelete, + type MessageInsert, + type MessageUpdate, + type PgoutputMessage, +} from "@internal/replication"; +import { + getMeter, + recordSpanError, + startSpan, + trace, + type Counter, + type Histogram, + type Meter, + type Tracer, +} from "@internal/tracing"; +import { Logger, type LogLevel } from "@trigger.dev/core/logger"; +import { tryCatch } from "@trigger.dev/core/utils"; +import { type Session } from "@trigger.dev/database"; +import EventEmitter from "node:events"; +import { ConcurrentFlushScheduler } from "./runsReplicationService.server"; + +interface TransactionEvent { + tag: "insert" | "update" | "delete"; + data: T; + raw: MessageInsert | MessageUpdate | MessageDelete; +} + +interface Transaction { + beginStartTimestamp: number; + commitLsn: string | null; + commitEndLsn: string | null; + xid: number; + events: TransactionEvent[]; + replicationLagMs: number; +} + +export type SessionsReplicationServiceOptions = { + clickhouse: ClickHouse; + pgConnectionUrl: string; + serviceName: string; + slotName: string; + publicationName: string; + redisOptions: RedisOptions; + maxFlushConcurrency?: number; + flushIntervalMs?: number; + flushBatchSize?: number; + leaderLockTimeoutMs?: number; + leaderLockExtendIntervalMs?: number; + leaderLockAcquireAdditionalTimeMs?: number; + leaderLockRetryIntervalMs?: number; + ackIntervalSeconds?: number; + acknowledgeTimeoutMs?: number; + logger?: Logger; + logLevel?: LogLevel; + tracer?: Tracer; + meter?: Meter; + waitForAsyncInsert?: boolean; + insertStrategy?: "insert" | "insert_async"; + // Retry configuration for insert operations + insertMaxRetries?: number; + insertBaseDelayMs?: number; + insertMaxDelayMs?: number; +}; + +type SessionInsert = { + _version: bigint; + session: Session; + event: "insert" | "update" | "delete"; +}; + +export type SessionsReplicationServiceEvents = { + message: [{ lsn: string; message: PgoutputMessage; service: SessionsReplicationService }]; + 
batchFlushed: [{ flushId: string; sessionInserts: SessionInsertArray[] }]; +}; + +export class SessionsReplicationService { + private _isSubscribed = false; + private _currentTransaction: + | (Omit, "commitEndLsn" | "replicationLagMs"> & { + commitEndLsn?: string | null; + replicationLagMs?: number; + }) + | null = null; + + private _replicationClient: LogicalReplicationClient; + private _concurrentFlushScheduler: ConcurrentFlushScheduler; + private logger: Logger; + private _isShuttingDown = false; + private _isShutDownComplete = false; + private _tracer: Tracer; + private _meter: Meter; + private _currentParseDurationMs: number | null = null; + private _lastAcknowledgedAt: number | null = null; + private _acknowledgeTimeoutMs: number; + private _latestCommitEndLsn: string | null = null; + private _lastAcknowledgedLsn: string | null = null; + private _acknowledgeInterval: NodeJS.Timeout | null = null; + // Retry configuration + private _insertMaxRetries: number; + private _insertBaseDelayMs: number; + private _insertMaxDelayMs: number; + private _insertStrategy: "insert" | "insert_async"; + + // Metrics + private _replicationLagHistogram: Histogram; + private _batchesFlushedCounter: Counter; + private _batchSizeHistogram: Histogram; + private _sessionsInsertedCounter: Counter; + private _insertRetriesCounter: Counter; + private _eventsProcessedCounter: Counter; + private _flushDurationHistogram: Histogram; + + public readonly events: EventEmitter; + + constructor(private readonly options: SessionsReplicationServiceOptions) { + this.logger = + options.logger ?? new Logger("SessionsReplicationService", options.logLevel ?? "info"); + this.events = new EventEmitter(); + this._tracer = options.tracer ?? trace.getTracer("sessions-replication-service"); + this._meter = options.meter ?? 
getMeter("sessions-replication"); + + // Initialize metrics + this._replicationLagHistogram = this._meter.createHistogram( + "sessions_replication.replication_lag_ms", + { + description: "Replication lag from Postgres commit to processing", + unit: "ms", + } + ); + + this._batchesFlushedCounter = this._meter.createCounter( + "sessions_replication.batches_flushed", + { + description: "Total batches flushed to ClickHouse", + } + ); + + this._batchSizeHistogram = this._meter.createHistogram("sessions_replication.batch_size", { + description: "Number of items per batch flush", + unit: "items", + }); + + this._sessionsInsertedCounter = this._meter.createCounter( + "sessions_replication.sessions_inserted", + { + description: "Session inserts to ClickHouse", + unit: "inserts", + } + ); + + this._insertRetriesCounter = this._meter.createCounter("sessions_replication.insert_retries", { + description: "Insert retry attempts", + }); + + this._eventsProcessedCounter = this._meter.createCounter( + "sessions_replication.events_processed", + { + description: "Replication events processed (inserts, updates, deletes)", + } + ); + + this._flushDurationHistogram = this._meter.createHistogram( + "sessions_replication.flush_duration_ms", + { + description: "Duration of batch flush operations", + unit: "ms", + } + ); + + this._acknowledgeTimeoutMs = options.acknowledgeTimeoutMs ?? 1_000; + + this._insertStrategy = options.insertStrategy ?? "insert"; + + this._replicationClient = new LogicalReplicationClient({ + pgConfig: { + connectionString: options.pgConnectionUrl, + }, + name: options.serviceName, + slotName: options.slotName, + publicationName: options.publicationName, + table: "Session", + redisOptions: options.redisOptions, + autoAcknowledge: false, + publicationActions: ["insert", "update", "delete"], + logger: options.logger ?? new Logger("LogicalReplicationClient", options.logLevel ?? "info"), + leaderLockTimeoutMs: options.leaderLockTimeoutMs ?? 
30_000, + leaderLockExtendIntervalMs: options.leaderLockExtendIntervalMs ?? 10_000, + ackIntervalSeconds: options.ackIntervalSeconds ?? 10, + leaderLockAcquireAdditionalTimeMs: options.leaderLockAcquireAdditionalTimeMs ?? 10_000, + leaderLockRetryIntervalMs: options.leaderLockRetryIntervalMs ?? 500, + tracer: options.tracer, + }); + + this._concurrentFlushScheduler = new ConcurrentFlushScheduler({ + batchSize: options.flushBatchSize ?? 50, + flushInterval: options.flushIntervalMs ?? 100, + maxConcurrency: options.maxFlushConcurrency ?? 100, + callback: this.#flushBatch.bind(this), + // Key-based deduplication to reduce duplicates sent to ClickHouse + getKey: (item) => { + if (!item?.session?.id) { + this.logger.warn("Skipping replication event with null session", { event: item }); + return null; + } + return `${item.event}_${item.session.id}`; + }, + // Keep the session with the higher version (latest) + // and take the last occurrence for that version. + // Items originating from the same DB transaction have the same version. + shouldReplace: (existing, incoming) => incoming._version >= existing._version, + logger: new Logger("ConcurrentFlushScheduler", options.logLevel ?? 
"info"), + tracer: options.tracer, + }); + + this._replicationClient.events.on("data", async ({ lsn, log, parseDuration }) => { + this.#handleData(lsn, log, parseDuration); + }); + + this._replicationClient.events.on("heartbeat", async ({ lsn, shouldRespond }) => { + if (this._isShuttingDown) return; + if (this._isShutDownComplete) return; + + if (shouldRespond) { + this._lastAcknowledgedLsn = lsn; + await this._replicationClient.acknowledge(lsn); + } + }); + + this._replicationClient.events.on("error", (error) => { + this.logger.error("Replication client error", { + error, + }); + }); + + this._replicationClient.events.on("start", () => { + this.logger.info("Replication client started"); + }); + + this._replicationClient.events.on("acknowledge", ({ lsn }) => { + this.logger.debug("Acknowledged", { lsn }); + }); + + this._replicationClient.events.on("leaderElection", (isLeader) => { + this.logger.info("Leader election", { isLeader }); + }); + + // Initialize retry configuration + this._insertMaxRetries = options.insertMaxRetries ?? 3; + this._insertBaseDelayMs = options.insertBaseDelayMs ?? 100; + this._insertMaxDelayMs = options.insertMaxDelayMs ?? 2000; + } + + public async shutdown() { + if (this._isShuttingDown) return; + + this._isShuttingDown = true; + + this.logger.info("Initiating shutdown of sessions replication service"); + + if (!this._currentTransaction) { + this.logger.info("No transaction to commit, shutting down immediately"); + await this._replicationClient.stop(); + this._isSubscribed = false; + this._isShutDownComplete = true; + return; + } + + this._concurrentFlushScheduler.shutdown(); + } + + async start() { + if (this._isSubscribed) { + this.logger.debug("Replication client already started, skipping start"); + return; + } + + this.logger.info("Starting replication client", { + lastLsn: this._latestCommitEndLsn, + }); + + await this._replicationClient.subscribe(this._latestCommitEndLsn ?? 
undefined); + + this._acknowledgeInterval = setInterval(this.#acknowledgeLatestTransaction.bind(this), 1000); + this._concurrentFlushScheduler.start(); + this._isSubscribed = true; + } + + async stop() { + this.logger.info("Stopping replication client"); + + await this._replicationClient.stop(); + + if (this._acknowledgeInterval) { + clearInterval(this._acknowledgeInterval); + this._acknowledgeInterval = null; + } + + this._isSubscribed = false; + } + + async teardown() { + this.logger.info("Teardown replication client"); + + await this._replicationClient.teardown(); + + if (this._acknowledgeInterval) { + clearInterval(this._acknowledgeInterval); + this._acknowledgeInterval = null; + } + + this._isSubscribed = false; + } + + #handleData(lsn: string, message: PgoutputMessage, parseDuration: bigint) { + this.logger.debug("Handling data", { + lsn, + tag: message.tag, + parseDuration, + }); + + this.events.emit("message", { lsn, message, service: this }); + + switch (message.tag) { + case "begin": { + if (this._isShuttingDown || this._isShutDownComplete) { + return; + } + + this._currentTransaction = { + beginStartTimestamp: Date.now(), + commitLsn: message.commitLsn, + xid: message.xid, + events: [], + }; + + this._currentParseDurationMs = Number(parseDuration) / 1_000_000; + + break; + } + case "insert": { + if (!this._currentTransaction) { + return; + } + + if (this._currentParseDurationMs) { + this._currentParseDurationMs = + this._currentParseDurationMs + Number(parseDuration) / 1_000_000; + } + + this._currentTransaction.events.push({ + tag: message.tag, + data: message.new as Session, + raw: message, + }); + break; + } + case "update": { + if (!this._currentTransaction) { + return; + } + + if (this._currentParseDurationMs) { + this._currentParseDurationMs = + this._currentParseDurationMs + Number(parseDuration) / 1_000_000; + } + + this._currentTransaction.events.push({ + tag: message.tag, + data: message.new as Session, + raw: message, + }); + break; + } + case 
"delete": { + if (!this._currentTransaction) { + return; + } + + if (this._currentParseDurationMs) { + this._currentParseDurationMs = + this._currentParseDurationMs + Number(parseDuration) / 1_000_000; + } + + this._currentTransaction.events.push({ + tag: message.tag, + data: message.old as Session, + raw: message, + }); + + break; + } + case "commit": { + if (!this._currentTransaction) { + return; + } + + if (this._currentParseDurationMs) { + this._currentParseDurationMs = + this._currentParseDurationMs + Number(parseDuration) / 1_000_000; + } + + const replicationLagMs = Date.now() - Number(message.commitTime / 1000n); + this._currentTransaction.commitEndLsn = message.commitEndLsn; + this._currentTransaction.replicationLagMs = replicationLagMs; + const transaction = this._currentTransaction as Transaction; + this._currentTransaction = null; + + if (transaction.commitEndLsn) { + this._latestCommitEndLsn = transaction.commitEndLsn; + } + + this.#handleTransaction(transaction); + break; + } + default: { + this.logger.debug("Unknown message tag", { + pgMessage: message, + }); + } + } + } + + #handleTransaction(transaction: Transaction) { + if (this._isShutDownComplete) return; + + if (this._isShuttingDown) { + this._replicationClient.stop().finally(() => { + this._isSubscribed = false; + this._isShutDownComplete = true; + }); + } + + // If there are no events, do nothing + if (transaction.events.length === 0) { + return; + } + + if (!transaction.commitEndLsn) { + this.logger.error("Transaction has no commit end lsn", { + transaction, + }); + + return; + } + + const lsnToUInt64Start = process.hrtime.bigint(); + + // If there are events, we need to handle them + const _version = lsnToUInt64(transaction.commitEndLsn); + + const lsnToUInt64DurationMs = Number(process.hrtime.bigint() - lsnToUInt64Start) / 1_000_000; + + this._concurrentFlushScheduler.addToBatch( + transaction.events.map((event) => ({ + _version, + session: event.data, + event: event.tag, + })) + ); + + // 
Record metrics + this._replicationLagHistogram.record(transaction.replicationLagMs); + + // Count events by type + for (const event of transaction.events) { + this._eventsProcessedCounter.add(1, { event_type: event.tag }); + } + + this.logger.debug("handle_transaction", { + transaction: { + xid: transaction.xid, + commitLsn: transaction.commitLsn, + commitEndLsn: transaction.commitEndLsn, + events: transaction.events.length, + parseDurationMs: this._currentParseDurationMs, + lsnToUInt64DurationMs, + version: _version.toString(), + }, + }); + } + + async #acknowledgeLatestTransaction() { + if (!this._latestCommitEndLsn) { + return; + } + + if (this._lastAcknowledgedLsn === this._latestCommitEndLsn) { + return; + } + + const now = Date.now(); + + if (this._lastAcknowledgedAt) { + const timeSinceLastAcknowledged = now - this._lastAcknowledgedAt; + // If we've already acknowledged within the last second, don't acknowledge again + if (timeSinceLastAcknowledged < this._acknowledgeTimeoutMs) { + return; + } + } + + this._lastAcknowledgedAt = now; + this._lastAcknowledgedLsn = this._latestCommitEndLsn; + + this.logger.debug("acknowledge_latest_transaction", { + commitEndLsn: this._latestCommitEndLsn, + lastAcknowledgedAt: this._lastAcknowledgedAt, + }); + + const [ackError] = await tryCatch( + this._replicationClient.acknowledge(this._latestCommitEndLsn) + ); + + if (ackError) { + this.logger.error("Error acknowledging transaction", { ackError }); + } + + if (this._isShutDownComplete && this._acknowledgeInterval) { + clearInterval(this._acknowledgeInterval); + } + } + + async #flushBatch(flushId: string, batch: Array) { + if (batch.length === 0) { + return; + } + + this.logger.debug("Flushing batch", { + flushId, + batchSize: batch.length, + }); + + const flushStartTime = performance.now(); + + await startSpan(this._tracer, "flushBatch", async (span) => { + const sessionInserts = batch + .map((item) => toSessionInsertArray(item.session, item._version, item.event === 
"delete")) + // batch inserts in clickhouse are more performant if the items + // are pre-sorted by the primary key + .sort((a, b) => { + const aOrgId = getSessionField(a, "organization_id"); + const bOrgId = getSessionField(b, "organization_id"); + if (aOrgId !== bOrgId) { + return aOrgId < bOrgId ? -1 : 1; + } + const aProjId = getSessionField(a, "project_id"); + const bProjId = getSessionField(b, "project_id"); + if (aProjId !== bProjId) { + return aProjId < bProjId ? -1 : 1; + } + const aEnvId = getSessionField(a, "environment_id"); + const bEnvId = getSessionField(b, "environment_id"); + if (aEnvId !== bEnvId) { + return aEnvId < bEnvId ? -1 : 1; + } + const aCreatedAt = getSessionField(a, "created_at"); + const bCreatedAt = getSessionField(b, "created_at"); + if (aCreatedAt !== bCreatedAt) { + return aCreatedAt - bCreatedAt; + } + const aSessionId = getSessionField(a, "session_id"); + const bSessionId = getSessionField(b, "session_id"); + if (aSessionId === bSessionId) return 0; + return aSessionId < bSessionId ? 
-1 : 1; + }); + + span.setAttribute("session_inserts", sessionInserts.length); + + this.logger.debug("Flushing inserts", { + flushId, + sessionInserts: sessionInserts.length, + }); + + const [sessionError, sessionResult] = await this.#insertWithRetry( + (attempt) => this.#insertSessionInserts(sessionInserts, attempt), + "session inserts", + flushId + ); + + if (sessionError) { + this.logger.error("Error inserting session inserts", { + error: sessionError, + flushId, + }); + recordSpanError(span, sessionError); + } + + this.logger.debug("Flushed inserts", { + flushId, + sessionInserts: sessionInserts.length, + }); + + this.events.emit("batchFlushed", { flushId, sessionInserts }); + + // Record metrics + const flushDurationMs = performance.now() - flushStartTime; + const hasErrors = sessionError !== null; + + this._batchSizeHistogram.record(batch.length); + this._flushDurationHistogram.record(flushDurationMs); + this._batchesFlushedCounter.add(1, { success: !hasErrors }); + + if (!sessionError) { + this._sessionsInsertedCounter.add(sessionInserts.length); + } + }); + } + + // New method to handle inserts with retry logic for connection errors + async #insertWithRetry( + insertFn: (attempt: number) => Promise, + operationName: string, + flushId: string + ): Promise<[Error | null, T | null]> { + let lastError: Error | null = null; + + for (let attempt = 1; attempt <= this._insertMaxRetries; attempt++) { + try { + const result = await insertFn(attempt); + return [null, result]; + } catch (error) { + lastError = error instanceof Error ? 
error : new Error(String(error)); + + // Check if this is a retryable error + if (this.#isRetryableError(lastError)) { + const delay = this.#calculateRetryDelay(attempt); + + this.logger.warn(`Retrying SessionsReplication insert due to error`, { + operationName, + flushId, + attempt, + maxRetries: this._insertMaxRetries, + error: lastError.message, + delay, + }); + + // Record retry metric + this._insertRetriesCounter.add(1, { operation: "sessions" }); + + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } + break; + } + } + + return [lastError, null]; + } + + // Retry all errors except known permanent ones + #isRetryableError(error: Error): boolean { + const errorMessage = error.message.toLowerCase(); + + // Permanent errors that should NOT be retried + const permanentErrorPatterns = [ + "authentication failed", + "permission denied", + "invalid credentials", + "table not found", + "database not found", + "column not found", + "schema mismatch", + "invalid query", + "syntax error", + "type error", + "constraint violation", + "duplicate key", + "foreign key violation", + ]; + + // If it's a known permanent error, don't retry + if (permanentErrorPatterns.some((pattern) => errorMessage.includes(pattern))) { + return false; + } + + // Retry everything else + return true; + } + + #calculateRetryDelay(attempt: number): number { + // Exponential backoff: baseDelay, baseDelay*2, baseDelay*4, etc. + const delay = Math.min( + this._insertBaseDelayMs * Math.pow(2, attempt - 1), + this._insertMaxDelayMs + ); + + // Add some jitter to prevent thundering herd + const jitter = Math.random() * 100; + return delay + jitter; + } + + #getClickhouseInsertSettings() { + if (this._insertStrategy === "insert") { + return {}; + } + + return { + async_insert: 1 as const, + async_insert_max_data_size: "1000000", + async_insert_busy_timeout_ms: 1000, + wait_for_async_insert: this.options.waitForAsyncInsert ? 
(1 as const) : (0 as const), + }; + } + + async #insertSessionInserts(sessionInserts: SessionInsertArray[], attempt: number) { + return await startSpan(this._tracer, "insertSessionInserts", async (span) => { + const [insertError, insertResult] = + await this.options.clickhouse.sessions.insertCompactArrays(sessionInserts, { + params: { + clickhouse_settings: this.#getClickhouseInsertSettings(), + }, + }); + + if (insertError) { + this.logger.error("Error inserting session inserts attempt", { + error: insertError, + attempt, + }); + + recordSpanError(span, insertError); + throw insertError; + } + + return insertResult; + }); + } +} + +function toSessionInsertArray( + session: Session, + version: bigint, + isDeleted: boolean +): SessionInsertArray { + return [ + session.runtimeEnvironmentId, + session.organizationId, + session.projectId, + session.id, + session.environmentType, + session.friendlyId, + session.externalId ?? "", + session.type, + session.taskIdentifier ?? "", + session.tags ?? [], + { data: session.metadata ?? null }, + session.closedAt ? session.closedAt.getTime() : null, + session.closedReason ?? "", + session.expiresAt ? session.expiresAt.getTime() : null, + session.createdAt.getTime(), + session.updatedAt.getTime(), + version.toString(), + isDeleted ? 
1 : 0, + ]; +} + +function lsnToUInt64(lsn: string): bigint { + const [seg, off] = lsn.split("/"); + return (BigInt("0x" + seg) << 32n) | BigInt("0x" + off); +} diff --git a/apps/webapp/app/services/sessionsRepository/clickhouseSessionsRepository.server.ts b/apps/webapp/app/services/sessionsRepository/clickhouseSessionsRepository.server.ts new file mode 100644 index 00000000000..79fb06ac933 --- /dev/null +++ b/apps/webapp/app/services/sessionsRepository/clickhouseSessionsRepository.server.ts @@ -0,0 +1,252 @@ +import { type ClickhouseQueryBuilder } from "@internal/clickhouse"; +import parseDuration from "parse-duration"; +import { + convertSessionListInputOptionsToFilterOptions, + type FilterSessionsOptions, + type ISessionsRepository, + type ListSessionsOptions, + type SessionListInputOptions, + type SessionTagListOptions, + type SessionsRepositoryOptions, +} from "./sessionsRepository.server"; + +export class ClickHouseSessionsRepository implements ISessionsRepository { + constructor(private readonly options: SessionsRepositoryOptions) {} + + get name() { + return "clickhouse"; + } + + async listSessionIds(options: ListSessionsOptions): Promise { + const queryBuilder = this.options.clickhouse.sessions.queryBuilder(); + applySessionFiltersToQueryBuilder( + queryBuilder, + convertSessionListInputOptionsToFilterOptions(options) + ); + + if (options.page.cursor) { + if (options.page.direction === "forward" || !options.page.direction) { + queryBuilder + .where("session_id < {sessionId: String}", { sessionId: options.page.cursor }) + .orderBy("created_at DESC, session_id DESC") + .limit(options.page.size + 1); + } else { + queryBuilder + .where("session_id > {sessionId: String}", { sessionId: options.page.cursor }) + .orderBy("created_at ASC, session_id ASC") + .limit(options.page.size + 1); + } + } else { + queryBuilder.orderBy("created_at DESC, session_id DESC").limit(options.page.size + 1); + } + + const [queryError, result] = await queryBuilder.execute(); + if 
(queryError) throw queryError; + + return result.map((row) => row.session_id); + } + + async listSessions(options: ListSessionsOptions) { + const sessionIds = await this.listSessionIds(options); + const hasMore = sessionIds.length > options.page.size; + + let nextCursor: string | null = null; + let previousCursor: string | null = null; + + const direction = options.page.direction ?? "forward"; + switch (direction) { + case "forward": { + previousCursor = options.page.cursor ? sessionIds.at(0) ?? null : null; + if (hasMore) { + nextCursor = sessionIds[options.page.size - 1]; + } + break; + } + case "backward": { + const reversed = [...sessionIds].reverse(); + if (hasMore) { + previousCursor = reversed.at(1) ?? null; + nextCursor = reversed.at(options.page.size) ?? null; + } else { + nextCursor = reversed.at(options.page.size - 1) ?? null; + } + break; + } + } + + const idsToReturn = + options.page.direction === "backward" && hasMore + ? sessionIds.slice(1, options.page.size + 1) + : sessionIds.slice(0, options.page.size); + + let sessions = await this.options.prisma.session.findMany({ + where: { + id: { in: idsToReturn }, + runtimeEnvironmentId: options.environmentId, + }, + orderBy: { createdAt: "desc" }, + select: { + id: true, + friendlyId: true, + externalId: true, + type: true, + taskIdentifier: true, + tags: true, + metadata: true, + closedAt: true, + closedReason: true, + expiresAt: true, + createdAt: true, + updatedAt: true, + runtimeEnvironmentId: true, + }, + }); + + // ClickHouse is slightly delayed; narrow by derived status in-memory to + // catch recent Postgres writes that haven't replicated yet. + if (options.statuses && options.statuses.length > 0) { + const wanted = new Set(options.statuses); + const now = Date.now(); + sessions = sessions.filter((s) => { + const status = + s.closedAt != null + ? "CLOSED" + : s.expiresAt != null && s.expiresAt.getTime() < now + ? 
"EXPIRED" + : "ACTIVE"; + return wanted.has(status); + }); + } + + return { + sessions, + pagination: { nextCursor, previousCursor }, + }; + } + + async countSessions(options: SessionListInputOptions): Promise { + const queryBuilder = this.options.clickhouse.sessions.countQueryBuilder(); + applySessionFiltersToQueryBuilder( + queryBuilder, + convertSessionListInputOptionsToFilterOptions(options) + ); + + const [queryError, result] = await queryBuilder.execute(); + if (queryError) throw queryError; + + if (result.length === 0) { + throw new Error("No count rows returned"); + } + return result[0].count; + } + + async listTags(options: SessionTagListOptions) { + const queryBuilder = this.options.clickhouse.sessions + .tagQueryBuilder() + .where("organization_id = {organizationId: String}", { + organizationId: options.organizationId, + }) + .where("project_id = {projectId: String}", { projectId: options.projectId }) + .where("environment_id = {environmentId: String}", { + environmentId: options.environmentId, + }); + + const periodMs = options.period ? parseDuration(options.period) ?? 
undefined : undefined; + if (periodMs) { + queryBuilder.where("created_at >= fromUnixTimestamp64Milli({period: Int64})", { + period: new Date(Date.now() - periodMs).getTime(), + }); + } + + if (options.from) { + queryBuilder.where("created_at >= fromUnixTimestamp64Milli({from: Int64})", { + from: options.from, + }); + } + + if (options.to) { + queryBuilder.where("created_at <= fromUnixTimestamp64Milli({to: Int64})", { + to: options.to, + }); + } + + if (options.query && options.query.trim().length > 0) { + queryBuilder.where("positionCaseInsensitiveUTF8(tag, {query: String}) > 0", { + query: options.query, + }); + } + + queryBuilder.orderBy("tag ASC").limit(options.limit); + + const [queryError, result] = await queryBuilder.execute(); + if (queryError) throw queryError; + + return { tags: result.map((row) => row.tag) }; + } +} + +function applySessionFiltersToQueryBuilder( + queryBuilder: ClickhouseQueryBuilder, + options: FilterSessionsOptions +) { + queryBuilder + .where("organization_id = {organizationId: String}", { + organizationId: options.organizationId, + }) + .where("project_id = {projectId: String}", { projectId: options.projectId }) + .where("environment_id = {environmentId: String}", { environmentId: options.environmentId }); + + if (options.types && options.types.length > 0) { + queryBuilder.where("type IN {types: Array(String)}", { types: options.types }); + } + + if (options.tags && options.tags.length > 0) { + queryBuilder.where("hasAny(tags, {tags: Array(String)})", { tags: options.tags }); + } + + if (options.taskIdentifiers && options.taskIdentifiers.length > 0) { + queryBuilder.where("task_identifier IN {taskIdentifiers: Array(String)}", { + taskIdentifiers: options.taskIdentifiers, + }); + } + + if (options.externalId) { + queryBuilder.where("external_id = {externalId: String}", { externalId: options.externalId }); + } + + if (options.statuses && options.statuses.length > 0) { + const conditions: string[] = []; + if 
(options.statuses.includes("ACTIVE")) { + conditions.push( + "(closed_at IS NULL AND (expires_at IS NULL OR expires_at > now64(3)))" + ); + } + if (options.statuses.includes("CLOSED")) { + conditions.push("closed_at IS NOT NULL"); + } + if (options.statuses.includes("EXPIRED")) { + conditions.push("(closed_at IS NULL AND expires_at IS NOT NULL AND expires_at <= now64(3))"); + } + if (conditions.length > 0) { + queryBuilder.where(`(${conditions.join(" OR ")})`); + } + } + + if (options.period) { + queryBuilder.where("created_at >= fromUnixTimestamp64Milli({period: Int64})", { + period: new Date(Date.now() - options.period).getTime(), + }); + } + + if (options.from) { + queryBuilder.where("created_at >= fromUnixTimestamp64Milli({from: Int64})", { + from: options.from, + }); + } + + if (options.to) { + queryBuilder.where("created_at <= fromUnixTimestamp64Milli({to: Int64})", { + to: options.to, + }); + } +} diff --git a/apps/webapp/app/services/sessionsRepository/sessionsRepository.server.ts b/apps/webapp/app/services/sessionsRepository/sessionsRepository.server.ts new file mode 100644 index 00000000000..cb4ebb48b6f --- /dev/null +++ b/apps/webapp/app/services/sessionsRepository/sessionsRepository.server.ts @@ -0,0 +1,198 @@ +import { type ClickHouse } from "@internal/clickhouse"; +import { type Tracer } from "@internal/tracing"; +import { type Logger, type LogLevel } from "@trigger.dev/core/logger"; +import { type Prisma } from "@trigger.dev/database"; +import parseDuration from "parse-duration"; +import { z } from "zod"; +import { type PrismaClientOrTransaction } from "~/db.server"; +import { startActiveSpan } from "~/v3/tracer.server"; +import { ClickHouseSessionsRepository } from "./clickhouseSessionsRepository.server"; + +export type SessionsRepositoryOptions = { + clickhouse: ClickHouse; + prisma: PrismaClientOrTransaction; + logger?: Logger; + logLevel?: LogLevel; + tracer?: Tracer; +}; + +/** + * Derived status values — `Session` rows don't have a stored 
status column. + * `ACTIVE` is the base state; `CLOSED` means `closedAt` is set; `EXPIRED` + * means `expiresAt` has passed. + */ +export const SessionStatus = z.enum(["ACTIVE", "CLOSED", "EXPIRED"]); +export type SessionStatus = z.infer; + +const SessionListInputOptionsSchema = z.object({ + organizationId: z.string(), + projectId: z.string(), + environmentId: z.string(), + // filters + types: z.array(z.string()).optional(), + tags: z.array(z.string()).optional(), + taskIdentifiers: z.array(z.string()).optional(), + externalId: z.string().optional(), + statuses: z.array(SessionStatus).optional(), + period: z.string().optional(), + from: z.number().optional(), + to: z.number().optional(), +}); + +export type SessionListInputOptions = z.infer; +export type SessionListInputFilters = Omit< + SessionListInputOptions, + "organizationId" | "projectId" | "environmentId" +>; + +export type FilterSessionsOptions = Omit & { + /** period converted to milliseconds duration */ + period: number | undefined; +}; + +type Pagination = { + page: { + size: number; + cursor?: string; + direction?: "forward" | "backward"; + }; +}; + +export type ListSessionsOptions = SessionListInputOptions & Pagination; + +type OffsetPagination = { + offset: number; + limit: number; +}; + +export type SessionTagListOptions = { + organizationId: string; + projectId: string; + environmentId: string; + period?: string; + from?: number; + to?: number; + /** Case-insensitive substring match on the tag name */ + query?: string; +} & OffsetPagination; + +export type SessionTagList = { + tags: string[]; +}; + +export type ListedSession = Prisma.SessionGetPayload<{ + select: { + id: true; + friendlyId: true; + externalId: true; + type: true; + taskIdentifier: true; + tags: true; + metadata: true; + closedAt: true; + closedReason: true; + expiresAt: true; + createdAt: true; + updatedAt: true; + runtimeEnvironmentId: true; + }; +}>; + +export interface ISessionsRepository { + name: string; + 
listSessionIds(options: ListSessionsOptions): Promise<string[]>;
+  listSessions(options: ListSessionsOptions): Promise<{
+    sessions: ListedSession[];
+    pagination: {
+      nextCursor: string | null;
+      previousCursor: string | null;
+    };
+  }>;
+  countSessions(options: SessionListInputOptions): Promise<number>;
+  listTags(options: SessionTagListOptions): Promise<SessionTagList>;
+}
+
+export class SessionsRepository implements ISessionsRepository {
+  private readonly clickHouseSessionsRepository: ClickHouseSessionsRepository;
+
+  constructor(private readonly options: SessionsRepositoryOptions) {
+    this.clickHouseSessionsRepository = new ClickHouseSessionsRepository(options);
+  }
+
+  get name() {
+    return "sessionsRepository";
+  }
+
+  async listSessionIds(options: ListSessionsOptions): Promise<string[]> {
+    return startActiveSpan(
+      "sessionsRepository.listSessionIds",
+      async () => this.clickHouseSessionsRepository.listSessionIds(options),
+      {
+        attributes: {
+          "repository.name": "clickhouse",
+          organizationId: options.organizationId,
+          projectId: options.projectId,
+          environmentId: options.environmentId,
+        },
+      }
+    );
+  }
+
+  async listSessions(options: ListSessionsOptions) {
+    return startActiveSpan(
+      "sessionsRepository.listSessions",
+      async () => this.clickHouseSessionsRepository.listSessions(options),
+      {
+        attributes: {
+          "repository.name": "clickhouse",
+          organizationId: options.organizationId,
+          projectId: options.projectId,
+          environmentId: options.environmentId,
+        },
+      }
+    );
+  }
+
+  async countSessions(options: SessionListInputOptions) {
+    return startActiveSpan(
+      "sessionsRepository.countSessions",
+      async () => this.clickHouseSessionsRepository.countSessions(options),
+      {
+        attributes: {
+          "repository.name": "clickhouse",
+          organizationId: options.organizationId,
+          projectId: options.projectId,
+          environmentId: options.environmentId,
+        },
+      }
+    );
+  }
+
+  async listTags(options: SessionTagListOptions) {
+    return startActiveSpan(
+      "sessionsRepository.listTags",
+      async () => 
this.clickHouseSessionsRepository.listTags(options), + { + attributes: { + "repository.name": "clickhouse", + organizationId: options.organizationId, + projectId: options.projectId, + environmentId: options.environmentId, + }, + } + ); + } +} + +export function parseSessionListInputOptions(data: unknown): SessionListInputOptions { + return SessionListInputOptionsSchema.parse(data); +} + +export function convertSessionListInputOptionsToFilterOptions( + options: SessionListInputOptions +): FilterSessionsOptions { + return { + ...options, + period: options.period ? parseDuration(options.period) ?? undefined : undefined, + }; +} diff --git a/apps/webapp/app/v3/services/adminWorker.server.ts b/apps/webapp/app/v3/services/adminWorker.server.ts index 97c94b954f0..2e4d1b066cb 100644 --- a/apps/webapp/app/v3/services/adminWorker.server.ts +++ b/apps/webapp/app/v3/services/adminWorker.server.ts @@ -4,6 +4,12 @@ import { z } from "zod"; import { env } from "~/env.server"; import { logger } from "~/services/logger.server"; import { runsReplicationInstance } from "~/services/runsReplicationInstance.server"; +// Reference-hold the sessions-replication singleton so module evaluation runs +// its initializer (creates the ClickHouse client, subscribes to the logical +// replication slot, wires signal handlers) when the webapp boots. A bare +// side-effect import gets tree-shaken by the bundler. 
+import { sessionsReplicationInstance } from "~/services/sessionsReplicationInstance.server"; +void sessionsReplicationInstance; import { singleton } from "~/utils/singleton"; import { tracer } from "../tracer.server"; import { $replica } from "~/db.server"; diff --git a/apps/webapp/test/sessionsReplicationService.test.ts b/apps/webapp/test/sessionsReplicationService.test.ts new file mode 100644 index 00000000000..f6d8d4ba8b1 --- /dev/null +++ b/apps/webapp/test/sessionsReplicationService.test.ts @@ -0,0 +1,205 @@ +import { ClickHouse } from "@internal/clickhouse"; +import { containerTest } from "@internal/testcontainers"; +import { setTimeout } from "node:timers/promises"; +import { z } from "zod"; +import { SessionsReplicationService } from "~/services/sessionsReplicationService.server"; + +vi.setConfig({ testTimeout: 60_000 }); + +describe("SessionsReplicationService", () => { + containerTest( + "replicates an insert from Postgres Session → ClickHouse sessions_v1", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + // Logical replication needs full-row images for DELETE events. 
+ await prisma.$executeRawUnsafe(`ALTER TABLE public."Session" REPLICA IDENTITY FULL;`); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "sessions-replication", + compression: { request: true }, + logLevel: "warn", + }); + + const service = new SessionsReplicationService({ + clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "sessions-replication", + slotName: "sessions_to_clickhouse_v1", + publicationName: "sessions_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + logLevel: "warn", + }); + + await service.start(); + + const organization = await prisma.organization.create({ + data: { title: "test", slug: "test" }, + }); + + const project = await prisma.project.create({ + data: { + name: "test", + slug: "test", + organizationId: organization.id, + externalRef: "test", + }, + }); + + const environment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test", + pkApiKey: "test", + shortcode: "test", + }, + }); + + const session = await prisma.session.create({ + data: { + id: "session_test_insert_1", + friendlyId: "session_abc123", + externalId: "my-test-session", + type: "chat.agent", + projectId: project.id, + runtimeEnvironmentId: environment.id, + environmentType: "DEVELOPMENT", + organizationId: organization.id, + taskIdentifier: "my-agent", + tags: ["user:42", "plan:pro"], + metadata: { plan: "pro", seats: 3 }, + }, + }); + + // Allow the replication pipeline to flush + await setTimeout(2000); + + const querySessions = clickhouse.reader.query({ + name: "read-sessions", + query: "SELECT * FROM trigger_dev.sessions_v1 FINAL", + schema: z.any(), + }); + + const [queryError, result] = await querySessions({}); + + 
expect(queryError).toBeNull(); + expect(result?.length).toBe(1); + expect(result?.[0]).toEqual( + expect.objectContaining({ + session_id: session.id, + friendly_id: session.friendlyId, + external_id: "my-test-session", + type: "chat.agent", + project_id: project.id, + environment_id: environment.id, + organization_id: organization.id, + environment_type: "DEVELOPMENT", + task_identifier: "my-agent", + tags: ["user:42", "plan:pro"], + _is_deleted: 0, + }) + ); + + await service.stop(); + } + ); + + containerTest( + "replicates an update (close) from Postgres → ClickHouse", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."Session" REPLICA IDENTITY FULL;`); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "sessions-replication", + compression: { request: true }, + logLevel: "warn", + }); + + const service = new SessionsReplicationService({ + clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "sessions-replication", + slotName: "sessions_to_clickhouse_v1", + publicationName: "sessions_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + logLevel: "warn", + }); + + await service.start(); + + const organization = await prisma.organization.create({ + data: { title: "test", slug: "test" }, + }); + const project = await prisma.project.create({ + data: { + name: "test", + slug: "test", + organizationId: organization.id, + externalRef: "test", + }, + }); + const environment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test", + pkApiKey: "test", + shortcode: "test", + }, + }); + + const created = await prisma.session.create({ + data: { + id: 
"session_test_update_1", + friendlyId: "session_update1", + type: "chat.agent", + projectId: project.id, + runtimeEnvironmentId: environment.id, + environmentType: "DEVELOPMENT", + organizationId: organization.id, + }, + }); + + await setTimeout(1000); + + await prisma.session.update({ + where: { id: created.id }, + data: { closedAt: new Date(), closedReason: "test-close" }, + }); + + await setTimeout(2000); + + const querySessions = clickhouse.reader.query({ + name: "read-sessions-closed", + query: "SELECT closed_reason, closed_at FROM trigger_dev.sessions_v1 FINAL", + schema: z.any(), + }); + + const [queryError, result] = await querySessions({}); + + expect(queryError).toBeNull(); + expect(result?.length).toBe(1); + expect(result?.[0].closed_reason).toBe("test-close"); + expect(result?.[0].closed_at).toBeDefined(); + + await service.stop(); + } + ); +}); diff --git a/internal-packages/clickhouse/schema/030_create_sessions_v1.sql b/internal-packages/clickhouse/schema/030_create_sessions_v1.sql new file mode 100644 index 00000000000..f575953ea80 --- /dev/null +++ b/internal-packages/clickhouse/schema/030_create_sessions_v1.sql @@ -0,0 +1,42 @@ +-- +goose Up + +CREATE TABLE trigger_dev.sessions_v1 +( + /* ─── identity ─────────────────────────────────────────────── */ + environment_id String, + organization_id String, + project_id String, + session_id String, + + environment_type LowCardinality(String), + friendly_id String, + external_id String DEFAULT '', + + /* ─── type discriminator ──────────────────────────────────── */ + type LowCardinality(String), + task_identifier String DEFAULT '', + + /* ─── filtering / free-form ──────────────────────────────── */ + tags Array(String) CODEC(ZSTD(1)), + metadata JSON(max_dynamic_paths = 256), + + /* ─── terminal markers ────────────────────────────────────── */ + closed_at Nullable(DateTime64(3)), + closed_reason String DEFAULT '', + expires_at Nullable(DateTime64(3)), + + /* ─── timing 
─────────────────────────────────────────────── */ + created_at DateTime64(3), + updated_at DateTime64(3), + + /* ─── commit lsn ────────────────────────────────────────── */ + _version UInt64, + _is_deleted UInt8 DEFAULT 0 +) +ENGINE = ReplacingMergeTree(_version, _is_deleted) +PARTITION BY toYYYYMM(created_at) +ORDER BY (organization_id, project_id, environment_id, created_at, session_id) +SETTINGS enable_json_type = 1; + +-- +goose Down +DROP TABLE IF EXISTS trigger_dev.sessions_v1; diff --git a/internal-packages/clickhouse/src/index.ts b/internal-packages/clickhouse/src/index.ts index c6b8858fa9c..45f0fa485a7 100644 --- a/internal-packages/clickhouse/src/index.ts +++ b/internal-packages/clickhouse/src/index.ts @@ -28,6 +28,12 @@ import { } from "./taskEvents.js"; import { insertMetrics } from "./metrics.js"; import { insertLlmMetrics } from "./llmMetrics.js"; +import { + getSessionTagsQueryBuilder, + getSessionsCountQueryBuilder, + getSessionsQueryBuilder, + insertSessionsCompactArrays, +} from "./sessions.js"; import { getGlobalModelMetrics, getGlobalModelComparison, @@ -57,6 +63,7 @@ export type * from "./metrics.js"; export type * from "./llmMetrics.js"; export type * from "./llmModelAggregates.js"; export type * from "./errors.js"; +export type * from "./sessions.js"; export type * from "./client/queryBuilder.js"; // Re-export column constants, indices, and type-safe accessors @@ -69,6 +76,8 @@ export { getPayloadField, } from "./taskRuns.js"; +export { SESSION_COLUMNS, SESSION_INDEX, getSessionField } from "./sessions.js"; + // TSQL query execution export { executeTSQL, @@ -251,6 +260,15 @@ export class ClickHouse { }; } + get sessions() { + return { + insertCompactArrays: insertSessionsCompactArrays(this.writer), + queryBuilder: getSessionsQueryBuilder(this.reader), + countQueryBuilder: getSessionsCountQueryBuilder(this.reader), + tagQueryBuilder: getSessionTagsQueryBuilder(this.reader), + }; + } + get taskEventsV2() { return { insert: 
insertTaskEventsV2(this.writer), diff --git a/internal-packages/clickhouse/src/sessions.ts b/internal-packages/clickhouse/src/sessions.ts new file mode 100644 index 00000000000..567fe65511e --- /dev/null +++ b/internal-packages/clickhouse/src/sessions.ts @@ -0,0 +1,184 @@ +import { ClickHouseSettings } from "@clickhouse/client"; +import { z } from "zod"; +import { ClickhouseReader, ClickhouseWriter } from "./client/types.js"; + +export const SessionV1 = z.object({ + environment_id: z.string(), + organization_id: z.string(), + project_id: z.string(), + session_id: z.string(), + environment_type: z.string(), + friendly_id: z.string(), + external_id: z.string().default(""), + type: z.string(), + task_identifier: z.string().default(""), + tags: z.array(z.string()).default([]), + metadata: z.unknown(), + closed_at: z.number().int().nullish(), + closed_reason: z.string().default(""), + expires_at: z.number().int().nullish(), + created_at: z.number().int(), + updated_at: z.number().int(), + _version: z.string(), + _is_deleted: z.number().int().default(0), +}); + +export type SessionV1 = z.input; + +// Column order for compact format - must match ClickHouse table schema +export const SESSION_COLUMNS = [ + "environment_id", + "organization_id", + "project_id", + "session_id", + "environment_type", + "friendly_id", + "external_id", + "type", + "task_identifier", + "tags", + "metadata", + "closed_at", + "closed_reason", + "expires_at", + "created_at", + "updated_at", + "_version", + "_is_deleted", +] as const; + +export type SessionColumnName = (typeof SESSION_COLUMNS)[number]; + +export const SESSION_INDEX = Object.fromEntries(SESSION_COLUMNS.map((col, idx) => [col, idx])) as { + readonly [K in SessionColumnName]: number; +}; + +export type SessionFieldTypes = { + environment_id: string; + organization_id: string; + project_id: string; + session_id: string; + environment_type: string; + friendly_id: string; + external_id: string; + type: string; + task_identifier: string; + 
tags: string[];
+  metadata: { data: unknown };
+  closed_at: number | null;
+  closed_reason: string;
+  expires_at: number | null;
+  created_at: number;
+  updated_at: number;
+  _version: string;
+  _is_deleted: number;
+};
+
+/**
+ * Type-safe tuple representing a Session insert array.
+ * Order matches {@link SESSION_COLUMNS} exactly.
+ */
+export type SessionInsertArray = [
+  environment_id: string,
+  organization_id: string,
+  project_id: string,
+  session_id: string,
+  environment_type: string,
+  friendly_id: string,
+  external_id: string,
+  type: string,
+  task_identifier: string,
+  tags: string[],
+  metadata: { data: unknown },
+  closed_at: number | null,
+  closed_reason: string,
+  expires_at: number | null,
+  created_at: number,
+  updated_at: number,
+  _version: string,
+  _is_deleted: number,
+];
+
+export function getSessionField<K extends SessionColumnName>(
+  session: SessionInsertArray,
+  field: K
+): SessionFieldTypes[K] {
+  return session[SESSION_INDEX[field]] as SessionFieldTypes[K];
+}
+
+export function insertSessionsCompactArrays(ch: ClickhouseWriter, settings?: ClickHouseSettings) {
+  return ch.insertCompactRaw({
+    name: "insertSessionsCompactArrays",
+    table: "trigger_dev.sessions_v1",
+    columns: SESSION_COLUMNS,
+    settings: {
+      enable_json_type: 1,
+      type_json_skip_duplicated_paths: 1,
+      ...settings,
+    },
+  });
+}
+
+export function insertSessions(ch: ClickhouseWriter, settings?: ClickHouseSettings) {
+  return ch.insert({
+    name: "insertSessions",
+    table: "trigger_dev.sessions_v1",
+    schema: SessionV1,
+    settings: {
+      enable_json_type: 1,
+      type_json_skip_duplicated_paths: 1,
+      ...settings,
+    },
+  });
+}
+
+// ─── read path ─────────────────────────────────────────────────────
+
+export const SessionV1QueryResult = z.object({
+  session_id: z.string(),
+});
+
+export type SessionV1QueryResult = z.infer<typeof SessionV1QueryResult>;
+
+/**
+ * Base query builder for listing Sessions. Filters + pagination are composed
+ * on top of this; callers can chain `.where(...).orderBy(...).limit(...)`. 
+ */ +export function getSessionsQueryBuilder(ch: ClickhouseReader, settings?: ClickHouseSettings) { + return ch.queryBuilder({ + name: "getSessions", + baseQuery: "SELECT session_id FROM trigger_dev.sessions_v1 FINAL", + schema: SessionV1QueryResult, + settings, + }); +} + +export function getSessionsCountQueryBuilder( + ch: ClickhouseReader, + settings?: ClickHouseSettings +) { + return ch.queryBuilder({ + name: "getSessionsCount", + baseQuery: "SELECT count() as count FROM trigger_dev.sessions_v1 FINAL", + schema: z.object({ count: z.number().int() }), + settings, + }); +} + +export const SessionTagsQueryResult = z.object({ + tag: z.string(), +}); + +export type SessionTagsQueryResult = z.infer; + +export function getSessionTagsQueryBuilder( + ch: ClickhouseReader, + settings?: ClickHouseSettings +) { + return ch.queryBuilder({ + name: "getSessionTags", + baseQuery: "SELECT DISTINCT arrayJoin(tags) as tag FROM trigger_dev.sessions_v1", + schema: SessionTagsQueryResult, + settings, + }); +} diff --git a/internal-packages/database/prisma/migrations/20260419000000_add_sessions_table/migration.sql b/internal-packages/database/prisma/migrations/20260419000000_add_sessions_table/migration.sql new file mode 100644 index 00000000000..4cd7e543223 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260419000000_add_sessions_table/migration.sql @@ -0,0 +1,33 @@ +-- CreateTable +CREATE TABLE "Session" ( + "id" TEXT NOT NULL, + "friendlyId" TEXT NOT NULL, + "externalId" TEXT, + "type" TEXT NOT NULL, + "projectId" TEXT NOT NULL, + "runtimeEnvironmentId" TEXT NOT NULL, + "environmentType" "RuntimeEnvironmentType" NOT NULL, + "organizationId" TEXT NOT NULL, + "taskIdentifier" TEXT, + "tags" TEXT[] NOT NULL DEFAULT ARRAY[]::TEXT[], + "metadata" JSONB, + "closedAt" TIMESTAMP(3), + "closedReason" TEXT, + "expiresAt" TIMESTAMP(3), + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "Session_pkey" PRIMARY 
KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "Session_friendlyId_key" + ON "Session"("friendlyId"); + +-- CreateIndex +CREATE UNIQUE INDEX "Session_runtimeEnvironmentId_externalId_key" + ON "Session"("runtimeEnvironmentId", "externalId"); + +-- CreateIndex +CREATE INDEX "Session_expiresAt_idx" + ON "Session"("expiresAt"); diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 1462752d8dd..8510cd0c556 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -671,6 +671,47 @@ enum TaskTriggerSource { SCHEDULED } +/// Durable, typed, bidirectional I/O primitive. Owns two S2 streams (.out / .in). +/// The row is essentially static — no status, no counters, no pointers. No +/// foreign keys: project/runtimeEnvironment/organization ids are plain +/// scalar columns (matches TaskRun pattern). List-style queries are served +/// from ClickHouse, not Postgres, so only point-lookup indexes live here. +model Session { + id String @id @default(cuid()) + friendlyId String @unique + /// User-supplied identifier scoped to the environment. Used for + /// idempotent upsert and for resolving sessions via the public API. + externalId String? + + /// Plain string — intentionally not an enum. + type String + + /// Denormalized scoping columns — no FK relations. + projectId String + runtimeEnvironmentId String + environmentType RuntimeEnvironmentType + organizationId String + + /// Informational pointer for task-owned types. Never changes after create. + taskIdentifier String? + + tags String[] @default([]) + metadata Json? + + /// Terminal markers — written once, never flipped back. + closedAt DateTime? + closedReason String? + expiresAt DateTime? + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + /// Idempotency: `(env, externalId)` uniquely identifies a session. 
+ /// PostgreSQL treats NULLs as distinct, so `externalId=NULL` rows never collide. + @@unique([runtimeEnvironmentId, externalId]) + @@index([expiresAt]) +} + model TaskRun { id String @id @default(cuid()) diff --git a/packages/core/src/v3/isomorphic/friendlyId.ts b/packages/core/src/v3/isomorphic/friendlyId.ts index a230f8c7450..66575c7c178 100644 --- a/packages/core/src/v3/isomorphic/friendlyId.ts +++ b/packages/core/src/v3/isomorphic/friendlyId.ts @@ -97,6 +97,7 @@ export const BatchId = new IdUtil("batch"); export const BulkActionId = new IdUtil("bulk"); export const AttemptId = new IdUtil("attempt"); export const ErrorId = new IdUtil("error"); +export const SessionId = new IdUtil("session"); export class IdGenerator { private alphabet: string; diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 6d324a10d11..99ba76b3b33 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -1449,6 +1449,148 @@ export const CompleteWaitpointTokenRequestBody = z.object({ }); export type CompleteWaitpointTokenRequestBody = z.infer; +/** + * Request body for `POST /api/v1/sessions`. Creates a Session — the durable, + * typed, bidirectional I/O primitive that outlives a single run. + */ +export const CreateSessionRequestBody = z.object({ + /** Plain string discriminator — e.g. `"chat.agent"`. Not validated against an enum on the server. */ + type: z.string().min(1).max(64), + /** User-supplied idempotency key. Unique per environment. Empty strings are rejected. */ + externalId: z + .string() + .trim() + .min(1) + .max(256) + .refine((v) => !v.startsWith("session_"), { + message: "externalId cannot start with 'session_' (reserved prefix for internal friendlyIds)", + }) + .optional(), + /** Optional pointer for task-owned session types. */ + taskIdentifier: z.string().max(128).optional(), + /** Up to 10 tags for dashboard filtering. 
*/ + tags: z.array(z.string().max(128)).max(10).optional(), + /** Arbitrary JSON metadata. */ + metadata: z.record(z.unknown()).optional(), + /** Absolute expiry timestamp for retention. */ + expiresAt: z.coerce.date().optional(), +}); +export type CreateSessionRequestBody = z.infer; + +export const SessionItem = z.object({ + id: z.string(), + externalId: z.string().nullable(), + type: z.string(), + taskIdentifier: z.string().nullable(), + tags: z.array(z.string()), + metadata: z.record(z.unknown()).nullable(), + closedAt: z.coerce.date().nullable(), + closedReason: z.string().nullable(), + expiresAt: z.coerce.date().nullable(), + createdAt: z.coerce.date(), + updatedAt: z.coerce.date(), +}); +export type SessionItem = z.infer; + +export const CreatedSessionResponseBody = SessionItem.extend({ + isCached: z.boolean(), +}); +export type CreatedSessionResponseBody = z.infer; + +export const RetrieveSessionResponseBody = SessionItem; +export type RetrieveSessionResponseBody = z.infer; + +export const UpdateSessionRequestBody = z.object({ + tags: z.array(z.string().max(128)).max(10).optional(), + metadata: z.record(z.unknown()).nullable().optional(), + // Null explicitly clears the externalId; non-null values must be non-empty. + externalId: z + .union([ + z.literal(null), + z + .string() + .trim() + .min(1) + .max(256) + .refine((v) => !v.startsWith("session_"), { + message: + "externalId cannot start with 'session_' (reserved prefix for internal friendlyIds)", + }), + ]) + .optional(), +}); +export type UpdateSessionRequestBody = z.infer; + +export const CloseSessionRequestBody = z.object({ + reason: z.string().max(256).optional(), +}); +export type CloseSessionRequestBody = z.infer; + +export const SessionStatus = z.enum(["ACTIVE", "CLOSED", "EXPIRED"]); +export type SessionStatus = z.infer; + +/** + * Server-side validation schema for `GET /api/v1/sessions`. 
Follows the same + * cursor-pagination convention as runs/waitpoints (`page[size]`, + * `page[after]`, `page[before]`) and uses the `filter[*]` prefix for + * narrowing fields — both produced automatically by `zodfetchCursorPage` + * and the matching client-side search-query helper. + */ +export const ListSessionsQueryParams = z + .object({ + "page[size]": z.coerce.number().int().min(1).max(100).default(20), + "page[after]": z.string().optional(), + "page[before]": z.string().optional(), + "filter[type]": z.union([z.string(), z.array(z.string())]).optional(), + "filter[tags]": z.union([z.string(), z.array(z.string())]).optional(), + "filter[taskIdentifier]": z.union([z.string(), z.array(z.string())]).optional(), + "filter[externalId]": z.string().optional(), + "filter[status]": z.union([SessionStatus, z.array(SessionStatus)]).optional(), + "filter[createdAt][period]": z.string().optional(), + "filter[createdAt][from]": z.coerce.number().int().optional(), + "filter[createdAt][to]": z.coerce.number().int().optional(), + }) + .refine( + (value) => !(value["page[after]"] && value["page[before]"]), + { + message: "Cannot pass both page[after] and page[before] on the same request", + path: ["page[before]"], + } + ); +export type ListSessionsQueryParams = z.infer; + +/** + * Client-facing list options — flattened shape that + * {@link ApiClient.listSessions} converts into the `filter[*]` / `page[*]` + * query string before sending. 
+ */ +export const ListSessionsOptions = z.object({ + limit: z.number().int().min(1).max(100).optional(), + after: z.string().optional(), + before: z.string().optional(), + type: z.union([z.string(), z.array(z.string())]).optional(), + tag: z.union([z.string(), z.array(z.string())]).optional(), + taskIdentifier: z.union([z.string(), z.array(z.string())]).optional(), + externalId: z.string().optional(), + status: z.union([SessionStatus, z.array(SessionStatus)]).optional(), + period: z.string().optional(), + from: z.union([z.number(), z.date()]).optional(), + to: z.union([z.number(), z.date()]).optional(), +}); +export type ListSessionsOptions = z.infer; + +export const ListedSessionItem = SessionItem; +export type ListedSessionItem = z.infer; + +export const ListSessionsResponseBody = z.object({ + data: z.array(ListedSessionItem), + pagination: z.object({ + next: z.string().optional(), + previous: z.string().optional(), + }), +}); +export type ListSessionsResponseBody = z.infer; + export const CompleteWaitpointTokenResponseBody = z.object({ success: z.literal(true), });