Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 70 additions & 15 deletions src/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
reconcileFile,
saveCache,
} from './session-cache.js'
import type { ParsedProviderCall } from './providers/types.js'
import type { ParsedProviderCall, SessionSource } from './providers/types.js'
import type {
AssistantMessageContent,
ClassifiedTurn,
Expand Down Expand Up @@ -1324,6 +1324,10 @@ function buildSessionSummary(
totalCacheWrite += call.usage.cacheCreationInputTokens
apiCalls++

if (process.env['DEBUG_OTEL'] && (call.usage.cacheReadInputTokens > 0 || call.usage.cacheCreationInputTokens > 0)) {
console.warn(`[Aggregate] Model=${call.model}, cache_read=${call.usage.cacheReadInputTokens}, cache_write=${call.usage.cacheCreationInputTokens}`)
}

const modelKey = getShortModelName(call.model)
if (!modelBreakdown[modelKey]) {
modelBreakdown[modelKey] = {
Expand Down Expand Up @@ -1946,7 +1950,7 @@ function warnProviderParseFailure(providerName: string, sourcePath: string, err:

async function parseProviderSources(
providerName: string,
sources: Array<{ path: string; project: string }>,
sources: SessionSource[],
seenKeys: Set<string>,
diskCache: SessionCache,
dateRange?: DateRange,
Expand All @@ -1957,8 +1961,8 @@ async function parseProviderSources(
const section = getOrCreateProviderSection(diskCache, providerName)
const allDiscoveredFiles = new Set<string>()

type SourceInfo = { source: { path: string; project: string }; fp: NonNullable<Awaited<ReturnType<typeof fingerprintFile>>> }
const unchangedSources: Array<{ source: { path: string; project: string }; cached: CachedFile }> = []
type SourceInfo = { source: SessionSource; fp: NonNullable<Awaited<ReturnType<typeof fingerprintFile>>> }
const unchangedSources: Array<{ source: SessionSource; cached: CachedFile }> = []
const changedSources: SourceInfo[] = []

for (const source of sources) {
Expand Down Expand Up @@ -2001,29 +2005,58 @@ async function parseProviderSources(

// Parse changed files, update cache
let didParse = false
// Track which paths have already been cleared this pass so that subsequent
// sources sharing the same path (e.g. multiple OTel conversations from one
// agent-traces.db) can accumulate via the merge logic below rather than
// being wiped on every iteration.
const clearedPaths = new Set<string>()
try {
for (const { source, fp } of changedSources) {
if (dateRange) {
if (fp.mtimeMs < dateRange.start.getTime()) continue
}

// Clear stale entry before parse — if parse fails, file is excluded
delete section.files[source.path]
// Clear stale entry before parse — but only once per path so that
// multiple sources mapping to the same file path can merge their turns.
if (!clearedPaths.has(source.path)) {
delete section.files[source.path]
clearedPaths.add(source.path)
}

const parser = provider.createSessionParser(
{ path: source.path, project: source.project, provider: providerName },
parserDedup,
dateRange,
)
const parser = provider.createSessionParser(source, parserDedup, dateRange)

try {
const providerCalls: ParsedProviderCall[] = []
for await (const call of parser.parse()) {
providerCalls.push(call)
}
if (process.env['DEBUG_OTEL']) {
console.warn(`[Parse] File ${source.path.substring(0, 40)}: Collected ${providerCalls.length} provider calls, ${providerCalls.filter(c => c.cacheReadInputTokens > 0 || c.cacheCreationInputTokens > 0).length} with cache tokens`)
}
if (process.env['DEBUG_OTEL'] && providerCalls.some(c => c.cacheReadInputTokens > 0 || c.cacheCreationInputTokens > 0)) {
console.warn(`[Parse] After conversion: turns to be created...`)
}
const canonicalCalls = await Promise.all(providerCalls.map(canonicalizeProviderCallProject))
const turns = providerCallsToCachedTurns(canonicalCalls)
section.files[source.path] = { fingerprint: fp, mcpInventory: [], turns }
if (process.env['DEBUG_OTEL'] && providerCalls.some(c => c.cacheReadInputTokens > 0 || c.cacheCreationInputTokens > 0)) {
console.warn(`[Parse] After conversion: ${turns.length} turns, calls with cache: ${turns.flatMap(t => t.calls).filter(c => c.usage.cacheReadInputTokens > 0 || c.usage.cacheCreationInputTokens > 0).length}`)
}

// For files with multiple sources (e.g., OTel database with many conversations),
// merge turns instead of overwriting. This allows us to parse the same file
// multiple times for different conversations and aggregate all results.
const existingCacheEntry = section.files[source.path]
if (process.env['DEBUG_OTEL']) {
console.warn(`[Parse] Cache entry exists for path: ${!!existingCacheEntry}, turns to merge: ${turns.length}`)
}
if (existingCacheEntry) {
existingCacheEntry.turns = [...existingCacheEntry.turns, ...turns]
if (process.env['DEBUG_OTEL']) {
console.warn(`[Parse] Merged with existing cache entry for ${source.path.substring(0, 40)}, now has ${existingCacheEntry.turns.length} turns total`)
}
} else {
section.files[source.path] = { fingerprint: fp, mcpInventory: [], turns }
}
didParse = true
;(diskCache as { _dirty?: boolean })._dirty = true
} catch (err) {
Expand Down Expand Up @@ -2064,13 +2097,31 @@ async function parseProviderSources(
// Uses seenKeys (shared across providers) for cross-provider dedup.
const sessionMap = new Map<string, { project: string; projectPath?: string; turns: ClassifiedTurn[] }>()

if (process.env['DEBUG_OTEL']) {
const totalCacheCalls = sources.flatMap(s => section.files[s.path]?.turns ?? []).flatMap(t => t.calls).filter(c => c.usage.cacheReadInputTokens > 0 || c.usage.cacheCreationInputTokens > 0)
console.warn(`[SessionMap] Starting with ${sources.length} sources, ${totalCacheCalls.length} calls with cache tokens`)

const filesInfo = Object.entries(section.files).map(([path, file]) => ({
path: path.substring(0, 50),
turns: file.turns.length,
totalCalls: file.turns.flatMap(t => t.calls).length,
cacheTokenCalls: file.turns.flatMap(t => t.calls).filter(c => c.usage.cacheReadInputTokens > 0 || c.usage.cacheCreationInputTokens > 0).length
}))
console.warn(`[SessionMap] Cache files: ${JSON.stringify(filesInfo)}`)
}

for (const source of sources) {
const cachedFile = section.files[source.path]
if (!cachedFile) continue

for (const turn of cachedFile.turns) {
const hasDup = turn.calls.some(c => seenKeys.has(c.deduplicationKey))
if (hasDup) continue
if (hasDup) {
if (process.env['DEBUG_OTEL'] && turn.calls.some(c => c.usage.cacheReadInputTokens > 0 || c.usage.cacheCreationInputTokens > 0)) {
console.warn(`[SessionMap] Skipping turn with cache tokens due to dedup: ${turn.calls.map(c => c.deduplicationKey).join(',')}`)
}
continue
}

for (const c of turn.calls) seenKeys.add(c.deduplicationKey)

Expand All @@ -2085,6 +2136,10 @@ async function parseProviderSources(
const project = turn.calls[0]?.project ?? source.project
const key = `${providerName}:${turn.sessionId}:${project}`

if (process.env['DEBUG_OTEL'] && turn.calls.some(c => c.usage.cacheReadInputTokens > 0 || c.usage.cacheCreationInputTokens > 0)) {
console.warn(`[SessionMap] Adding turn with cache tokens to sessionMap key=${key}`)
}

const existing = sessionMap.get(key)
if (existing) {
existing.turns.push(classified)
Expand Down Expand Up @@ -2246,10 +2301,10 @@ export async function parseAllSessions(dateRange?: DateRange, providerFilter?: s
const claudeDirs = claudeSources.map(s => ({ path: s.path, name: s.project }))
const claudeProjects = await scanProjectDirs(claudeDirs, seenMsgIds, diskCache, dateRange)

const providerGroups = new Map<string, Array<{ path: string; project: string }>>()
const providerGroups = new Map<string, SessionSource[]>()
for (const source of nonClaudeSources) {
const existing = providerGroups.get(source.provider) ?? []
existing.push({ path: source.path, project: source.project })
existing.push(source)
providerGroups.set(source.provider, existing)
}

Expand Down
Loading