Skip to content

Commit c6c3b4e

Browse files
matt-aitken and claude committed
fix(webapp): use composite keyset cursor for run pagination
listRunIds/listRuns order by the composite key (created_at, run_id), but the cursor predicate cut on run_id alone. That is only sound when run_id lexicographic order matches created_at order. When a burst of runs is created such that the two diverge, keyset pagination both re-includes already-returned runs (duplicates) and drops runs it should return (skips). For bulk replay this produced duplicate runs; for the dashboard and runs.list it could silently skip or repeat runs at page boundaries.

- Encode cursors as the composite (created_at, run_id) key (an opaque URL-safe base64 token wrapping { c: createdAtMs, r: runId }) and cut on the matching tuple predicate ((created_at, run_id) < / > (...)). The ORDER BY is unchanged, so the table's primary-key alignment (and query performance) is preserved.
- Cursors are server-issued opaque tokens (the SDK just echoes pagination.next/previous back), so this needs no client update. Legacy bare-run_id cursors decode to the old run_id-only predicate for backwards compatibility with in-flight cursors.
- listRunIds now returns the page of run ids together with its next/previous cursors, so batch iteration (bulk actions) uses a cursor whose created_at component is sourced from the same query that orders the rows.
- ClickHouse getTaskRunsQueryBuilder now also selects toUnixTimestamp64Milli(created_at) AS created_at_ms.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent fa4804e commit c6c3b4e

10 files changed

Lines changed: 493 additions & 71 deletions
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Fix run pagination that could duplicate or skip runs. The query orders by `(created_at, run_id)` but the cursor cut on `run_id` alone, which is only correct when run_id order matches created_at order; when the two diverge, pages could repeat or drop runs (e.g. bulk replay re-processing runs). Cursors now encode the composite key as an opaque token and cut on the matching tuple; legacy bare-run_id cursors stay supported for in-flight pagination.

apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ export class TestTaskPresenter {
203203
prisma: this.replica as PrismaClient,
204204
});
205205

