Skip to content

Commit 94da89a

Browse files
committed
Serve subscriptions/listen with a pluggable event bus (SEP-2575)
On the 2026-07-28 wire there is no standing GET stream: clients opt in to server events via a subscriptions/listen request whose response is the stream. Add the server-side runtime: - mcp/server/subscriptions.py: an EventBus protocol (publish/subscribe over four typed ServerEvent kinds) with an in-process InMemoryEventBus default; implement it over an external pub/sub backend (e.g. Redis) to fan events out across replicas. ListenHandler serves the method: ack-first, per-stream filter honoring, subscription-id tagging on every frame, and close() ends all streams gracefully with the stamped SubscriptionsListenResult. - MCPServer takes subscriptions= (defaults to the in-memory bus), registers the handler automatically, and exposes the bus as a property. Context gains notify_tools_changed / notify_prompts_changed / notify_resources_changed / notify_resource_updated to publish from inside handlers. - Lowlevel Server users compose the same parts themselves via the existing on_subscriptions_listen slot; no lowlevel, session, or transport changes. - Remove the server-stateless conformance baselines: the scenario's listen checks (ack-first, subscription-id tagging, filter honoring) now pass.
1 parent 0b200ef commit 94da89a

9 files changed

Lines changed: 550 additions & 15 deletions

File tree

.github/actions/conformance/expected-failures.2026-07-28.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,6 @@
2323
client: []
2424

2525
server:
26-
# SEP-2575 subscriptions/listen is not implemented yet; see the matching
27-
# entry in expected-failures.yml for the full rationale.
28-
- server-stateless
2926
# SEP-2243 Mcp-Param-* server-side validation is not implemented yet; see
3027
# the matching entry in expected-failures.yml for the full rationale.
3128
- http-custom-header-server-validation

.github/actions/conformance/expected-failures.yml

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,6 @@
1313
client: []
1414

1515
server:
16-
# SEP-2575 subscriptions/listen is not implemented yet. The everything-
17-
# server's legacy resources/subscribe handlers make it advertise
18-
# `resources.subscribe` in server/discover, and as of conformance #372 a
19-
# server that advertises a subscription capability but answers
20-
# subscriptions/listen with -32601 fails the three listen MUST checks
21-
# ("Not testable") instead of skipping them. Remove this entry when the
22-
# listen runtime lands. NOTE: while listed, this entry also masks new
23-
# failures in the scenario's other 25 (currently passing) checks — the
24-
# baseline is per-scenario, not per-check.
25-
- server-stateless
2616
# SEP-2243 Mcp-Param-* server-side validation is not implemented yet. The
2717
# everything-server's `test_x_mcp_header` tool arms these checks (without an
2818
# x-mcp-header-annotated tool the harness skips all of them silently); the

docs/advanced/low-level-server.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ Each of these is one idea you now have the vocabulary for; each has its own chap
183183

184184
* `on_call_tool`, `on_get_prompt`, and `on_read_resource` may return an `InputRequiredResult` instead of their normal result to pause the call and ask the client for input; see **[Multi-round-trip requests](multi-round-trip.md)**.
185185
* `on_list_resources`, `on_read_resource`, `on_list_prompts`, `on_get_prompt`, `on_completion` are the same `(ctx, params) -> result` shape for the other primitives.
186+
* `on_subscriptions_listen` serves the 2026-07-28 `subscriptions/listen` stream. Pass an `mcp.server.subscriptions.ListenHandler` built over an `EventBus` (the in-memory default, or your own — e.g. Redis-backed), keep the bus where your other handlers can reach it (the lifespan is a natural home), and publish `ServerEvent`s to it. The handler owns the wire semantics: ack-first, per-stream filtering, and subscription-id tagging.
186187
* `server.streamable_http_app()` returns the same Starlette app `MCPServer`'s does; deploy it the way **[Running your server](../run/index.md)** deploys any other ASGI app. There is no `server.run(transport=...)` down here: `server.run(read_stream, write_stream, server.create_initialization_options())` drives one connection over a pair of streams, and that one line is the whole story.
187188

