Skip to content

Commit 41b9e54

Browse files
author
冯基魁
committed
fix: terminate streamable http sessions on shutdown
1 parent a527142 commit 41b9e54

3 files changed

Lines changed: 41 additions & 0 deletions

File tree

src/mcp/server/streamable_http.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,10 +774,17 @@ async def terminate(self) -> None:
774774
775775
Once terminated, all requests with this session ID will receive 404 Not Found.
776776
"""
777+
if self._terminated:
778+
return
777779

778780
self._terminated = True
779781
logger.info(f"Terminating session: {self.mcp_session_id}")
780782

783+
sse_stream_writers = list(self._sse_stream_writers.values())
784+
for writer in sse_stream_writers:
785+
writer.close()
786+
self._sse_stream_writers.clear()
787+
781788
# We need a copy of the keys to avoid modification during iteration
782789
request_stream_keys = list(self._request_streams.keys())
783790

src/mcp/server/streamable_http_manager.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,11 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
145145
yield # Let the application run
146146
finally:
147147
logger.info("StreamableHTTP session manager shutting down")
148+
for transport in list(self._server_instances.values()):
149+
try:
150+
await transport.terminate()
151+
except Exception: # pragma: no cover
152+
logger.debug("Error terminating StreamableHTTP transport during shutdown", exc_info=True)
148153
# Cancel task group to stop all spawned tasks
149154
tg.cancel_scope.cancel()
150155
self._task_group = None

tests/server/test_streamable_http_manager.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,19 @@ async def try_run():
6464
assert "StreamableHTTPSessionManager .run() can only be called once per instance" in str(errors[0])
6565

6666

67+
@pytest.mark.anyio
68+
async def test_run_terminates_active_transports_before_shutdown():
69+
app = Server("test-server")
70+
manager = StreamableHTTPSessionManager(app=app)
71+
transport = AsyncMock()
72+
73+
async with manager.run():
74+
manager._server_instances["session-id"] = transport
75+
76+
transport.terminate.assert_awaited_once_with()
77+
assert not manager._server_instances
78+
79+
6780
@pytest.mark.anyio
6881
async def test_handle_request_without_run_raises_error():
6982
"""Test that handle_request raises error if run() hasn't been called."""
@@ -269,6 +282,22 @@ async def mock_receive():
269282
assert len(transport._request_streams) == 0, "Transport should have no active request streams"
270283

271284

285+
@pytest.mark.anyio
286+
async def test_transport_terminate_closes_active_sse_writers():
287+
transport = StreamableHTTPServerTransport(mcp_session_id="session-id")
288+
writer, reader = anyio.create_memory_object_stream[dict[str, str]](1)
289+
transport._sse_stream_writers["request-id"] = writer
290+
291+
await transport.terminate()
292+
293+
assert transport.is_terminated
294+
assert not transport._sse_stream_writers
295+
with pytest.raises(anyio.ClosedResourceError):
296+
writer.send_nowait({"event": "message", "data": "{}"})
297+
298+
await reader.aclose()
299+
300+
272301
@pytest.mark.anyio
273302
async def test_unknown_session_id_returns_404(caplog: pytest.LogCaptureFixture):
274303
"""Test that requests with unknown session IDs return HTTP 404 per MCP spec."""

0 commit comments

Comments
 (0)