Skip to content

Commit b0b398c

Browse files
committed
Buffer per-request StreamableHTTP streams to avoid serial-router head-of-line block
The serial message_router forwards each response with a blocking send into a per-request buffer-0 stream whose only consumer (sse_writer) is started lazily via nested start_soon. Under concurrent requests one not-yet-receiving consumer parks the router and head-of-line blocks every other in-flight response on the session. Give the three _request_streams[EventMessage] sites a small bounded buffer so the router can deposit and move on. The sse_stream dict streams stay at 0 (downstream of the router; buffering them would relax per-client backpressure without helping the race). Fixes #1764.
1 parent 2397319 commit b0b398c

2 files changed

Lines changed: 66 additions & 4 deletions

File tree

src/mcp/server/streamable_http.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from contextlib import asynccontextmanager
1414
from dataclasses import dataclass
1515
from http import HTTPStatus
16-
from typing import Any
16+
from typing import Any, Final
1717

1818
import anyio
1919
import pydantic_core
@@ -59,6 +59,11 @@
5959
# Special key for the standalone GET stream
6060
GET_STREAM_KEY = "_GET_stream"
6161

62+
# Buffer for the per-request `_request_streams` so the serial `message_router`
63+
# can deposit a response and move on instead of head-of-line blocking the
64+
# whole session on a lazily-started `sse_writer`. See #1764.
65+
REQUEST_STREAM_BUFFER_SIZE: Final = 16
66+
6267
# Session ID validation pattern (visible ASCII characters ranging from 0x21 to 0x7E)
6368
# Pattern ensures entire string contains only valid characters by using ^ and $ anchors
6469
SESSION_ID_PATTERN = re.compile(r"^[\x21-\x7E]+$")
@@ -524,7 +529,9 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
524529
# Extract the request ID outside the try block for proper scope
525530
request_id = str(message.id)
526531
# Register this stream for the request ID
527-
self._request_streams[request_id] = anyio.create_memory_object_stream[EventMessage](0)
532+
self._request_streams[request_id] = anyio.create_memory_object_stream[EventMessage](
533+
REQUEST_STREAM_BUFFER_SIZE
534+
)
528535
request_stream_reader = self._request_streams[request_id][1]
529536

530537
if self.is_json_response_enabled:
@@ -703,7 +710,9 @@ async def standalone_sse_writer():
703710
try:
704711
# Create a standalone message stream for server-initiated messages
705712

706-
self._request_streams[GET_STREAM_KEY] = anyio.create_memory_object_stream[EventMessage](0)
713+
self._request_streams[GET_STREAM_KEY] = anyio.create_memory_object_stream[EventMessage](
714+
REQUEST_STREAM_BUFFER_SIZE
715+
)
707716
standalone_stream_reader = self._request_streams[GET_STREAM_KEY][1]
708717

709718
async with sse_stream_writer, standalone_stream_reader:
@@ -893,7 +902,9 @@ async def send_event(event_message: EventMessage) -> None:
893902
await self._maybe_send_priming_event(stream_id, sse_stream_writer, replay_protocol_version)
894903

895904
# Create new request streams for this connection
896-
self._request_streams[stream_id] = anyio.create_memory_object_stream[EventMessage](0)
905+
self._request_streams[stream_id] = anyio.create_memory_object_stream[EventMessage](
906+
REQUEST_STREAM_BUFFER_SIZE
907+
)
897908
msg_reader = self._request_streams[stream_id][1]
898909

899910
# Forward messages to SSE
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
"""Regression coverage for the StreamableHTTP per-session response router."""
2+
3+
import anyio
4+
import pytest
5+
6+
from mcp.server.streamable_http import (
7+
REQUEST_STREAM_BUFFER_SIZE,
8+
EventMessage,
9+
StreamableHTTPServerTransport,
10+
)
11+
from mcp.shared.message import SessionMessage
12+
from mcp.types import JSONRPCResponse
13+
14+
15+
@pytest.mark.anyio
16+
async def test_router_unconsumed_request_stream_does_not_block_siblings() -> None:
17+
"""A response whose `sse_writer` is not yet receiving must not park the router (#1764).
18+
19+
Drives the routing layer directly (the production race does not reproduce
20+
on loopback), so this pins the router semantics, not the call sites.
21+
"""
22+
transport = StreamableHTTPServerTransport(mcp_session_id="sid", is_json_response_enabled=False)
23+
streams = transport._request_streams
24+
async with transport.connect() as (_read_stream, write_stream):
25+
# Model two concurrent POSTs at the point _handle_post_request has
26+
# registered the per-request stream but A's sse_writer has not yet
27+
# reached its first receive().
28+
streams["A"] = anyio.create_memory_object_stream[EventMessage](REQUEST_STREAM_BUFFER_SIZE)
29+
streams["B"] = anyio.create_memory_object_stream[EventMessage](REQUEST_STREAM_BUFFER_SIZE)
30+
a_send, a_recv = streams["A"]
31+
b_reader = streams["B"][1]
32+
b_received = anyio.Event()
33+
34+
async def consume_b() -> None:
35+
async with b_reader:
36+
await b_reader.receive()
37+
b_received.set()
38+
39+
async def server_writes() -> None:
40+
await write_stream.send(SessionMessage(JSONRPCResponse(jsonrpc="2.0", id="A", result={})))
41+
await write_stream.send(SessionMessage(JSONRPCResponse(jsonrpc="2.0", id="B", result={})))
42+
43+
async with anyio.create_task_group() as tg:
44+
tg.start_soon(consume_b)
45+
tg.start_soon(server_writes)
46+
with anyio.fail_after(5):
47+
await b_received.wait()
48+
# A's response was buffered for its (late) consumer, not dropped.
49+
assert a_send.statistics().current_buffer_used == 1
50+
await a_recv.aclose()
51+
await a_send.aclose()

0 commit comments

Comments
 (0)