From 9571e91288f6223e7ef2751c86652c8846e8e2b8 Mon Sep 17 00:00:00 2001 From: mohamedramadan14 Date: Mon, 8 Jun 2026 09:56:35 +0300 Subject: [PATCH] fix(opentelemetry): honor MetricReader temporality in MetricProducer, closes #6253 --- ...entelemetry-metric-producer-temporality.md | 5 ++ .../opentelemetry/src/internal/metrics.ts | 48 ++++++++++++--- packages/opentelemetry/test/Metrics.test.ts | 58 +++++++++++++++++++ 3 files changed, 102 insertions(+), 9 deletions(-) create mode 100644 .changeset/opentelemetry-metric-producer-temporality.md diff --git a/.changeset/opentelemetry-metric-producer-temporality.md b/.changeset/opentelemetry-metric-producer-temporality.md new file mode 100644 index 00000000000..ac87ee755f8 --- /dev/null +++ b/.changeset/opentelemetry-metric-producer-temporality.md @@ -0,0 +1,5 @@ +--- +"@effect/opentelemetry": patch +--- + +Honor the registered `MetricReader`'s preferred `AggregationTemporality` in `MetricProducerImpl`. Previously, every Sum-typed data point (Counter, UpDownCounter, Frequency, Summary count/sum) and Gauge/Histogram data point was stamped with a hardcoded `CUMULATIVE`, causing `OTLPMetricExporter({ temporalityPreference: DELTA })` (and any other non-default temporality preference on the reader/exporter) to be silently ignored. The producer now queries `reader.selectAggregationTemporality(instrumentType)` per produced data point, matching the OpenTelemetry spec. When no reader is registered, behavior is unchanged. Fixes #6253. diff --git a/packages/opentelemetry/src/internal/metrics.ts b/packages/opentelemetry/src/internal/metrics.ts index 125dabb9432..e1e4ab50da1 100644 --- a/packages/opentelemetry/src/internal/metrics.ts +++ b/packages/opentelemetry/src/internal/metrics.ts @@ -1,5 +1,5 @@ import type { HrTime } from "@opentelemetry/api" -import { ValueType } from "@opentelemetry/api" +import { diag, ValueType } from "@opentelemetry/api" import type * as Resources from "@opentelemetry/resources" import type { CollectionResult, @@ -32,7 +32,13 @@ type MetricDataWithInstrumentDescriptor = MetricData & { /** @internal */ export class MetricProducerImpl implements MetricProducer { - constructor(readonly resource: Resources.Resource) {} + readonly resource: Resources.Resource + readonly readers: Array = [] + private warnedConflictingTemporalities = false + + constructor(resource: Resources.Resource) { + this.resource = resource + } startTimes = new Map() @@ -44,6 +50,26 @@ export class MetricProducerImpl implements MetricProducer { return hrTime } + private temporalityFor(descriptorType: InstrumentType): AggregationTemporality { + if (this.readers.length === 0) { + return AggregationTemporality.CUMULATIVE + } + const selected = this.readers[0].selectAggregationTemporality(descriptorType) + if ( + this.readers.length > 1 && + this.readers.some((r) => r.selectAggregationTemporality(descriptorType) !== selected) + ) { + if (!this.warnedConflictingTemporalities) { + this.warnedConflictingTemporalities = true + diag.warn( + "@effect/opentelemetry: multiple MetricReaders registered with conflicting " + + "aggregationTemporality for the same instrument type; using the first reader's preference." + ) + } + } + return selected + } + collect(_options?: MetricCollectOptions): Promise { const snapshot = Metric.unsafeSnapshot() const hrTimeNow = currentHrTime() @@ -62,6 +88,7 @@ export class MetricProducerImpl implements MetricProducer { }) const descriptor = descriptorFromKey(metricKey, attributes) const startTime = this.startTimeFor(descriptor.name, hrTimeNow) + const temporality = this.temporalityFor(descriptor.type) if (MetricState.isCounterState(metricState)) { const dataPoint: DataPoint = { @@ -77,7 +104,7 @@ export class MetricProducerImpl implements MetricProducer { dataPointType: DataPointType.SUM, descriptor, isMonotonic: descriptor.type === InstrumentType.COUNTER, - aggregationTemporality: AggregationTemporality.CUMULATIVE, + aggregationTemporality: temporality, dataPoints: [dataPoint] }) } @@ -94,7 +121,7 @@ export class MetricProducerImpl implements MetricProducer { addMetricData({ dataPointType: DataPointType.GAUGE, descriptor, - aggregationTemporality: AggregationTemporality.CUMULATIVE, + aggregationTemporality: temporality, dataPoints: [dataPoint] }) } @@ -133,7 +160,7 @@ export class MetricProducerImpl implements MetricProducer { addMetricData({ dataPointType: DataPointType.HISTOGRAM, descriptor, - aggregationTemporality: AggregationTemporality.CUMULATIVE, + aggregationTemporality: temporality, dataPoints: [dataPoint] }) } @@ -157,7 +184,7 @@ export class MetricProducerImpl implements MetricProducer { addMetricData({ dataPointType: DataPointType.SUM, descriptor: descriptorFromKey(metricKey, attributes), - aggregationTemporality: AggregationTemporality.CUMULATIVE, + aggregationTemporality: temporality, isMonotonic: true, dataPoints }) @@ -205,7 +232,7 @@ export class MetricProducerImpl implements MetricProducer { addMetricData({ dataPointType: DataPointType.SUM, descriptor: descriptorFromKey(metricKey, attributes, "quantiles"), - aggregationTemporality: AggregationTemporality.CUMULATIVE, + aggregationTemporality: temporality, isMonotonic: false, dataPoints }) @@ -217,7 +244,7 @@ export class MetricProducerImpl implements MetricProducer { type: InstrumentType.COUNTER, valueType: ValueType.INT }, - aggregationTemporality: AggregationTemporality.CUMULATIVE, + aggregationTemporality: temporality, isMonotonic: true, dataPoints: [countDataPoint] }) @@ -229,7 +256,7 @@ export class MetricProducerImpl implements MetricProducer { type: InstrumentType.COUNTER, valueType: ValueType.DOUBLE }, - aggregationTemporality: AggregationTemporality.CUMULATIVE, + aggregationTemporality: temporality, isMonotonic: true, dataPoints: [sumDataPoint] }) @@ -307,6 +334,9 @@ export const registerProducer = ( Effect.sync(() => { const reader = metricReader() const readers: Array = Array.isArray(reader) ? reader : [reader] as any + if (self instanceof MetricProducerImpl) { + for (const r of readers) self.readers.push(r) + } readers.forEach((reader) => reader.setMetricProducer(self)) return readers }), diff --git a/packages/opentelemetry/test/Metrics.test.ts b/packages/opentelemetry/test/Metrics.test.ts index ec3bda89323..fa4904b1180 100644 --- a/packages/opentelemetry/test/Metrics.test.ts +++ b/packages/opentelemetry/test/Metrics.test.ts @@ -1,6 +1,11 @@ import { assert, describe, it } from "@effect/vitest" import { ValueType } from "@opentelemetry/api" import { resourceFromAttributes } from "@opentelemetry/resources" +import { + AggregationTemporality, + InMemoryMetricExporter, + PeriodicExportingMetricReader +} from "@opentelemetry/sdk-metrics" import * as Effect from "effect/Effect" import * as Metric from "effect/Metric" import * as internal from "../src/internal/metrics.js" @@ -292,4 +297,57 @@ describe("Metrics", () => { ] }) })) + + it.effect("counter honors reader's preferred temporality (DELTA)", () => + Effect.gen(function*() { + const producer = new internal.MetricProducerImpl( + resourceFromAttributes({ name: "test", version: "1.0.0" }) + ) + const reader = new PeriodicExportingMetricReader({ + exporter: new InMemoryMetricExporter(AggregationTemporality.DELTA), + exportIntervalMillis: 60_000_000 + }) + yield* Effect.scoped(internal.registerProducer(producer, () => reader)) + + const counter = Metric.counter("counter-temp", { incremental: true }) + yield* Metric.increment(counter) + yield* Metric.increment(counter) + + const results = yield* Effect.promise(() => producer.collect()) + const metric = findMetric(JSON.parse(JSON.stringify(results)), "counter-temp") + assert.equal(metric.aggregationTemporality, AggregationTemporality.DELTA) + assert.equal(metric.isMonotonic, true) + })) + + it.effect("gauge honors reader's preferred temporality (DELTA)", () => + Effect.gen(function*() { + const producer = new internal.MetricProducerImpl( + resourceFromAttributes({ name: "test", version: "1.0.0" }) + ) + const reader = new PeriodicExportingMetricReader({ + exporter: new InMemoryMetricExporter(AggregationTemporality.DELTA), + exportIntervalMillis: 60_000_000 + }) + yield* Effect.scoped(internal.registerProducer(producer, () => reader)) + + const gauge = Metric.gauge("gauge-temp") + yield* Metric.set(gauge, 42) + + const results = yield* Effect.promise(() => producer.collect()) + const metric = findMetric(JSON.parse(JSON.stringify(results)), "gauge-temp") + assert.equal(metric.aggregationTemporality, AggregationTemporality.DELTA) + })) + + it.effect("falls back to CUMULATIVE when no reader is registered", () => + Effect.gen(function*() { + const producer = new internal.MetricProducerImpl( + resourceFromAttributes({ name: "test", version: "1.0.0" }) + ) + const counter = Metric.counter("counter-no-reader", { incremental: true }) + yield* Metric.increment(counter) + + const results = yield* Effect.promise(() => producer.collect()) + const metric = findMetric(JSON.parse(JSON.stringify(results)), "counter-no-reader") + assert.equal(metric.aggregationTemporality, AggregationTemporality.CUMULATIVE) + })) })