telegram-bot-mcp/mcp_bridge/mcp_server.py
Mikkel Georgsen a7344fa332 fix: break feedback loop - only write to inbox from send_message tool
Previously the outbound loop wrote every message to inbox, causing
the homelab bot to process its own responses as new tasks. Now only
explicit claude.ai send_message tool calls write to inbox.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 12:53:04 +00:00

210 lines
6.8 KiB
Python

"""FastMCP server exposing bridge tools to claude.ai."""
import contextlib
import hmac
import json
import logging
from datetime import datetime, timezone

import httpx
from fastmcp import FastMCP
from fastmcp.server.auth import TokenVerifier, AccessToken
from fastmcp.server.auth.oauth_proxy import OAuthProxy
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

from .db import Database
from .config import get_group_chat_id, load_credentials
# Public Forgejo base URL: default target of ForgejoTokenVerifier and the
# host of the browser-facing OAuth authorize endpoint.
FORGEJO_URL = "https://git.georgsen.dk"

logger = logging.getLogger(__name__)

# Module-wide database handle; stays None until init() installs one.
db: Database | None = None
class ForgejoTokenVerifier(TokenVerifier):
    """Token verifier that asks a Forgejo instance who owns a bearer token."""

    def __init__(self, forgejo_url: str = FORGEJO_URL):
        # No scopes are required; a token only has to resolve to a user.
        super().__init__(required_scopes=None)
        self.forgejo_url = forgejo_url

    async def verify_token(self, token: str) -> AccessToken | None:
        """Resolve *token* to an AccessToken, or None when it is not accepted.

        The token is sent as a Bearer credential to ``/api/v1/user`` on the
        configured Forgejo URL. A non-200 answer, or any exception raised
        while fetching or decoding the response, yields None.
        """
        try:
            async with httpx.AsyncClient(timeout=10) as http:
                response = await http.get(
                    f"{self.forgejo_url}/api/v1/user",
                    headers={"Authorization": f"Bearer {token}"},
                )
            if response.status_code != 200:
                return None

            profile = response.json()
            user_id = str(profile["id"])
            return AccessToken(
                token=token,
                client_id=user_id,
                scopes=[],
                expires_at=None,
                claims={"sub": user_id, "login": profile.get("login")},
            )
        except Exception as e:
            # Every failure mode is treated as "token not valid".
            logger.debug(f"Forgejo token verification failed: {e}")
            return None
# Forgejo address used for server-to-server calls (token exchange and token
# verification); the browser-facing authorize step uses FORGEJO_URL instead.
# NOTE(review): presumably the LAN address of the same Forgejo instance — confirm.
FORGEJO_INTERNAL = "http://10.5.0.14:3000"

# Loaded once at import time; also read later by the ingest endpoint.
creds = load_credentials()

# OAuth proxy in front of Forgejo. A missing client id/secret raises KeyError
# here, so the module fails at import rather than at first request.
auth = OAuthProxy(
    upstream_authorization_endpoint=f"{FORGEJO_URL}/login/oauth/authorize",
    upstream_token_endpoint=f"{FORGEJO_INTERNAL}/login/oauth/access_token",
    upstream_client_id=creds["FORGEJO_OAUTH_CLIENT_ID"],
    upstream_client_secret=creds["FORGEJO_OAUTH_CLIENT_SECRET"],
    token_verifier=ForgejoTokenVerifier(forgejo_url=FORGEJO_INTERNAL),
    base_url="https://mcp.georgsen.dk",
)

# The MCP server instance that the @mcp.tool() functions below register on.
mcp = FastMCP(
    name="homelab-bridge",
    auth=auth,
    instructions=(
        "This MCP server bridges claude.ai to a homelab Telegram group chat. "
        "Use pull_updates to read conversation history (supports cursor-based pagination). "
        "Use send_message to post messages to the group (attributed as [claude.ai]). "
        "Use queue_status for a quick summary."
    ),
)
def init(database: Database):
    """Install the Database instance used by the tools and HTTP routes of this module."""
    global db
    db = database
@mcp.tool()
def send_message(message: str) -> str:
    """Send a message to the homelab Telegram group chat.

    The message will be posted with [claude.ai] attribution so participants
    know the message came from claude.ai.

    Args:
        message: The text to send to the group chat.
    """
    chat_id = get_group_chat_id()
    outbound_id = db.queue_outbound(chat_id, message)

    # Write to homelab bot inbox so it processes the task. This is the only
    # place that appends to the inbox file.
    from pathlib import Path

    inbox_path = Path.home() / "homelab" / "telegram" / "inbox"
    try:
        # Explicit UTF-8 so non-ASCII text (emoji, accented characters) does
        # not depend on the process locale.
        with open(inbox_path, "a", encoding="utf-8") as f:
            f.write(
                f"[MCP Bridge Task from claude.ai] {message}\n"
                "Acknowledge this task and begin working on it. "
                "Respond in the group chat.\n"
            )
    except (OSError, UnicodeError) as exc:
        # Best effort: the message is already queued for Telegram, so the tool
        # call still succeeds, but the failure is logged instead of being
        # swallowed silently.
        logger.warning("Could not write task to homelab inbox %s: %s", inbox_path, exc)

    return json.dumps({"sent": True, "id": outbound_id})
