|
5 | 5 | including token verification, middleware, and scoped tool decorators. |
6 | 6 | """ |
7 | 7 |
|
| 8 | +import logging |
| 9 | +import os |
| 10 | +from typing import Callable, Union |
| 11 | + |
8 | 12 | from mcp.server.auth.routes import create_protected_resource_routes |
9 | 13 | from mcp.server.fastmcp import FastMCP |
10 | 14 | from starlette.middleware import Middleware |
| 15 | +from starlette.requests import Request |
| 16 | +from starlette.responses import JSONResponse |
11 | 17 | from starlette.routing import Route, Router |
12 | 18 |
|
| 19 | +from .errors import AuthenticationRequired, InsufficientScope, MalformedAuthorizationRequest |
13 | 20 | from .middleware import Auth0Middleware |
14 | 21 |
|
| 22 | +logger = logging.getLogger(__name__) |
| 23 | + |
15 | 24 |
|
16 | 25 | class Auth0Mcp: |
17 | 26 | def __init__(self, name: str, audience: str, domain: str): |
@@ -56,3 +65,55 @@ def register_scopes(self, scopes: list[str]) -> None: |
56 | 65 | """ |
57 | 66 | if scopes: |
58 | 67 | self._scopes_supported.update(scopes) |
| 68 | + |
| 69 | + def exception_handlers(self) -> dict[Union[int, type[Exception]], Callable]: |
| 70 | + return { |
| 71 | + AuthenticationRequired: self._auth_error_handler, |
| 72 | + InsufficientScope: self._auth_error_handler, |
| 73 | + MalformedAuthorizationRequest: self._auth_error_handler, |
| 74 | + # Generic fallback for any other exceptions |
| 75 | + Exception: self._generic_exception_handler, |
| 76 | + } |
| 77 | + |
| 78 | + def _auth_error_handler(self, request: Request, exc: Exception): |
| 79 | + """ |
| 80 | + Handle auth errors: malformed authorization requests, missing auth, invalid tokens, and insufficient scopes. |
| 81 | + """ |
| 82 | + # Include resource metadata parameter for 401 responses per RFC 9728 Section 5.1 |
| 83 | + include_resource_metadata = exc.status_code == 401 |
| 84 | + |
| 85 | + return JSONResponse( |
| 86 | + { |
| 87 | + "error": exc.error_code, |
| 88 | + "error_description": exc.description |
| 89 | + }, |
| 90 | + status_code=exc.status_code, |
| 91 | + headers={"WWW-Authenticate": self._build_www_authenticate_header(exc.error_code, exc.description, include_resource_metadata)}, |
| 92 | + ) |
| 93 | + |
| 94 | + def _generic_exception_handler(self, request:Request, exc: Exception): |
| 95 | + """ |
| 96 | + Fallback handler for all other exceptions. |
| 97 | + """ |
| 98 | + logger.error(f"Unexpected error in: {exc}", exc_info=exc) |
| 99 | + |
| 100 | + # Return standard HTTP 500 error |
| 101 | + return JSONResponse( |
| 102 | + { |
| 103 | + "error": "internal_server_error", |
| 104 | + "error_description": "An unexpected error occurred" |
| 105 | + }, |
| 106 | + status_code=500, |
| 107 | + ) |
| 108 | + |
| 109 | + def _build_www_authenticate_header(self, error_code: str, description: str, include_resource_metadata: bool = False) -> str: |
| 110 | + """ |
| 111 | + Build WWW-Authenticate header according to RFC 9728 Section 5.1. |
| 112 | + """ |
| 113 | + www_auth_params = [f'error="{error_code}"', f'error_description="{description}"'] |
| 114 | + |
| 115 | + if include_resource_metadata: |
| 116 | + metadata_url = f"{os.getenv('MCP_SERVER_URL')}/.well-known/oauth-protected-resource" |
| 117 | + www_auth_params.append(f'resource_metadata="{metadata_url}"') |
| 118 | + |
| 119 | + return f"Bearer {', '.join(www_auth_params)}" |
0 commit comments