Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/session-primitive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Add `SessionId` friendly ID generator and schemas for the new durable Session primitive. Exported from `@trigger.dev/core/v3/isomorphic` alongside `RunId`, `BatchId`, etc.
6 changes: 6 additions & 0 deletions .server-changes/session-primitive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Add `Session` primitive — a durable, typed, bidirectional I/O primitive that outlives a single run, intended for agent/chat use cases. Ships the Postgres schema (`Session` table), control-plane CRUD routes (`POST/GET/PATCH /api/v1/sessions`, `POST /api/v1/sessions/:session/close` — polymorphic on friendlyId or externalId), `sessions` JWT scope, ClickHouse `sessions_v1` table, and `SessionsReplicationService` (logical replication from Postgres `Session` → ClickHouse `sessions_v1`). Run-scoped realtime streams (`streams.pipe`/`streams.input`) are unchanged and do **not** create Session rows.
28 changes: 28 additions & 0 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,34 @@ import {
registerRunEngineEventBusHandlers,
setupBatchQueueCallbacks,
} from "./v3/runEngineHandlers.server";
import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server";
import { signalsEmitter } from "./services/signals.server";

// Start the sessions replication service (subscribes to the logical replication
// slot, runs leader election, flushes to ClickHouse). Done at entry level so it
// runs deterministically on webapp boot rather than lazily via a singleton
// reference elsewhere in the module graph.
if (sessionsReplicationInstance && env.SESSION_REPLICATION_ENABLED === "1") {
sessionsReplicationInstance
.start()
.then(() => {
console.log("🗃️ Sessions replication service started");
})
.catch((error) => {
console.error("🗃️ Sessions replication service failed to start", {
error,
});
});

signalsEmitter.on(
"SIGTERM",
sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
);
signalsEmitter.on(
"SIGINT",
sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
);
}

const ABORT_DELAY = 30000;

Expand Down
32 changes: 32 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,38 @@ const EnvironmentSchema = z
RUN_REPLICATION_DISABLE_PAYLOAD_INSERT: z.string().default("0"),
RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING: z.string().default("0"),

// Session replication (Postgres → ClickHouse sessions_v1). Shares Redis
// with the runs replicator for leader locking but has its own slot and
// publication so the two consume independently.
SESSION_REPLICATION_CLICKHOUSE_URL: z.string().optional(),
SESSION_REPLICATION_ENABLED: z.string().default("0"),
SESSION_REPLICATION_SLOT_NAME: z.string().default("sessions_to_clickhouse_v1"),
SESSION_REPLICATION_PUBLICATION_NAME: z
.string()
.default("sessions_to_clickhouse_v1_publication"),
SESSION_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(1),
SESSION_REPLICATION_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
SESSION_REPLICATION_FLUSH_BATCH_SIZE: z.coerce.number().int().default(100),
SESSION_REPLICATION_LEADER_LOCK_TIMEOUT_MS: z.coerce.number().int().default(30_000),
SESSION_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS: z.coerce.number().int().default(10_000),
SESSION_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000),
SESSION_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
SESSION_REPLICATION_ACK_INTERVAL_SECONDS: z.coerce.number().int().default(10),
SESSION_REPLICATION_LOG_LEVEL: z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
SESSION_REPLICATION_CLICKHOUSE_LOG_LEVEL: z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
SESSION_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
SESSION_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("0"),
SESSION_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
SESSION_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
SESSION_REPLICATION_INSERT_STRATEGY: z.enum(["insert", "insert_async"]).default("insert"),
SESSION_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3),
SESSION_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100),
SESSION_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000),

// Clickhouse
CLICKHOUSE_URL: z.string(),
CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
Expand Down
61 changes: 61 additions & 0 deletions apps/webapp/app/routes/api.v1.sessions.$session.close.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { json } from "@remix-run/server-runtime";
import {
CloseSessionRequestBody,
type RetrieveSessionResponseBody,
} from "@trigger.dev/core/v3";
import { z } from "zod";
import { prisma } from "~/db.server";
import {
resolveSessionByIdOrExternalId,
serializeSession,
} from "~/services/realtime/sessions.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
session: z.string(),
});