188189
## Recap

docs/tutorial/context.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ What a server offers is not fixed at import time. Register a tool at runtime, th
104104

105105
The siblings are `send_resource_list_changed()`, `send_prompt_list_changed()`, and `send_resource_updated(uri)` for a change to one specific resource.
106106

107+
On a 2026-07-28 connection, clients receive change notifications only on a `subscriptions/listen` stream they opened. The `Context` publish methods — `ctx.notify_tools_changed()`, `ctx.notify_prompts_changed()`, `ctx.notify_resources_changed()`, and `ctx.notify_resource_updated(uri)` — deliver to every subscribed stream at once, and are synchronous (no `await`). Behind a load balancer, pass your own `EventBus` implementation as `MCPServer(subscriptions=...)` to fan events out across replicas; the in-process default covers a single server.
108+
107109
!!! check
108110
Before anyone runs `enable_recommendations`, the tool you are promising does not exist. Call it
109111
anyway and the result is an error the model can read:

src/mcp/server/mcpserver/context.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
elicit_with_validation,
1717
)
1818
from mcp.server.lowlevel.helper_types import ReadResourceContents
19+
from mcp.server.subscriptions import PromptsListChanged, ResourcesListChanged, ResourceUpdated, ToolsListChanged
1920
from mcp.shared.exceptions import MCPDeprecationWarning
2021

