From 7a178d58c575079dbc51b7989a63bf4e150f3552 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 6 Jun 2026 10:02:52 +0100 Subject: [PATCH 1/4] feat(webapp,clickhouse): export run traces as log, markdown, or jsonl Adds a trace export to the run page: copy a run's trace to the clipboard as Markdown for pasting into an AI assistant, or download it as a flat log, a Markdown table, or JSON Lines. The export streams events straight from the store one at a time and never builds the span tree in memory, so arbitrarily large traces export with bounded memory. --- .server-changes/trace-export-formats.md | 6 + .../route.tsx | 125 ++++++----- .../resources.runs.$runParam.logs.download.ts | 193 ++++++----------- .../clickhouseEventRepository.server.ts | 80 +++++++ .../eventRepository/eventRepository.server.ts | 33 +++ .../eventRepository/eventRepository.types.ts | 30 +++ .../v3/eventRepository/traceExport.server.ts | 205 ++++++++++++++++++ apps/webapp/app/v3/taskEventStore.server.ts | 79 +++++++ apps/webapp/test/traceExport.test.ts | 163 ++++++++++++++ .../clickhouse/src/client/client.ts | 55 +++++ .../clickhouse/src/client/noop.ts | 11 + .../clickhouse/src/client/queryBuilder.ts | 18 ++ .../clickhouse/src/client/types.ts | 26 +++ internal-packages/clickhouse/src/index.ts | 4 + .../clickhouse/src/taskEvents.ts | 50 +++++ 15 files changed, 897 insertions(+), 181 deletions(-) create mode 100644 .server-changes/trace-export-formats.md create mode 100644 apps/webapp/app/v3/eventRepository/traceExport.server.ts create mode 100644 apps/webapp/test/traceExport.test.ts diff --git a/.server-changes/trace-export-formats.md b/.server-changes/trace-export-formats.md new file mode 100644 index 0000000000..ff15483003 --- /dev/null +++ b/.server-changes/trace-export-formats.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Export a run's full trace from the run page as a downloadable Log, Markdown, or JSON Lines file, or copy it to the clipboard for pasting into an AI assistant. The export streams straight from the store, so even very large runs export reliably. diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx index 7e825fe303..a3aac32c89 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx @@ -1,9 +1,9 @@ import { ArrowPathIcon, - ArrowRightIcon, BookOpenIcon, CheckIcon, ChevronUpIcon, + ClipboardDocumentIcon, ClockIcon, CloudArrowDownIcon, EnvelopeIcon, @@ -42,6 +42,8 @@ import { PopoverMenuItem, PopoverTrigger, } from "~/components/primitives/Popover"; +import { ToastUI } from "~/components/primitives/Toast"; +import { toast } from "sonner"; import * as Property from "~/components/primitives/PropertyTable"; import { Spinner } from "~/components/primitives/Spinner"; import { @@ -79,7 +81,6 @@ import { useOrganization } from "~/hooks/useOrganizations"; import { useProject } from "~/hooks/useProject"; import { useSearchParams } from "~/hooks/useSearchParam"; import { useHasAdminAccess } from "~/hooks/useUser"; -import { useCanViewLogsPage } from "~/hooks/useCanViewLogsPage"; import { redirectWithErrorMessage } from "~/models/message.server"; import { type Span, SpanPresenter, type SpanRun } from "~/presenters/v3/SpanPresenter.server"; import { logger } from "~/services/logger.server"; @@ -91,7 +92,6 @@ import { v3BatchPath, v3SessionPath, v3DeploymentVersionPath, - v3LogsPath, v3RunDownloadLogsPath, v3RunIdempotencyKeyResetPath, v3RunPath, @@ -386,7 +386,6 @@ function RunBody({ const { value, replace } = useSearchParams(); const tab = value("tab"); const resetFetcher = useTypedFetcher(); - const canViewLogsPage = useCanViewLogsPage(); return (
@@ -1105,57 +1104,21 @@ function RunBody({
{run.logsDeletedAt === null ? ( - canViewLogsPage ? ( -
- + + - - - - - - -
- ) : ( - - Download logs - - ) + Export trace + + + + + + ) : null}
@@ -1163,6 +1126,62 @@ function RunBody({ ); } +// Trace export menu items: copy the trace as Markdown (for pasting into an AI +// assistant) plus a download per format. The export route streams `?format=` +// server-side. +function TraceExportMenuItems({ runParam }: { runParam: string }) { + const downloadPath = v3RunDownloadLogsPath({ friendlyId: runParam }); + + const copyForAI = async () => { + try { + const response = await fetch(`${downloadPath}?format=markdown`, { credentials: "include" }); + if (!response.ok) { + throw new Error(`Request failed with ${response.status}`); + } + await navigator.clipboard.writeText(await response.text()); + toast.custom((t) => ( + + )); + } catch { + toast.custom((t) => ( + + )); + } + }; + + return ( + <> + + + + + + ); +} + function RunError({ error }: { error: TaskRunError }) { const enhancedError = taskRunErrorEnhancer(error); diff --git a/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts b/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts index c2a6fa9590..213b8ea131 100644 --- a/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts +++ b/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts @@ -1,21 +1,30 @@ import { LoaderFunctionArgs } from "@remix-run/server-runtime"; import { prisma } from "~/db.server"; +import { env } from "~/env.server"; import { requireUser } from "~/services/session.server"; -import { v3RunParamsSchema } from "~/utils/pathBuilder"; -import type { RunPreparedEvent } from "~/v3/eventRepository/eventRepository.types"; +import { v3RunParamsSchema, v3RunPath } from "~/utils/pathBuilder"; import { createGzip } from "zlib"; import { Readable } from "stream"; -import { formatDurationMilliseconds } from "@trigger.dev/core/v3/utils/durations"; import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; -import { TaskEventKind } from "@trigger.dev/database"; import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server"; +import { + getTraceExportFormat, + streamTraceExport, + type TraceExportContext, +} from "~/v3/eventRepository/traceExport.server"; import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; -import { deserialiseMollifierSnapshot } from "~/v3/mollifier/mollifierSnapshot.server"; export async function loader({ params, request }: LoaderFunctionArgs) { const user = await requireUser(request); const parsedParams = v3RunParamsSchema.pick({ runParam: true }).parse(params); + const url = new URL(request.url); + // ?format=log|jsonl|markdown (default log). ?showDebug=true includes internal + // engine debug events (off by default, matching the trace view's Debug toggle). + const format = getTraceExportFormat(url.searchParams.get("format")); + const showDebug = url.searchParams.get("showDebug") === "true"; + const filename = `${parsedParams.runParam}.${format.extension}`; + const run = await prisma.taskRun.findFirst({ where: { friendlyId: parsedParams.runParam, @@ -29,19 +38,27 @@ export async function loader({ params, request }: LoaderFunctionArgs) { }, }, }, + select: { + friendlyId: true, + traceId: true, + organizationId: true, + runtimeEnvironmentId: true, + createdAt: true, + completedAt: true, + taskEventStore: true, + taskIdentifier: true, + project: { select: { slug: true, organization: { select: { slug: true } } } }, + runtimeEnvironment: { select: { slug: true } }, + }, }); if (!run || !run.organizationId) { - // Buffered run? It hasn't executed, so there are no events to - // stream — but a 404 is wrong: the run does exist, the customer's - // "Download logs" button on the run-detail page generates this - // exact URL, and a 404 reads as "your run vanished" rather than - // "no logs yet". Verify the entry exists in the buffer (with the - // user as a member of the entry's org), and if so stream a single - // informational line in the same ` - // ` shape `formatRunEvent` uses below — so a downstream - // log viewer / grep over the downloaded file produces a - // meaningful explanation, not a 0-byte mystery. + // Buffered run? It hasn't executed, so there's no trace to stream — but a + // 404 is wrong: the run does exist, the customer's "Download trace" button + // on the run-detail page generates this exact URL, and a 404 reads as "your + // run vanished" rather than "no trace yet". Verify the entry exists in the + // buffer (with the user as a member of the entry's org), and if so stream a + // single informational line instead of a 0-byte mystery. const buffer = getMollifierBuffer(); if (buffer) { const entry = await buffer.getEntry(parsedParams.runParam); @@ -51,45 +68,10 @@ export async function loader({ params, request }: LoaderFunctionArgs) { select: { id: true }, }); if (member) { - let taskIdentifier: string | undefined; - try { - // Use the shared webapp wrapper rather than raw JSON.parse so - // every read-side module shares a single deserialisation path - // (see contract comment in `mollifierSnapshot.server.ts` and - // `syntheticRedirectInfo.server.ts`). Keeps behaviour - // consistent if the snapshot encoding ever changes. - const snapshot = deserialiseMollifierSnapshot(entry.payload) as { - taskIdentifier?: unknown; - }; - if (typeof snapshot.taskIdentifier === "string") { - taskIdentifier = snapshot.taskIdentifier; - } - } catch { - // Fall through — taskIdentifier stays undefined. - } - const placeholderParts = [ - entry.createdAt.toISOString(), - ...(taskIdentifier ? [taskIdentifier] : []), - "INFO", - "Run is queued, has not started executing yet — no logs to download.", - ]; - const placeholder = placeholderParts.join(" ") + "\n"; - const placeholderReadable = new Readable({ - read() { - this.push(placeholder); - this.push(null); - }, - }); - const gzipStream = createGzip(); - const compressed = placeholderReadable.pipe(gzipStream); - return new Response(compressed as any, { - status: 200, - headers: { - "Content-Type": "application/octet-stream", - "Content-Disposition": `attachment; filename="${parsedParams.runParam}.log"`, - "Content-Encoding": "gzip", - }, - }); + return streamGzipText( + `Run ${parsedParams.runParam} is queued and has not started executing yet — no trace to download.\n`, + filename + ); } } } @@ -101,93 +83,48 @@ export async function loader({ params, request }: LoaderFunctionArgs) { run.organizationId ); - const runEvents = await eventRepository.getRunEvents( + // Stream the trace straight from the store to the gzip response, one event at + // a time, never materialising the full set or building a tree. This keeps the + // download bounded in memory and non-blocking regardless of how large the + // trace is. The chosen format renders each event as it streams through. + const events = eventRepository.streamTraceEvents( getTaskEventStoreTableForRun(run), run.runtimeEnvironmentId, run.traceId, - run.friendlyId, run.createdAt, - run.completedAt ?? undefined + run.completedAt ?? undefined, + { includeDebugLogs: showDebug } ); - // Create a Readable stream from the runEvents array - const readable = new Readable({ - read() { - runEvents.forEach((event) => { - try { - if (!user.admin && event.kind === TaskEventKind.LOG) { - // Only return debug logs for admins - return; - } - this.push(formatRunEvent(event) + "\n"); - } catch {} - }); - this.push(null); // End of stream - }, - }); - - // Create a gzip transform stream - const gzip = createGzip(); + const context: TraceExportContext = { + runFriendlyId: run.friendlyId, + traceId: run.traceId, + taskIdentifier: run.taskIdentifier, + runUrl: `${env.APP_ORIGIN}${v3RunPath( + run.project.organization, + run.project, + run.runtimeEnvironment, + { friendlyId: run.friendlyId } + )}`, + }; + + return streamGzipText(streamTraceExport(events, format, context), filename); +} - // Pipe the readable stream into the gzip stream - const compressedStream = readable.pipe(gzip); +function streamGzipText(source: string | AsyncIterable, filename: string): Response { + // `Readable.from` handles both a single string and an async generator. For + // the generator case it pulls lazily under backpressure, so a large trace is + // never fully materialised in memory — gzip drains it as fast as the client + // reads, and the generator pauses in between. + const readable = typeof source === "string" ? Readable.from([source]) : Readable.from(source); + const compressedStream = readable.pipe(createGzip()); - // Return the response with the compressed stream return new Response(compressedStream as any, { status: 200, headers: { "Content-Type": "application/octet-stream", - "Content-Disposition": `attachment; filename="${parsedParams.runParam}.log"`, + "Content-Disposition": `attachment; filename="${filename}"`, "Content-Encoding": "gzip", }, }); } - -function formatRunEvent(event: RunPreparedEvent): string { - const entries = []; - const parts: string[] = []; - - parts.push(getDateFromNanoseconds(event.startTime).toISOString()); - - if (event.taskSlug) { - parts.push(event.taskSlug); - } - - parts.push(event.level); - parts.push(event.message); - - if (event.level === "TRACE") { - parts.push(`(${formatDurationMilliseconds(event.duration / 1_000_000)})`); - } - - entries.push(parts.join(" ")); - - if (event.events) { - for (const subEvent of event.events) { - if (subEvent.name === "exception") { - const subEventParts: string[] = []; - - subEventParts.push(subEvent.time as unknown as string); - - if (event.taskSlug) { - subEventParts.push(event.taskSlug); - } - - subEventParts.push(subEvent.name); - subEventParts.push((subEvent.properties as any).exception.message); - - if ((subEvent.properties as any).exception.stack) { - subEventParts.push((subEvent.properties as any).exception.stack); - } - - entries.push(subEventParts.join(" ")); - } - } - } - - return entries.join("\n"); -} - -function getDateFromNanoseconds(nanoseconds: bigint) { - return new Date(Number(nanoseconds) / 1_000_000); -} diff --git a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts index 13e3bce053..acbea67e6f 100644 --- a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts @@ -62,6 +62,7 @@ import type { SpanOverride, SpanSummary, SpanSummaryCommon, + StreamedTraceEvent, TraceAttributes, TraceDetailedSummary, TraceEventOptions, @@ -1988,6 +1989,75 @@ export class ClickhouseEventRepository implements IEventRepository { }; } + async *streamTraceEvents( + storeTable: TaskEventStoreTable, + environmentId: string, + traceId: string, + startCreatedAt: Date, + endCreatedAt?: Date, + options?: { includeDebugLogs?: boolean } + ): AsyncIterable { + const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000); + + const queryBuilder = + this._version === "v2" + ? this._clickhouse.taskEventsV2.traceEventsForExportQueryBuilder() + : this._clickhouse.taskEvents.traceEventsForExportQueryBuilder(); + + queryBuilder.where("environment_id = {environmentId: String}", { environmentId }); + queryBuilder.where("trace_id = {traceId: String}", { traceId }); + queryBuilder.where("start_time >= {startCreatedAt: String}", { + startCreatedAt: convertDateToNanoseconds(startCreatedAtWithBuffer).toString(), + }); + + if (endCreatedAt) { + queryBuilder.where("start_time <= {endCreatedAt: String}", { + endCreatedAt: convertDateToNanoseconds(endCreatedAt).toString(), + }); + } + + if (this._version === "v2") { + queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", { + insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer), + }); + } + + // Admin-only debug events stay hidden unless explicitly requested. + if (options?.includeDebugLogs !== true) { + queryBuilder.where("kind != {debugKind: String}", { debugKind: "DEBUG_EVENT" }); + } + + // Each span is written twice: a PARTIAL start-marker (empty attributes) and + // the completed row. Keep only the completed row so the export has one line + // per span (the tree path merges these; streaming can't, so we filter). + queryBuilder.where("status != {partialStatus: String}", { partialStatus: "PARTIAL" }); + + // Internal trigger.dev span events (start timeline) are uninformative noise + // in the export; the tree path filters them too. Real span events such as + // exceptions are kept. + queryBuilder.where( + "(kind != {spanEventKind: String} OR NOT startsWith(message, {internalPrefix: String}))", + { spanEventKind: "SPAN_EVENT", internalPrefix: "trigger.dev/" } + ); + + queryBuilder.orderBy("start_time ASC"); + // Deliberately no LIMIT: streaming never materialises the result set, so the + // detailed-summary memory cap doesn't apply to the export. + + for await (const row of queryBuilder.executeStream()) { + yield { + spanId: row.span_id, + parentSpanId: row.parent_span_id, + startTime: convertClickhouseDateTime64ToJsDate(row.start_time), + durationNs: typeof row.duration === "number" ? row.duration : Number(row.duration), + level: clickhouseKindToLevel(row.kind), + message: row.message, + isError: row.status === "ERROR", + propertiesText: row.attributes_text ?? "", + }; + } + } + #mergeRecordsIntoSpanDetailedSummary( spanId: string, records: TaskEventDetailedSummaryV1Result[], @@ -2268,6 +2338,16 @@ export function convertClickhouseDateTime64ToNanosecondsEpoch(date: string): big * * Optimized with fast path for common format (avoids regex for 99% of cases). */ +// Map a ClickHouse task-event `kind` to a human display level for the streaming +// export (e.g. LOG_INFO -> INFO, SPAN -> TRACE, SPAN_EVENT -> EVENT). +function clickhouseKindToLevel(kind: string): string { + if (kind.startsWith("LOG_")) return kind.slice(4); + if (kind === "SPAN") return "TRACE"; + if (kind === "SPAN_EVENT") return "EVENT"; + if (kind === "DEBUG_EVENT") return "DEBUG"; + return kind; +} + export function convertClickhouseDateTime64ToJsDate(date: string): Date { // Fast path for common format: "2025-09-23 12:32:46.130262875" or "2025-09-23 12:32:46" // This avoids the expensive regex for the common case diff --git a/apps/webapp/app/v3/eventRepository/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository/eventRepository.server.ts index efc879b2f5..268c955fd2 100644 --- a/apps/webapp/app/v3/eventRepository/eventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository/eventRepository.server.ts @@ -60,6 +60,7 @@ import type { SpanDetail, SpanDetailedSummary, SpanSummary, + StreamedTraceEvent, TraceAttributes, TraceDetailedSummary, TraceEventOptions, @@ -683,6 +684,38 @@ export class EventRepository implements IEventRepository { }); } + public async *streamTraceEvents( + storeTable: TaskEventStoreTable, + environmentId: string, + traceId: string, + startCreatedAt: Date, + endCreatedAt?: Date, + options?: { includeDebugLogs?: boolean } + ): AsyncIterable { + for await (const event of this.taskEventStore.streamDetailedTraceEvents( + storeTable, + traceId, + startCreatedAt, + endCreatedAt, + { includeDebugLogs: options?.includeDebugLogs } + )) { + const properties = event.properties + ? removePrivateProperties(event.properties as Attributes) ?? {} + : {}; + + yield { + spanId: event.spanId, + parentSpanId: event.parentId ?? "", + startTime: getDateFromNanoseconds(event.startTime), + durationNs: Number(event.duration), + level: event.level, + message: event.message, + isError: event.isError, + propertiesText: JSON.stringify(properties), + }; + } + } + public async getRunEvents( storeTable: TaskEventStoreTable, environmentId: string, diff --git a/apps/webapp/app/v3/eventRepository/eventRepository.types.ts b/apps/webapp/app/v3/eventRepository/eventRepository.types.ts index 5c617a2482..591c7927a5 100644 --- a/apps/webapp/app/v3/eventRepository/eventRepository.types.ts +++ b/apps/webapp/app/v3/eventRepository/eventRepository.types.ts @@ -330,6 +330,24 @@ export type SpanDetailedSummary = { children: Array; }; +// A single trace event for the streaming export path (the "Download trace" +// feature). Deliberately flat and self-contained: it carries its own parent ref +// so hierarchy is reconstructable downstream without ever building a tree. Used +// by `streamTraceEvents`, which yields these one at a time so an arbitrarily +// large trace is never fully resident in memory. +export type StreamedTraceEvent = { + spanId: string; + parentSpanId: string; + startTime: Date; + durationNs: number; + level: string; + message: string; + isError: boolean; + // Span attributes/properties as a raw JSON string, emitted verbatim (the + // ClickHouse store already materialises it as text — no per-row parse). + propertiesText: string; +}; + export type TraceDetailedSummary = { traceId: string; rootSpan: SpanDetailedSummary; @@ -407,6 +425,18 @@ export interface IEventRepository { options?: { includeDebugLogs?: boolean } ): Promise; + // Streams a trace's events in start_time order, one at a time, without ever + // materialising the full result set or a tree. Powers the streaming trace + // export so arbitrarily large traces download with bounded memory. + streamTraceEvents( + storeTable: TaskEventStoreTable, + environmentId: string, + traceId: string, + startCreatedAt: Date, + endCreatedAt?: Date, + options?: { includeDebugLogs?: boolean } + ): AsyncIterable; + getRunEvents( storeTable: TaskEventStoreTable, environmentId: string, diff --git a/apps/webapp/app/v3/eventRepository/traceExport.server.ts b/apps/webapp/app/v3/eventRepository/traceExport.server.ts new file mode 100644 index 0000000000..d1e9f5b6f0 --- /dev/null +++ b/apps/webapp/app/v3/eventRepository/traceExport.server.ts @@ -0,0 +1,205 @@ +import { formatDurationNanoseconds } from "@trigger.dev/core/v3/utils/durations"; +import type { StreamedTraceEvent } from "./eventRepository.types"; + +// Lines are batched into ~64KB chunks before being yielded to the gzip stream: +// per-line chunks make `Readable.from` ~10x slower, while chunks this size keep +// the pipe fed yet stay small enough to release the event loop frequently. +const DEFAULT_FLUSH_BYTES = 64 * 1024; + +export type TraceExportContext = { + runFriendlyId: string; + traceId: string; + taskIdentifier?: string; + /** Absolute dashboard URL for the run (used by formats that link out). */ + runUrl?: string; +}; + +/** + * A trace export format. `formatEvent` renders one {@link StreamedTraceEvent} to + * a string; `header`/`footer` bookend the stream. Formats are intentionally + * stateless across events so the export stays O(1) memory — see + * {@link streamTraceExport}. + */ +export type TraceExportFormat = { + name: TraceExportFormatName; + extension: string; + header?: (ctx: TraceExportContext) => string; + formatEvent: (event: StreamedTraceEvent, ctx: TraceExportContext) => string; + footer?: (ctx: TraceExportContext) => string; +}; + +export type TraceExportFormatName = "log" | "jsonl" | "markdown"; + +/** + * Streams a trace export by piping events through a {@link TraceExportFormat}. + * Batches output into ~`flushBytes` chunks and releases the event loop between + * flushes; holds nothing across events but the buffer, so an arbitrarily large + * trace exports in bounded memory regardless of format. + */ +export async function* streamTraceExport( + events: AsyncIterable, + format: TraceExportFormat, + ctx: TraceExportContext, + options: { flushBytes?: number } = {} +): AsyncGenerator { + const flushBytes = options.flushBytes ?? DEFAULT_FLUSH_BYTES; + + let buffer = format.header ? format.header(ctx) : ""; + + for await (const event of events) { + buffer += format.formatEvent(event, ctx); + if (buffer.length >= flushBytes) { + yield buffer; + buffer = ""; + await new Promise((resolve) => setImmediate(resolve)); + } + } + + if (format.footer) { + buffer += format.footer(ctx); + } + + if (buffer.length > 0) { + yield buffer; + } +} + +function hasProperties(propertiesText: string): boolean { + const trimmed = propertiesText.trim(); + return trimmed.length > 0 && trimmed !== "{}"; +} + +// For error events, pull the error message out of the properties so formats can +// surface it inline instead of burying it in the JSON blob. Only parses when the +// event is actually an error (rare), so the common path stays parse-free. Handles +// both trigger.dev's `error.*` shape and OTel's `exception.*` shape; the full +// object (incl. stacktrace) still rides along in the properties. +function errorMessage(event: StreamedTraceEvent): string | undefined { + if (!event.isError || !hasProperties(event.propertiesText)) return undefined; + try { + const props = JSON.parse(event.propertiesText) as { + error?: { message?: unknown }; + exception?: { message?: unknown }; + }; + const message = props.error?.message ?? props.exception?.message; + return typeof message === "string" && message.length > 0 + ? message.replace(/\s+/g, " ").trim() + : undefined; + } catch { + return undefined; + } +} + +function lineage(event: StreamedTraceEvent): string { + return event.parentSpanId ? `${event.spanId} ← ${event.parentSpanId}` : event.spanId; +} + +// --------------------------------------------------------------------------- +// log — flat, chronological, grep-friendly. One line per event (+ a props line). +// --------------------------------------------------------------------------- +const logFormat: TraceExportFormat = { + name: "log", + extension: "txt", + formatEvent(event) { + const time = event.startTime.toISOString(); + const level = event.level.padEnd(5); + const errMsg = errorMessage(event); + const status = event.isError ? (errMsg ? ` [ERROR: ${errMsg}]` : " [ERROR]") : ""; + const duration = event.durationNs > 0 ? ` (${formatDurationNanoseconds(event.durationNs)})` : ""; + + let out = `${time} ${level} [${lineage(event)}] ${event.message}${status}${duration}\n`; + if (hasProperties(event.propertiesText)) { + out += ` props: ${event.propertiesText.trim()}\n`; + } + return out; + }, +}; + +// --------------------------------------------------------------------------- +// jsonl — one JSON object per line, properties inlined as a nested object. +// --------------------------------------------------------------------------- +const jsonlFormat: TraceExportFormat = { + name: "jsonl", + extension: "jsonl", + formatEvent(event) { + let properties: unknown = undefined; + if (hasProperties(event.propertiesText)) { + try { + properties = JSON.parse(event.propertiesText); + } catch { + properties = event.propertiesText; + } + } + + return ( + JSON.stringify({ + time: event.startTime.toISOString(), + level: event.level, + spanId: event.spanId, + parentSpanId: event.parentSpanId || undefined, + message: event.message, + durationNs: event.durationNs, + isError: event.isError || undefined, + errorMessage: errorMessage(event), + properties, + }) + "\n" + ); + }, +}; + +// --------------------------------------------------------------------------- +// markdown — AI-friendly: YAML frontmatter (ids, task, dashboard URL) + a +// scannable table, one row per event. Properties stay (inline code) so the +// export isn't lossy; a column-friendly cell escaper keeps the table intact. +// --------------------------------------------------------------------------- +function mdCell(value: string): string { + // Pipes and newlines would break the table row; escape/flatten them. (GFM + // treats `\|` inside a table cell — including code spans — as a literal pipe.) + return value.replace(/\\/g, "\\\\").replace(/\|/g, "\\|").replace(/\r?\n/g, " "); +} + +const markdownFormat: TraceExportFormat = { + name: "markdown", + extension: "md", + header(ctx) { + const lines = ["---", `run: ${ctx.runFriendlyId}`, `trace: ${ctx.traceId}`]; + if (ctx.taskIdentifier) lines.push(`task: ${ctx.taskIdentifier}`); + if (ctx.runUrl) lines.push(`url: ${ctx.runUrl}`); + lines.push("---", "", `# Trace for ${ctx.runFriendlyId}`, ""); + if (ctx.runUrl) { + lines.push(`[View in dashboard](${ctx.runUrl})`, ""); + } + lines.push( + "| time | level | event | duration | span ← parent | properties |", + "| --- | --- | --- | --- | --- | --- |" + ); + return lines.join("\n") + "\n"; + }, + formatEvent(event) { + const time = event.startTime.toISOString(); + const level = event.isError ? `${event.level} ❌` : event.level; + const duration = event.durationNs > 0 ? formatDurationNanoseconds(event.durationNs) : "—"; + const lineage = event.parentSpanId ? `${event.spanId} ← ${event.parentSpanId}` : event.spanId; + const errMsg = errorMessage(event); + const eventCell = errMsg ? `${event.message} — ${errMsg}` : event.message; + const properties = hasProperties(event.propertiesText) + ? "`" + mdCell(event.propertiesText.trim()) + "`" + : "—"; + + return `| ${time} | ${level} | ${mdCell(eventCell)} | ${duration} | \`${lineage}\` | ${properties} |\n`; + }, +}; + +const FORMATS: Record = { + log: logFormat, + jsonl: jsonlFormat, + markdown: markdownFormat, +}; + +/** Resolve a `?format=` value to a format, defaulting to `log`. */ +export function getTraceExportFormat(name: string | null | undefined): TraceExportFormat { + if (name && Object.prototype.hasOwnProperty.call(FORMATS, name)) { + return FORMATS[name as TraceExportFormatName]; + } + return logFormat; +} diff --git a/apps/webapp/app/v3/taskEventStore.server.ts b/apps/webapp/app/v3/taskEventStore.server.ts index 6a80aa9926..ff2fa741c6 100644 --- a/apps/webapp/app/v3/taskEventStore.server.ts +++ b/apps/webapp/app/v3/taskEventStore.server.ts @@ -303,4 +303,83 @@ export class TaskEventStore { `; } } + + // Streams a trace's detailed events in (startTime, spanId) order via keyset + // pagination. Holds at most one page at a time — no overall cap, no full + // materialisation — so an arbitrarily large trace can be exported with bounded + // memory. Powers the streaming "Download trace" export. + async *streamDetailedTraceEvents( + table: TaskEventStoreTable, + traceId: string, + startCreatedAt: Date, + endCreatedAt?: Date, + options?: { includeDebugLogs?: boolean; pageSize?: number } + ): AsyncGenerator { + const filterDebug = + options?.includeDebugLogs === false || options?.includeDebugLogs === undefined; + const pageSize = options?.pageSize ?? 5_000; + const debugFilter = filterDebug + ? Prisma.sql`AND \"kind\" <> CAST('LOG'::text AS "public"."TaskEventKind")` + : Prisma.empty; + // Spans are written as a partial start-marker plus a completed row; keep + // only the completed row so the export has one line per span (mirrors the + // tree path's merge, but without holding state). + const partialFilter = Prisma.sql`AND "isPartial" = false`; + + const createdAtBufferInMillis = env.TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS * 1000; + const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - createdAtBufferInMillis); + const $endCreatedAt = endCreatedAt ?? new Date(); + const endCreatedAtWithBuffer = new Date($endCreatedAt.getTime() + createdAtBufferInMillis); + + let afterStartTime: bigint | null = null; + let afterSpanId: string | null = null; + + while (true) { + const keyset: Prisma.Sql = + afterStartTime === null + ? Prisma.empty + : Prisma.sql`AND ("startTime" > ${afterStartTime} OR ("startTime" = ${afterStartTime} AND "spanId" > ${afterSpanId}))`; + + const rows: DetailedTraceEvent[] = + table === "taskEventPartitioned" + ? await this.readReplica.$queryRaw` + SELECT "spanId","parentId","runId",message,style,"startTime",duration,"isError","isPartial","isCancelled",level,events,"kind","taskSlug",properties,"attemptNumber" + FROM "TaskEventPartitioned" + WHERE "traceId" = ${traceId} + AND "createdAt" >= ${startCreatedAtWithBuffer.toISOString()}::timestamp + AND "createdAt" < ${endCreatedAtWithBuffer.toISOString()}::timestamp + ${debugFilter} + ${partialFilter} + ${keyset} + ORDER BY "startTime" ASC, "spanId" ASC + LIMIT ${pageSize} + ` + : await this.readReplica.$queryRaw` + SELECT "spanId","parentId","runId",message,style,"startTime",duration,"isError","isPartial","isCancelled",level,events,"kind","taskSlug",properties,"attemptNumber" + FROM "TaskEvent" + WHERE "traceId" = ${traceId} + ${debugFilter} + ${partialFilter} + ${keyset} + ORDER BY "startTime" ASC, "spanId" ASC + LIMIT ${pageSize} + `; + + if (rows.length === 0) { + break; + } + + for (const row of rows) { + yield row; + } + + if (rows.length < pageSize) { + break; + } + + const last: DetailedTraceEvent = rows[rows.length - 1]; + afterStartTime = typeof last.startTime === "bigint" ? last.startTime : BigInt(last.startTime); + afterSpanId = last.spanId; + } + } } diff --git a/apps/webapp/test/traceExport.test.ts b/apps/webapp/test/traceExport.test.ts new file mode 100644 index 0000000000..a05187795d --- /dev/null +++ b/apps/webapp/test/traceExport.test.ts @@ -0,0 +1,163 @@ +import { describe, expect, it } from "vitest"; +import { + getTraceExportFormat, + streamTraceExport, + type TraceExportContext, +} from "~/v3/eventRepository/traceExport.server"; +import type { StreamedTraceEvent } from "~/v3/eventRepository/eventRepository.types"; + +const T = new Date("2026-06-06T12:00:00.000Z"); + +const CTX: TraceExportContext = { + runFriendlyId: "run_x", + traceId: "trace_x", + taskIdentifier: "agent-workflow", + runUrl: "https://app.example.com/orgs/o/projects/p/env/dev/runs/run_x", +}; + +function sampleEvents(): StreamedTraceEvent[] { + return [ + { + spanId: "root1", + parentSpanId: "", + startTime: T, + durationNs: 5_500_000_000, // 5.5s + level: "TRACE", + message: "agent.run", + isError: false, + propertiesText: '{"agent":{"name":"researcher-v2"}}', + }, + { + spanId: "log1", + parentSpanId: "root1", + startTime: T, + durationNs: 0, + level: "INFO", + message: "processing item", + isError: false, + propertiesText: '{"itemId":7}', + }, + { + spanId: "err1", + parentSpanId: "root1", + startTime: T, + durationNs: 3_000_000, // 3ms + level: "ERROR", + message: "task failed", + isError: true, + propertiesText: '{"error":{"message":"boom: it failed","name":"Error","stackTrace":"Error: boom\\n at fn"}}', + }, + { + spanId: "quiet1", + parentSpanId: "root1", + startTime: T, + durationNs: 0, + level: "DEBUG", + message: "no props here", + isError: false, + propertiesText: "{}", + }, + ]; +} + +async function* toAsyncIterable(items: StreamedTraceEvent[]): AsyncIterable { + for (const item of items) { + yield item; + } +} + +async function drain(gen: AsyncIterable): Promise { + let text = ""; + for await (const chunk of gen) { + text += chunk; + } + return text; +} + +function render(formatName: string, items = sampleEvents(), opts = {}): Promise { + return drain(streamTraceExport(toAsyncIterable(items), getTraceExportFormat(formatName), CTX, opts)); +} + +describe("getTraceExportFormat", () => { + it("resolves known formats and defaults unknown ones to log", () => { + expect(getTraceExportFormat("log").extension).toBe("txt"); + expect(getTraceExportFormat("jsonl").extension).toBe("jsonl"); + expect(getTraceExportFormat("markdown").extension).toBe("md"); + expect(getTraceExportFormat(null).name).toBe("log"); + expect(getTraceExportFormat("bogus").name).toBe("log"); + }); +}); + +describe("log format", () => { + it("is flat events with parent refs, no header, ns durations, inline error message", async () => { + const text = await render("log"); + expect(text).not.toContain("Run:"); + expect(text).not.toContain("Trace ID:"); + expect(text.startsWith("2026-")).toBe(true); + expect(text).toContain("[root1] agent.run"); + expect(text).toContain("(5.5 seconds)"); + expect(text).not.toMatch(/day/); + expect(text).toContain("[log1 ← root1] processing item"); + expect(text).toContain('props: {"itemId":7}'); + // Error message surfaced inline (not just an [ERROR] flag). + expect(text).toContain("[err1 ← root1] task failed [ERROR: boom: it failed]"); + // Empty properties are omitted. + expect(text).not.toContain("props: {}"); + }); +}); + +describe("jsonl format", () => { + it("emits one valid JSON object per line with inlined properties + errorMessage", async () => { + const text = await render("jsonl"); + const lines = text.trim().split("\n"); + expect(lines).toHaveLength(4); + + const first = JSON.parse(lines[0]); + expect(first).toMatchObject({ + spanId: "root1", + message: "agent.run", + level: "TRACE", + durationNs: 5_500_000_000, + }); + expect(first.parentSpanId).toBeUndefined(); + expect(first.properties).toEqual({ agent: { name: "researcher-v2" } }); + + const err = JSON.parse(lines[2]); + expect(err.isError).toBe(true); + expect(err.errorMessage).toBe("boom: it failed"); + expect(err.properties.error.stackTrace).toContain("at fn"); + + const quiet = JSON.parse(lines[3]); + expect(quiet.properties).toBeUndefined(); // "{}" → omitted + expect(quiet.errorMessage).toBeUndefined(); + }); +}); + +describe("markdown format", () => { + it("emits YAML frontmatter with ids/task/url and a table (no fenced blocks)", async () => { + const text = await render("markdown"); + expect(text.startsWith("---\n")).toBe(true); + expect(text).toContain("run: run_x"); + expect(text).toContain("trace: trace_x"); + expect(text).toContain("task: agent-workflow"); + expect(text).toContain("url: https://app.example.com/orgs/o/projects/p/env/dev/runs/run_x"); + expect(text).toContain("# Trace for run_x"); + expect(text).toContain("[View in dashboard](https://app.example.com/orgs/o/projects/p/env/dev/runs/run_x)"); + expect(text).toContain("| time | level | event | duration | span ← parent | properties |"); + expect(text).not.toContain("```json"); + expect(text).toContain("`log1 ← root1`"); + expect(text).toContain("ERROR ❌"); + // Error message surfaced in the event cell. + expect(text).toContain("| task failed — boom: it failed |"); + }); +}); + +describe("all formats", () => { + it("produce identical output regardless of flush size (no cross-event state)", async () => { + for (const name of ["log", "jsonl", "markdown"]) { + const full = await render(name); + const tiny = await render(name, sampleEvents(), { flushBytes: 8 }); + expect(tiny, `format ${name}`).toEqual(full); + } + }); +}); diff --git a/internal-packages/clickhouse/src/client/client.ts b/internal-packages/clickhouse/src/client/client.ts index 5a4118934e..6eb18c4488 100644 --- a/internal-packages/clickhouse/src/client/client.ts +++ b/internal-packages/clickhouse/src/client/client.ts @@ -18,6 +18,7 @@ import type { ClickhouseQueryBuilderFastFunction, ClickhouseQueryBuilderFunction, ClickhouseQueryFunction, + ClickhouseQueryStreamFunction, ClickhouseQueryWithStatsFunction, ClickhouseReader, ClickhouseWriter, @@ -517,6 +518,60 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { }; } + public queryFastStream, TParams extends Record>(req: { + name: string; + query: string; + columns: Array; + settings?: ClickHouseSettings; + }): ClickhouseQueryStreamFunction { + const self = this; + + return async function* (params, options) { + const queryId = randomUUID(); + + self.logger.debug("Streaming clickhouse fast", { + name: req.name, + query: req.query.replace(/\s+/g, " "), + params, + settings: req.settings, + queryId, + }); + + const resultSet = await self.client.query({ + query: req.query, + query_params: params, + format: "JSONCompactEachRow", + query_id: queryId, + ...options?.params, + clickhouse_settings: { + ...req.settings, + ...options?.params?.clickhouse_settings, + }, + }); + + // Stream rows off the socket and hydrate each one on the fly. The full + // result set is never materialised into an array — bounded memory for + // arbitrarily large queries. + for await (const rows of resultSet.stream()) { + for (const row of rows) { + const rowData = row.json() as any[]; + + const hydratedRow: Record = {}; + for (let i = 0; i < req.columns.length; i++) { + const column = req.columns[i]; + if (typeof column === "string") { + hydratedRow[column] = rowData[i]; + } else { + hydratedRow[column.name] = rowData[i]; + } + } + + yield hydratedRow as TOut; + } + } + }; + } + public queryBuilder>(req: { name: string; baseQuery: string; diff --git a/internal-packages/clickhouse/src/client/noop.ts b/internal-packages/clickhouse/src/client/noop.ts index e0872cada6..9f212c3d16 100644 --- a/internal-packages/clickhouse/src/client/noop.ts +++ b/internal-packages/clickhouse/src/client/noop.ts @@ -95,6 +95,17 @@ export class NoopClient implements ClickhouseReader, ClickhouseWriter { }; } + public queryFastStream, TParams extends Record>(req: { + name: string; + query: string; + columns: string[]; + settings?: ClickHouseSettings; + }): (params: TParams) => AsyncIterable { + return async function* () { + // Noop: empty stream. + }; + } + public insert>(req: { name: string; table: string; diff --git a/internal-packages/clickhouse/src/client/queryBuilder.ts b/internal-packages/clickhouse/src/client/queryBuilder.ts index fb0430fd0d..ec7f73415b 100644 --- a/internal-packages/clickhouse/src/client/queryBuilder.ts +++ b/internal-packages/clickhouse/src/client/queryBuilder.ts @@ -255,6 +255,24 @@ export class ClickhouseQueryFastBuilder> { return queryFunction(params); } + /** + * Like {@link execute} but streams rows instead of buffering them into an + * array. Returns an async iterable so the whole result set is never resident + * in memory at once. + */ + executeStream(): AsyncIterable { + const { query, params } = this.build(); + + const streamFunction = this.reader.queryFastStream>({ + name: this.name, + query, + columns: this.columns, + settings: this.settings, + }); + + return streamFunction(params); + } + build(): { query: string; params: QueryParams } { let query = `SELECT ${this.buildColumns().join(", ")} FROM ${this.table}`; if (this.prewhereClauses.length > 0) { diff --git a/internal-packages/clickhouse/src/client/types.ts b/internal-packages/clickhouse/src/client/types.ts index 7120422508..dde2bfce53 100644 --- a/internal-packages/clickhouse/src/client/types.ts +++ b/internal-packages/clickhouse/src/client/types.ts @@ -13,6 +13,20 @@ export type ClickhouseQueryFunction = ( } ) => Promise>; +/** + * Like {@link ClickhouseQueryFunction} but yields rows as they stream off the + * socket instead of buffering them into an array. The whole result set is never + * held in memory at once. Errors are thrown (the async iterable rejects) rather + * than returned as a Result tuple. + */ +export type ClickhouseQueryStreamFunction = ( + params: TInput, + options?: { + attributes?: Record; + params?: BaseQueryParams; + } +) => AsyncIterable; + /** * Query statistics returned by ClickHouse */ @@ -146,6 +160,18 @@ export interface ClickhouseReader { settings?: ClickHouseSettings; }): ClickhouseQueryFunction; + /** + * Like {@link queryFast} but streams rows instead of buffering them. Returns an + * async iterable so the caller can process arbitrarily large result sets with + * bounded memory. + */ + queryFastStream, TParams extends Record>(req: { + name: string; + query: string; + columns: Array; + settings?: ClickHouseSettings; + }): ClickhouseQueryStreamFunction; + queryBuilder>(req: { /** * The name of the operation. diff --git a/internal-packages/clickhouse/src/index.ts b/internal-packages/clickhouse/src/index.ts index d8a97296d1..9ee659958a 100644 --- a/internal-packages/clickhouse/src/index.ts +++ b/internal-packages/clickhouse/src/index.ts @@ -21,6 +21,8 @@ import { getSpanDetailsQueryBuilderV2, getTraceDetailedSummaryQueryBuilder, getTraceDetailedSummaryQueryBuilderV2, + getTraceEventsForExportQueryBuilder, + getTraceEventsForExportQueryBuilderV2, getTraceSummaryQueryBuilder, getTraceSummaryQueryBuilderV2, insertTaskEvents, @@ -240,6 +242,7 @@ export class ClickHouse { insert: insertTaskEvents(this.writer), traceSummaryQueryBuilder: getTraceSummaryQueryBuilder(this.reader), traceDetailedSummaryQueryBuilder: getTraceDetailedSummaryQueryBuilder(this.reader), + traceEventsForExportQueryBuilder: getTraceEventsForExportQueryBuilder(this.reader), spanDetailsQueryBuilder: getSpanDetailsQueryBuilder(this.reader), }; } @@ -278,6 +281,7 @@ export class ClickHouse { insert: insertTaskEventsV2(this.writer), traceSummaryQueryBuilder: getTraceSummaryQueryBuilderV2(this.reader), traceDetailedSummaryQueryBuilder: getTraceDetailedSummaryQueryBuilderV2(this.reader), + traceEventsForExportQueryBuilder: getTraceEventsForExportQueryBuilderV2(this.reader), spanDetailsQueryBuilder: getSpanDetailsQueryBuilderV2(this.reader), logDetailQueryBuilder: getLogDetailQueryBuilderV2(this.reader), }; diff --git a/internal-packages/clickhouse/src/taskEvents.ts b/internal-packages/clickhouse/src/taskEvents.ts index cfebd0a248..e8d4c2bd95 100644 --- a/internal-packages/clickhouse/src/taskEvents.ts +++ b/internal-packages/clickhouse/src/taskEvents.ts @@ -109,6 +109,44 @@ export function getTraceDetailedSummaryQueryBuilder( }); } +// Row shape for streaming a whole trace out for export (the "Download trace" +// feature). Unlike the detailed-summary builders this keeps the FULL message +// (not LEFT(message, 256)) since the export is the source of truth, and it's +// consumed via executeStream() so the trace is never fully materialised. +export type TaskEventExportRow = { + span_id: string; + parent_span_id: string; + start_time: string; + duration: number | string; + status: string; + kind: string; + message: string; + attributes_text: string; +}; + +const TASK_EVENT_EXPORT_COLUMNS = [ + "span_id", + "parent_span_id", + "start_time", + "duration", + "status", + "kind", + "message", + "attributes_text", +] as const; + +export function getTraceEventsForExportQueryBuilder( + ch: ClickhouseReader, + settings?: ClickHouseSettings +) { + return ch.queryBuilderFast({ + name: "getTraceEventsForExport", + table: "trigger_dev.task_events_v1", + columns: [...TASK_EVENT_EXPORT_COLUMNS], + settings, + }); +} + export const TaskEventDetailsV1Result = z.object({ span_id: z.string(), parent_span_id: z.string(), @@ -233,6 +271,18 @@ export function getSpanDetailsQueryBuilderV2( }); } +export function getTraceEventsForExportQueryBuilderV2( + ch: ClickhouseReader, + settings?: ClickHouseSettings +) { + return ch.queryBuilderFast({ + name: "getTraceEventsForExportV2", + table: "trigger_dev.task_events_v2", + columns: [...TASK_EVENT_EXPORT_COLUMNS], + settings, + }); +} + // ============================================================================ // Search Table Query Builders (for logs page, using task_events_search_v1) From 56841cf161e22f4b9553b5d5c90c0fdae9b7a2ee Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 6 Jun 2026 10:23:10 +0100 Subject: [PATCH 2/4] fix(webapp): exclude ANCESTOR_OVERRIDE rows from trace export --- .../v3/eventRepository/clickhouseEventRepository.server.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts index acbea67e6f..1e091944e6 100644 --- a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts @@ -2040,6 +2040,11 @@ export class ClickhouseEventRepository implements IEventRepository { { spanEventKind: "SPAN_EVENT", internalPrefix: "trigger.dev/" } ); + // ANCESTOR_OVERRIDE rows duplicate a descendant's error onto an ancestor span + // to colour the tree; they carry no event of their own. The tree path drops + // them, so the export does too (otherwise the same error shows up twice). + queryBuilder.where("kind != {ancestorKind: String}", { ancestorKind: "ANCESTOR_OVERRIDE" }); + queryBuilder.orderBy("start_time ASC"); // Deliberately no LIMIT: streaming never materialises the result set, so the // detailed-summary memory cap doesn't apply to the export. From 61c070a70c9af38aeb6d78c7d88262fd803ac422 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 6 Jun 2026 11:59:51 +0100 Subject: [PATCH 3/4] chore(clickhouse): add tracing span and error recording to queryFastStream --- .../clickhouse/src/client/client.ts | 95 ++++++++++++++----- 1 file changed, 69 insertions(+), 26 deletions(-) diff --git a/internal-packages/clickhouse/src/client/client.ts b/internal-packages/clickhouse/src/client/client.ts index 6eb18c4488..d281b60482 100644 --- a/internal-packages/clickhouse/src/client/client.ts +++ b/internal-packages/clickhouse/src/client/client.ts @@ -529,6 +529,22 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { return async function* (params, options) { const queryId = randomUUID(); + // A generator yields across the await boundary, so we can't use the + // callback-style `startSpan` helper here. We start the span manually and + // end it in `finally` so the span covers the whole stream lifetime and is + // closed even if the consumer abandons the generator early. Errors are + // re-thrown (no Result tuple) since they can surface mid-stream after the + // response headers have already been sent, but they're still recorded on + // the span and logged for parity with `queryFast`. + const span = self.tracer.startSpan("queryFastStream"); + span.setAttributes({ + "clickhouse.clientName": self.name, + "clickhouse.operationName": req.name, + "clickhouse.queryId": queryId, + ...flattenAttributes(req.settings, "clickhouse.settings"), + ...flattenAttributes(options?.attributes), + }); + self.logger.debug("Streaming clickhouse fast", { name: req.name, query: req.query.replace(/\s+/g, " "), @@ -537,37 +553,64 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { queryId, }); - const resultSet = await self.client.query({ - query: req.query, - query_params: params, - format: "JSONCompactEachRow", - query_id: queryId, - ...options?.params, - clickhouse_settings: { - ...req.settings, - ...options?.params?.clickhouse_settings, - }, - }); + try { + const resultSet = await self.client.query({ + query: req.query, + query_params: params, + format: "JSONCompactEachRow", + query_id: queryId, + ...options?.params, + clickhouse_settings: { + ...req.settings, + ...options?.params?.clickhouse_settings, + }, + }); + + span.setAttributes({ + "clickhouse.query_id": resultSet.query_id, + ...flattenAttributes(resultSet.response_headers, "clickhouse.response_headers"), + }); - // Stream rows off the socket and hydrate each one on the fly. The full - // result set is never materialised into an array — bounded memory for - // arbitrarily large queries. - for await (const rows of resultSet.stream()) { - for (const row of rows) { - const rowData = row.json() as any[]; - - const hydratedRow: Record = {}; - for (let i = 0; i < req.columns.length; i++) { - const column = req.columns[i]; - if (typeof column === "string") { - hydratedRow[column] = rowData[i]; - } else { - hydratedRow[column.name] = rowData[i]; + // Stream rows off the socket and hydrate each one on the fly. The full + // result set is never materialised into an array — bounded memory for + // arbitrarily large queries. + let rowCount = 0; + for await (const rows of resultSet.stream()) { + for (const row of rows) { + const rowData = row.json() as any[]; + + const hydratedRow: Record = {}; + for (let i = 0; i < req.columns.length; i++) { + const column = req.columns[i]; + if (typeof column === "string") { + hydratedRow[column] = rowData[i]; + } else { + hydratedRow[column.name] = rowData[i]; + } } + + rowCount++; + yield hydratedRow as TOut; } + } - yield hydratedRow as TOut; + span.setAttributes({ "clickhouse.rows": rowCount }); + } catch (error) { + self.logger.error("Error streaming clickhouse", { + name: req.name, + error, + query: req.query, + params, + queryId, + }); + + if (error instanceof Error) { + recordClickhouseError(span, error); } + + throw error; + } finally { + span.end(); } }; } From 587abd3c7ba19da19e8cc44c13ede1c2ff90ee4c Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sat, 6 Jun 2026 18:10:52 +0100 Subject: [PATCH 4/4] fix(webapp): admin-gate trace-export debug events, harden clipboard copy --- .../route.tsx | 20 ++++++++++++++----- .../resources.runs.$runParam.logs.download.ts | 5 +++-- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx index a3aac32c89..6602d91ea2 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx @@ -1134,11 +1134,21 @@ function TraceExportMenuItems({ runParam }: { runParam: string }) { const copyForAI = async () => { try { - const response = await fetch(`${downloadPath}?format=markdown`, { credentials: "include" }); - if (!response.ok) { - throw new Error(`Request failed with ${response.status}`); - } - await navigator.clipboard.writeText(await response.text()); + // Hand the clipboard a ClipboardItem backed by a promise so access is + // reserved synchronously during the click. The fetch can then take as long + // as a large trace needs without the browser revoking the transient user + // activation, which a fetch-then-writeText sequence trips (notably Safari + // and Firefox). + const text = fetch(`${downloadPath}?format=markdown`, { credentials: "include" }).then( + async (response) => { + if (!response.ok) { + throw new Error(`Request failed with ${response.status}`); + } + return new Blob([await response.text()], { type: "text/plain" }); + } + ); + + await navigator.clipboard.write([new ClipboardItem({ "text/plain": text })]); toast.custom((t) => ( )); diff --git a/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts b/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts index 213b8ea131..7cda5ac782 100644 --- a/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts +++ b/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts @@ -20,9 +20,10 @@ export async function loader({ params, request }: LoaderFunctionArgs) { const url = new URL(request.url); // ?format=log|jsonl|markdown (default log). ?showDebug=true includes internal - // engine debug events (off by default, matching the trace view's Debug toggle). + // engine debug events; these stay admin-only (matching the admin-gated Debug + // toggle in the trace view) and are off by default. const format = getTraceExportFormat(url.searchParams.get("format")); - const showDebug = url.searchParams.get("showDebug") === "true"; + const showDebug = url.searchParams.get("showDebug") === "true" && user.admin; const filename = `${parsedParams.runParam}.${format.extension}`; const run = await prisma.taskRun.findFirst({