Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions src/RockBot.Wisp/SpawnWispsExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ internal sealed class SpawnWispsExecutor(
IWorkingMemory workingMemory,
WispOptions options,
ILogger<SpawnWispsExecutor> logger,
WispDispatchCircuitBreaker? circuitBreaker = null,
ISkillStore? skillStore = null,
ISkillUsageStore? skillUsageStore = null) : IToolExecutor
{
Expand Down Expand Up @@ -99,6 +100,30 @@ private async Task<WispExecutionResult> ExecuteOneAsync(
var defHash = ComputeDefinitionHash(defJson);
var shapeHash = ComputeShapeHash(definition);

// Circuit breaker: refuse to run a wisp whose exact definition has been
// dispatched too many times in the recent window. This is the only guard
// that spans agent-loop invocations / scheduled re-fires / message loops —
// the per-loop repetitive-call detector cannot see across them. A tripped
// dispatch does NO work (no LLM, no tool calls) and is NOT written to the
// execution log, so a runaway collapses into a cheap, self-limiting error
// instead of an unbounded spin. The refusal message is intentionally stable
// (no counts/ids) so the agent-loop's repetitive-result detector also sees
// it as identical and nudges the model off the loop.
if (circuitBreaker is not null)
{
var decision = circuitBreaker.Admit(defHash);
if (!decision.Allowed)
{
logger.LogWarning(
"Wisp dispatch circuit breaker tripped for definition {DefHash} ('{Description}'): " +
"{Count} dispatches within {WindowMinutes:F0}m (limit {Limit}). Refusing to run; session={Session}.",
defHash, definition.Description, decision.Count, decision.Window.TotalMinutes,
options.DispatchCircuitBreakerMaxPerWindow, sessionId ?? "(none)");
WispDiagnostics.CircuitBreakerTrips.Add(1);
return BuildCircuitBreakerResult(wispId, definition);
}
}

var result = await wispExecutor.ExecuteAsync(definition, wispId, parentSessionId: sessionId, ct);

// Log execution (fire-and-forget, don't block the batch)
Expand Down Expand Up @@ -356,6 +381,39 @@ private async Task WriteBatchSummaryAsync(WispBatchResult batch, CancellationTok
}
}

/// <summary>
/// Creates the synthetic failure handed back to the caller when the circuit breaker
/// rejects a dispatch. The result carries a single failed step (id
/// <c>circuit-breaker</c>, index 0) with zero duration, because no work was performed.
/// The error text is a fixed constant with no counts or ids, so every refusal is
/// textually identical; the intent is that a repetitive-result detector in the agent
/// loop can recognise the repetition and steer the model away. The failure is
/// categorised as <see cref="FailureCategory.External"/>: the definition itself is not
/// malformed, it is simply being dispatched too aggressively.
/// </summary>
/// <param name="wispId">Identifier of the wisp whose dispatch was refused.</param>
/// <param name="definition">The definition that was refused; echoed back on the result.</param>
/// <returns>A failed <see cref="WispExecutionResult"/> describing the refusal.</returns>
private static WispExecutionResult BuildCircuitBreakerResult(string wispId, WispDefinition definition)
{
    // Deliberately constant: identical refusals must produce byte-identical text.
    const string refusalMessage =
        "Circuit breaker: this exact wisp definition has been dispatched too many " +
        "times in a short window and is temporarily blocked. Stop re-running it — " +
        "fix the underlying problem or take a different approach.";

    var refusedStep = new WispStepResult
    {
        StepId = "circuit-breaker",
        StepIndex = 0,
        IsSuccess = false,
        Duration = TimeSpan.Zero,
        Error = new WispStepError
        {
            Category = FailureCategory.External,
            Message = refusalMessage
        }
    };

    return new WispExecutionResult
    {
        WispId = wispId,
        IsSuccess = false,
        Duration = TimeSpan.Zero,
        Definition = definition,
        StepResults = [refusedStep]
    };
}

private static IReadOnlyList<WispDefinition>? ParseDefinitions(
Dictionary<string, JsonElement> args, out string? error)
{
Expand Down
24 changes: 24 additions & 0 deletions src/RockBot.Wisp/WispDiagnostics.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System.Diagnostics.Metrics;

namespace RockBot.Wisp;

/// <summary>
/// Metrics for the wisp subsystem, published through a single BCL <see cref="Meter"/>.
/// The instruments are intended to add negligible overhead while no listener is attached.
/// </summary>
public static class WispDiagnostics
{
    /// <summary>Name of the meter; subscribe to this name to collect wisp metrics.</summary>
    public const string MeterName = "RockBot.Wisp";

    /// <summary>The meter that owns every wisp instrument declared on this class.</summary>
    public static readonly Meter Meter = new Meter(MeterName);

    /// <summary>
    /// Counts dispatches refused by <see cref="WispDispatchCircuitBreaker"/>; callers add
    /// one per refusal. A sustained non-zero rate indicates a runaway re-dispatch loop
    /// and is worth alerting on.
    /// </summary>
    public static readonly Counter<long> CircuitBreakerTrips = Meter.CreateCounter<long>(
        name: "rockbot.wisp.circuit_breaker.trips",
        unit: "{trip}",
        description: "Number of wisp dispatches refused by the dispatch circuit breaker");
}
93 changes: 93 additions & 0 deletions src/RockBot.Wisp/WispDispatchCircuitBreaker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
namespace RockBot.Wisp;

/// <summary>
/// Process-wide guard against a runaway re-dispatch of the <em>same</em> wisp. Every
/// wisp flows through <see cref="SpawnWispsExecutor"/>, which is the only choke point
/// that survives across agent-loop invocations, completion-eval reprompts, scheduled
/// re-fires, and message/A2A re-triggers — none of which the per-loop
/// <c>RepetitiveToolCallDetector</c> can see, since it is rebuilt fresh per
/// <c>AgentLoopRunner.RunAsync</c> and dies with that loop.
///
/// <para>The breaker keeps a fixed-window dispatch count keyed by the exact definition
/// hash. When the same definition is dispatched more than
/// <see cref="WispOptions.DispatchCircuitBreakerMaxPerWindow"/> times within
/// <see cref="WispOptions.DispatchCircuitBreakerWindow"/>, further dispatches of that
/// definition are refused until the window rolls over. Keying on the exact definition
/// hash (not the value-stripped shape hash) means legitimately-varying wisps — same
/// shape, different dates/ids — never trip it; only a truly identical dispatch loop
/// does. A fixed window stores one small entry per distinct definition regardless of
/// dispatch rate, which matters precisely in the runaway case the breaker exists to
/// contain.</para>
///
/// <para>Expired entries are reclaimed lazily: a sweep runs once the map outgrows a
/// size trigger, and the trigger is then re-armed at twice the surviving size so a map
/// full of still-live entries is not rescanned on every call.</para>
///
/// <para>All state is guarded by a private lock, so a single instance may be shared
/// across threads.</para>
/// </summary>
public sealed class WispDispatchCircuitBreaker
{
    /// <summary>Smallest map size that must be exceeded before an expired-entry sweep runs.</summary>
    private const int SweepThreshold = 1_024;

    private readonly WispOptions _options;
    private readonly TimeProvider _clock;
    private readonly object _gate = new();
    private readonly Dictionary<string, DispatchWindow> _windows = new(StringComparer.Ordinal);

    /// <summary>
    /// Map size that must be exceeded before the next sweep. Never below
    /// <see cref="SweepThreshold"/>. Guarded by <see cref="_gate"/>.
    /// </summary>
    private int _nextSweepSize = SweepThreshold;

    /// <summary>Per-definition counter: when the current window opened and how many attempts it has seen.</summary>
    private struct DispatchWindow
    {
        public DateTimeOffset Start;
        public int Count;
    }

    /// <summary>The outcome of an <see cref="Admit"/> check.</summary>
    /// <param name="Allowed">False when the breaker is tripped and the dispatch must be refused.</param>
    /// <param name="Count">
    /// Dispatch attempts for this definition within the current window, including this
    /// one. Zero when the breaker did not gate the call at all.
    /// </param>
    /// <param name="Window">The configured length of the fixed window the count is measured over.</param>
    public readonly record struct Decision(bool Allowed, int Count, TimeSpan Window);

    /// <summary>Creates a breaker that reads its limits from <paramref name="options"/> on every check.</summary>
    /// <param name="options">Source of the enabled flag, window length and per-window limit.</param>
    /// <param name="timeProvider">Clock used to timestamp windows; defaults to <see cref="TimeProvider.System"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public WispDispatchCircuitBreaker(WispOptions options, TimeProvider? timeProvider = null)
    {
        // Fail at construction rather than with a NullReferenceException on the first Admit.
        ArgumentNullException.ThrowIfNull(options);

        _options = options;
        _clock = timeProvider ?? TimeProvider.System;
    }

    /// <summary>
    /// Records a dispatch attempt for <paramref name="definitionHash"/> and reports
    /// whether it is permitted. Every attempt is counted (so a sustained runaway stays
    /// tripped for the rest of the window); only the count crossing the threshold flips
    /// <see cref="Decision.Allowed"/> to false.
    /// </summary>
    /// <param name="definitionHash">
    /// Exact-definition hash used as the counting key. A null or empty hash is never gated.
    /// </param>
    /// <returns>
    /// The decision for this attempt. When the breaker is disabled, the limit or window
    /// is non-positive, or the hash is null/empty, the attempt is allowed with a count
    /// of zero and nothing is recorded.
    /// </returns>
    public Decision Admit(string definitionHash)
    {
        var window = _options.DispatchCircuitBreakerWindow;
        var max = _options.DispatchCircuitBreakerMaxPerWindow;

        var gatingActive = _options.DispatchCircuitBreakerEnabled
            && max > 0
            && window > TimeSpan.Zero
            && !string.IsNullOrEmpty(definitionHash);
        if (!gatingActive)
        {
            return new Decision(Allowed: true, Count: 0, Window: window);
        }

        var now = _clock.GetUtcNow();

        lock (_gate)
        {
            // Unknown definition, or its window has fully elapsed: open a fresh window.
            if (!_windows.TryGetValue(definitionHash, out var entry) || now - entry.Start >= window)
            {
                entry = new DispatchWindow { Start = now, Count = 0 };
            }

            // Saturate instead of wrapping: a negative count would compare as under the
            // limit and make the breaker fail open.
            if (entry.Count < int.MaxValue)
            {
                entry.Count++;
            }

            _windows[definitionHash] = entry;

            if (_windows.Count > _nextSweepSize)
            {
                SweepExpired(now, window);

                // Re-arm at twice the surviving size. Without this, a map holding more
                // than SweepThreshold live entries would be rescanned on every call
                // while reclaiming nothing.
                var doubled = (int)Math.Min(int.MaxValue, 2L * _windows.Count);
                _nextSweepSize = Math.Max(SweepThreshold, doubled);
            }

            return new Decision(Allowed: entry.Count <= max, Count: entry.Count, Window: window);
        }
    }

    /// <summary>Drops definitions whose window has fully expired. Caller holds <see cref="_gate"/>.</summary>
    private void SweepExpired(DateTimeOffset now, TimeSpan window)
    {
        // Collect first, remove second, so the map is not mutated while it is enumerated.
        var stale = new List<string>();
        foreach (var (hash, entry) in _windows)
        {
            if (now - entry.Start >= window)
            {
                stale.Add(hash);
            }
        }

        foreach (var hash in stale)
        {
            _windows.Remove(hash);
        }
    }
}
27 changes: 27 additions & 0 deletions src/RockBot.Wisp/WispOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,33 @@ public sealed class WispOptions
/// </summary>
public string ScheduledTaskSessionPrefix { get; set; } = "patrol/";

/// <summary>
/// Master switch for the <see cref="WispDispatchCircuitBreaker"/>. When true (the
/// default), the breaker refuses to run a wisp whose exact definition has been
/// dispatched more than <see cref="DispatchCircuitBreakerMaxPerWindow"/> times within
/// <see cref="DispatchCircuitBreakerWindow"/>. When false, every dispatch is admitted
/// and nothing is counted. Guards against runaway re-dispatch loops that the
/// per-agent-loop repetitive-call detector cannot see (it is rebuilt per RunAsync and
/// dies with the loop, so it never spans reprompts, scheduled re-fires, or
/// message/A2A re-triggers).
/// <para>The breaker is also inactive, regardless of this flag, when either
/// <see cref="DispatchCircuitBreakerMaxPerWindow"/> or
/// <see cref="DispatchCircuitBreakerWindow"/> is zero or negative.</para>
/// </summary>
public bool DispatchCircuitBreakerEnabled { get; set; } = true;

/// <summary>
/// Length of the fixed window over which identical wisp dispatches are counted by the
/// <see cref="WispDispatchCircuitBreaker"/>. The window for a definition opens at its
/// first counted dispatch and the count resets once this much time has elapsed since
/// then; it does not slide. A zero or negative value disables the breaker.
/// Defaults to five minutes.
/// </summary>
public TimeSpan DispatchCircuitBreakerWindow { get; set; } = TimeSpan.FromMinutes(5);

/// <summary>
/// Maximum dispatches of the <em>same exact</em> wisp definition permitted within
/// <see cref="DispatchCircuitBreakerWindow"/> before the breaker trips for that
/// definition: the Nth dispatch is still allowed, the (N+1)th and later ones in the
/// same window are refused. Deliberately conservative: no legitimate workflow re-runs
/// a byte-identical wisp this many times in a few minutes, so the breaker only ever
/// fires on a genuine runaway. Keyed on the exact definition hash, so wisps that vary
/// by date/id (different definition hash) are unaffected. A zero or negative value
/// disables the breaker. Defaults to 30.
/// </summary>
public int DispatchCircuitBreakerMaxPerWindow { get; set; } = 30;

/// <summary>
/// How long a Direct MCP step waits for the MCP management tools to register
/// (event-gated on the first <c>McpServersIndexed</c> message from the bridge)
Expand Down
1 change: 1 addition & 0 deletions src/RockBot.Wisp/WispServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public static AgentHostBuilder AddWisps(
configure?.Invoke(options);
builder.Services.AddSingleton(options);

builder.Services.AddSingleton<WispDispatchCircuitBreaker>();
builder.Services.AddSingleton<WispExecutor>();
builder.Services.AddSingleton<IWispExecutionLog, FileWispExecutionLog>();
builder.Services.AddSingleton<IPrunableLog>(sp => (IPrunableLog)sp.GetRequiredService<IWispExecutionLog>());
Expand Down
3 changes: 2 additions & 1 deletion src/RockBot.Wisp/WispToolRegistrar.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ internal sealed class WispToolRegistrar(
WispOptions options,
ILoggerFactory loggerFactory,
ILogger<WispToolRegistrar> logger,
WispDispatchCircuitBreaker circuitBreaker,
IWispExecutionLog? executionLog = null,
IFeedbackStore? feedbackStore = null,
ISkillStore? skillStore = null,
Expand Down Expand Up @@ -118,7 +119,7 @@ tool call will fail — check your path against the upstream step's output.
ParametersSchema = SpawnWispsSchema,
Source = "wisp"
}, new SpawnWispsExecutor(wispExecutor, executionLog, feedbackStore, workingMemory, options,
loggerFactory.CreateLogger<SpawnWispsExecutor>(), skillStore, skillUsageStore));
loggerFactory.CreateLogger<SpawnWispsExecutor>(), circuitBreaker, skillStore, skillUsageStore));
logger.LogInformation("Registered tool: spawn_wisps");

