diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index c254b498788..baf376be06f 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -192,7 +192,6 @@ crc-catalog,https://github.com/akhilles/crc-catalog,MIT OR Apache-2.0,Akhil Vela crc-fast,https://github.com/awesomized/crc-fast-rust,MIT OR Apache-2.0,Don MacAskill crc32fast,https://github.com/srijs/rust-crc32fast,MIT OR Apache-2.0,"Sam Rijs , Alex Crichton " criterion-plot,https://github.com/criterion-rs/criterion.rs,Apache-2.0 OR MIT,"Jorge Aparicio , Brook Heisler " -critical-section,https://github.com/rust-embedded/critical-section,MIT OR Apache-2.0,The critical-section Authors cron,https://github.com/zslayton/cron,MIT OR Apache-2.0,Zack Slayton crossbeam-channel,https://github.com/crossbeam-rs/crossbeam,MIT OR Apache-2.0,The crossbeam-channel Authors crossbeam-deque,https://github.com/crossbeam-rs/crossbeam,MIT OR Apache-2.0,The crossbeam-deque Authors @@ -470,7 +469,6 @@ measure_time,https://github.com/PSeitz/rust_measure_time,MIT,Pascal Seitz , bluss" memmap2,https://github.com/RazrFalcon/memmap2-rs,MIT OR Apache-2.0,"Dan Burkert , Yevhenii Reizner , The Contributors" metrics,https://github.com/metrics-rs/metrics,MIT,Toby Lawrence -metrics-exporter-otel,https://github.com/palindrom615/metrics,MIT,Whoemoon Jang metrics-exporter-prometheus,https://github.com/metrics-rs/metrics,MIT AND Apache-2.0,Toby Lawrence metrics-util,https://github.com/metrics-rs/metrics,MIT,Toby Lawrence mime,https://github.com/hyperium/mime,MIT OR Apache-2.0,Sean McArthur diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 330e6591856..89ce3828611 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -2470,12 +2470,6 @@ dependencies = [ "itertools 0.13.0", ] -[[package]] -name = "critical-section" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" - [[package]] name = "cron" version = "0.16.0" @@ -6128,19 +6122,6 @@ dependencies = [ "rapidhash", ] -[[package]] -name = "metrics-exporter-otel" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58b8984fa38406b80c094943c0ba90e53d5fff0aea051ff9fac96cf6940993c8" -dependencies = [ - "metrics", - "metrics-util", - "opentelemetry", - "portable-atomic", - "scc", -] - [[package]] name = "metrics-exporter-prometheus" version = "0.18.3" @@ -7784,9 +7765,6 @@ name = "portable-atomic" version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" -dependencies = [ - "critical-section", -] [[package]] name = "portable-atomic-util" @@ -9434,13 +9412,13 @@ version = "0.8.0" dependencies = [ "anyhow", "metrics", - "metrics-exporter-otel", "metrics-exporter-prometheus", "metrics-util", "opentelemetry", "opentelemetry-appender-tracing", "opentelemetry-otlp", "opentelemetry_sdk", + "portable-atomic", "quickwit-common", "quickwit-metrics", "serde_json", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 74b7f6069b9..c0985fae063 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -170,7 +170,6 @@ lru = "0.18" matches = "0.1" md5 = "0.8" metrics = "0.24" -metrics-exporter-otel = "0.3" metrics-exporter-prometheus = { version = "0.18", default-features = false } metrics-util = "0.20" mime_guess = "2.0" @@ -192,6 +191,7 @@ parquet = { version = "58", default-features = false, features = ["arrow", "expe percent-encoding = "2.3" pin-project = "1.1" pnet = { version = "0.35", features = ["std"] } +portable-atomic = { version = "1", features = ["float"] } postcard = { version = "1.1", features = [ "use-std", ], default-features = false } diff --git a/quickwit/quickwit-telemetry-exporters/Cargo.toml b/quickwit/quickwit-telemetry-exporters/Cargo.toml index 3b763a96884..424f50c279f 100644 --- a/quickwit/quickwit-telemetry-exporters/Cargo.toml +++ b/quickwit/quickwit-telemetry-exporters/Cargo.toml @@ -13,13 +13,13 @@ license.workspace = true [dependencies] anyhow = { workspace = true } metrics = { workspace = true } -metrics-exporter-otel = { workspace = true } metrics-exporter-prometheus = { workspace = true } metrics-util = { workspace = true } opentelemetry = { workspace = true } opentelemetry-appender-tracing = { workspace = true } opentelemetry-otlp = { workspace = true } opentelemetry_sdk = { workspace = true } +portable-atomic = { workspace = true } serde_json = { workspace = true } time = { workspace = true, features = ["parsing"] } tracing = { workspace = true } @@ -28,3 +28,6 @@ tracing-subscriber = { workspace = true } quickwit-common = { workspace = true } quickwit-metrics = { workspace = true } + +[dev-dependencies] +opentelemetry_sdk = { workspace = true, features = ["testing"] } diff --git a/quickwit/quickwit-telemetry-exporters/src/otlp/metrics.rs b/quickwit/quickwit-telemetry-exporters/src/otlp/metrics.rs index 65bda3ef476..54a52a905ab 100644 --- a/quickwit/quickwit-telemetry-exporters/src/otlp/metrics.rs +++ b/quickwit/quickwit-telemetry-exporters/src/otlp/metrics.rs @@ -13,11 +13,11 @@ // limitations under the License. use anyhow::Context; -use metrics_exporter_otel::OpenTelemetryRecorder; use opentelemetry::metrics::MeterProvider; use opentelemetry_otlp::{MetricExporter, Protocol as OtlpWireProtocol, WithExportConfig}; use opentelemetry_sdk::metrics::{SdkMeterProvider, Temporality}; +use super::metrics_exporter::OtlpMetricsRecorder; use crate::otlp::{OtlpExporterConfig, OtlpProtocol, quickwit_resource}; impl OtlpProtocol { @@ -48,7 +48,7 @@ impl OtlpProtocol { pub(crate) fn build_recorder( service_version: &str, otlp_config: &OtlpExporterConfig, -) -> anyhow::Result<(OpenTelemetryRecorder, SdkMeterProvider)> { +) -> anyhow::Result<(OtlpMetricsRecorder, SdkMeterProvider)> { let metrics_protocol = otlp_config.metrics_protocol()?; let temporality = otlp_config.metrics_temporality()?; let metric_exporter = metrics_protocol.metric_exporter(temporality)?; @@ -58,7 +58,7 @@ pub(crate) fn build_recorder( .build(); let meter = metrics_provider.meter("quickwit"); - let recorder = OpenTelemetryRecorder::new(meter); + let recorder = OtlpMetricsRecorder::new(meter); for (name, buckets) in quickwit_metrics::histogram_buckets() { recorder.set_histogram_bounds(&metrics::KeyName::from(name), buckets); } diff --git a/quickwit/quickwit-telemetry-exporters/src/otlp/metrics_exporter/mod.rs b/quickwit/quickwit-telemetry-exporters/src/otlp/metrics_exporter/mod.rs new file mode 100644 index 00000000000..89fc0903f96 --- /dev/null +++ b/quickwit/quickwit-telemetry-exporters/src/otlp/metrics_exporter/mod.rs @@ -0,0 +1,22 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod recorder; +mod state; +mod storage; + +pub(crate) use recorder::OtlpMetricsRecorder; + +#[cfg(test)] +mod tests; diff --git a/quickwit/quickwit-telemetry-exporters/src/otlp/metrics_exporter/recorder.rs b/quickwit/quickwit-telemetry-exporters/src/otlp/metrics_exporter/recorder.rs new file mode 100644 index 00000000000..d2a173b49c1 --- /dev/null +++ b/quickwit/quickwit-telemetry-exporters/src/otlp/metrics_exporter/recorder.rs @@ -0,0 +1,72 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use metrics::{Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SharedString, Unit}; +use metrics_util::MetricKind; + +use super::state::OtlpMetricsState; + +#[derive(Clone)] +pub(crate) struct OtlpMetricsRecorder { + state: Arc, +} + +impl OtlpMetricsRecorder { + pub(crate) fn new(meter: opentelemetry::metrics::Meter) -> Self { + Self { + state: Arc::new(OtlpMetricsState::new(meter)), + } + } + + pub(crate) fn set_histogram_bounds(&self, key_name: &KeyName, bounds: Vec) { + self.state.set_histogram_bounds(key_name, bounds); + } +} + +impl Recorder for OtlpMetricsRecorder { + fn describe_counter(&self, key_name: KeyName, unit: Option, description: SharedString) { + self.state + .set_description(key_name, MetricKind::Counter, unit, description); + } + + fn describe_gauge(&self, key_name: KeyName, unit: Option, description: SharedString) { + self.state + .set_description(key_name, MetricKind::Gauge, unit, description); + } + + fn describe_histogram(&self, key_name: KeyName, unit: Option, description: SharedString) { + self.state + .set_description(key_name, MetricKind::Histogram, unit, description); + } + + fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter { + self.state + .registry() + .get_or_create_counter(key, |counter| Counter::from_arc(counter.clone())) + } + + fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> Gauge { + self.state + .registry() + .get_or_create_gauge(key, |gauge| Gauge::from_arc(gauge.clone())) + } + + fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> Histogram { + self.state + .registry() + .get_or_create_histogram(key, |histogram| Histogram::from_arc(histogram.clone())) + } +} diff --git a/quickwit/quickwit-telemetry-exporters/src/otlp/metrics_exporter/state.rs b/quickwit/quickwit-telemetry-exporters/src/otlp/metrics_exporter/state.rs new file mode 100644 index 00000000000..3ca99908066 --- /dev/null +++ b/quickwit/quickwit-telemetry-exporters/src/otlp/metrics_exporter/state.rs @@ -0,0 +1,121 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +use metrics::{Key, KeyName, SharedString, Unit}; +use metrics_util::MetricKind; +use metrics_util::registry::Registry; + +use super::storage::OtlpMetricStorage; + +pub(super) struct OtlpMetricsState { + registry: Registry, + metadata: OtlpMetricMetadata, +} + +impl OtlpMetricsState { + pub(super) fn new(meter: opentelemetry::metrics::Meter) -> Self { + let metadata = OtlpMetricMetadata::default(); + let registry = Registry::new(OtlpMetricStorage::new(meter, metadata.clone())); + Self { registry, metadata } + } + + pub(super) fn registry(&self) -> &Registry { + &self.registry + } + + pub(super) fn set_description( + &self, + key_name: KeyName, + metric_kind: MetricKind, + unit: Option, + description: SharedString, + ) { + self.metadata + .set_description(key_name, metric_kind, unit, description); + } + + pub(super) fn set_histogram_bounds(&self, key_name: &KeyName, bounds: Vec) { + self.metadata + .set_histogram_bounds(key_name.to_retained(), bounds); + } +} + +#[derive(Clone, Default)] +pub(super) struct OtlpMetricMetadata { + inner: Arc, +} + +#[derive(Default)] +struct OtlpMetricMetadataInner { + descriptions: RwLock>, + histogram_bounds: RwLock>>, +} + +impl OtlpMetricMetadata { + pub(super) fn set_description( + &self, + key_name: KeyName, + metric_kind: MetricKind, + unit: Option, + description: SharedString, + ) { + self.inner + .descriptions + .write() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .insert( + (key_name.to_retained(), metric_kind), + OtlpMetricDescription { unit, description }, + ); + } + + pub(super) fn get_description( + &self, + key_name: &KeyName, + metric_kind: MetricKind, + ) -> Option { + self.inner + .descriptions + .read() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .get(&(key_name.to_retained(), metric_kind)) + .cloned() + } + + fn set_histogram_bounds(&self, key_name: KeyName, bounds: Vec) { + self.inner + .histogram_bounds + .write() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .insert(key_name, bounds); + } + + pub(super) fn get_histogram_bounds(&self, key_name: &KeyName) -> Option> { + self.inner + .histogram_bounds + .read() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .get(&key_name.to_retained()) + .cloned() + } +} + +#[derive(Clone)] +pub(super) struct OtlpMetricDescription { + pub(super) unit: Option, + pub(super) description: SharedString, +} diff --git a/quickwit/quickwit-telemetry-exporters/src/otlp/metrics_exporter/storage.rs b/quickwit/quickwit-telemetry-exporters/src/otlp/metrics_exporter/storage.rs new file mode 100644 index 00000000000..a36f9487191 --- /dev/null +++ b/quickwit/quickwit-telemetry-exporters/src/otlp/metrics_exporter/storage.rs @@ -0,0 +1,201 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; +use std::sync::Arc; +use std::sync::atomic::Ordering; + +use metrics::atomics::AtomicU64; +use metrics::{CounterFn, GaugeFn, HistogramFn, Key, Unit}; +use metrics_util::MetricKind; +use metrics_util::registry::Storage; +use opentelemetry::KeyValue; +use opentelemetry::metrics::{ + Counter as OtelCounterInstrument, Histogram as OtelHistogramInstrument, ObservableGauge, +}; +use portable_atomic::AtomicF64; + +use super::state::OtlpMetricMetadata; + +pub(super) struct OtlpMetricStorage { + meter: opentelemetry::metrics::Meter, + metadata: OtlpMetricMetadata, +} + +impl OtlpMetricStorage { + pub(super) fn new(meter: opentelemetry::metrics::Meter, metadata: OtlpMetricMetadata) -> Self { + Self { meter, metadata } + } + + fn attributes(key: &Key) -> Vec { + key.labels() + .map(|label| { + let (key, value) = label.clone().into_parts(); + let key: Cow<'static, str> = key.into(); + let value: Cow<'static, str> = value.into(); + KeyValue::new(key, value) + }) + .collect() + } +} + +impl Storage for OtlpMetricStorage { + type Counter = Arc; + type Gauge = Arc; + type Histogram = Arc; + + fn counter(&self, key: &Key) -> Self::Counter { + let key_name = key.name_shared(); + let mut builder = self.meter.u64_counter(key_name.clone().into_inner()); + if let Some(description) = self + .metadata + .get_description(&key_name, MetricKind::Counter) + { + builder = builder.with_description(description.description); + if let Some(unit) = description.unit { + builder = builder.with_unit(unit_as_ucum_label(unit)); + } + } + Arc::new(OtlpCounter::new(builder.build(), Self::attributes(key))) + } + + fn gauge(&self, key: &Key) -> Self::Gauge { + let key_name = key.name_shared(); + let mut builder = self + .meter + .f64_observable_gauge(key_name.clone().into_inner()); + if let Some(description) = self.metadata.get_description(&key_name, MetricKind::Gauge) { + builder = builder.with_description(description.description); + if let Some(unit) = description.unit { + builder = builder.with_unit(unit_as_ucum_label(unit)); + } + } + + let attributes = Self::attributes(key); + let value = Arc::new(AtomicF64::new(0.0)); + let observed_value = value.clone(); + let otel_gauge = builder + .with_callback(move |observer| { + observer.observe(observed_value.load(Ordering::Acquire), &attributes); + }) + .build(); + Arc::new(OtlpGauge { + value, + _otel_gauge: otel_gauge, + }) + } + + fn histogram(&self, key: &Key) -> Self::Histogram { + let key_name = key.name_shared(); + let mut builder = self.meter.f64_histogram(key_name.clone().into_inner()); + if let Some(description) = self + .metadata + .get_description(&key_name, MetricKind::Histogram) + { + builder = builder.with_description(description.description); + if let Some(unit) = description.unit { + builder = builder.with_unit(unit_as_ucum_label(unit)); + } + } + if let Some(bounds) = self.metadata.get_histogram_bounds(&key_name) { + builder = builder.with_boundaries(bounds); + } + Arc::new(OtlpHistogram { + histogram: builder.build(), + attributes: Self::attributes(key), + }) + } +} + +pub(super) struct OtlpCounter { + counter: OtelCounterInstrument, + value: AtomicU64, + attributes: Vec, +} + +impl OtlpCounter { + fn new(counter: OtelCounterInstrument, attributes: Vec) -> Self { + Self { + counter, + value: AtomicU64::new(0), + attributes, + } + } +} + +impl CounterFn for OtlpCounter { + fn increment(&self, value: u64) { + self.value.fetch_add(value, Ordering::Release); + self.counter.add(value, &self.attributes); + } + + fn absolute(&self, value: u64) { + let previous = self.value.fetch_max(value, Ordering::AcqRel); + if value > previous { + self.counter.add(value - previous, &self.attributes); + } + } +} + +pub(super) struct OtlpGauge { + value: Arc, + _otel_gauge: ObservableGauge, +} + +impl GaugeFn for OtlpGauge { + fn increment(&self, value: f64) { + self.value.fetch_add(value, Ordering::AcqRel); + } + + fn decrement(&self, value: f64) { + self.value.fetch_sub(value, Ordering::AcqRel); + } + + fn set(&self, value: f64) { + self.value.store(value, Ordering::Release); + } +} + +pub(super) struct OtlpHistogram { + histogram: OtelHistogramInstrument, + attributes: Vec, +} + +impl HistogramFn for OtlpHistogram { + fn record(&self, value: f64) { + self.histogram.record(value, &self.attributes); + } +} + +fn unit_as_ucum_label(unit: Unit) -> &'static str { + match unit { + Unit::Count => "1", + Unit::Percent => "%", + Unit::Seconds => "s", + Unit::Milliseconds => "ms", + Unit::Microseconds => "us", + Unit::Nanoseconds => "ns", + Unit::Tebibytes => "TiBy", + Unit::Gibibytes => "GiBy", + Unit::Mebibytes => "MiBy", + Unit::Kibibytes => "KiBy", + Unit::Bytes => "By", + Unit::TerabitsPerSecond => "Tbit/s", + Unit::GigabitsPerSecond => "Gbit/s", + Unit::MegabitsPerSecond => "Mbit/s", + Unit::KilobitsPerSecond => "kbit/s", + Unit::BitsPerSecond => "bit/s", + Unit::CountPerSecond => "1/s", + } +} diff --git a/quickwit/quickwit-telemetry-exporters/src/otlp/metrics_exporter/tests.rs b/quickwit/quickwit-telemetry-exporters/src/otlp/metrics_exporter/tests.rs new file mode 100644 index 00000000000..0a470add56d --- /dev/null +++ b/quickwit/quickwit-telemetry-exporters/src/otlp/metrics_exporter/tests.rs @@ -0,0 +1,421 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use metrics::{KeyName, Unit}; +use opentelemetry::KeyValue; +use opentelemetry::metrics::MeterProvider; +use opentelemetry_sdk::metrics::data::{AggregatedMetrics, Metric, MetricData, ResourceMetrics}; +use opentelemetry_sdk::metrics::{InMemoryMetricExporter, SdkMeterProvider}; + +use super::*; + +fn setup_recorder() -> ( + OtlpMetricsRecorder, + SdkMeterProvider, + InMemoryMetricExporter, +) { + let exporter = InMemoryMetricExporter::default(); + let provider = SdkMeterProvider::builder() + .with_periodic_exporter(exporter.clone()) + .build(); + let recorder = OtlpMetricsRecorder::new(provider.meter("quickwit-test")); + (recorder, provider, exporter) +} + +fn exported_metrics( + provider: &SdkMeterProvider, + exporter: &InMemoryMetricExporter, +) -> Vec { + provider.force_flush().expect("metrics should flush"); + exporter + .get_finished_metrics() + .expect("metrics should be exported") +} + +fn find_metric<'a>(resource_metrics: &'a [ResourceMetrics], name: &str) -> &'a Metric { + resource_metrics + .iter() + .rev() + .flat_map(|resource_metric| resource_metric.scope_metrics()) + .flat_map(|scope_metrics| scope_metrics.metrics()) + .find(|metric| metric.name() == name) + .unwrap_or_else(|| panic!("metric `{name}` should be exported")) +} + +fn has_attributes<'a>( + attributes: impl Iterator, + expected_attributes: &[(&str, &str)], +) -> bool { + let attributes: Vec<_> = attributes.collect(); + expected_attributes + .iter() + .all(|(expected_key, expected_value)| { + attributes.iter().any(|attribute| { + attribute.key.as_str() == *expected_key + && attribute.value.to_string() == *expected_value + }) + }) +} + +#[test] +fn counter_exports_increment_and_absolute_delta() { + let (recorder, provider, exporter) = setup_recorder(); + + metrics::with_local_recorder(&recorder, || { + let counter = metrics::counter!( + description: "Total test requests", + unit: Unit::Count, + "otlp_test_requests_total", + "route" => "search", + ); + counter.increment(2); + counter.absolute(10); + counter.absolute(4); + counter.increment(3); + }); + + let resource_metrics = exported_metrics(&provider, &exporter); + let metric = find_metric(&resource_metrics, "otlp_test_requests_total"); + assert_eq!(metric.description(), "Total test requests"); + assert_eq!(metric.unit(), "1"); + + let AggregatedMetrics::U64(MetricData::Sum(sum)) = metric.data() else { + panic!("counter should export a u64 sum"); + }; + assert!(sum.is_monotonic()); + let data_points: Vec<_> = sum.data_points().collect(); + assert_eq!(data_points.len(), 1); + assert_eq!(data_points[0].value(), 13); + assert!(has_attributes( + data_points[0].attributes(), + &[("route", "search")] + )); +} + +#[test] +fn counter_exports_distinct_data_points_for_distinct_label_sets() { + let (recorder, provider, exporter) = setup_recorder(); + + metrics::with_local_recorder(&recorder, || { + metrics::describe_counter!( + "otlp_test_labeled_requests_total", + Unit::Count, + "Total labeled requests" + ); + metrics::counter!( + "otlp_test_labeled_requests_total", + "method" => "GET", + "status" => "200", + ) + .increment(1); + metrics::counter!( + "otlp_test_labeled_requests_total", + "method" => "POST", + "status" => "201", + ) + .increment(2); + }); + + let resource_metrics = exported_metrics(&provider, &exporter); + let metric = find_metric(&resource_metrics, "otlp_test_labeled_requests_total"); + assert_eq!(metric.description(), "Total labeled requests"); + assert_eq!(metric.unit(), "1"); + + let AggregatedMetrics::U64(MetricData::Sum(sum)) = metric.data() else { + panic!("counter should export a u64 sum"); + }; + let data_points: Vec<_> = sum.data_points().collect(); + assert_eq!(data_points.len(), 2); + + let get_point = data_points + .iter() + .find(|data_point| { + has_attributes( + data_point.attributes(), + &[("method", "GET"), ("status", "200")], + ) + }) + .expect("GET counter data point should exist"); + assert_eq!(get_point.value(), 1); + + let post_point = data_points + .iter() + .find(|data_point| { + has_attributes( + data_point.attributes(), + &[("method", "POST"), ("status", "201")], + ) + }) + .expect("POST counter data point should exist"); + assert_eq!(post_point.value(), 2); +} + +#[test] +fn counter_accumulates_increments_across_flushes() { + let (recorder, provider, exporter) = setup_recorder(); + + metrics::with_local_recorder(&recorder, || { + metrics::counter!("otlp_test_events_total").increment(5); + }); + + let resource_metrics = exported_metrics(&provider, &exporter); + let metric = find_metric(&resource_metrics, "otlp_test_events_total"); + let AggregatedMetrics::U64(MetricData::Sum(sum)) = metric.data() else { + panic!("counter should export a u64 sum"); + }; + let first_point = sum + .data_points() + .next() + .expect("counter data point should exist"); + assert_eq!(first_point.value(), 5); + + metrics::with_local_recorder(&recorder, || { + metrics::counter!("otlp_test_events_total").increment(3); + }); + + let resource_metrics = exported_metrics(&provider, &exporter); + let metric = find_metric(&resource_metrics, "otlp_test_events_total"); + let AggregatedMetrics::U64(MetricData::Sum(sum)) = metric.data() else { + panic!("counter should export a u64 sum"); + }; + let latest_point = sum + .data_points() + .next() + .expect("counter data point should exist"); + assert_eq!(latest_point.value(), 8); +} + +#[test] +fn gauge_exports_latest_observed_value() { + let (recorder, provider, exporter) = setup_recorder(); + + metrics::with_local_recorder(&recorder, || { + let gauge = metrics::gauge!( + description: "Active test shards", + unit: Unit::Count, + "otlp_test_active_shards", + "node" => "node-a", + ); + gauge.set(5.0); + gauge.increment(2.0); + gauge.decrement(1.5); + }); + + let resource_metrics = exported_metrics(&provider, &exporter); + let metric = find_metric(&resource_metrics, "otlp_test_active_shards"); + assert_eq!(metric.description(), "Active test shards"); + assert_eq!(metric.unit(), "1"); + + let AggregatedMetrics::F64(MetricData::Gauge(gauge)) = metric.data() else { + panic!("gauge should export an f64 gauge"); + }; + let data_points: Vec<_> = gauge.data_points().collect(); + assert_eq!(data_points.len(), 1); + assert_eq!(data_points[0].value(), 5.5); + assert!(has_attributes( + data_points[0].attributes(), + &[("node", "node-a")] + )); +} + +#[test] +fn gauge_exports_distinct_data_points_for_distinct_label_sets() { + let (recorder, provider, exporter) = setup_recorder(); + + metrics::with_local_recorder(&recorder, || { + metrics::describe_gauge!("otlp_test_cpu_usage", Unit::Percent, "Current CPU usage"); + metrics::gauge!("otlp_test_cpu_usage", "core" => "0").set(45.5); + metrics::gauge!("otlp_test_cpu_usage", "core" => "1").set(62.3); + }); + + let resource_metrics = exported_metrics(&provider, &exporter); + let metric = find_metric(&resource_metrics, "otlp_test_cpu_usage"); + assert_eq!(metric.description(), "Current CPU usage"); + assert_eq!(metric.unit(), "%"); + + let AggregatedMetrics::F64(MetricData::Gauge(gauge)) = metric.data() else { + panic!("gauge should export an f64 gauge"); + }; + let data_points: Vec<_> = gauge.data_points().collect(); + assert_eq!(data_points.len(), 2); + + let core_0_point = data_points + .iter() + .find(|data_point| has_attributes(data_point.attributes(), &[("core", "0")])) + .expect("core 0 gauge data point should exist"); + assert_eq!(core_0_point.value(), 45.5); + + let core_1_point = data_points + .iter() + .find(|data_point| has_attributes(data_point.attributes(), &[("core", "1")])) + .expect("core 1 gauge data point should exist"); + assert_eq!(core_1_point.value(), 62.3); +} + +#[test] +fn gauge_updates_value_across_flushes() { + let (recorder, provider, exporter) = setup_recorder(); + + metrics::with_local_recorder(&recorder, || { + metrics::gauge!("otlp_test_memory_usage").set(1024.0); + }); + exported_metrics(&provider, &exporter); + + metrics::with_local_recorder(&recorder, || { + metrics::gauge!("otlp_test_memory_usage").set(2048.0); + }); + + let resource_metrics = exported_metrics(&provider, &exporter); + let metric = find_metric(&resource_metrics, "otlp_test_memory_usage"); + let AggregatedMetrics::F64(MetricData::Gauge(gauge)) = metric.data() else { + panic!("gauge should export an f64 gauge"); + }; + let data_point = gauge + .data_points() + .next() + .expect("gauge data point should exist"); + assert_eq!(data_point.value(), 2048.0); +} + +#[test] +fn histogram_exports_distinct_data_points_for_distinct_label_sets() { + let (recorder, provider, exporter) = setup_recorder(); + + metrics::with_local_recorder(&recorder, || { + metrics::describe_histogram!( + "otlp_test_response_time_seconds", + Unit::Seconds, + "Response time distribution" + ); + metrics::histogram!("otlp_test_response_time_seconds", "endpoint" => "/api/users") + .record(0.123); + metrics::histogram!("otlp_test_response_time_seconds", "endpoint" => "/api/users") + .record(0.456); + metrics::histogram!("otlp_test_response_time_seconds", "endpoint" => "/api/posts") + .record(0.789); + }); + + let resource_metrics = exported_metrics(&provider, &exporter); + let metric = find_metric(&resource_metrics, "otlp_test_response_time_seconds"); + assert_eq!(metric.description(), "Response time distribution"); + assert_eq!(metric.unit(), "s"); + + let AggregatedMetrics::F64(MetricData::Histogram(histogram)) = metric.data() else { + panic!("histogram should export an f64 histogram"); + }; + let data_points: Vec<_> = histogram.data_points().collect(); + assert_eq!(data_points.len(), 2); + + let users_point = data_points + .iter() + .find(|data_point| has_attributes(data_point.attributes(), &[("endpoint", "/api/users")])) + .expect("users histogram data point should exist"); + assert_eq!(users_point.count(), 2); + assert!((users_point.sum() - (0.123 + 0.456)).abs() < f64::EPSILON); + + let posts_point = data_points + .iter() + .find(|data_point| has_attributes(data_point.attributes(), &[("endpoint", "/api/posts")])) + .expect("posts histogram data point should exist"); + assert_eq!(posts_point.count(), 1); + assert!((posts_point.sum() - 0.789).abs() < f64::EPSILON); +} + +#[test] +fn histogram_exports_configured_boundaries() { + let (recorder, provider, exporter) = setup_recorder(); + recorder.set_histogram_bounds( + &KeyName::from("otlp_test_request_duration_seconds"), + vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0], + ); + + metrics::with_local_recorder(&recorder, || { + let histogram = metrics::histogram!( + description: "Test request duration", + unit: Unit::Seconds, + "otlp_test_request_duration_seconds", + "route" => "ingest", + ); + histogram.record(0.03); + histogram.record(0.12); + histogram.record(0.75); + histogram.record(3.5); + }); + + let resource_metrics = exported_metrics(&provider, &exporter); + let metric = find_metric(&resource_metrics, "otlp_test_request_duration_seconds"); + assert_eq!(metric.description(), "Test request duration"); + assert_eq!(metric.unit(), "s"); + + let AggregatedMetrics::F64(MetricData::Histogram(histogram)) = metric.data() else { + panic!("histogram should export an f64 histogram"); + }; + let data_points: Vec<_> = histogram.data_points().collect(); + assert_eq!(data_points.len(), 1); + assert!(has_attributes( + data_points[0].attributes(), + &[("route", "ingest")] + )); + assert_eq!( + data_points[0].bounds().collect::>(), + vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0] + ); + assert_eq!(data_points[0].count(), 4); + assert!((data_points[0].sum() - (0.03 + 0.12 + 0.75 + 3.5)).abs() < f64::EPSILON); + assert_eq!( + data_points[0].bucket_counts().collect::>(), + vec![0, 1, 0, 1, 0, 1, 0, 1, 0, 0] + ); +} + +#[test] +fn metric_descriptions_and_units_are_exported() { + let (recorder, provider, exporter) = setup_recorder(); + + metrics::with_local_recorder(&recorder, || { + metrics::describe_counter!( + "otlp_test_described_counter", + Unit::Count, + "Counter description" + ); + metrics::describe_gauge!( + "otlp_test_described_gauge", + Unit::Bytes, + "Gauge description" + ); + metrics::describe_histogram!( + "otlp_test_described_histogram", + Unit::Milliseconds, + "Histogram description" + ); + + metrics::counter!("otlp_test_described_counter").increment(1); + metrics::gauge!("otlp_test_described_gauge").set(42.0); + metrics::histogram!("otlp_test_described_histogram").record(0.5); + }); + + let resource_metrics = exported_metrics(&provider, &exporter); + + let counter = find_metric(&resource_metrics, "otlp_test_described_counter"); + assert_eq!(counter.description(), "Counter description"); + assert_eq!(counter.unit(), "1"); + + let gauge = find_metric(&resource_metrics, "otlp_test_described_gauge"); + assert_eq!(gauge.description(), "Gauge description"); + assert_eq!(gauge.unit(), "By"); + + let histogram = find_metric(&resource_metrics, "otlp_test_described_histogram"); + assert_eq!(histogram.description(), "Histogram description"); + assert_eq!(histogram.unit(), "ms"); +} diff --git a/quickwit/quickwit-telemetry-exporters/src/otlp/mod.rs b/quickwit/quickwit-telemetry-exporters/src/otlp/mod.rs index 116836a09ca..211e74c85d4 100644 --- a/quickwit/quickwit-telemetry-exporters/src/otlp/mod.rs +++ b/quickwit/quickwit-telemetry-exporters/src/otlp/mod.rs @@ -15,6 +15,7 @@ mod config; pub(crate) mod logs; pub(crate) mod metrics; +mod metrics_exporter; pub(crate) mod traces; pub(crate) use config::{OtlpExporterConfig, OtlpProtocol, quickwit_resource};