Skip to content

Commit 7bd7398

Browse files
committed
feat(webapp): tag Prisma spans with db.datasource attribute
Wrap the writer and replica Prisma clients with a $extends middleware that sets an OTel context key around each operation, and a span processor that reads the key and stamps 'db.datasource' = 'writer' | 'replica' on every span created in that scope. Also directly tags the prisma:client:operation span via trace.getActiveSpan() since that outer span is created before the extension middleware runs and would otherwise miss the context. Motivation: writer and replica emit identical span names through the same global instrumentation, so pool-saturation monitors on prisma:engine:connection could not distinguish the two pools. With this change, monitors can filter by the new attribute. Context propagation note: PrismaPromise is lazy, so wrapping query(args) directly with context.with leaves the thenable unstarted and Prisma's .then() fires outside the scope. The inner 'async () => await query(args)' forces the .then() inside the context.with callback so engine spans see the correct active context. Not tagged: prisma:client:load_engine (one-time startup, irrelevant).
1 parent 03e4d5f commit 7bd7398

File tree

4 files changed

+56
-9
lines changed

4 files changed

+56
-9
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: improvement
4+
---
5+
6+
Tag Prisma spans with `db.datasource: "writer" | "replica"` so monitors and trace queries can distinguish the writer pool from the replica pool. Applies to all `prisma:engine:*` spans (including `prisma:engine:connection` used by the connection-pool monitors) and the outer `prisma:client:operation` span.

apps/webapp/app/db.server.ts

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import { env } from "./env.server";
1313
import { logger } from "./services/logger.server";
1414
import { isValidDatabaseUrl } from "./utils/db";
1515
import { singleton } from "./utils/singleton";
16-
import { startActiveSpan } from "./v3/tracer.server";
17-
import { Span } from "@opentelemetry/api";
16+
import { DATASOURCE_CONTEXT_KEY, startActiveSpan } from "./v3/tracer.server";
17+
import { context, Span, trace } from "@opentelemetry/api";
1818
import { queryPerformanceMonitor } from "./utils/queryPerformanceMonitor.server";
1919

2020
export type {
@@ -98,12 +98,30 @@ export async function $transaction<R>(
9898

9999
export { Prisma };
100100

101-
export const prisma = singleton("prisma", getClient);
101+
function tagDatasource<T extends PrismaClient>(
102+
datasource: "writer" | "replica",
103+
client: T
104+
): T {
105+
return client.$extends({
106+
name: "datasource-tagger",
107+
query: {
108+
$allOperations: ({ query, args }) => {
109+
trace.getActiveSpan()?.setAttribute("db.datasource", datasource);
110+
return context.with(
111+
context.active().setValue(DATASOURCE_CONTEXT_KEY, datasource),
112+
async () => await query(args)
113+
);
114+
},
115+
},
116+
}) as unknown as T;
117+
}
102118

103-
export const $replica: PrismaReplicaClient = singleton(
104-
"replica",
105-
() => getReplicaClient() ?? prisma
106-
);
119+
export const prisma = singleton("prisma", () => tagDatasource("writer", getClient()));
120+
121+
export const $replica: PrismaReplicaClient = singleton("replica", () => {
122+
const replica = getReplicaClient();
123+
return replica ? tagDatasource("replica", replica) : prisma;
124+
});
107125

108126
function getClient() {
109127
const { DATABASE_URL } = process.env;

apps/webapp/app/v3/tracer.server.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {
22
type Attributes,
33
type Context,
4+
createContextKey,
45
DiagConsoleLogger,
56
DiagLogLevel,
67
type Link,
@@ -61,6 +62,24 @@ import { performance } from "node:perf_hooks";
6162

6263
export const SEMINTATTRS_FORCE_RECORDING = "forceRecording";
6364

65+
export const DATASOURCE_CONTEXT_KEY = createContextKey("trigger.db.datasource");
66+
67+
class DatasourceAttributeSpanProcessor implements SpanProcessor {
68+
onStart(span: Span, parentContext: Context): void {
69+
const ds = parentContext.getValue(DATASOURCE_CONTEXT_KEY);
70+
if (typeof ds === "string") {
71+
span.setAttribute("db.datasource", ds);
72+
}
73+
}
74+
onEnd(): void {}
75+
shutdown(): Promise<void> {
76+
return Promise.resolve();
77+
}
78+
forceFlush(): Promise<void> {
79+
return Promise.resolve();
80+
}
81+
}
82+
6483
class CustomWebappSampler implements Sampler {
6584
constructor(private readonly _baseSampler: Sampler) {}
6685

@@ -205,7 +224,7 @@ function setupTelemetry() {
205224

206225
const samplingRate = 1.0 / Math.max(parseInt(env.INTERNAL_OTEL_TRACE_SAMPLING_RATE, 10), 1);
207226

208-
const spanProcessors: SpanProcessor[] = [];
227+
const spanProcessors: SpanProcessor[] = [new DatasourceAttributeSpanProcessor()];
209228

210229
if (env.INTERNAL_OTEL_TRACE_EXPORTER_URL) {
211230
const headers = parseInternalTraceHeaders() ?? {};

references/hello-world/src/trigger/example.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { batch, logger, task, tasks, timeout, wait } from "@trigger.dev/sdk";
1+
import { batch, logger, task, tasks, timeout, wait, waitUntil } from "@trigger.dev/sdk";
22
import { setTimeout } from "timers/promises";
33
import { ResourceMonitor } from "../resourceMonitor.js";
44
import { fixedLengthTask } from "./batches.js";
@@ -21,6 +21,10 @@ export const helloWorldTask = task({
2121
env: process.env,
2222
});
2323

24+
waitUntil((async () => {
25+
logger.info("Hello, world from the waitUntil hook", { payload });
26+
})());
27+
2428
logger.debug("debug: Hello, worlds!", { payload });
2529
logger.info("info: Hello, world!", { payload });
2630
logger.log("log: Hello, world!", { payload });

0 commit comments

Comments
 (0)