return Task.CompletedTask;
Expand Down
117 changes: 117 additions & 0 deletions tests/RockBot.Wisp.Tests/WispDispatchCircuitBreakerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
using RockBot.Wisp;

namespace RockBot.Wisp.Tests;

/// <summary>
/// Unit tests for <see cref="WispDispatchCircuitBreaker"/>: threshold behaviour, window
/// expiry, per-definition isolation, and the configurations that switch gating off.
/// Time is driven by a manually advanced clock, so no test depends on real delays.
/// </summary>
[TestClass]
public class WispDispatchCircuitBreakerTests
{
    /// <summary>Arbitrary fixed instant the test clock starts at.</summary>
    private static readonly DateTimeOffset Start = new(2026, 6, 5, 12, 0, 0, TimeSpan.Zero);

    /// <summary>Builds a breaker wired to a controllable clock, with the given limits.</summary>
    private static (WispDispatchCircuitBreaker Breaker, TestClock Clock) NewBreaker(
        int max = 3, int windowMinutes = 5, bool enabled = true)
    {
        var fakeClock = new TestClock(Start);
        var settings = new WispOptions
        {
            DispatchCircuitBreakerEnabled = enabled,
            DispatchCircuitBreakerMaxPerWindow = max,
            DispatchCircuitBreakerWindow = TimeSpan.FromMinutes(windowMinutes),
        };

        var sut = new WispDispatchCircuitBreaker(settings, fakeClock);
        return (sut, fakeClock);
    }

