"""FastMCP server exposing bridge tools to claude.ai.""" import json import logging from datetime import datetime, timezone from fastmcp import FastMCP import contextlib import httpx from fastmcp.server.auth import TokenVerifier, AccessToken from fastmcp.server.auth.oauth_proxy import OAuthProxy from starlette.requests import Request from starlette.responses import JSONResponse from starlette.routing import Route from .db import Database from .config import get_group_chat_id, load_credentials logger = logging.getLogger(__name__) db: Database | None = None FORGEJO_URL = "https://git.georgsen.dk" class ForgejoTokenVerifier(TokenVerifier): """Verify OAuth tokens against Forgejo's API.""" def __init__(self, forgejo_url: str = FORGEJO_URL): super().__init__(required_scopes=None) self.forgejo_url = forgejo_url async def verify_token(self, token: str) -> AccessToken | None: try: async with httpx.AsyncClient(timeout=10) as client: resp = await client.get( f"{self.forgejo_url}/api/v1/user", headers={"Authorization": f"Bearer {token}"}, ) if resp.status_code != 200: return None user = resp.json() return AccessToken( token=token, client_id=str(user["id"]), scopes=[], expires_at=None, claims={"sub": str(user["id"]), "login": user.get("login")}, ) except Exception as e: logger.debug(f"Forgejo token verification failed: {e}") return None creds = load_credentials() FORGEJO_INTERNAL = "http://10.5.0.14:3000" auth = OAuthProxy( upstream_authorization_endpoint=f"{FORGEJO_URL}/login/oauth/authorize", upstream_token_endpoint=f"{FORGEJO_INTERNAL}/login/oauth/access_token", upstream_client_id=creds["FORGEJO_OAUTH_CLIENT_ID"], upstream_client_secret=creds["FORGEJO_OAUTH_CLIENT_SECRET"], token_verifier=ForgejoTokenVerifier(forgejo_url=FORGEJO_INTERNAL), base_url="https://mcp.georgsen.dk", ) mcp = FastMCP( name="homelab-bridge", auth=auth, instructions=( "This MCP server bridges claude.ai to a homelab Telegram group chat. " "Use pull_updates to read conversation history (supports cursor-based pagination). " "Use send_message to post messages to the group (attributed as [claude.ai]). " "Use queue_status for a quick summary." ), ) def init(database: Database): global db db = database @mcp.tool() def send_message(message: str) -> str: """Send a message to the homelab Telegram group chat. The message will be posted with [claude.ai] attribution so participants know the message came from claude.ai. Args: message: The text to send to the group chat. """ chat_id = get_group_chat_id() outbound_id = db.queue_outbound(chat_id, message) return json.dumps({"sent": True, "id": outbound_id}) @mcp.tool() def pull_updates(since_id: int = 0, since: str | None = None, limit: int = 50) -> str: """Pull conversation messages from the Telegram group. Returns messages from all participants (Mikkel, homelab bot, MCP bot). Supports cursor-based pagination: use the returned 'cursor' value as 'since_id' in the next call to get only new messages. Args: since_id: Return messages with id > this value. Use cursor from previous response. since: ISO 8601 timestamp. Alternative to since_id — returns messages after this time. limit: Maximum number of messages to return (default 50, max 200). """ limit = min(limit, 200) if since: messages = db.get_messages_since_timestamp(since, limit) else: messages = db.get_messages_since_id(since_id, limit) for msg in messages: if msg["has_attachment"]: msg["attachments"] = db.get_attachments_for_message(msg["id"]) else: msg["attachments"] = [] del msg["has_attachment"] cursor = messages[-1]["id"] if messages else since_id return json.dumps({ "messages": messages, "cursor": cursor, "count": len(messages), }) @mcp.tool() def queue_status() -> str: """Get current status of the bridge. Returns message counts, last activity, and pending outbound messages. """ status = db.get_status() return json.dumps(status) # Custom non-MCP routes (no auth required - local access only) INTERNAL_PREFIXES = ("127.", "10.5.0.", "::1", "100.79.") # localhost, LAN, NetBird async def ingest_message(request: Request) -> JSONResponse: """HTTP endpoint for local services to log messages into the bridge.""" # Check real client IP (X-Forwarded-For from NPM, or direct connection) forwarded = request.headers.get("x-forwarded-for", "") real_ip = request.headers.get("x-real-ip", "") client_ip = forwarded.split(",")[0].strip() or real_ip or (request.client.host if request.client else "") if not any(client_ip.startswith(p) for p in INTERNAL_PREFIXES): return JSONResponse({"error": "forbidden"}, status_code=403) try: data = await request.json() except Exception: return JSONResponse({"error": "invalid JSON"}, status_code=400) telegram_message_id = data.get("telegram_message_id") chat_id = data.get("chat_id") if not telegram_message_id or not chat_id: return JSONResponse( {"error": "telegram_message_id and chat_id are required"}, status_code=400, ) created_at = data.get("created_at", datetime.now(timezone.utc).isoformat()) msg_id = db.insert_message( telegram_message_id=telegram_message_id, chat_id=chat_id, sender_type=data.get("sender_type", "unknown"), sender_id=data.get("sender_id"), sender_name=data.get("sender_name"), content=data.get("content"), reply_to_message_id=data.get("reply_to_message_id"), has_attachment=data.get("has_attachment", False), created_at=created_at, ) if msg_id is None: return JSONResponse({"ok": True, "duplicate": True}) logger.info(f"Ingested message {telegram_message_id} from {data.get('sender_name', 'unknown')}") return JSONResponse({"ok": True, "id": msg_id}) async def health(request: Request) -> JSONResponse: """Health check endpoint.""" status = db.get_status() return JSONResponse({"status": "ok", **status}) custom_routes = [ Route("/api/ingest", ingest_message, methods=["POST"]), Route("/api/health", health, methods=["GET"]), ]