diff --git a/docs/docs/plugins/custom-plugins.md b/docs/docs/plugins/custom-plugins.md index 7b7cf568..297ecde0 100644 --- a/docs/docs/plugins/custom-plugins.md +++ b/docs/docs/plugins/custom-plugins.md @@ -130,7 +130,7 @@ This pattern allows: - **Shared services**: - **Cache management**: Access the cache service via `this.cache`. See [`CacheConfig`](../api/appkit/Interface.CacheConfig.md) for configuration. - **Telemetry**: Instrument your plugin with traces and metrics via `this.telemetry`. See [`ITelemetry`](../api/appkit/Interface.ITelemetry.md). -- **Execution interceptors**: Use `execute()` and `executeStream()` with [`StreamExecutionSettings`](../api/appkit/Interface.StreamExecutionSettings.md) +- **Execution interceptors**: Use `execute()` and `executeStream()` with [`StreamExecutionSettings`](../api/appkit/Interface.StreamExecutionSettings.md) for automatic caching, retry, timeout, and [telemetry span attributes](./execution-context.md#telemetry-span-attributes) (`execution.context`, `caller.id`) **Consuming your plugin programmatically** diff --git a/docs/docs/plugins/execution-context.md b/docs/docs/plugins/execution-context.md index 6c7f8960..8d280e71 100644 --- a/docs/docs/plugins/execution-context.md +++ b/docs/docs/plugins/execution-context.md @@ -43,6 +43,18 @@ Exported from `@databricks/appkit`: - `getWorkspaceId()`: `Promise` (from `DATABRICKS_WORKSPACE_ID` or fetched) - `isInUserContext()`: Returns `true` if currently executing in user context +## Telemetry span attributes + +The `plugin.execute` span created by the execution interceptor chain includes these attributes: + +| Attribute | Type | Description | +|-----------|------|-------------| +| `execution.context` | `"user"` \| `"service"` | Whether the operation runs as a user (OBO) or service principal | +| `caller.id` | `string` | The user ID (OBO) or service principal ID | +| `execution.obo_dev_fallback` | `boolean` | Set to `true` when an OBO call falls back to service principal in 
development mode | + +These attributes are automatically added when your plugin uses `execute()` or `executeStream()`. All built-in plugins use these methods for their OBO operations. Custom plugins should do the same to get automatic telemetry instrumentation. + ## Development mode behavior -In local development (`NODE_ENV=development`), if `asUser(req)` is called without a user token, it logs a warning and skips user impersonation — the operation runs with the default credentials configured for the app instead. +In local development (`NODE_ENV=development`), if `asUser(req)` is called without a user token, it logs a warning and skips user impersonation — the operation runs with the default credentials configured for the app instead. The telemetry span will show `execution.context: "service"` with `execution.obo_dev_fallback: true` to distinguish these from regular service principal calls. diff --git a/packages/appkit/package.json b/packages/appkit/package.json index 146be5a9..a5039c3c 100644 --- a/packages/appkit/package.json +++ b/packages/appkit/package.json @@ -80,6 +80,7 @@ "ws": "8.18.3" }, "devDependencies": { + "@opentelemetry/context-async-hooks": "2.6.1", "@types/express": "4.17.25", "@types/json-schema": "7.0.15", "@types/pg": "8.16.0", diff --git a/packages/appkit/src/plugin/interceptors/telemetry.ts b/packages/appkit/src/plugin/interceptors/telemetry.ts index d08d3149..2dd2d6a1 100644 --- a/packages/appkit/src/plugin/interceptors/telemetry.ts +++ b/packages/appkit/src/plugin/interceptors/telemetry.ts @@ -1,6 +1,11 @@ import type { TelemetryConfig } from "shared"; +import { + getCurrentUserId, + isInUserContext, +} from "../../context/execution-context"; import type { ITelemetry, Span } from "../../telemetry"; import { SpanStatusCode } from "../../telemetry"; +import { isDevOboFallback } from "../plugin"; import type { ExecutionInterceptor, InterceptorContext } from "./types"; export class TelemetryInterceptor implements ExecutionInterceptor { @@ -45,6 
+50,15 @@ export class TelemetryInterceptor implements ExecutionInterceptor { } try { + span.setAttribute( + "execution.context", + isInUserContext() ? "user" : "service", + ); + span.setAttribute("caller.id", getCurrentUserId()); + if (isDevOboFallback()) { + span.setAttribute("execution.obo_dev_fallback", true); + } + const result = await fn(); if (!isAborted) { span.setStatus({ code: SpanStatusCode.OK }); diff --git a/packages/appkit/src/plugin/plugin.ts b/packages/appkit/src/plugin/plugin.ts index 5173cb61..75d994d8 100644 --- a/packages/appkit/src/plugin/plugin.ts +++ b/packages/appkit/src/plugin/plugin.ts @@ -1,3 +1,4 @@ +import { createContextKey, context as otelContext } from "@opentelemetry/api"; import type express from "express"; import type { BasePlugin, @@ -41,6 +42,20 @@ import type { const logger = createLogger("plugin"); +/** + * OTel context key for marking OBO dev mode fallback. + * Set when asUser() is called in development mode without a user token. + */ +const DEV_OBO_FALLBACK_KEY = createContextKey("appkit.devOboFallback"); + +/** + * Returns true if the current execution is an OBO dev mode fallback + * (asUser() was called but fell back to service principal due to missing token). + */ +export function isDevOboFallback(): boolean { + return otelContext.active().getValue(DEV_OBO_FALLBACK_KEY) === true; +} + /** * Narrow an unknown thrown value to an Error that carries a numeric * `statusCode` property (e.g. `ApiError` from `@databricks/sdk-experimental`). @@ -338,7 +353,23 @@ export abstract class Plugin< "asUser() called without user token in development mode. 
Skipping user impersonation.", ); - return this; + // Return a proxy that marks execution as OBO dev fallback via OTel context, + // so telemetry spans can distinguish intended OBO calls from regular SP calls + return new Proxy(this, { + get: (target, prop, receiver) => { + const value = Reflect.get(target, prop, receiver); + if (typeof value !== "function") return value; + if (typeof prop === "string" && EXCLUDED_FROM_PROXY.has(prop)) + return value; + + return (...args: unknown[]) => { + const ctx = otelContext + .active() + .setValue(DEV_OBO_FALLBACK_KEY, true); + return otelContext.with(ctx, () => value.apply(target, args)); + }; + }, + }) as this; } if (!token) { @@ -409,6 +440,9 @@ export abstract class Plugin< const effectiveUserKey = userKey ?? getCurrentUserId(); const self = this; + // capture the active OTel context (HTTP span) before entering the async generator, + // where it would otherwise be lost across the async boundary + const parentOtelContext = otelContext.active(); // wrapper function to ensure it returns a generator const asyncWrapperFn = async function* (streamSignal?: AbortSignal) { @@ -428,11 +462,14 @@ export abstract class Plugin< return result; }; - // execute the function with interceptors - const result = await self._executeWithInterceptors( - wrappedFn as (signal?: AbortSignal) => Promise, - interceptors, - context, + // execute the function with interceptors, restoring the parent OTel context + // so telemetry spans are linked as children of the HTTP request span + const result = await otelContext.with(parentOtelContext, () => + self._executeWithInterceptors( + wrappedFn as (signal?: AbortSignal) => Promise, + interceptors, + context, + ), ); // check if result is a generator diff --git a/packages/appkit/src/plugin/tests/plugin.test.ts b/packages/appkit/src/plugin/tests/plugin.test.ts index 440579d7..0a08bdb5 100644 --- a/packages/appkit/src/plugin/tests/plugin.test.ts +++ b/packages/appkit/src/plugin/tests/plugin.test.ts @@ -1,3 
+1,9 @@ +import { + type ContextManager, + createContextKey, + context as otelContext, +} from "@opentelemetry/api"; +import { AsyncLocalStorageContextManager } from "@opentelemetry/context-async-hooks"; import { createMockTelemetry, mockServiceContext } from "@tools/test-helpers"; import type express from "express"; import type { @@ -5,7 +11,16 @@ import type { IAppResponse, PluginExecuteConfig, } from "shared"; -import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { + afterAll, + afterEach, + beforeAll, + beforeEach, + describe, + expect, + test, + vi, +} from "vitest"; import { AppManager } from "../../app"; import { CacheManager } from "../../cache"; import { ServiceContext } from "../../context/service-context"; @@ -20,7 +35,7 @@ import { StreamManager } from "../../stream"; import type { ITelemetry, TelemetryProvider } from "../../telemetry"; import { TelemetryManager } from "../../telemetry"; import type { InterceptorContext } from "../interceptors/types"; -import { Plugin } from "../plugin"; +import { isDevOboFallback, Plugin } from "../plugin"; const { MockApiError } = vi.hoisted(() => { class MockApiError extends Error { @@ -148,6 +163,20 @@ class PluginWithRoutes extends TestPlugin { } } +class OboTestPlugin extends Plugin { + lastOboFallbackValue: boolean | undefined; + + async captureOboFallback(): Promise { + this.lastOboFallbackValue = isDevOboFallback(); + return "captured"; + } + + syncCapture(): string { + this.lastOboFallbackValue = isDevOboFallback(); + return "sync-captured"; + } +} + describe("Plugin", () => { let mockTelemetry: ITelemetry; let mockCache: CacheManager; @@ -916,4 +945,169 @@ describe("Plugin", () => { expect(result).toEqual({ ok: true, data: "integration-result" }); }); }); + + describe("asUser() dev fallback", () => { + let originalNodeEnv: string | undefined; + let contextManager: ContextManager; + + beforeAll(() => { + otelContext.disable(); + contextManager = new 
AsyncLocalStorageContextManager().enable(); + otelContext.setGlobalContextManager(contextManager); + }); + + afterAll(() => { + otelContext.disable(); + }); + + beforeEach(() => { + originalNodeEnv = process.env.NODE_ENV; + process.env.NODE_ENV = "development"; + vi.useRealTimers(); + }); + + afterEach(() => { + // Assigning undefined would store the string "undefined"; delete instead + if (originalNodeEnv === undefined) { + delete process.env.NODE_ENV; + } else { + process.env.NODE_ENV = originalNodeEnv; + } + }); + + function createMockReqWithoutToken(): express.Request { + return { + header: vi.fn().mockReturnValue(undefined), + } as unknown as express.Request; + } + + test("should return a Proxy (different reference) in dev mode without token", () => { + const plugin = new TestPlugin(config); + const proxied = plugin.asUser(createMockReqWithoutToken()); + + expect(proxied).not.toBe(plugin); + expect(proxied).toBeInstanceOf(TestPlugin); + }); + + test("should pass through non-function properties unchanged", () => { + const plugin = new TestPlugin(config); + const proxied = plugin.asUser(createMockReqWithoutToken()); + + expect(proxied.name).toBe(plugin.name); + }); + + test("should preserve return values from proxied async methods", async () => { + const plugin = new TestPlugin(config); + const proxied = plugin.asUser(createMockReqWithoutToken()); + + const result = await proxied.customMethod("value"); + expect(result).toBe("processed-value"); + }); + + test("should preserve return values from proxied sync methods", () => { + const plugin = new TestPlugin(config); + const proxied = plugin.asUser(createMockReqWithoutToken()); + + const result = proxied.syncMethod("value"); + expect(result).toBe("sync-value"); + }); + + test("should set isDevOboFallback() to true inside proxied method", async () => { + const plugin = new OboTestPlugin(config); + const proxied = plugin.asUser(createMockReqWithoutToken()); + + await proxied.captureOboFallback(); + + 
expect(plugin.lastOboFallbackValue).toBe(true); + }); + + test("should set isDevOboFallback() to true inside proxied sync method", () => { + const plugin = new 
OboTestPlugin(config); + const proxied = plugin.asUser(createMockReqWithoutToken()); + + proxied.syncCapture(); + + expect(plugin.lastOboFallbackValue).toBe(true); + }); + + test("should not set OBO fallback for excluded methods (setup)", async () => { + const plugin = new OboTestPlugin(config); + // Override setup to capture OBO fallback + plugin.setup = async () => { + plugin.lastOboFallbackValue = isDevOboFallback(); + }; + + const proxied = plugin.asUser(createMockReqWithoutToken()); + await proxied.setup(); + + expect(plugin.lastOboFallbackValue).toBe(false); + }); + + test("isDevOboFallback() should return false outside proxy context", () => { + expect(isDevOboFallback()).toBe(false); + }); + }); + + describe("executeStream OTel context preservation", () => { + let contextManager: ContextManager; + + beforeAll(() => { + otelContext.disable(); + contextManager = new AsyncLocalStorageContextManager().enable(); + otelContext.setGlobalContextManager(contextManager); + }); + + afterAll(() => { + otelContext.disable(); + }); + + beforeEach(() => { + vi.useRealTimers(); + }); + + test("should preserve parent OTel context inside async generator", async () => { + const plugin = new TestPlugin(config); + const mockResponse = {} as IAppResponse; + + const TEST_KEY = createContextKey("test.parent.context"); + const parentCtx = otelContext.active().setValue(TEST_KEY, "parent-value"); + + let capturedContextValue: unknown; + + const mockFn = vi.fn().mockImplementation(async () => { + capturedContextValue = otelContext.active().getValue(TEST_KEY); + return "stream-result"; + }); + + // Capture the generator function passed to streamManager.stream + let capturedGeneratorFn: any; + vi.mocked(mockStreamManager.stream).mockImplementation( + async (_res, genFn) => { + capturedGeneratorFn = genFn; + }, + ); + + // Execute within the parent context + await otelContext.with(parentCtx, () => + (plugin as any).executeStream(mockResponse, mockFn, { + default: {}, + stream: {}, + }), + 
); + + // Invoke the captured generator OUTSIDE the parent context scope + // The generator should restore parentOtelContext internally + const gen = capturedGeneratorFn(); + await gen.next(); + + expect(capturedContextValue).toBe("parent-value"); + }); + + test("should see no value for an unset key outside any context (baseline)", () => { + const TEST_KEY = createContextKey("test.baseline.context"); + + // Outside any context, the value should not exist + expect(otelContext.active().getValue(TEST_KEY)).toBeUndefined(); + }); + }); }); diff --git a/packages/appkit/src/plugins/analytics/analytics.ts b/packages/appkit/src/plugins/analytics/analytics.ts index a9c688da..d591e32f 100644 --- a/packages/appkit/src/plugins/analytics/analytics.ts +++ b/packages/appkit/src/plugins/analytics/analytics.ts @@ -45,7 +45,10 @@ export class AnalyticsPlugin extends Plugin { } injectRoutes(router: IAppRouter) { - // Service principal endpoints + // Arrow data downloads always run as service principal and bypass the + // interceptor chain (execute/executeStream). The original query execution + // handles OBO via executeStream(); this endpoint fetches pre-computed + // results by job ID. 
this.route(router, { name: "arrow", method: "get", diff --git a/packages/appkit/src/telemetry/tests/telemetry-interceptor.test.ts b/packages/appkit/src/telemetry/tests/telemetry-interceptor.test.ts index 1e605d0a..d4c78ebf 100644 --- a/packages/appkit/src/telemetry/tests/telemetry-interceptor.test.ts +++ b/packages/appkit/src/telemetry/tests/telemetry-interceptor.test.ts @@ -1,8 +1,10 @@ import { type Span, SpanStatusCode } from "@opentelemetry/api"; import type { TelemetryConfig } from "shared"; import { beforeEach, describe, expect, test, vi } from "vitest"; +import * as executionContext from "../../context/execution-context"; import { TelemetryInterceptor } from "../../plugin/interceptors/telemetry"; import type { InterceptorContext } from "../../plugin/interceptors/types"; +import * as pluginModule from "../../plugin/plugin"; import type { ITelemetry } from "../types"; describe("TelemetryInterceptor", () => { @@ -36,6 +38,8 @@ describe("TelemetryInterceptor", () => { metadata: new Map(), userKey: "test", }; + + vi.spyOn(executionContext, "getCurrentUserId").mockReturnValue("test-user"); }); test("should execute function and set span status to OK on success", async () => { @@ -131,4 +135,66 @@ describe("TelemetryInterceptor", () => { // Verify end was called despite the error expect(mockSpan.end).toHaveBeenCalledTimes(1); }); + + test("should set execution context as 'service' when not in user context", async () => { + vi.spyOn(executionContext, "isInUserContext").mockReturnValue(false); + vi.spyOn(executionContext, "getCurrentUserId").mockReturnValue("sp-123"); + const interceptor = new TelemetryInterceptor(mockTelemetry); + const fn = vi.fn().mockResolvedValue("result"); + + await interceptor.intercept(fn, context); + + expect(mockSpan.setAttribute).toHaveBeenCalledWith( + "execution.context", + "service", + ); + expect(mockSpan.setAttribute).toHaveBeenCalledWith("caller.id", "sp-123"); + }); + + test("should set execution context as 'user' when in user 
context", async () => { + vi.spyOn(executionContext, "isInUserContext").mockReturnValue(true); + vi.spyOn(executionContext, "getCurrentUserId").mockReturnValue("user-123"); + const interceptor = new TelemetryInterceptor(mockTelemetry); + const fn = vi.fn().mockResolvedValue("result"); + + await interceptor.intercept(fn, context); + + expect(mockSpan.setAttribute).toHaveBeenCalledWith( + "execution.context", + "user", + ); + expect(mockSpan.setAttribute).toHaveBeenCalledWith("caller.id", "user-123"); + }); + + test("should set execution.obo_dev_fallback when in dev OBO fallback", async () => { + vi.spyOn(executionContext, "isInUserContext").mockReturnValue(false); + vi.spyOn(pluginModule, "isDevOboFallback").mockReturnValue(true); + const interceptor = new TelemetryInterceptor(mockTelemetry); + const fn = vi.fn().mockResolvedValue("result"); + + await interceptor.intercept(fn, context); + + expect(mockSpan.setAttribute).toHaveBeenCalledWith( + "execution.context", + "service", + ); + expect(mockSpan.setAttribute).toHaveBeenCalledWith( + "execution.obo_dev_fallback", + true, + ); + }); + + test("should not set execution.obo_dev_fallback when not in dev fallback", async () => { + vi.spyOn(executionContext, "isInUserContext").mockReturnValue(false); + vi.spyOn(pluginModule, "isDevOboFallback").mockReturnValue(false); + const interceptor = new TelemetryInterceptor(mockTelemetry); + const fn = vi.fn().mockResolvedValue("result"); + + await interceptor.intercept(fn, context); + + expect(mockSpan.setAttribute).not.toHaveBeenCalledWith( + "execution.obo_dev_fallback", + expect.anything(), + ); + }); }); diff --git a/packages/lakebase/src/pool.ts b/packages/lakebase/src/pool.ts index 1ca6c254..c07c114c 100644 --- a/packages/lakebase/src/pool.ts +++ b/packages/lakebase/src/pool.ts @@ -92,6 +92,7 @@ export function createLakebasePool( kind: SpanKind.CLIENT, attributes: { "db.system": "lakebase", + "db.user": poolConfig.user ?? "unknown", "db.statement": sql ? 
sql.substring(0, 500) : "unknown", }, }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9ca11b81..24b4d9cc 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -327,6 +327,9 @@ importers: specifier: 8.18.3 version: 8.18.3(bufferutil@4.0.9) devDependencies: + '@opentelemetry/context-async-hooks': + specifier: 2.6.1 + version: 2.6.1(@opentelemetry/api@1.9.0) '@types/express': specifier: 4.17.25 version: 4.17.25 @@ -2787,6 +2790,12 @@ packages: peerDependencies: '@opentelemetry/api': '>=1.0.0 <1.10.0' + '@opentelemetry/context-async-hooks@2.6.1': + resolution: {integrity: sha512-XHzhwRNkBpeP8Fs/qjGrAf9r9PRv67wkJQ/7ZPaBQQ68DYlTBBx5MF9LvPx7mhuXcDessKK2b+DcxqwpgkcivQ==} + engines: {node: ^18.19.0 || >=20.6.0} + peerDependencies: + '@opentelemetry/api': '>=1.0.0 <1.10.0' + '@opentelemetry/core@2.2.0': resolution: {integrity: sha512-FuabnnUm8LflnieVxs6eP7Z383hgQU4W1e3KJS6aOG3RxWxcHyBxH8fDMHNgu/gFx/M2jvTOW/4/PHhLz6bjWw==} engines: {node: ^18.19.0 || >=20.6.0} @@ -15111,6 +15120,10 @@ snapshots: dependencies: '@opentelemetry/api': 1.9.0 + '@opentelemetry/context-async-hooks@2.6.1(@opentelemetry/api@1.9.0)': + dependencies: + '@opentelemetry/api': 1.9.0 + '@opentelemetry/core@2.2.0(@opentelemetry/api@1.9.0)': dependencies: '@opentelemetry/api': 1.9.0