"""OAuth 2.0 client credentials auth for MCP server."""

import hashlib
import json
import logging
import secrets
import time

from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

from .config import load_credentials

logger = logging.getLogger(__name__)

# In-memory token store: token_hash -> expiry timestamp
_active_tokens: dict[str, float] = {}
TOKEN_LIFETIME = 3600  # 1 hour


def _get_oauth_credentials() -> tuple[str, str]:
    """Load OAuth client_id and client_secret from credentials file.

    Returns:
        A ``(client_id, client_secret)`` tuple.

    Raises:
        RuntimeError: If either value is missing or empty.
    """
    # NOTE(review): assumes load_credentials() returns a mapping of string
    # values -- confirm against .config.
    creds = load_credentials()
    client_id = creds.get("OAUTH_CLIENT_ID", "")
    client_secret = creds.get("OAUTH_CLIENT_SECRET", "")
    if not client_id or not client_secret:
        raise RuntimeError("OAUTH_CLIENT_ID and OAUTH_CLIENT_SECRET must be set in credentials")
    return client_id, client_secret


def _hash_token(token: str) -> str:
    """Return the hex SHA-256 digest of *token*.

    Only this digest is used as a key in ``_active_tokens``; the raw token is
    never stored.
    """
    return hashlib.sha256(token.encode()).hexdigest()


def _cleanup_expired() -> None:
    """Remove expired tokens."""
    now = time.time()
    # Collect first: deleting while iterating the dict would raise.
    expired = [h for h, exp in _active_tokens.items() if exp < now]
    for h in expired:
        del _active_tokens[h]


def validate_bearer_token(token: str) -> bool:
    """Check if a bearer token is valid and not expired.

    Args:
        token: The raw bearer token presented by the caller.

    Returns:
        True if the token's hash is in the store with an expiry in the future.
    """
    _cleanup_expired()
    expiry = _active_tokens.get(_hash_token(token))
    return expiry is not None and expiry > time.time()


def _constant_time_equals(provided: object, expected: str) -> bool:
    """Compare a request-supplied value with *expected* in constant time.

    ``secrets.compare_digest`` raises ``TypeError`` for ``str`` arguments that
    contain non-ASCII characters and for non-string arguments, so the values
    are compared as UTF-8 bytes and anything that is not a ``str`` (a JSON
    number or null, an uploaded file part, ...) is rejected outright.
    """
    if not isinstance(provided, str):
        return False
    return secrets.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))


def _client_host(request: Request) -> str:
    """Return the peer host for logging, or ``"unknown"`` if unavailable."""
    client = request.client
    return client.host if client is not None else "unknown"


def _oauth_error(error: str, description: str, status_code: int) -> JSONResponse:
    """Build a JSON error response with ``error`` / ``error_description``."""
    return JSONResponse(
        {"error": error, "error_description": description},
        status_code=status_code,
    )


async def token_endpoint(request: Request) -> JSONResponse:
    """OAuth 2.0 token endpoint (client_credentials grant).

    POST /token
    Content-Type: application/x-www-form-urlencoded

    grant_type=client_credentials&client_id=...&client_secret=...

    A JSON object body with the same keys is also accepted.

    Returns:
        200 with ``access_token``, ``token_type`` and ``expires_in`` on
        success; 400 for an unparseable body or unsupported grant type; 401
        for wrong client credentials; 500 if server credentials are not
        configured.
    """
    try:
        # Accept both form-encoded and JSON
        content_type = request.headers.get("content-type", "")
        if "application/json" in content_type:
            data = await request.json()
        else:
            data = dict(await request.form())
    except Exception:
        # Boundary handler: any failure while parsing the body is reported
        # to the client as a malformed request.
        return _oauth_error("invalid_request", "Could not parse request body", 400)

    # A syntactically valid JSON body may still be a list/string/number.
    if not isinstance(data, dict):
        return _oauth_error("invalid_request", "Request body must be a JSON object", 400)

    if data.get("grant_type") != "client_credentials":
        return _oauth_error(
            "unsupported_grant_type", "Only client_credentials is supported", 400
        )

    client_id = data.get("client_id", "")
    client_secret = data.get("client_secret", "")

    try:
        expected_id, expected_secret = _get_oauth_credentials()
    except RuntimeError:
        logger.error("OAuth credentials not configured")
        return _oauth_error("server_error", "Auth not configured", 500)

    # Evaluate both comparisons unconditionally so a wrong client_id and a
    # wrong client_secret are not distinguishable by a skipped comparison.
    id_ok = _constant_time_equals(client_id, expected_id)
    secret_ok = _constant_time_equals(client_secret, expected_secret)
    if not (id_ok and secret_ok):
        logger.warning("OAuth auth failed from %s", _client_host(request))
        return _oauth_error("invalid_client", "Invalid client credentials", 401)

    # Issue token
    access_token = secrets.token_urlsafe(48)
    _active_tokens[_hash_token(access_token)] = time.time() + TOKEN_LIFETIME
    _cleanup_expired()

    logger.info("OAuth token issued to %s", _client_host(request))
    return JSONResponse(
        {
            "access_token": access_token,
            "token_type": "Bearer",
            "expires_in": TOKEN_LIFETIME,
        },
        # RFC 6749 section 5.1: responses carrying tokens must not be cached.
        headers={"Cache-Control": "no-store", "Pragma": "no-cache"},
    )


async def oauth_metadata(request: Request) -> JSONResponse:
    """OAuth 2.0 Authorization Server Metadata (RFC 8414).

    GET /.well-known/oauth-authorization-server
    """
    # Build base URL from request
    # NOTE(review): behind a reverse proxy this reflects whatever URL the
    # server sees -- confirm proxy headers are handled upstream.
    base = str(request.base_url).rstrip("/")
    return JSONResponse(
        {
            "issuer": base,
            "token_endpoint": f"{base}/token",
            "token_endpoint_auth_methods_supported": ["client_secret_post"],
            "grant_types_supported": ["client_credentials"],
            "response_types_supported": [],
            "scopes_supported": ["mcp"],
        }
    )


# Routes to add to the app
auth_routes = [
    Route("/.well-known/oauth-authorization-server", oauth_metadata, methods=["GET"]),
    Route("/token", token_endpoint, methods=["POST"]),
]