Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/blockchain/src/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,11 +403,14 @@ pub(crate) fn run_aggregation_worker(
let mut groups_aggregated = 0usize;
let mut total_raw_sigs = 0usize;
let mut total_children = 0usize;
let jobs_total = snapshot.jobs.len();
let mut jobs_attempted = 0usize;

for job in snapshot.jobs {
if cancel.is_cancelled() {
break;
}
jobs_attempted += 1;

let slot = job.slot;
let raw_sigs = job.raw_ids.len();
Expand Down Expand Up @@ -450,6 +453,13 @@ pub(crate) fn run_aggregation_worker(
}
}

// Jobs the loop never reached (deadline cancellation or actor gone) are
// skipped aggregation submissions per leanMetrics.
let jobs_dropped = jobs_total - jobs_attempted;
if jobs_dropped > 0 {
metrics::inc_aggregator_skipped_other(jobs_dropped as u64);
}

let _ = actor.send(AggregationDone {
session_id,
groups_considered,
Expand Down
13 changes: 10 additions & 3 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,16 @@ impl BlockChainServer {
proposer_validator_id.is_some(),
);

if interval == 2 && is_aggregator {
coverage::emit_agg_start_new_coverage(&self.store, self.attestation_committee_count);
self.start_aggregation_session(slot, ctx).await;
if interval == 2 {
if is_aggregator {
coverage::emit_agg_start_new_coverage(
&self.store,
self.attestation_committee_count,
);
self.start_aggregation_session(slot, ctx).await;
} else {
metrics::inc_aggregator_skipped_not_aggregator();
}
}

// Now build and publish the block (after attestations have been accepted)
Expand Down
51 changes: 51 additions & 0 deletions crates/blockchain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,33 @@ static LEAN_NODE_SYNC_STATUS: std::sync::LazyLock<IntGaugeVec> = std::sync::Lazy
register_int_gauge_vec!("lean_node_sync_status", "Node sync status", &["status"]).unwrap()
});

// --- Aggregator Skips ---

/// Every `reason` label value defined for `lean_aggregator_skipped_total`
/// in the cross-client metric set (leanMetrics).
///
/// `init` touches one series per entry, so the full label set is exported
/// from startup rather than appearing only after the first increment.
///
/// `not_synced`, `missing_state` and `spawn_failed` never fire in ethlambda
/// today: aggregation is not gated on sync status, needs no per-target
/// pre-state resolution, and the `spawn_blocking` worker cannot fail to
/// start. They are seeded at zero so fleet-wide dashboards see the full
/// series.
const AGGREGATOR_SKIP_REASONS: &[&str] = &[
// Incremented by `inc_aggregator_skipped_not_aggregator`.
"not_aggregator",
// The next three are seeded only; nothing in ethlambda increments them.
"not_synced",
"missing_state",
"spawn_failed",
// Incremented by `inc_aggregator_skipped_other`.
"other",
];

/// Counter family `lean_aggregator_skipped_total`, keyed by a single
/// `reason` label.
///
/// Registration happens lazily on first access; a registration error
/// panics via `unwrap`.
static LEAN_AGGREGATOR_SKIPPED_TOTAL: std::sync::LazyLock<IntCounterVec> =
    std::sync::LazyLock::new(|| {
        let registered = register_int_counter_vec!(
            "lean_aggregator_skipped_total",
            "Aggregation submissions skipped, by reason",
            &["reason"]
        );
        registered.unwrap()
    });

// --- Initialization ---

/// Register all metrics with the Prometheus registry so they appear in `/metrics` from startup.
Expand Down Expand Up @@ -588,6 +615,13 @@ pub fn init() {
std::sync::LazyLock::force(&LEAN_BLOCK_PROPOSAL_AGGREGATES_SELECTED);
// Sync status
std::sync::LazyLock::force(&LEAN_NODE_SYNC_STATUS);
// Aggregator skip counter: instantiate every cross-client reason so the
// full series is visible from startup, including reasons ethlambda
// never fires.
std::sync::LazyLock::force(&LEAN_AGGREGATOR_SKIPPED_TOTAL);
for &reason in AGGREGATOR_SKIP_REASONS {
LEAN_AGGREGATOR_SKIPPED_TOTAL.with_label_values(&[reason]);
}
}

// --- Public API ---
Expand Down Expand Up @@ -725,6 +759,23 @@ pub fn observe_committee_signatures_aggregation(elapsed: std::time::Duration) {
LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS.observe(elapsed.as_secs_f64());
}

/// Count one aggregation cycle (interval-2 tick) that was skipped because
/// this node holds no aggregation duty.
///
/// Adds 1 to the `not_aggregator` series of `lean_aggregator_skipped_total`.
/// This is a bookkeeping label: it lets dashboards tell "no duty" apart from
/// genuine misses.
pub fn inc_aggregator_skipped_not_aggregator() {
    let counter = LEAN_AGGREGATOR_SKIPPED_TOTAL.with_label_values(&["not_aggregator"]);
    counter.inc();
}

/// Count aggregation jobs that were dropped without being attempted, e.g.
/// because the session deadline cancelled the worker before it reached them.
///
/// Adds `count` to the `other` series of `lean_aggregator_skipped_total`.
pub fn inc_aggregator_skipped_other(count: u64) {
    let counter = LEAN_AGGREGATOR_SKIPPED_TOTAL.with_label_values(&["other"]);
    counter.inc_by(count);
}

/// Update a table byte size gauge.
pub fn update_table_bytes(table_name: &str, bytes: u64) {
LEAN_TABLE_BYTES
Expand Down
Loading