Skip to content

Commit 6b3c823

Browse files
matt-aitkenclaude
andcommitted
fix(webapp): use composite keyset cursor for run pagination
listRunIds/listRuns order by the composite key (created_at, run_id) but the cursor predicate cut on run_id alone. That is only sound when run_id lexicographic order matches created_at order. When a burst of runs is created such that the two diverge, keyset pagination both re-includes already-returned runs (duplicates) and drops runs it should return (skips). For bulk replay this produced duplicate runs; for the dashboard and runs.list it could silently skip or repeat runs at page boundaries. - Encode cursors as the composite (created_at, run_id) key (an opaque URL-safe base64 token wrapping the JSON object { c: createdAtMs, r: runId }) and cut on the matching tuple predicate ((created_at, run_id) < / > (...)). The ORDER BY is unchanged, so the table's primary-key alignment (and query performance) is preserved. - Cursors are server-issued opaque tokens (the SDK just echoes pagination.next/previous back), so this needs no client update. Legacy bare-run_id cursors decode to the old run_id-only predicate for backwards compatibility with in-flight cursors. - Add listRunIdsWithCursor for forward-only batch iteration (bulk actions) so the created_at component is sourced from the same query that orders the rows. - ClickHouse getTaskRunsQueryBuilder now also selects toUnixTimestamp64Milli(created_at) AS created_at_ms. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent fa4804e commit 6b3c823

7 files changed

Lines changed: 483 additions & 30 deletions

File tree

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Fix unsound keyset pagination in `ClickHouseRunsRepository` that could duplicate
7+
or skip runs. The list query orders by `(created_at, run_id)` but the cursor
8+
predicate cut on `run_id` alone, which is only sound when run_id order matches
9+
created_at order. When a burst of runs is created such that the two diverge,
10+
bulk replay could re-process already-replayed runs, and dashboard / `runs.list`
11+
pagination could silently skip or repeat runs at page boundaries.
12+
13+
Cursors now encode the composite `(created_at, run_id)` key as an opaque
14+
URL-safe base64 token, and the query cuts on the matching tuple. Cursors are
15+
server-issued opaque tokens, so this needs no SDK update; legacy bare-run_id
16+
cursors decode to the old `run_id`-only predicate for backwards compatibility
17+
with in-flight cursors.

apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts

Lines changed: 83 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ import {
1010
convertRunListInputOptionsToFilterRunsOptions,
1111
} from "./runsRepository.server";
1212
import parseDuration from "parse-duration";
13+
import { decodeRunsCursor, encodeRunsCursor } from "./runsCursor.server";
14+
15+
type RunCursorRow = { runId: string; createdAt: number };
1316

