From 2e579cf89fd5d4b06f2c8f85bff71a016c169057 Mon Sep 17 00:00:00 2001 From: Peder Date: Thu, 11 Jun 2026 17:36:07 +0200 Subject: [PATCH] Support multi-server interceptor chains per SEP orchestration model The SEP chain execution pattern discovers interceptors from one or more MCP servers, merges them into a single chain sorted globally by priorityHint (alphabetical tie-break), and routes each interceptor/invoke to the server hosting it. The SDK previously ran each client's full chain sequentially, so cross-server priorities were ignored and each server's validations only saw its own mutations. - The orchestrator now operates on chain entries (descriptor + invoker), mirroring the SEP's ChainEntry, with one global sort across servers. - New public InterceptorChain/InterceptorChainEntry API: parallel fail-closed discovery via interceptors/list on every server, then merged execution. Duplicate names across servers are not deduplicated; each entry invokes on its own host. - InterceptorChainRunner (gateway + InterceptingMcpClient) now executes one merged chain instead of per-client sequential chains; single-client ExecuteChainAsync delegates to the same path unchanged. Behavior changes for multi-client gateways: validations from all servers run as one parallel batch after all mutations, sinks run once per phase, TimeoutMs bounds the whole merged chain, and any server's list failure fails the chain. Closes #15. Co-Authored-By: Claude Fable 5 --- csharp/sdk/CLAUDE.md | 4 +- csharp/sdk/README.md | 14 +- .../Client/InterceptorChain.cs | 96 +++++++ .../Client/InterceptorChainEntry.cs | 18 ++ .../Client/InterceptorChainOrchestrator.cs | 91 ++++--- .../Client/McpClientInterceptorExtensions.cs | 13 +- .../Gateway/InterceptorChainRunner.cs | 45 ++-- .../Gateway/McpInterceptorGatewayOptions.cs | 3 +- .../InterceptorChainOrchestratorTests.cs | 181 +++++++++++++ .../InterceptorChainTests.cs | 245 ++++++++++++++++++ .../McpInterceptorGatewayTests.cs | 67 ++++- 11 files changed, 700 insertions(+), 77 deletions(-) create mode 100644 csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChain.cs create mode 100644 csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChainEntry.cs create mode 100644 csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/InterceptorChainTests.cs diff --git a/csharp/sdk/CLAUDE.md b/csharp/sdk/CLAUDE.md index 4955562..ac5e0fb 100644 --- a/csharp/sdk/CLAUDE.md +++ b/csharp/sdk/CLAUDE.md @@ -6,7 +6,7 @@ C# implementation of gateway-level interceptors from [SEP-1763](https://github.c ## Build & test ``` dotnet build # from csharp/sdk/ -dotnet test # 87 tests across the interceptor test project +dotnet test # 97 tests across the interceptor test project ``` ## Key architectural constraints @@ -55,7 +55,7 @@ Interceptor methods auto-bind from `InvokeInterceptorRequestParams`: **`McpInterceptorGateway`**: Configures an `McpServer` as a transparent proxy. Reads backend `ServerCapabilities`, registers handler delegates (`CallToolHandler`, `ListToolsHandler`, etc.) that route through interceptor chains before forwarding to the backend. By default the gateway is transparent-only; SEP passthrough is opt-in via `ExposeInterceptorProtocol = true`. To connecting clients, the proxy appears to be the backend server. -**`InterceptorChainRunner`** (internal): Shared interception logic used by both `InterceptingMcpClient` and `McpInterceptorGateway`. Supports multiple interceptor clients executed sequentially — each client's `ExecuteChainAsync` (SDK-level orchestration via list + invoke) receives the mutated payload from the previous one. +**`InterceptorChainRunner`** (internal): Shared interception logic used by both `InterceptingMcpClient` and `McpInterceptorGateway`. Multiple interceptor clients form a single merged chain per the SEP: interceptors discovered from every client (parallel `interceptors/list`, fail-closed) are sorted globally by priority hint (alphabetical tie-break) and each is invoked on the client hosting it. Public API: `Client/InterceptorChain.cs` (`DiscoverAsync` + `ExecuteAsync` over `IEnumerable`); duplicate names across servers are not deduplicated. **Gateway split**: `GatewayProxyConfigurator` owns transparent MCP proxy wiring, `GatewayInterceptorProtocolBridge` owns optional SEP passthrough wiring, and `GatewayConnectionForwardingRegistrar` owns connection-bound notification forwarding registration. diff --git a/csharp/sdk/README.md b/csharp/sdk/README.md index 3d59abc..6560654 100644 --- a/csharp/sdk/README.md +++ b/csharp/sdk/README.md @@ -78,6 +78,18 @@ var chainResult = await interceptorClient.ExecuteChainAsync(new ExecuteChainRequ Phase = InterceptorPhase.Request, Payload = myPayload, }); + +// Or span multiple interceptor servers: interceptors are discovered from every server, +// merged into one chain sorted globally by priority hint, and each is invoked on the +// server that hosts it +var multiResult = await InterceptorChain.ExecuteAsync( + [interceptorClientA, interceptorClientB], + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = myPayload, + }); ``` ### Gateway Pattern (Client-Side) @@ -135,7 +147,7 @@ gateway.RegisterNotificationForwarding(server); await server.RunAsync(); ``` -The proxy mirrors the backend's advertised capability graph and forwards `*_list_changed` notifications for the supported list surfaces. Multiple interceptor clients can be chained — they execute in order, each receiving the previous client's mutated payload. +The proxy mirrors the backend's advertised capability graph and forwards `*_list_changed` notifications for the supported list surfaces. Multiple interceptor clients form a single merged chain per the SEP — interceptors from all servers are sorted globally by priority hint (alphabetical tie-break) and each is invoked on the server that hosts it. If you want the gateway to connect to external SEP-exposing interceptor servers itself, use the async factory and provide standard MCP client transports: diff --git a/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChain.cs b/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChain.cs new file mode 100644 index 0000000..a6054a0 --- /dev/null +++ b/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChain.cs @@ -0,0 +1,96 @@ +using ModelContextProtocol.Client; +using ModelContextProtocol.Interceptors.Protocol; + +namespace ModelContextProtocol.Interceptors.Client; + +/// +/// An interceptor chain assembled from one or more MCP servers, per the SEP orchestration pattern: +/// discover via interceptors/list on every server, merge all entries into a single chain +/// sorted by priority hint (alphabetical tie-break), and execute each interceptor via +/// interceptor/invoke on the server that hosts it — mutations sequentially (each receiving +/// the previous one's output), validations in parallel. +/// +/// +/// Interceptors with the same name on different servers are not deduplicated; each entry is +/// invoked on its own host, so results may share an . +/// +public sealed class InterceptorChain +{ + /// Creates a chain from pre-discovered entries. + public InterceptorChain(IReadOnlyList entries) + { + ArgumentNullException.ThrowIfNull(entries); + Entries = entries; + } + + /// Gets all interceptor entries in the chain, collected from one or more servers. + public IReadOnlyList Entries { get; } + + /// + /// Discovers interceptors by calling interceptors/list on every server in parallel and + /// assembles them into a chain, preserving server order for stable ordering of exact ties. + /// + /// + /// Discovery is fail-closed: if any server's interceptors/list fails, the whole call + /// throws rather than silently dropping that server's interceptors. + /// + public static async ValueTask DiscoverAsync( + IEnumerable servers, + ListInterceptorsRequestParams? requestParams = null, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(servers); + + var serverList = servers.ToList(); + var listed = await Task.WhenAll(serverList.Select( + server => server.ListInterceptorsAsync(requestParams, cancellationToken).AsTask())).ConfigureAwait(false); + + var entries = new List(); + for (var i = 0; i < serverList.Count; i++) + { + foreach (var interceptor in listed[i].Interceptors) + { + entries.Add(new InterceptorChainEntry { Interceptor = interceptor, Server = serverList[i] }); + } + } + + return new InterceptorChain(entries); + } + + /// + /// Executes the chain for the given event and phase using the SEP execution model. + /// + public ValueTask ExecuteAsync( + ExecuteChainRequestParams requestParams, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(requestParams); + + var executionEntries = Entries + .Select(e => new InterceptorChainOrchestrator.ChainExecutionEntry( + e.Interceptor, + (invokeParams, ct) => e.Server.InvokeInterceptorAsync(invokeParams, ct))) + .ToList(); + + return InterceptorChainOrchestrator.ExecuteAsync(executionEntries, requestParams, cancellationToken); + } + + /// + /// Discovers interceptors across the given servers (filtered by the request's event) and + /// executes the merged chain in one call. + /// + public static async ValueTask ExecuteAsync( + IEnumerable servers, + ExecuteChainRequestParams requestParams, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(requestParams); + + var chain = await DiscoverAsync( + servers, + new ListInterceptorsRequestParams { Event = requestParams.Event }, + cancellationToken).ConfigureAwait(false); + + return await chain.ExecuteAsync(requestParams, cancellationToken).ConfigureAwait(false); + } +} diff --git a/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChainEntry.cs b/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChainEntry.cs new file mode 100644 index 0000000..22ec780 --- /dev/null +++ b/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChainEntry.cs @@ -0,0 +1,18 @@ +using ModelContextProtocol.Client; +using ModelContextProtocol.Interceptors.Protocol; + +namespace ModelContextProtocol.Interceptors.Client; + +/// +/// An entry in an interceptor chain: an interceptor descriptor paired with the MCP server that +/// hosts it. The server reference is used to route interceptor/invoke calls to the correct +/// server when a chain spans multiple servers. +/// +public sealed class InterceptorChainEntry +{ + /// Gets the interceptor descriptor, as returned by interceptors/list. + public required Interceptor Interceptor { get; init; } + + /// Gets the client connected to the MCP server hosting this interceptor. + public required McpClient Server { get; init; } +} diff --git a/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChainOrchestrator.cs b/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChainOrchestrator.cs index 4c32677..35007c8 100644 --- a/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChainOrchestrator.cs +++ b/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChainOrchestrator.cs @@ -5,13 +5,14 @@ namespace ModelContextProtocol.Interceptors.Client; /// -/// Client-side chain orchestrator. Takes a list of interceptor descriptors (typically obtained -/// via interceptors/list) and an invoker delegate (typically wired to -/// interceptor/invoke over the wire), and runs them according to the SEP execution model. +/// Client-side chain orchestrator. Takes chain entries — interceptor descriptors (typically obtained +/// via interceptors/list, potentially from multiple servers) each paired with an invoker +/// delegate routing to the server hosting it — and runs them according to the SEP execution model. /// /// /// Per the SEP, chain execution is a convenience utility provided by SDKs — not a wire JSON-RPC -/// method. This orchestrator enforces the trust-boundary-aware ordering: +/// method. This orchestrator merges all entries into a single chain sorted by priority hint +/// (alphabetical tie-break) and enforces the trust-boundary-aware ordering: /// /// Sending (request phase): mutations (sequential by priority) → validations (parallel) → sinks (fire-and-forget) /// Receiving (response phase): validations (parallel) → sinks (fire-and-forget) → mutations (sequential by priority) @@ -24,11 +25,31 @@ internal delegate ValueTask InterceptorInvoker( InvokeInterceptorRequestParams request, CancellationToken cancellationToken); - internal static async ValueTask ExecuteAsync( + /// + /// An interceptor descriptor paired with the invoker that routes interceptor/invoke + /// to the server hosting it. Mirrors the SEP's ChainEntry. + /// + internal readonly record struct ChainExecutionEntry(Interceptor Descriptor, InterceptorInvoker Invoker); + + /// + /// Convenience overload for the single-server case: every descriptor shares one invoker. + /// + internal static ValueTask ExecuteAsync( IEnumerable interceptors, InterceptorInvoker invoker, ExecuteChainRequestParams chainParams, CancellationToken cancellationToken) + { + return ExecuteAsync( + interceptors.Select(i => new ChainExecutionEntry(i, invoker)).ToList(), + chainParams, + cancellationToken); + } + + internal static async ValueTask ExecuteAsync( + IReadOnlyList entries, + ExecuteChainRequestParams chainParams, + CancellationToken cancellationToken) { if (chainParams.Phase is not (InterceptorPhase.Request or InterceptorPhase.Response)) { @@ -45,15 +66,17 @@ internal static async ValueTask ExecuteAsync( ChainAbortInfo? abortInfo = null; var status = InterceptorChainStatus.Success; - var applicable = FilterInterceptors(interceptors, chainParams); - - var mutations = applicable.Where(i => i.Type == InterceptorType.Mutation) - .OrderBy(i => i.PriorityHint?.GetEffective(chainParams.Phase) ?? 0) - .ThenBy(i => i.Name, StringComparer.Ordinal) + // Merge & sort: one global order across all servers by effective priority + // (ascending, alphabetical tie-break), then partition by type. The stable sort + // preserves caller-supplied server order for exact ties. + var applicable = FilterEntries(entries, chainParams) + .OrderBy(e => e.Descriptor.PriorityHint?.GetEffective(chainParams.Phase) ?? 0) + .ThenBy(e => e.Descriptor.Name, StringComparer.Ordinal) .ToList(); - var validations = applicable.Where(i => i.Type == InterceptorType.Validation).ToList(); - var sinks = applicable.Where(i => i.Type == InterceptorType.Sink).ToList(); + var mutations = applicable.Where(e => e.Descriptor.Type == InterceptorType.Mutation).ToList(); + var validations = applicable.Where(e => e.Descriptor.Type == InterceptorType.Validation).ToList(); + var sinks = applicable.Where(e => e.Descriptor.Type == InterceptorType.Sink).ToList(); using var timeoutCts = chainParams.TimeoutMs.HasValue ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) @@ -69,23 +92,23 @@ internal static async ValueTask ExecuteAsync( if (chainParams.Phase == InterceptorPhase.Request) { // Sending: Mutations -> Validations -> Sinks - (currentPayload, status, abortInfo) = await ExecuteMutationsAsync(mutations, invoker, chainParams, currentPayload, results, ct); + (currentPayload, status, abortInfo) = await ExecuteMutationsAsync(mutations, chainParams, currentPayload, results, ct); if (status != InterceptorChainStatus.Success) goto Done; - (status, abortInfo) = await ExecuteValidationsAsync(validations, invoker, chainParams, currentPayload, results, summary, ct); + (status, abortInfo) = await ExecuteValidationsAsync(validations, chainParams, currentPayload, results, summary, ct); if (status != InterceptorChainStatus.Success) goto Done; - await ExecuteSinksAsync(sinks, invoker, chainParams, currentPayload, results, ct); + await ExecuteSinksAsync(sinks, chainParams, currentPayload, results, ct); } else { // Receiving: Validations -> Sinks -> Mutations - (status, abortInfo) = await ExecuteValidationsAsync(validations, invoker, chainParams, currentPayload, results, summary, ct); + (status, abortInfo) = await ExecuteValidationsAsync(validations, chainParams, currentPayload, results, summary, ct); if (status != InterceptorChainStatus.Success) goto Done; - await ExecuteSinksAsync(sinks, invoker, chainParams, currentPayload, results, ct); + await ExecuteSinksAsync(sinks, chainParams, currentPayload, results, ct); - (currentPayload, status, abortInfo) = await ExecuteMutationsAsync(mutations, invoker, chainParams, currentPayload, results, ct); + (currentPayload, status, abortInfo) = await ExecuteMutationsAsync(mutations, chainParams, currentPayload, results, ct); } } catch (OperationCanceledException) when (timeoutCts?.IsCancellationRequested == true) @@ -109,14 +132,13 @@ internal static async ValueTask ExecuteAsync( } private static async ValueTask<(JsonNode payload, InterceptorChainStatus status, ChainAbortInfo? abort)> ExecuteMutationsAsync( - List mutations, - InterceptorInvoker invoker, + List mutations, ExecuteChainRequestParams chainParams, JsonNode currentPayload, List results, CancellationToken ct) { - foreach (var descriptor in mutations) + foreach (var (descriptor, invoker) in mutations) { var isAudit = descriptor.Mode == InterceptorMode.Audit; var failOpen = descriptor.FailOpen == true; @@ -157,21 +179,21 @@ internal static async ValueTask ExecuteAsync( } private static async ValueTask<(InterceptorChainStatus status, ChainAbortInfo? abort)> ExecuteValidationsAsync( - List validations, - InterceptorInvoker invoker, + List validations, ExecuteChainRequestParams chainParams, JsonNode currentPayload, List results, ChainValidationSummary summary, CancellationToken ct) { - var tasks = validations.Select(async descriptor => + var tasks = validations.Select(async entry => { + var descriptor = entry.Descriptor; try { var invokeParams = CreateInvokeParams(descriptor, chainParams, currentPayload); var sw = Stopwatch.StartNew(); - var result = await invoker(invokeParams, ct); + var result = await entry.Invoker(invokeParams, ct); sw.Stop(); result.InterceptorName = descriptor.Name; result.DurationMs = sw.ElapsedMilliseconds; @@ -238,20 +260,20 @@ internal static async ValueTask ExecuteAsync( } private static async ValueTask ExecuteSinksAsync( - List sinks, - InterceptorInvoker invoker, + List sinks, ExecuteChainRequestParams chainParams, JsonNode currentPayload, List results, CancellationToken ct) { - var tasks = sinks.Select(async descriptor => + var tasks = sinks.Select(async entry => { + var descriptor = entry.Descriptor; try { var invokeParams = CreateInvokeParams(descriptor, chainParams, currentPayload); var sw = Stopwatch.StartNew(); - var result = await invoker(invokeParams, ct); + var result = await entry.Invoker(invokeParams, ct); sw.Stop(); result.InterceptorName = descriptor.Name; result.DurationMs = sw.ElapsedMilliseconds; @@ -271,14 +293,15 @@ private static async ValueTask ExecuteSinksAsync( results.AddRange(completedResults); } - private static List FilterInterceptors( - IEnumerable interceptors, + private static List FilterEntries( + IReadOnlyList entries, ExecuteChainRequestParams chainParams) { - var result = new List(); + var result = new List(); - foreach (var descriptor in interceptors) + foreach (var entry in entries) { + var descriptor = entry.Descriptor; if (chainParams.InterceptorNames is { Count: > 0 } names && !names.Contains(descriptor.Name)) { continue; @@ -304,7 +327,7 @@ private static List FilterInterceptors( continue; } - result.Add(descriptor); + result.Add(entry); } return result; diff --git a/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/McpClientInterceptorExtensions.cs b/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/McpClientInterceptorExtensions.cs index d53eb77..e0a7b03 100644 --- a/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/McpClientInterceptorExtensions.cs +++ b/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/McpClientInterceptorExtensions.cs @@ -49,21 +49,14 @@ public static ValueTask InvokeInterceptorAsync( /// dispatches each one via interceptor/invoke, orchestrating ordering, parallelism, /// audit-mode, and fail-open semantics locally. /// - public static async ValueTask ExecuteChainAsync( + public static ValueTask ExecuteChainAsync( this McpClient client, ExecuteChainRequestParams requestParams, CancellationToken cancellationToken = default) { + ArgumentNullException.ThrowIfNull(client); ArgumentNullException.ThrowIfNull(requestParams); - var listed = await client.ListInterceptorsAsync( - new ListInterceptorsRequestParams { Event = requestParams.Event }, - cancellationToken).ConfigureAwait(false); - - return await InterceptorChainOrchestrator.ExecuteAsync( - listed.Interceptors, - (invokeParams, ct) => client.InvokeInterceptorAsync(invokeParams, ct), - requestParams, - cancellationToken).ConfigureAwait(false); + return InterceptorChain.ExecuteAsync([client], requestParams, cancellationToken); } } diff --git a/csharp/sdk/src/ModelContextProtocol.Interceptors/Gateway/InterceptorChainRunner.cs b/csharp/sdk/src/ModelContextProtocol.Interceptors/Gateway/InterceptorChainRunner.cs index d817bab..263534d 100644 --- a/csharp/sdk/src/ModelContextProtocol.Interceptors/Gateway/InterceptorChainRunner.cs +++ b/csharp/sdk/src/ModelContextProtocol.Interceptors/Gateway/InterceptorChainRunner.cs @@ -43,39 +43,30 @@ internal bool ShouldIntercept(string eventName) } /// - /// Runs the interceptor chain across all configured interceptor clients sequentially. - /// Each client's (SDK-level chain - /// orchestration via list + invoke) receives the original or last successful payload from the - /// previous one. Any non-success result stops the chain immediately. + /// Runs the interceptor chain phase across all configured interceptor clients as a single + /// merged chain per the SEP: interceptors discovered from every client are combined, sorted + /// globally by priority hint (alphabetical tie-break), and each is invoked on the client that + /// hosts it via . /// - /// The payload after the last successful client and the final chain status. + /// The final payload (the original on failure) and the chain status. internal async ValueTask<(JsonNode payload, InterceptorChainStatus status)> RunChainPhaseAsync( string eventName, InterceptorPhase phase, JsonNode payload, CancellationToken ct) { - var currentPayload = payload; - - foreach (var client in _interceptorClients) - { - var chainResult = await client.ExecuteChainAsync( - new ExecuteChainRequestParams - { - Event = eventName, - Phase = phase, - Payload = currentPayload, - TimeoutMs = _timeoutMs, - Context = _defaultContext, - }, - ct); - - if (chainResult.Status != InterceptorChainStatus.Success) + var chainResult = await InterceptorChain.ExecuteAsync( + _interceptorClients, + new ExecuteChainRequestParams { - return (currentPayload, chainResult.Status); - } - - currentPayload = chainResult.FinalPayload ?? currentPayload; - } + Event = eventName, + Phase = phase, + Payload = payload, + TimeoutMs = _timeoutMs, + Context = _defaultContext, + }, + ct); - return (currentPayload, InterceptorChainStatus.Success); + return chainResult.Status == InterceptorChainStatus.Success + ? (chainResult.FinalPayload ?? payload, InterceptorChainStatus.Success) + : (payload, chainResult.Status); } /// diff --git a/csharp/sdk/src/ModelContextProtocol.Interceptors/Gateway/McpInterceptorGatewayOptions.cs b/csharp/sdk/src/ModelContextProtocol.Interceptors/Gateway/McpInterceptorGatewayOptions.cs index 4913617..e7c7406 100644 --- a/csharp/sdk/src/ModelContextProtocol.Interceptors/Gateway/McpInterceptorGatewayOptions.cs +++ b/csharp/sdk/src/ModelContextProtocol.Interceptors/Gateway/McpInterceptorGatewayOptions.cs @@ -14,7 +14,8 @@ public sealed class McpInterceptorGatewayOptions public required McpClient BackendClient { get; set; } /// - /// Gets or sets the clients connected to interceptor servers, executed in order. + /// Gets or sets the clients connected to interceptor servers. Their interceptors are merged + /// into a single chain ordered globally by priority hint (alphabetical tie-break). /// Use this when the interceptor clients are already connected. /// public IReadOnlyList? InterceptorClients { get; set; } diff --git a/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/InterceptorChainOrchestratorTests.cs b/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/InterceptorChainOrchestratorTests.cs index a9ab420..d46c40b 100644 --- a/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/InterceptorChainOrchestratorTests.cs +++ b/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/InterceptorChainOrchestratorTests.cs @@ -565,8 +565,189 @@ public async Task ValidationSummaryCountsCorrectly() Assert.Equal(1, result.ValidationSummary.Infos); } + // ── Multi-server chains ───────────────────────────────────────────────── + + [Fact] + public async Task MultiServer_MutationsOrderByGlobalPriorityAcrossServers() + { + var executionOrder = new List(); + + TestEntry Mut(string name, PriorityHint hint) => CreateInterceptor(name, InterceptorType.Mutation, (req, _) => + { + executionOrder.Add(name); + return new ValueTask(new MutationInterceptorResult { Modified = false }); + }, priorityHint: hint); + + // Server A hosts the high-priority mutation, server B the low-priority one. + // The merged chain must run B's mutation first regardless of server order. + var serverA = Server(Mut("server-a-late", 100)); + var serverB = Server(Mut("server-b-early", -100)); + + var result = await InterceptorChainOrchestrator.ExecuteAsync( + [.. serverA, .. serverB], + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = JsonNode.Parse("""{}""")!, + }, + CancellationToken.None); + + Assert.Equal(InterceptorChainStatus.Success, result.Status); + Assert.Equal(["server-b-early", "server-a-late"], executionOrder); + } + + [Fact] + public async Task MultiServer_MutationsTieBreakAlphabeticallyAcrossServers() + { + var executionOrder = new List(); + + TestEntry Mut(string name) => CreateInterceptor(name, InterceptorType.Mutation, (req, _) => + { + executionOrder.Add(name); + return new ValueTask(new MutationInterceptorResult { Modified = false }); + }, priorityHint: 0); + + var serverA = Server(Mut("zulu")); + var serverB = Server(Mut("alpha")); + + var result = await InterceptorChainOrchestrator.ExecuteAsync( + [.. serverA, .. serverB], + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = JsonNode.Parse("""{}""")!, + }, + CancellationToken.None); + + Assert.Equal(InterceptorChainStatus.Success, result.Status); + Assert.Equal(["alpha", "zulu"], executionOrder); + } + + [Fact] + public async Task MultiServer_PayloadChainsAcrossServersInOneMutationPass() + { + var serverA = Server(CreateInterceptor("a-first", InterceptorType.Mutation, (req, _) => + { + var obj = req.Payload.AsObject(); + obj["fromA"] = true; + return new ValueTask(new MutationInterceptorResult { Modified = true, Payload = obj }); + }, priorityHint: -1)); + + var serverB = Server(CreateInterceptor("b-second", InterceptorType.Mutation, (req, _) => + { + Assert.True(req.Payload["fromA"]!.GetValue()); // Receives server A's output + var obj = req.Payload.AsObject(); + obj["fromB"] = true; + return new ValueTask(new MutationInterceptorResult { Modified = true, Payload = obj }); + }, priorityHint: 1)); + + var result = await InterceptorChainOrchestrator.ExecuteAsync( + [.. serverB, .. serverA], + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = JsonNode.Parse("""{"original":true}""")!, + }, + CancellationToken.None); + + Assert.Equal(InterceptorChainStatus.Success, result.Status); + Assert.True(result.FinalPayload!["original"]!.GetValue()); + Assert.True(result.FinalPayload["fromA"]!.GetValue()); + Assert.True(result.FinalPayload["fromB"]!.GetValue()); + } + + [Fact] + public async Task MultiServer_ValidationsFromAllServersSeePostMutationPayload() + { + var validatedPayloads = new List(); + + var serverA = Server( + CreateInterceptor("mutator", InterceptorType.Mutation, (req, _) => + { + var obj = req.Payload.AsObject(); + obj["mutated"] = true; + return new ValueTask(new MutationInterceptorResult { Modified = true, Payload = obj }); + }), + CreateInterceptor("val-a", InterceptorType.Validation, (req, _) => + { + lock (validatedPayloads) validatedPayloads.Add(req.Payload); + return new ValueTask(ValidationInterceptorResult.Success()); + })); + + var serverB = Server(CreateInterceptor("val-b", InterceptorType.Validation, (req, _) => + { + lock (validatedPayloads) validatedPayloads.Add(req.Payload); + return new ValueTask(ValidationInterceptorResult.Success()); + })); + + var result = await InterceptorChainOrchestrator.ExecuteAsync( + [.. serverA, .. serverB], + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = JsonNode.Parse("""{}""")!, + }, + CancellationToken.None); + + Assert.Equal(InterceptorChainStatus.Success, result.Status); + Assert.Equal(2, validatedPayloads.Count); + // Both servers' validations run after the mutation from server A, against its output. + Assert.All(validatedPayloads, p => Assert.True(p["mutated"]!.GetValue())); + } + + [Fact] + public async Task MultiServer_DuplicateNamesInvokeOncePerHostingServer() + { + var serverAInvocations = 0; + var serverBInvocations = 0; + + var serverA = Server(CreateInterceptor("shared-name", InterceptorType.Validation, (req, _) => + { + Interlocked.Increment(ref serverAInvocations); + return new ValueTask(ValidationInterceptorResult.Success()); + })); + + var serverB = Server(CreateInterceptor("shared-name", InterceptorType.Validation, (req, _) => + { + Interlocked.Increment(ref serverBInvocations); + return new ValueTask(ValidationInterceptorResult.Success()); + })); + + var result = await InterceptorChainOrchestrator.ExecuteAsync( + [.. serverA, .. serverB], + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = JsonNode.Parse("""{}""")!, + }, + CancellationToken.None); + + Assert.Equal(InterceptorChainStatus.Success, result.Status); + // Each entry routes to its own host: one invocation per server, two results. + Assert.Equal(1, serverAInvocations); + Assert.Equal(1, serverBInvocations); + Assert.Equal(2, result.Results.Count); + Assert.All(result.Results, r => Assert.Equal("shared-name", r.InterceptorName)); + } + // ── Helpers ───────────────────────────────────────────────────────────── + /// + /// Models one fake server: each of its entries gets an invoker bound to that server's own + /// handler, so duplicate names across servers stay distinct. + /// + private static List Server(params TestEntry[] entries) + { + return entries.Select(e => new InterceptorChainOrchestrator.ChainExecutionEntry( + e.Descriptor, + (req, ct) => e.Handler(req, ct))).ToList(); + } + private static ValueTask RunAsync( IEnumerable entries, ExecuteChainRequestParams chainParams, diff --git a/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/InterceptorChainTests.cs b/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/InterceptorChainTests.cs new file mode 100644 index 0000000..08cb0b7 --- /dev/null +++ b/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/InterceptorChainTests.cs @@ -0,0 +1,245 @@ +using System.Text.Json; +using System.Text.Json.Nodes; +using ModelContextProtocol; +using ModelContextProtocol.Client; +using ModelContextProtocol.Interceptors.Client; +using ModelContextProtocol.Interceptors.Protocol; +using ModelContextProtocol.Interceptors.Server; +using ModelContextProtocol.Server; +using Xunit; + +namespace ModelContextProtocol.Interceptors.Tests; + +/// +/// Tests for the public API over real in-memory interceptor servers. +/// +public class InterceptorChainTests +{ + [Fact] + public async Task DiscoverAsync_MergesEntriesFromAllServersWithServerAttribution() + { + await using var fixture = await TwoServerFixture.CreateAsync( + serverAInterceptors: [MutationDescriptor("a-mutator", priorityHint: 10)], + serverBInterceptors: [MutationDescriptor("b-mutator", priorityHint: -10)]); + + var chain = await InterceptorChain.DiscoverAsync([fixture.ClientA, fixture.ClientB]); + + Assert.Equal(2, chain.Entries.Count); + var entryA = Assert.Single(chain.Entries, e => e.Interceptor.Name == "a-mutator"); + var entryB = Assert.Single(chain.Entries, e => e.Interceptor.Name == "b-mutator"); + Assert.Same(fixture.ClientA, entryA.Server); + Assert.Same(fixture.ClientB, entryB.Server); + } + + [Fact] + public async Task ExecuteAsync_RunsMergedChainInGlobalPriorityOrderAcrossServers() + { + // Server A's mutation has the higher priority value, so server B's must run first + // and server A's must observe B's payload — the opposite of per-server sequencing. + await using var fixture = await TwoServerFixture.CreateAsync( + serverAInterceptors: [MutationDescriptor("a-late", priorityHint: 10)], + serverBInterceptors: [MutationDescriptor("b-early", priorityHint: -10)], + handler: (req, _) => + { + var obj = req.Payload!.AsObject(); + var order = obj["order"]?.AsArray() ?? []; + order.Add(req.Name); + obj["order"] = order.DeepClone(); + return new ValueTask(new MutationInterceptorResult { Modified = true, Payload = obj }); + }); + + var result = await InterceptorChain.ExecuteAsync( + [fixture.ClientA, fixture.ClientB], + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = JsonNode.Parse("""{}""")!, + }); + + Assert.Equal(InterceptorChainStatus.Success, result.Status); + Assert.Equal(2, result.Results.Count); + var order = result.FinalPayload!["order"]!.AsArray().Select(n => n!.GetValue()).ToList(); + Assert.Equal(["b-early", "a-late"], order); + } + + [Fact] + public async Task ExecuteAsync_FailsClosedWhenAnyServerListFails() + { + await using var fixture = await TwoServerFixture.CreateAsync( + serverAInterceptors: [MutationDescriptor("a-mutator", priorityHint: 0)], + // Server B does not speak the interceptor protocol at all, so interceptors/list fails. + serverBInterceptors: null); + + await Assert.ThrowsAsync(async () => + { + await InterceptorChain.ExecuteAsync( + [fixture.ClientA, fixture.ClientB], + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = JsonNode.Parse("""{}""")!, + }); + }); + } + + [Fact] + public async Task SingleClientExecuteChainAsync_StillWorksThroughMergedPath() + { + await using var fixture = await TwoServerFixture.CreateAsync( + serverAInterceptors: [MutationDescriptor("solo-mutator", priorityHint: 0)], + serverBInterceptors: null, + handler: (req, _) => + { + var obj = req.Payload!.AsObject(); + obj["mutated"] = true; + return new ValueTask(new MutationInterceptorResult { Modified = true, Payload = obj }); + }); + + var result = await fixture.ClientA.ExecuteChainAsync( + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = JsonNode.Parse("""{"original":true}""")!, + }); + + Assert.Equal(InterceptorChainStatus.Success, result.Status); + Assert.True(result.FinalPayload!["mutated"]!.GetValue()); + Assert.True(result.FinalPayload["original"]!.GetValue()); + } + + // ── Helpers ───────────────────────────────────────────────────────────── + + private static Interceptor MutationDescriptor(string name, PriorityHint priorityHint) => new() + { + Name = name, + Type = InterceptorType.Mutation, + Hooks = + [ + new InterceptorHook { Events = [InterceptionEvents.All], Phase = InterceptorPhase.Request }, + new InterceptorHook { Events = [InterceptionEvents.All], Phase = InterceptorPhase.Response }, + ], + PriorityHint = priorityHint, + }; + + private sealed class TwoServerFixture : IAsyncDisposable + { + private readonly List _disposables; + + public McpClient ClientA { get; } + public McpClient ClientB { get; } + + private TwoServerFixture(McpClient clientA, McpClient clientB, List disposables) + { + ClientA = clientA; + ClientB = clientB; + _disposables = disposables; + } + + /// + /// Spins up two in-memory servers. A interceptor list creates a + /// plain MCP server without the interceptor protocol (its interceptors/list fails). + /// + public static async Task CreateAsync( + Interceptor[]? serverAInterceptors, + Interceptor[]? serverBInterceptors, + Func>? handler = null) + { + handler ??= (req, _) => new ValueTask(new MutationInterceptorResult { Modified = false }); + var disposables = new List(); + + try + { + var (serverA, clientA) = await McpInterceptorGatewayTests.GatewayTestFixture.CreateServerClientPairForTesting( + "chain-server-a", options => Configure(options, serverAInterceptors, handler)); + disposables.Add(serverA); + disposables.Add(clientA); + + var (serverB, clientB) = await McpInterceptorGatewayTests.GatewayTestFixture.CreateServerClientPairForTesting( + "chain-server-b", options => Configure(options, serverBInterceptors, handler)); + disposables.Add(serverB); + disposables.Add(clientB); + + return new TwoServerFixture(clientA, clientB, disposables); + } + catch + { + foreach (var d in disposables) + { + await d.DisposeAsync(); + } + + throw; + } + } + + private static void Configure( + McpServerOptions options, + Interceptor[]? interceptors, + Func> handler) + { + if (interceptors is null) + { + return; + } + + var collection = new McpServerPrimitiveCollection(); + foreach (var descriptor in interceptors) + { + collection.Add(new DelegatingInterceptor(descriptor, handler)); + } + + var filter = new InterceptorMessageFilter(collection); + options.Filters.Message.IncomingFilters.Add(filter.CreateFilter); + + options.Capabilities ??= new(); +#pragma warning disable MCPEXP001 + options.Capabilities.Extensions ??= new Dictionary(); + options.Capabilities.Extensions[InterceptorProtocolConstants.ExtensionCapabilityKey] = JsonSerializer.SerializeToElement( + new InterceptorsCapability { SupportedEvents = [InterceptionEvents.All] }, + InterceptorJsonUtilities.DefaultOptions); +#pragma warning restore MCPEXP001 + } + + public async ValueTask DisposeAsync() + { + for (var i = _disposables.Count - 1; i >= 0; i--) + { + try + { + await _disposables[i].DisposeAsync(); + } + catch + { + // Swallow disposal errors in tests + } + } + } + } + + private sealed class DelegatingInterceptor : McpServerInterceptor + { + private readonly Interceptor _interceptor; + private readonly Func> _handler; + + public DelegatingInterceptor( + Interceptor interceptor, + Func> handler) + { + _interceptor = interceptor; + _handler = handler; + } + + public override Interceptor ProtocolInterceptor => _interceptor; + public override IReadOnlyList Metadata => []; + + public override ValueTask InvokeAsync( + InvokeInterceptorRequestParams request, + McpServer server, + IServiceProvider? services, + CancellationToken cancellationToken = default) => + _handler(request, cancellationToken); + } +} diff --git a/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/McpInterceptorGatewayTests.cs b/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/McpInterceptorGatewayTests.cs index 55d3dce..a27d5bb 100644 --- a/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/McpInterceptorGatewayTests.cs +++ b/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/McpInterceptorGatewayTests.cs @@ -432,15 +432,78 @@ public async Task CallToolAsync_ChainsAllInterceptorClients() })], ]); - // Call tool — both interceptors should run (A first, then B) + // Call tool — both interceptors should run (A first, then B: equal priority, + // alphabetical tie-break in the merged chain) var result = await fixture.ProxyClient.CallToolAsync("echo", new Dictionary { ["message"] = "hello" }); - // Both interceptor chains ran in order: A prepended first, then B + // A prepended first, then B var text = result.Content[0].ToString()!; Assert.Contains("B:A:hello", text); } + [Fact] + public async Task CallToolAsync_OrdersMutationsByGlobalPriorityAcrossInterceptorClients() + { + // The first server hosts the higher-priority-value (later) mutation, the second server + // the lower-priority-value (earlier) one. The merged chain must run the second server's + // mutation first — under per-server sequencing it would run last. + static McpServerInterceptor PrependMutator(string name, int priority, string prefix) => + new TestInterceptor( + new Interceptor + { + Name = name, + Type = InterceptorType.Mutation, + PriorityHint = priority, + Hooks = [new InterceptorHook { Events = [InterceptionEvents.All], Phase = InterceptorPhase.Request }], + }, + (req, _, _, _) => + { + var obj = JsonNode.Parse(req.Payload!.ToJsonString())!.AsObject(); + if (obj["arguments"]?["message"] is JsonNode msgNode) + { + obj["arguments"]!["message"] = prefix + msgNode.GetValue(); + } + return new ValueTask(new MutationInterceptorResult + { + Modified = true, + Payload = obj, + }); + }); + + await using var fixture = await GatewayTestFixture.CreateWithMultipleInterceptorServersAsync( + backendConfigure: (options) => + { + options.Capabilities ??= new(); + options.Capabilities.Tools ??= new(); + options.Handlers.ListToolsHandler = (request, ct) => + new ValueTask(new ListToolsResult + { + Tools = [new Tool { Name = "echo", Description = "Echo" }], + }); + options.Handlers.CallToolHandler = (request, ct) => + { + var msg = request.Params!.Arguments?["message"]; + return new ValueTask(new CallToolResult + { + Content = [new TextContentBlock { Text = $"echo: {msg}" }], + }); + }; + }, + interceptorConfigs: + [ + [PrependMutator("late-mutator", priority: 1000, prefix: "LATE:")], + [PrependMutator("early-mutator", priority: -1000, prefix: "EARLY:")], + ]); + + var result = await fixture.ProxyClient.CallToolAsync("echo", + new Dictionary { ["message"] = "hello" }); + + // early (server 2) ran first, late (server 1) ran second and prepended last. + var text = result.Content[0].ToString()!; + Assert.Contains("LATE:EARLY:hello", text); + } + [Fact] public async Task SubscribeMutation_UsesModifiedPayload() {