diff --git a/src/services/analytics/firstPartyEventLoggingExporter.ts b/src/services/analytics/firstPartyEventLoggingExporter.ts index aefb22c..fe0135e 100644 --- a/src/services/analytics/firstPartyEventLoggingExporter.ts +++ b/src/services/analytics/firstPartyEventLoggingExporter.ts @@ -1,806 +1,35 @@ -import type { HrTime } from '@opentelemetry/api' -import { type ExportResult, ExportResultCode } from '@opentelemetry/core' -import type { - LogRecordExporter, - ReadableLogRecord, -} from '@opentelemetry/sdk-logs' -import axios from 'axios' -import { randomUUID } from 'crypto' -import { appendFile, mkdir, readdir, unlink, writeFile } from 'fs/promises' -import * as path from 'path' -import type { CoreUserData } from 'src/utils/user.js' -import { - getIsNonInteractiveSession, - getSessionId, -} from '../../bootstrap/state.js' -import { ClaudeCodeInternalEvent } from '../../types/generated/events_mono/claude_code/v1/claude_code_internal_event.js' -import { GrowthbookExperimentEvent } from '../../types/generated/events_mono/growthbook/v1/growthbook_experiment_event.js' -import { - getClaudeAIOAuthTokens, - hasProfileScope, - isClaudeAISubscriber, -} from '../../utils/auth.js' -import { checkHasTrustDialogAccepted } from '../../utils/config.js' -import { logForDebugging } from '../../utils/debug.js' -import { getClaudeConfigHomeDir } from '../../utils/envUtils.js' -import { errorMessage, isFsInaccessible, toError } from '../../utils/errors.js' -import { getAuthHeaders } from '../../utils/http.js' -import { readJSONLFile } from '../../utils/json.js' -import { logError } from '../../utils/log.js' -import { sleep } from '../../utils/sleep.js' -import { jsonStringify } from '../../utils/slowOperations.js' -import { getClaudeCodeUserAgent } from '../../utils/userAgent.js' -import { isOAuthTokenExpired } from '../oauth/client.js' -import { stripProtoFields } from './index.js' -import { type EventMetadata, to1PEventFormat } from './metadata.js' - -// Unique ID for this process run - used to isolate failed event files between runs -const BATCH_UUID = randomUUID() - -// File prefix for failed event storage -const FILE_PREFIX = '1p_failed_events.' - -// Storage directory for failed events - evaluated at runtime to respect CLAUDE_CONFIG_DIR in tests -function getStorageDir(): string { - return path.join(getClaudeConfigHomeDir(), 'telemetry') -} - -// API envelope - event_data is the JSON output from proto toJSON() -type FirstPartyEventLoggingEvent = { - event_type: 'ClaudeCodeInternalEvent' | 'GrowthbookExperimentEvent' - event_data: unknown -} - -type FirstPartyEventLoggingPayload = { - events: FirstPartyEventLoggingEvent[] -} - /** - * Exporter for 1st-party event logging to /api/event_logging/batch. + * First-party event logging exporter is intentionally disabled in the OSS build. * - * Export cycles are controlled by OpenTelemetry's BatchLogRecordProcessor, which - * triggers export() when either: - * - Time interval elapses (default: 5 seconds via scheduledDelayMillis) - * - Batch size is reached (default: 200 events via maxExportBatchSize) + * Originally batched and sent custom telemetry events to: + * https://api.anthropic.com/api/event_logging/batch * - * This exporter adds resilience on top: - * - Append-only log for failed events (concurrency-safe) - * - Quadratic backoff retry for failed events, dropped after maxAttempts - * - Immediate retry of queued events when any export succeeds (endpoint is healthy) - * - Chunking large event sets into smaller batches - * - Auth fallback: retries without auth on 401 errors + * Failed events were persisted locally to ~/.config/claude/telemetry/ for + * retry across process restarts. + * + * This stub preserves the exported class so any dormant import sites compile, + * but all methods are no-ops and no network traffic is generated. */ -export class FirstPartyEventLoggingExporter implements LogRecordExporter { - private readonly endpoint: string - private readonly timeout: number - private readonly maxBatchSize: number - private readonly skipAuth: boolean - private readonly batchDelayMs: number - private readonly baseBackoffDelayMs: number - private readonly maxBackoffDelayMs: number - private readonly maxAttempts: number - private readonly isKilled: () => boolean - private pendingExports: Promise[] = [] - private isShutdown = false - private readonly schedule: ( - fn: () => Promise, - delayMs: number, - ) => () => void - private cancelBackoff: (() => void) | null = null - private attempts = 0 - private isRetrying = false - private lastExportErrorContext: string | undefined - - constructor( - options: { - timeout?: number - maxBatchSize?: number - skipAuth?: boolean - batchDelayMs?: number - baseBackoffDelayMs?: number - maxBackoffDelayMs?: number - maxAttempts?: number - path?: string - baseUrl?: string - // Injected killswitch probe. Checked per-POST so that disabling the - // firstParty sink also stops backoff retries (not just new emits). - // Passed in rather than imported to avoid a cycle with firstPartyEventLogger.ts. - isKilled?: () => boolean - schedule?: (fn: () => Promise, delayMs: number) => () => void - } = {}, - ) { - // Default: prod, except when ANTHROPIC_BASE_URL is explicitly staging. - // Overridable via tengu_1p_event_batch_config.baseUrl. - const baseUrl = - options.baseUrl || - (process.env.ANTHROPIC_BASE_URL === 'https://api-staging.anthropic.com' - ? 'https://api-staging.anthropic.com' - : 'https://api.anthropic.com') - - this.endpoint = `${baseUrl}${options.path || '/api/event_logging/batch'}` - this.timeout = options.timeout || 10000 - this.maxBatchSize = options.maxBatchSize || 200 - this.skipAuth = options.skipAuth ?? false - this.batchDelayMs = options.batchDelayMs || 100 - this.baseBackoffDelayMs = options.baseBackoffDelayMs || 500 - this.maxBackoffDelayMs = options.maxBackoffDelayMs || 30000 - this.maxAttempts = options.maxAttempts ?? 8 - this.isKilled = options.isKilled ?? (() => false) - this.schedule = - options.schedule ?? - ((fn, ms) => { - const t = setTimeout(fn, ms) - return () => clearTimeout(t) - }) - - // Retry any failed events from previous runs of this session (in background) - void this.retryPreviousBatches() - } - - // Expose for testing - async getQueuedEventCount(): Promise { - return (await this.loadEventsFromCurrentBatch()).length - } - - // --- Storage helpers --- - - private getCurrentBatchFilePath(): string { - return path.join( - getStorageDir(), - `${FILE_PREFIX}${getSessionId()}.${BATCH_UUID}.json`, - ) - } - - private async loadEventsFromFile( - filePath: string, - ): Promise { - try { - return await readJSONLFile(filePath) - } catch { - return [] - } - } - - private async loadEventsFromCurrentBatch(): Promise< - FirstPartyEventLoggingEvent[] - > { - return this.loadEventsFromFile(this.getCurrentBatchFilePath()) - } - - private async saveEventsToFile( - filePath: string, - events: FirstPartyEventLoggingEvent[], - ): Promise { - try { - if (events.length === 0) { - try { - await unlink(filePath) - } catch { - // File doesn't exist, nothing to delete - } - } else { - // Ensure storage directory exists - await mkdir(getStorageDir(), { recursive: true }) - // Write as JSON lines (one event per line) - const content = events.map(e => jsonStringify(e)).join('\n') + '\n' - await writeFile(filePath, content, 'utf8') - } - } catch (error) { - logError(error) - } - } - - private async appendEventsToFile( - filePath: string, - events: FirstPartyEventLoggingEvent[], - ): Promise { - if (events.length === 0) return - try { - // Ensure storage directory exists - await mkdir(getStorageDir(), { recursive: true }) - // Append as JSON lines (one event per line) - atomic on most filesystems - const content = events.map(e => jsonStringify(e)).join('\n') + '\n' - await appendFile(filePath, content, 'utf8') - } catch (error) { - logError(error) - } - } - - private async deleteFile(filePath: string): Promise { - try { - await unlink(filePath) - } catch { - // File doesn't exist or can't be deleted, ignore - } - } - - // --- Previous batch retry (startup) --- - - private async retryPreviousBatches(): Promise { - try { - const prefix = `${FILE_PREFIX}${getSessionId()}.` - let files: string[] - try { - files = (await readdir(getStorageDir())) - .filter((f: string) => f.startsWith(prefix) && f.endsWith('.json')) - .filter((f: string) => !f.includes(BATCH_UUID)) // Exclude current batch - } catch (e) { - if (isFsInaccessible(e)) return - throw e - } - - for (const file of files) { - const filePath = path.join(getStorageDir(), file) - void this.retryFileInBackground(filePath) - } - } catch (error) { - logError(error) - } - } - - private async retryFileInBackground(filePath: string): Promise { - if (this.attempts >= this.maxAttempts) { - await this.deleteFile(filePath) - return - } - - const events = await this.loadEventsFromFile(filePath) - if (events.length === 0) { - await this.deleteFile(filePath) - return - } - - if (process.env.USER_TYPE === 'ant') { - logForDebugging( - `1P event logging: retrying ${events.length} events from previous batch`, - ) - } - - const failedEvents = await this.sendEventsInBatches(events) - if (failedEvents.length === 0) { - await this.deleteFile(filePath) - if (process.env.USER_TYPE === 'ant') { - logForDebugging('1P event logging: previous batch retry succeeded') - } - } else { - // Save only the failed events back (not all original events) - await this.saveEventsToFile(filePath, failedEvents) - if (process.env.USER_TYPE === 'ant') { - logForDebugging( - `1P event logging: previous batch retry failed, ${failedEvents.length} events remain`, - ) - } - } - } - - async export( - logs: ReadableLogRecord[], - resultCallback: (result: ExportResult) => void, - ): Promise { - if (this.isShutdown) { - if (process.env.USER_TYPE === 'ant') { - logForDebugging( - '1P event logging export failed: Exporter has been shutdown', - ) - } - resultCallback({ - code: ExportResultCode.FAILED, - error: new Error('Exporter has been shutdown'), - }) - return - } - - const exportPromise = this.doExport(logs, resultCallback) - this.pendingExports.push(exportPromise) - - // Clean up completed exports - void exportPromise.finally(() => { - const index = this.pendingExports.indexOf(exportPromise) - if (index > -1) { - void this.pendingExports.splice(index, 1) - } - }) - } +import { ExportResultCode, type ExportResult } from '@opentelemetry/core' +import type { + LogRecordExporter, + ReadableLogRecord, +} from '@opentelemetry/sdk-logs' - private async doExport( - logs: ReadableLogRecord[], +export class FirstPartyEventLoggingExporter implements LogRecordExporter { + export( + _logs: ReadableLogRecord[], resultCallback: (result: ExportResult) => void, - ): Promise { - try { - // Filter for event logs only (by scope name) - const eventLogs = logs.filter( - log => - log.instrumentationScope?.name === 'com.anthropic.claude_code.events', - ) - - if (eventLogs.length === 0) { - resultCallback({ code: ExportResultCode.SUCCESS }) - return - } - - // Transform new logs (failed events are retried independently via backoff) - const events = this.transformLogsToEvents(eventLogs).events - - if (events.length === 0) { - resultCallback({ code: ExportResultCode.SUCCESS }) - return - } - - if (this.attempts >= this.maxAttempts) { - resultCallback({ - code: ExportResultCode.FAILED, - error: new Error( - `Dropped ${events.length} events: max attempts (${this.maxAttempts}) reached`, - ), - }) - return - } - - // Send events - const failedEvents = await this.sendEventsInBatches(events) - this.attempts++ - - if (failedEvents.length > 0) { - await this.queueFailedEvents(failedEvents) - this.scheduleBackoffRetry() - const context = this.lastExportErrorContext - ? ` (${this.lastExportErrorContext})` - : '' - resultCallback({ - code: ExportResultCode.FAILED, - error: new Error( - `Failed to export ${failedEvents.length} events${context}`, - ), - }) - return - } - - // Success - reset backoff and immediately retry any queued events - this.resetBackoff() - if ((await this.getQueuedEventCount()) > 0 && !this.isRetrying) { - void this.retryFailedEvents() - } - resultCallback({ code: ExportResultCode.SUCCESS }) - } catch (error) { - if (process.env.USER_TYPE === 'ant') { - logForDebugging( - `1P event logging export failed: ${errorMessage(error)}`, - ) - } - logError(error) - resultCallback({ - code: ExportResultCode.FAILED, - error: toError(error), - }) - } - } - - private async sendEventsInBatches( - events: FirstPartyEventLoggingEvent[], - ): Promise { - // Chunk events into batches - const batches: FirstPartyEventLoggingEvent[][] = [] - for (let i = 0; i < events.length; i += this.maxBatchSize) { - batches.push(events.slice(i, i + this.maxBatchSize)) - } - - if (process.env.USER_TYPE === 'ant') { - logForDebugging( - `1P event logging: exporting ${events.length} events in ${batches.length} batch(es)`, - ) - } - - // Send each batch with delay between them. On first failure, assume the - // endpoint is down and short-circuit: queue the failed batch plus all - // remaining unsent batches without POSTing them. The backoff retry will - // probe again with a single batch next tick. - const failedBatchEvents: FirstPartyEventLoggingEvent[] = [] - let lastErrorContext: string | undefined - for (let i = 0; i < batches.length; i++) { - const batch = batches[i]! - try { - await this.sendBatchWithRetry({ events: batch }) - } catch (error) { - lastErrorContext = getAxiosErrorContext(error) - for (let j = i; j < batches.length; j++) { - failedBatchEvents.push(...batches[j]!) - } - if (process.env.USER_TYPE === 'ant') { - const skipped = batches.length - 1 - i - logForDebugging( - `1P event logging: batch ${i + 1}/${batches.length} failed (${lastErrorContext}); short-circuiting ${skipped} remaining batch(es)`, - ) - } - break - } - - if (i < batches.length - 1 && this.batchDelayMs > 0) { - await sleep(this.batchDelayMs) - } - } - - if (failedBatchEvents.length > 0 && lastErrorContext) { - this.lastExportErrorContext = lastErrorContext - } - - return failedBatchEvents - } - - private async queueFailedEvents( - events: FirstPartyEventLoggingEvent[], - ): Promise { - const filePath = this.getCurrentBatchFilePath() - - // Append-only: just add new events to file (atomic on most filesystems) - await this.appendEventsToFile(filePath, events) - - const context = this.lastExportErrorContext - ? ` (${this.lastExportErrorContext})` - : '' - const message = `1P event logging: ${events.length} events failed to export${context}` - logError(new Error(message)) - } - - private scheduleBackoffRetry(): void { - // Don't schedule if already retrying or shutdown - if (this.cancelBackoff || this.isRetrying || this.isShutdown) { - return - } - - // Quadratic backoff (matching Statsig SDK): base * attempts² - const delay = Math.min( - this.baseBackoffDelayMs * this.attempts * this.attempts, - this.maxBackoffDelayMs, - ) - - if (process.env.USER_TYPE === 'ant') { - logForDebugging( - `1P event logging: scheduling backoff retry in ${delay}ms (attempt ${this.attempts})`, - ) - } - - this.cancelBackoff = this.schedule(async () => { - this.cancelBackoff = null - await this.retryFailedEvents() - }, delay) - } - - private async retryFailedEvents(): Promise { - const filePath = this.getCurrentBatchFilePath() - - // Keep retrying while there are events and endpoint is healthy - while (!this.isShutdown) { - const events = await this.loadEventsFromFile(filePath) - if (events.length === 0) break - - if (this.attempts >= this.maxAttempts) { - if (process.env.USER_TYPE === 'ant') { - logForDebugging( - `1P event logging: max attempts (${this.maxAttempts}) reached, dropping ${events.length} events`, - ) - } - await this.deleteFile(filePath) - this.resetBackoff() - return - } - - this.isRetrying = true - - // Clear file before retry (we have events in memory now) - await this.deleteFile(filePath) - - if (process.env.USER_TYPE === 'ant') { - logForDebugging( - `1P event logging: retrying ${events.length} failed events (attempt ${this.attempts + 1})`, - ) - } - - const failedEvents = await this.sendEventsInBatches(events) - this.attempts++ - - this.isRetrying = false - - if (failedEvents.length > 0) { - // Write failures back to disk - await this.saveEventsToFile(filePath, failedEvents) - this.scheduleBackoffRetry() - return // Failed - wait for backoff - } - - // Success - reset backoff and continue loop to drain any newly queued events - this.resetBackoff() - if (process.env.USER_TYPE === 'ant') { - logForDebugging('1P event logging: backoff retry succeeded') - } - } - } - - private resetBackoff(): void { - this.attempts = 0 - if (this.cancelBackoff) { - this.cancelBackoff() - this.cancelBackoff = null - } - } - - private async sendBatchWithRetry( - payload: FirstPartyEventLoggingPayload, - ): Promise { - if (this.isKilled()) { - // Throw so the caller short-circuits remaining batches and queues - // everything to disk. Zero network traffic while killed; the backoff - // timer keeps ticking and will resume POSTs as soon as the GrowthBook - // cache picks up the cleared flag. - throw new Error('firstParty sink killswitch active') - } - - const baseHeaders: Record = { - 'Content-Type': 'application/json', - 'User-Agent': getClaudeCodeUserAgent(), - 'x-service-name': 'claude-code', - } - - // Skip auth if trust hasn't been established yet - // This prevents executing apiKeyHelper commands before the trust dialog - // Non-interactive sessions implicitly have workspace trust - const hasTrust = - checkHasTrustDialogAccepted() || getIsNonInteractiveSession() - if (process.env.USER_TYPE === 'ant' && !hasTrust) { - logForDebugging('1P event logging: Trust not accepted') - } - - // Skip auth when the OAuth token is expired or lacks user:profile - // scope (service key sessions). Falls through to unauthenticated send. - let shouldSkipAuth = this.skipAuth || !hasTrust - if (!shouldSkipAuth && isClaudeAISubscriber()) { - const tokens = getClaudeAIOAuthTokens() - if (!hasProfileScope()) { - shouldSkipAuth = true - } else if (tokens && isOAuthTokenExpired(tokens.expiresAt)) { - shouldSkipAuth = true - if (process.env.USER_TYPE === 'ant') { - logForDebugging( - '1P event logging: OAuth token expired, skipping auth to avoid 401', - ) - } - } - } - - // Try with auth headers first (unless trust not established or token is known to be expired) - const authResult = shouldSkipAuth - ? { headers: {}, error: 'trust not established or Oauth token expired' } - : getAuthHeaders() - const useAuth = !authResult.error - - if (!useAuth && process.env.USER_TYPE === 'ant') { - logForDebugging( - `1P event logging: auth not available, sending without auth`, - ) - } - - const headers = useAuth - ? { ...baseHeaders, ...authResult.headers } - : baseHeaders - - try { - const response = await axios.post(this.endpoint, payload, { - timeout: this.timeout, - headers, - }) - this.logSuccess(payload.events.length, useAuth, response.data) - return - } catch (error) { - // Handle 401 by retrying without auth - if ( - useAuth && - axios.isAxiosError(error) && - error.response?.status === 401 - ) { - if (process.env.USER_TYPE === 'ant') { - logForDebugging( - '1P event logging: 401 auth error, retrying without auth', - ) - } - const response = await axios.post(this.endpoint, payload, { - timeout: this.timeout, - headers: baseHeaders, - }) - this.logSuccess(payload.events.length, false, response.data) - return - } - - throw error - } - } - - private logSuccess( - eventCount: number, - withAuth: boolean, - responseData: unknown, ): void { - if (process.env.USER_TYPE === 'ant') { - logForDebugging( - `1P event logging: ${eventCount} events exported successfully${withAuth ? ' (with auth)' : ' (without auth)'}`, - ) - logForDebugging(`API Response: ${jsonStringify(responseData, null, 2)}`) - } - } - - private hrTimeToDate(hrTime: HrTime): Date { - const [seconds, nanoseconds] = hrTime - return new Date(seconds * 1000 + nanoseconds / 1000000) - } - - private transformLogsToEvents( - logs: ReadableLogRecord[], - ): FirstPartyEventLoggingPayload { - const events: FirstPartyEventLoggingEvent[] = [] - - for (const log of logs) { - const attributes = log.attributes || {} - - // Check if this is a GrowthBook experiment event - if (attributes.event_type === 'GrowthbookExperimentEvent') { - const timestamp = this.hrTimeToDate(log.hrTime) - const account_uuid = attributes.account_uuid as string | undefined - const organization_uuid = attributes.organization_uuid as - | string - | undefined - events.push({ - event_type: 'GrowthbookExperimentEvent', - event_data: GrowthbookExperimentEvent.toJSON({ - event_id: attributes.event_id as string, - timestamp, - experiment_id: attributes.experiment_id as string, - variation_id: attributes.variation_id as number, - environment: attributes.environment as string, - user_attributes: attributes.user_attributes as string, - experiment_metadata: attributes.experiment_metadata as string, - device_id: attributes.device_id as string, - session_id: attributes.session_id as string, - auth: - account_uuid || organization_uuid - ? { account_uuid, organization_uuid } - : undefined, - }), - }) - continue - } - - // Extract event name - const eventName = - (attributes.event_name as string) || (log.body as string) || 'unknown' - - // Extract metadata objects directly (no JSON parsing needed) - const coreMetadata = attributes.core_metadata as EventMetadata | undefined - const userMetadata = attributes.user_metadata as CoreUserData - const eventMetadata = (attributes.event_metadata || {}) as Record< - string, - unknown - > - - if (!coreMetadata) { - // Emit partial event if core metadata is missing - if (process.env.USER_TYPE === 'ant') { - logForDebugging( - `1P event logging: core_metadata missing for event ${eventName}`, - ) - } - events.push({ - event_type: 'ClaudeCodeInternalEvent', - event_data: ClaudeCodeInternalEvent.toJSON({ - event_id: attributes.event_id as string | undefined, - event_name: eventName, - client_timestamp: this.hrTimeToDate(log.hrTime), - session_id: getSessionId(), - additional_metadata: Buffer.from( - jsonStringify({ - transform_error: 'core_metadata attribute is missing', - }), - ).toString('base64'), - }), - }) - continue - } - - // Transform to 1P format - const formatted = to1PEventFormat( - coreMetadata, - userMetadata, - eventMetadata, - ) - - // _PROTO_* keys are PII-tagged values meant only for privileged BQ - // columns. Hoist known keys to proto fields, then defensively strip any - // remaining _PROTO_* so an unrecognized future key can't silently land - // in the general-access additional_metadata blob. sink.ts applies the - // same strip before Datadog; this closes the 1P side. - const { - _PROTO_skill_name, - _PROTO_plugin_name, - _PROTO_marketplace_name, - ...rest - } = formatted.additional - const additionalMetadata = stripProtoFields(rest) - - events.push({ - event_type: 'ClaudeCodeInternalEvent', - event_data: ClaudeCodeInternalEvent.toJSON({ - event_id: attributes.event_id as string | undefined, - event_name: eventName, - client_timestamp: this.hrTimeToDate(log.hrTime), - device_id: attributes.user_id as string | undefined, - email: userMetadata?.email, - auth: formatted.auth, - ...formatted.core, - env: formatted.env, - process: formatted.process, - skill_name: - typeof _PROTO_skill_name === 'string' - ? _PROTO_skill_name - : undefined, - plugin_name: - typeof _PROTO_plugin_name === 'string' - ? _PROTO_plugin_name - : undefined, - marketplace_name: - typeof _PROTO_marketplace_name === 'string' - ? _PROTO_marketplace_name - : undefined, - additional_metadata: - Object.keys(additionalMetadata).length > 0 - ? Buffer.from(jsonStringify(additionalMetadata)).toString( - 'base64', - ) - : undefined, - }), - }) - } - - return { events } - } - - async shutdown(): Promise { - this.isShutdown = true - this.resetBackoff() - await this.forceFlush() - if (process.env.USER_TYPE === 'ant') { - logForDebugging('1P event logging exporter shutdown complete') - } - } - - async forceFlush(): Promise { - await Promise.all(this.pendingExports) - if (process.env.USER_TYPE === 'ant') { - logForDebugging('1P event logging exporter flush complete') - } - } -} - -function getAxiosErrorContext(error: unknown): string { - if (!axios.isAxiosError(error)) { - return errorMessage(error) - } - - const parts: string[] = [] - - const requestId = error.response?.headers?.['request-id'] - if (requestId) { - parts.push(`request-id=${requestId}`) + resultCallback({ code: ExportResultCode.SUCCESS }) } - if (error.response?.status) { - parts.push(`status=${error.response.status}`) - } + async shutdown(): Promise {} - if (error.code) { - parts.push(`code=${error.code}`) - } + async forceFlush(): Promise {} - if (error.message) { - parts.push(error.message) + async getQueuedEventCount(): Promise { + return 0 } - - return parts.join(', ') } diff --git a/src/services/api/metricsOptOut.ts b/src/services/api/metricsOptOut.ts index 8ef884a..315fb4d 100644 --- a/src/services/api/metricsOptOut.ts +++ b/src/services/api/metricsOptOut.ts @@ -1,159 +1,21 @@ -import axios from 'axios' -import { hasProfileScope, isClaudeAISubscriber } from '../../utils/auth.js' -import { getGlobalConfig, saveGlobalConfig } from '../../utils/config.js' -import { logForDebugging } from '../../utils/debug.js' -import { errorMessage } from '../../utils/errors.js' -import { getAuthHeaders, withOAuth401Retry } from '../../utils/http.js' -import { logError } from '../../utils/log.js' -import { memoizeWithTTLAsync } from '../../utils/memoize.js' -import { isEssentialTrafficOnly } from '../../utils/privacyLevel.js' -import { getClaudeCodeUserAgent } from '../../utils/userAgent.js' - -type MetricsEnabledResponse = { - metrics_logging_enabled: boolean -} +/** + * Metrics opt-out check is intentionally disabled in the OSS build. + * + * Originally called https://api.anthropic.com/api/claude_code/organizations/metrics_enabled + * to determine whether an organization had opted out of metrics collection. + * + * This stub always returns enabled:false so that the BigQuery exporter (also + * stubbed) would never attempt to send data even if somehow reactivated. + */ type MetricsStatus = { enabled: boolean hasError: boolean } -// In-memory TTL — dedupes calls within a single process -const CACHE_TTL_MS = 60 * 60 * 1000 - -// Disk TTL — org settings rarely change. When disk cache is fresher than this, -// we skip the network entirely (no background refresh). This is what collapses -// N `claude -p` invocations into ~1 API call/day. -const DISK_CACHE_TTL_MS = 24 * 60 * 60 * 1000 - -/** - * Internal function to call the API and check if metrics are enabled - * This is wrapped by memoizeWithTTLAsync to add caching behavior - */ -async function _fetchMetricsEnabled(): Promise { - const authResult = getAuthHeaders() - if (authResult.error) { - throw new Error(`Auth error: ${authResult.error}`) - } - - const headers = { - 'Content-Type': 'application/json', - 'User-Agent': getClaudeCodeUserAgent(), - ...authResult.headers, - } - - const endpoint = `https://api.anthropic.com/api/claude_code/organizations/metrics_enabled` - const response = await axios.get(endpoint, { - headers, - timeout: 5000, - }) - return response.data -} - -async function _checkMetricsEnabledAPI(): Promise { - // Incident kill switch: skip the network call when nonessential traffic is disabled. - // Returning enabled:false sheds load at the consumer (bigqueryExporter skips - // export). Matches the non-subscriber early-return shape below. - if (isEssentialTrafficOnly()) { - return { enabled: false, hasError: false } - } - - try { - const data = await withOAuth401Retry(_fetchMetricsEnabled, { - also403Revoked: true, - }) - - logForDebugging( - `Metrics opt-out API response: enabled=${data.metrics_logging_enabled}`, - ) - - return { - enabled: data.metrics_logging_enabled, - hasError: false, - } - } catch (error) { - logForDebugging( - `Failed to check metrics opt-out status: ${errorMessage(error)}`, - ) - logError(error) - return { enabled: false, hasError: true } - } -} - -// Create memoized version with custom error handling -const memoizedCheckMetrics = memoizeWithTTLAsync( - _checkMetricsEnabledAPI, - CACHE_TTL_MS, -) - -/** - * Fetch (in-memory memoized) and persist to disk on change. - * Errors are not persisted — a transient failure should not overwrite a - * known-good disk value. - */ -async function refreshMetricsStatus(): Promise { - const result = await memoizedCheckMetrics() - if (result.hasError) { - return result - } - - const cached = getGlobalConfig().metricsStatusCache - const unchanged = cached !== undefined && cached.enabled === result.enabled - // Skip write when unchanged AND timestamp still fresh — avoids config churn - // when concurrent callers race past a stale disk entry and all try to write. - if (unchanged && Date.now() - cached.timestamp < DISK_CACHE_TTL_MS) { - return result - } - - saveGlobalConfig(current => ({ - ...current, - metricsStatusCache: { - enabled: result.enabled, - timestamp: Date.now(), - }, - })) - return result -} - -/** - * Check if metrics are enabled for the current organization. - * - * Two-tier cache: - * - Disk (24h TTL): survives process restarts. Fresh disk cache → zero network. - * - In-memory (1h TTL): dedupes the background refresh within a process. - * - * The caller (bigqueryExporter) tolerates stale reads — a missed export or - * an extra one during the 24h window is acceptable. - */ export async function checkMetricsEnabled(): Promise { - // Service key OAuth sessions lack user:profile scope → would 403. - // API key users (non-subscribers) fall through and use x-api-key auth. - // This check runs before the disk read so we never persist auth-state-derived - // answers — only real API responses go to disk. Otherwise a service-key - // session would poison the cache for a later full-OAuth session. - if (isClaudeAISubscriber() && !hasProfileScope()) { - return { enabled: false, hasError: false } - } - - const cached = getGlobalConfig().metricsStatusCache - if (cached) { - if (Date.now() - cached.timestamp > DISK_CACHE_TTL_MS) { - // saveGlobalConfig's fallback path (config.ts:731) can throw if both - // locked and fallback writes fail — catch here so fire-and-forget - // doesn't become an unhandled rejection. - void refreshMetricsStatus().catch(logError) - } - return { - enabled: cached.enabled, - hasError: false, - } - } - - // First-ever run on this machine: block on the network to populate disk. - return refreshMetricsStatus() + return { enabled: false, hasError: false } } // Export for testing purposes only -export const _clearMetricsEnabledCacheForTesting = (): void => { - memoizedCheckMetrics.cache.clear() -} +export const _clearMetricsEnabledCacheForTesting = (): void => {} diff --git a/src/utils/telemetry/bigqueryExporter.ts b/src/utils/telemetry/bigqueryExporter.ts index 2f935c4..562abc4 100644 --- a/src/utils/telemetry/bigqueryExporter.ts +++ b/src/utils/telemetry/bigqueryExporter.ts @@ -1,252 +1,33 @@ -import type { Attributes, HrTime } from '@opentelemetry/api' -import { type ExportResult, ExportResultCode } from '@opentelemetry/core' +/** + * BigQuery metrics exporter is intentionally disabled in the OSS build. + * + * Originally sent OpenTelemetry metrics to: + * https://api.anthropic.com/api/claude_code/metrics + * + * This stub preserves the exported class so any dormant import sites compile, + * but all methods are no-ops and no network traffic is generated. + */ + import { AggregationTemporality, - type MetricData, - type DataPoint as OTelDataPoint, type PushMetricExporter, type ResourceMetrics, } from '@opentelemetry/sdk-metrics' -import axios from 'axios' -import { checkMetricsEnabled } from 'src/services/api/metricsOptOut.js' -import { getIsNonInteractiveSession } from '../../bootstrap/state.js' -import { getSubscriptionType, isClaudeAISubscriber } from '../auth.js' -import { checkHasTrustDialogAccepted } from '../config.js' -import { logForDebugging } from '../debug.js' -import { errorMessage, toError } from '../errors.js' -import { getAuthHeaders } from '../http.js' -import { logError } from '../log.js' -import { jsonStringify } from '../slowOperations.js' -import { getClaudeCodeUserAgent } from '../userAgent.js' - -type DataPoint = { - attributes: Record - value: number - timestamp: string -} - -type Metric = { - name: string - description?: string - unit?: string - data_points: DataPoint[] -} - -type InternalMetricsPayload = { - resource_attributes: Record - metrics: Metric[] -} +import { ExportResultCode, type ExportResult } from '@opentelemetry/core' export class BigQueryMetricsExporter implements PushMetricExporter { - private readonly endpoint: string - private readonly timeout: number - private pendingExports: Promise[] = [] - private isShutdown = false - - constructor(options: { timeout?: number } = {}) { - const defaultEndpoint = 'https://api.anthropic.com/api/claude_code/metrics' - - if ( - process.env.USER_TYPE === 'ant' && - process.env.ANT_CLAUDE_CODE_METRICS_ENDPOINT - ) { - this.endpoint = - process.env.ANT_CLAUDE_CODE_METRICS_ENDPOINT + - '/api/claude_code/metrics' - } else { - this.endpoint = defaultEndpoint - } - - this.timeout = options.timeout || 5000 - } - async export( - metrics: ResourceMetrics, - resultCallback: (result: ExportResult) => void, - ): Promise { - if (this.isShutdown) { - resultCallback({ - code: ExportResultCode.FAILED, - error: new Error('Exporter has been shutdown'), - }) - return - } - - const exportPromise = this.doExport(metrics, resultCallback) - this.pendingExports.push(exportPromise) - - // Clean up completed exports - void exportPromise.finally(() => { - const index = this.pendingExports.indexOf(exportPromise) - if (index > -1) { - void this.pendingExports.splice(index, 1) - } - }) - } - - private async doExport( - metrics: ResourceMetrics, + _metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void, ): Promise { - try { - // Skip if trust not established in interactive mode - // This prevents triggering apiKeyHelper before trust dialog - const hasTrust = - checkHasTrustDialogAccepted() || getIsNonInteractiveSession() - if (!hasTrust) { - logForDebugging( - 'BigQuery metrics export: trust not established, skipping', - ) - resultCallback({ code: ExportResultCode.SUCCESS }) - return - } - - // Check organization-level metrics opt-out - const metricsStatus = await checkMetricsEnabled() - if (!metricsStatus.enabled) { - logForDebugging('Metrics export disabled by organization setting') - resultCallback({ code: ExportResultCode.SUCCESS }) - return - } - - const payload = this.transformMetricsForInternal(metrics) - - const authResult = getAuthHeaders() - if (authResult.error) { - logForDebugging(`Metrics export failed: ${authResult.error}`) - resultCallback({ - code: ExportResultCode.FAILED, - error: new Error(authResult.error), - }) - return - } - - const headers: Record = { - 'Content-Type': 'application/json', - 'User-Agent': getClaudeCodeUserAgent(), - ...authResult.headers, - } - - const response = await axios.post(this.endpoint, payload, { - timeout: this.timeout, - headers, - }) - - logForDebugging('BigQuery metrics exported successfully') - logForDebugging( - `BigQuery API Response: ${jsonStringify(response.data, null, 2)}`, - ) - resultCallback({ code: ExportResultCode.SUCCESS }) - } catch (error) { - logForDebugging(`BigQuery metrics export failed: ${errorMessage(error)}`) - logError(error) - resultCallback({ - code: ExportResultCode.FAILED, - error: toError(error), - }) - } - } - - private transformMetricsForInternal( - metrics: ResourceMetrics, - ): InternalMetricsPayload { - const attrs = metrics.resource.attributes - - const resourceAttributes: Record = { - 'service.name': (attrs['service.name'] as string) || 'claude-code', - 'service.version': (attrs['service.version'] as string) || 'unknown', - 'os.type': (attrs['os.type'] as string) || 'unknown', - 'os.version': (attrs['os.version'] as string) || 'unknown', - 'host.arch': (attrs['host.arch'] as string) || 'unknown', - 'aggregation.temporality': - this.selectAggregationTemporality() === AggregationTemporality.DELTA - ? 'delta' - : 'cumulative', - } - - // Only add wsl.version if it exists (omit instead of default) - if (attrs['wsl.version']) { - resourceAttributes['wsl.version'] = attrs['wsl.version'] as string - } - - // Add customer type and subscription type - if (isClaudeAISubscriber()) { - resourceAttributes['user.customer_type'] = 'claude_ai' - const subscriptionType = getSubscriptionType() - if (subscriptionType) { - resourceAttributes['user.subscription_type'] = subscriptionType - } - } else { - resourceAttributes['user.customer_type'] = 'api' - } - - const transformed = { - resource_attributes: resourceAttributes, - metrics: metrics.scopeMetrics.flatMap(scopeMetric => - scopeMetric.metrics.map(metric => ({ - name: metric.descriptor.name, - description: metric.descriptor.description, - unit: metric.descriptor.unit, - data_points: this.extractDataPoints(metric), - })), - ), - } - - return transformed - } - - private extractDataPoints(metric: MetricData): DataPoint[] { - const dataPoints = metric.dataPoints || [] - - return dataPoints - .filter( - (point): point is OTelDataPoint => - typeof point.value === 'number', - ) - .map(point => ({ - attributes: this.convertAttributes(point.attributes), - value: point.value, - timestamp: this.hrTimeToISOString( - point.endTime || point.startTime || [Date.now() / 1000, 0], - ), - })) - } - - async shutdown(): Promise { - this.isShutdown = true - await this.forceFlush() - logForDebugging('BigQuery metrics exporter shutdown complete') + resultCallback({ code: ExportResultCode.SUCCESS }) } - async forceFlush(): Promise { - await Promise.all(this.pendingExports) - logForDebugging('BigQuery metrics exporter flush complete') - } + async shutdown(): Promise {} - private convertAttributes( - attributes: Attributes | undefined, - ): Record { - const result: Record = {} - if (attributes) { - for (const [key, value] of Object.entries(attributes)) { - if (value !== undefined && value !== null) { - result[key] = String(value) - } - } - } - return result - } - - private hrTimeToISOString(hrTime: HrTime): string { - const [seconds, nanoseconds] = hrTime - const date = new Date(seconds * 1000 + nanoseconds / 1000000) - return date.toISOString() - } + async forceFlush(): Promise {} selectAggregationTemporality(): AggregationTemporality { - // DO NOT CHANGE THIS TO CUMULATIVE - // It would mess up the aggregation of metrics - // for CC Productivity metrics dashboard return AggregationTemporality.DELTA } }