|
5 | 5 | pure kernel: it holds a pre-populated `Connection` and reads |
6 | 6 | `connection.protocol_version` / `connection.outbound` as facts. Driving a |
7 | 7 | dispatcher loop and tearing down the connection live in the free-function |
8 | | -drivers (`serve_connection`, `serve_loop`, `serve_one`); the entry constructs |
9 | | -the `Connection`, the driver tears it down. |
| 8 | +drivers (`serve_connection`, `serve_loop`, `serve_dual_era_loop`, `serve_one`); |
| 9 | +the entry constructs the `Connection`, the driver tears it down. |
10 | 10 |
|
11 | 11 | `ServerRunner` holds a `Server` directly - `Server` is the registry. |
12 | 12 | """ |
|
17 | 17 | from collections.abc import Awaitable, Mapping |
18 | 18 | from dataclasses import KW_ONLY, dataclass |
19 | 19 | from functools import cached_property, partial |
20 | | -from typing import TYPE_CHECKING, Any, Generic, cast |
| 20 | +from typing import TYPE_CHECKING, Any, Generic, Literal, cast |
21 | 21 |
|
22 | 22 | import anyio |
23 | 23 | import anyio.abc |
|
26 | 26 | CLIENT_INFO_META_KEY, |
27 | 27 | INTERNAL_ERROR, |
28 | 28 | INVALID_PARAMS, |
| 29 | + INVALID_REQUEST, |
29 | 30 | METHOD_NOT_FOUND, |
30 | 31 | PROTOCOL_VERSION_META_KEY, |
| 32 | + UNSUPPORTED_PROTOCOL_VERSION, |
31 | 33 | CacheableResult, |
32 | 34 | ErrorData, |
33 | 35 | Implementation, |
34 | 36 | InitializeRequestParams, |
35 | 37 | InitializeResult, |
| 38 | + RequestId, |
36 | 39 | RequestParams, |
37 | 40 | RequestParamsMeta, |
| 41 | + UnsupportedProtocolVersionErrorData, |
38 | 42 | ) |
39 | 43 | from mcp_types import methods as _methods |
40 | | -from mcp_types.version import HANDSHAKE_PROTOCOL_VERSIONS, LATEST_HANDSHAKE_VERSION, LATEST_MODERN_VERSION |
| 44 | +from mcp_types.version import ( |
| 45 | + HANDSHAKE_PROTOCOL_VERSIONS, |
| 46 | + LATEST_HANDSHAKE_VERSION, |
| 47 | + LATEST_MODERN_VERSION, |
| 48 | + MODERN_PROTOCOL_VERSIONS, |
| 49 | +) |
41 | 50 | from pydantic import BaseModel, ValidationError |
42 | 51 | from typing_extensions import TypeVar |
43 | 52 |
|
44 | 53 | from mcp.server.caching import apply_cache_hint |
45 | | -from mcp.server.connection import Connection |
| 54 | +from mcp.server.connection import Connection, NotifyOnlyOutbound |
46 | 55 | from mcp.server.context import CallNext, HandlerResult, ServerMiddleware, ServerRequestContext |
47 | 56 | from mcp.server.models import InitializationOptions |
48 | 57 | from mcp.server.session import ServerSession |
49 | 58 | from mcp.shared._stream_protocols import ReadStream, WriteStream |
50 | | -from mcp.shared.dispatcher import DispatchContext, Dispatcher, OnNotify, OnRequest |
51 | | -from mcp.shared.exceptions import MCPError |
| 59 | +from mcp.shared.dispatcher import CallOptions, DispatchContext, Dispatcher, OnNotify, OnRequest |
| 60 | +from mcp.shared.exceptions import MCPError, NoBackChannelError |
| 61 | +from mcp.shared.inbound import InboundLadderRejection, classify_inbound_request |
52 | 62 | from mcp.shared.jsonrpc_dispatcher import JSONRPCDispatcher |
53 | | -from mcp.shared.message import ServerMessageMetadata, SessionMessage |
| 63 | +from mcp.shared.message import MessageMetadata, ServerMessageMetadata, SessionMessage |
54 | 64 | from mcp.shared.transport_context import TransportContext |
55 | 65 |
|
56 | 66 | if TYPE_CHECKING: |
|
63 | 73 | "aclose_shielded", |
64 | 74 | "modern_on_request", |
65 | 75 | "serve_connection", |
| 76 | + "serve_dual_era_loop", |
66 | 77 | "serve_loop", |
67 | 78 | "serve_one", |
68 | 79 | ] |
@@ -427,6 +438,206 @@ async def serve_loop( |
427 | 438 | ) |
428 | 439 |
|
429 | 440 |
|
| 441 | +_MODERN_ENVELOPE_KEYS = (PROTOCOL_VERSION_META_KEY, CLIENT_INFO_META_KEY, CLIENT_CAPABILITIES_META_KEY) |
| 442 | + |
| 443 | + |
| 444 | +def _has_modern_envelope(params: Mapping[str, Any] | None) -> bool: |
| 445 | + """Whether `params._meta` carries every reserved modern-envelope key. |
| 446 | +
|
| 447 | + Era evidence is the FULL key triple - bare `_meta` is not (legacy traffic |
| 448 | + carries `progressToken` there). |
| 449 | + """ |
| 450 | + if not params: |
| 451 | + return False |
| 452 | + meta = params.get("_meta") |
| 453 | + return isinstance(meta, Mapping) and all(key in meta for key in _MODERN_ENVELOPE_KEYS) |
| 454 | + |
| 455 | + |
| 456 | +def _initialize_after_modern_data(params: Mapping[str, Any] | None) -> dict[str, Any]: |
| 457 | + """Error data for an `initialize` arriving on a modern-locked connection. |
| 458 | +
|
| 459 | + The typed -32022 payload when the client's proposed version is parseable; |
| 460 | + otherwise just the supported list (the point is naming what we serve). |
| 461 | + """ |
| 462 | + requested = (params or {}).get("protocolVersion") |
| 463 | + if isinstance(requested, str): |
| 464 | + return UnsupportedProtocolVersionErrorData( |
| 465 | + supported=list(MODERN_PROTOCOL_VERSIONS), requested=requested |
| 466 | + ).model_dump(mode="json") |
| 467 | + return {"supported": list(MODERN_PROTOCOL_VERSIONS)} |
| 468 | + |
| 469 | + |
| 470 | +@dataclass |
| 471 | +class _NoServerRequestsDispatchContext: |
| 472 | + """Delegating `DispatchContext` that refuses server-initiated requests. |
| 473 | +
|
| 474 | + Wraps the loop dispatcher's per-message context for modern-era dispatch: |
| 475 | + the modern protocol forbids server-initiated JSON-RPC requests, so |
| 476 | + `send_raw_request` refuses while notifications and progress still ride |
| 477 | + the duplex pipe. |
| 478 | + """ |
| 479 | + |
| 480 | + _inner: DispatchContext[TransportContext] |
| 481 | + |
| 482 | + @property |
| 483 | + def transport(self) -> TransportContext: |
| 484 | + return self._inner.transport |
| 485 | + |
| 486 | + @property |
| 487 | + def can_send_request(self) -> bool: |
| 488 | + return False |
| 489 | + |
| 490 | + @property |
| 491 | + def request_id(self) -> RequestId | None: |
| 492 | + return self._inner.request_id |
| 493 | + |
| 494 | + @property |
| 495 | + def message_metadata(self) -> MessageMetadata: |
| 496 | + return self._inner.message_metadata |
| 497 | + |
| 498 | + @property |
| 499 | + def cancel_requested(self) -> anyio.Event: |
| 500 | + return self._inner.cancel_requested |
| 501 | + |
| 502 | + async def send_raw_request( |
| 503 | + self, |
| 504 | + method: str, |
| 505 | + params: Mapping[str, Any] | None, |
| 506 | + opts: CallOptions | None = None, |
| 507 | + ) -> dict[str, Any]: |
| 508 | + raise NoBackChannelError(method) |
| 509 | + |
| 510 | + async def notify(self, method: str, params: Mapping[str, Any] | None, opts: CallOptions | None = None) -> None: |
| 511 | + await self._inner.notify(method, params, opts) |
| 512 | + |
| 513 | + async def progress(self, progress: float, total: float | None = None, message: str | None = None) -> None: |
| 514 | + await self._inner.progress(progress, total, message) |
| 515 | + |
| 516 | + |
| 517 | +async def serve_dual_era_loop( |
| 518 | + server: Server[LifespanT], |
| 519 | + read_stream: ReadStream[SessionMessage | Exception], |
| 520 | + write_stream: WriteStream[SessionMessage], |
| 521 | + *, |
| 522 | + lifespan_state: LifespanT, |
| 523 | + session_id: str | None = None, |
| 524 | + init_options: InitializationOptions | None = None, |
| 525 | + raise_exceptions: bool = False, |
| 526 | +) -> None: |
| 527 | + """Drive `server` over a duplex stream pair, serving both protocol eras. |
| 528 | +
|
| 529 | + The stream-pair counterpart of the modern HTTP entry's era router. Era is |
| 530 | + a property of the connection, decided by how the client opens it, and |
| 531 | + mid-stream switching is undefined - so the first era-distinctive message |
| 532 | + locks the connection (matching the typescript-sdk): |
| 533 | +
|
| 534 | + - `initialize` locks legacy: the connection behaves exactly like |
| 535 | + `serve_loop` for its lifetime, and modern envelope traffic is rejected |
| 536 | + with INVALID_REQUEST. |
| 537 | + - A request carrying the modern `_meta` envelope triple - or |
| 538 | + `server/discover`, a modern-only method - locks modern: every request is |
| 539 | + classified (`classify_inbound_request`) and served single-exchange via |
| 540 | + `serve_one` with a born-ready per-request `Connection`, the same |
| 541 | + dispatch model as the modern HTTP entry. A later `initialize` is |
| 542 | + rejected with UNSUPPORTED_PROTOCOL_VERSION naming the modern versions. |
| 543 | +
|
| 544 | + Modern connections push notifications over the duplex pipe but refuse |
| 545 | + server-initiated requests on both channels (the modern protocol forbids |
| 546 | + them). A rejected classification (malformed envelope, unsupported version) |
| 547 | + never locks the era, so a failed probe leaves the legacy handshake |
| 548 | + available - released auto-negotiating clients fall back on any error code |
| 549 | + except -32022. |
| 550 | + """ |
| 551 | + dispatcher: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher( |
| 552 | + read_stream, |
| 553 | + write_stream, |
| 554 | + raise_handler_exceptions=raise_exceptions, |
| 555 | + # `initialize` inline for the same pipelining reason as `serve_loop`; |
| 556 | + # `server/discover` inline so the modern era lock commits before the |
| 557 | + # next pipelined message is read. |
| 558 | + inline_methods=frozenset({"initialize", "server/discover"}), |
| 559 | + ) |
| 560 | + loop_connection = Connection.for_loop(dispatcher, session_id=session_id) |
| 561 | + loop_runner = ServerRunner(server, loop_connection, lifespan_state, init_options=init_options) |
| 562 | + standalone_outbound = NotifyOnlyOutbound(dispatcher) |
| 563 | + era: Literal["unlocked", "legacy", "modern"] = "unlocked" |
| 564 | + modern_version = LATEST_MODERN_VERSION |
| 565 | + |
| 566 | + async def serve_modern( |
| 567 | + dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None |
| 568 | + ) -> dict[str, Any]: |
| 569 | + nonlocal era, modern_version |
| 570 | + route = classify_inbound_request({"method": method, "params": params}) |
| 571 | + if isinstance(route, InboundLadderRejection): |
| 572 | + raise MCPError(code=route.code, message=route.message, data=route.data) |
| 573 | + if era != "modern": |
| 574 | + era, modern_version = "modern", route.protocol_version |
| 575 | + if method == "subscriptions/listen": |
| 576 | + # The registered listen handler assumes the HTTP entry's stream |
| 577 | + # semantics; served over a stream pair it would wedge. Reject until |
| 578 | + # this transport grows its own listen design. |
| 579 | + raise MCPError( |
| 580 | + code=METHOD_NOT_FOUND, message="subscriptions/listen is not served over this transport", data=method |
| 581 | + ) |
| 582 | + connection = Connection.from_envelope( |
| 583 | + route.protocol_version, |
| 584 | + route.client_info, |
| 585 | + route.client_capabilities, |
| 586 | + outbound=standalone_outbound, |
| 587 | + ) |
| 588 | + return await serve_one( |
| 589 | + server, |
| 590 | + _NoServerRequestsDispatchContext(dctx), |
| 591 | + method, |
| 592 | + params, |
| 593 | + connection=connection, |
| 594 | + lifespan_state=lifespan_state, |
| 595 | + ) |
| 596 | + |
| 597 | + async def on_request( |
| 598 | + dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None |
| 599 | + ) -> dict[str, Any]: |
| 600 | + nonlocal era |
| 601 | + if era == "legacy": |
| 602 | + if method == "server/discover" or _has_modern_envelope(params): |
| 603 | + raise MCPError( |
| 604 | + code=INVALID_REQUEST, |
| 605 | + message="connection is locked to the legacy handshake era; " |
| 606 | + "modern envelope requests are not accepted", |
| 607 | + ) |
| 608 | + return await loop_runner.on_request(dctx, method, params) |
| 609 | + if era == "modern" and method == "initialize": |
| 610 | + raise MCPError( |
| 611 | + code=UNSUPPORTED_PROTOCOL_VERSION, |
| 612 | + message="connection already negotiated a modern protocol version", |
| 613 | + data=_initialize_after_modern_data(params), |
| 614 | + ) |
| 615 | + if era == "modern" or method == "server/discover" or _has_modern_envelope(params): |
| 616 | + return await serve_modern(dctx, method, params) |
| 617 | + result = await loop_runner.on_request(dctx, method, params) |
| 618 | + if method == "initialize": |
| 619 | + # Lock only on success: a failed handshake leaves both eras open. |
| 620 | + era = "legacy" |
| 621 | + return result |
| 622 | + |
| 623 | + async def on_notify(dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None) -> None: |
| 624 | + if era != "modern": |
| 625 | + return await loop_runner.on_notify(dctx, method, params) |
| 626 | + # The envelope is request-only, so notifications inherit the |
| 627 | + # connection's locked version. |
| 628 | + connection = Connection.from_envelope(modern_version, None, None, outbound=standalone_outbound) |
| 629 | + notify_runner = ServerRunner(server, connection, lifespan_state) |
| 630 | + try: |
| 631 | + await notify_runner.on_notify(_NoServerRequestsDispatchContext(dctx), method, params) |
| 632 | + finally: |
| 633 | + await aclose_shielded(connection) |
| 634 | + |
| 635 | + try: |
| 636 | + await dispatcher.run(on_request, on_notify) |
| 637 | + finally: |
| 638 | + await aclose_shielded(loop_connection) |
| 639 | + |
| 640 | + |
430 | 641 | async def serve_one( |
431 | 642 | server: Server[LifespanT], |
432 | 643 | dctx: DispatchContext[TransportContext], |
|
0 commit comments