Skip to content

Commit 3d5873c

Browse files
committed
code review fixes
1 parent 8988178 commit 3d5873c

7 files changed

Lines changed: 135 additions & 79 deletions

File tree

apps/webapp/app/routes/api.v1.sessions.$session.ts

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,23 +63,41 @@ const { action } = createActionApiRoute(
6363
return json({ error: "Session not found" }, { status: 404 });
6464
}
6565

66-
const updated = await prisma.session.update({
67-
where: { id: existing.id },
68-
data: {
69-
...(body.tags !== undefined ? { tags: body.tags } : {}),
70-
...(body.metadata !== undefined
71-
? {
72-
metadata:
73-
body.metadata === null
74-
? Prisma.JsonNull
75-
: (body.metadata as Prisma.InputJsonValue),
76-
}
77-
: {}),
78-
...(body.externalId !== undefined ? { externalId: body.externalId } : {}),
79-
},
80-
});
66+
try {
67+
const updated = await prisma.session.update({
68+
where: { id: existing.id },
69+
data: {
70+
...(body.tags !== undefined ? { tags: body.tags } : {}),
71+
...(body.metadata !== undefined
72+
? {
73+
metadata:
74+
body.metadata === null
75+
? Prisma.JsonNull
76+
: (body.metadata as Prisma.InputJsonValue),
77+
}
78+
: {}),
79+
...(body.externalId !== undefined ? { externalId: body.externalId } : {}),
80+
},
81+
});
8182

82-
return json<RetrieveSessionResponseBody>(serializeSession(updated));
83+
return json<RetrieveSessionResponseBody>(serializeSession(updated));
84+
} catch (error) {
85+
// A duplicate externalId in the same environment violates the
86+
// `(runtimeEnvironmentId, externalId)` unique constraint. Surface that
87+
// as a 409 rather than a generic 500.
88+
if (
89+
error instanceof Prisma.PrismaClientKnownRequestError &&
90+
error.code === "P2002" &&
91+
Array.isArray((error.meta as { target?: string[] })?.target) &&
92+
((error.meta as { target?: string[] }).target ?? []).includes("externalId")
93+
) {
94+
return json(
95+
{ error: "A session with this externalId already exists in this environment" },
96+
{ status: 409 }
97+
);
98+
}
99+
throw error;
100+
}
83101
}
84102
);
85103

