Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

version: 2
updates:
- package-ecosystem: "" # See documentation for possible values
- package-ecosystem: "bun" # See documentation for possible values
directory: "/" # Location of package manifests
schedule:
interval: "weekly"

53 changes: 28 additions & 25 deletions src/handlers/message.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { SeverityNumber } from "@opentelemetry/api-logs"
import { SpanStatusCode, SpanKind, trace } from "@opentelemetry/api"
import { SpanStatusCode, SpanKind } from "@opentelemetry/api"
import type { AssistantMessage, EventMessageUpdated, EventMessagePartUpdated, ToolPart } from "@opencode-ai/sdk"
import {
AGENT_NAME,
Expand Down Expand Up @@ -27,7 +27,16 @@ import {
TOOL_NAME,
TOOL_PARAMETERS,
} from "@arizeai/openinference-semantic-conventions"
import { agentAttrs, errorSummary, setBoundedMap, accumulateSessionTotals, getSessionAgentMeta, isMetricEnabled, isTraceEnabled } from "../util.ts"
import {
agentAttrs,
errorSummary,
setBoundedMap,
accumulateSessionTotals,
getSessionAgentMeta,
isMetricEnabled,
isTraceEnabled,
resolveSessionTraceContext,
} from "../util.ts"
import type { HandlerContext } from "../types.ts"

const OPENINFERENCE_SPAN_KIND = SemanticConventions.OPENINFERENCE_SPAN_KIND
Expand All @@ -52,6 +61,7 @@ export function handleMessageUpdated(e: EventMessageUpdated, ctx: HandlerContext
const msg = e.properties.info
if (msg.role !== "assistant") return
const assistant = msg as AssistantMessage
setBoundedMap(ctx.assistantRuns, assistant.id, assistant.parentID)
if (!assistant.time.completed) return

const { sessionID, modelID, providerID } = assistant
Expand Down Expand Up @@ -260,11 +270,6 @@ export function handleMessagePartUpdated(e: EventMessagePartUpdated, ctx: Handle
const { agentName, agentType } = getSessionAgentMeta(toolPart.sessionID, ctx)
const toolSpan = isTraceEnabled("tool", ctx)
? (() => {
const sessionSpan = ctx.sessionSpans.get(toolPart.sessionID)
const baseCtx = ctx.rootContext()
const parentCtx = sessionSpan
? trace.setSpan(baseCtx, sessionSpan)
: baseCtx
return ctx.tracer.startSpan(
`${ctx.tracePrefix}tool.${toolPart.tool}`,
{
Expand All @@ -283,7 +288,9 @@ export function handleMessagePartUpdated(e: EventMessagePartUpdated, ctx: Handle
...ctx.commonAttrs,
},
},
parentCtx,
resolveSessionTraceContext(toolPart.sessionID, ctx, {
assistantMessageID: toolPart.messageID,
}),
)
})()
: undefined
Expand Down Expand Up @@ -319,11 +326,6 @@ export function handleMessagePartUpdated(e: EventMessagePartUpdated, ctx: Handle

if (isTraceEnabled("tool", ctx)) {
const toolSpan = pending?.span ?? (() => {
const sessionSpan = ctx.sessionSpans.get(toolPart.sessionID)
const baseCtx = ctx.rootContext()
const parentCtx = sessionSpan
? trace.setSpan(baseCtx, sessionSpan)
: baseCtx
return ctx.tracer.startSpan(
`${ctx.tracePrefix}tool.${toolPart.tool}`,
{
Expand All @@ -340,7 +342,9 @@ export function handleMessagePartUpdated(e: EventMessagePartUpdated, ctx: Handle
...ctx.commonAttrs,
},
},
parentCtx,
resolveSessionTraceContext(toolPart.sessionID, ctx, {
assistantMessageID: toolPart.messageID,
}),
)
})()
toolSpan.setAttributes({ [AGENT_NAME]: agentName, "agent.type": agentType })
Expand Down Expand Up @@ -403,14 +407,16 @@ export function handleMessagePartUpdated(e: EventMessagePartUpdated, ctx: Handle

/**
* Starts an LLM span for an assistant message when it first appears in `message.updated`.
* The span is parented to the session span and carries `gen_ai.*` semantic attributes for
* the model and provider. It is ended in `handleMessageUpdated` once the message completes.
* The span is parented to the active run or subagent span and carries `gen_ai.*` semantic
* attributes for the model and provider. It is ended in `handleMessageUpdated` once the
* message completes.
*
* Only called for assistant messages that have not yet completed (`time.completed` absent).
*/
export function startMessageSpan(
sessionID: string,
messageID: string,
parentID: string,
modelID: string,
providerID: string,
startTime: number,
Expand All @@ -419,12 +425,9 @@ export function startMessageSpan(
if (!isTraceEnabled("llm", ctx)) return
const msgKey = `${sessionID}:${messageID}`
if (ctx.messageSpans.has(msgKey)) return
setBoundedMap(ctx.assistantRuns, messageID, parentID)
const { agentName, agentType } = getSessionAgentMeta(sessionID, ctx)
const sessionSpan = ctx.sessionSpans.get(sessionID)
const baseCtx = ctx.rootContext()
const parentCtx = sessionSpan
? trace.setSpan(baseCtx, sessionSpan)
: baseCtx
const inputText = ctx.runInputs.get(parentID)

const msgSpan = ctx.tracer.startSpan(
`${ctx.tracePrefix}llm`,
Expand All @@ -439,17 +442,17 @@ export function startMessageSpan(
[LLM_SYSTEM]: providerID,
[LLM_PROVIDER]: providerID,
[LLM_MODEL_NAME]: modelID,
...(ctx.sessionInputs.has(sessionID)
...(inputText
? {
[INPUT_VALUE]: ctx.sessionInputs.get(sessionID)!,
[INPUT_VALUE]: inputText,
[INPUT_MIME_TYPE]: MimeType.TEXT,
[LLM_INPUT_MESSAGES]: JSON.stringify([{ role: "user", content: ctx.sessionInputs.get(sessionID)! }]),
[LLM_INPUT_MESSAGES]: JSON.stringify([{ role: "user", content: inputText }]),
}
: {}),
...ctx.commonAttrs,
},
},
parentCtx,
resolveSessionTraceContext(sessionID, ctx, { runID: parentID, assistantMessageID: messageID }),
)
setBoundedMap(ctx.messageSpans, msgKey, msgSpan)
}
125 changes: 109 additions & 16 deletions src/handlers/session.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,86 @@
import { SeverityNumber } from "@opentelemetry/api-logs"
import { SpanStatusCode, trace } from "@opentelemetry/api"
import { SpanStatusCode } from "@opentelemetry/api"
import type { EventSessionCreated, EventSessionIdle, EventSessionError, EventSessionStatus } from "@opencode-ai/sdk"
import { AGENT_NAME, OpenInferenceSpanKind, SemanticConventions, SESSION_ID } from "@arizeai/openinference-semantic-conventions"
import { agentAttrs, errorSummary, getSessionAgentMeta, setBoundedMap, isMetricEnabled, isTraceEnabled } from "../util.ts"
import {
AGENT_NAME,
INPUT_MIME_TYPE,
INPUT_VALUE,
LLM_INPUT_MESSAGES,
MimeType,
OpenInferenceSpanKind,
SemanticConventions,
SESSION_ID,
} from "@arizeai/openinference-semantic-conventions"
import {
agentAttrs,
errorSummary,
getSessionAgentMeta,
setBoundedMap,
isMetricEnabled,
isTraceEnabled,
resolveSessionTraceContext,
} from "../util.ts"
import type { HandlerContext, SessionAgentType } from "../types.ts"

const OPENINFERENCE_SPAN_KIND = SemanticConventions.OPENINFERENCE_SPAN_KIND

/** Starts or refreshes the root run span for a single user turn, keyed by the user message ID. */
export function handleRunStarted(
runID: string,
sessionID: string,
agent: string,
promptText: string,
model: string,
startTime: number,
ctx: HandlerContext,
) {
ctx.activeRuns.set(sessionID, runID)
ctx.pendingRuns.delete(sessionID)
if (promptText) setBoundedMap(ctx.runInputs, runID, promptText)
if (!isTraceEnabled("session", ctx)) return
const existing = ctx.runSpans.get(runID)
if (existing) {
existing.setAttributes({
[AGENT_NAME]: agent,
...(promptText
? {
[INPUT_VALUE]: promptText,
[INPUT_MIME_TYPE]: MimeType.TEXT,
[LLM_INPUT_MESSAGES]: JSON.stringify([{ role: "user", content: promptText }]),
}
: {}),
model,
})
return
}

const runSpan = ctx.tracer.startSpan(
`${ctx.tracePrefix}session`,
{
startTime,
attributes: {
[OPENINFERENCE_SPAN_KIND]: OpenInferenceSpanKind.AGENT,
[SESSION_ID]: sessionID,
[AGENT_NAME]: agent,
"agent.type": "primary",
"session.is_subagent": false,
...(promptText
? {
[INPUT_VALUE]: promptText,
[INPUT_MIME_TYPE]: MimeType.TEXT,
[LLM_INPUT_MESSAGES]: JSON.stringify([{ role: "user", content: promptText }]),
}
: {}),
model,
...ctx.commonAttrs,
},
},
ctx.rootContext(),
)
ctx.runSpans.set(runID, runSpan)
setBoundedMap(ctx.runSpanContexts, runID, runSpan.spanContext())
}

/** Increments the session counter, records start time, starts the root session span, and emits a `session.created` log event. */
export function handleSessionCreated(e: EventSessionCreated, ctx: HandlerContext) {
const { id: sessionID, time, parentID } = e.properties.info
Expand All @@ -18,16 +92,7 @@ export function handleSessionCreated(e: EventSessionCreated, ctx: HandlerContext
}
setBoundedMap(ctx.sessionTotals, sessionID, { startMs: createdAt, tokens: 0, cost: 0, messages: 0, agent: "unknown", agentType })

// WARNING: disabling "session" traces while "llm" or "tool" traces remain enabled
// leaves those child spans without a local session parent. If OPENCODE_TRACEPARENT
// is set, they fall back to that remote parent; otherwise they become root spans.
if (isTraceEnabled("session", ctx)) {
const parentSpan = parentID ? ctx.sessionSpans.get(parentID) : undefined
const baseCtx = ctx.rootContext()
const spanCtx = parentSpan
? trace.setSpan(baseCtx, parentSpan)
: baseCtx

if (isTraceEnabled("session", ctx) && parentID) {
const sessionSpan = ctx.tracer.startSpan(
`${ctx.tracePrefix}session`,
{
Expand All @@ -41,9 +106,10 @@ export function handleSessionCreated(e: EventSessionCreated, ctx: HandlerContext
...ctx.commonAttrs,
},
},
spanCtx,
resolveSessionTraceContext(parentID, ctx),
)
setBoundedMap(ctx.sessionSpans, sessionID, sessionSpan)
ctx.sessionSpans.set(sessionID, sessionSpan)
setBoundedMap(ctx.sessionSpanContexts, sessionID, sessionSpan.spanContext())
}

ctx.emitLog({
Expand Down Expand Up @@ -74,7 +140,7 @@ function sweepSession(sessionID: string, ctx: HandlerContext) {
ctx.pendingToolSpans.delete(key)
}
}
ctx.sessionInputs.delete(sessionID)
ctx.pendingRuns.delete(sessionID)
const msgPrefix = `${sessionID}:`
for (const [key, span] of ctx.messageSpans) {
if (key.startsWith(msgPrefix)) {
Expand Down Expand Up @@ -128,6 +194,23 @@ export function handleSessionIdle(e: EventSessionIdle, ctx: HandlerContext) {
sessionSpan.end()
ctx.sessionSpans.delete(sessionID)
}
const runID = ctx.activeRuns.get(sessionID)
if (runID) ctx.activeRuns.delete(sessionID)
const runSpan = runID ? ctx.runSpans.get(runID) : undefined
if (runSpan) {
if (totals) {
runSpan.setAttributes({
[AGENT_NAME]: totals.agent,
"agent.type": totals.agentType,
"session.total_tokens": totals.tokens,
"session.total_cost_usd": totals.cost,
"session.total_messages": totals.messages,
})
}
runSpan.setStatus({ code: SpanStatusCode.OK })
runSpan.end()
ctx.runSpans.delete(runID!)
}

ctx.emitLog({
severityNumber: SeverityNumber.INFO,
Expand Down Expand Up @@ -173,6 +256,16 @@ export function handleSessionError(e: EventSessionError, ctx: HandlerContext) {
sessionSpan.end()
ctx.sessionSpans.delete(rawID)
}
const runID = ctx.activeRuns.get(rawID)
if (runID) ctx.activeRuns.delete(rawID)
const runSpan = runID ? ctx.runSpans.get(runID) : undefined
if (runSpan) {
if (totals) runSpan.setAttributes({ [AGENT_NAME]: totals.agent, "agent.type": totals.agentType })
runSpan.setStatus({ code: SpanStatusCode.ERROR, message: error })
runSpan.setAttribute("error", error)
runSpan.end()
ctx.runSpans.delete(runID!)
}
}

ctx.emitLog({
Expand Down
Loading
Loading