"""FastMCP server exposing bridge tools to claude.ai.""" import json import logging from datetime import datetime, timezone from fastmcp import FastMCP import contextlib import httpx from fastmcp.server.auth import TokenVerifier, AccessToken from fastmcp.server.auth.oauth_proxy import OAuthProxy from starlette.requests import Request from starlette.responses import JSONResponse from starlette.routing import Route from .db import Database from .config import get_group_chat_id, load_credentials logger = logging.getLogger(__name__) db: Database | None = None FORGEJO_URL = "https://git.georgsen.dk" class ForgejoTokenVerifier(TokenVerifier): """Verify OAuth tokens against Forgejo's API.""" def __init__(self, forgejo_url: str = FORGEJO_URL): super().__init__(required_scopes=None) self.forgejo_url = forgejo_url async def verify_token(self, token: str) -> AccessToken | None: try: async with httpx.AsyncClient(timeout=10) as client: resp = await client.get( f"{self.forgejo_url}/api/v1/user", headers={"Authorization": f"Bearer {token}"}, ) if resp.status_code != 200: return None user = resp.json() return AccessToken( token=token, client_id=str(user["id"]), scopes=[], expires_at=None, claims={"sub": str(user["id"]), "login": user.get("login")}, ) except Exception as e: logger.debug(f"Forgejo token verification failed: {e}") return None creds = load_credentials() FORGEJO_INTERNAL = "http://10.5.0.14:3000" auth = OAuthProxy( upstream_authorization_endpoint=f"{FORGEJO_URL}/login/oauth/authorize", upstream_token_endpoint=f"{FORGEJO_INTERNAL}/login/oauth/access_token", upstream_client_id=creds["FORGEJO_OAUTH_CLIENT_ID"], upstream_client_secret=creds["FORGEJO_OAUTH_CLIENT_SECRET"], token_verifier=ForgejoTokenVerifier(forgejo_url=FORGEJO_INTERNAL), base_url="https://mcp.georgsen.dk", ) mcp = FastMCP( name="homelab-bridge", auth=auth, instructions=( "This MCP server bridges claude.ai to a homelab Telegram group chat. " "Use pull_updates to read conversation history (supports cursor-based pagination). " "Use send_message to post messages to the group (attributed as [claude.ai]). " "Use queue_status for a quick summary." ), ) def init(database: Database): global db db = database @mcp.tool() def send_message(message: str) -> str: """Send a message to the homelab Telegram group chat. The message will be posted with [claude.ai] attribution so participants know the message came from claude.ai. Args: message: The text to send to the group chat. """ chat_id = get_group_chat_id() outbound_id = db.queue_outbound(chat_id, message) # Write to homelab bot inbox so it processes the task from pathlib import Path inbox_path = Path.home() / "homelab" / "telegram" / "inbox" try: with open(inbox_path, "a") as f: f.write(f"[MCP Bridge Task from claude.ai] {message}\nAcknowledge this task and begin working on it. Respond in the group chat.\n") except Exception: pass return json.dumps({"sent": True, "id": outbound_id}) @mcp.tool() def pull_updates(since_id: int = 0, since: str | None = None, limit: int = 50) -> str: """Pull conversation messages from the Telegram group. Returns messages from all participants (Mikkel, homelab bot, MCP bot). Supports cursor-based pagination: use the returned 'cursor' value as 'since_id' in the next call to get only new messages. Args: since_id: Return messages with id > this value. Use cursor from previous response. since: ISO 8601 timestamp. Alternative to since_id — returns messages after this time. limit: Maximum number of messages to return (default 50, max 200). """ limit = min(limit, 200) if since: messages = db.get_messages_since_timestamp(since, limit) else: messages = db.get_messages_since_id(since_id, limit) for msg in messages: if msg["has_attachment"]: msg["attachments"] = db.get_attachments_for_message(msg["id"]) else: msg["attachments"] = [] del msg["has_attachment"] cursor = messages[-1]["id"] if messages else since_id return json.dumps({ "messages": messages, "cursor": cursor, "count": len(messages), }) @mcp.tool() def queue_status() -> str: """Get current status of the bridge. Returns message counts, last activity, and pending outbound messages. """ status = db.get_status() return json.dumps(status) # Custom non-MCP routes (no auth required - local access only) INTERNAL_PREFIXES = ("127.", "10.5.0.", "::1", "100.79.") # localhost, LAN, NetBird async def ingest_message(request: Request) -> JSONResponse: """HTTP endpoint for local services to log messages into the bridge.""" # Require shared secret for ingest (only the homelab bot knows this) ingest_key = creds.get("INGEST_SECRET", "") provided_key = request.headers.get("x-ingest-key", "") if not ingest_key or not provided_key or ingest_key != provided_key: return JSONResponse({"error": "forbidden"}, status_code=403) try: data = await request.json() except Exception: return JSONResponse({"error": "invalid JSON"}, status_code=400) telegram_message_id = data.get("telegram_message_id") chat_id = data.get("chat_id") if not telegram_message_id or not chat_id: return JSONResponse( {"error": "telegram_message_id and chat_id are required"}, status_code=400, ) created_at = data.get("created_at", datetime.now(timezone.utc).isoformat()) msg_id = db.insert_message( telegram_message_id=telegram_message_id, chat_id=chat_id, sender_type=data.get("sender_type", "unknown"), sender_id=data.get("sender_id"), sender_name=data.get("sender_name"), content=data.get("content"), reply_to_message_id=data.get("reply_to_message_id"), has_attachment=data.get("has_attachment", False), created_at=created_at, ) if msg_id is None: return JSONResponse({"ok": True, "duplicate": True}) logger.info(f"Ingested message {telegram_message_id} from {data.get('sender_name', 'unknown')}") return JSONResponse({"ok": True, "id": msg_id}) async def health(request: Request) -> JSONResponse: """Health check endpoint with both bot statuses.""" from pathlib import Path import subprocess as sp status = db.get_status() # Check homelab bot inbox inbox_path = Path.home() / "homelab" / "telegram" / "inbox" inbox_size = inbox_path.stat().st_size if inbox_path.exists() else 0 inbox_lines = len(inbox_path.read_text().splitlines()) if inbox_size > 0 else 0 # Check both services mcp_active = True # We're running if this responds try: result = sp.run(["systemctl", "--user", "is-active", "telegram-bot"], capture_output=True, text=True, timeout=2) homelab_bot_active = result.stdout.strip() == "active" except Exception: homelab_bot_active = False return JSONResponse({ "status": "ok", "mcp_bridge": {"active": mcp_active, "telegram_polling": True}, "homelab_bot": {"active": homelab_bot_active}, "inbox": {"pending_lines": inbox_lines, "bytes": inbox_size}, **status, }) custom_routes = [ Route("/api/ingest", ingest_message, methods=["POST"]), Route("/api/health", health, methods=["GET"]), ]