Skip to content

Commit 8988178

Browse files
committed
feat(webapp,clickhouse,database,core): Session primitive (server side)
Durable, typed, bidirectional I/O primitive that outlives a single run. Ship target is agent/chat use cases; run-scoped streams.pipe/streams.input are untouched and do not create Session rows. Postgres - New Session table: id, friendlyId, externalId, type (plain string), denormalised project/environment/organization scalar columns (no FKs), taskIdentifier, tags String[], metadata Json, closedAt, closedReason, expiresAt, timestamps - Point-lookup indexes only (friendlyId unique, (env, externalId) unique, expiresAt). List queries are served from ClickHouse so Postgres stays minimal and insert-heavy. Control-plane API - POST /api/v1/sessions create (idempotent via externalId) - GET /api/v1/sessions list with filters (type, tag, taskIdentifier, externalId, status ACTIVE|CLOSED|EXPIRED, period/from/to) and cursor pagination, ClickHouse-backed - GET /api/v1/sessions/:session retrieve — polymorphic: `session_` prefix hits friendlyId, otherwise externalId - PATCH /api/v1/sessions/:session update tags/metadata/externalId - POST /api/v1/sessions/:session/close terminal close (idempotent) Realtime (S2-backed) - PUT /realtime/v1/sessions/:session/:io returns S2 creds - GET /realtime/v1/sessions/:session/:io SSE subscribe - POST /realtime/v1/sessions/:session/:io/append server-side append - S2 key format: sessions/{friendlyId}/{out|in} Auth - sessions added to ResourceTypes. read:sessions:{id}, write:sessions:{id}, admin:sessions:{id} scopes work via existing JWT validation. ClickHouse - sessions_v1 ReplacingMergeTree table - SessionsReplicationService mirrors RunsReplicationService exactly: logical replication with leader-locked consumer, ConcurrentFlushScheduler, retry with exponential backoff + jitter, identical metric shape. Dedicated slot + publication (sessions_to_clickhouse_v1[_publication]). - SessionsRepository + ClickHouseSessionsRepository expose list, count, tags with cursor pagination keyed by (created_at DESC, session_id DESC). - Derived status (ACTIVE/CLOSED/EXPIRED) computed from closed_at + expires_at; in-memory fallback on list results to catch pre-replication writes. Verification - Webapp typecheck 10/10 - Core + SDK build 3/3 - sessionsReplicationService.test.ts integration tests 2/2 (insert + update round-trip via testcontainers) - Live round-trip against local dev: create -> retrieve (friendlyId and externalId) -> out.initialize -> out.append x2 -> in.send -> out.subscribe (receives records) -> close -> ClickHouse sessions_v1 shows the replicated row with closed_reason - Live list smoke: tag, type, status CLOSED, externalId, and cursor pagination
1 parent 7d7ebdd commit 8988178

25 files changed

+2647
-6
lines changed

.changeset/session-primitive.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
Add `SessionId` friendly ID generator and schemas for the new durable Session primitive. Exported from `@trigger.dev/core/v3/isomorphic` alongside `RunId`, `BatchId`, etc.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Add `Session` primitive — a durable, typed, bidirectional I/O primitive that outlives a single run, intended for agent/chat use cases. Ships the Postgres schema (`Session` table), control-plane CRUD routes (`POST/GET/PATCH /api/v1/sessions`, `POST /api/v1/sessions/:session/close` — polymorphic on friendlyId or externalId), `sessions` JWT scope, ClickHouse `sessions_v1` table, and `SessionsReplicationService` (logical replication from Postgres `Session` → ClickHouse `sessions_v1`). Run-scoped realtime streams (`streams.pipe`/`streams.input`) are unchanged and do **not** create Session rows.

apps/webapp/app/entry.server.tsx

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,34 @@ import {
2323
registerRunEngineEventBusHandlers,
2424
setupBatchQueueCallbacks,
2525
} from "./v3/runEngineHandlers.server";
26+
import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server";
27+
import { signalsEmitter } from "./services/signals.server";
28+
29+
// Start the sessions replication service (subscribes to the logical replication
30+
// slot, runs leader election, flushes to ClickHouse). Done at entry level so it
31+
// runs deterministically on webapp boot rather than lazily via a singleton
32+
// reference elsewhere in the module graph.
33+
if (sessionsReplicationInstance && env.SESSION_REPLICATION_ENABLED === "1") {
34+
sessionsReplicationInstance
35+
.start()
36+
.then(() => {
37+
console.log("🗃️ Sessions replication service started");
38+
})
39+
.catch((error) => {
40+
console.error("🗃️ Sessions replication service failed to start", {
41+
error,
42+
});
43+
});
44+
45+
signalsEmitter.on(
46+
"SIGTERM",
47+
sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
48+
);
49+
signalsEmitter.on(
50+
"SIGINT",
51+
sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
52+
);
53+
}
2654