@mcp.tool()
def pull_updates(since_id: int = 0, since: str | None = None, limit: int = 50) -> str:
    """Pull conversation messages from the Telegram group.

    Returns messages from all participants (Mikkel, homelab bot, MCP bot).
    Supports cursor-based pagination: use the returned 'cursor' value as
    'since_id' in the next call to get only new messages.

    Args:
        since_id: Return messages with id > this value. Use cursor from previous response.
        since: ISO 8601 timestamp. Alternative to since_id — returns messages after this time.
        limit: Maximum number of messages to return (default 50, max 200).
    """
    # Clamp to [0, 200]. Without the lower bound a negative value would reach
    # the database layer unchanged and could defeat the cap (SQLite, for
    # example, treats a negative LIMIT as "no limit").
    limit = max(0, min(limit, 200))

    # A non-empty `since` timestamp takes precedence over `since_id`.
    if since:
        messages = db.get_messages_since_timestamp(since, limit)
    else:
        messages = db.get_messages_since_id(since_id, limit)

    # Replace the internal has_attachment flag with the attachment list.
    for msg in messages:
        has_attachment = msg.pop("has_attachment")
        msg["attachments"] = (
            db.get_attachments_for_message(msg["id"]) if has_attachment else []
        )

    # With no results the caller's since_id is echoed back as the cursor.
    cursor = messages[-1]["id"] if messages else since_id
    return json.dumps({
        "messages": messages,
        "cursor": cursor,
        "count": len(messages),
    })
@mcp.tool()
def queue_status() -> str:
    """Get current status of the bridge.

    Returns message counts, last activity, and pending outbound messages.
    """
    # The database's status summary is passed through as JSON unchanged.
    return json.dumps(db.get_status())
# Custom non-MCP routes (no auth required - local access only).
# NOTE(review): nothing in the visible code checks client addresses against
# INTERNAL_PREFIXES — confirm where (or whether) "local access only" is enforced.
INTERNAL_PREFIXES = (
    "127.",     # localhost (IPv4)
    "10.5.0.",  # LAN
    "::1",      # localhost (IPv6)
    "100.79.",  # NetBird
)
async def ingest_message(request: Request) -> JSONResponse:
    """HTTP endpoint for local services to log messages into the bridge.

    The caller must send the shared ingest secret in the ``x-ingest-key``
    header and a JSON object body. ``telegram_message_id`` and ``chat_id``
    are required; the remaining fields are optional.

    Responses:
        403 ``{"error": "forbidden"}`` when the secret is unset, missing or wrong.
        400 when the body is not valid JSON, is not a JSON object, or lacks
            a required field.
        200 ``{"ok": True, "duplicate": True}`` when the database reports no
            new row for the message.
        200 ``{"ok": True, "id": <row id>}`` when the message was stored.
    """
    # Require shared secret for ingest (only the homelab bot knows this)
    ingest_key = creds.get("INGEST_SECRET", "")
    provided_key = request.headers.get("x-ingest-key", "")
    # Constant-time comparison so response timing does not leak how much of
    # the secret matched. Compared as bytes because compare_digest rejects
    # str arguments that contain non-ASCII characters.
    if (
        not ingest_key
        or not provided_key
        or not hmac.compare_digest(
            provided_key.encode("utf-8"), str(ingest_key).encode("utf-8")
        )
    ):
        return JSONResponse({"error": "forbidden"}, status_code=403)

    try:
        data = await request.json()
    except Exception:
        return JSONResponse({"error": "invalid JSON"}, status_code=400)
    # Valid JSON can still be a list, string or number; only an object has
    # the fields read below.
    if not isinstance(data, dict):
        return JSONResponse({"error": "JSON body must be an object"}, status_code=400)

    telegram_message_id = data.get("telegram_message_id")
    chat_id = data.get("chat_id")
    if not telegram_message_id or not chat_id:
        return JSONResponse(
            {"error": "telegram_message_id and chat_id are required"},
            status_code=400,
        )

    # Default to "now" (UTC) when created_at is absent, null or empty.
    created_at = data.get("created_at") or datetime.now(timezone.utc).isoformat()
    msg_id = db.insert_message(
        telegram_message_id=telegram_message_id,
        chat_id=chat_id,
        sender_type=data.get("sender_type", "unknown"),
        sender_id=data.get("sender_id"),
        sender_name=data.get("sender_name"),
        content=data.get("content"),
        reply_to_message_id=data.get("reply_to_message_id"),
        has_attachment=data.get("has_attachment", False),
        created_at=created_at,
    )
    # insert_message returning None is reported to the caller as a duplicate.
    if msg_id is None:
        return JSONResponse({"ok": True, "duplicate": True})

    logger.info(
        "Ingested message %s from %s",
        telegram_message_id,
        data.get("sender_name", "unknown"),
    )
    return JSONResponse({"ok": True, "id": msg_id})
async def health(request: Request) -> JSONResponse:
    """Report liveness together with the database's status summary."""
    payload = {"status": "ok"}
    # Status fields are merged last, so a "status" key from the database
    # would replace the "ok" marker.
    payload.update(db.get_status())
    return JSONResponse(payload)
# Plain HTTP routes defined by this module: (path, handler, HTTP method).
custom_routes = [
    Route(path, handler, methods=[verb])
    for path, handler, verb in (
        ("/api/ingest", ingest_message, "POST"),
        ("/api/health", health, "GET"),
    )
]