    /// <summary>Dispatches <paramref name="hash"/> the given number of times, ignoring the decisions.</summary>
    private static void Dispatch(WispDispatchCircuitBreaker breaker, string hash, int times)
    {
        for (var remaining = times; remaining > 0; remaining--)
        {
            breaker.Admit(hash);
        }
    }

    [TestMethod]
    public void Admit_UpToLimit_AllAllowed_ThenTrips()
    {
        var breaker = NewBreaker(max: 3).Breaker;

        var attempt = 0;
        while (attempt < 3)
        {
            attempt++;
            var decision = breaker.Admit("hash-a");
            Assert.IsTrue(decision.Allowed, $"Dispatch {attempt} (≤ limit) must be allowed");
            Assert.AreEqual(attempt, decision.Count);
        }

        var overLimit = breaker.Admit("hash-a");
        Assert.IsFalse(overLimit.Allowed, "The dispatch that crosses the limit must be refused");
        Assert.AreEqual(4, overLimit.Count);
    }

    [TestMethod]
    public void Admit_StaysTrippedForRestOfWindow()
    {
        var (breaker, clock) = NewBreaker(max: 2, windowMinutes: 5);

        Dispatch(breaker, "h", times: 2);
        var firstOverLimit = breaker.Admit("h");
        Assert.IsFalse(firstOverLimit.Allowed);

        // Four minutes into a five-minute window: the refusal must persist.
        clock.Advance(TimeSpan.FromMinutes(4));
        var laterInWindow = breaker.Admit("h");
        Assert.IsFalse(laterInWindow.Allowed, "Must remain tripped until the window rolls over");
    }

