telegram-bot-mcp/mcp_bridge/mcp_server.py
Mikkel Georgsen 1dff4630fe fix: use FastMCP's InMemoryOAuthProvider instead of custom implementation
Replaced hand-rolled OAuth with FastMCP's battle-tested
InMemoryOAuthProvider. Handles DCR, PKCE, token exchange,
refresh tokens, and revocation out of the box.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 11:32:01 +00:00

150 lines
4.6 KiB
Python

"""FastMCP server exposing bridge tools to claude.ai."""
import json
import logging
from datetime import datetime, timezone
from fastmcp import FastMCP
from fastmcp.server.auth.providers.in_memory import InMemoryOAuthProvider
from fastmcp.server.auth.auth import ClientRegistrationOptions
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from .db import Database
from .config import get_group_chat_id
logger = logging.getLogger(__name__)

# Module-wide database handle; remains None until init() assigns it.
db: Database | None = None

# OAuth provider passed to FastMCP below, with client registration enabled.
oauth = InMemoryOAuthProvider(
    base_url="https://mcp.georgsen.dk",
    client_registration_options=ClientRegistrationOptions(enabled=True),
)

# The instruction sentences are joined with single spaces into one string.
mcp = FastMCP(
    name="homelab-bridge",
    auth=oauth,
    instructions=" ".join(
        (
            "This MCP server bridges claude.ai to a homelab Telegram group chat.",
            "Use pull_updates to read conversation history (supports cursor-based pagination).",
            "Use send_message to post messages to the group (attributed as [claude.ai]).",
            "Use queue_status for a quick summary.",
        )
    ),
)
def init(database: Database) -> None:
    """Store the database handle in the module-level ``db`` variable.

    Args:
        database: Object to expose as ``db`` to the rest of this module.
    """
    global db
    db = database
@mcp.tool()
def send_message(message: str) -> str:
    """Send a message to the homelab Telegram group chat.

    The message will be posted with [claude.ai] attribution so participants
    know the message came from claude.ai.

    Args:
        message: The text to send to the group chat.
    """
    # Resolve the destination first, then hand the text to the outbound queue.
    target_chat = get_group_chat_id()
    queued_id = db.queue_outbound(target_chat, message)

    # The reply is a JSON string carrying the id returned by queue_outbound.
    receipt = {"sent": True, "id": queued_id}
    return json.dumps(receipt)
@mcp.tool()
def pull_updates(since_id: int = 0, since: str | None = None, limit: int = 50) -> str:
    """Pull conversation messages from the Telegram group.

    Returns messages from all participants (Mikkel, homelab bot, MCP bot).
    Supports cursor-based pagination: use the returned 'cursor' value as
    'since_id' in the next call to get only new messages.

    Args:
        since_id: Return messages with id > this value. Use cursor from previous response.
        since: ISO 8601 timestamp. Alternative to since_id — returns messages after this time.
        limit: Maximum number of messages to return (default 50, max 200).
            Negative values are treated as 0.
    """
    # Clamp on both sides. The upper bound enforces the documented maximum;
    # the lower bound keeps a negative value from reaching the query layer.
    # NOTE(review): if the backing store is SQLite, a negative LIMIT means
    # "no limit", which would bypass the 200 cap — confirm against db.py.
    limit = max(0, min(limit, 200))

    # A non-empty `since` timestamp takes precedence over `since_id`.
    if since:
        messages = db.get_messages_since_timestamp(since, limit)
    else:
        messages = db.get_messages_since_id(since_id, limit)

    # Swap the internal has_attachment flag for an explicit attachments list.
    for msg in messages:
        has_attachment = msg.pop("has_attachment")
        msg["attachments"] = (
            db.get_attachments_for_message(msg["id"]) if has_attachment else []
        )

    # With no results the caller's since_id is echoed back as the cursor.
    cursor = messages[-1]["id"] if messages else since_id
    return json.dumps({
        "messages": messages,
        "cursor": cursor,
        "count": len(messages),
    })
@mcp.tool()
def queue_status() -> str:
    """Get current status of the bridge.

    Returns message counts, last activity, and pending outbound messages.
    """
    # Serialise whatever the database layer reports, without reshaping it.
    return json.dumps(db.get_status())
# Custom non-MCP routes (no auth required - local access only)
async def ingest_message(request: Request) -> JSONResponse:
    """HTTP endpoint for local services to log messages into the bridge.

    The request body must be a JSON object. ``telegram_message_id`` and
    ``chat_id`` are required; ``sender_type``, ``sender_id``, ``sender_name``,
    ``content``, ``reply_to_message_id``, ``has_attachment`` and
    ``created_at`` are optional and forwarded to ``db.insert_message``.

    Returns:
        A 400 response with an ``error`` field when the body cannot be parsed,
        is not a JSON object, or lacks a required id. Otherwise
        ``{"ok": True, "duplicate": True}`` when ``db.insert_message`` returns
        ``None``, or ``{"ok": True, "id": <msg_id>}`` on a fresh insert.
    """
    try:
        data = await request.json()
    except Exception:
        # Any failure while reading or decoding the body is reported as a 400.
        return JSONResponse({"error": "invalid JSON"}, status_code=400)

    # Valid JSON can still be a list, string or number; without this guard the
    # .get() calls below would raise AttributeError and surface as a 500.
    if not isinstance(data, dict):
        return JSONResponse(
            {"error": "JSON body must be an object"},
            status_code=400,
        )

    telegram_message_id = data.get("telegram_message_id")
    chat_id = data.get("chat_id")
    if not telegram_message_id or not chat_id:
        return JSONResponse(
            {"error": "telegram_message_id and chat_id are required"},
            status_code=400,
        )

    # Fall back to the current UTC time only when the key is absent.
    created_at = data.get("created_at", datetime.now(timezone.utc).isoformat())
    msg_id = db.insert_message(
        telegram_message_id=telegram_message_id,
        chat_id=chat_id,
        sender_type=data.get("sender_type", "unknown"),
        sender_id=data.get("sender_id"),
        sender_name=data.get("sender_name"),
        content=data.get("content"),
        reply_to_message_id=data.get("reply_to_message_id"),
        has_attachment=data.get("has_attachment", False),
        created_at=created_at,
    )

    # insert_message returning None is reported to the caller as a duplicate.
    if msg_id is None:
        return JSONResponse({"ok": True, "duplicate": True})

    logger.info(
        "Ingested message %s from %s",
        telegram_message_id,
        data.get("sender_name", "unknown"),
    )
    return JSONResponse({"ok": True, "id": msg_id})
async def health(request: Request) -> JSONResponse:
    """Health check endpoint.

    Responds with ``"status": "ok"`` merged with the mapping returned by
    ``db.get_status()``; keys from that mapping win on collision.
    """
    payload = {"status": "ok"}
    payload.update(db.get_status())
    return JSONResponse(payload)
# Starlette route table for the two plain HTTP handlers in this module.
custom_routes = [
    Route(path, endpoint, methods=[verb])
    for path, endpoint, verb in (
        ("/api/ingest", ingest_message, "POST"),
        ("/api/health", health, "GET"),
    )
]