telegram-bot-mcp/mcp_bridge/mcp_server.py
Mikkel Georgsen 2a88b528d4 security: protect ingest endpoint with shared secret
Ingest API now requires X-Ingest-Key header matching INGEST_SECRET
from credentials. IP-based check was insufficient since NPM proxies
all external traffic from the same internal IP.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 11:47:22 +00:00

200 lines
6.5 KiB
Python

"""FastMCP server exposing bridge tools to claude.ai."""
import contextlib
import hmac
import json
import logging
from datetime import datetime, timezone

import httpx
from fastmcp import FastMCP
from fastmcp.server.auth import TokenVerifier, AccessToken
from fastmcp.server.auth.oauth_proxy import OAuthProxy
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

from .db import Database
from .config import get_group_chat_id, load_credentials
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Shared Database handle used by the tools and HTTP handlers below; it stays
# None until init() assigns a real instance.
db: Database | None = None
# Base URL of the Forgejo instance: used for the OAuth authorize endpoint and
# as the default for ForgejoTokenVerifier.
FORGEJO_URL = "https://git.georgsen.dk"
class ForgejoTokenVerifier(TokenVerifier):
    """Token verifier that asks Forgejo which user a bearer token belongs to.

    A token is accepted when ``GET {forgejo_url}/api/v1/user`` answers with
    HTTP 200 for it; the returned user record supplies the identity claims.
    """

    def __init__(self, forgejo_url: str = FORGEJO_URL):
        """Store the Forgejo base URL to query.

        Args:
            forgejo_url: Base URL of the Forgejo instance to verify against.
        """
        super().__init__(required_scopes=None)
        self.forgejo_url = forgejo_url

    async def verify_token(self, token: str) -> AccessToken | None:
        """Check *token* against Forgejo's current-user endpoint.

        Args:
            token: Bearer token presented by the client.

        Returns:
            An ``AccessToken`` whose ``client_id`` and ``sub`` claim are the
            Forgejo user id, or ``None`` when Forgejo does not answer 200 or
            any error occurs along the way (logged at debug level).
        """
        try:
            async with httpx.AsyncClient(timeout=10) as http:
                response = await http.get(
                    f"{self.forgejo_url}/api/v1/user",
                    headers={"Authorization": f"Bearer {token}"},
                )
                if response.status_code == 200:
                    profile = response.json()
                    user_id = str(profile["id"])
                    identity = {"sub": user_id, "login": profile.get("login")}
                    return AccessToken(
                        token=token,
                        client_id=user_id,
                        scopes=[],
                        expires_at=None,
                        claims=identity,
                    )
                return None
        except Exception as e:
            # Any failure (network, bad JSON, missing "id") means "not verified".
            logger.debug(f"Forgejo token verification failed: {e}")
            return None
# Credentials are loaded once at import time; the OAuth client id/secret below
# and the ingest shared secret (see ingest_message) are read from this mapping.
creds = load_credentials()
# Forgejo address used for the server-to-server calls (token exchange and
# token verification). NOTE(review): presumably the LAN address of the same
# Forgejo instance that FORGEJO_URL points at — confirm.
FORGEJO_INTERNAL = "http://10.5.0.14:3000"
# OAuth proxy in front of Forgejo. The authorize endpoint uses FORGEJO_URL,
# while the token endpoint and the token verifier use FORGEJO_INTERNAL.
# NOTE(review): base_url is presumably the public URL under which this MCP
# server is reachable — confirm against the reverse-proxy configuration.
auth = OAuthProxy(
    upstream_authorization_endpoint=f"{FORGEJO_URL}/login/oauth/authorize",
    upstream_token_endpoint=f"{FORGEJO_INTERNAL}/login/oauth/access_token",
    upstream_client_id=creds["FORGEJO_OAUTH_CLIENT_ID"],
    upstream_client_secret=creds["FORGEJO_OAUTH_CLIENT_SECRET"],
    token_verifier=ForgejoTokenVerifier(forgejo_url=FORGEJO_INTERNAL),
    base_url="https://mcp.georgsen.dk",
)
# The FastMCP application object; the @mcp.tool() functions below register on
# it, and `auth` guards access to it.
mcp = FastMCP(
    name="homelab-bridge",
    auth=auth,
    instructions=(
        "This MCP server bridges claude.ai to a homelab Telegram group chat. "
        "Use pull_updates to read conversation history (supports cursor-based pagination). "
        "Use send_message to post messages to the group (attributed as [claude.ai]). "
        "Use queue_status for a quick summary."
    ),
)
def init(database: Database):
    """Install *database* as the module-wide ``db`` handle.

    The tools and HTTP handlers in this module all go through the global
    ``db``, which is ``None`` until this function has been called.

    Args:
        database: The Database instance the bridge should use.
    """
    global db
    db = database
