Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/prisma-span-datasource-attribute.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Tag Prisma spans with `db.datasource: "writer" | "replica"` so monitors and trace queries can distinguish the writer pool from the replica pool. Applies to all `prisma:engine:*` spans (including `prisma:engine:connection` used by the connection-pool monitors) and the outer `prisma:client:operation` span.
32 changes: 25 additions & 7 deletions apps/webapp/app/db.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import { env } from "./env.server";
import { logger } from "./services/logger.server";
import { isValidDatabaseUrl } from "./utils/db";
import { singleton } from "./utils/singleton";
import { startActiveSpan } from "./v3/tracer.server";
import { Span } from "@opentelemetry/api";
import { DATASOURCE_CONTEXT_KEY, startActiveSpan } from "./v3/tracer.server";
import { context, Span, trace } from "@opentelemetry/api";
import { queryPerformanceMonitor } from "./utils/queryPerformanceMonitor.server";

export type {
Expand Down Expand Up @@ -98,12 +98,30 @@ export async function $transaction<R>(

export { Prisma };

export const prisma = singleton("prisma", getClient);
function tagDatasource<T extends PrismaClient>(
datasource: "writer" | "replica",
client: T
): T {
return client.$extends({
name: "datasource-tagger",
query: {
$allOperations: ({ query, args }) => {
trace.getActiveSpan()?.setAttribute("db.datasource", datasource);
return context.with(
context.active().setValue(DATASOURCE_CONTEXT_KEY, datasource),
async () => await query(args)
);
},
},
}) as unknown as T;
}

export const $replica: PrismaReplicaClient = singleton(
"replica",
() => getReplicaClient() ?? prisma
);
export const prisma = singleton("prisma", () => tagDatasource("writer", getClient()));

export const $replica: PrismaReplicaClient = singleton("replica", () => {
const replica = getReplicaClient();
return replica ? tagDatasource("replica", replica) : prisma;
});

function getClient() {
const { DATABASE_URL } = process.env;
Expand Down
21 changes: 20 additions & 1 deletion apps/webapp/app/v3/tracer.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
type Attributes,
type Context,
createContextKey,
DiagConsoleLogger,
DiagLogLevel,
type Link,
Expand Down Expand Up @@ -61,6 +62,24 @@ import { performance } from "node:perf_hooks";

export const SEMINTATTRS_FORCE_RECORDING = "forceRecording";

export const DATASOURCE_CONTEXT_KEY = createContextKey("trigger.db.datasource");

class DatasourceAttributeSpanProcessor implements SpanProcessor {
onStart(span: Span, parentContext: Context): void {
const ds = parentContext.getValue(DATASOURCE_CONTEXT_KEY);
if (typeof ds === "string") {
span.setAttribute("db.datasource", ds);
}
}
onEnd(): void {}
shutdown(): Promise<void> {
return Promise.resolve();
}
forceFlush(): Promise<void> {
return Promise.resolve();
}
}

class CustomWebappSampler implements Sampler {
constructor(private readonly _baseSampler: Sampler) {}

Expand Down Expand Up @@ -205,7 +224,7 @@ function setupTelemetry() {

const samplingRate = 1.0 / Math.max(parseInt(env.INTERNAL_OTEL_TRACE_SAMPLING_RATE, 10), 1);

const spanProcessors: SpanProcessor[] = [];
const spanProcessors: SpanProcessor[] = [new DatasourceAttributeSpanProcessor()];

if (env.INTERNAL_OTEL_TRACE_EXPORTER_URL) {
const headers = parseInternalTraceHeaders() ?? {};
Expand Down
6 changes: 5 additions & 1 deletion references/hello-world/src/trigger/example.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { batch, logger, task, tasks, timeout, wait } from "@trigger.dev/sdk";
import { batch, logger, task, tasks, timeout, wait, waitUntil } from "@trigger.dev/sdk";
import { setTimeout } from "timers/promises";
import { ResourceMonitor } from "../resourceMonitor.js";
import { fixedLengthTask } from "./batches.js";
Expand All @@ -21,6 +21,10 @@ export const helloWorldTask = task({
env: process.env,
});

waitUntil((async () => {
logger.info("Hello, world from the waitUntil hook", { payload });
})());

logger.debug("debug: Hello, worlds!", { payload });
logger.info("info: Hello, world!", { payload });
logger.log("log: Hello, world!", { payload });
Expand Down
Loading