Skip to content

Commit 05fb2b6

Browse files
committed
Apply review fixes to subscriptions/listen serving
- Pass the server-scoped subscription bus into the fallback Contexts built by programmatic call_tool/read_resource/get_prompt, so ctx.notify_* works there instead of raising. - Reject subscriptions/listen with 406 when the Accept header lacks text/event-stream: the response is always an SSE stream, so JSON-response mode must not serve a content type the client never accepted. - Release a stream's subscription slot at backlog overflow time; the stream's own cleanup can be wedged in a transport write that closing the buffer cannot wake. - End InMemorySubscriptionBus.publish with a checkpoint so a same-task publish burst lets listen streams drain between events instead of overflowing a healthy stream's buffer. - Describe the graceful-close result as a deliberate end rather than a "don't re-listen" signal: a stream ended at the overflow cap sends the same result and the client should re-listen. - Move a function-body import to the top of the file and fix a stale comment referencing a removed property.
1 parent a6b9f03 commit 05fb2b6

9 files changed

Lines changed: 150 additions & 29 deletions

File tree

docs/advanced/subscriptions.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ Down on the low-level `Server` there is no pre-wired anything — and the same p
7777

7878
* You own the bus, so you publish to it directly: `await bus.publish(ResourceUpdated(uri=...))`. Put it wherever your handlers can reach it — module scope here, the lifespan in a bigger app.
7979
* `ListenHandler(bus)` is the same handler `MCPServer` registers; `on_subscriptions_listen=` is an ordinary handler slot. Don't want the SDK's semantics? Write your own handler for the slot — the spec obligations come with it.
80-
* `ListenHandler.close()` gracefully ends every open stream: each one receives the listen request's result as its final frame, the spec's signal that the server ended the subscription deliberately and the client shouldn't re-listen. Without it, streams end when the client disconnects.
80+
* `ListenHandler.close()` gracefully ends every open stream: each one receives the listen request's result as its final frame, the spec's signal that the server ended the subscription deliberately — a clean end, as opposed to the abrupt drop a client may treat as a cue to reconnect. Without it, streams end when the client disconnects.
8181

8282
## Recap
8383