2122
if TYPE_CHECKING:
@@ -78,9 +79,9 @@ def __init__(
7879
@property
7980
def mcp_server(self) -> MCPServer:
8081
"""Access to the MCPServer instance."""
81-
if self._mcp_server is None: # pragma: no cover
82+
if self._mcp_server is None:
8283
raise ValueError("Context is not available outside of a request")
83-
return self._mcp_server # pragma: no cover
84+
return self._mcp_server
8485

8586
@property
8687
def request_context(self) -> ServerRequestContext[LifespanContextT, RequestT]:
@@ -109,6 +110,22 @@ async def report_progress(self, progress: float, total: float | None = None, mes
109110
"""
110111
await self.request_context.session.report_progress(progress, total, message)
111112

113+
def notify_tools_changed(self) -> None:
114+
"""Publish a tools list-changed event to `subscriptions/listen` subscribers."""
115+
self.mcp_server.subscriptions.publish(ToolsListChanged())
116+
117+
def notify_prompts_changed(self) -> None:
118+
"""Publish a prompts list-changed event to `subscriptions/listen` subscribers."""
119+
self.mcp_server.subscriptions.publish(PromptsListChanged())
120+
121+
def notify_resources_changed(self) -> None:
122+
"""Publish a resources list-changed event to `subscriptions/listen` subscribers."""
123+
self.mcp_server.subscriptions.publish(ResourcesListChanged())
124+
125+
def notify_resource_updated(self, uri: str | AnyUrl) -> None:
126+
"""Publish a resource-updated event for `uri` to `subscriptions/listen` subscribers."""
127+
self.mcp_server.subscriptions.publish(ResourceUpdated(uri=str(uri)))
128+
112129
async def read_resource(self, uri: str | AnyUrl) -> Iterable[ReadResourceContents]:
113130
"""Read a resource by URI.
114131

src/mcp/server/mcpserver/server.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
from mcp.server.stdio import stdio_server
8888
from mcp.server.streamable_http import EventStore
8989
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
90+
from mcp.server.subscriptions import EventBus, InMemoryEventBus, ListenHandler
9091
from mcp.server.transport_security import TransportSecuritySettings
9192
from mcp.shared.exceptions import MCPError
9293
from mcp.shared.uri_template import UriTemplate
@@ -171,6 +172,7 @@ def __init__(
171172
auth: AuthSettings | None = None,
172173
resource_security: ResourceSecurity = DEFAULT_RESOURCE_SECURITY,
173174
cache_hints: Mapping[CacheableMethod, CacheHint] | None = None,
175+
subscriptions: EventBus | None = None,
174176
):
175177
self._resource_security = resource_security
176178
self.settings = Settings(
@@ -190,6 +192,10 @@ def __init__(
190192
resources=resources, warn_on_duplicate_resources=self.settings.warn_on_duplicate_resources
191193
)
192194
self._prompt_manager = PromptManager(warn_on_duplicate_prompts=self.settings.warn_on_duplicate_prompts)
195+
# The subscriptions/listen fan-out seam (2026-07-28). The default bus is
196+
# in-process; pass an `EventBus` implementation over an external pub/sub
197+
# backend to fan events out across replicas.
198+
self._subscriptions: EventBus = subscriptions if subscriptions is not None else InMemoryEventBus()
193199
self._lowlevel_server = Server(
194200
name=name or "mcp-server",
195201
title=title,
@@ -206,6 +212,7 @@ def __init__(
206212
on_list_resource_templates=self._handle_list_resource_templates,
207213
on_list_prompts=self._handle_list_prompts,
208214
on_get_prompt=self._handle_get_prompt,
215+
on_subscriptions_listen=ListenHandler(self._subscriptions),
209216
# TODO(Marcelo): It seems there's a type mismatch between the lifespan type from an MCPServer and Server.
210217
# We need to create a Lifespan type that is a generic on the server type, like Starlette does.
211218
lifespan=(lifespan_wrapper(self, self.settings.lifespan) if self.settings.lifespan else default_lifespan), # type: ignore
@@ -263,6 +270,16 @@ def icons(self) -> list[Icon] | None:
263270
def version(self) -> str | None:
264271
return self._lowlevel_server.version
265272

273+
@property
274+
def subscriptions(self) -> EventBus:
275+
"""The `subscriptions/listen` event bus.
276+
277+
Publish a `ServerEvent` here (or via the `Context.notify_*` methods)
278+
to deliver it to subscribed clients. The bus passed to the constructor,
279+
or the in-process default.
280+
"""
281+
return self._subscriptions
282+
266283
@property
267284
def session_manager(self) -> StreamableHTTPSessionManager:
268285
"""Get the StreamableHTTP session manager.

src/mcp/server/subscriptions.py

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
"""Server-side `subscriptions/listen` support (2026-07-28, SEP-2575).
2+
3+
On the 2026-07-28 wire there is no standing GET stream: a client opts in to
4+
server events by sending a `subscriptions/listen` request whose response IS
5+
the stream. This module provides the two pieces a server needs:
6+
7+
- `EventBus`: the pluggable fan-out seam. The bus carries typed `ServerEvent`
8+
values, not wire notifications - the listen handler owns subscription-id
9+
stamping and per-stream filtering, so a custom bus (e.g. backed by Redis
10+
pub/sub for multi-replica deployments) never sees JSON-RPC. The in-process
11+
default is `InMemoryEventBus`.
12+
- `ListenHandler`: the request handler that serves `subscriptions/listen`.
13+
`MCPServer` registers one automatically; lowlevel `Server` users pass an
14+
instance as `on_subscriptions_listen=`.
15+
16+
Per the spec, the handler acknowledges first (the ack is the first frame on
17+
the stream), tags every frame with the listen request's JSON-RPC id under
18+
`_meta["io.modelcontextprotocol/subscriptionId"]`, and never delivers an
19+
event kind the client did not request. Delivery is fire-and-forget with no
20+
replay: a dropped stream is not resumable - clients re-listen and refetch.
21+
"""
22+
23+
from __future__ import annotations
24+
25+
import math
26+
from collections.abc import Callable
27+
from dataclasses import dataclass
28+
from typing import Any, Protocol
29+
30+
import anyio
31+
import anyio.streams.memory
32+
from mcp_types import (
33+
INVALID_REQUEST,
34+
NotificationParams,
35+
PromptListChangedNotification,
36+
ResourceListChangedNotification,
37+
ResourceUpdatedNotification,
38+
ResourceUpdatedNotificationParams,
39+
ServerNotification,
40+
SubscriptionFilter,
41+
SubscriptionsAcknowledgedNotification,
42+
SubscriptionsAcknowledgedNotificationParams,
43+
SubscriptionsListenRequestParams,
44+
SubscriptionsListenResult,
45+
ToolListChangedNotification,
46+
)
47+
48+
from mcp.server.context import ServerRequestContext
49+
from mcp.shared.exceptions import MCPError
50+
51+
SUBSCRIPTION_ID_META_KEY = "io.modelcontextprotocol/subscriptionId"
52+
"""The `_meta` key carrying the subscription id on every listen-stream frame.
53+
54+
The value is the `subscriptions/listen` request's JSON-RPC id, verbatim.
55+
"""
56+
57+
58+
@dataclass(frozen=True)
59+
class ToolsListChanged:
60+
"""The server's tool list changed."""
61+
62+
63+
@dataclass(frozen=True)
64+
class PromptsListChanged:
65+
"""The server's prompt list changed."""
66+
67+
68+
@dataclass(frozen=True)
69+
class ResourcesListChanged:
70+
"""The server's resource list changed."""
71+
72+
73+
@dataclass(frozen=True)
74+
class ResourceUpdated:
75+
"""The resource at `uri` changed and may need to be read again."""
76+
77+
uri: str
78+
79+
80+
ServerEvent = ToolsListChanged | PromptsListChanged | ResourcesListChanged | ResourceUpdated
81+
"""An event a server publishes for delivery to listen subscribers."""
82+
83+
84+
class EventBus(Protocol):
85+
"""Fan-out seam between event publishers and open listen streams.
86+
87+
Implement this over an external pub/sub backend (Redis, NATS, ...) to fan
88+
events out across replicas: `publish` forwards the event to the backend,
89+
and each replica's bus invokes its local listeners for events arriving
90+
from the backend. The same instance can be shared across servers.
91+
92+
Both methods are synchronous and must be called from the server's event
93+
loop thread. Listeners must not raise.
94+
"""
95+
96+
def publish(self, event: ServerEvent) -> None:
97+
"""Deliver `event` to every subscribed listener."""
98+
...
99+
100+
def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]:
101+
"""Register `listener` and return an idempotent unsubscribe callable."""
102+
...
103+
104+
105+
class InMemoryEventBus:
106+
"""In-process `EventBus`: synchronous fan-out to a set of listeners."""
107+
108+
def __init__(self) -> None:
109+
self._listeners: set[Callable[[ServerEvent], None]] = set()
110+
111+
def publish(self, event: ServerEvent) -> None:
112+
"""Deliver `event` to every subscribed listener."""
113+
for listener in list(self._listeners):
114+
listener(event)
115+
116+
def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]:
117+
"""Register `listener` and return an idempotent unsubscribe callable."""
118+
self._listeners.add(listener)
119+
120+
def unsubscribe() -> None:
121+
self._listeners.discard(listener)
122+
123+
return unsubscribe
124+
125+
126+
def _honored_subset(requested: SubscriptionFilter) -> SubscriptionFilter:
127+
"""The subset of `requested` the server will deliver, for the ack.
128+
129+
Every requested kind is honored - whether an event kind ever fires
130+
depends on what the server publishes, exactly as a subscription to a
131+
nonexistent resource URI is honored and never fires. Non-true flags and
132+
an empty URI list are dropped rather than echoed as falsy values.
133+
"""
134+
return SubscriptionFilter(
135+
tools_list_changed=True if requested.tools_list_changed else None,
136+
prompts_list_changed=True if requested.prompts_list_changed else None,
137+
resources_list_changed=True if requested.resources_list_changed else None,
138+
resource_subscriptions=list(requested.resource_subscriptions) if requested.resource_subscriptions else None,
139+
)
140+
141+
142+
def _event_matches(honored: SubscriptionFilter, event: ServerEvent) -> bool:
143+
"""Whether `event` is within the stream's honored filter."""
144+
if isinstance(event, ToolsListChanged):
145+
return honored.tools_list_changed is True
146+
if isinstance(event, PromptsListChanged):
147+
return honored.prompts_list_changed is True
148+
if isinstance(event, ResourcesListChanged):
149+
return honored.resources_list_changed is True
150+
return honored.resource_subscriptions is not None and event.uri in honored.resource_subscriptions
151+
152+
153+
def _event_to_notification(event: ServerEvent, meta: dict[str, Any]) -> ServerNotification:
154+
"""Build the stamped wire notification for `event`."""
155+
if isinstance(event, ToolsListChanged):
156+
return ToolListChangedNotification(params=NotificationParams(_meta=meta))
157+
if isinstance(event, PromptsListChanged):
158+
return PromptListChangedNotification(params=NotificationParams(_meta=meta))
159+
if isinstance(event, ResourcesListChanged):
160+
return ResourceListChangedNotification(params=NotificationParams(_meta=meta))
161+
return ResourceUpdatedNotification(params=ResourceUpdatedNotificationParams(uri=event.uri, _meta=meta))
162+
163+
164+
class ListenHandler:
165+
"""Serves `subscriptions/listen`: one call is one subscription stream.
166+
167+
Register on a lowlevel `Server` via `on_subscriptions_listen=` (or
168+
`add_request_handler`); `MCPServer` does so automatically. Each call
169+
acknowledges the honored filter first, then forwards matching bus events
170+
onto the request's response stream until the client disconnects (which
171+
cancels the handler; the stream just ends, per the spec's abrupt-close
172+
contract) or `close` ends all streams gracefully.
173+
174+
Requires a transport that can stream a request's response (streamable
175+
HTTP's SSE mode, stdio).
176+
"""
177+
178+
def __init__(self, bus: EventBus) -> None:
179+
self._bus = bus
180+
self._streams: set[anyio.streams.memory.MemoryObjectSendStream[ServerEvent]] = set()
181+
182+
async def __call__(
183+
self,
184+
ctx: ServerRequestContext[Any, Any],
185+
params: SubscriptionsListenRequestParams,
186+
) -> SubscriptionsListenResult:
187+
"""Serve one listen stream."""
188+
subscription_id = ctx.request_id
189+
if subscription_id is None:
190+
raise MCPError(INVALID_REQUEST, "subscriptions/listen requires a request id")
191+
honored = _honored_subset(params.notifications)
192+
meta: dict[str, Any] = {SUBSCRIPTION_ID_META_KEY: subscription_id}
193+
194+
# Ack first, subscribe second: no event can precede the ack frame.
195+
await ctx.session.send_notification(
196+
SubscriptionsAcknowledgedNotification(
197+
params=SubscriptionsAcknowledgedNotificationParams(notifications=honored, _meta=meta)
198+
),
199+
related_request_id=subscription_id,
200+
)
201+
202+
# Unbounded buffer so publishers never block on a slow consumer (the
203+
# transport write happens in this handler task, not the publisher's).
204+
send, recv = anyio.create_memory_object_stream[ServerEvent](math.inf)
205+
206+
def deliver(event: ServerEvent) -> None:
207+
if _event_matches(honored, event):
208+
try:
209+
send.send_nowait(event)
210+
except anyio.ClosedResourceError:
211+
# `aclose` closed this stream; the loop below is unwinding.
212+
pass
213+
214+
unsubscribe = self._bus.subscribe(deliver)
215+
self._streams.add(send)
216+
try:
217+
async for event in recv:
218+
await ctx.session.send_notification(
219+
_event_to_notification(event, meta), related_request_id=subscription_id
220+
)
221+
finally:
222+
unsubscribe()
223+
self._streams.discard(send)
224+
send.close()
225+
recv.close()
226+
return SubscriptionsListenResult(_meta=meta)
227+
228+
def close(self) -> None:
229+
"""Gracefully end every open listen stream.
230+
231+
Each stream sends its `SubscriptionsListenResult` (stamped with the
232+
subscription id) as the final frame and closes - the spec's graceful
233+
closure flow, signalling clients not to re-listen.
234+
"""
235+
for stream in list(self._streams):
236+
stream.close()

0 commit comments

Comments
 (0)