206-
const runIds = await runsRepository.listRunIds({
206+
const { runIds } = await runsRepository.listRunIds({
207207
organizationId: environment.organizationId,
208208
environmentId: environment.id,
209209
projectId: environment.projectId,

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.live.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
5858
return { count: 0, since: newRunsSince };
5959
}
6060

61-
const newRunIds = await runsRepository.listRunIds({
61+
const { runIds: newRunIds } = await runsRepository.listRunIds({
6262
organizationId: project.organizationId,
6363
projectId: project.id,
6464
environmentId: environment.id,

apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts

Lines changed: 98 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@ import {
44
type FilterRunsOptions,
55
type IRunsRepository,
66
type ListRunsOptions,
7+
type RunIdsPage,
78
type RunListInputOptions,
89
type RunsRepositoryOptions,
910
type TagListOptions,
1011
convertRunListInputOptionsToFilterRunsOptions,
1112
} from "./runsRepository.server";
1213
import parseDuration from "parse-duration";
14+
import { decodeRunsCursor, encodeRunsCursor } from "./runsCursor.server";
15+
16+
type RunCursorRow = { runId: string; createdAt: number };
1317

1418
export class ClickHouseRunsRepository implements IRunsRepository {
1519
constructor(private readonly options: RunsRepositoryOptions) {}
@@ -18,25 +22,52 @@ export class ClickHouseRunsRepository implements IRunsRepository {
1822
return "clickhouse";
1923
}
2024

21-
async listRunIds(options: ListRunsOptions) {
25+
/**
26+
* Runs the keyset-paginated query and returns `{ runId, createdAt }` rows
27+
* (one extra beyond `page.size` to signal "has more"). The ordering is always
28+
* the composite `(created_at, run_id)`; the cursor predicate must match it.
29+
*
30+
* Composite cursors carry both components, so we cut on the
31+
* `(created_at, run_id)` tuple — sound regardless of how run_id order relates
32+
* to created_at order. Legacy bare-run_id cursors fall back to the old
33+
* `run_id`-only predicate (knowingly unsound) for backwards compatibility
34+
* with in-flight cursors.
35+
*/
36+
private async listRunRows(options: ListRunsOptions): Promise<RunCursorRow[]> {
2237
const queryBuilder = this.options.clickhouse.taskRuns.queryBuilder();
2338
applyRunFiltersToQueryBuilder(
2439
queryBuilder,
2540
await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma)
2641
);
2742

43+
const forward = options.page.direction === "forward" || !options.page.direction;
44+
2845
if (options.page.cursor) {
29-
if (options.page.direction === "forward" || !options.page.direction) {
30-
queryBuilder
31-
.where("run_id < {runId: String}", { runId: options.page.cursor })
32-
.orderBy("created_at DESC, run_id DESC")
33-
.limit(options.page.size + 1);
46+
const decoded = decodeRunsCursor(options.page.cursor);
47+
48+
if (forward) {
49+
if (decoded.kind === "composite") {
50+
queryBuilder.where(
51+
"(created_at, run_id) < (fromUnixTimestamp64Milli({cursorCreatedAt: Int64}), {runId: String})",
52+
{ cursorCreatedAt: decoded.createdAt, runId: decoded.runId }
53+
);
54+
} else {
55+
queryBuilder.where("run_id < {runId: String}", { runId: decoded.runId });
56+
}
57+
queryBuilder.orderBy("created_at DESC, run_id DESC");
3458
} else {
35-
queryBuilder
36-
.where("run_id > {runId: String}", { runId: options.page.cursor })
37-
.orderBy("created_at ASC, run_id ASC")
38-
.limit(options.page.size + 1);
59+
if (decoded.kind === "composite") {
60+
queryBuilder.where(
61+
"(created_at, run_id) > (fromUnixTimestamp64Milli({cursorCreatedAt: Int64}), {runId: String})",
62+
{ cursorCreatedAt: decoded.createdAt, runId: decoded.runId }
63+
);
64+
} else {
65+
queryBuilder.where("run_id > {runId: String}", { runId: decoded.runId });
66+
}
67+
queryBuilder.orderBy("created_at ASC, run_id ASC");
3968
}
69+
70+
queryBuilder.limit(options.page.size + 1);
4071
} else {
4172
// Initial page - no cursor provided
4273
queryBuilder.orderBy("created_at DESC, run_id DESC").limit(options.page.size + 1);
@@ -48,75 +79,89 @@ export class ClickHouseRunsRepository implements IRunsRepository {
4879
throw queryError;
4980
}
5081

51-
const runIds = result.map((row) => row.run_id);
52-
return runIds;
82+
return result.map((row) => ({ runId: row.run_id, createdAt: row.created_at_ms }));
5383
}
5484

55-
async listFriendlyRunIds(options: ListRunsOptions) {
56-
// First get internal IDs from ClickHouse
57-
const internalIds = await this.listRunIds(options);
85+
/**
86+
* A keyset-paginated page of run ids ordered by `(created_at, run_id)`, plus
87+
* the cursors to page forward/backward. Cursors are composite tokens that
88+
* match the ordering, so pagination can't duplicate or skip runs even when
89+
* run_id order diverges from created_at order. This is the single source of
90+
* cursor construction — `listRuns` and bulk actions both build on it.
91+
*/
92+
async listRunIds(options: ListRunsOptions): Promise<RunIdsPage> {
93+
const rows = await this.listRunRows(options);
5894

59-
if (internalIds.length === 0) {
60-
return [];
61-
}
62-
63-
// Then get friendly IDs from Prisma
64-
const runs = await this.options.prisma.taskRun.findMany({
65-
where: {
66-
id: {
67-
in: internalIds,
68-
},
69-
},
70-
select: {
71-
friendlyId: true,
72-
},
73-
});
74-
75-
return runs.map((run) => run.friendlyId);
76-
}
77-
78-
async listRuns(options: ListRunsOptions) {
79-
const runIds = await this.listRunIds(options);
95+
// listRunRows fetches one extra row beyond page.size to detect "has more".
96+
const hasMore = rows.length > options.page.size;
8097

81-
// If there are more runs than the page size, we need to fetch the next page
82-
const hasMore = runIds.length > options.page.size;
98+
const cursorFor = (row: RunCursorRow | undefined): string | null =>
99+
row ? encodeRunsCursor(row.createdAt, row.runId) : null;
83100

84101
let nextCursor: string | null = null;
85102
let previousCursor: string | null = null;
86103

87-
//get cursors for next and previous pages
88104
const direction = options.page.direction ?? "forward";
89105
switch (direction) {
90106
case "forward": {
91-
previousCursor = options.page.cursor ? runIds.at(0) ?? null : null;
107+
previousCursor = options.page.cursor ? cursorFor(rows.at(0)) : null;
92108
if (hasMore) {
93-
// The next cursor should be the last run ID from this page
94-
nextCursor = runIds[options.page.size - 1];
109+
// The next cursor is the last run on this page.
110+
nextCursor = cursorFor(rows[options.page.size - 1]);
95111
}
96112
break;
97113
}
98114
case "backward": {
99-
const reversedRunIds = [...runIds].reverse();
115+
const reversedRows = [...rows].reverse();
100116
if (hasMore) {
101-
previousCursor = reversedRunIds.at(1) ?? null;
102-
nextCursor = reversedRunIds.at(options.page.size) ?? null;
117+
previousCursor = cursorFor(reversedRows.at(1));
118+
nextCursor = cursorFor(reversedRows.at(options.page.size));
103119
} else {
104-
nextCursor = reversedRunIds.at(options.page.size - 1) ?? null;
120+
nextCursor = cursorFor(reversedRows.at(options.page.size - 1));
105121
}
106-
107122
break;
108123
}
109124
}
110125

111-
const runIdsToReturn =
112-
options.page.direction === "backward" && hasMore
113-
? runIds.slice(1, options.page.size + 1)
114-
: runIds.slice(0, options.page.size);
126+
const runIds = (
127+
direction === "backward" && hasMore
128+
? rows.slice(1, options.page.size + 1)
129+
: rows.slice(0, options.page.size)
130+
).map((row) => row.runId);
131+
132+
return { runIds, pagination: { nextCursor, previousCursor } };
133+
}
134+
135+
async listFriendlyRunIds(options: ListRunsOptions) {
136+
// First get internal IDs from ClickHouse
137+
const { runIds } = await this.listRunIds(options);
138+
139+
if (runIds.length === 0) {
140+
return [];
141+
}
142+
143+
// Then get friendly IDs from Prisma
144+
const runs = await this.options.prisma.taskRun.findMany({
145+
where: {
146+
id: {
147+
in: runIds,
148+
},
149+
},
150+
select: {
151+
friendlyId: true,
152+
},
153+
});
154+
155+
return runs.map((run) => run.friendlyId);
156+
}
157+
158+
async listRuns(options: ListRunsOptions) {
159+
const { runIds, pagination } = await this.listRunIds(options);
115160

116161
let runs = await this.options.prisma.taskRun.findMany({
117162
where: {
118163
id: {
119-
in: runIdsToReturn,
164+
in: runIds,
120165
},
121166
},
122167
orderBy: {
@@ -163,10 +208,7 @@ export class ClickHouseRunsRepository implements IRunsRepository {
163208

164209
return {
165210
runs,
166-
pagination: {
167-
nextCursor,
168-
previousCursor,
169-
},
211+
pagination,
170212
};
171213
}
172214

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* Cursor encoding for keyset pagination over `(created_at, run_id)`.
3+
*
4+
* The list query orders by the composite key `(created_at, run_id)`, so a sound
5+
* cursor must carry BOTH components — cutting on `run_id` alone re-includes and
6+
* skips rows whenever `run_id` order diverges from `created_at` order.
7+
*
8+
* A cursor is an opaque URL-safe base64 token wrapping `{ c: createdAtMs, r:
9+
* runId }`. Cursors are server-issued (the SDK just echoes
10+
* `pagination.next`/`previous` back), so this format needs no client update.
11+
*
12+
* Legacy cursors were the bare internal run_id (a cuid). They are detected by
13+
* decode failure: a cuid base64-decodes to non-JSON bytes, so it falls through
14+
* to `{ kind: "legacy" }` and the old (knowingly unsound) `run_id`-only
15+
* predicate. In-flight legacy cursors keep working and drain naturally.
16+
*/
17+
18+
import { z } from "zod";
19+
20+
export type DecodedRunsCursor =
21+
| { kind: "composite"; createdAt: number; runId: string }
22+
| { kind: "legacy"; runId: string };
23+
24+
// `c` = created_at (ms since epoch), `r` = run_id. Short keys keep the token small.
25+
const CompositeCursor = z.object({
26+
c: z.number().int(),
27+
r: z.string().min(1),
28+
});
29+
30+
export function encodeRunsCursor(createdAtMs: number, runId: string): string {
31+
return Buffer.from(JSON.stringify({ c: createdAtMs, r: runId })).toString("base64url");
32+
}
33+
34+
export function decodeRunsCursor(cursor: string): DecodedRunsCursor {
35+
try {
36+
const parsed = CompositeCursor.safeParse(
37+
JSON.parse(Buffer.from(cursor, "base64url").toString("utf8"))
38+
);
39+
if (parsed.success) {
40+
return { kind: "composite", createdAt: parsed.data.c, runId: parsed.data.r };
41+
}
42+
} catch {
43+
// JSON.parse threw — not a composite cursor.
44+
}
45+
46+
return { kind: "legacy", runId: cursor };
47+
}

apps/webapp/app/services/runsRepository/runsRepository.server.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,25 @@ export type TagList = {
127127
tags: string[];
128128
};
129129

130+
export type CursorPagination = {
131+
nextCursor: string | null;
132+
previousCursor: string | null;
133+
};
134+
135+
export type RunIdsPage = {
136+
runIds: string[];
137+
pagination: CursorPagination;
138+
};
139+
130140
export interface IRunsRepository {
131141
name: string;
132-
listRunIds(options: ListRunsOptions): Promise<string[]>;
142+
/**
143+
* A keyset-paginated page of run ids plus the cursors to navigate
144+
* forward/backward. The cursors are opaque composite `(created_at, run_id)`
145+
* tokens, so pagination can't duplicate or skip runs. This is the single
146+
* cursor-aware list primitive — `listRuns` and bulk actions build on it.
147+
*/
148+
listRunIds(options: ListRunsOptions): Promise<RunIdsPage>;
133149
/** Returns friendly IDs (e.g., run_xxx) instead of internal UUIDs. Used for ClickHouse task_events queries. */
134150
listFriendlyRunIds(options: ListRunsOptions): Promise<string[]>;
135151
listRuns(options: ListRunsOptions): Promise<{
@@ -154,7 +170,7 @@ export class RunsRepository implements IRunsRepository {
154170
return "runsRepository";
155171
}
156172

157-
async listRunIds(options: ListRunsOptions): Promise<string[]> {
173+
async listRunIds(options: ListRunsOptions): Promise<RunIdsPage> {
158174
return startActiveSpan(
159175
"runsRepository.listRunIds",
160176
async () => this.clickHouseRunsRepository.listRunIds(options),

apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,13 @@ export class BulkActionService extends BaseService {
159159
throw new Error(`Bulk action group has invalid query name: ${group.queryName}`);
160160
}
161161

162-
// 2. Get the runs to process in this batch
163-
const runIds = await runsRepository.listRunIds({
162+
// 2. Get the runs to process in this batch, plus the cursor for the next
163+
// batch. The cursor is a composite (created_at, run_id) keyset cursor so the
164+
// next batch can't re-include or skip runs.
165+
const {
166+
runIds: runIdsToProcess,
167+
pagination: { nextCursor },
168+
} = await runsRepository.listRunIds({
164169
...filters,
165170
page: {
166171
size: env.BULK_ACTION_BATCH_SIZE,
@@ -172,8 +177,6 @@ export class BulkActionService extends BaseService {
172177
// 3. Process the runs
173178
let successCount = 0;
174179
let failureCount = 0;
175-
// Slice because we fetch an extra for the cursor
176-
const runIdsToProcess = runIds.slice(0, env.BULK_ACTION_BATCH_SIZE);
177180

178181
switch (group.type) {
179182
case BulkActionType.CANCEL: {
@@ -274,7 +277,10 @@ export class BulkActionService extends BaseService {
274277
}
275278
}
276279

277-
const isFinished = runIdsToProcess.length === 0;
280+
// A null nextCursor means there is no further page — this batch was the
281+
// last (or there were no runs at all), so the action is complete. (An empty
282+
// batch also yields a null cursor.)
283+
const isFinished = nextCursor === null;
278284

279285
logger.debug("Bulk action group processed batch", {
280286
bulkActionId,
@@ -292,7 +298,8 @@ export class BulkActionService extends BaseService {
292298
const updatedGroup = await this._prisma.bulkActionGroup.update({
293299
where: { id: bulkActionId },
294300
data: {
295-
cursor: runIdsToProcess.at(runIdsToProcess.length - 1),
301+
// Json column: leave unchanged when there's no next cursor (finished).
302+
cursor: nextCursor ?? undefined,
296303
successCount: {
297304
increment: successCount,
298305
},

0 commit comments

Comments
 (0)