@mcp.tool()
def send_message(message: str) -> str:
    """Send a message to the homelab Telegram group chat.

    The message will be posted with [claude.ai] attribution so participants
    know the message came from claude.ai.

    Args:
        message: The text to send to the group chat.
    """
    # NOTE: FastMCP publishes the docstring above as the tool description, so
    # its wording is effectively runtime text and is kept as-is.
    # The message is only queued here; the returned id is the outbound queue id.
    target_chat = get_group_chat_id()
    receipt = {"sent": True, "id": db.queue_outbound(target_chat, message)}
    return json.dumps(receipt)
@mcp.tool()
def pull_updates(since_id: int = 0, since: str | None = None, limit: int = 50) -> str:
    """Pull conversation messages from the Telegram group.

    Returns messages from all participants (Mikkel, homelab bot, MCP bot).
    Supports cursor-based pagination: use the returned 'cursor' value as
    'since_id' in the next call to get only new messages.

    Args:
        since_id: Return messages with id > this value. Use cursor from previous response.
        since: ISO 8601 timestamp. Alternative to since_id — returns messages after this time.
        limit: Maximum number of messages to return (default 50, max 200).
    """
    # NOTE: FastMCP publishes the docstring above as the tool description, so
    # its wording is effectively runtime text and is kept as-is.

    # Clamp to 0..200. The upper bound is the documented maximum. The lower
    # bound keeps a negative value away from the storage layer, where it could
    # be read as "no limit" (SQLite treats a negative LIMIT that way; the
    # Database backend is not visible from here, so this is defensive).
    limit = max(0, min(limit, 200))

    # A non-empty `since` timestamp takes precedence over `since_id`.
    if since:
        messages = db.get_messages_since_timestamp(since, limit)
    else:
        messages = db.get_messages_since_id(since_id, limit)

    # Replace the boolean `has_attachment` flag with the attachment list
    # itself, so the flag does not appear in the output.
    for msg in messages:
        has_attachment = msg.pop("has_attachment")
        msg["attachments"] = (
            db.get_attachments_for_message(msg["id"]) if has_attachment else []
        )

    # The cursor is the id of the last returned message. With an empty result
    # it falls back to the caller's since_id (0 unless the caller supplied one).
    cursor = messages[-1]["id"] if messages else since_id
    return json.dumps({
        "messages": messages,
        "cursor": cursor,
        "count": len(messages),
    })
@mcp.tool()
def queue_status() -> str:
    """Get current status of the bridge.

    Returns message counts, last activity, and pending outbound messages.
    """
    # NOTE: FastMCP publishes the docstring above as the tool description, so
    # its wording is effectively runtime text and is kept as-is.
    # The status object from the database is serialised unchanged.
    return json.dumps(db.get_status())
