Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/opentelemetry-metric-producer-temporality.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/opentelemetry": patch
---

Honor the registered `MetricReader`'s preferred `AggregationTemporality` in `MetricProducerImpl`. Previously, every Sum-typed data point (Counter, UpDownCounter, Frequency, Summary count/sum) and Gauge/Histogram data point was stamped with a hardcoded `CUMULATIVE`, causing `OTLPMetricExporter({ temporalityPreference: DELTA })` (and any other non-default temporality preference on the reader/exporter) to be silently ignored. The producer now queries `reader.selectAggregationTemporality(instrumentType)` per produced data point, matching the OpenTelemetry spec. When no reader is registered, behavior is unchanged. Fixes #6253.
48 changes: 39 additions & 9 deletions packages/opentelemetry/src/internal/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { HrTime } from "@opentelemetry/api"
import { ValueType } from "@opentelemetry/api"
import { diag, ValueType } from "@opentelemetry/api"
import type * as Resources from "@opentelemetry/resources"
import type {
CollectionResult,
Expand Down Expand Up @@ -32,7 +32,13 @@ type MetricDataWithInstrumentDescriptor = MetricData & {

/** @internal */
export class MetricProducerImpl implements MetricProducer {
constructor(readonly resource: Resources.Resource) {}
readonly resource: Resources.Resource
readonly readers: Array<MetricReader> = []
private warnedConflictingTemporalities = false

constructor(resource: Resources.Resource) {
this.resource = resource
}

startTimes = new Map<string, HrTime>()

Expand All @@ -44,6 +50,26 @@ export class MetricProducerImpl implements MetricProducer {
return hrTime
}

private temporalityFor(descriptorType: InstrumentType): AggregationTemporality {
if (this.readers.length === 0) {
return AggregationTemporality.CUMULATIVE
}
const selected = this.readers[0].selectAggregationTemporality(descriptorType)
if (
this.readers.length > 1 &&
this.readers.some((r) => r.selectAggregationTemporality(descriptorType) !== selected)
) {
if (!this.warnedConflictingTemporalities) {
this.warnedConflictingTemporalities = true
diag.warn(
"@effect/opentelemetry: multiple MetricReaders registered with conflicting " +
"aggregationTemporality for the same instrument type; using the first reader's preference."
)
}
}
return selected
}

collect(_options?: MetricCollectOptions): Promise<CollectionResult> {
const snapshot = Metric.unsafeSnapshot()
const hrTimeNow = currentHrTime()
Expand All @@ -62,6 +88,7 @@ export class MetricProducerImpl implements MetricProducer {
})
const descriptor = descriptorFromKey(metricKey, attributes)
const startTime = this.startTimeFor(descriptor.name, hrTimeNow)
const temporality = this.temporalityFor(descriptor.type)

if (MetricState.isCounterState(metricState)) {
const dataPoint: DataPoint<number> = {
Expand All @@ -77,7 +104,7 @@ export class MetricProducerImpl implements MetricProducer {
dataPointType: DataPointType.SUM,
descriptor,
isMonotonic: descriptor.type === InstrumentType.COUNTER,
aggregationTemporality: AggregationTemporality.CUMULATIVE,
aggregationTemporality: temporality,
dataPoints: [dataPoint]
})
}
Expand All @@ -94,7 +121,7 @@ export class MetricProducerImpl implements MetricProducer {
addMetricData({
dataPointType: DataPointType.GAUGE,
descriptor,
aggregationTemporality: AggregationTemporality.CUMULATIVE,
aggregationTemporality: temporality,
dataPoints: [dataPoint]
})
}
Expand Down Expand Up @@ -133,7 +160,7 @@ export class MetricProducerImpl implements MetricProducer {
addMetricData({
dataPointType: DataPointType.HISTOGRAM,
descriptor,
aggregationTemporality: AggregationTemporality.CUMULATIVE,
aggregationTemporality: temporality,
dataPoints: [dataPoint]
})
}
Expand All @@ -157,7 +184,7 @@ export class MetricProducerImpl implements MetricProducer {
addMetricData({
dataPointType: DataPointType.SUM,
descriptor: descriptorFromKey(metricKey, attributes),
aggregationTemporality: AggregationTemporality.CUMULATIVE,
aggregationTemporality: temporality,
isMonotonic: true,
dataPoints
})
Expand Down Expand Up @@ -205,7 +232,7 @@ export class MetricProducerImpl implements MetricProducer {
addMetricData({
dataPointType: DataPointType.SUM,
descriptor: descriptorFromKey(metricKey, attributes, "quantiles"),
aggregationTemporality: AggregationTemporality.CUMULATIVE,
aggregationTemporality: temporality,
isMonotonic: false,
dataPoints
})
Expand All @@ -217,7 +244,7 @@ export class MetricProducerImpl implements MetricProducer {
type: InstrumentType.COUNTER,
valueType: ValueType.INT
},
aggregationTemporality: AggregationTemporality.CUMULATIVE,
aggregationTemporality: temporality,
isMonotonic: true,
dataPoints: [countDataPoint]
})
Expand All @@ -229,7 +256,7 @@ export class MetricProducerImpl implements MetricProducer {
type: InstrumentType.COUNTER,
valueType: ValueType.DOUBLE
},
aggregationTemporality: AggregationTemporality.CUMULATIVE,
aggregationTemporality: temporality,
isMonotonic: true,
dataPoints: [sumDataPoint]
})
Expand Down Expand Up @@ -307,6 +334,9 @@ export const registerProducer = (
Effect.sync(() => {
const reader = metricReader()
const readers: Array<MetricReader> = Array.isArray(reader) ? reader : [reader] as any
if (self instanceof MetricProducerImpl) {
for (const r of readers) self.readers.push(r)
}
readers.forEach((reader) => reader.setMetricProducer(self))
return readers
}),
Expand Down
58 changes: 58 additions & 0 deletions packages/opentelemetry/test/Metrics.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { assert, describe, it } from "@effect/vitest"
import { ValueType } from "@opentelemetry/api"
import { resourceFromAttributes } from "@opentelemetry/resources"
import {
AggregationTemporality,
InMemoryMetricExporter,
PeriodicExportingMetricReader
} from "@opentelemetry/sdk-metrics"
import * as Effect from "effect/Effect"
import * as Metric from "effect/Metric"
import * as internal from "../src/internal/metrics.js"
Expand Down Expand Up @@ -292,4 +297,57 @@ describe("Metrics", () => {
]
})
}))