apps/webapp/app/routes/api.v1.sessions.ts

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -89,38 +89,43 @@ const { action } = createActionApiRoute(
8989
let isCached = false;
9090

9191
if (body.externalId) {
92-
// Idempotent: (env, externalId) uniquely identifies the Session.
93-
const existing = await prisma.session.findUnique({
92+
// Atomic upsert — two concurrent POSTs with the same externalId both
93+
// converge to the same row without either hitting a 500 from the
94+
// unique constraint.
95+
const { id, friendlyId } = SessionId.generate();
96+
const externalId = body.externalId;
97+
const pre = await prisma.session.findFirst({
9498
where: {
95-
runtimeEnvironmentId_externalId: {
96-
runtimeEnvironmentId: authentication.environment.id,
97-
externalId: body.externalId,
98-
},
99+
runtimeEnvironmentId: authentication.environment.id,
100+
externalId,
99101
},
102+
select: { id: true },
100103
});
104+
isCached = pre !== null;
101105

102-
if (existing) {
103-
session = existing;
104-
isCached = true;
105-
} else {
106-
const { id, friendlyId } = SessionId.generate();
107-
session = await prisma.session.create({
108-
data: {
109-
id,
110-
friendlyId,
111-
externalId: body.externalId,
112-
type: body.type,
113-
taskIdentifier: body.taskIdentifier ?? null,
114-
tags: body.tags ?? [],
115-
metadata: body.metadata as Prisma.InputJsonValue | undefined,
116-
expiresAt: body.expiresAt ?? null,
117-
projectId: authentication.environment.projectId,
106+
session = await prisma.session.upsert({
107+
where: {
108+
runtimeEnvironmentId_externalId: {
118109
runtimeEnvironmentId: authentication.environment.id,
119-
environmentType: authentication.environment.type,
120-
organizationId: authentication.environment.organizationId,
110+
externalId,
121111
},
122-
});
123-
}
112+
},
113+
create: {
114+
id,
115+
friendlyId,
116+
externalId,
117+
type: body.type,
118+
taskIdentifier: body.taskIdentifier ?? null,
119+
tags: body.tags ?? [],
120+
metadata: body.metadata as Prisma.InputJsonValue | undefined,
121+
expiresAt: body.expiresAt ?? null,
122+
projectId: authentication.environment.projectId,
123+
runtimeEnvironmentId: authentication.environment.id,
124+
environmentType: authentication.environment.type,
125+
organizationId: authentication.environment.organizationId,
126+
},
127+
update: {},
128+
});
124129
} else {
125130
const { id, friendlyId } = SessionId.generate();
126131
session = await prisma.session.create({

apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,18 @@ const ParamsSchema = z.object({
1717
// POST: server-side append of a single record to a session channel. Mirrors
1818
// the existing /realtime/v1/streams/:runId/:target/:streamId/append route,
1919
// scoped to a Session primitive.
20+
// S2 enforces a 1 MiB per-record limit (metered as
21+
// `8 + 2*H + Σ(header name+value) + body`). We cap the raw HTTP body at
22+
// 512 KiB so the JSON wrapper (`{"data":"...","id":"..."}`), string
23+
// escaping, and any future per-record header additions all stay comfortably
24+
// below S2's ceiling. See https://s2.dev/docs/limits.
25+
const MAX_APPEND_BODY_BYTES = 1024 * 512;
26+
2027
const { action } = createActionApiRoute(
2128
{
2229
params: ParamsSchema,
2330
method: "POST",
31+
maxContentLength: MAX_APPEND_BODY_BYTES,
2432
allowJWT: true,
2533
corsStrategy: "all",
2634
authorization: {

apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,12 @@ const loader = createLoaderApiRoute(
7979
},
8080
authorization: {
8181
action: "read",
82-
resource: (session) => ({ sessions: [session.friendlyId, session.externalId ?? ""] }),
82+
resource: (session) => {
83+
const ids = session.externalId
84+
? [session.friendlyId, session.externalId]
85+
: [session.friendlyId];
86+
return { sessions: ids };
87+
},
8388
superScopes: ["read:sessions", "read:all", "admin"],
8489
},
8590
},
@@ -104,19 +109,22 @@ const loader = createLoaderApiRoute(
104109

105110
const lastEventId = request.headers.get("Last-Event-ID") ?? undefined;
106111

107-
const timeoutInSecondsRaw = request.headers.get("Timeout-Seconds") ?? undefined;
108-
const timeoutInSeconds = timeoutInSecondsRaw ? parseInt(timeoutInSecondsRaw, 10) : undefined;
109-
110-
if (timeoutInSeconds !== undefined && isNaN(timeoutInSeconds)) {
111-
return new Response("Invalid timeout seconds", { status: 400 });
112-
}
113-
114-
if (timeoutInSeconds !== undefined && timeoutInSeconds < 1) {
115-
return new Response("Timeout seconds must be greater than 0", { status: 400 });
116-
}
117-
118-
if (timeoutInSeconds !== undefined && timeoutInSeconds > 600) {
119-
return new Response("Timeout seconds must be less than 600", { status: 400 });
112+
const timeoutInSecondsRaw = request.headers.get("Timeout-Seconds");
113+
let timeoutInSeconds: number | undefined;
114+
if (timeoutInSecondsRaw) {
115+
// `Number()` rejects `"10abc"` as NaN; `parseInt` would silently accept
116+
// the trailing garbage and bypass the bounds checks below.
117+
const parsed = Number(timeoutInSecondsRaw);
118+
if (!Number.isFinite(parsed) || !Number.isInteger(parsed)) {
119+
return new Response("Invalid timeout seconds", { status: 400 });
120+
}
121+
if (parsed < 1) {
122+
return new Response("Timeout seconds must be greater than 0", { status: 400 });
123+
}
124+
if (parsed > 600) {
125+
return new Response("Timeout seconds must be less than 600", { status: 400 });
126+
}
127+
timeoutInSeconds = parsed;
120128
}
121129

122130
return realtimeStream.streamResponseFromSessionStream(

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,17 @@ const ParamsSchema = z.object({
1313
streamId: z.string(),
1414
});
1515

16+
// S2 enforces a 1 MiB per-record limit (metered as
17+
// `8 + 2*H + Σ(header name+value) + body`). Cap the raw HTTP body at
18+
// 512 KiB so the JSON wrapper, string escaping, and any future per-record
19+
// header additions all stay well under S2's ceiling.
20+
// See https://s2.dev/docs/limits.
21+
const MAX_APPEND_BODY_BYTES = 1024 * 512;
22+
1623
const { action } = createActionApiRoute(
1724
{
1825
params: ParamsSchema,
26+
maxContentLength: MAX_APPEND_BODY_BYTES,
1927
},
2028
async ({ request, params, authentication }) => {
2129
const run = await $replica.taskRun.findFirst({

apps/webapp/app/services/realtime/sessions.server.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,11 @@ export async function resolveSessionByIdOrExternalId(
2727
});
2828
}
2929

30-
return prisma.session.findUnique({
31-
where: {
32-
runtimeEnvironmentId_externalId: {
33-
runtimeEnvironmentId,
34-
externalId: idOrExternalId,
35-
},
36-
},
30+
// `findFirst` rather than `findUnique` per the repo rule — `findUnique`'s
31+
// implicit DataLoader has open correctness bugs in Prisma 6.x that bite
32+
// hot-path lookups exactly like this one.
33+
return prisma.session.findFirst({
34+
where: { runtimeEnvironmentId, externalId: idOrExternalId },
3735
});
3836
}
3937

packages/core/src/v3/schemas/api.ts

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1456,8 +1456,8 @@ export type CompleteWaitpointTokenRequestBody = z.infer<typeof CompleteWaitpoint
14561456
export const CreateSessionRequestBody = z.object({
14571457
/** Plain string discriminator — e.g. `"chat.agent"`. Not validated against an enum on the server. */
14581458
type: z.string().min(1).max(64),
1459-
/** User-supplied idempotency key. Unique per environment. */
1460-
externalId: z.string().max(256).optional(),
1459+
/** User-supplied idempotency key. Unique per environment. Empty strings are rejected. */
1460+
externalId: z.string().trim().min(1).max(256).optional(),
14611461
/** Optional pointer for task-owned session types. */
14621462
taskIdentifier: z.string().max(128).optional(),
14631463
/** Up to 10 tags for dashboard filtering. */
@@ -1495,7 +1495,10 @@ export type RetrieveSessionResponseBody = z.infer<typeof RetrieveSessionResponse
14951495
export const UpdateSessionRequestBody = z.object({
14961496
tags: z.array(z.string().max(128)).max(10).optional(),
14971497
metadata: z.record(z.unknown()).nullable().optional(),
1498-
externalId: z.string().max(256).nullable().optional(),
1498+
// Null explicitly clears the externalId; non-null values must be non-empty.
1499+
externalId: z
1500+
.union([z.literal(null), z.string().trim().min(1).max(256)])
1501+
.optional(),
14991502
});
15001503
export type UpdateSessionRequestBody = z.infer<typeof UpdateSessionRequestBody>;
15011504

@@ -1514,19 +1517,27 @@ export type SessionStatus = z.infer<typeof SessionStatus>;
15141517
* narrowing fields — both produced automatically by `zodfetchCursorPage`
15151518
* and the matching client-side search-query helper.
15161519
*/
1517-
export const ListSessionsQueryParams = z.object({
1518-
"page[size]": z.coerce.number().int().min(1).max(100).default(20),
1519-
"page[after]": z.string().optional(),
1520-
"page[before]": z.string().optional(),
1521-
"filter[type]": z.union([z.string(), z.array(z.string())]).optional(),
1522-
"filter[tags]": z.union([z.string(), z.array(z.string())]).optional(),
1523-
"filter[taskIdentifier]": z.union([z.string(), z.array(z.string())]).optional(),
1524-
"filter[externalId]": z.string().optional(),
1525-
"filter[status]": z.union([SessionStatus, z.array(SessionStatus)]).optional(),
1526-
"filter[createdAt][period]": z.string().optional(),
1527-
"filter[createdAt][from]": z.coerce.number().int().optional(),
1528-
"filter[createdAt][to]": z.coerce.number().int().optional(),
1529-
});
1520+
export const ListSessionsQueryParams = z
1521+
.object({
1522+
"page[size]": z.coerce.number().int().min(1).max(100).default(20),
1523+
"page[after]": z.string().optional(),
1524+
"page[before]": z.string().optional(),
1525+
"filter[type]": z.union([z.string(), z.array(z.string())]).optional(),
1526+
"filter[tags]": z.union([z.string(), z.array(z.string())]).optional(),
1527+
"filter[taskIdentifier]": z.union([z.string(), z.array(z.string())]).optional(),
1528+
"filter[externalId]": z.string().optional(),
1529+
"filter[status]": z.union([SessionStatus, z.array(SessionStatus)]).optional(),
1530+
"filter[createdAt][period]": z.string().optional(),
1531+
"filter[createdAt][from]": z.coerce.number().int().optional(),
1532+
"filter[createdAt][to]": z.coerce.number().int().optional(),
1533+
})
1534+
.refine(
1535+
(value) => !(value["page[after]"] && value["page[before]"]),
1536+
{
1537+
message: "Cannot pass both page[after] and page[before] on the same request",
1538+
path: ["page[before]"],
1539+
}
1540+
);
15301541
export type ListSessionsQueryParams = z.infer<typeof ListSessionsQueryParams>;
15311542

15321543
/**

0 commit comments

Comments
 (0)