# Custom non-MCP HTTP routes. /api/ingest is gated by the X-Ingest-Key shared
# secret (see ingest_message); /api/health performs no auth check of its own.
# NOTE(review): INTERNAL_PREFIXES is not referenced by any handler visible in
# this file — presumably a leftover from the earlier IP-based check; confirm
# nothing else imports it before removing.
INTERNAL_PREFIXES = ("127.", "10.5.0.", "::1", "100.79.")  # localhost, LAN, NetBird
async def ingest_message(request: Request) -> JSONResponse:
    """HTTP endpoint for local services to log messages into the bridge.

    Authentication: the request must carry an ``X-Ingest-Key`` header equal to
    the ``INGEST_SECRET`` credential. When no secret is configured, every
    request is rejected.

    Request body: a JSON object. ``telegram_message_id`` and ``chat_id`` are
    required. Optional keys are ``sender_type`` (default ``"unknown"``),
    ``sender_id``, ``sender_name``, ``content``, ``reply_to_message_id``,
    ``has_attachment`` (default ``False``) and ``created_at`` (default: the
    current UTC time in ISO 8601 form).

    Args:
        request: Incoming Starlette request.

    Returns:
        * 403 ``{"error": "forbidden"}`` when the key is missing or wrong.
        * 400 with an ``error`` message when the body is not valid JSON, is
          not a JSON object, or lacks a required field.
        * 200 ``{"ok": True, "duplicate": True}`` when ``db.insert_message``
          returns ``None``.
        * 200 ``{"ok": True, "id": <id>}`` otherwise.
    """
    # Require shared secret for ingest (only the homelab bot knows this).
    expected_key = creds.get("INGEST_SECRET", "")
    provided_key = request.headers.get("x-ingest-key", "")
    # Compare in constant time so response timing does not reveal how much of
    # a guessed key matched. Both sides are encoded to bytes because
    # compare_digest() rejects str arguments that contain non-ASCII characters.
    if (
        not expected_key
        or not provided_key
        or not hmac.compare_digest(
            expected_key.encode("utf-8"), provided_key.encode("utf-8")
        )
    ):
        return JSONResponse({"error": "forbidden"}, status_code=403)

    try:
        data = await request.json()
    except Exception:
        # Any failure to read or parse the body is reported as a client error.
        return JSONResponse({"error": "invalid JSON"}, status_code=400)
    # Valid JSON need not be an object; a list, string or number has no .get()
    # and would otherwise surface as a 500.
    if not isinstance(data, dict):
        return JSONResponse({"error": "JSON body must be an object"}, status_code=400)

    telegram_message_id = data.get("telegram_message_id")
    chat_id = data.get("chat_id")
    if not telegram_message_id or not chat_id:
        return JSONResponse(
            {"error": "telegram_message_id and chat_id are required"},
            status_code=400,
        )

    # An explicit null or empty created_at is treated like an absent one.
    created_at = data.get("created_at") or datetime.now(timezone.utc).isoformat()
    msg_id = db.insert_message(
        telegram_message_id=telegram_message_id,
        chat_id=chat_id,
        sender_type=data.get("sender_type", "unknown"),
        sender_id=data.get("sender_id"),
        sender_name=data.get("sender_name"),
        content=data.get("content"),
        reply_to_message_id=data.get("reply_to_message_id"),
        has_attachment=data.get("has_attachment", False),
        created_at=created_at,
    )
    # insert_message() returning None is reported to the caller as a duplicate.
    if msg_id is None:
        return JSONResponse({"ok": True, "duplicate": True})

    logger.info(
        "Ingested message %s from %s",
        telegram_message_id,
        data.get("sender_name", "unknown"),
    )
    return JSONResponse({"ok": True, "id": msg_id})
async def health(request: Request) -> JSONResponse:
    """Health check endpoint.

    Responds with ``{"status": "ok"}`` merged with whatever
    ``db.get_status()`` returns; on a key clash the database value wins.

    Args:
        request: Incoming Starlette request (not inspected).
    """
    payload = {"status": "ok"}
    payload.update(db.get_status())
    return JSONResponse(payload)
# Starlette routes for the non-MCP HTTP endpoints defined above.
# NOTE(review): nothing in this module mounts them — presumably the
# application entry point imports `custom_routes`; confirm.
custom_routes = [
    Route("/api/ingest", ingest_message, methods=["POST"]),
    Route("/api/health", health, methods=["GET"]),
]