it.effect("counter honors reader's preferred temporality (DELTA)", () =>
Effect.gen(function*() {
const producer = new internal.MetricProducerImpl(
resourceFromAttributes({ name: "test", version: "1.0.0" })
)
const reader = new PeriodicExportingMetricReader({
exporter: new InMemoryMetricExporter(AggregationTemporality.DELTA),
exportIntervalMillis: 60_000_000
})
yield* Effect.scoped(internal.registerProducer(producer, () => reader))

const counter = Metric.counter("counter-temp", { incremental: true })
yield* Metric.increment(counter)
yield* Metric.increment(counter)

const results = yield* Effect.promise(() => producer.collect())
const metric = findMetric(JSON.parse(JSON.stringify(results)), "counter-temp")
assert.equal(metric.aggregationTemporality, AggregationTemporality.DELTA)
assert.equal(metric.isMonotonic, true)
}))

it.effect("gauge honors reader's preferred temporality (DELTA)", () =>
Effect.gen(function*() {
const producer = new internal.MetricProducerImpl(
resourceFromAttributes({ name: "test", version: "1.0.0" })
)
const reader = new PeriodicExportingMetricReader({
exporter: new InMemoryMetricExporter(AggregationTemporality.DELTA),
exportIntervalMillis: 60_000_000
})
yield* Effect.scoped(internal.registerProducer(producer, () => reader))

const gauge = Metric.gauge("gauge-temp")
yield* Metric.set(gauge, 42)

const results = yield* Effect.promise(() => producer.collect())
const metric = findMetric(JSON.parse(JSON.stringify(results)), "gauge-temp")
assert.equal(metric.aggregationTemporality, AggregationTemporality.DELTA)
}))

it.effect("falls back to CUMULATIVE when no reader is registered", () =>
Effect.gen(function*() {
const producer = new internal.MetricProducerImpl(
resourceFromAttributes({ name: "test", version: "1.0.0" })
)
const counter = Metric.counter("counter-no-reader", { incremental: true })
yield* Metric.increment(counter)

const results = yield* Effect.promise(() => producer.collect())
const metric = findMetric(JSON.parse(JSON.stringify(results)), "counter-no-reader")
assert.equal(metric.aggregationTemporality, AggregationTemporality.CUMULATIVE)
}))
})
Loading