From 1cb53249625d7956c9485ced4594db11a421728a Mon Sep 17 00:00:00 2001
From: Sergey Matov
Date: Wed, 4 Feb 2026 14:09:41 +0400
Subject: [PATCH] feat(stats): Zero out deleted VPC gauges

Signed-off-by: Sergey Matov
---
 stats/src/dpstats.rs | 186 +++++++++++++++++++++++++++++++++----------
 stats/src/vpc.rs     |   2 +-
 2 files changed, 147 insertions(+), 41 deletions(-)

diff --git a/stats/src/dpstats.rs b/stats/src/dpstats.rs
index 01d4ba50a..4b8646cf5 100644
--- a/stats/src/dpstats.rs
+++ b/stats/src/dpstats.rs
@@ -10,13 +10,14 @@ use pipeline::NetworkFunction;
 use concurrency::sync::Arc;
 use kanal::ReceiveError;
-use std::collections::{HashSet, VecDeque};
+use std::collections::{HashMap, HashSet, VecDeque};
 use std::time::{Duration, Instant};
 use vpcmap::VpcDiscriminant;
 use vpcmap::map::VpcMapReader;
 
 use crate::vpc_stats::VpcStatsStore;
-use crate::{RegisteredVpcMetrics, Specification, VpcMetricsSpec};
+use crate::{MetricSpec, Register, RegisteredVpcMetrics, Specification, VpcMetricsSpec};
+use metrics::Unit;
 use net::buffer::PacketBufferMut;
 use net::packet::DoneReason;
 use rand::RngCore;
 
@@ -63,6 +64,42 @@ fn snapshot_vpc_pairs(reader: &VpcMapReader) -> Vec<(VpcDiscriminant
     }
 }
 
+#[inline]
+fn vpc_id_packet_count() -> &'static str {
+    "vpc_packet_count"
+}
+#[inline]
+fn vpc_id_packet_rate() -> &'static str {
+    "vpc_packet_rate"
+}
+#[inline]
+fn vpc_id_byte_count() -> &'static str {
+    "vpc_byte_count"
+}
+#[inline]
+fn vpc_id_byte_rate() -> &'static str {
+    "vpc_byte_rate"
+}
+
+#[inline]
+fn set_vpc_gauges_to_zero(labels: Vec<(String, String)>) {
+    let packet_count: crate::register::Registered<Gauge> =
+        MetricSpec::new(vpc_id_packet_count(), Unit::Count, labels.clone()).register();
+    packet_count.metric.set(0.0);
+
+    let packet_rate: crate::register::Registered<Gauge> =
+        MetricSpec::new(vpc_id_packet_rate(), Unit::BitsPerSecond, labels.clone()).register();
+    packet_rate.metric.set(0.0);
+
+    let byte_count: crate::register::Registered<Gauge> =
+        MetricSpec::new(vpc_id_byte_count(), Unit::Count, labels.clone()).register();
+    byte_count.metric.set(0.0);
+
+    let byte_rate: crate::register::Registered<Gauge> =
+        MetricSpec::new(vpc_id_byte_rate(), Unit::BitsPerSecond, labels).register();
+    byte_rate.metric.set(0.0);
+}
+
 /// A `StatsCollector` is responsible for collecting and aggregating packet statistics for a
 /// collection of workers running packet processing pipelines on various threads.
 #[derive(Debug)]
@@ -83,6 +120,9 @@ pub struct StatsCollector {
     /// Shared store for snapshots/rates usable by gRPC, CLI, etc.
     vpc_store: Arc<VpcStatsStore>,
     alive_vpcs: HashSet<VpcDiscriminant>,
+    /// Previous snapshot of the `alive` VPC set, used to detect removals.
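+    /// Gauge series for any VPC that leaves this set are zeroed via
+    /// `set_vpc_gauges_to_zero` before the set is refreshed.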
+    known_vpcs: HashSet<VpcDiscriminant>,
+    known_names: HashMap<VpcDiscriminant, String>,
 }
 
 impl StatsCollector {
@@ -127,11 +167,17 @@ impl StatsCollector {
         };
 
         let name_pairs = snapshot_vpc_pairs(&vpcmap_r);
-        vpc_store.set_many_vpc_names_sync(name_pairs);
+        vpc_store.set_many_vpc_names_sync(name_pairs.clone());
 
         let alive_vpcs: HashSet<VpcDiscriminant> =
             vpc_data.iter().map(|(disc, _, _)| *disc).collect();
 
+        let mut known_names: HashMap<VpcDiscriminant, String> = HashMap::new();
+        for (disc, name) in name_pairs {
+            known_names.insert(disc, name);
+        }
+        let known_vpcs = alive_vpcs.clone();
+
         let metrics = VpcMetricsSpec::new(vpc_data)
             .into_iter()
             .map(|(disc, spec)| (disc, spec.build()))
@@ -155,6 +201,8 @@ impl StatsCollector {
             updates,
             vpc_store,
             alive_vpcs,
+            known_vpcs,
+            known_names,
         };
         let writer = PacketStatsWriter(s);
         (stats, writer, store_clone)
     }
@@ -182,10 +230,54 @@
         let pairs = snapshot_vpc_pairs(&self.vpcmap_r);
         self.vpc_store.set_many_vpc_names_sync(pairs.clone());
 
-        self.alive_vpcs = pairs.iter().map(|(d, _)| *d).collect();
+        let new_alive: HashSet<VpcDiscriminant> = pairs.iter().map(|(d, _)| *d).collect();
+
+        let mut removed: Vec<VpcDiscriminant> =
+            self.known_vpcs.difference(&new_alive).copied().collect();
+        removed.sort();
+
+        for (disc, name) in &pairs {
+            self.known_names.insert(*disc, name.clone());
+        }
+
+        self.alive_vpcs = new_alive.clone();
 
         // prune any removed VPCs / pairs so they do not show up in snapshots/status
         self.vpc_store.prune_to_vpcs(&self.alive_vpcs).await;
+
+        if !removed.is_empty() {
+            let mut alive_names: Vec<String> = pairs.iter().map(|(_, n)| n.clone()).collect();
+            alive_names.sort();
+            alive_names.dedup();
+
+            for disc in removed {
+                let removed_name = self
+                    .known_names
+                    .get(&disc)
+                    .cloned()
+                    .unwrap_or_else(|| format!("{disc:?}"));
+
+                // total/drops series for the removed VPC
+                set_vpc_gauges_to_zero(vec![("total".to_string(), removed_name.clone())]);
+                set_vpc_gauges_to_zero(vec![("drops".to_string(), removed_name.clone())]);
+
+                // peering series in both directions
+                for other_name in &alive_names {
+                    set_vpc_gauges_to_zero(vec![
+                        ("from".to_string(), removed_name.clone()),
+                        ("to".to_string(), other_name.clone()),
+                    ]);
+                    set_vpc_gauges_to_zero(vec![
+                        ("from".to_string(), other_name.clone()),
+                        ("to".to_string(), removed_name.clone()),
+                    ]);
+                }
+
+                self.known_names.remove(&disc);
+            }
+        }
+
+        self.known_vpcs = new_alive;
     }
 
     /// Run the collector (async). Does not return if awaited.
@@ -364,42 +456,6 @@ impl StatsCollector {
             start, duration, capacity,
         ));
 
-        // Update raw packet/byte COUNTS for "total" metrics (monotonic counters)
-        concluded.vpc.iter().for_each(|(&src, tx_summary)| {
-            let metrics = match self.metrics.get(&src) {
-                None => {
-                    warn!("lost metrics for src {src}");
-                    return;
-                }
-                Some(metrics) => metrics,
-            };
-            tx_summary
-                .dst
-                .iter()
-                .for_each(|(&dst, &stats)| match metrics.peering.get(&dst) {
-                    None => {
-                        warn!("lost metrics for src {src} to dst {dst}");
-                    }
-                    Some(action) => {
-                        action.tx.packet.count.metric.increment(stats.packets);
-                        action.tx.byte.count.metric.increment(stats.bytes);
-                    }
-                });
-            let action = &metrics.drops;
-            action
-                .tx
-                .packet
-                .count
-                .metric
-                .increment(tx_summary.drops.packets);
-            action
-                .tx
-                .byte
-                .count
-                .metric
-                .increment(tx_summary.drops.bytes);
-        });
-
         // Mirror counters into the store (monotonic)
         for (&src, tx_summary) in &concluded.vpc {
             if !self.alive_vpcs.contains(&src) {
@@ -439,6 +495,52 @@
         // Push this *apportioned per-batch* snapshot into the SG window.
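+        // This window feeds the per-source filters built below, which smooth
+        // the per-VPC packet and byte rate gauges.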
         self.submitted.push(concluded.vpc.clone());
 
+        // Refresh count gauges from the store (so reuse doesn't carry stale totals).
+        let pair_snap = self.vpc_store.snapshot_pairs().await;
+        for ((src, dst), fs) in pair_snap {
+            if !self.alive_vpcs.contains(&src) || !self.alive_vpcs.contains(&dst) {
+                continue;
+            }
+            if let Some(metrics) = self.metrics.get(&src)
+                && let Some(action) = metrics.peering.get(&dst)
+            {
+                action.tx.packet.count.metric.set(fs.ctr.packets as f64);
+                action.tx.byte.count.metric.set(fs.ctr.bytes as f64);
+            }
+        }
+
+        let vpc_snap = self.vpc_store.snapshot_vpcs().await;
+        for (src, fs) in vpc_snap {
+            if !self.alive_vpcs.contains(&src) {
+                continue;
+            }
+            if let Some(metrics) = self.metrics.get(&src) {
+                metrics
+                    .total
+                    .tx
+                    .packet
+                    .count
+                    .metric
+                    .set(fs.ctr.packets as f64);
+                metrics.total.tx.byte.count.metric.set(fs.ctr.bytes as f64);
+
+                metrics
+                    .drops
+                    .tx
+                    .packet
+                    .count
+                    .metric
+                    .set(fs.drops.packets as f64);
+                metrics
+                    .drops
+                    .tx
+                    .byte
+                    .count
+                    .metric
+                    .set(fs.drops.bytes as f64);
+            }
+        }
+
         // Build per-source filters and smooth.
         let filters_by_src: hashbrown::HashMap<
             VpcDiscriminant,
@@ -487,6 +589,10 @@ impl StatsCollector {
                 total_bps += bps;
             }
 
+            // total per-vpc rates
+            metrics.total.tx.packet.rate.metric.set(total_pps);
+            metrics.total.tx.byte.rate.metric.set(total_bps);
+
             self.vpc_store
                 .set_vpc_rates(src, total_pps, total_bps)
                 .await;
diff --git a/stats/src/vpc.rs b/stats/src/vpc.rs
index a1a4d76b2..33b8eea6f 100644
--- a/stats/src/vpc.rs
+++ b/stats/src/vpc.rs
@@ -33,7 +33,7 @@ impl CountAndRateSpec {
 
 #[derive(Debug, Serialize)]
 pub struct RegisteredCountAndRate {
-    pub count: Registered<Counter>,
+    pub count: Registered<Gauge>,
     pub rate: Registered<Gauge>,
 }
 
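
The switch of `count` from Registered<Counter> to Registered<Gauge> is what
makes the zeroing possible: counters are monotonic and only expose
increment(), while the refresh and zero paths above need set(). The helper
also relies on the metrics facade resolving an identical name + label set to
the same exported series, so re-registering is idempotent. A minimal sketch
of that assumption against the `metrics` crate directly (metric names as in
the patch; the VPC names and the helper function are hypothetical):

    use metrics::gauge;

    /// Pin all four peering series for one (from, to) pair to zero.
    fn zero_peering_series(from: &str, to: &str) {
        // An identical name + label set resolves to the same underlying
        // series, so set(0.0) overwrites the exported value instead of
        // creating a parallel series.
        gauge!("vpc_packet_count", "from" => from.to_string(), "to" => to.to_string()).set(0.0);
        gauge!("vpc_packet_rate", "from" => from.to_string(), "to" => to.to_string()).set(0.0);
        gauge!("vpc_byte_count", "from" => from.to_string(), "to" => to.to_string()).set(0.0);
        gauge!("vpc_byte_rate", "from" => from.to_string(), "to" => to.to_string()).set(0.0);
    }

    // Usage when "vpc-a" is removed while "vpc-b" stays alive:
    //   zero_peering_series("vpc-a", "vpc-b");
    //   zero_peering_series("vpc-b", "vpc-a");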