    [TestMethod]
    public void Admit_AfterWindowExpiry_ResetsAndAllows()
    {
        var (breaker, clock) = NewBreaker(max: 2, windowMinutes: 5);

        Dispatch(breaker, "h", times: 2);
        var overLimit = breaker.Admit("h");
        Assert.IsFalse(overLimit.Allowed);

        // Advance by exactly the window length so the old window has fully elapsed.
        clock.Advance(TimeSpan.FromMinutes(5));
        var firstOfNewWindow = breaker.Admit("h");
        Assert.IsTrue(firstOfNewWindow.Allowed, "A new window must allow dispatches again");
        Assert.AreEqual(1, firstOfNewWindow.Count, "Count must reset at the window boundary");
    }

    [TestMethod]
    public void Admit_DifferentDefinitions_AreCountedIndependently()
    {
        var breaker = NewBreaker(max: 2).Breaker;

        Dispatch(breaker, "a", times: 2);
        var thirdA = breaker.Admit("a");
        Assert.IsFalse(thirdA.Allowed, "'a' should be tripped");

        // 'b' is a separate key and starts with a full budget of its own.
        var firstB = breaker.Admit("b");
        Assert.IsTrue(firstB.Allowed, "'b' must be unaffected by 'a' tripping");
        var secondB = breaker.Admit("b");
        Assert.IsTrue(secondB.Allowed);
        var thirdB = breaker.Admit("b");
        Assert.IsFalse(thirdB.Allowed, "'b' trips on its own threshold");
    }