src/mcp/server/_streamable_http_modern.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,13 @@ async def handle_modern_request(
383383
await _write(rej, scope, receive, send)
384384
return
385385

386+
if req.method == "subscriptions/listen" and not has_sse:
387+
# A listen response IS a notification stream, never JSON (the
388+
# json_response carve-out below), so this one method requires the
389+
# SSE accept even in JSON-response mode; SSE mode gated it above.
390+
await Response(status_code=406)(scope, receive, send)
391+
return
392+
386393
duplicated = find_duplicated_routing_header(request.headers.items())
387394
if duplicated is not None:
388395
# The raw carrier is the only place duplicates are visible; the classifier sees a folded mapping.

src/mcp/server/mcpserver/server.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ async def call_tool(
488488
) -> CallToolResult | InputRequiredResult:
489489
"""Call a tool by name with arguments."""
490490
if context is None:
491-
context = Context(mcp_server=self)
491+
context = Context(mcp_server=self, subscriptions=self._subscriptions)
492492
return await self._tool_manager.call_tool(name, arguments, context, convert_result=True)
493493

494494
async def list_resources(self) -> list[MCPResource]:
@@ -540,7 +540,7 @@ async def read_resource(
540540
ResourceError: If template creation or resource reading fails.
541541
"""
542542
if context is None:
543-
context = Context(mcp_server=self)
543+
context = Context(mcp_server=self, subscriptions=self._subscriptions)
544544
resource = await self._resource_manager.get_resource(uri, context)
545545
if isinstance(resource, InputRequiredResult):
546546
return resource
@@ -1260,7 +1260,7 @@ async def get_prompt(
12601260
carrying the echoed opaque state.
12611261
"""
12621262
if context is None:
1263-
context = Context(mcp_server=self)
1263+
context = Context(mcp_server=self, subscriptions=self._subscriptions)
12641264
try:
12651265
prompt = self._prompt_manager.get_prompt(name)
12661266
if not prompt:

src/mcp/server/subscriptions.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from typing import Any, Protocol
2929

3030
import anyio
31+
import anyio.lowlevel
3132
import anyio.streams.memory
3233
from mcp_types import (
3334
INTERNAL_ERROR,
@@ -118,13 +119,16 @@ async def publish(self, event: ServerEvent) -> None:
118119
"""Deliver `event` to every subscribed listener.
119120
120121
A raising listener is logged and skipped: one bad listener must not
121-
starve the others or fail the publishing handler.
122+
starve the others or fail the publishing handler. Ends with a
123+
checkpoint so a burst of publishes from one task lets listen streams
124+
drain between events instead of overflowing their buffers unread.
122125
"""
123126
for listener in list(self._listeners.values()):
124127
try:
125128
listener(event)
126129
except Exception: # fan-out boundary: isolate listeners from each other
127130
logger.exception("subscription listener raised; continuing")
131+
await anyio.lowlevel.checkpoint()
128132

129133
def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]:
130134
"""Register `listener` and return an idempotent unsubscribe callable."""
@@ -235,6 +239,10 @@ def deliver(event: ServerEvent) -> None:
235239
pass
236240
except anyio.WouldBlock:
237241
logger.warning("listen stream %r backlog full; ending the stream", subscription_id)
242+
# Release the subscription slot now: the handler's own
243+
# cleanup can be wedged in a transport write that closing
244+
# this buffer cannot wake (a client that stopped reading).
245+
self._streams.discard(send)
238246
send.close()
239247

240248
# Subscribe before sending the ack so an event published while the
@@ -267,8 +275,9 @@ def close(self) -> None:
267275
Each stream then drains its buffered events and sends its
268276
`SubscriptionsListenResult` (stamped with the subscription id) as the
269277
final frame from its own handler task - the spec's graceful closure
270-
flow, signalling clients not to re-listen. This method only initiates
271-
that; it does not wait for the streams to finish flushing.
278+
flow, telling clients the stream ended deliberately rather than
279+
dropping. This method only initiates that; it does not wait for the
280+
streams to finish flushing.
272281
"""
273282
for stream in list(self._streams):
274283
stream.close()

tests/docs_src/test_subscriptions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ async def listen() -> None:
128128
assert isinstance(updated, types.ResourceUpdatedNotification)
129129
assert updated.params.uri == "note://todo"
130130

131-
# `mcp.subscriptions` / the bus is also the publish surface outside a
131+
# The bus you constructed is also the publish surface outside a
132132
# request; an unrequested kind never reaches this stream.
133133
await tutorial002.bus.publish(ToolsListChanged())
134134
await client.call_tool("edit_note", {"name": "todo", "text": "done"})

tests/server/lowlevel/test_server_discover.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import pytest
1313
from mcp_types.version import MODERN_PROTOCOL_VERSIONS
1414

15-
from mcp.server import Server, ServerRequestContext
15+
from mcp.server import NotificationOptions, Server, ServerRequestContext
1616

1717

1818
# `Server._handle_discover` reads only `ctx.protocol_version` (capabilities are
@@ -216,7 +216,5 @@ async def list_tools(
216216
legacy = server.get_capabilities()
217217
assert legacy.tools is not None and legacy.tools.list_changed is False
218218

219-
from mcp.server import NotificationOptions
220-
221219
opted_in = server.get_capabilities(NotificationOptions(tools_changed=True))
222220
assert opted_in.tools is not None and opted_in.tools.list_changed is True

tests/server/mcpserver/test_server.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2279,6 +2279,37 @@ async def touch(ctx: Context) -> str:
22792279
assert seen == [ToolsListChanged(), PromptsListChanged(), ResourcesListChanged(), ResourceUpdated(uri="r://x")]
22802280

22812281

2282+
async def test_programmatic_entry_points_carry_the_subscription_bus() -> None:
2283+
"""`ctx.notify_*` works when tools, resources, and prompts are invoked
2284+
programmatically (no wire request): the server-scoped bus rides along in
2285+
the fallback Context."""
2286+
bus = InMemorySubscriptionBus()
2287+
mcp = MCPServer(subscriptions=bus)
2288+
seen: list[ServerEvent] = []
2289+
bus.subscribe(seen.append)
2290+
2291+
@mcp.tool()
2292+
async def touch_tools(ctx: Context) -> str:
2293+
await ctx.notify_tools_changed()
2294+
return "ok"
2295+
2296+
@mcp.resource("res://{name}")
2297+
async def thing(name: str, ctx: Context) -> str:
2298+
await ctx.notify_resources_changed()
2299+
return "data"
2300+
2301+
@mcp.prompt()
2302+
async def ask(ctx: Context) -> str:
2303+
await ctx.notify_prompts_changed()
2304+
return "question"
2305+
2306+
await mcp.call_tool("touch_tools", {})
2307+
await mcp.read_resource("res://thing")
2308+
await mcp.get_prompt("ask")
2309+
2310+
assert seen == [ToolsListChanged(), ResourcesListChanged(), PromptsListChanged()]
2311+
2312+
22822313
def test_context_mcp_server_outside_request_raises() -> None:
22832314
with pytest.raises(ValueError, match="outside of a request"):
22842315
_ = Context().mcp_server

tests/server/test_streamable_http_modern.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,14 +1025,9 @@ def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], Non
10251025
return unsubscribe
10261026

10271027

1028-
async def test_json_response_mode_still_streams_subscriptions_listen() -> None:
1029-
"""SDK-defined (TypeScript/Go parity): a listen response IS a notification
1030-
stream, so `json_response=True` does not apply to it - the request takes
1031-
the SSE path, acks first, and ends with the stamped result on close()."""
1032-
bus = _OpenSignalBus()
1033-
handler = ListenHandler(bus)
1034-
server = Server("test", on_subscriptions_listen=handler)
1035-
body = {
1028+
def _listen_body() -> dict[str, Any]:
1029+
"""A minimal valid 2026-07-28 `subscriptions/listen` request body."""
1030+
return {
10361031
"jsonrpc": "2.0",
10371032
"id": 9,
10381033
"method": "subscriptions/listen",
@@ -1046,6 +1041,26 @@ async def test_json_response_mode_still_streams_subscriptions_listen() -> None:
10461041
},
10471042
}
10481043

1044+
1045+
async def test_subscriptions_listen_requires_the_sse_accept_even_in_json_mode() -> None:
1046+
"""SDK-defined: a listen response is always SSE, so a request whose Accept
1047+
lacks `text/event-stream` is rejected with 406 rather than served a content
1048+
type it never accepted - JSON-response mode included."""
1049+
server = Server("test", on_subscriptions_listen=ListenHandler(InMemorySubscriptionBus()))
1050+
async with _asgi_client(server, json_response=True, accept="application/json") as http:
1051+
response = await http.post("/mcp", json=_listen_body(), headers={MCP_METHOD_HEADER: "subscriptions/listen"})
1052+
assert response.status_code == 406
1053+
1054+
1055+
async def test_json_response_mode_still_streams_subscriptions_listen() -> None:
1056+
"""SDK-defined (TypeScript/Go parity): a listen response IS a notification
1057+
stream, so `json_response=True` does not apply to it - the request takes
1058+
the SSE path, acks first, and ends with the stamped result on close()."""
1059+
bus = _OpenSignalBus()
1060+
handler = ListenHandler(bus)
1061+
server = Server("test", on_subscriptions_listen=handler)
1062+
body = _listen_body()
1063+
10491064
responses: list[httpx.Response] = []
10501065
async with _asgi_client(server, json_response=True) as http:
10511066
async with anyio.create_task_group() as tg:

tests/server/test_subscriptions.py

Lines changed: 71 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -338,14 +338,37 @@ async def run() -> None:
338338
handler.close()
339339

340340

341+
class _GatedSession(_RecordingSession):
342+
"""Lets the ack through, then wedges event sends until released - a client
343+
that stopped reading the transport."""
344+
345+
def __init__(self) -> None:
346+
super().__init__()
347+
self.wedged = anyio.Event()
348+
self.release = anyio.Event()
349+
350+
async def send_notification(
351+
self, notification: ServerNotification, related_request_id: RequestId | None = None
352+
) -> None:
353+
if self.sent: # the ack is the first frame; only event sends wedge
354+
self.wedged.set()
355+
await self.release.wait()
356+
await super().send_notification(notification, related_request_id)
357+
358+
341359
@pytest.mark.anyio
342-
async def test_backlog_overflow_ends_the_stream() -> None:
360+
async def test_backlog_overflow_ends_the_stream_and_frees_its_slot() -> None:
343361
"""SDK-defined: a stream whose client stopped reading is ended at
344-
`max_buffered_events` rather than buffering forever; the client re-listens."""
362+
`max_buffered_events` rather than buffering forever. The subscription slot
363+
frees at overflow time - the stream's own cleanup may be wedged in a
364+
transport write nothing can wake - and the backlog still drains into the
365+
stamped graceful result once that write completes."""
345366
bus = InMemorySubscriptionBus()
346-
handler = ListenHandler(bus, max_buffered_events=1)
347-
session = _RecordingSession()
367+
handler = ListenHandler(bus, max_subscriptions=1, max_buffered_events=1)
368+
session = _GatedSession()
348369
results: list[SubscriptionsListenResult] = []
370+
late_session = _RecordingSession()
371+
late_results: list[SubscriptionsListenResult] = []
349372

350373
async with anyio.create_task_group() as tg:
351374

@@ -355,11 +378,49 @@ async def run() -> None:
355378
tg.start_soon(run)
356379
await session.wait_for(1)
357380

358-
# Two publishes before the handler task resumes: the first fills the
359-
# one-slot buffer, the second overflows and ends the stream.
360-
await bus.publish(ToolsListChanged())
361-
await bus.publish(ToolsListChanged())
381+
await bus.publish(ToolsListChanged()) # consumed, then wedged mid-send
382+
with anyio.fail_after(5):
383+
await session.wedged.wait()
384+
await bus.publish(ToolsListChanged()) # fills the one-slot buffer
385+
await bus.publish(ToolsListChanged()) # overflows: the stream is ended
386+
387+
async def run_late() -> None:
388+
late_results.append(await handler(_ctx(late_session, request_id=8), _params(tools_list_changed=True)))
389+
390+
# The ended stream's slot is free immediately - a new listen does not
391+
# wait for the wedged write to die with its connection.
392+
tg.start_soon(run_late)
393+
await late_session.wait_for(1)
394+
395+
session.release.set()
396+
handler.close()
362397

363398
delivered = [notification for notification, _ in session.sent[1:]]
364-
assert len(delivered) == 1 # the buffered event still drained
365-
assert results[0].meta is not None # the stream ended with the stamped result
399+
assert len(delivered) == 2 # the wedged event and the buffered one still drained
400+
assert results[0].meta == {SUBSCRIPTION_ID_META_KEY: 7}
401+
assert late_results[0].meta == {SUBSCRIPTION_ID_META_KEY: 8}
402+
403+
404+
@pytest.mark.anyio
405+
async def test_same_task_publish_burst_does_not_overflow_a_healthy_stream() -> None:
406+
"""SDK-defined: `publish` ends with a checkpoint, so a burst of events from
407+
one task (no yields of its own) lets a reading stream drain between
408+
publishes instead of deterministically overflowing the buffer."""
409+
bus = InMemorySubscriptionBus()
410+
handler = ListenHandler(bus, max_buffered_events=99)
411+
session = _RecordingSession()
412+
413+
async with anyio.create_task_group() as tg:
414+
415+
async def run() -> None:
416+
await handler(_ctx(session), _params(tools_list_changed=True))
417+
418+
tg.start_soon(run)
419+
await session.wait_for(1)
420+
421+
for _ in range(100):
422+
await bus.publish(ToolsListChanged())
423+
await session.wait_for(101)
424+
handler.close()
425+
426+
assert len(session.sent) == 101 # the ack plus every event in the burst

0 commit comments

Comments
 (0)