Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/docs/plugins/custom-plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ This pattern allows:
- **Shared services**:
- **Cache management**: Access the cache service via `this.cache`. See [`CacheConfig`](../api/appkit/Interface.CacheConfig.md) for configuration.
- **Telemetry**: Instrument your plugin with traces and metrics via `this.telemetry`. See [`ITelemetry`](../api/appkit/Interface.ITelemetry.md).
- **Execution interceptors**: Use `execute()` and `executeStream()` with [`StreamExecutionSettings`](../api/appkit/Interface.StreamExecutionSettings.md)
- **Execution interceptors**: Use `execute()` and `executeStream()` with [`StreamExecutionSettings`](../api/appkit/Interface.StreamExecutionSettings.md) for automatic caching, retry, timeout, and [telemetry span attributes](./execution-context.md#telemetry-span-attributes) (`execution.context`, `caller.id`)

**Consuming your plugin programmatically**

Expand Down
14 changes: 13 additions & 1 deletion docs/docs/plugins/execution-context.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ Exported from `@databricks/appkit`:
- `getWorkspaceId()`: `Promise<string>` (from `DATABRICKS_WORKSPACE_ID` or fetched)
- `isInUserContext()`: Returns `true` if currently executing in user context

## Telemetry span attributes

The `plugin.execute` span created by the execution interceptor chain includes these attributes:

| Attribute | Type | Description |
|-----------|------|-------------|
| `execution.context` | `"user"` \| `"service"` | Whether the operation runs as a user (OBO) or service principal |
| `caller.id` | `string` | The user ID (OBO) or service principal ID |
| `execution.obo_dev_fallback` | `boolean` | Set to `true` when an OBO call falls back to service principal in development mode |

These attributes are automatically added when your plugin uses `execute()` or `executeStream()`. All built-in plugins use these methods for their OBO operations. Custom plugins should do the same to get automatic telemetry instrumentation.

## Development mode behavior

In local development (`NODE_ENV=development`), if `asUser(req)` is called without a user token, it logs a warning and skips user impersonation — the operation runs with the default credentials configured for the app instead.
In local development (`NODE_ENV=development`), if `asUser(req)` is called without a user token, it logs a warning and skips user impersonation — the operation runs with the default credentials configured for the app instead. The telemetry span will show `execution.context: "service"` with `execution.obo_dev_fallback: true` to distinguish these from regular service principal calls.
1 change: 1 addition & 0 deletions packages/appkit/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
"ws": "8.18.3"
},
"devDependencies": {
"@opentelemetry/context-async-hooks": "2.6.1",
"@types/express": "4.17.25",
"@types/json-schema": "7.0.15",
"@types/pg": "8.16.0",
Expand Down
14 changes: 14 additions & 0 deletions packages/appkit/src/plugin/interceptors/telemetry.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import type { TelemetryConfig } from "shared";
import {
getCurrentUserId,
isInUserContext,
} from "../../context/execution-context";
import type { ITelemetry, Span } from "../../telemetry";
import { SpanStatusCode } from "../../telemetry";
import { isDevOboFallback } from "../plugin";
import type { ExecutionInterceptor, InterceptorContext } from "./types";

export class TelemetryInterceptor implements ExecutionInterceptor {
Expand Down Expand Up @@ -45,6 +50,15 @@ export class TelemetryInterceptor implements ExecutionInterceptor {
}

try {
span.setAttribute(
"execution.context",
isInUserContext() ? "user" : "service",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this will work, have you tried it works? the user context will only be set if asUser was called but it doesn't propagate down all the chain, did you test it?

);
span.setAttribute("caller.id", getCurrentUserId());
if (isDevOboFallback()) {
span.setAttribute("execution.obo_dev_fallback", true);
}

const result = await fn();
if (!isAborted) {
span.setStatus({ code: SpanStatusCode.OK });
Expand Down
49 changes: 43 additions & 6 deletions packages/appkit/src/plugin/plugin.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { createContextKey, context as otelContext } from "@opentelemetry/api";
import type express from "express";
import type {
BasePlugin,
Expand Down Expand Up @@ -41,6 +42,20 @@ import type {

const logger = createLogger("plugin");

/**
* OTel context key for marking OBO dev mode fallback.
* Set when asUser() is called in development mode without a user token.
*/
const DEV_OBO_FALLBACK_KEY = createContextKey("appkit.devOboFallback");

/**
* Returns true if the current execution is an OBO dev mode fallback
* (asUser() was called but fell back to service principal due to missing token).
*/
export function isDevOboFallback(): boolean {
return otelContext.active().getValue(DEV_OBO_FALLBACK_KEY) === true;
}

/**
* Narrow an unknown thrown value to an Error that carries a numeric
* `statusCode` property (e.g. `ApiError` from `@databricks/sdk-experimental`).
Expand Down Expand Up @@ -338,7 +353,23 @@ export abstract class Plugin<
"asUser() called without user token in development mode. Skipping user impersonation.",
);

return this;
// Return a proxy that marks execution as OBO dev fallback via OTel context,
// so telemetry spans can distinguish intended OBO calls from regular SP calls
return new Proxy(this, {
get: (target, prop, receiver) => {
const value = Reflect.get(target, prop, receiver);
if (typeof value !== "function") return value;
if (typeof prop === "string" && EXCLUDED_FROM_PROXY.has(prop))
return value;

return (...args: unknown[]) => {
const ctx = otelContext
.active()
.setValue(DEV_OBO_FALLBACK_KEY, true);
return otelContext.with(ctx, () => value.apply(target, args));
};
},
}) as this;
}

if (!token) {
Expand Down Expand Up @@ -409,6 +440,9 @@ export abstract class Plugin<
const effectiveUserKey = userKey ?? getCurrentUserId();

const self = this;
// capture the active OTel context (HTTP span) before entering the async generator,
// where it would otherwise be lost across the async boundary
const parentOtelContext = otelContext.active();

// wrapper function to ensure it returns a generator
const asyncWrapperFn = async function* (streamSignal?: AbortSignal) {
Expand All @@ -428,11 +462,14 @@ export abstract class Plugin<
return result;
};

// execute the function with interceptors
const result = await self._executeWithInterceptors(
wrappedFn as (signal?: AbortSignal) => Promise<T>,
interceptors,
context,
// execute the function with interceptors, restoring the parent OTel context
// so telemetry spans are linked as children of the HTTP request span
const result = await otelContext.with(parentOtelContext, () =>
self._executeWithInterceptors(
wrappedFn as (signal?: AbortSignal) => Promise<T>,
interceptors,
context,
),
);

// check if result is a generator
Expand Down
193 changes: 191 additions & 2 deletions packages/appkit/src/plugin/tests/plugin.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
import {
type ContextManager,
createContextKey,
context as otelContext,
} from "@opentelemetry/api";
import { AsyncLocalStorageContextManager } from "@opentelemetry/context-async-hooks";
import { createMockTelemetry, mockServiceContext } from "@tools/test-helpers";
import type express from "express";
import type {
BasePluginConfig,
IAppResponse,
PluginExecuteConfig,
} from "shared";
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import {
afterAll,
afterEach,
beforeAll,
beforeEach,
describe,
expect,
test,
vi,
} from "vitest";
import { AppManager } from "../../app";
import { CacheManager } from "../../cache";
import { ServiceContext } from "../../context/service-context";
Expand All @@ -20,7 +35,7 @@ import { StreamManager } from "../../stream";
import type { ITelemetry, TelemetryProvider } from "../../telemetry";
import { TelemetryManager } from "../../telemetry";
import type { InterceptorContext } from "../interceptors/types";
import { Plugin } from "../plugin";
import { isDevOboFallback, Plugin } from "../plugin";

const { MockApiError } = vi.hoisted(() => {
class MockApiError extends Error {
Expand Down Expand Up @@ -148,6 +163,20 @@ class PluginWithRoutes extends TestPlugin {
}
}

class OboTestPlugin extends Plugin<BasePluginConfig> {
lastOboFallbackValue: boolean | undefined;

async captureOboFallback(): Promise<string> {
this.lastOboFallbackValue = isDevOboFallback();
return "captured";
}

syncCapture(): string {
this.lastOboFallbackValue = isDevOboFallback();
return "sync-captured";
}
}

describe("Plugin", () => {
let mockTelemetry: ITelemetry;
let mockCache: CacheManager;
Expand Down Expand Up @@ -916,4 +945,164 @@ describe("Plugin", () => {
expect(result).toEqual({ ok: true, data: "integration-result" });
});
});

describe("asUser() dev fallback", () => {
let originalNodeEnv: string | undefined;
let contextManager: ContextManager;

beforeAll(() => {
otelContext.disable();
contextManager = new AsyncLocalStorageContextManager().enable();
otelContext.setGlobalContextManager(contextManager);
});

afterAll(() => {
otelContext.disable();
});

beforeEach(() => {
originalNodeEnv = process.env.NODE_ENV;
process.env.NODE_ENV = "development";
vi.useRealTimers();
});

afterEach(() => {
process.env.NODE_ENV = originalNodeEnv;
});

function createMockReqWithoutToken(): express.Request {
return {
header: vi.fn().mockReturnValue(undefined),
} as unknown as express.Request;
}

test("should return a Proxy (different reference) in dev mode without token", () => {
const plugin = new TestPlugin(config);
const proxied = plugin.asUser(createMockReqWithoutToken());

expect(proxied).not.toBe(plugin);
expect(proxied).toBeInstanceOf(TestPlugin);
});

test("should pass through non-function properties unchanged", () => {
const plugin = new TestPlugin(config);
const proxied = plugin.asUser(createMockReqWithoutToken());

expect(proxied.name).toBe(plugin.name);
});

test("should preserve return values from proxied async methods", async () => {
const plugin = new TestPlugin(config);
const proxied = plugin.asUser(createMockReqWithoutToken());

const result = await proxied.customMethod("value");
expect(result).toBe("processed-value");
});

test("should preserve return values from proxied sync methods", () => {
const plugin = new TestPlugin(config);
const proxied = plugin.asUser(createMockReqWithoutToken());

const result = proxied.syncMethod("value");
expect(result).toBe("sync-value");
});

test("should set isDevOboFallback() to true inside proxied method", async () => {
const plugin = new OboTestPlugin(config);
const proxied = plugin.asUser(createMockReqWithoutToken());

await proxied.captureOboFallback();

expect(plugin.lastOboFallbackValue).toBe(true);
});

test("should set isDevOboFallback() to true inside proxied sync method", () => {
const plugin = new OboTestPlugin(config);
const proxied = plugin.asUser(createMockReqWithoutToken());

proxied.syncCapture();

expect(plugin.lastOboFallbackValue).toBe(true);
});

test("should not set OBO fallback for excluded methods (setup)", async () => {
const plugin = new OboTestPlugin(config);
// Override setup to capture OBO fallback
plugin.setup = async () => {
plugin.lastOboFallbackValue = isDevOboFallback();
};

const proxied = plugin.asUser(createMockReqWithoutToken());
await proxied.setup();

expect(plugin.lastOboFallbackValue).toBe(false);
});

test("isDevOboFallback() should return false outside proxy context", () => {
expect(isDevOboFallback()).toBe(false);
});
});

describe("executeStream OTel context preservation", () => {
let contextManager: ContextManager;

beforeAll(() => {
otelContext.disable();
contextManager = new AsyncLocalStorageContextManager().enable();
otelContext.setGlobalContextManager(contextManager);
});

afterAll(() => {
otelContext.disable();
});

beforeEach(() => {
vi.useRealTimers();
});

test("should preserve parent OTel context inside async generator", async () => {
const plugin = new TestPlugin(config);
const mockResponse = {} as IAppResponse;

const TEST_KEY = createContextKey("test.parent.context");
const parentCtx = otelContext.active().setValue(TEST_KEY, "parent-value");

let capturedContextValue: unknown;

const mockFn = vi.fn().mockImplementation(async () => {
capturedContextValue = otelContext.active().getValue(TEST_KEY);
return "stream-result";
});

// Capture the generator function passed to streamManager.stream
let capturedGeneratorFn: any;
vi.mocked(mockStreamManager.stream).mockImplementation(
async (_res, genFn) => {
capturedGeneratorFn = genFn;
},
);

// Execute within the parent context
await otelContext.with(parentCtx, () =>
(plugin as any).executeStream(mockResponse, mockFn, {
default: {},
stream: {},
}),
);

// Invoke the captured generator OUTSIDE the parent context scope
// The generator should restore parentOtelContext internally
const gen = capturedGeneratorFn();
await gen.next();

expect(capturedContextValue).toBe("parent-value");
});

test("should not have parent context without the fix (baseline)", async () => {
const TEST_KEY = createContextKey("test.baseline.context");

// Outside any context, the value should not exist
expect(otelContext.active().getValue(TEST_KEY)).toBeUndefined();
});
});
});
5 changes: 4 additions & 1 deletion packages/appkit/src/plugins/analytics/analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ export class AnalyticsPlugin extends Plugin {
}

injectRoutes(router: IAppRouter) {
// Service principal endpoints
// Arrow data downloads always run as service principal and bypass the
// interceptor chain (execute/executeStream). The original query execution
// handles OBO via executeStream(); this endpoint fetches pre-computed
// results by job ID.
this.route(router, {
name: "arrow",
method: "get",
Expand Down
Loading
Loading