Skip to content

Commit 021e088

Browse files
committed
Use resolveEventRepositoryForStore everywhere
1 parent 78cc4f5 commit 021e088

11 files changed

+89
-22
lines changed

apps/webapp/app/routes/api.v1.runs.$runId.events.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ export const loader = createLoaderApiRoute(
3131
},
3232
},
3333
async ({ resource: run, authentication }) => {
34-
const eventRepository = resolveEventRepositoryForStore(run.taskEventStore);
34+
const eventRepository = resolveEventRepositoryForStore(
35+
run.taskEventStore,
36+
authentication.environment.organization.id
37+
);
3538

3639
const runEvents = await eventRepository.getRunEvents(
3740
getTaskEventStoreTableForRun(run),

apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,10 @@ export const loader = createLoaderApiRoute(
3838
},
3939
},
4040
async ({ params, resource: run, authentication }) => {
41-
const eventRepository = resolveEventRepositoryForStore(run.taskEventStore);
41+
const eventRepository = resolveEventRepositoryForStore(
42+
run.taskEventStore,
43+
authentication.environment.organization.id
44+
);
4245
const eventStore = getTaskEventStoreTableForRun(run);
4346

4447
const span = await eventRepository.getSpan(

apps/webapp/app/routes/api.v1.runs.$runId.trace.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ export const loader = createLoaderApiRoute(
3636
},
3737
},
3838
async ({ resource: run, authentication }) => {
39-
const eventRepository = resolveEventRepositoryForStore(run.taskEventStore);
39+
const eventRepository = resolveEventRepositoryForStore(
40+
run.taskEventStore,
41+
authentication.environment.organization.id
42+
);
4043

4144
const traceSummary = await eventRepository.getTraceDetailedSummary(
4245
getTaskEventStoreTableForRun(run),

apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ export async function loader({ params, request }: LoaderFunctionArgs) {
3333
return new Response("Not found", { status: 404 });
3434
}
3535

36-
const eventRepository = resolveEventRepositoryForStore(run.taskEventStore);
36+
const eventRepository = resolveEventRepositoryForStore(
37+
run.taskEventStore,
38+
run.organizationId ?? ""
39+
);
3740

3841
const runEvents = await eventRepository.getRunEvents(
3942
getTaskEventStoreTableForRun(run),

apps/webapp/app/v3/eventRepository/index.server.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,21 @@ export const EVENT_STORE_TYPES = {
1616

1717
export type EventStoreType = (typeof EVENT_STORE_TYPES)[keyof typeof EVENT_STORE_TYPES];
1818

19+
/**
20+
* Resolve the event repository for a run's persisted `taskEventStore` value and org.
21+
* Postgres-backed runs use the Prisma `eventRepository`; ClickHouse-backed runs use
22+
* `clickhouseFactory.getEventRepositoryForOrganizationSync`.
23+
*/
24+
export function resolveEventRepositoryForStore(
25+
store: string,
26+
organizationId: string
27+
): IEventRepository {
28+
if (store === EVENT_STORE_TYPES.CLICKHOUSE || store === EVENT_STORE_TYPES.CLICKHOUSE_V2) {
29+
return clickhouseFactory.getEventRepositoryForOrganizationSync(store, organizationId).repository;
30+
}
31+
return eventRepository;
32+
}
33+
1934
export async function getConfiguredEventRepository(
2035
organizationId: string
2136
): Promise<{ repository: IEventRepository; store: EventStoreType }> {

apps/webapp/app/v3/runEngineHandlers.server.ts

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAle
2727
import { TaskRunErrorCodes } from "@trigger.dev/core/v3";
2828

2929
export function registerRunEngineEventBusHandlers() {
30-
engine.eventBus.on("runSucceeded", async ({ time, run }) => {
30+
engine.eventBus.on("runSucceeded", async ({ time, run, organization }) => {
3131
const [taskRunError, taskRun] = await tryCatch(
3232
$replica.taskRun.findFirstOrThrow({
3333
where: {
@@ -60,7 +60,10 @@ export function registerRunEngineEventBusHandlers() {
6060
return;
6161
}
6262

63-
const eventRepository = resolveEventRepositoryForStore(run.taskEventStore);
63+
const eventRepository = resolveEventRepositoryForStore(
64+
run.taskEventStore,
65+
taskRun.organizationId ?? organization.id
66+
);
6467

6568
const [completeSuccessfulRunEventError] = await tryCatch(
6669
eventRepository.completeSuccessfulRunEvent({
@@ -91,7 +94,7 @@ export function registerRunEngineEventBusHandlers() {
9194
});
9295

9396
// Handle events
94-
engine.eventBus.on("runFailed", async ({ time, run }) => {
97+
engine.eventBus.on("runFailed", async ({ time, run, organization }) => {
9598
const sanitizedError = sanitizeError(run.error);
9699
const exception = createExceptionPropertiesFromError(sanitizedError);
97100

@@ -127,7 +130,10 @@ export function registerRunEngineEventBusHandlers() {
127130
return;
128131
}
129132

130-
const eventRepository = resolveEventRepositoryForStore(taskRun.taskEventStore);
133+
const eventRepository = resolveEventRepositoryForStore(
134+
run.taskEventStore,
135+
taskRun.organizationId ?? organization.id
136+
);
131137

132138
const [completeFailedRunEventError] = await tryCatch(
133139
eventRepository.completeFailedRunEvent({
@@ -181,7 +187,10 @@ export function registerRunEngineEventBusHandlers() {
181187
return;
182188
}
183189

184-
const eventRepository = resolveEventRepositoryForStore(taskRun.taskEventStore);
190+
const eventRepository = resolveEventRepositoryForStore(
191+
run.taskEventStore,
192+
taskRun.organizationId ?? ""
193+
);
185194

186195
const [createAttemptFailedRunEventError] = await tryCatch(
187196
eventRepository.createAttemptFailedRunEvent({
@@ -282,7 +291,10 @@ export function registerRunEngineEventBusHandlers() {
282291
return;
283292
}
284293

285-
const eventRepository = resolveEventRepositoryForStore(blockedRun.taskEventStore);
294+
const eventRepository = resolveEventRepositoryForStore(
295+
blockedRun.taskEventStore,
296+
blockedRun.organizationId ?? ""
297+
);
286298

287299
const [completeCachedRunEventError] = await tryCatch(
288300
eventRepository.completeCachedRunEvent({
@@ -305,7 +317,7 @@ export function registerRunEngineEventBusHandlers() {
305317
}
306318
);
307319

308-
engine.eventBus.on("runExpired", async ({ time, run }) => {
320+
engine.eventBus.on("runExpired", async ({ time, run, organization }) => {
309321
if (!run.ttl) {
310322
return;
311323
}
@@ -342,7 +354,10 @@ export function registerRunEngineEventBusHandlers() {
342354
return;
343355
}
344356

345-
const eventRepository = resolveEventRepositoryForStore(taskRun.taskEventStore);
357+
const eventRepository = resolveEventRepositoryForStore(
358+
taskRun.taskEventStore,
359+
taskRun.organizationId ?? organization.id
360+
);
346361

347362
const [completeExpiredRunEventError] = await tryCatch(
348363
eventRepository.completeExpiredRunEvent({
@@ -360,7 +375,7 @@ export function registerRunEngineEventBusHandlers() {
360375
}
361376
});
362377

363-
engine.eventBus.on("runCancelled", async ({ time, run }) => {
378+
engine.eventBus.on("runCancelled", async ({ time, run, organization }) => {
364379
const [taskRunError, taskRun] = await tryCatch(
365380
$replica.taskRun.findFirstOrThrow({
366381
where: {
@@ -393,7 +408,10 @@ export function registerRunEngineEventBusHandlers() {
393408
return;
394409
}
395410

396-
const eventRepository = resolveEventRepositoryForStore(taskRun.taskEventStore);
411+
const eventRepository = resolveEventRepositoryForStore(
412+
taskRun.taskEventStore,
413+
taskRun.organizationId ?? organization.id
414+
);
397415

398416
const error = createJsonErrorObject(run.error);
399417

@@ -413,7 +431,7 @@ export function registerRunEngineEventBusHandlers() {
413431
}
414432
});
415433

416-
engine.eventBus.on("runRetryScheduled", async ({ time, run, environment, retryAt }) => {
434+
engine.eventBus.on("runRetryScheduled", async ({ time, run, environment, retryAt, organization }) => {
417435
try {
418436
if (retryAt && time && time >= retryAt) {
419437
return;
@@ -426,7 +444,10 @@ export function registerRunEngineEventBusHandlers() {
426444
retryMessage += ` after OOM`;
427445
}
428446

429-
const eventRepository = resolveEventRepositoryForStore(run.taskEventStore);
447+
const eventRepository = resolveEventRepositoryForStore(
448+
run.taskEventStore ?? "taskEvent",
449+
organization.id
450+
);
430451

431452
await eventRepository.recordEvent(retryMessage, {
432453
startTime: BigInt(time.getTime() * 1000000),

apps/webapp/app/v3/services/cancelTaskRunV1.server.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,10 @@ export class CancelTaskRunServiceV1 extends BaseService {
101101
},
102102
});
103103

104-
const eventRepository = resolveEventRepositoryForStore(cancelledTaskRun.taskEventStore);
104+
const eventRepository = resolveEventRepositoryForStore(
105+
cancelledTaskRun.taskEventStore,
106+
cancelledTaskRun.runtimeEnvironment.organizationId
107+
);
105108

106109
const [cancelRunEventError] = await tryCatch(
107110
eventRepository.cancelRunEvent({

apps/webapp/app/v3/services/completeAttempt.server.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,10 @@ export class CompleteAttemptService extends BaseService {
163163
env,
164164
});
165165

166-
const eventRepository = resolveEventRepositoryForStore(taskRunAttempt.taskRun.taskEventStore);
166+
const eventRepository = resolveEventRepositoryForStore(
167+
taskRunAttempt.taskRun.taskEventStore,
168+
taskRunAttempt.taskRun.organizationId ?? ""
169+
);
167170

168171
const [completeSuccessfulRunEventError] = await tryCatch(
169172
eventRepository.completeSuccessfulRunEvent({
@@ -316,7 +319,10 @@ export class CompleteAttemptService extends BaseService {
316319
exitRun(taskRunAttempt.taskRunId);
317320
}
318321

319-
const eventRepository = resolveEventRepositoryForStore(taskRunAttempt.taskRun.taskEventStore);
322+
const eventRepository = resolveEventRepositoryForStore(
323+
taskRunAttempt.taskRun.taskEventStore,
324+
taskRunAttempt.taskRun.organizationId ?? ""
325+
);
320326

321327
const [completeFailedRunEventError] = await tryCatch(
322328
eventRepository.completeFailedRunEvent({
@@ -538,7 +544,10 @@ export class CompleteAttemptService extends BaseService {
538544
}) {
539545
const retryAt = new Date(executionRetry.timestamp);
540546

541-
const eventRepository = resolveEventRepositoryForStore(taskRunAttempt.taskRun.taskEventStore);
547+
const eventRepository = resolveEventRepositoryForStore(
548+
taskRunAttempt.taskRun.taskEventStore,
549+
taskRunAttempt.taskRun.organizationId ?? ""
550+
);
542551

543552
// Retry the task run
544553
await eventRepository.recordEvent(

apps/webapp/app/v3/services/crashTaskRun.server.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,10 @@ export class CrashTaskRunService extends BaseService {
120120
},
121121
});
122122

123-
const eventRepository = resolveEventRepositoryForStore(crashedTaskRun.taskEventStore);
123+
const eventRepository = resolveEventRepositoryForStore(
124+
crashedTaskRun.taskEventStore,
125+
crashedTaskRun.runtimeEnvironment.organizationId
126+
);
124127

125128
const [createAttemptFailedEventError] = await tryCatch(
126129
eventRepository.completeFailedRunEvent({

apps/webapp/app/v3/services/expireEnqueuedRun.server.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,10 @@ export class ExpireEnqueuedRunService extends BaseService {
7878
},
7979
});
8080

81-
const eventRepository = resolveEventRepositoryForStore(run.taskEventStore);
81+
const eventRepository = resolveEventRepositoryForStore(
82+
run.taskEventStore,
83+
run.runtimeEnvironment.organization.id
84+
);
8285

8386
if (run.ttl) {
8487
const [completeExpiredRunEventError] = await tryCatch(

0 commit comments

Comments
 (0)