stats/src/dpstats.rs (146 additions, 40 deletions)
@@ -10,13 +10,14 @@ use pipeline::NetworkFunction;

use concurrency::sync::Arc;
use kanal::ReceiveError;
-use std::collections::{HashSet, VecDeque};
+use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant};
use vpcmap::VpcDiscriminant;
use vpcmap::map::VpcMapReader;

use crate::vpc_stats::VpcStatsStore;
-use crate::{RegisteredVpcMetrics, Specification, VpcMetricsSpec};
+use crate::{MetricSpec, Register, RegisteredVpcMetrics, Specification, VpcMetricsSpec};
+use metrics::Unit;
use net::buffer::PacketBufferMut;
use net::packet::DoneReason;
use rand::RngCore;
@@ -63,6 +64,42 @@ fn snapshot_vpc_pairs(reader: &VpcMapReader<VpcMapName>) -> Vec<(VpcDiscriminant
}
}

+#[inline]
+fn vpc_id_packet_count() -> &'static str {
+    "vpc_packet_count"
+}
+#[inline]
+fn vpc_id_packet_rate() -> &'static str {
+    "vpc_packet_rate"
+}
+#[inline]
+fn vpc_id_byte_count() -> &'static str {
+    "vpc_byte_count"
+}
+#[inline]
+fn vpc_id_byte_rate() -> &'static str {
+    "vpc_byte_rate"
+}
+
+#[inline]
+fn set_vpc_gauges_to_zero(labels: Vec<(String, String)>) {
+    let packet_count: crate::register::Registered<metrics::Gauge> =
+        MetricSpec::new(vpc_id_packet_count(), Unit::Count, labels.clone()).register();
+    packet_count.metric.set(0.0);
+
+    let packet_rate: crate::register::Registered<metrics::Gauge> =
+        MetricSpec::new(vpc_id_packet_rate(), Unit::BitsPerSecond, labels.clone()).register();
+    packet_rate.metric.set(0.0);
+
+    let byte_count: crate::register::Registered<metrics::Gauge> =
+        MetricSpec::new(vpc_id_byte_count(), Unit::Count, labels.clone()).register();
+    byte_count.metric.set(0.0);
+
+    let byte_rate: crate::register::Registered<metrics::Gauge> =
+        MetricSpec::new(vpc_id_byte_rate(), Unit::BitsPerSecond, labels).register();
+    byte_rate.metric.set(0.0);
+}

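Aside: the zeroing helper works because registration in the `metrics` facade is idempotent. Requesting a gauge with an identical name, unit, and label set yields a handle to the same underlying series, so `set(0.0)` overwrites whatever value was last exported for the deleted VPC. A minimal sketch of the same pattern against the upstream `metrics` crate directly (the `MetricSpec`/`Register` wrappers are project-specific, and without an installed recorder these calls are no-ops):

```rust
use metrics::{gauge, Gauge};

/// Zero one labeled series for each metric name. Sketch only: the
/// "total"/"drops"/"from"/"to" label keys mirror this collector's conventions.
fn zero_series(names: &[&'static str], key: &'static str, value: String) {
    for name in names {
        // An identical (name, labels) pair resolves to the same series, so
        // `set(0.0)` clears the last reported value instead of duplicating it.
        let g: Gauge = gauge!(*name, key => value.clone());
        g.set(0.0);
    }
}

fn main() {
    let names = ["vpc_packet_count", "vpc_packet_rate", "vpc_byte_count", "vpc_byte_rate"];
    zero_series(&names, "total", "vpc-a".to_string());
}
```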
/// A `StatsCollector` is responsible for collecting and aggregating packet statistics for a
/// collection of workers running packet processing pipelines on various threads.
#[derive(Debug)]
@@ -83,6 +120,9 @@ pub struct StatsCollector {
/// Shared store for snapshots/rates usable by gRPC, CLI, etc.
vpc_store: Arc<VpcStatsStore>,
alive_vpcs: HashSet<VpcDiscriminant>,
+    /// Previous snapshot of `alive_vpcs`, used to detect removed VPCs.
+    known_vpcs: HashSet<VpcDiscriminant>,
+    /// Last known name for each VPC, kept so removed VPCs can still be labeled.
+    known_names: HashMap<VpcDiscriminant, String>,
}

impl StatsCollector {
@@ -127,11 +167,17 @@ impl StatsCollector {
};

let name_pairs = snapshot_vpc_pairs(&vpcmap_r);
-        vpc_store.set_many_vpc_names_sync(name_pairs);
+        vpc_store.set_many_vpc_names_sync(name_pairs.clone());

let alive_vpcs: HashSet<VpcDiscriminant> =
vpc_data.iter().map(|(disc, _, _)| *disc).collect();

+        let mut known_names: HashMap<VpcDiscriminant, String> = HashMap::new();
+        for (disc, name) in name_pairs {
+            known_names.insert(disc, name);
+        }
+        let known_vpcs = alive_vpcs.clone();

let metrics = VpcMetricsSpec::new(vpc_data)
.into_iter()
.map(|(disc, spec)| (disc, spec.build()))
@@ -155,6 +201,8 @@ impl StatsCollector {
updates,
vpc_store,
alive_vpcs,
+            known_vpcs,
+            known_names,
};
let writer = PacketStatsWriter(s);
(stats, writer, store_clone)
@@ -182,10 +230,54 @@ impl StatsCollector {
let pairs = snapshot_vpc_pairs(&self.vpcmap_r);
self.vpc_store.set_many_vpc_names_sync(pairs.clone());

-        self.alive_vpcs = pairs.iter().map(|(d, _)| *d).collect();
+        let new_alive: HashSet<VpcDiscriminant> = pairs.iter().map(|(d, _)| *d).collect();
+
+        let mut removed: Vec<VpcDiscriminant> =
+            self.known_vpcs.difference(&new_alive).copied().collect();
+        removed.sort();
+
+        for (disc, name) in &pairs {
+            self.known_names.insert(*disc, name.clone());
+        }
+
+        self.alive_vpcs = new_alive.clone();

// prune any removed VPCs / pairs so they do not show up in snapshots/status
self.vpc_store.prune_to_vpcs(&self.alive_vpcs).await;

+        if !removed.is_empty() {
+            let mut alive_names: Vec<String> = pairs.iter().map(|(_, n)| n.clone()).collect();
+            alive_names.sort();
+            alive_names.dedup();
+
+            for disc in removed {
+                let removed_name = self
+                    .known_names
+                    .get(&disc)
+                    .cloned()
+                    .unwrap_or_else(|| format!("{disc:?}"));
+
+                // total/drops series for the removed VPC
+                set_vpc_gauges_to_zero(vec![("total".to_string(), removed_name.clone())]);
+                set_vpc_gauges_to_zero(vec![("drops".to_string(), removed_name.clone())]);
+
+                // peering series in both directions
+                for other_name in &alive_names {
+                    set_vpc_gauges_to_zero(vec![
+                        ("from".to_string(), removed_name.clone()),
+                        ("to".to_string(), other_name.clone()),
+                    ]);
+                    set_vpc_gauges_to_zero(vec![
+                        ("from".to_string(), other_name.clone()),
+                        ("to".to_string(), removed_name.clone()),
+                    ]);
+                }
+
+                self.known_names.remove(&disc);
+            }
+        }
+
+        self.known_vpcs = new_alive;
}
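
The refresh above boils down to a snapshot diff: `known_vpcs` holds the alive set from the previous refresh, and anything present there but missing from the new snapshot was removed in the meantime. A self-contained sketch of that step, with `u64` standing in for `VpcDiscriminant`:

```rust
use std::collections::HashSet;

/// VPCs present in the previous snapshot but absent from the current one.
fn removed_since(known: &HashSet<u64>, alive: &HashSet<u64>) -> Vec<u64> {
    let mut removed: Vec<u64> = known.difference(alive).copied().collect();
    removed.sort_unstable(); // deterministic order, as the collector's sort() does
    removed
}

fn main() {
    let known: HashSet<u64> = HashSet::from([1, 2, 3]);
    let alive: HashSet<u64> = HashSet::from([1, 3, 4]);
    assert_eq!(removed_since(&known, &alive), vec![2]);
    // 4 is newly alive: it needs no zeroing. `known` is then replaced by
    // `alive`, so the next refresh diffs against this snapshot.
}
```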

/// Run the collector (async). Does not return if awaited.
@@ -364,42 +456,6 @@ impl StatsCollector {
start, duration, capacity,
));

-        // Update raw packet/byte COUNTS for "total" metrics (monotonic counters)
-        concluded.vpc.iter().for_each(|(&src, tx_summary)| {
-            let metrics = match self.metrics.get(&src) {
-                None => {
-                    warn!("lost metrics for src {src}");
-                    return;
-                }
-                Some(metrics) => metrics,
-            };
-            tx_summary
-                .dst
-                .iter()
-                .for_each(|(&dst, &stats)| match metrics.peering.get(&dst) {
-                    None => {
-                        warn!("lost metrics for src {src} to dst {dst}");
-                    }
-                    Some(action) => {
-                        action.tx.packet.count.metric.increment(stats.packets);
-                        action.tx.byte.count.metric.increment(stats.bytes);
-                    }
-                });
-            let action = &metrics.drops;
-            action
-                .tx
-                .packet
-                .count
-                .metric
-                .increment(tx_summary.drops.packets);
-            action
-                .tx
-                .byte
-                .count
-                .metric
-                .increment(tx_summary.drops.bytes);
-        });

// Mirror counters into the store (monotonic)
for (&src, tx_summary) in &concluded.vpc {
if !self.alive_vpcs.contains(&src) {
Expand Down Expand Up @@ -439,6 +495,52 @@ impl StatsCollector {
// Push this *apportioned per-batch* snapshot into the SG window.
self.submitted.push(concluded.vpc.clone());

+        // Refresh count gauges from the store (so reuse doesn't carry stale totals).
+        let pair_snap = self.vpc_store.snapshot_pairs().await;
+        for ((src, dst), fs) in pair_snap {
+            if !self.alive_vpcs.contains(&src) || !self.alive_vpcs.contains(&dst) {
+                continue;
+            }
+            if let Some(metrics) = self.metrics.get(&src)
+                && let Some(action) = metrics.peering.get(&dst)
+            {
+                action.tx.packet.count.metric.set(fs.ctr.packets as f64);
+                action.tx.byte.count.metric.set(fs.ctr.bytes as f64);
+            }
+        }
+
+        let vpc_snap = self.vpc_store.snapshot_vpcs().await;
+        for (src, fs) in vpc_snap {
+            if !self.alive_vpcs.contains(&src) {
+                continue;
+            }
+            if let Some(metrics) = self.metrics.get(&src) {
+                metrics
+                    .total
+                    .tx
+                    .packet
+                    .count
+                    .metric
+                    .set(fs.ctr.packets as f64);
+                metrics.total.tx.byte.count.metric.set(fs.ctr.bytes as f64);
+
+                metrics
+                    .drops
+                    .tx
+                    .packet
+                    .count
+                    .metric
+                    .set(fs.drops.packets as f64);
+                metrics
+                    .drops
+                    .tx
+                    .byte
+                    .count
+                    .metric
+                    .set(fs.drops.bytes as f64);
+            }
+        }
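
Note the change in update style relative to the deleted hunk above: instead of incrementing monotonic counters per batch, the collector now replays absolute totals from the store with `set` on each pass. Replay is idempotent and lets a series be forced back to zero when its VPC disappears. A rough illustration of the contrast, with hypothetical metric names and a stand-in snapshot type:

```rust
use metrics::{counter, gauge};

/// Hypothetical stand-in for one row of the store's counter snapshot.
struct Snapshot {
    packets: u64,
    bytes: u64,
}

fn export(snap: &Snapshot, vpc: &str) {
    // Counter style (before): increments accumulate, so replaying the same
    // snapshot twice would double-count, and a counter can never return to 0.
    counter!("vpc_packet_count_demo").increment(snap.packets);

    // Gauge style (after): `set` writes the absolute value, so the exporter
    // can replay the store total every pass and zero it on VPC removal.
    gauge!("vpc_packet_count", "total" => vpc.to_string()).set(snap.packets as f64);
    gauge!("vpc_byte_count", "total" => vpc.to_string()).set(snap.bytes as f64);
}

fn main() {
    export(&Snapshot { packets: 10, bytes: 1500 }, "vpc-a");
}
```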

// Build per-source filters and smooth.
let filters_by_src: hashbrown::HashMap<
VpcDiscriminant,
@@ -487,6 +589,10 @@ impl StatsCollector {
total_bps += bps;
}

+            // total per-vpc rates
+            metrics.total.tx.packet.rate.metric.set(total_pps);
+            metrics.total.tx.byte.rate.metric.set(total_bps);

self.vpc_store
.set_vpc_rates(src, total_pps, total_bps)
.await;
stats/src/vpc.rs (1 addition, 1 deletion)
@@ -33,7 +33,7 @@ impl CountAndRateSpec {

#[derive(Debug, Serialize)]
pub struct RegisteredCountAndRate {
-    pub count: Registered<metrics::Counter>,
+    pub count: Registered<metrics::Gauge>,
pub rate: Registered<metrics::Gauge>,
}
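
This one-line type change is what the dpstats.rs rewrite relies on: every `count` handle is now a `Gauge`, so call sites move from `increment(delta)` to `set(total)`. A compressed sketch of the before/after call pattern; `RegisteredCountAndRate` is reduced here to bare handles, which is an assumption, not the crate's actual layout:

```rust
use metrics::{gauge, Gauge};

/// Minimal stand-in for the crate's count+rate pair (assumed layout).
struct RegisteredCountAndRate {
    count: Gauge, // was metrics::Counter before this change
    rate: Gauge,
}

fn main() {
    let m = RegisteredCountAndRate {
        count: gauge!("vpc_byte_count"),
        rate: gauge!("vpc_byte_rate"),
    };
    // Before: count was a Counter, updated as m.count.increment(delta),
    // monotonic and impossible to reset.
    // After: absolute totals are replayed from the store and can be zeroed.
    m.count.set(1_048_576.0);
    m.rate.set(8.5e6);
}
```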