2755
const ABORT_DELAY = 30000;
2856

apps/webapp/app/env.server.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,6 +1213,38 @@ const EnvironmentSchema = z
12131213
RUN_REPLICATION_DISABLE_PAYLOAD_INSERT: z.string().default("0"),
12141214
RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING: z.string().default("0"),
12151215

1216+
// Session replication (Postgres → ClickHouse sessions_v1). Shares Redis
1217+
// with the runs replicator for leader locking but has its own slot and
1218+
// publication so the two consume independently.
1219+
SESSION_REPLICATION_CLICKHOUSE_URL: z.string().optional(),
1220+
SESSION_REPLICATION_ENABLED: z.string().default("0"),
1221+
SESSION_REPLICATION_SLOT_NAME: z.string().default("sessions_to_clickhouse_v1"),
1222+
SESSION_REPLICATION_PUBLICATION_NAME: z
1223+
.string()
1224+
.default("sessions_to_clickhouse_v1_publication"),
1225+
SESSION_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(1),
1226+
SESSION_REPLICATION_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
1227+
SESSION_REPLICATION_FLUSH_BATCH_SIZE: z.coerce.number().int().default(100),
1228+
SESSION_REPLICATION_LEADER_LOCK_TIMEOUT_MS: z.coerce.number().int().default(30_000),
1229+
SESSION_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS: z.coerce.number().int().default(10_000),
1230+
SESSION_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000),
1231+
SESSION_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
1232+
SESSION_REPLICATION_ACK_INTERVAL_SECONDS: z.coerce.number().int().default(10),
1233+
SESSION_REPLICATION_LOG_LEVEL: z
1234+
.enum(["log", "error", "warn", "info", "debug"])
1235+
.default("info"),
1236+
SESSION_REPLICATION_CLICKHOUSE_LOG_LEVEL: z
1237+
.enum(["log", "error", "warn", "info", "debug"])
1238+
.default("info"),
1239+
SESSION_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
1240+
SESSION_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("0"),
1241+
SESSION_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
1242+
SESSION_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
1243+
SESSION_REPLICATION_INSERT_STRATEGY: z.enum(["insert", "insert_async"]).default("insert"),
1244+
SESSION_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3),
1245+
SESSION_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100),
1246+
SESSION_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000),
1247+
12161248
// Clickhouse
12171249
CLICKHOUSE_URL: z.string(),
12181250
CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import { json } from "@remix-run/server-runtime";
2+
import {
3+
CloseSessionRequestBody,
4+
type RetrieveSessionResponseBody,
5+
} from "@trigger.dev/core/v3";
6+
import { z } from "zod";
7+
import { prisma } from "~/db.server";
8+
import {
9+
resolveSessionByIdOrExternalId,
10+
serializeSession,
11+
} from "~/services/realtime/sessions.server";
12+
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
13+
14+
const ParamsSchema = z.object({
15+
session: z.string(),
16+
});
17+
18+
const { action } = createActionApiRoute(
19+
{
20+
params: ParamsSchema,
21+
body: CloseSessionRequestBody,
22+
maxContentLength: 1024,
23+
method: "POST",
24+
allowJWT: true,
25+
corsStrategy: "all",
26+
authorization: {
27+
action: "admin",
28+
resource: (params) => ({ sessions: params.session }),
29+
superScopes: ["admin:sessions", "admin:all", "admin"],
30+
},
31+
},
32+
async ({ authentication, params, body }) => {
33+
const existing = await resolveSessionByIdOrExternalId(
34+
prisma,
35+
authentication.environment.id,
36+
params.session
37+
);
38+
39+
if (!existing) {
40+
return json({ error: "Session not found" }, { status: 404 });
41+
}
42+
43+
// Idempotent: if already closed, return the current row without clobbering
44+
// the original closedAt / closedReason.
45+
if (existing.closedAt) {
46+
return json<RetrieveSessionResponseBody>(serializeSession(existing));
47+
}
48+
49+
const updated = await prisma.session.update({
50+
where: { id: existing.id },
51+
data: {
52+
closedAt: new Date(),
53+
closedReason: body.reason ?? null,
54+
},
55+
});
56+
57+
return json<RetrieveSessionResponseBody>(serializeSession(updated));
58+
}
59+
);
60+
61+
export { action };
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import { json } from "@remix-run/server-runtime";
2+
import {
3+
type RetrieveSessionResponseBody,
4+
UpdateSessionRequestBody,
5+
} from "@trigger.dev/core/v3";
6+
import { Prisma } from "@trigger.dev/database";
7+
import { z } from "zod";
8+
import { $replica, prisma } from "~/db.server";
9+
import {
10+
resolveSessionByIdOrExternalId,
11+
serializeSession,
12+
} from "~/services/realtime/sessions.server";
13+
import {
14+
createActionApiRoute,
15+
createLoaderApiRoute,
16+
} from "~/services/routeBuilders/apiBuilder.server";
17+
18+
const ParamsSchema = z.object({
19+
session: z.string(),
20+
});
21+
22+
export const loader = createLoaderApiRoute(
23+
{
24+
params: ParamsSchema,
25+
allowJWT: true,
26+
corsStrategy: "all",
27+
findResource: async (params, auth) => {
28+
return resolveSessionByIdOrExternalId($replica, auth.environment.id, params.session);
29+
},
30+
authorization: {
31+
action: "read",
32+
resource: (session) => ({ sessions: [session.friendlyId, session.externalId ?? ""] }),
33+
superScopes: ["read:sessions", "read:all", "admin"],
34+
},
35+
},
36+
async ({ resource: session }) => {
37+
return json<RetrieveSessionResponseBody>(serializeSession(session));
38+
}
39+
);
40+
41+
const { action } = createActionApiRoute(
42+
{
43+
params: ParamsSchema,
44+
body: UpdateSessionRequestBody,
45+
maxContentLength: 1024 * 32,
46+
method: "PATCH",
47+
allowJWT: true,
48+
corsStrategy: "all",
49+
authorization: {
50+
action: "admin",
51+
resource: (params) => ({ sessions: params.session }),
52+
superScopes: ["admin:sessions", "admin:all", "admin"],
53+
},
54+
},
55+
async ({ authentication, params, body }) => {
56+
const existing = await resolveSessionByIdOrExternalId(
57+
prisma,
58+
authentication.environment.id,
59+
params.session
60+
);
61+
62+
if (!existing) {
63+
return json({ error: "Session not found" }, { status: 404 });
64+
}
65+
66+
const updated = await prisma.session.update({
67+
where: { id: existing.id },
68+
data: {
69+
...(body.tags !== undefined ? { tags: body.tags } : {}),
70+
...(body.metadata !== undefined
71+
? {
72+
metadata:
73+
body.metadata === null
74+
? Prisma.JsonNull
75+
: (body.metadata as Prisma.InputJsonValue),
76+
}
77+
: {}),
78+
...(body.externalId !== undefined ? { externalId: body.externalId } : {}),
79+
},
80+
});
81+
82+
return json<RetrieveSessionResponseBody>(serializeSession(updated));
83+
}
84+
);
85+
86+
export { action };
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
import { json } from "@remix-run/server-runtime";
2+
import {
3+
CreateSessionRequestBody,
4+
type CreatedSessionResponseBody,
5+
ListSessionsQueryParams,
6+
type ListSessionsResponseBody,
7+
type SessionStatus,
8+
} from "@trigger.dev/core/v3";
9+
import { SessionId } from "@trigger.dev/core/v3/isomorphic";
10+
import type { Prisma, Session } from "@trigger.dev/database";
11+
import { $replica, prisma, type PrismaClient } from "~/db.server";
12+
import { clickhouseClient } from "~/services/clickhouseInstance.server";
13+
import { serializeSession } from "~/services/realtime/sessions.server";
14+
import { SessionsRepository } from "~/services/sessionsRepository/sessionsRepository.server";
15+
import {
16+
createActionApiRoute,
17+
createLoaderApiRoute,
18+
} from "~/services/routeBuilders/apiBuilder.server";
19+
import { ServiceValidationError } from "~/v3/services/common.server";
20+
21+
function asArray<T>(value: T | T[] | undefined): T[] | undefined {
22+
if (value === undefined) return undefined;
23+
return Array.isArray(value) ? value : [value];
24+
}
25+
26+
export const loader = createLoaderApiRoute(
27+
{
28+
searchParams: ListSessionsQueryParams,
29+
findResource: async () => 1,
30+
},
31+
async ({ searchParams, authentication }) => {
32+
const repository = new SessionsRepository({
33+
clickhouse: clickhouseClient,
34+
prisma: $replica as PrismaClient,
35+
});
36+
37+
// `page[after]` is the forward cursor, `page[before]` is the backward
38+
// cursor. The repository internally keys off `{cursor, direction}`.
39+
const cursor = searchParams["page[after]"] ?? searchParams["page[before]"];
40+
const direction = searchParams["page[before]"] ? "backward" : "forward";
41+
42+
const { sessions: rows, pagination } = await repository.listSessions({
43+
organizationId: authentication.environment.organizationId,
44+
projectId: authentication.environment.projectId,
45+
environmentId: authentication.environment.id,
46+
types: asArray(searchParams["filter[type]"]),
47+
tags: asArray(searchParams["filter[tags]"]),
48+
taskIdentifiers: asArray(searchParams["filter[taskIdentifier]"]),
49+
externalId: searchParams["filter[externalId]"],
50+
statuses: asArray(searchParams["filter[status]"]) as SessionStatus[] | undefined,
51+
period: searchParams["filter[createdAt][period]"],
52+
from: searchParams["filter[createdAt][from]"],
53+
to: searchParams["filter[createdAt][to]"],
54+
page: {
55+
size: searchParams["page[size]"],
56+
cursor,
57+
direction,
58+
},
59+
});
60+
61+
return json<ListSessionsResponseBody>({
62+
data: rows.map((session) =>
63+
serializeSession({
64+
...session,
65+
// Columns the list query doesn't select — filled so `serializeSession`
66+
// can operate on a narrowed payload without type errors.
67+
projectId: authentication.environment.projectId,
68+
environmentType: authentication.environment.type,
69+
organizationId: authentication.environment.organizationId,
70+
} as Session)
71+
),
72+
pagination: {
73+
...(pagination.nextCursor ? { next: pagination.nextCursor } : {}),
74+
...(pagination.previousCursor ? { previous: pagination.previousCursor } : {}),
75+
},
76+
});
77+
}
78+
);
79+
80+
const { action } = createActionApiRoute(
81+
{
82+
body: CreateSessionRequestBody,
83+
method: "POST",
84+
maxContentLength: 1024 * 32, // 32KB — metadata is the only thing that grows
85+
},
86+
async ({ authentication, body }) => {
87+
try {
88+
let session: Session;
89+
let isCached = false;
90+
91+
if (body.externalId) {
92+
// Idempotent: (env, externalId) uniquely identifies the Session.
93+
const existing = await prisma.session.findUnique({
94+
where: {
95+
runtimeEnvironmentId_externalId: {
96+
runtimeEnvironmentId: authentication.environment.id,
97+
externalId: body.externalId,
98+
},
99+
},
100+
});
101+
102+
if (existing) {
103+
session = existing;
104+
isCached = true;
105+
} else {
106+
const { id, friendlyId } = SessionId.generate();
107+
session = await prisma.session.create({
108+
data: {
109+
id,
110+
friendlyId,
111+
externalId: body.externalId,
112+
type: body.type,
113+
taskIdentifier: body.taskIdentifier ?? null,
114+
tags: body.tags ?? [],
115+
metadata: body.metadata as Prisma.InputJsonValue | undefined,
116+
expiresAt: body.expiresAt ?? null,
117+
projectId: authentication.environment.projectId,
118+
runtimeEnvironmentId: authentication.environment.id,
119+
environmentType: authentication.environment.type,
120+
organizationId: authentication.environment.organizationId,
121+
},
122+
});
123+
}
124+
} else {
125+
const { id, friendlyId } = SessionId.generate();
126+
session = await prisma.session.create({
127+
data: {
128+
id,
129+
friendlyId,
130+
type: body.type,
131+
taskIdentifier: body.taskIdentifier ?? null,
132+
tags: body.tags ?? [],
133+
metadata: body.metadata as Prisma.InputJsonValue | undefined,
134+
expiresAt: body.expiresAt ?? null,
135+
projectId: authentication.environment.projectId,
136+
runtimeEnvironmentId: authentication.environment.id,
137+
environmentType: authentication.environment.type,
138+
organizationId: authentication.environment.organizationId,
139+
},
140+
});
141+
}
142+
143+
return json<CreatedSessionResponseBody>(
144+
{ ...serializeSession(session), isCached },
145+
{ status: isCached ? 200 : 201 }
146+
);
147+
} catch (error) {
148+
if (error instanceof ServiceValidationError) {
149+
return json({ error: error.message }, { status: 422 });
150+
}
151+
if (error instanceof Error) {
152+
return json({ error: error.message }, { status: 500 });
153+
}
154+
return json({ error: "Something went wrong" }, { status: 500 });
155+
}
156+
}
157+
);
158+
159+
export { action };

0 commit comments

Comments
 (0)