diff --git a/crates/blockchain/src/aggregation.rs b/crates/blockchain/src/aggregation.rs index 7f6f7427..8fe3e2d2 100644 --- a/crates/blockchain/src/aggregation.rs +++ b/crates/blockchain/src/aggregation.rs @@ -403,11 +403,14 @@ pub(crate) fn run_aggregation_worker( let mut groups_aggregated = 0usize; let mut total_raw_sigs = 0usize; let mut total_children = 0usize; + let jobs_total = snapshot.jobs.len(); + let mut jobs_attempted = 0usize; for job in snapshot.jobs { if cancel.is_cancelled() { break; } + jobs_attempted += 1; let slot = job.slot; let raw_sigs = job.raw_ids.len(); @@ -450,6 +453,13 @@ pub(crate) fn run_aggregation_worker( } } + // Jobs the loop never reached (deadline cancellation or actor gone) are + // skipped aggregation submissions per leanMetrics. + let jobs_dropped = jobs_total - jobs_attempted; + if jobs_dropped > 0 { + metrics::inc_aggregator_skipped_other(jobs_dropped as u64); + } + let _ = actor.send(AggregationDone { session_id, groups_considered, diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 7cc11b1d..3bfd9367 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -254,9 +254,16 @@ impl BlockChainServer { proposer_validator_id.is_some(), ); - if interval == 2 && is_aggregator { - coverage::emit_agg_start_new_coverage(&self.store, self.attestation_committee_count); - self.start_aggregation_session(slot, ctx).await; + if interval == 2 { + if is_aggregator { + coverage::emit_agg_start_new_coverage( + &self.store, + self.attestation_committee_count, + ); + self.start_aggregation_session(slot, ctx).await; + } else { + metrics::inc_aggregator_skipped_not_aggregator(); + } } // Now build and publish the block (after attestations have been accepted) diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index 8a8cbcf3..facb4097 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -513,6 +513,33 @@ static LEAN_NODE_SYNC_STATUS: 
std::sync::LazyLock<IntGaugeVec> = std::sync::Lazy register_int_gauge_vec!("lean_node_sync_status", "Node sync status", &["status"]).unwrap() }); +// --- Aggregator Skips --- + +/// Cross-client label set for `lean_aggregator_skipped_total` (leanMetrics). +/// +/// `not_synced`, `missing_state` and `spawn_failed` never fire in ethlambda +/// today: aggregation is not gated on sync status, needs no per-target +/// pre-state resolution, and the `spawn_blocking` worker cannot fail to +/// start. They are seeded at zero so fleet-wide dashboards see the full +/// series. +const AGGREGATOR_SKIP_REASONS: &[&str] = &[ + "not_aggregator", + "not_synced", + "missing_state", + "spawn_failed", + "other", +]; + +static LEAN_AGGREGATOR_SKIPPED_TOTAL: std::sync::LazyLock<IntCounterVec> = + std::sync::LazyLock::new(|| { + register_int_counter_vec!( + "lean_aggregator_skipped_total", + "Aggregation submissions skipped, by reason", + &["reason"] + ) + .unwrap() + }); + // --- Initialization --- /// Register all metrics with the Prometheus registry so they appear in `/metrics` from startup. @@ -588,6 +615,13 @@ pub fn init() { std::sync::LazyLock::force(&LEAN_BLOCK_PROPOSAL_AGGREGATES_SELECTED); // Sync status std::sync::LazyLock::force(&LEAN_NODE_SYNC_STATUS); + // Aggregator skip counter: instantiate every cross-client reason so the + // full series is visible from startup, including reasons ethlambda + // never fires. + std::sync::LazyLock::force(&LEAN_AGGREGATOR_SKIPPED_TOTAL); + for &reason in AGGREGATOR_SKIP_REASONS { + LEAN_AGGREGATOR_SKIPPED_TOTAL.with_label_values(&[reason]); + } } // --- Public API --- @@ -725,6 +759,23 @@ pub fn observe_committee_signatures_aggregation(elapsed: std::time::Duration) { LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS.observe(elapsed.as_secs_f64()); } +/// One aggregation cycle (interval-2 tick) skipped because this node has no +/// aggregation duty. Bookkeeping label that lets dashboards separate "no +/// duty" from genuine misses. 
+pub fn inc_aggregator_skipped_not_aggregator() { + LEAN_AGGREGATOR_SKIPPED_TOTAL + .with_label_values(&["not_aggregator"]) + .inc(); +} + +/// Aggregation jobs dropped without being attempted, e.g. because the +/// session deadline cancelled the worker before it reached them. +pub fn inc_aggregator_skipped_other(count: u64) { + LEAN_AGGREGATOR_SKIPPED_TOTAL + .with_label_values(&["other"]) + .inc_by(count); +} + /// Update a table byte size gauge. pub fn update_table_bytes(table_name: &str, bytes: u64) { LEAN_TABLE_BYTES