From f616d5ef26b64ab55d2c084310592dd68818a3f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=AF=E5=9F=BA=E9=AD=81?= <1412414664@qq.com> Date: Tue, 23 Jun 2026 23:21:21 +0800 Subject: [PATCH] fix: validate DCR redirect URIs --- src/mcp/server/auth/handlers/register.py | 30 ++++++++++++++- tests/interaction/_requirements.py | 7 ---- tests/interaction/auth/test_as_handlers.py | 43 ++++++++++++++++++---- 3 files changed, 64 insertions(+), 16 deletions(-) diff --git a/src/mcp/server/auth/handlers/register.py b/src/mcp/server/auth/handlers/register.py index 79eb0fb0c1..b44ae36cd4 100644 --- a/src/mcp/server/auth/handlers/register.py +++ b/src/mcp/server/auth/handlers/register.py @@ -4,7 +4,7 @@ from typing import Any from uuid import uuid4 -from pydantic import BaseModel, ValidationError +from pydantic import AnyUrl, BaseModel, ValidationError from starlette.requests import Request from starlette.responses import Response @@ -18,12 +18,29 @@ # provider from what we use in the HTTP handler RegistrationRequest = OAuthClientMetadata +LOOPBACK_REDIRECT_HOSTS = ("localhost", "127.0.0.1", "[::1]") + class RegistrationErrorResponse(BaseModel): error: RegistrationErrorCode error_description: str | None +def validate_redirect_uri(uri: AnyUrl) -> None: + """Validate a dynamic client registration redirect URI.""" + if uri.scheme != "https" and uri.host not in LOOPBACK_REDIRECT_HOSTS: + raise RegistrationError( + error="invalid_redirect_uri", + error_description="redirect_uris must use HTTPS or target a loopback host", + ) + + if uri.fragment: + raise RegistrationError( + error="invalid_redirect_uri", + error_description="redirect_uris must not include a fragment", + ) + + @dataclass class RegistrationHandler: provider: OAuthAuthorizationServerProvider[Any, Any, Any] @@ -35,6 +52,9 @@ async def handle(self, request: Request) -> Response: body = await request.body() client_metadata = OAuthClientMetadata.model_validate_json(body) + for redirect_uri in client_metadata.redirect_uris or (): + validate_redirect_uri(redirect_uri) + # Scope validation is handled below except ValidationError as validation_error: return PydanticJSONResponse( @@ -44,6 +64,14 @@ async def handle(self, request: Request) -> Response: ), status_code=400, ) + except RegistrationError as registration_error: + return PydanticJSONResponse( + content=RegistrationErrorResponse( + error=registration_error.error, + error_description=registration_error.error_description, + ), + status_code=400, + ) client_id = str(uuid4()) diff --git a/tests/interaction/_requirements.py b/tests/interaction/_requirements.py index 9aee73b29b..7df46fb97a 100644 --- a/tests/interaction/_requirements.py +++ b/tests/interaction/_requirements.py @@ -2713,13 +2713,6 @@ def __post_init__(self) -> None: ), transports=("streamable-http",), note="Auth is enforced at the HTTP layer; the bundled AS is an ASGI app.", - divergence=Divergence( - note=( - "Not enforced: the registration handler models redirect_uris as AnyUrl with no scheme or " - "host check, so http://evil.example/callback is accepted and registered. The spec's " - "localhost-or-HTTPS rule is left to the provider implementation." - ), - ), ), "hosting:auth:as:token-cache-headers": Requirement( source="sdk", diff --git a/tests/interaction/auth/test_as_handlers.py b/tests/interaction/auth/test_as_handlers.py index 5cb4e92d86..5b3e324e1c 100644 --- a/tests/interaction/auth/test_as_handlers.py +++ b/tests/interaction/auth/test_as_handlers.py @@ -279,22 +279,49 @@ async def test_authorize_with_an_unregistered_redirect_uri_is_rejected_directly( @requirement("hosting:auth:as:redirect-uri-scheme") -async def test_a_non_loopback_http_redirect_uri_is_accepted_at_registration( +@pytest.mark.parametrize( + "redirect_uri", + [ + "http://evil.example/callback", + "javascript:alert(1)", + "data:text/html,", + "file:///etc/passwd", + "https://client.example.com/callback#fragment", + ], +) +async def test_registration_rejects_redirect_uris_without_https_or_loopback( as_app: tuple[httpx.AsyncClient, InMemoryAuthorizationServerProvider], + redirect_uri: str, ) -> None: - """A registration carrying a non-HTTPS, non-loopback redirect URI is accepted. + """The registration endpoint rejects redirect URIs without HTTPS or a loopback host.""" + http, provider = as_app + body = oauth_client_metadata().model_dump(mode="json", exclude_none=True) + body["redirect_uris"] = [redirect_uri] - The spec requires every redirect URI to be either HTTPS or a loopback host; the bundled - registration handler does not enforce this and registers `http://evil.example/callback` - successfully. See the divergence on the requirement. - """ + response = await http.post("/register", json=body) + + assert response.status_code == 400 + assert response.json()["error"] == "invalid_redirect_uri" + assert provider.clients == {} + + +@requirement("hosting:auth:as:redirect-uri-scheme") +async def test_registration_allows_https_and_loopback_redirect_uris( + as_app: tuple[httpx.AsyncClient, InMemoryAuthorizationServerProvider], +) -> None: + """HTTPS redirect URIs and loopback HTTP redirect URIs remain valid.""" http, provider = as_app body = oauth_client_metadata().model_dump(mode="json", exclude_none=True) - body["redirect_uris"] = ["http://evil.example/callback"] + body["redirect_uris"] = [ + "https://client.example.com/callback?next=%2Fhome", + "http://localhost:3000/callback", + "http://127.0.0.1:3000/callback", + "http://[::1]:3000/callback", + ] response = await http.post("/register", json=body) assert response.status_code == 201 info = OAuthClientInformationFull.model_validate_json(response.content) - assert [str(u) for u in (info.redirect_uris or [])] == ["http://evil.example/callback"] + assert [str(u) for u in (info.redirect_uris or [])] == body["redirect_uris"] assert info.client_id in provider.clients