Skip to content

Commit 067f905

Browse files
authored
Add SSE response mode to the 2026 streamable-HTTP server entry (#3001)
1 parent c0ecb70 commit 067f905

11 files changed

Lines changed: 517 additions & 112 deletions

File tree

.github/actions/conformance/expected-failures.2026-07-28.yml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,6 @@ client:
2828
# neither run nor evaluated on this leg.
2929

3030
server:
31-
# The stateless 2026 path now reaches handlers for plain request/response
32-
# scenarios; tools-call-with-progress still fails because the stateless
33-
# server has no channel for server→client progress notifications.
34-
- tools-call-with-progress
3531
# SEP-2322 (multi-round-trip requests / IncompleteResult): the prompt pipeline
3632
# cannot return InputRequiredResult from MCPServer yet (tools/call can).
3733
- input-required-result-non-tool-request

examples/stories/manifest.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ era = "dual-in-body"
3131
multi_connection = true
3232

3333
[story.streaming]
34-
# progress + log notifications dropped on the modern streamable-HTTP path pending SSE wiring
35-
xfail = ["http-asgi:modern"]
3634

3735
[story.mrtr]
3836
era = "modern"

examples/stories/streaming/README.md

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,12 @@ uv run python -m stories.streaming.client
1717
uv run python -m stories.streaming.client --server server_lowlevel
1818

1919
# HTTP — the client self-hosts the server on a free port, runs, then tears it
20-
# down (--legacy: see the note below)
21-
uv run python -m stories.streaming.client --http --legacy
20+
# down
21+
uv run python -m stories.streaming.client --http
2222
# same, against the lowlevel-API server variant
23-
uv run python -m stories.streaming.client --http --legacy --server server_lowlevel
23+
uv run python -m stories.streaming.client --http --server server_lowlevel
2424
```
2525

26-
The modern HTTP leg (drop `--legacy`) is `xfail` until the SSE wiring lands —
27-
mid-call progress and log notifications are currently dropped there (see
28-
Caveats).
29-
3026
## What to look at
3127

3228
- `client.py` `main` — opens with `async with Client(target, mode=mode,
@@ -60,9 +56,6 @@ Caveats).
6056
OpenTelemetry instead of `notifications/message`. It is shown here because
6157
servers still need to support 2025-era clients during that window. Progress
6258
and cancellation are **not** deprecated. TODO(maxisbey): revisit before beta.
63-
- On the modern (2026-07-28) streamable-HTTP path, mid-call progress and log
64-
notifications are currently dropped pending the SSE wiring; the
65-
`http-asgi:modern` leg of this story is `xfail` until that lands.
6659
- When a request is cancelled the server currently replies with
6760
`ErrorData(code=0, message="Request cancelled")`; the spec says it should not
6861
reply at all. The client never observes it (its awaiting task is already

src/mcp/client/streamable_http.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,10 @@ async def _handle_sse_event(
156156
# Otherwise, return False to continue listening
157157
return isinstance(message, JSONRPCResponse | JSONRPCError)
158158

159-
except Exception as exc: # pragma: no cover
159+
# Forwarding to a closed read stream lands here when the caller cancels mid-SSE
160+
# (BrokenResourceError, not a parse failure); coverage is timing-dependent in the
161+
# streaming story's modern HTTP cancellation leg.
162+
except Exception as exc: # pragma: lax no cover
160163
logger.exception("Error parsing SSE message")
161164
if original_request_id is not None:
162165
error_data = ErrorData(code=PARSE_ERROR, message=f"Failed to parse SSE message: {exc}")
@@ -372,7 +375,7 @@ async def _handle_sse_response(
372375
await response.aclose()
373376
return # Normal completion, no reconnect needed
374377
except Exception:
375-
logger.debug("SSE stream ended", exc_info=True) # pragma: no cover
378+
logger.debug("SSE stream ended", exc_info=True) # pragma: lax no cover
376379

377380
# Stream ended without response - reconnect if we received an event with ID
378381
if last_event_id is not None: # pragma: no branch

src/mcp/server/_streamable_http_modern.py

Lines changed: 123 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,15 @@
55
path for earlier protocol revisions.
66
77
A 2026-07-28 request is a self-contained POST: no `initialize` handshake, no
8-
`Mcp-Session-Id`, one JSON-RPC request in, one JSON-RPC response out. This
9-
module handles such a request directly in the ASGI task - no memory streams,
10-
no per-request task group, no `JSONRPCDispatcher`.
8+
`Mcp-Session-Id`, one JSON-RPC request in, one JSON-RPC response out. JSON
9+
mode handles the request directly in the ASGI task. SSE mode runs the handler
10+
as a sibling task and defers committing to `text/event-stream` until the
11+
handler emits a notification or `_SSE_PING_INTERVAL` elapses, whichever
12+
comes first: a handler that completes (or raises) within that window without
13+
emitting still gets a JSON response with the table-mapped HTTP status, so
14+
the spec's `404`/`400` MUSTs hold for kernel-dispatch errors; a handler that
15+
runs silent past the window commits SSE so the keepalive ping can keep the
16+
connection open behind a proxy idle-read timeout.
1117
"""
1218

1319
from __future__ import annotations
@@ -16,9 +22,10 @@
1622
import logging
1723
from collections.abc import Awaitable, Mapping
1824
from dataclasses import dataclass, field
19-
from typing import TYPE_CHECKING, Any, TypeVar
25+
from typing import TYPE_CHECKING, Any, Final, TypeVar
2026

2127
import anyio
28+
from anyio.streams.memory import MemoryObjectSendStream
2229
from mcp_types import (
2330
INTERNAL_ERROR,
2431
INVALID_REQUEST,
@@ -27,8 +34,10 @@
2734
ErrorData,
2835
Implementation,
2936
JSONRPCError,
37+
JSONRPCNotification,
3038
JSONRPCRequest,
3139
JSONRPCResponse,
40+
ProgressToken,
3241
RequestId,
3342
)
3443
from pydantic import BaseModel, ValidationError
@@ -38,6 +47,7 @@
3847

3948
from mcp.server.connection import Connection
4049
from mcp.server.runner import serve_one
50+
from mcp.server.streamable_http import check_accept_headers
4151
from mcp.server.transport_security import TransportSecurityMiddleware, TransportSecuritySettings
4252
from mcp.shared.dispatcher import CallOptions
4353
from mcp.shared.exceptions import NoBackChannelError
@@ -46,7 +56,7 @@
4656
InboundLadderRejection,
4757
classify_inbound_request,
4858
)
49-
from mcp.shared.jsonrpc_dispatcher import handler_exception_to_error_data
59+
from mcp.shared.jsonrpc_dispatcher import handler_exception_to_error_data, progress_token_from_params
5060
from mcp.shared.message import MessageMetadata, ServerMessageMetadata
5161
from mcp.shared.transport_context import TransportContext
5262

@@ -66,12 +76,15 @@ class _SingleExchangeDispatchContext:
6676
6777
Structurally satisfies `mcp.shared.dispatcher.DispatchContext`. The
6878
back-channel is closed by construction: a 2026-07-28 server cannot send
69-
requests to the client.
79+
requests to the client. The SSE sink, when present, carries request-scoped
80+
notifications onto this request's response stream.
7081
"""
7182

7283
transport: TransportContext
7384
request_id: RequestId
7485
message_metadata: MessageMetadata
86+
progress_token: ProgressToken | None = None
87+
sink: MemoryObjectSendStream[bytes] | None = None
7588
cancel_requested: anyio.Event = field(default_factory=anyio.Event)
7689
can_send_request: bool = field(default=False, init=False)
7790

@@ -84,12 +97,23 @@ async def send_raw_request(
8497
raise NoBackChannelError(method)
8598

8699
async def notify(self, method: str, params: Mapping[str, Any] | None, opts: CallOptions | None = None) -> None:
87-
# TODO(D-005a): buffer and stream as SSE once the JSON-vs-SSE response mode lands.
88-
return None
100+
if self.sink is None:
101+
return
102+
body = dict(params) if params is not None else None
103+
try:
104+
await self.sink.send(_sse_event(JSONRPCNotification(jsonrpc="2.0", method=method, params=body)))
105+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
106+
logger.debug("dropped %s: response stream closed", method)
89107

90108
async def progress(self, progress: float, total: float | None = None, message: str | None = None) -> None:
91-
# TODO(D-005a): no progressToken plumbing yet; ships with the SSE response mode.
92-
return None
109+
if self.progress_token is None:
110+
return
111+
params: dict[str, Any] = {"progressToken": self.progress_token, "progress": progress}
112+
if total is not None:
113+
params["total"] = total
114+
if message is not None:
115+
params["message"] = message
116+
await self.notify("notifications/progress", params)
93117

94118

95119
def _typed(model: type[_ModelT], raw: Any) -> _ModelT | None:
@@ -126,6 +150,28 @@ async def _to_jsonrpc_response(
126150
return JSONRPCResponse(jsonrpc="2.0", id=request_id, result=result)
127151

128152

153+
_SSE_PING_INTERVAL: float = 15.0
154+
"""Seconds between SSE comment-line keepalives once `text/event-stream` has committed."""
155+
156+
_SSE_HEADERS: Final[list[tuple[bytes, bytes]]] = [
157+
(b"content-type", b"text/event-stream"),
158+
(b"cache-control", b"no-cache, no-transform"),
159+
(b"connection", b"keep-alive"),
160+
(b"x-accel-buffering", b"no"),
161+
]
162+
163+
164+
def _sse_event(msg: JSONRPCResponse | JSONRPCError | JSONRPCNotification) -> bytes:
165+
"""Serialise a JSON-RPC message as one SSE `event: message` frame.
166+
167+
SSE mode begins after the handler has emitted, so a `JSONRPCError` here
168+
always carries the request's id; the `id: null` case lives in `_write`.
169+
"""
170+
body = msg.model_dump(mode="json", by_alias=True, exclude_none=True)
171+
data = json.dumps(body, separators=(",", ":"))
172+
return f"event: message\r\ndata: {data}\r\n\r\n".encode()
173+
174+
129175
async def _write(
130176
msg: JSONRPCResponse | JSONRPCError,
131177
scope: Scope,
@@ -149,6 +195,7 @@ async def _write(
149195
async def handle_modern_request(
150196
app: Server[Any],
151197
security_settings: TransportSecuritySettings | None,
198+
json_response: bool,
152199
lifespan_state: Any,
153200
scope: Scope,
154201
receive: Receive,
@@ -169,14 +216,17 @@ async def handle_modern_request(
169216
await err(scope, receive, send)
170217
return
171218

172-
# TODO(D-005a): validate Accept once the JSON-vs-SSE response mode is settled.
173-
174219
if request.method != "POST":
175220
# HTTP-layer rejection (Allow accompanies 405 per RFC 9110) — happens
176221
# before JSON-RPC parsing, so it doesn't go through `_write`.
177222
await Response(status_code=405, headers={"Allow": "POST"})(scope, receive, send)
178223
return
179224

225+
has_json, has_sse = check_accept_headers(request)
226+
if not has_json or (not json_response and not has_sse):
227+
await Response(status_code=406)(scope, receive, send)
228+
return
229+
180230
body = await request.body()
181231
try:
182232
decoded = json.loads(body)
@@ -219,8 +269,65 @@ async def handle_modern_request(
219269
transport=TransportContext(kind="streamable-http", can_send_request=False, headers=request.headers),
220270
request_id=req.id,
221271
message_metadata=ServerMessageMetadata(request_context=request),
272+
progress_token=progress_token_from_params(req.params),
222273
)
223-
msg = await _to_jsonrpc_response(
224-
req.id, serve_one(app, dctx, req.method, req.params, connection=connection, lifespan_state=lifespan_state)
225-
)
226-
await _write(msg, scope, receive, send)
274+
275+
if json_response:
276+
msg = await _to_jsonrpc_response(
277+
req.id, serve_one(app, dctx, req.method, req.params, connection=connection, lifespan_state=lifespan_state)
278+
)
279+
await _write(msg, scope, receive, send)
280+
return
281+
282+
send_ch, recv_ch = anyio.create_memory_object_stream[bytes](0)
283+
dctx.sink = send_ch
284+
result: list[JSONRPCResponse | JSONRPCError] = []
285+
286+
async def run_handler() -> None:
287+
async with send_ch:
288+
result.append(
289+
await _to_jsonrpc_response(
290+
req.id,
291+
serve_one(app, dctx, req.method, req.params, connection=connection, lifespan_state=lifespan_state),
292+
)
293+
)
294+
295+
async def watch_disconnect(cancel_scope: anyio.CancelScope) -> None:
296+
while (await receive()).get("type") != "http.disconnect":
297+
pass # pragma: no cover
298+
cancel_scope.cancel()
299+
300+
async with recv_ch, anyio.create_task_group() as tg:
301+
tg.start_soon(run_handler)
302+
tg.start_soon(watch_disconnect, tg.cancel_scope)
303+
304+
event: bytes | None = None
305+
done = False
306+
with anyio.move_on_after(_SSE_PING_INTERVAL):
307+
try:
308+
event = await recv_ch.receive()
309+
except anyio.EndOfStream:
310+
done = True
311+
312+
if done:
313+
# Handler completed within the deferral window without emitting:
314+
# `application/json` with the table-mapped status. Kernel-dispatch
315+
# errors (METHOD_NOT_FOUND, missing-capability, INVALID_PARAMS)
316+
# resolve here in practice.
317+
await _write(result[0], scope, receive, send)
318+
else:
319+
# First notification arrived, or the deferral window elapsed: commit
320+
# `text/event-stream` and start pinging so a proxy idle-read timeout
321+
# cannot close the stream (which on this path cancels the handler).
322+
await send({"type": "http.response.start", "status": _OK_STATUS, "headers": _SSE_HEADERS})
323+
while not done:
324+
await send({"type": "http.response.body", "body": event or b": ping\r\n\r\n", "more_body": True})
325+
event = None
326+
with anyio.move_on_after(_SSE_PING_INTERVAL):
327+
try:
328+
event = await recv_ch.receive()
329+
except anyio.EndOfStream:
330+
done = True
331+
await send({"type": "http.response.body", "body": _sse_event(result[0]), "more_body": False})
332+
333+
tg.cancel_scope.cancel()

src/mcp/server/streamable_http.py

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,24 @@
7676
SSEEvent = dict[str, Any]
7777

7878

79+
def check_accept_headers(request: Request) -> tuple[bool, bool]:
80+
"""Return (has_json, has_sse) for the request's Accept header, with RFC 7231 wildcard handling.
81+
82+
Supports wildcard media types per RFC 7231, section 5.3.2:
83+
- */* matches any media type
84+
- application/* matches any application/ subtype
85+
- text/* matches any text/ subtype
86+
"""
87+
accept_header = request.headers.get("accept", "")
88+
accept_types = [media_type.strip().split(";")[0].strip().lower() for media_type in accept_header.split(",")]
89+
90+
has_wildcard = "*/*" in accept_types
91+
has_json = has_wildcard or any(t in (CONTENT_TYPE_JSON, "application/*") for t in accept_types)
92+
has_sse = has_wildcard or any(t in (CONTENT_TYPE_SSE, "text/*") for t in accept_types)
93+
94+
return has_json, has_sse
95+
96+
7997
@dataclass
8098
class EventMessage:
8199
"""A JSONRPCMessage with an optional event ID for stream resumability."""
@@ -415,23 +433,6 @@ async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> No
415433
else:
416434
await self._handle_unsupported_request(request, send)
417435

418-
def _check_accept_headers(self, request: Request) -> tuple[bool, bool]:
419-
"""Check if the request accepts the required media types.
420-
421-
Supports wildcard media types per RFC 7231, section 5.3.2:
422-
- */* matches any media type
423-
- application/* matches any application/ subtype
424-
- text/* matches any text/ subtype
425-
"""
426-
accept_header = request.headers.get("accept", "")
427-
accept_types = [media_type.strip().split(";")[0].strip().lower() for media_type in accept_header.split(",")]
428-
429-
has_wildcard = "*/*" in accept_types
430-
has_json = has_wildcard or any(t in (CONTENT_TYPE_JSON, "application/*") for t in accept_types)
431-
has_sse = has_wildcard or any(t in (CONTENT_TYPE_SSE, "text/*") for t in accept_types)
432-
433-
return has_json, has_sse
434-
435436
def _check_content_type(self, request: Request) -> bool:
436437
"""Check if the request has the correct Content-Type."""
437438
content_type = request.headers.get("content-type", "")
@@ -441,7 +442,7 @@ def _check_content_type(self, request: Request) -> bool:
441442

442443
async def _validate_accept_header(self, request: Request, scope: Scope, send: Send) -> bool:
443444
"""Validate Accept header based on response mode. Returns True if valid."""
444-
has_json, has_sse = self._check_accept_headers(request)
445+
has_json, has_sse = check_accept_headers(request)
445446
if self.is_json_response_enabled:
446447
# For JSON-only responses, only require application/json
447448
if not has_json:
@@ -661,7 +662,7 @@ async def _handle_get_request(self, request: Request, send: Send) -> None:
661662
raise ValueError("No read stream writer available. Ensure connect() is called first.")
662663

663664
# Validate Accept header - must include text/event-stream
664-
_, has_sse = self._check_accept_headers(request)
665+
_, has_sse = check_accept_headers(request)
665666

666667
if not has_sse:
667668
response = self._create_error_response(

src/mcp/server/streamable_http_manager.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,9 @@ async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> No
170170
header = MCP_PROTOCOL_VERSION_HEADER.encode("ascii")
171171
pv = next((v.decode("latin-1") for k, v in scope["headers"] if k == header), None)
172172
if pv is not None and pv not in HANDSHAKE_PROTOCOL_VERSIONS:
173-
await handle_modern_request(self.app, self.security_settings, self._lifespan_state, scope, receive, send)
173+
await handle_modern_request(
174+
self.app, self.security_settings, self.json_response, self._lifespan_state, scope, receive, send
175+
)
174176
return
175177

176178
# Dispatch to the appropriate handler

0 commit comments

Comments
 (0)