diff --git a/csharp/sdk/CLAUDE.md b/csharp/sdk/CLAUDE.md index 4955562..ac5e0fb 100644 --- a/csharp/sdk/CLAUDE.md +++ b/csharp/sdk/CLAUDE.md @@ -6,7 +6,7 @@ C# implementation of gateway-level interceptors from [SEP-1763](https://github.c ## Build & test ``` dotnet build # from csharp/sdk/ -dotnet test # 87 tests across the interceptor test project +dotnet test # 97 tests across the interceptor test project ``` ## Key architectural constraints @@ -55,7 +55,7 @@ Interceptor methods auto-bind from `InvokeInterceptorRequestParams`: **`McpInterceptorGateway`**: Configures an `McpServer` as a transparent proxy. Reads backend `ServerCapabilities`, registers handler delegates (`CallToolHandler`, `ListToolsHandler`, etc.) that route through interceptor chains before forwarding to the backend. By default the gateway is transparent-only; SEP passthrough is opt-in via `ExposeInterceptorProtocol = true`. To connecting clients, the proxy appears to be the backend server. -**`InterceptorChainRunner`** (internal): Shared interception logic used by both `InterceptingMcpClient` and `McpInterceptorGateway`. Supports multiple interceptor clients executed sequentially — each client's `ExecuteChainAsync` (SDK-level orchestration via list + invoke) receives the mutated payload from the previous one. +**`InterceptorChainRunner`** (internal): Shared interception logic used by both `InterceptingMcpClient` and `McpInterceptorGateway`. Multiple interceptor clients form a single merged chain per the SEP: interceptors discovered from every client (parallel `interceptors/list`, fail-closed) are sorted globally by priority hint (alphabetical tie-break) and each is invoked on the client hosting it. Public API: `Client/InterceptorChain.cs` (`DiscoverAsync` + `ExecuteAsync` over `IEnumerable`); duplicate names across servers are not deduplicated. **Gateway split**: `GatewayProxyConfigurator` owns transparent MCP proxy wiring, `GatewayInterceptorProtocolBridge` owns optional SEP passthrough wiring, and `GatewayConnectionForwardingRegistrar` owns connection-bound notification forwarding registration. diff --git a/csharp/sdk/README.md b/csharp/sdk/README.md index 3d59abc..6560654 100644 --- a/csharp/sdk/README.md +++ b/csharp/sdk/README.md @@ -78,6 +78,18 @@ var chainResult = await interceptorClient.ExecuteChainAsync(new ExecuteChainRequ Phase = InterceptorPhase.Request, Payload = myPayload, }); + +// Or span multiple interceptor servers: interceptors are discovered from every server, +// merged into one chain sorted globally by priority hint, and each is invoked on the +// server that hosts it +var multiResult = await InterceptorChain.ExecuteAsync( + [interceptorClientA, interceptorClientB], + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = myPayload, + }); ``` ### Gateway Pattern (Client-Side) @@ -135,7 +147,7 @@ gateway.RegisterNotificationForwarding(server); await server.RunAsync(); ``` -The proxy mirrors the backend's advertised capability graph and forwards `*_list_changed` notifications for the supported list surfaces. Multiple interceptor clients can be chained — they execute in order, each receiving the previous client's mutated payload. +The proxy mirrors the backend's advertised capability graph and forwards `*_list_changed` notifications for the supported list surfaces. Multiple interceptor clients form a single merged chain per the SEP — interceptors from all servers are sorted globally by priority hint (alphabetical tie-break) and each is invoked on the server that hosts it. If you want the gateway to connect to external SEP-exposing interceptor servers itself, use the async factory and provide standard MCP client transports: diff --git a/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChain.cs b/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChain.cs new file mode 100644 index 0000000..a6054a0 --- /dev/null +++ b/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChain.cs @@ -0,0 +1,96 @@ +using ModelContextProtocol.Client; +using ModelContextProtocol.Interceptors.Protocol; + +namespace ModelContextProtocol.Interceptors.Client; + +/// +/// An interceptor chain assembled from one or more MCP servers, per the SEP orchestration pattern: +/// discover via interceptors/list on every server, merge all entries into a single chain +/// sorted by priority hint (alphabetical tie-break), and execute each interceptor via +/// interceptor/invoke on the server that hosts it — mutations sequentially (each receiving +/// the previous one's output), validations in parallel. +/// +/// +/// Interceptors with the same name on different servers are not deduplicated; each entry is +/// invoked on its own host, so results may share an . +/// +public sealed class InterceptorChain +{ + /// Creates a chain from pre-discovered entries. + public InterceptorChain(IReadOnlyList entries) + { + ArgumentNullException.ThrowIfNull(entries); + Entries = entries; + } + + /// Gets all interceptor entries in the chain, collected from one or more servers. + public IReadOnlyList Entries { get; } + + /// + /// Discovers interceptors by calling interceptors/list on every server in parallel and + /// assembles them into a chain, preserving server order for stable ordering of exact ties. + /// + /// + /// Discovery is fail-closed: if any server's interceptors/list fails, the whole call + /// throws rather than silently dropping that server's interceptors. + /// + public static async ValueTask DiscoverAsync( + IEnumerable servers, + ListInterceptorsRequestParams? requestParams = null, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(servers); + + var serverList = servers.ToList(); + var listed = await Task.WhenAll(serverList.Select( + server => server.ListInterceptorsAsync(requestParams, cancellationToken).AsTask())).ConfigureAwait(false); + + var entries = new List(); + for (var i = 0; i < serverList.Count; i++) + { + foreach (var interceptor in listed[i].Interceptors) + { + entries.Add(new InterceptorChainEntry { Interceptor = interceptor, Server = serverList[i] }); + } + } + + return new InterceptorChain(entries); + } + + /// + /// Executes the chain for the given event and phase using the SEP execution model. + /// + public ValueTask ExecuteAsync( + ExecuteChainRequestParams requestParams, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(requestParams); + + var executionEntries = Entries + .Select(e => new InterceptorChainOrchestrator.ChainExecutionEntry( + e.Interceptor, + (invokeParams, ct) => e.Server.InvokeInterceptorAsync(invokeParams, ct))) + .ToList(); + + return InterceptorChainOrchestrator.ExecuteAsync(executionEntries, requestParams, cancellationToken); + } + + /// + /// Discovers interceptors across the given servers (filtered by the request's event) and + /// executes the merged chain in one call. + /// + public static async ValueTask ExecuteAsync( + IEnumerable servers, + ExecuteChainRequestParams requestParams, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(requestParams); + + var chain = await DiscoverAsync( + servers, + new ListInterceptorsRequestParams { Event = requestParams.Event }, + cancellationToken).ConfigureAwait(false); + + return await chain.ExecuteAsync(requestParams, cancellationToken).ConfigureAwait(false); + } +} diff --git a/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChainEntry.cs b/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChainEntry.cs new file mode 100644 index 0000000..22ec780 --- /dev/null +++ b/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChainEntry.cs @@ -0,0 +1,18 @@ +using ModelContextProtocol.Client; +using ModelContextProtocol.Interceptors.Protocol; + +namespace ModelContextProtocol.Interceptors.Client; + +/// +/// An entry in an interceptor chain: an interceptor descriptor paired with the MCP server that +/// hosts it. The server reference is used to route interceptor/invoke calls to the correct +/// server when a chain spans multiple servers. +/// +public sealed class InterceptorChainEntry +{ + /// Gets the interceptor descriptor, as returned by interceptors/list. + public required Interceptor Interceptor { get; init; } + + /// Gets the client connected to the MCP server hosting this interceptor. + public required McpClient Server { get; init; } +} diff --git a/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChainOrchestrator.cs b/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChainOrchestrator.cs index 4c32677..35007c8 100644 --- a/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChainOrchestrator.cs +++ b/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/InterceptorChainOrchestrator.cs @@ -5,13 +5,14 @@ namespace ModelContextProtocol.Interceptors.Client; /// -/// Client-side chain orchestrator. Takes a list of interceptor descriptors (typically obtained -/// via interceptors/list) and an invoker delegate (typically wired to -/// interceptor/invoke over the wire), and runs them according to the SEP execution model. +/// Client-side chain orchestrator. Takes chain entries — interceptor descriptors (typically obtained +/// via interceptors/list, potentially from multiple servers) each paired with an invoker +/// delegate routing to the server hosting it — and runs them according to the SEP execution model. /// /// /// Per the SEP, chain execution is a convenience utility provided by SDKs — not a wire JSON-RPC -/// method. This orchestrator enforces the trust-boundary-aware ordering: +/// method. This orchestrator merges all entries into a single chain sorted by priority hint +/// (alphabetical tie-break) and enforces the trust-boundary-aware ordering: /// /// Sending (request phase): mutations (sequential by priority) → validations (parallel) → sinks (fire-and-forget) /// Receiving (response phase): validations (parallel) → sinks (fire-and-forget) → mutations (sequential by priority) @@ -24,11 +25,31 @@ internal delegate ValueTask InterceptorInvoker( InvokeInterceptorRequestParams request, CancellationToken cancellationToken); - internal static async ValueTask ExecuteAsync( + /// + /// An interceptor descriptor paired with the invoker that routes interceptor/invoke + /// to the server hosting it. Mirrors the SEP's ChainEntry. + /// + internal readonly record struct ChainExecutionEntry(Interceptor Descriptor, InterceptorInvoker Invoker); + + /// + /// Convenience overload for the single-server case: every descriptor shares one invoker. + /// + internal static ValueTask ExecuteAsync( IEnumerable interceptors, InterceptorInvoker invoker, ExecuteChainRequestParams chainParams, CancellationToken cancellationToken) + { + return ExecuteAsync( + interceptors.Select(i => new ChainExecutionEntry(i, invoker)).ToList(), + chainParams, + cancellationToken); + } + + internal static async ValueTask ExecuteAsync( + IReadOnlyList entries, + ExecuteChainRequestParams chainParams, + CancellationToken cancellationToken) { if (chainParams.Phase is not (InterceptorPhase.Request or InterceptorPhase.Response)) { @@ -45,15 +66,17 @@ internal static async ValueTask ExecuteAsync( ChainAbortInfo? abortInfo = null; var status = InterceptorChainStatus.Success; - var applicable = FilterInterceptors(interceptors, chainParams); - - var mutations = applicable.Where(i => i.Type == InterceptorType.Mutation) - .OrderBy(i => i.PriorityHint?.GetEffective(chainParams.Phase) ?? 0) - .ThenBy(i => i.Name, StringComparer.Ordinal) + // Merge & sort: one global order across all servers by effective priority + // (ascending, alphabetical tie-break), then partition by type. The stable sort + // preserves caller-supplied server order for exact ties. + var applicable = FilterEntries(entries, chainParams) + .OrderBy(e => e.Descriptor.PriorityHint?.GetEffective(chainParams.Phase) ?? 0) + .ThenBy(e => e.Descriptor.Name, StringComparer.Ordinal) .ToList(); - var validations = applicable.Where(i => i.Type == InterceptorType.Validation).ToList(); - var sinks = applicable.Where(i => i.Type == InterceptorType.Sink).ToList(); + var mutations = applicable.Where(e => e.Descriptor.Type == InterceptorType.Mutation).ToList(); + var validations = applicable.Where(e => e.Descriptor.Type == InterceptorType.Validation).ToList(); + var sinks = applicable.Where(e => e.Descriptor.Type == InterceptorType.Sink).ToList(); using var timeoutCts = chainParams.TimeoutMs.HasValue ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) @@ -69,23 +92,23 @@ internal static async ValueTask ExecuteAsync( if (chainParams.Phase == InterceptorPhase.Request) { // Sending: Mutations -> Validations -> Sinks - (currentPayload, status, abortInfo) = await ExecuteMutationsAsync(mutations, invoker, chainParams, currentPayload, results, ct); + (currentPayload, status, abortInfo) = await ExecuteMutationsAsync(mutations, chainParams, currentPayload, results, ct); if (status != InterceptorChainStatus.Success) goto Done; - (status, abortInfo) = await ExecuteValidationsAsync(validations, invoker, chainParams, currentPayload, results, summary, ct); + (status, abortInfo) = await ExecuteValidationsAsync(validations, chainParams, currentPayload, results, summary, ct); if (status != InterceptorChainStatus.Success) goto Done; - await ExecuteSinksAsync(sinks, invoker, chainParams, currentPayload, results, ct); + await ExecuteSinksAsync(sinks, chainParams, currentPayload, results, ct); } else { // Receiving: Validations -> Sinks -> Mutations - (status, abortInfo) = await ExecuteValidationsAsync(validations, invoker, chainParams, currentPayload, results, summary, ct); + (status, abortInfo) = await ExecuteValidationsAsync(validations, chainParams, currentPayload, results, summary, ct); if (status != InterceptorChainStatus.Success) goto Done; - await ExecuteSinksAsync(sinks, invoker, chainParams, currentPayload, results, ct); + await ExecuteSinksAsync(sinks, chainParams, currentPayload, results, ct); - (currentPayload, status, abortInfo) = await ExecuteMutationsAsync(mutations, invoker, chainParams, currentPayload, results, ct); + (currentPayload, status, abortInfo) = await ExecuteMutationsAsync(mutations, chainParams, currentPayload, results, ct); } } catch (OperationCanceledException) when (timeoutCts?.IsCancellationRequested == true) @@ -109,14 +132,13 @@ internal static async ValueTask ExecuteAsync( } private static async ValueTask<(JsonNode payload, InterceptorChainStatus status, ChainAbortInfo? abort)> ExecuteMutationsAsync( - List mutations, - InterceptorInvoker invoker, + List mutations, ExecuteChainRequestParams chainParams, JsonNode currentPayload, List results, CancellationToken ct) { - foreach (var descriptor in mutations) + foreach (var (descriptor, invoker) in mutations) { var isAudit = descriptor.Mode == InterceptorMode.Audit; var failOpen = descriptor.FailOpen == true; @@ -157,21 +179,21 @@ internal static async ValueTask ExecuteAsync( } private static async ValueTask<(InterceptorChainStatus status, ChainAbortInfo? abort)> ExecuteValidationsAsync( - List validations, - InterceptorInvoker invoker, + List validations, ExecuteChainRequestParams chainParams, JsonNode currentPayload, List results, ChainValidationSummary summary, CancellationToken ct) { - var tasks = validations.Select(async descriptor => + var tasks = validations.Select(async entry => { + var descriptor = entry.Descriptor; try { var invokeParams = CreateInvokeParams(descriptor, chainParams, currentPayload); var sw = Stopwatch.StartNew(); - var result = await invoker(invokeParams, ct); + var result = await entry.Invoker(invokeParams, ct); sw.Stop(); result.InterceptorName = descriptor.Name; result.DurationMs = sw.ElapsedMilliseconds; @@ -238,20 +260,20 @@ internal static async ValueTask ExecuteAsync( } private static async ValueTask ExecuteSinksAsync( - List sinks, - InterceptorInvoker invoker, + List sinks, ExecuteChainRequestParams chainParams, JsonNode currentPayload, List results, CancellationToken ct) { - var tasks = sinks.Select(async descriptor => + var tasks = sinks.Select(async entry => { + var descriptor = entry.Descriptor; try { var invokeParams = CreateInvokeParams(descriptor, chainParams, currentPayload); var sw = Stopwatch.StartNew(); - var result = await invoker(invokeParams, ct); + var result = await entry.Invoker(invokeParams, ct); sw.Stop(); result.InterceptorName = descriptor.Name; result.DurationMs = sw.ElapsedMilliseconds; @@ -271,14 +293,15 @@ private static async ValueTask ExecuteSinksAsync( results.AddRange(completedResults); } - private static List FilterInterceptors( - IEnumerable interceptors, + private static List FilterEntries( + IReadOnlyList entries, ExecuteChainRequestParams chainParams) { - var result = new List(); + var result = new List(); - foreach (var descriptor in interceptors) + foreach (var entry in entries) { + var descriptor = entry.Descriptor; if (chainParams.InterceptorNames is { Count: > 0 } names && !names.Contains(descriptor.Name)) { continue; @@ -304,7 +327,7 @@ private static List FilterInterceptors( continue; } - result.Add(descriptor); + result.Add(entry); } return result; diff --git a/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/McpClientInterceptorExtensions.cs b/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/McpClientInterceptorExtensions.cs index d53eb77..e0a7b03 100644 --- a/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/McpClientInterceptorExtensions.cs +++ b/csharp/sdk/src/ModelContextProtocol.Interceptors/Client/McpClientInterceptorExtensions.cs @@ -49,21 +49,14 @@ public static ValueTask InvokeInterceptorAsync( /// dispatches each one via interceptor/invoke, orchestrating ordering, parallelism, /// audit-mode, and fail-open semantics locally. /// - public static async ValueTask ExecuteChainAsync( + public static ValueTask ExecuteChainAsync( this McpClient client, ExecuteChainRequestParams requestParams, CancellationToken cancellationToken = default) { + ArgumentNullException.ThrowIfNull(client); ArgumentNullException.ThrowIfNull(requestParams); - var listed = await client.ListInterceptorsAsync( - new ListInterceptorsRequestParams { Event = requestParams.Event }, - cancellationToken).ConfigureAwait(false); - - return await InterceptorChainOrchestrator.ExecuteAsync( - listed.Interceptors, - (invokeParams, ct) => client.InvokeInterceptorAsync(invokeParams, ct), - requestParams, - cancellationToken).ConfigureAwait(false); + return InterceptorChain.ExecuteAsync([client], requestParams, cancellationToken); } } diff --git a/csharp/sdk/src/ModelContextProtocol.Interceptors/Gateway/InterceptorChainRunner.cs b/csharp/sdk/src/ModelContextProtocol.Interceptors/Gateway/InterceptorChainRunner.cs index d817bab..263534d 100644 --- a/csharp/sdk/src/ModelContextProtocol.Interceptors/Gateway/InterceptorChainRunner.cs +++ b/csharp/sdk/src/ModelContextProtocol.Interceptors/Gateway/InterceptorChainRunner.cs @@ -43,39 +43,30 @@ internal bool ShouldIntercept(string eventName) } /// - /// Runs the interceptor chain across all configured interceptor clients sequentially. - /// Each client's (SDK-level chain - /// orchestration via list + invoke) receives the original or last successful payload from the - /// previous one. Any non-success result stops the chain immediately. + /// Runs the interceptor chain phase across all configured interceptor clients as a single + /// merged chain per the SEP: interceptors discovered from every client are combined, sorted + /// globally by priority hint (alphabetical tie-break), and each is invoked on the client that + /// hosts it via . /// - /// The payload after the last successful client and the final chain status. + /// The final payload (the original on failure) and the chain status. internal async ValueTask<(JsonNode payload, InterceptorChainStatus status)> RunChainPhaseAsync( string eventName, InterceptorPhase phase, JsonNode payload, CancellationToken ct) { - var currentPayload = payload; - - foreach (var client in _interceptorClients) - { - var chainResult = await client.ExecuteChainAsync( - new ExecuteChainRequestParams - { - Event = eventName, - Phase = phase, - Payload = currentPayload, - TimeoutMs = _timeoutMs, - Context = _defaultContext, - }, - ct); - - if (chainResult.Status != InterceptorChainStatus.Success) + var chainResult = await InterceptorChain.ExecuteAsync( + _interceptorClients, + new ExecuteChainRequestParams { - return (currentPayload, chainResult.Status); - } - - currentPayload = chainResult.FinalPayload ?? currentPayload; - } + Event = eventName, + Phase = phase, + Payload = payload, + TimeoutMs = _timeoutMs, + Context = _defaultContext, + }, + ct); - return (currentPayload, InterceptorChainStatus.Success); + return chainResult.Status == InterceptorChainStatus.Success + ? (chainResult.FinalPayload ?? payload, InterceptorChainStatus.Success) + : (payload, chainResult.Status); } /// diff --git a/csharp/sdk/src/ModelContextProtocol.Interceptors/Gateway/McpInterceptorGatewayOptions.cs b/csharp/sdk/src/ModelContextProtocol.Interceptors/Gateway/McpInterceptorGatewayOptions.cs index 4913617..e7c7406 100644 --- a/csharp/sdk/src/ModelContextProtocol.Interceptors/Gateway/McpInterceptorGatewayOptions.cs +++ b/csharp/sdk/src/ModelContextProtocol.Interceptors/Gateway/McpInterceptorGatewayOptions.cs @@ -14,7 +14,8 @@ public sealed class McpInterceptorGatewayOptions public required McpClient BackendClient { get; set; } /// - /// Gets or sets the clients connected to interceptor servers, executed in order. + /// Gets or sets the clients connected to interceptor servers. Their interceptors are merged + /// into a single chain ordered globally by priority hint (alphabetical tie-break). /// Use this when the interceptor clients are already connected. /// public IReadOnlyList? InterceptorClients { get; set; } diff --git a/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/InterceptorChainOrchestratorTests.cs b/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/InterceptorChainOrchestratorTests.cs index a9ab420..d46c40b 100644 --- a/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/InterceptorChainOrchestratorTests.cs +++ b/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/InterceptorChainOrchestratorTests.cs @@ -565,8 +565,189 @@ public async Task ValidationSummaryCountsCorrectly() Assert.Equal(1, result.ValidationSummary.Infos); } + // ── Multi-server chains ───────────────────────────────────────────────── + + [Fact] + public async Task MultiServer_MutationsOrderByGlobalPriorityAcrossServers() + { + var executionOrder = new List(); + + TestEntry Mut(string name, PriorityHint hint) => CreateInterceptor(name, InterceptorType.Mutation, (req, _) => + { + executionOrder.Add(name); + return new ValueTask(new MutationInterceptorResult { Modified = false }); + }, priorityHint: hint); + + // Server A hosts the high-priority mutation, server B the low-priority one. + // The merged chain must run B's mutation first regardless of server order. + var serverA = Server(Mut("server-a-late", 100)); + var serverB = Server(Mut("server-b-early", -100)); + + var result = await InterceptorChainOrchestrator.ExecuteAsync( + [.. serverA, .. serverB], + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = JsonNode.Parse("""{}""")!, + }, + CancellationToken.None); + + Assert.Equal(InterceptorChainStatus.Success, result.Status); + Assert.Equal(["server-b-early", "server-a-late"], executionOrder); + } + + [Fact] + public async Task MultiServer_MutationsTieBreakAlphabeticallyAcrossServers() + { + var executionOrder = new List(); + + TestEntry Mut(string name) => CreateInterceptor(name, InterceptorType.Mutation, (req, _) => + { + executionOrder.Add(name); + return new ValueTask(new MutationInterceptorResult { Modified = false }); + }, priorityHint: 0); + + var serverA = Server(Mut("zulu")); + var serverB = Server(Mut("alpha")); + + var result = await InterceptorChainOrchestrator.ExecuteAsync( + [.. serverA, .. serverB], + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = JsonNode.Parse("""{}""")!, + }, + CancellationToken.None); + + Assert.Equal(InterceptorChainStatus.Success, result.Status); + Assert.Equal(["alpha", "zulu"], executionOrder); + } + + [Fact] + public async Task MultiServer_PayloadChainsAcrossServersInOneMutationPass() + { + var serverA = Server(CreateInterceptor("a-first", InterceptorType.Mutation, (req, _) => + { + var obj = req.Payload.AsObject(); + obj["fromA"] = true; + return new ValueTask(new MutationInterceptorResult { Modified = true, Payload = obj }); + }, priorityHint: -1)); + + var serverB = Server(CreateInterceptor("b-second", InterceptorType.Mutation, (req, _) => + { + Assert.True(req.Payload["fromA"]!.GetValue()); // Receives server A's output + var obj = req.Payload.AsObject(); + obj["fromB"] = true; + return new ValueTask(new MutationInterceptorResult { Modified = true, Payload = obj }); + }, priorityHint: 1)); + + var result = await InterceptorChainOrchestrator.ExecuteAsync( + [.. serverB, .. serverA], + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = JsonNode.Parse("""{"original":true}""")!, + }, + CancellationToken.None); + + Assert.Equal(InterceptorChainStatus.Success, result.Status); + Assert.True(result.FinalPayload!["original"]!.GetValue()); + Assert.True(result.FinalPayload["fromA"]!.GetValue()); + Assert.True(result.FinalPayload["fromB"]!.GetValue()); + } + + [Fact] + public async Task MultiServer_ValidationsFromAllServersSeePostMutationPayload() + { + var validatedPayloads = new List(); + + var serverA = Server( + CreateInterceptor("mutator", InterceptorType.Mutation, (req, _) => + { + var obj = req.Payload.AsObject(); + obj["mutated"] = true; + return new ValueTask(new MutationInterceptorResult { Modified = true, Payload = obj }); + }), + CreateInterceptor("val-a", InterceptorType.Validation, (req, _) => + { + lock (validatedPayloads) validatedPayloads.Add(req.Payload); + return new ValueTask(ValidationInterceptorResult.Success()); + })); + + var serverB = Server(CreateInterceptor("val-b", InterceptorType.Validation, (req, _) => + { + lock (validatedPayloads) validatedPayloads.Add(req.Payload); + return new ValueTask(ValidationInterceptorResult.Success()); + })); + + var result = await InterceptorChainOrchestrator.ExecuteAsync( + [.. serverA, .. serverB], + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = JsonNode.Parse("""{}""")!, + }, + CancellationToken.None); + + Assert.Equal(InterceptorChainStatus.Success, result.Status); + Assert.Equal(2, validatedPayloads.Count); + // Both servers' validations run after the mutation from server A, against its output. + Assert.All(validatedPayloads, p => Assert.True(p["mutated"]!.GetValue())); + } + + [Fact] + public async Task MultiServer_DuplicateNamesInvokeOncePerHostingServer() + { + var serverAInvocations = 0; + var serverBInvocations = 0; + + var serverA = Server(CreateInterceptor("shared-name", InterceptorType.Validation, (req, _) => + { + Interlocked.Increment(ref serverAInvocations); + return new ValueTask(ValidationInterceptorResult.Success()); + })); + + var serverB = Server(CreateInterceptor("shared-name", InterceptorType.Validation, (req, _) => + { + Interlocked.Increment(ref serverBInvocations); + return new ValueTask(ValidationInterceptorResult.Success()); + })); + + var result = await InterceptorChainOrchestrator.ExecuteAsync( + [.. serverA, .. serverB], + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = JsonNode.Parse("""{}""")!, + }, + CancellationToken.None); + + Assert.Equal(InterceptorChainStatus.Success, result.Status); + // Each entry routes to its own host: one invocation per server, two results. + Assert.Equal(1, serverAInvocations); + Assert.Equal(1, serverBInvocations); + Assert.Equal(2, result.Results.Count); + Assert.All(result.Results, r => Assert.Equal("shared-name", r.InterceptorName)); + } + // ── Helpers ───────────────────────────────────────────────────────────── + /// + /// Models one fake server: each of its entries gets an invoker bound to that server's own + /// handler, so duplicate names across servers stay distinct. + /// + private static List Server(params TestEntry[] entries) + { + return entries.Select(e => new InterceptorChainOrchestrator.ChainExecutionEntry( + e.Descriptor, + (req, ct) => e.Handler(req, ct))).ToList(); + } + private static ValueTask RunAsync( IEnumerable entries, ExecuteChainRequestParams chainParams, diff --git a/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/InterceptorChainTests.cs b/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/InterceptorChainTests.cs new file mode 100644 index 0000000..08cb0b7 --- /dev/null +++ b/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/InterceptorChainTests.cs @@ -0,0 +1,245 @@ +using System.Text.Json; +using System.Text.Json.Nodes; +using ModelContextProtocol; +using ModelContextProtocol.Client; +using ModelContextProtocol.Interceptors.Client; +using ModelContextProtocol.Interceptors.Protocol; +using ModelContextProtocol.Interceptors.Server; +using ModelContextProtocol.Server; +using Xunit; + +namespace ModelContextProtocol.Interceptors.Tests; + +/// +/// Tests for the public API over real in-memory interceptor servers. +/// +public class InterceptorChainTests +{ + [Fact] + public async Task DiscoverAsync_MergesEntriesFromAllServersWithServerAttribution() + { + await using var fixture = await TwoServerFixture.CreateAsync( + serverAInterceptors: [MutationDescriptor("a-mutator", priorityHint: 10)], + serverBInterceptors: [MutationDescriptor("b-mutator", priorityHint: -10)]); + + var chain = await InterceptorChain.DiscoverAsync([fixture.ClientA, fixture.ClientB]); + + Assert.Equal(2, chain.Entries.Count); + var entryA = Assert.Single(chain.Entries, e => e.Interceptor.Name == "a-mutator"); + var entryB = Assert.Single(chain.Entries, e => e.Interceptor.Name == "b-mutator"); + Assert.Same(fixture.ClientA, entryA.Server); + Assert.Same(fixture.ClientB, entryB.Server); + } + + [Fact] + public async Task ExecuteAsync_RunsMergedChainInGlobalPriorityOrderAcrossServers() + { + // Server A's mutation has the higher priority value, so server B's must run first + // and server A's must observe B's payload — the opposite of per-server sequencing. + await using var fixture = await TwoServerFixture.CreateAsync( + serverAInterceptors: [MutationDescriptor("a-late", priorityHint: 10)], + serverBInterceptors: [MutationDescriptor("b-early", priorityHint: -10)], + handler: (req, _) => + { + var obj = req.Payload!.AsObject(); + var order = obj["order"]?.AsArray() ?? []; + order.Add(req.Name); + obj["order"] = order.DeepClone(); + return new ValueTask(new MutationInterceptorResult { Modified = true, Payload = obj }); + }); + + var result = await InterceptorChain.ExecuteAsync( + [fixture.ClientA, fixture.ClientB], + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = JsonNode.Parse("""{}""")!, + }); + + Assert.Equal(InterceptorChainStatus.Success, result.Status); + Assert.Equal(2, result.Results.Count); + var order = result.FinalPayload!["order"]!.AsArray().Select(n => n!.GetValue()).ToList(); + Assert.Equal(["b-early", "a-late"], order); + } + + [Fact] + public async Task ExecuteAsync_FailsClosedWhenAnyServerListFails() + { + await using var fixture = await TwoServerFixture.CreateAsync( + serverAInterceptors: [MutationDescriptor("a-mutator", priorityHint: 0)], + // Server B does not speak the interceptor protocol at all, so interceptors/list fails. + serverBInterceptors: null); + + await Assert.ThrowsAsync(async () => + { + await InterceptorChain.ExecuteAsync( + [fixture.ClientA, fixture.ClientB], + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = JsonNode.Parse("""{}""")!, + }); + }); + } + + [Fact] + public async Task SingleClientExecuteChainAsync_StillWorksThroughMergedPath() + { + await using var fixture = await TwoServerFixture.CreateAsync( + serverAInterceptors: [MutationDescriptor("solo-mutator", priorityHint: 0)], + serverBInterceptors: null, + handler: (req, _) => + { + var obj = req.Payload!.AsObject(); + obj["mutated"] = true; + return new ValueTask(new MutationInterceptorResult { Modified = true, Payload = obj }); + }); + + var result = await fixture.ClientA.ExecuteChainAsync( + new ExecuteChainRequestParams + { + Event = InterceptionEvents.ToolsCall, + Phase = InterceptorPhase.Request, + Payload = JsonNode.Parse("""{"original":true}""")!, + }); + + Assert.Equal(InterceptorChainStatus.Success, result.Status); + Assert.True(result.FinalPayload!["mutated"]!.GetValue()); + Assert.True(result.FinalPayload["original"]!.GetValue()); + } + + // ── Helpers ───────────────────────────────────────────────────────────── + + private static Interceptor MutationDescriptor(string name, PriorityHint priorityHint) => new() + { + Name = name, + Type = InterceptorType.Mutation, + Hooks = + [ + new InterceptorHook { Events = [InterceptionEvents.All], Phase = InterceptorPhase.Request }, + new InterceptorHook { Events = [InterceptionEvents.All], Phase = InterceptorPhase.Response }, + ], + PriorityHint = priorityHint, + }; + + private sealed class TwoServerFixture : IAsyncDisposable + { + private readonly List _disposables; + + public McpClient ClientA { get; } + public McpClient ClientB { get; } + + private TwoServerFixture(McpClient clientA, McpClient clientB, List disposables) + { + ClientA = clientA; + ClientB = clientB; + _disposables = disposables; + } + + /// + /// Spins up two in-memory servers. A interceptor list creates a + /// plain MCP server without the interceptor protocol (its interceptors/list fails). + /// + public static async Task CreateAsync( + Interceptor[]? serverAInterceptors, + Interceptor[]? serverBInterceptors, + Func>? handler = null) + { + handler ??= (req, _) => new ValueTask(new MutationInterceptorResult { Modified = false }); + var disposables = new List(); + + try + { + var (serverA, clientA) = await McpInterceptorGatewayTests.GatewayTestFixture.CreateServerClientPairForTesting( + "chain-server-a", options => Configure(options, serverAInterceptors, handler)); + disposables.Add(serverA); + disposables.Add(clientA); + + var (serverB, clientB) = await McpInterceptorGatewayTests.GatewayTestFixture.CreateServerClientPairForTesting( + "chain-server-b", options => Configure(options, serverBInterceptors, handler)); + disposables.Add(serverB); + disposables.Add(clientB); + + return new TwoServerFixture(clientA, clientB, disposables); + } + catch + { + foreach (var d in disposables) + { + await d.DisposeAsync(); + } + + throw; + } + } + + private static void Configure( + McpServerOptions options, + Interceptor[]? interceptors, + Func> handler) + { + if (interceptors is null) + { + return; + } + + var collection = new McpServerPrimitiveCollection(); + foreach (var descriptor in interceptors) + { + collection.Add(new DelegatingInterceptor(descriptor, handler)); + } + + var filter = new InterceptorMessageFilter(collection); + options.Filters.Message.IncomingFilters.Add(filter.CreateFilter); + + options.Capabilities ??= new(); +#pragma warning disable MCPEXP001 + options.Capabilities.Extensions ??= new Dictionary(); + options.Capabilities.Extensions[InterceptorProtocolConstants.ExtensionCapabilityKey] = JsonSerializer.SerializeToElement( + new InterceptorsCapability { SupportedEvents = [InterceptionEvents.All] }, + InterceptorJsonUtilities.DefaultOptions); +#pragma warning restore MCPEXP001 + } + + public async ValueTask DisposeAsync() + { + for (var i = _disposables.Count - 1; i >= 0; i--) + { + try + { + await _disposables[i].DisposeAsync(); + } + catch + { + // Swallow disposal errors in tests + } + } + } + } + + private sealed class DelegatingInterceptor : McpServerInterceptor + { + private readonly Interceptor _interceptor; + private readonly Func> _handler; + + public DelegatingInterceptor( + Interceptor interceptor, + Func> handler) + { + _interceptor = interceptor; + _handler = handler; + } + + public override Interceptor ProtocolInterceptor => _interceptor; + public override IReadOnlyList Metadata => []; + + public override ValueTask InvokeAsync( + InvokeInterceptorRequestParams request, + McpServer server, + IServiceProvider? services, + CancellationToken cancellationToken = default) => + _handler(request, cancellationToken); + } +} diff --git a/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/McpInterceptorGatewayTests.cs b/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/McpInterceptorGatewayTests.cs index 55d3dce..a27d5bb 100644 --- a/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/McpInterceptorGatewayTests.cs +++ b/csharp/sdk/tests/ModelContextProtocol.Interceptors.Tests/McpInterceptorGatewayTests.cs @@ -432,15 +432,78 @@ public async Task CallToolAsync_ChainsAllInterceptorClients() })], ]); - // Call tool — both interceptors should run (A first, then B) + // Call tool — both interceptors should run (A first, then B: equal priority, + // alphabetical tie-break in the merged chain) var result = await fixture.ProxyClient.CallToolAsync("echo", new Dictionary { ["message"] = "hello" }); - // Both interceptor chains ran in order: A prepended first, then B + // A prepended first, then B var text = result.Content[0].ToString()!; Assert.Contains("B:A:hello", text); } + [Fact] + public async Task CallToolAsync_OrdersMutationsByGlobalPriorityAcrossInterceptorClients() + { + // The first server hosts the higher-priority-value (later) mutation, the second server + // the lower-priority-value (earlier) one. The merged chain must run the second server's + // mutation first — under per-server sequencing it would run last. + static McpServerInterceptor PrependMutator(string name, int priority, string prefix) => + new TestInterceptor( + new Interceptor + { + Name = name, + Type = InterceptorType.Mutation, + PriorityHint = priority, + Hooks = [new InterceptorHook { Events = [InterceptionEvents.All], Phase = InterceptorPhase.Request }], + }, + (req, _, _, _) => + { + var obj = JsonNode.Parse(req.Payload!.ToJsonString())!.AsObject(); + if (obj["arguments"]?["message"] is JsonNode msgNode) + { + obj["arguments"]!["message"] = prefix + msgNode.GetValue(); + } + return new ValueTask(new MutationInterceptorResult + { + Modified = true, + Payload = obj, + }); + }); + + await using var fixture = await GatewayTestFixture.CreateWithMultipleInterceptorServersAsync( + backendConfigure: (options) => + { + options.Capabilities ??= new(); + options.Capabilities.Tools ??= new(); + options.Handlers.ListToolsHandler = (request, ct) => + new ValueTask(new ListToolsResult + { + Tools = [new Tool { Name = "echo", Description = "Echo" }], + }); + options.Handlers.CallToolHandler = (request, ct) => + { + var msg = request.Params!.Arguments?["message"]; + return new ValueTask(new CallToolResult + { + Content = [new TextContentBlock { Text = $"echo: {msg}" }], + }); + }; + }, + interceptorConfigs: + [ + [PrependMutator("late-mutator", priority: 1000, prefix: "LATE:")], + [PrependMutator("early-mutator", priority: -1000, prefix: "EARLY:")], + ]); + + var result = await fixture.ProxyClient.CallToolAsync("echo", + new Dictionary { ["message"] = "hello" }); + + // early (server 2) ran first, late (server 1) ran second and prepended last. + var text = result.Content[0].ToString()!; + Assert.Contains("LATE:EARLY:hello", text); + } + [Fact] public async Task SubscribeMutation_UsesModifiedPayload() {