Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4e57592
Implement organization-scoped ClickHouse instances
matt-aitken Mar 26, 2026
56b4e14
Better replication performance
matt-aitken Mar 26, 2026
35b446f
Removed dynamic imports
matt-aitken Mar 26, 2026
d369fc1
otlpExporter.server reverted to main
matt-aitken Mar 27, 2026
2e585b6
Switch to a DataStore registry
matt-aitken Mar 31, 2026
aa8f102
Admin page for adding data stores
matt-aitken Apr 1, 2026
62ec253
New admin page, lots of improvements to make it more robust and testable
matt-aitken Apr 2, 2026
5700df8
Use the clickhouseFactory directly. WIP on new event repository
matt-aitken Apr 2, 2026
24f5bc6
Errors switched to using org-specific clickhouses
matt-aitken Apr 2, 2026
456d3e0
RunPresenter: use org event repository
matt-aitken Apr 2, 2026
cdde4dc
SpanPresenter: use org event repository
matt-aitken Apr 2, 2026
82351a5
Move metrics sending to the event repository
matt-aitken Apr 4, 2026
e2bc8de
We don't need separate stores for metrics
matt-aitken Apr 4, 2026
757a07f
Back to main runReplicationService
matt-aitken Apr 4, 2026
8798599
Run Replication using the factory
matt-aitken Apr 6, 2026
9dfd82a
Get the tracer from the provider
matt-aitken Apr 7, 2026
ce0cd1d
Use resolveEventRepositoryForStore everywhere
matt-aitken Apr 9, 2026
471753f
Admin data stores editing
matt-aitken Apr 14, 2026
a6628e8
Reload the data store every minute
matt-aitken Apr 14, 2026
7820d0e
Work with Postgres EventRepository for self-hosters
matt-aitken Apr 16, 2026
3073289
Error fingerprint should use the logs client
matt-aitken Apr 16, 2026
0f94684
Retry the initial boot using p-retry
matt-aitken Apr 16, 2026
edfaba6
Fix Devin Review bugs: postgres event repository fallback and error p…
devin-ai-integration[bot] Apr 16, 2026
7da8bc6
Fall back to Postgres eventRepository for non-ClickHouse stores
devin-ai-integration[bot] Apr 17, 2026
40f95a7
Move Postgres event repository fallback out of ClickHouse factory
devin-ai-integration[bot] Apr 17, 2026
20fad45
Make OrganizationDataStoresRegistry deterministic on overlap
devin-ai-integration[bot] Apr 17, 2026
5aff3ed
try/catch getting the clickhouse client
matt-aitken Apr 17, 2026
33d0bd0
Better handling for no org id on task run
matt-aitken Apr 17, 2026
34d5611
Test for deterministic data store resolution if there are multiple en…
matt-aitken Apr 17, 2026
6f75772
Separate the clickhouseFactoryInstance so the db client isn't pulled …
matt-aitken Apr 17, 2026
a4cbdf1
Use the same ClickHouse settings for org specific clients
matt-aitken Apr 17, 2026
fbddd28
Update apps/webapp/app/services/dataStores/organizationDataStoresRegi…
matt-aitken Apr 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .cursor/mcp.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
{
"mcpServers": {}
"mcpServers": {
"linear": {
"url": "https://mcp.linear.app/mcp"
}
}
}
6 changes: 6 additions & 0 deletions .server-changes/organization-scoped-clickhouse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Organization-scoped ClickHouse routing enables customers with HIPAA and other data security requirements to use dedicated database instances
11 changes: 11 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ containerTest("should use both", async ({ prisma, redisOptions }) => {
});
```

## Code Style

### Imports

**Prefer static imports over dynamic imports.** Only use dynamic `import()` when:
- Circular dependencies cannot be resolved otherwise
- Code splitting is genuinely needed for performance
- The module must be loaded conditionally at runtime

Dynamic imports add unnecessary overhead in hot paths and make code harder to analyze. If you find yourself using `await import()`, ask if a regular `import` statement would work instead.

## Changesets and Server Changes

When modifying any public package (`packages/*` or `integrations/*`), add a changeset:
Expand Down
16 changes: 14 additions & 2 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,10 @@ const EnvironmentSchema = z
// If specified, you must configure the corresponding provider using OBJECT_STORE_{PROTOCOL}_* env vars.
// Example: OBJECT_STORE_DEFAULT_PROTOCOL=s3 requires OBJECT_STORE_S3_BASE_URL, OBJECT_STORE_S3_ACCESS_KEY_ID, etc.
// Enables zero-downtime migration between providers (old data keeps working, new data uses new provider).
OBJECT_STORE_DEFAULT_PROTOCOL: z.string().regex(/^[a-z0-9]+$/).optional(),
OBJECT_STORE_DEFAULT_PROTOCOL: z
.string()
.regex(/^[a-z0-9]+$/)
.optional(),

ARTIFACTS_OBJECT_STORE_BUCKET: z.string().optional(),
ARTIFACTS_OBJECT_STORE_BASE_URL: z.string().optional(),
Expand Down Expand Up @@ -1306,9 +1309,18 @@ const EnvironmentSchema = z
EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(5_000),
EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING: z.coerce.number().int().default(2000),

// Organization data stores registry
ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS: z.coerce
.number()
.int()
.default(60 * 1000), // 1 minute

// LLM cost tracking
LLM_COST_TRACKING_ENABLED: BoolEnv.default(true),
LLM_PRICING_RELOAD_INTERVAL_MS: z.coerce.number().int().default(5 * 60 * 1000), // 5 minutes
LLM_PRICING_RELOAD_INTERVAL_MS: z.coerce
.number()
.int()
.default(5 * 60 * 1000), // 5 minutes
LLM_PRICING_SEED_ON_STARTUP: BoolEnv.default(false),
LLM_PRICING_READY_TIMEOUT_MS: z.coerce.number().int().default(500),
LLM_METRICS_BATCH_SIZE: z.coerce.number().int().default(5000),
Expand Down
5 changes: 3 additions & 2 deletions apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { type Project, type RuntimeEnvironment, type TaskRunStatus } from "@trig
import assertNever from "assert-never";
import { z } from "zod";
import { API_VERSIONS, RunStatusUnspecifiedApiVersion } from "~/api/versions";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
import { logger } from "~/services/logger.server";
import { CoercedDate } from "~/utils/zod";
import { ServiceValidationError } from "~/v3/services/baseService.server";
Expand Down Expand Up @@ -259,7 +259,8 @@ export class ApiRunListPresenter extends BasePresenter {
options.machines = searchParams["filter[machine]"];
}

const presenter = new NextRunListPresenter(this._replica, clickhouseClient);
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "standard");
const presenter = new NextRunListPresenter(this._replica, clickhouse);

logger.debug("Calling RunListPresenter", { options });

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type PrismaClient } from "@trigger.dev/database";
import { CreateBulkActionSearchParams } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { getRunFiltersFromRequest } from "../RunFilters.server";
import { BasePresenter } from "./basePresenter.server";
Expand All @@ -24,8 +24,9 @@ export class CreateBulkActionPresenter extends BasePresenter {
Object.fromEntries(new URL(request.url).searchParams)
);

const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "standard");
const runsRepository = new RunsRepository({
clickhouse: clickhouseClient,
clickhouse,
prisma: this._replica as PrismaClient,
});

Expand Down
11 changes: 7 additions & 4 deletions apps/webapp/app/presenters/v3/RunPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/Tr
import { prisma, type PrismaClient } from "~/db.server";
import { createTimelineSpanEventsFromSpanEvents } from "~/utils/timelineSpanEvents";
import { getUsername } from "~/utils/username";
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
import { SpanSummary } from "~/v3/eventRepository/eventRepository.types";
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
import { isFinalRunStatus } from "~/v3/taskStatus";
import { env } from "~/env.server";
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";

type Result = Awaited<ReturnType<RunPresenter["call"]>>;
export type Run = Result["run"];
Expand Down Expand Up @@ -145,10 +145,13 @@ export class RunPresenter {
};
}

const eventRepository = resolveEventRepositoryForStore(run.taskEventStore);
const repository = await getEventRepositoryForStore(
run.taskEventStore,
run.runtimeEnvironment.organizationId
);

// get the events
let traceSummary = await eventRepository.getTraceSummary(
let traceSummary = await repository.getTraceSummary(
getTaskEventStoreTableForRun(run),
run.runtimeEnvironment.id,
run.traceId,
Expand Down Expand Up @@ -272,7 +275,7 @@ export class RunPresenter {
overridesBySpanId: traceSummary.overridesBySpanId,
linkedRunIdBySpanId,
},
maximumLiveReloadingSetting: eventRepository.maximumLiveReloadingSetting,
maximumLiveReloadingSetting: repository.maximumLiveReloadingSetting,
};
}
}
5 changes: 3 additions & 2 deletions apps/webapp/app/presenters/v3/RunTagListPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { BasePresenter } from "./basePresenter.server";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
import { type PrismaClient } from "@trigger.dev/database";
import { timeFilters } from "~/components/runs/v3/SharedFilters";

Expand Down Expand Up @@ -37,8 +37,9 @@ export class RunTagListPresenter extends BasePresenter {
}: TagListOptions) {
const hasFilters = Boolean(name?.trim());

const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "standard");
const runsRepository = new RunsRepository({
clickhouse: clickhouseClient,
clickhouse,
prisma: this._replica as PrismaClient,
});

Expand Down
25 changes: 10 additions & 15 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
import { BasePresenter } from "./basePresenter.server";
import { WaitpointPresenter } from "./WaitpointPresenter.server";
import { engine } from "~/v3/runEngine.server";
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
import { IEventRepository, SpanDetail } from "~/v3/eventRepository/eventRepository.types";
import { safeJsonParse } from "~/utils/json";
import {
Expand All @@ -30,6 +29,7 @@ import {
extractAIToolCallData,
extractAIEmbedData,
} from "~/components/runs/v3/ai";
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";

export type PromptSpanData = {
slug: string;
Expand All @@ -42,9 +42,7 @@ export type PromptSpanData = {
config?: string;
};

function extractPromptSpanData(
properties: Record<string, unknown>
): PromptSpanData | undefined {
function extractPromptSpanData(properties: Record<string, unknown>): PromptSpanData | undefined {
// Properties come as an unflattened nested object from ClickHouse,
// e.g. { prompt: { slug: "...", version: 3, ... } }
const prompt = properties.prompt;
Expand Down Expand Up @@ -132,14 +130,17 @@ export class SpanPresenter extends BasePresenter {

const { traceId } = parentRun;

const eventRepository = resolveEventRepositoryForStore(parentRun.taskEventStore);
const repository = await getEventRepositoryForStore(
parentRun.taskEventStore,
project.organizationId
);

const eventStore = getTaskEventStoreTableForRun(parentRun);

const run = await this.getRun({
eventStore,
traceId,
eventRepository,
eventRepository: repository,
spanId,
linkedRunId,
createdAt: parentRun.createdAt,
Expand All @@ -161,7 +162,7 @@ export class SpanPresenter extends BasePresenter {
projectId: parentRun.projectId,
createdAt: parentRun.createdAt,
completedAt: parentRun.completedAt,
eventRepository,
eventRepository: repository,
});

if (!span) {
Expand Down Expand Up @@ -592,10 +593,7 @@ export class SpanPresenter extends BasePresenter {
triggeredRuns,
aiData:
span.properties && typeof span.properties === "object"
? extractAISpanData(
span.properties as Record<string, unknown>,
span.duration / 1_000_000
)
? extractAISpanData(span.properties as Record<string, unknown>, span.duration / 1_000_000)
: undefined,
};

Expand Down Expand Up @@ -739,10 +737,7 @@ export class SpanPresenter extends BasePresenter {
"ai.streamObject",
];

if (
typeof span.message === "string" &&
AI_SUMMARY_MESSAGES.includes(span.message)
) {
if (typeof span.message === "string" && AI_SUMMARY_MESSAGES.includes(span.message)) {
const aiSummaryData = extractAISummarySpanData(
span.properties as Record<string, unknown>,
span.duration / 1_000_000
Expand Down
25 changes: 12 additions & 13 deletions apps/webapp/app/presenters/v3/TaskListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
type TaskTriggerSource,
} from "@trigger.dev/database";
import { $replica } from "~/db.server";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
import {
type AverageDurations,
ClickHouseEnvironmentMetricsRepository,
Expand All @@ -25,10 +25,7 @@ export type TaskListItem = {
export type TaskActivity = DailyTaskActivity[string];

export class TaskListPresenter {
constructor(
private readonly environmentMetricsRepository: EnvironmentMetricsRepository,
private readonly _replica: PrismaClientOrTransaction
) {}
constructor(private readonly _replica: PrismaClientOrTransaction) {}

public async call({
organizationId,
Expand Down Expand Up @@ -76,25 +73,31 @@ export class TaskListPresenter {

const slugs = tasks.map((t) => t.slug);

// Create org-specific environment metrics repository
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "standard");
const environmentMetricsRepository = new ClickHouseEnvironmentMetricsRepository({
clickhouse,
});

// IMPORTANT: Don't await these, we want to return the promises
// so we can defer the loading of the data
const activity = this.environmentMetricsRepository.getDailyTaskActivity({
const activity = environmentMetricsRepository.getDailyTaskActivity({
organizationId,
projectId,
environmentId,
days: 6, // This actually means 7 days, because we want to show the current day too
tasks: slugs,
});

const runningStats = this.environmentMetricsRepository.getCurrentRunningStats({
const runningStats = environmentMetricsRepository.getCurrentRunningStats({
organizationId,
projectId,
environmentId,
days: 6,
tasks: slugs,
});

const durations = this.environmentMetricsRepository.getAverageDurations({
const durations = environmentMetricsRepository.getAverageDurations({
organizationId,
projectId,
environmentId,
Expand All @@ -109,9 +112,5 @@ export class TaskListPresenter {
export const taskListPresenter = singleton("taskListPresenter", setupTaskListPresenter);

function setupTaskListPresenter() {
const environmentMetricsRepository = new ClickHouseEnvironmentMetricsRepository({
clickhouse: clickhouseClient,
});

return new TaskListPresenter(environmentMetricsRepository, $replica);
return new TaskListPresenter($replica);
}
5 changes: 3 additions & 2 deletions apps/webapp/app/presenters/v3/UsagePresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { getUsage, getUsageSeries } from "~/services/platform.v3.server";
import { createTimeSeriesData } from "~/utils/graphs";
import { BasePresenter } from "./basePresenter.server";
import { DataPoint, linear } from "regression";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";

type Options = {
organizationId: string;
Expand Down Expand Up @@ -124,7 +124,8 @@ async function getTaskUsageByOrganization(
endOfMonth: Date,
replica: PrismaClientOrTransaction
) {
const [queryError, tasks] = await clickhouseClient.taskRuns.getTaskUsageByOrganization({
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "standard");
const [queryError, tasks] = await clickhouse.taskRuns.getTaskUsageByOrganization({
startTime: startOfMonth.getTime(),
endTime: endOfMonth.getTime(),
organizationId,
Expand Down
5 changes: 3 additions & 2 deletions apps/webapp/app/presenters/v3/ViewSchedulePresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ScheduleObject } from "@trigger.dev/core/v3";
import { PrismaClient, prisma } from "~/db.server";
import { displayableEnvironment } from "~/models/runtimeEnvironment.server";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
import { nextScheduledTimestamps } from "~/v3/utils/calculateNextSchedule.server";
import { NextRunListPresenter } from "./NextRunListPresenter.server";
import { scheduleWhereClause } from "~/models/schedules.server";
Expand Down Expand Up @@ -75,7 +75,8 @@ export class ViewSchedulePresenter {
? nextScheduledTimestamps(schedule.generatorExpression, schedule.timezone, new Date(), 5)
: [];

const runPresenter = new NextRunListPresenter(this.#prismaClient, clickhouseClient);
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(schedule.project.organizationId, "standard");
const runPresenter = new NextRunListPresenter(this.#prismaClient, clickhouse);
const { runs } = await runPresenter.call(schedule.project.organizationId, environmentId, {
projectId: schedule.project.id,
scheduleId: schedule.id,
Expand Down
5 changes: 3 additions & 2 deletions apps/webapp/app/presenters/v3/WaitpointPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { isWaitpointOutputTimeout, prettyPrintPacket } from "@trigger.dev/core/v3";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
import { generateHttpCallbackUrl } from "~/services/httpCallback.server";
import { logger } from "~/services/logger.server";
import { BasePresenter } from "./basePresenter.server";
Expand Down Expand Up @@ -79,7 +79,8 @@ export class WaitpointPresenter extends BasePresenter {
const connectedRuns: NextRunListItem[] = [];

if (connectedRunIds.length > 0) {
const runPresenter = new NextRunListPresenter(this._prisma, clickhouseClient);
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(waitpoint.environment.organizationId, "standard");
const runPresenter = new NextRunListPresenter(this._prisma, clickhouse);
const { runs } = await runPresenter.call(
waitpoint.environment.organizationId,
environmentId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import {
MetricDashboardPresenter,
} from "~/presenters/v3/MetricDashboardPresenter.server";
import { PromptPresenter } from "~/presenters/v3/PromptPresenter.server";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
import { requireUser } from "~/services/session.server";
import { cn } from "~/utils/cn";
import { EnvironmentParamSchema } from "~/utils/pathBuilder";
Expand Down Expand Up @@ -74,10 +74,12 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {

const filters = dashboard.filters ?? ["tasks", "queues"];

const clickhouse = await clickhouseFactory.getClickhouseForOrganization(project.organizationId, "standard");

// Load distinct models from ClickHouse if the dashboard has a models filter
let possibleModels: { model: string; system: string }[] = [];
if (filters.includes("models")) {
const queryFn = clickhouseClient.reader.query({
const queryFn = clickhouse.reader.query({
name: "getDistinctModels",
query: `SELECT response_model, any(gen_ai_system) AS gen_ai_system FROM trigger_dev.llm_metrics_v1 WHERE organization_id = {organizationId: String} AND project_id = {projectId: String} AND environment_id = {environmentId: String} AND response_model != '' GROUP BY response_model ORDER BY response_model`,
params: z.object({
Expand All @@ -97,7 +99,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
}
}

const promptPresenter = new PromptPresenter(clickhouseClient);
const promptPresenter = new PromptPresenter(clickhouse);
const [possiblePrompts, possibleOperations, possibleProviders] = await Promise.all([
filters.includes("prompts")
? promptPresenter.getDistinctPromptSlugs(project.organizationId, project.id, environment.id)
Expand Down
Loading
Loading