const { action } = createActionApiRoute(
{
params: ParamsSchema,
body: CloseSessionRequestBody,
maxContentLength: 1024,
method: "POST",
allowJWT: true,
corsStrategy: "all",
authorization: {
action: "admin",
resource: (params) => ({ sessions: params.session }),
superScopes: ["admin:sessions", "admin:all", "admin"],
},
},
async ({ authentication, params, body }) => {
const existing = await resolveSessionByIdOrExternalId(
prisma,
authentication.environment.id,
params.session
);

if (!existing) {
return json({ error: "Session not found" }, { status: 404 });
}

// Idempotent: if already closed, return the current row without clobbering
// the original closedAt / closedReason.
if (existing.closedAt) {
return json<RetrieveSessionResponseBody>(serializeSession(existing));
}

const updated = await prisma.session.update({
where: { id: existing.id },
data: {
closedAt: new Date(),
closedReason: body.reason ?? null,
},
});

return json<RetrieveSessionResponseBody>(serializeSession(updated));
}
);

export { action };
104 changes: 104 additions & 0 deletions apps/webapp/app/routes/api.v1.sessions.$session.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { json } from "@remix-run/server-runtime";
import {
type RetrieveSessionResponseBody,
UpdateSessionRequestBody,
} from "@trigger.dev/core/v3";
import { Prisma } from "@trigger.dev/database";
import { z } from "zod";
import { $replica, prisma } from "~/db.server";
import {
resolveSessionByIdOrExternalId,
serializeSession,
} from "~/services/realtime/sessions.server";
import {
createActionApiRoute,
createLoaderApiRoute,
} from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
session: z.string(),
});

export const loader = createLoaderApiRoute(
{
params: ParamsSchema,
allowJWT: true,
corsStrategy: "all",
findResource: async (params, auth) => {
return resolveSessionByIdOrExternalId($replica, auth.environment.id, params.session);
},
authorization: {
action: "read",
resource: (session) => ({ sessions: [session.friendlyId, session.externalId ?? ""] }),
superScopes: ["read:sessions", "read:all", "admin"],
},
},
async ({ resource: session }) => {
return json<RetrieveSessionResponseBody>(serializeSession(session));
}
);

const { action } = createActionApiRoute(
{
params: ParamsSchema,
body: UpdateSessionRequestBody,
maxContentLength: 1024 * 32,
method: "PATCH",
allowJWT: true,
corsStrategy: "all",
authorization: {
action: "admin",
resource: (params) => ({ sessions: params.session }),
superScopes: ["admin:sessions", "admin:all", "admin"],
},
},
async ({ authentication, params, body }) => {
const existing = await resolveSessionByIdOrExternalId(
prisma,
authentication.environment.id,
params.session
);

if (!existing) {
return json({ error: "Session not found" }, { status: 404 });
}

try {
const updated = await prisma.session.update({
where: { id: existing.id },
data: {
...(body.tags !== undefined ? { tags: body.tags } : {}),
...(body.metadata !== undefined
? {
metadata:
body.metadata === null
? Prisma.JsonNull
: (body.metadata as Prisma.InputJsonValue),
}
: {}),
...(body.externalId !== undefined ? { externalId: body.externalId } : {}),
},
});

return json<RetrieveSessionResponseBody>(serializeSession(updated));
} catch (error) {
// A duplicate externalId in the same environment violates the
// `(runtimeEnvironmentId, externalId)` unique constraint. Surface that
// as a 409 rather than a generic 500.
if (
error instanceof Prisma.PrismaClientKnownRequestError &&
error.code === "P2002" &&
Array.isArray((error.meta as { target?: string[] })?.target) &&
((error.meta as { target?: string[] }).target ?? []).includes("externalId")
) {
return json(
{ error: "A session with this externalId already exists in this environment" },
{ status: 409 }
);
}
throw error;
}
}
);

export { action };
Loading
Loading