1417
export class ClickHouseRunsRepository implements IRunsRepository {
1518
constructor(private readonly options: RunsRepositoryOptions) {}
@@ -18,25 +21,52 @@ export class ClickHouseRunsRepository implements IRunsRepository {
1821
return "clickhouse";
1922
}
2023

21-
async listRunIds(options: ListRunsOptions) {
24+
/**
25+
* Runs the keyset-paginated query and returns `{ runId, createdAt }` rows
26+
* (one extra beyond `page.size` to signal "has more"). The ordering is always
27+
* the composite `(created_at, run_id)`; the cursor predicate must match it.
28+
*
29+
* Composite cursors carry both components, so we cut on the
30+
* `(created_at, run_id)` tuple — sound regardless of how run_id order relates
31+
* to created_at order. Legacy bare-run_id cursors fall back to the old
32+
* `run_id`-only predicate (knowingly unsound) for backwards compatibility
33+
* with in-flight cursors.
34+
*/
35+
private async listRunRows(options: ListRunsOptions): Promise<RunCursorRow[]> {
2236
const queryBuilder = this.options.clickhouse.taskRuns.queryBuilder();
2337
applyRunFiltersToQueryBuilder(
2438
queryBuilder,
2539
await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma)
2640
);
2741

42+
const forward = options.page.direction === "forward" || !options.page.direction;
43+
2844
if (options.page.cursor) {
29-
if (options.page.direction === "forward" || !options.page.direction) {
30-
queryBuilder
31-
.where("run_id < {runId: String}", { runId: options.page.cursor })
32-
.orderBy("created_at DESC, run_id DESC")
33-
.limit(options.page.size + 1);
45+
const decoded = decodeRunsCursor(options.page.cursor);
46+
47+
if (forward) {
48+
if (decoded.kind === "composite") {
49+
queryBuilder.where(
50+
"(created_at, run_id) < (fromUnixTimestamp64Milli({cursorCreatedAt: Int64}), {runId: String})",
51+
{ cursorCreatedAt: decoded.createdAt, runId: decoded.runId }
52+
);
53+
} else {
54+
queryBuilder.where("run_id < {runId: String}", { runId: decoded.runId });
55+
}
56+
queryBuilder.orderBy("created_at DESC, run_id DESC");
3457
} else {
35-
queryBuilder
36-
.where("run_id > {runId: String}", { runId: options.page.cursor })
37-
.orderBy("created_at ASC, run_id ASC")
38-
.limit(options.page.size + 1);
58+
if (decoded.kind === "composite") {
59+
queryBuilder.where(
60+
"(created_at, run_id) > (fromUnixTimestamp64Milli({cursorCreatedAt: Int64}), {runId: String})",
61+
{ cursorCreatedAt: decoded.createdAt, runId: decoded.runId }
62+
);
63+
} else {
64+
queryBuilder.where("run_id > {runId: String}", { runId: decoded.runId });
65+
}
66+
queryBuilder.orderBy("created_at ASC, run_id ASC");
3967
}
68+
69+
queryBuilder.limit(options.page.size + 1);
4070
} else {
4171
// Initial page - no cursor provided
4272
queryBuilder.orderBy("created_at DESC, run_id DESC").limit(options.page.size + 1);
@@ -48,8 +78,31 @@ export class ClickHouseRunsRepository implements IRunsRepository {
4878
throw queryError;
4979
}
5080

51-
const runIds = result.map((row) => row.run_id);
52-
return runIds;
81+
return result.map((row) => ({ runId: row.run_id, createdAt: row.created_at_ms }));
82+
}
83+
84+
async listRunIds(options: ListRunsOptions) {
85+
const rows = await this.listRunRows(options);
86+
return rows.map((row) => row.runId);
87+
}
88+
89+
/**
90+
* Forward-only batch iteration (bulk actions). Returns up to `page.size` run
91+
* ids plus the composite cursor for the next batch (null when this batch is
92+
* empty). The `created_at` component comes from the same query that orders the
93+
* rows, so the next batch's tuple predicate is always consistent.
94+
*/
95+
async listRunIdsWithCursor(
96+
options: ListRunsOptions
97+
): Promise<{ runIds: string[]; nextCursor: string | null }> {
98+
const rows = await this.listRunRows(options);
99+
const batch = rows.slice(0, options.page.size);
100+
const last = batch.at(-1);
101+
102+
return {
103+
runIds: batch.map((row) => row.runId),
104+
nextCursor: last ? encodeRunsCursor(last.createdAt, last.runId) : null,
105+
};
53106
}
54107

55108
async listFriendlyRunIds(options: ListRunsOptions) {
@@ -76,10 +129,15 @@ export class ClickHouseRunsRepository implements IRunsRepository {
76129
}
77130

78131
async listRuns(options: ListRunsOptions) {
79-
const runIds = await this.listRunIds(options);
132+
const rows = await this.listRunRows(options);
80133

81134
// If there are more runs than the page size, we need to fetch the next page
82-
const hasMore = runIds.length > options.page.size;
135+
const hasMore = rows.length > options.page.size;
136+
137+
// Cursors carry both (created_at, run_id) so the next/prev page predicate
138+
// matches the composite ordering — see runsCursor.server.ts.
139+
const cursorFor = (row: RunCursorRow | undefined): string | null =>
140+
row ? encodeRunsCursor(row.createdAt, row.runId) : null;
83141

84142
let nextCursor: string | null = null;
85143
let previousCursor: string | null = null;
@@ -88,30 +146,31 @@ export class ClickHouseRunsRepository implements IRunsRepository {
88146
const direction = options.page.direction ?? "forward";
89147
switch (direction) {
90148
case "forward": {
91-
previousCursor = options.page.cursor ? runIds.at(0) ?? null : null;
149+
previousCursor = options.page.cursor ? cursorFor(rows.at(0)) : null;
92150
if (hasMore) {
93-
// The next cursor should be the last run ID from this page
94-
nextCursor = runIds[options.page.size - 1];
151+
// The next cursor should be the last run from this page
152+
nextCursor = cursorFor(rows[options.page.size - 1]);
95153
}
96154
break;
97155
}
98156
case "backward": {
99-
const reversedRunIds = [...runIds].reverse();
157+
const reversedRows = [...rows].reverse();
100158
if (hasMore) {
101-
previousCursor = reversedRunIds.at(1) ?? null;
102-
nextCursor = reversedRunIds.at(options.page.size) ?? null;
159+
previousCursor = cursorFor(reversedRows.at(1));
160+
nextCursor = cursorFor(reversedRows.at(options.page.size));
103161
} else {
104-
nextCursor = reversedRunIds.at(options.page.size - 1) ?? null;
162+
nextCursor = cursorFor(reversedRows.at(options.page.size - 1));
105163
}
106164

107165
break;
108166
}
109167
}
110168

111-
const runIdsToReturn =
169+
const runIdsToReturn = (
112170
options.page.direction === "backward" && hasMore
113-
? runIds.slice(1, options.page.size + 1)
114-
: runIds.slice(0, options.page.size);
171+
? rows.slice(1, options.page.size + 1)
172+
: rows.slice(0, options.page.size)
173+
).map((row) => row.runId);
115174

116175
let runs = await this.options.prisma.taskRun.findMany({
117176
where: {
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* Cursor encoding for keyset pagination over `(created_at, run_id)`.
3+
*
4+
* The list query orders by the composite key `(created_at, run_id)`, so a sound
5+
* cursor must carry BOTH components — cutting on `run_id` alone re-includes and
6+
* skips rows whenever `run_id` order diverges from `created_at` order.
7+
*
8+
* A cursor is an opaque URL-safe base64 token wrapping `{ c: createdAtMs, r:
9+
* runId }`. Cursors are server-issued (the SDK just echoes
10+
* `pagination.next`/`previous` back), so this format needs no client update.
11+
*
12+
* Legacy cursors were the bare internal run_id (a cuid). They are detected by
13+
* decode failure: a cuid base64-decodes to non-JSON bytes, so it falls through
14+
* to `{ kind: "legacy" }` and the old (knowingly unsound) `run_id`-only
15+
* predicate. In-flight legacy cursors keep working and drain naturally.
16+
*/
17+
18+
import { z } from "zod";
19+
20+
export type DecodedRunsCursor =
21+
| { kind: "composite"; createdAt: number; runId: string }
22+
| { kind: "legacy"; runId: string };
23+
24+
// `c` = created_at (ms since epoch), `r` = run_id. Short keys keep the token small.
25+
const CompositeCursor = z.object({
26+
c: z.number().int(),
27+
r: z.string().min(1),
28+
});
29+
30+
/**
 * Encodes a composite `(created_at, run_id)` keyset cursor as an opaque
 * URL-safe base64 token wrapping the JSON object `{ c: createdAtMs, r: runId }`.
 *
 * The inputs are validated so that every token produced here is accepted by the
 * composite-cursor schema on decode (`c` an integer, `r` a non-empty string).
 * Without this check, a bad value (NaN, a fractional number, a string, an empty
 * run id) would still produce a token, but that token would fail the schema on
 * decode and be treated as a legacy bare-run_id cursor, silently breaking
 * pagination instead of failing loudly.
 *
 * @param createdAtMs - Row creation time in milliseconds since the Unix epoch.
 *   An integer-valued string is tolerated at runtime and converted to a number.
 *   NOTE(review): ClickHouse JSON output formats can quote 64-bit integers as
 *   strings; confirm how `created_at_ms` is parsed by the query builder.
 * @param runId - Internal run id; must be a non-empty string.
 * @returns The base64url-encoded cursor token.
 * @throws RangeError if `createdAtMs` is not (convertible to) a safe integer.
 * @throws TypeError if `runId` is not a non-empty string.
 */
export function encodeRunsCursor(createdAtMs: number, runId: string): string {
  // Widen to `unknown` so the runtime checks below are not optimised away by
  // the static `number` type: the value may originate from an untyped row.
  const raw: unknown = createdAtMs;
  const ms = typeof raw === "string" && /^-?\d+$/.test(raw) ? Number(raw) : raw;

  if (typeof ms !== "number" || !Number.isSafeInteger(ms)) {
    throw new RangeError(
      `encodeRunsCursor: createdAtMs must be a safe integer, got ${String(raw)}`
    );
  }

  if (typeof runId !== "string" || runId.length === 0) {
    throw new TypeError("encodeRunsCursor: runId must be a non-empty string");
  }

  return Buffer.from(JSON.stringify({ c: ms, r: runId })).toString("base64url");
}
33+
34+
/**
 * Decodes a pagination cursor into either a composite or a legacy cursor.
 *
 * The cursor is base64url-decoded, parsed as JSON and validated against
 * `CompositeCursor`. On success the result is
 * `{ kind: "composite", createdAt, runId }`, taken from the token's `c` and `r`
 * fields. In every other case (the bytes are not JSON, or the JSON does not
 * match the schema) the raw cursor string is returned unchanged as
 * `{ kind: "legacy", runId: cursor }`.
 *
 * This function does not throw: the parsing steps run inside the `try`, and
 * any failure is handled by falling through to the legacy result.
 *
 * @param cursor - The cursor string as received from the client.
 * @returns The decoded composite cursor, or a legacy cursor wrapping `cursor`.
 */
export function decodeRunsCursor(cursor: string): DecodedRunsCursor {
35+
try {
36+
const parsed = CompositeCursor.safeParse(
37+
JSON.parse(Buffer.from(cursor, "base64url").toString("utf8"))
38+
);
39+
if (parsed.success) {
40+
return { kind: "composite", createdAt: parsed.data.c, runId: parsed.data.r };
41+
}
// Valid JSON that does not match the schema also falls through to the
// legacy result below.
42+
} catch {
43+
// JSON.parse threw — the decoded bytes are not JSON, so this is not a
// composite cursor.
44+
}
45+
46+
// Anything that is not a valid composite token is treated as a bare run_id.
return { kind: "legacy", runId: cursor };
47+
}

apps/webapp/app/services/runsRepository/runsRepository.server.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,15 @@ export type TagList = {
130130
export interface IRunsRepository {
131131
name: string;
132132
listRunIds(options: ListRunsOptions): Promise<string[]>;
133+
/**
134+
* Forward-only batch iteration (bulk actions). Returns up to `page.size` run
135+
* ids and the composite cursor for the next batch (null when the batch is
136+
* empty). Keeping cursor construction here ensures the `created_at` component
137+
* comes from the same source as the ordering.
138+
*/
139+
listRunIdsWithCursor(
140+
options: ListRunsOptions
141+
): Promise<{ runIds: string[]; nextCursor: string | null }>;
133142
/** Returns friendly IDs (e.g., run_xxx) instead of internal UUIDs. Used for ClickHouse task_events queries. */
134143
listFriendlyRunIds(options: ListRunsOptions): Promise<string[]>;
135144
listRuns(options: ListRunsOptions): Promise<{
@@ -169,6 +178,23 @@ export class RunsRepository implements IRunsRepository {
169178
);
170179
}
171180

181+
async listRunIdsWithCursor(
182+
options: ListRunsOptions
183+
): Promise<{ runIds: string[]; nextCursor: string | null }> {
184+
return startActiveSpan(
185+
"runsRepository.listRunIdsWithCursor",
186+
async () => this.clickHouseRunsRepository.listRunIdsWithCursor(options),
187+
{
188+
attributes: {
189+
"repository.name": "clickhouse",
190+
organizationId: options.organizationId,
191+
projectId: options.projectId,
192+
environmentId: options.environmentId,
193+
},
194+
}
195+
);
196+
}
197+
172198
async listFriendlyRunIds(options: ListRunsOptions): Promise<string[]> {
173199
return startActiveSpan(
174200
"runsRepository.listFriendlyRunIds",

apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,10 @@ export class BulkActionService extends BaseService {
159159
throw new Error(`Bulk action group has invalid query name: ${group.queryName}`);
160160
}
161161

162-
// 2. Get the runs to process in this batch
163-
const runIds = await runsRepository.listRunIds({
162+
// 2. Get the runs to process in this batch, plus the cursor for the next
163+
// batch. The cursor is a composite (created_at, run_id) keyset cursor so the
164+
// next batch can't re-include or skip runs.
165+
const { runIds: runIdsToProcess, nextCursor } = await runsRepository.listRunIdsWithCursor({
164166
...filters,
165167
page: {
166168
size: env.BULK_ACTION_BATCH_SIZE,
@@ -172,8 +174,6 @@ export class BulkActionService extends BaseService {
172174
// 3. Process the runs
173175
let successCount = 0;
174176
let failureCount = 0;
175-
// Slice because we fetch an extra for the cursor
176-
const runIdsToProcess = runIds.slice(0, env.BULK_ACTION_BATCH_SIZE);
177177

178178
switch (group.type) {
179179
case BulkActionType.CANCEL: {
@@ -292,7 +292,7 @@ export class BulkActionService extends BaseService {
292292
const updatedGroup = await this._prisma.bulkActionGroup.update({
293293
where: { id: bulkActionId },
294294
data: {
295-
cursor: runIdsToProcess.at(runIdsToProcess.length - 1),
295+
cursor: nextCursor,
296296
successCount: {
297297
increment: successCount,
298298
},

0 commit comments

Comments
 (0)