Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 67 additions & 13 deletions rust/fleet-data/src/toposort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@
//!
//! Extracted from the byte-identical copies that previously lived in
//! `fleet-plan-tree::waves()` and `fleet-waves::waves()`. Behaviour is
//! preserved verbatim: missing predecessors collapse to level 0, indices
//! inside each wave are sorted ascending, and cycles in `depends_on` are
//! NOT handled (the recursive resolver will overflow the stack on a cycle).
//! Callers must ensure their plan.json is acyclic — the producer side
//! (Colony plan publisher) already guarantees this.
//! preserved verbatim for acyclic input: missing predecessors collapse to
//! level 0 and indices inside each wave are sorted ascending.
//!
//! Cycles in `depends_on` (from hand-edited, partially-written, or
//! schema-bug plan.json files) are broken at the back-edge: when the
//! resolver re-enters a node it is already visiting, that edge contributes
//! level 0 instead of recursing. This avoids the stack overflow that
//! previously crashed `fleet-plan-tree` and `fleet-waves` on malformed
//! input. The producer side (Colony plan publisher) still guarantees
//! acyclic graphs in practice; this is dashboard-side defence in depth.

use crate::plan::Subtask;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

/// Assign each subtask to a wave such that every `depends_on` predecessor
/// sits in a strictly lower wave. Returns `out[level] = Vec<subtask_index>`
Expand All @@ -19,7 +24,8 @@ pub fn waves(subtasks: &[Subtask]) -> Vec<Vec<u32>> {
let by_idx: HashMap<u32, &Subtask> =
subtasks.iter().map(|s| (s.subtask_index, s)).collect();
for s in subtasks {
resolve(s.subtask_index, &by_idx, &mut level);
let mut visiting: HashSet<u32> = HashSet::new();
resolve(s.subtask_index, &by_idx, &mut level, &mut visiting);
}
let max = level.values().copied().max().unwrap_or(0);
let mut out: Vec<Vec<u32>> = (0..=max).map(|_| Vec::new()).collect();
Expand All @@ -31,14 +37,27 @@ pub fn waves(subtasks: &[Subtask]) -> Vec<Vec<u32>> {
out
}

fn resolve(idx: u32, by: &HashMap<u32, &Subtask>, memo: &mut HashMap<u32, u32>) -> u32 {
fn resolve(
idx: u32,
by: &HashMap<u32, &Subtask>,
memo: &mut HashMap<u32, u32>,
visiting: &mut HashSet<u32>,
) -> u32 {
if let Some(&v) = memo.get(&idx) {
return v;
}
// Cycle guard: if we are already resolving this node further up the
// recursion stack, treat the back-edge as contributing level 0 instead
// of recursing into ourselves. The outer call will memoise a real
// level once the rest of the dependencies resolve.
if !visiting.insert(idx) {
return 0;
}
let s = match by.get(&idx) {
Some(s) => s,
None => {
memo.insert(idx, 0);
visiting.remove(&idx);
return 0;
}
};
Expand All @@ -47,12 +66,13 @@ fn resolve(idx: u32, by: &HashMap<u32, &Subtask>, memo: &mut HashMap<u32, u32>)
} else {
s.depends_on
.iter()
.map(|d| resolve(*d, by, memo))
.map(|d| resolve(*d, by, memo, visiting))
.max()
.unwrap_or(0)
+ 1
};
memo.insert(idx, lvl);
visiting.remove(&idx);
lvl
}

Expand Down Expand Up @@ -137,8 +157,42 @@ mod tests {
assert_eq!(out, vec![vec![99], vec![1]]);
}

// NOTE: cycles in `depends_on` are NOT handled — the recursive resolver
// overflows the stack. Adding an executable cycle test would crash the
// test binary, so the invariant is documented at the module level
// instead. Producers (Colony plan publisher) guarantee acyclic input.
#[test]
fn cycle_broken_to_level_zero() {
    // Two-node cycle: subtask 0 depends on 1 and subtask 1 depends on 0.
    // Before the cycle guard this overflowed the stack and crashed
    // fleet-plan-tree / fleet-waves. With the guard the back-edge is
    // broken, so both nodes resolve to a finite level. The exact levels
    // depend on which node the outer loop visits first, so this test
    // deliberately does not pin them. The invariants checked are:
    //   (a) no panic / stack overflow,
    //   (b) every node is emitted exactly once (no drops, no duplicates),
    //   (c) the wave count stays bounded by the node count + 1.
    let plan = vec![st(0, vec![1]), st(1, vec![0])];
    let out = waves(&plan);

    // Sort the flattened indices so the comparison is independent of which
    // wave each node landed in.
    let mut flat: Vec<u32> = out.iter().flatten().copied().collect();
    flat.sort_unstable();
    assert_eq!(
        flat,
        vec![0, 1],
        "each node of the cycle should be emitted exactly once: {out:?}"
    );
    assert!(
        out.len() <= plan.len() + 1,
        "cycle produced {} waves for {} nodes: {out:?}",
        out.len(),
        plan.len()
    );
}

#[test]
fn self_cycle_broken() {
    // Self-loop: subtask 0 lists itself as a dependency. Re-entering the
    // node trips the visiting guard, so the back-edge contributes 0 and
    // the recursion terminates instead of looping forever. What matters
    // here: the call returns, node 0 shows up exactly once, and the
    // number of waves stays at two or fewer.
    let subtasks = vec![st(0, vec![0])];
    let out = waves(&subtasks);

    let emitted: Vec<u32> = out
        .iter()
        .flat_map(|wave| wave.iter().copied())
        .collect();
    assert_eq!(
        emitted,
        vec![0],
        "self-cycle should still emit node 0 exactly once: {out:?}"
    );

    let wave_count = out.len();
    assert!(
        wave_count <= 2,
        "self-cycle produced {} waves: {out:?}",
        wave_count
    );
}
}