diff --git a/src/RockBot.Wisp/SpawnWispsExecutor.cs b/src/RockBot.Wisp/SpawnWispsExecutor.cs index 5b29444..dffb3c9 100644 --- a/src/RockBot.Wisp/SpawnWispsExecutor.cs +++ b/src/RockBot.Wisp/SpawnWispsExecutor.cs @@ -20,6 +20,7 @@ internal sealed class SpawnWispsExecutor( IWorkingMemory workingMemory, WispOptions options, ILogger logger, + WispDispatchCircuitBreaker? circuitBreaker = null, ISkillStore? skillStore = null, ISkillUsageStore? skillUsageStore = null) : IToolExecutor { @@ -99,6 +100,30 @@ private async Task ExecuteOneAsync( var defHash = ComputeDefinitionHash(defJson); var shapeHash = ComputeShapeHash(definition); + // Circuit breaker: refuse to run a wisp whose exact definition has been + // dispatched too many times in the recent window. This is the only guard + // that spans agent-loop invocations / scheduled re-fires / message loops — + // the per-loop repetitive-call detector cannot see across them. A tripped + // dispatch does NO work (no LLM, no tool calls) and is NOT written to the + // execution log, so a runaway collapses into a cheap, self-limiting error + // instead of an unbounded spin. The refusal message is intentionally stable + // (no counts/ids) so the agent-loop's repetitive-result detector also sees + // it as identical and nudges the model off the loop. + if (circuitBreaker is not null) + { + var decision = circuitBreaker.Admit(defHash); + if (!decision.Allowed) + { + logger.LogWarning( + "Wisp dispatch circuit breaker tripped for definition {DefHash} ('{Description}'): " + + "{Count} dispatches within {WindowMinutes:F0}m (limit {Limit}). Refusing to run; session={Session}.", + defHash, definition.Description, decision.Count, decision.Window.TotalMinutes, + options.DispatchCircuitBreakerMaxPerWindow, sessionId ?? "(none)"); + WispDiagnostics.CircuitBreakerTrips.Add(1); + return BuildCircuitBreakerResult(wispId, definition); + } + } + var result = await wispExecutor.ExecuteAsync(definition, wispId, parentSessionId: sessionId, ct); // Log execution (fire-and-forget, don't block the batch) @@ -356,6 +381,39 @@ private async Task WriteBatchSummaryAsync(WispBatchResult batch, CancellationTok } } + /// + /// Builds the synthetic failed result returned when the circuit breaker refuses a + /// dispatch. The error message is stable (no counts or ids) so identical refusals + /// look identical to the agent-loop's repetitive-result detector, which then nudges + /// the model to stop. Categorised — the wisp + /// definition itself isn't malformed; it's being dispatched too aggressively. + /// + private static WispExecutionResult BuildCircuitBreakerResult(string wispId, WispDefinition definition) => + new() + { + WispId = wispId, + IsSuccess = false, + Duration = TimeSpan.Zero, + Definition = definition, + StepResults = + [ + new WispStepResult + { + StepId = "circuit-breaker", + StepIndex = 0, + IsSuccess = false, + Duration = TimeSpan.Zero, + Error = new WispStepError + { + Category = FailureCategory.External, + Message = "Circuit breaker: this exact wisp definition has been dispatched too many " + + "times in a short window and is temporarily blocked. Stop re-running it — " + + "fix the underlying problem or take a different approach." + } + } + ] + }; + private static IReadOnlyList? ParseDefinitions( Dictionary args, out string? error) { diff --git a/src/RockBot.Wisp/WispDiagnostics.cs b/src/RockBot.Wisp/WispDiagnostics.cs new file mode 100644 index 0000000..0f8e50b --- /dev/null +++ b/src/RockBot.Wisp/WispDiagnostics.cs @@ -0,0 +1,24 @@ +using System.Diagnostics.Metrics; + +namespace RockBot.Wisp; + +/// +/// Diagnostics instrumentation for the wisp subsystem. Uses a BCL , +/// which is zero-cost when no listener is attached. +/// +public static class WispDiagnostics +{ + public const string MeterName = "RockBot.Wisp"; + + public static readonly Meter Meter = new(MeterName); + + /// + /// Incremented each time refuses a dispatch. + /// A non-zero rate is the signature of a runaway re-dispatch loop — worth alerting on. + /// + public static readonly Counter CircuitBreakerTrips = + Meter.CreateCounter( + "rockbot.wisp.circuit_breaker.trips", + unit: "{trip}", + description: "Number of wisp dispatches refused by the dispatch circuit breaker"); +} diff --git a/src/RockBot.Wisp/WispDispatchCircuitBreaker.cs b/src/RockBot.Wisp/WispDispatchCircuitBreaker.cs new file mode 100644 index 0000000..abfc6df --- /dev/null +++ b/src/RockBot.Wisp/WispDispatchCircuitBreaker.cs @@ -0,0 +1,93 @@ +namespace RockBot.Wisp; + +/// +/// Process-wide guard against a runaway re-dispatch of the same wisp. Every +/// wisp flows through , which is the only choke point +/// that survives across agent-loop invocations, completion-eval reprompts, scheduled +/// re-fires, and message/A2A re-triggers — none of which the per-loop +/// RepetitiveToolCallDetector can see, since it is rebuilt fresh per +/// AgentLoopRunner.RunAsync and dies with that loop. +/// +/// The breaker keeps a fixed-window dispatch count keyed by the exact definition +/// hash. When the same definition is dispatched more than +/// times within +/// , further dispatches of that +/// definition are refused until the window rolls over. Keying on the exact definition +/// hash (not the value-stripped shape hash) means legitimately-varying wisps — same +/// shape, different dates/ids — never trip it; only a truly identical dispatch loop +/// does. A fixed window keeps memory at O(distinct definitions) regardless of dispatch +/// rate, which matters precisely in the runaway case the breaker exists to contain. +/// +public sealed class WispDispatchCircuitBreaker +{ + private readonly WispOptions _options; + private readonly TimeProvider _clock; + private readonly object _gate = new(); + private readonly Dictionary _windows = new(StringComparer.Ordinal); + + /// Idle-definition sweep runs when the map grows past this many entries. + private const int SweepThreshold = 1_024; + + private struct Window + { + public DateTimeOffset Start; + public int Count; + } + + /// The outcome of an check. + /// False when the breaker is tripped and the dispatch must be refused. + /// Dispatch count for this definition within the current window (including this attempt). + /// The rolling window the count is measured over. + public readonly record struct Decision(bool Allowed, int Count, TimeSpan Window); + + public WispDispatchCircuitBreaker(WispOptions options, TimeProvider? timeProvider = null) + { + _options = options; + _clock = timeProvider ?? TimeProvider.System; + } + + /// + /// Records a dispatch attempt for and reports + /// whether it is permitted. Every attempt is counted (so a sustained runaway stays + /// tripped for the rest of the window); only the count crossing the threshold flips + /// to false. + /// + public Decision Admit(string definitionHash) + { + var window = _options.DispatchCircuitBreakerWindow; + var max = _options.DispatchCircuitBreakerMaxPerWindow; + + if (!_options.DispatchCircuitBreakerEnabled || max <= 0 || window <= TimeSpan.Zero + || string.IsNullOrEmpty(definitionHash)) + return new Decision(Allowed: true, Count: 0, Window: window); + + var now = _clock.GetUtcNow(); + + lock (_gate) + { + if (!_windows.TryGetValue(definitionHash, out var w) || now - w.Start >= window) + w = new Window { Start = now, Count = 0 }; + + // A fixed window resets every `window`, so Count stays bounded by + // (dispatch rate × window) and cannot realistically overflow. + w.Count++; + _windows[definitionHash] = w; + + if (_windows.Count > SweepThreshold) + SweepExpired(now, window); + + return new Decision(Allowed: w.Count <= max, Count: w.Count, Window: window); + } + } + + /// Drops definitions whose window has fully expired. Caller holds . + private void SweepExpired(DateTimeOffset now, TimeSpan window) + { + var stale = new List(); + foreach (var (hash, w) in _windows) + if (now - w.Start >= window) + stale.Add(hash); + foreach (var hash in stale) + _windows.Remove(hash); + } +} diff --git a/src/RockBot.Wisp/WispOptions.cs b/src/RockBot.Wisp/WispOptions.cs index 6a7837f..2d76b35 100644 --- a/src/RockBot.Wisp/WispOptions.cs +++ b/src/RockBot.Wisp/WispOptions.cs @@ -44,6 +44,33 @@ public sealed class WispOptions /// public string ScheduledTaskSessionPrefix { get; set; } = "patrol/"; + /// + /// When true, the refuses to run a wisp + /// whose exact definition has been dispatched more than + /// times within + /// . Guards against runaway re-dispatch + /// loops that the per-agent-loop repetitive-call detector cannot see (it is rebuilt + /// per RunAsync and dies with the loop, so it never spans reprompts, scheduled + /// re-fires, or message/A2A re-triggers). + /// + public bool DispatchCircuitBreakerEnabled { get; set; } = true; + + /// + /// Rolling window over which identical wisp dispatches are counted by the + /// . + /// + public TimeSpan DispatchCircuitBreakerWindow { get; set; } = TimeSpan.FromMinutes(5); + + /// + /// Maximum dispatches of the same exact wisp definition permitted within + /// before the breaker trips for that + /// definition. Deliberately conservative: no legitimate workflow re-runs a byte- + /// identical wisp this many times in a few minutes, so the breaker only ever fires + /// on a genuine runaway. Keyed on the exact definition hash, so wisps that vary by + /// date/id (different definition hash) are unaffected. + /// + public int DispatchCircuitBreakerMaxPerWindow { get; set; } = 30; + /// /// How long a Direct MCP step waits for the MCP management tools to register /// (event-gated on the first McpServersIndexed message from the bridge) diff --git a/src/RockBot.Wisp/WispServiceCollectionExtensions.cs b/src/RockBot.Wisp/WispServiceCollectionExtensions.cs index 41e3b8e..1d7b836 100644 --- a/src/RockBot.Wisp/WispServiceCollectionExtensions.cs +++ b/src/RockBot.Wisp/WispServiceCollectionExtensions.cs @@ -20,6 +20,7 @@ public static AgentHostBuilder AddWisps( configure?.Invoke(options); builder.Services.AddSingleton(options); + builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); builder.Services.AddSingleton(sp => (IPrunableLog)sp.GetRequiredService()); diff --git a/src/RockBot.Wisp/WispToolRegistrar.cs b/src/RockBot.Wisp/WispToolRegistrar.cs index b3dbcba..d3eb21a 100644 --- a/src/RockBot.Wisp/WispToolRegistrar.cs +++ b/src/RockBot.Wisp/WispToolRegistrar.cs @@ -15,6 +15,7 @@ internal sealed class WispToolRegistrar( WispOptions options, ILoggerFactory loggerFactory, ILogger logger, + WispDispatchCircuitBreaker circuitBreaker, IWispExecutionLog? executionLog = null, IFeedbackStore? feedbackStore = null, ISkillStore? skillStore = null, @@ -118,7 +119,7 @@ tool call will fail — check your path against the upstream step's output. ParametersSchema = SpawnWispsSchema, Source = "wisp" }, new SpawnWispsExecutor(wispExecutor, executionLog, feedbackStore, workingMemory, options, - loggerFactory.CreateLogger(), skillStore, skillUsageStore)); + loggerFactory.CreateLogger(), circuitBreaker, skillStore, skillUsageStore)); logger.LogInformation("Registered tool: spawn_wisps"); return Task.CompletedTask; diff --git a/tests/RockBot.Wisp.Tests/WispDispatchCircuitBreakerTests.cs b/tests/RockBot.Wisp.Tests/WispDispatchCircuitBreakerTests.cs new file mode 100644 index 0000000..37efbaa --- /dev/null +++ b/tests/RockBot.Wisp.Tests/WispDispatchCircuitBreakerTests.cs @@ -0,0 +1,117 @@ +using RockBot.Wisp; + +namespace RockBot.Wisp.Tests; + +[TestClass] +public class WispDispatchCircuitBreakerTests +{ + private static readonly DateTimeOffset Start = new(2026, 6, 5, 12, 0, 0, TimeSpan.Zero); + + private static (WispDispatchCircuitBreaker Breaker, TestClock Clock) NewBreaker( + int max = 3, int windowMinutes = 5, bool enabled = true) + { + var options = new WispOptions + { + DispatchCircuitBreakerEnabled = enabled, + DispatchCircuitBreakerMaxPerWindow = max, + DispatchCircuitBreakerWindow = TimeSpan.FromMinutes(windowMinutes), + }; + var clock = new TestClock(Start); + return (new WispDispatchCircuitBreaker(options, clock), clock); + } + + [TestMethod] + public void Admit_UpToLimit_AllAllowed_ThenTrips() + { + var (breaker, _) = NewBreaker(max: 3); + + for (var i = 1; i <= 3; i++) + { + var d = breaker.Admit("hash-a"); + Assert.IsTrue(d.Allowed, $"Dispatch {i} (≤ limit) must be allowed"); + Assert.AreEqual(i, d.Count); + } + + var tripped = breaker.Admit("hash-a"); + Assert.IsFalse(tripped.Allowed, "The dispatch that crosses the limit must be refused"); + Assert.AreEqual(4, tripped.Count); + } + + [TestMethod] + public void Admit_StaysTrippedForRestOfWindow() + { + var (breaker, clock) = NewBreaker(max: 2, windowMinutes: 5); + + breaker.Admit("h"); + breaker.Admit("h"); + Assert.IsFalse(breaker.Admit("h").Allowed); + + // Still inside the window a few minutes later → still refused. + clock.Advance(TimeSpan.FromMinutes(4)); + Assert.IsFalse(breaker.Admit("h").Allowed, "Must remain tripped until the window rolls over"); + } + + [TestMethod] + public void Admit_AfterWindowExpiry_ResetsAndAllows() + { + var (breaker, clock) = NewBreaker(max: 2, windowMinutes: 5); + + breaker.Admit("h"); + breaker.Admit("h"); + Assert.IsFalse(breaker.Admit("h").Allowed); + + clock.Advance(TimeSpan.FromMinutes(5)); // window fully elapsed + var afterReset = breaker.Admit("h"); + Assert.IsTrue(afterReset.Allowed, "A new window must allow dispatches again"); + Assert.AreEqual(1, afterReset.Count, "Count must reset at the window boundary"); + } + + [TestMethod] + public void Admit_DifferentDefinitions_AreCountedIndependently() + { + var (breaker, _) = NewBreaker(max: 2); + + breaker.Admit("a"); + breaker.Admit("a"); + Assert.IsFalse(breaker.Admit("a").Allowed, "'a' should be tripped"); + + // A different definition hash has its own budget. + Assert.IsTrue(breaker.Admit("b").Allowed, "'b' must be unaffected by 'a' tripping"); + Assert.IsTrue(breaker.Admit("b").Allowed); + Assert.IsFalse(breaker.Admit("b").Allowed, "'b' trips on its own threshold"); + } + + [TestMethod] + public void Admit_WhenDisabled_AlwaysAllows() + { + var (breaker, _) = NewBreaker(max: 1, enabled: false); + + for (var i = 0; i < 100; i++) + Assert.IsTrue(breaker.Admit("h").Allowed, "Disabled breaker must never refuse"); + } + + [TestMethod] + public void Admit_WithNonPositiveLimit_AlwaysAllows() + { + var (breaker, _) = NewBreaker(max: 0); + + for (var i = 0; i < 100; i++) + Assert.IsTrue(breaker.Admit("h").Allowed, "A non-positive limit disables the breaker"); + } + + [TestMethod] + public void Admit_EmptyHash_AlwaysAllows() + { + var (breaker, _) = NewBreaker(max: 1); + + Assert.IsTrue(breaker.Admit("").Allowed); + Assert.IsTrue(breaker.Admit("").Allowed, "An empty/unknown definition hash is never gated"); + } + + private sealed class TestClock(DateTimeOffset start) : TimeProvider + { + private DateTimeOffset _now = start; + public override DateTimeOffset GetUtcNow() => _now; + public void Advance(TimeSpan by) => _now = _now.Add(by); + } +}