    [TestMethod]
    public void Admit_WhenDisabled_AlwaysAllows()
    {
        var breaker = NewBreaker(max: 1, enabled: false).Breaker;

        for (var remaining = 100; remaining > 0; remaining--)
        {
            var decision = breaker.Admit("h");
            Assert.IsTrue(decision.Allowed, "Disabled breaker must never refuse");
        }
    }

    [TestMethod]
    public void Admit_WithNonPositiveLimit_AlwaysAllows()
    {
        var breaker = NewBreaker(max: 0).Breaker;

        for (var remaining = 100; remaining > 0; remaining--)
        {
            var decision = breaker.Admit("h");
            Assert.IsTrue(decision.Allowed, "A non-positive limit disables the breaker");
        }
    }

    [TestMethod]
    public void Admit_EmptyHash_AlwaysAllows()
    {
        var breaker = NewBreaker(max: 1).Breaker;

        var first = breaker.Admit("");
        Assert.IsTrue(first.Allowed);
        var second = breaker.Admit("");
        Assert.IsTrue(second.Allowed, "An empty/unknown definition hash is never gated");
    }

    /// <summary>A <see cref="TimeProvider"/> whose UTC time only moves when <see cref="Advance"/> is called.</summary>
    private sealed class TestClock : TimeProvider
    {
        private DateTimeOffset _now;

        public TestClock(DateTimeOffset start)
        {
            _now = start;
        }

        public override DateTimeOffset GetUtcNow()
        {
            return _now;
        }

        public void Advance(TimeSpan by)
        {
            _now += by;
        }
    }
}
Loading