diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..001a7aa --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +.venv/ +__pycache__/ +*.pyc +data/ +media/ +credentials +.build-complete +heartbeat.sh diff --git a/README.md b/README.md index 5516f0e..0f53df9 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,165 @@ -# telegram-bot-mcp +# Nexus MCP Bridge +MCP server that bridges claude.ai to a homelab Telegram group chat. Logs all messages and files to libsql, exposes three MCP tools over HTTP on the NetBird mesh. + +## Architecture + +``` +claude.ai ──HTTP──► MCP Bridge (mgmt.mg:8321) ──Telegram API──► Group Chat + │ │ + libsql (local) Mikkel + Homelab Bot + MCP Bot + media/ (files) +``` + +Single Python process running: +- **Telegram bot** — polls group chat, logs everything to libsql, sends outbound messages +- **FastMCP HTTP server** — exposes `send_message`, `pull_updates`, `queue_status` tools + +## Setup Guide + +### Step 1: Create a new Telegram bot + +1. Open Telegram, message [@BotFather](https://t.me/BotFather) +2. Send `/newbot` +3. Name: `Nexus MCP Bridge` (or whatever you like) +4. Username: `nexus_mcp_bot` (must be unique, pick something available) +5. Copy the **bot token** BotFather gives you + +### Step 2: Create the Telegram group + +1. In Telegram, create a **new group** +2. Name: `Homelab Bridge` (or your preference) +3. Add members: + - Your existing homelab bot (`@georgsen_homelab_bot`) + - The new MCP bot (search by the username you chose) +4. **Important:** Make both bots **group admins** so they can read all messages + - Tap group name → Edit → Administrators → Add both bots + - Bots need at minimum: "Read messages" permission (enabled by default for admins) + +### Step 3: Get the group chat ID + +Option A — Send a message in the group, then check: +```bash +curl -s "https://api.telegram.org/bot<MCP_BOT_TOKEN>/getUpdates" | python3 -m json.tool +``` +Look for `"chat": {"id": -100XXXXXXXXXX}` — the negative number is the group chat ID. 
+ +Option B — The bridge logs the chat ID on startup. You can start it once, send a message in the group, and check the logs. + +### Step 4: Configure credentials + +```bash +cd ~/repos/telegram-bot-mcp +cp credentials.example credentials +``` + +Edit `credentials`: +``` +MCP_BOT_TOKEN= +GROUP_CHAT_ID= +HOMELAB_BOT_ID=8521598773 +``` + +### Step 5: Install and test + +```bash +cd ~/repos/telegram-bot-mcp + +# Create venv (if not already done) +python3 -m venv .venv + +# Install dependencies +.venv/bin/pip install -r requirements.txt + +# Test run (Ctrl+C to stop) +.venv/bin/python -m mcp_bridge +``` + +You should see: +``` +Database initialized at /home/mikkel/repos/telegram-bot-mcp/data/bridge.db +MCP server starting on 0.0.0.0:8321 +Telegram bot polling started +MCP Bridge bot started as @nexus_mcp_bot (ID: XXXXXXX) +Monitoring group chat: -100XXXXXXXXXX +``` + +Send a message in the group — you should see it logged. + +### Step 6: Install systemd service + +```bash +cp mcp-bridge.service ~/.config/systemd/user/ +systemctl --user daemon-reload +systemctl --user enable --now mcp-bridge +systemctl --user status mcp-bridge +``` + +View logs: +```bash +journalctl --user -u mcp-bridge -f +``` + +### Step 7: Configure claude.ai MCP connection + +In claude.ai settings, add a new MCP server: + +- **URL:** `http://mgmt.mg:8321/mcp` +- **Transport:** Streamable HTTP + +This works because mgmt.mg resolves via NetBird mesh — no public exposure needed. + +## MCP Tools + +### send_message +Send a message to the group, attributed as `[claude.ai]`. + +```json +{"message": "Fix the nexus.mg DNS record to 100.79.65.206"} +``` + +### pull_updates +Get conversation messages with cursor-based pagination. + +```json +{"since_id": 0, "limit": 50} +``` + +Returns messages from all participants with attachment metadata. Use the `cursor` value from the response as `since_id` in the next call. + +### queue_status +Quick summary: total messages, last activity, pending outbound count. 
+ +## File Structure + +``` +├── mcp_bridge/ +│ ├── __main__.py # Entry point (bot + MCP server) +│ ├── config.py # Configuration loader +│ ├── db.py # libsql database layer +│ ├── telegram_bot.py # Telegram bot (logging + sending) +│ ├── mcp_server.py # FastMCP tool definitions +│ └── models.py # Data models +├── data/ # libsql database (auto-created) +├── media/ # Downloaded attachments (auto-created) +├── credentials # Bot token + chat ID (not in git) +├── credentials.example # Template +├── requirements.txt +└── mcp-bridge.service # systemd unit file +``` + +## Troubleshooting + +**Bot doesn't see group messages:** +- Ensure bot is a group admin +- Check BotFather: `/mybots` → Bot Settings → Group Privacy → Turn OFF + (bots have "privacy mode" ON by default — they only see commands unless it's disabled) + +**Can't get group chat ID:** +- Make sure you sent a message AFTER adding the bot +- For supergroups, the ID starts with `-100` + +**MCP server unreachable from claude.ai:** +- Verify NetBird is connected: `netbird status` +- Test locally: `curl http://mgmt.mg:8321/mcp` +- Check firewall: port 8321 must be open diff --git a/credentials.example b/credentials.example new file mode 100644 index 0000000..e076b10 --- /dev/null +++ b/credentials.example @@ -0,0 +1,13 @@ +# MCP Bridge Bot credentials +# Copy to 'credentials' and fill in values + +# Bot token from BotFather (create a NEW bot for the MCP bridge) +MCP_BOT_TOKEN= + +# Telegram group chat ID (negative number for groups) +# Send a message in the group, then check: https://api.telegram.org/bot<MCP_BOT_TOKEN>/getUpdates +GROUP_CHAT_ID= + +# (Optional) Bot ID of the existing homelab bot, for sender classification +# Find it: https://api.telegram.org/bot<HOMELAB_BOT_TOKEN>/getMe +HOMELAB_BOT_ID=8521598773 diff --git a/docs/2026-03-30-nexus-mcp-bridge-design.md b/docs/2026-03-30-nexus-mcp-bridge-design.md new file mode 100644 index 0000000..529e8c8 --- /dev/null +++ b/docs/2026-03-30-nexus-mcp-bridge-design.md @@ -0,0 +1,237 @@ +# Nexus MCP 
Bridge — Design Spec + +**Date:** 2026-03-30 +**Status:** Approved (user greenlit Approach 1, all design questions resolved) + +--- + +## Summary + +A single-process Python application that: +1. Runs a **Telegram bot** in a group chat, logging all messages/files to **libsql** +2. Exposes an **MCP server** (FastMCP over HTTP) on the NetBird mesh for claude.ai to query + +This lets claude.ai dispatch tasks to the homelab agent and pull conversation updates — without Mikkel being the copy-paste middleman. + +--- + +## Design Decisions + +| Question | Decision | Rationale | +|----------|----------|-----------| +| Telegram conflict | New bot + group chat | Each bot has own token, no polling conflict | +| What to capture | Everything (all participants + files) | Full replay capability | +| File storage | Download to disk + store file_id | Durability + convenience | +| Auth | None (NetBird mesh is trust boundary) | Only accessible from enrolled peers | +| Database | libsql embedded (local file) | Single process, no extra service | +| Scope | Single group chat (homelab) for v1 | Nail core loop, extend later | +| Bot personality | Attributed relay `[claude.ai] ...` | Clear attribution in group | +| Architecture | Single process (FastMCP + telegram bot) | Simple, systemd restart covers failures | + +--- + +## Architecture + +``` +┌──────────────┐ HTTP (NetBird) ┌─────────────────────────────┐ +│ claude.ai │ ◄──────────────────── │ MCP Bridge Process │ +│ MCP client │ ────────────────────► │ │ +└──────────────┘ │ ┌─────────┐ ┌──────────┐ │ + │ │ FastMCP │ │ Telegram │ │ + │ │ HTTP │ │ Bot │ │ + │ │ Server │ │ (polling)│ │ + │ └────┬─────┘ └────┬─────┘ │ + │ │ │ │ + │ └──────┬───────┘ │ + │ │ │ + │ ┌─────▼─────┐ │ + │ │ libsql │ │ + │ │ (embed) │ │ + │ └───────────┘ │ + │ │ + │ media/ (downloaded files) │ + └─────────────────────────────┘ + │ + Telegram Group Chat + ┌─────▼─────┐ + │ Mikkel │ + │ Homelab ♦ │ (existing bot) + │ MCP ♦ │ (new bot) + └───────────┘ +``` + +--- + 
+## Database Schema (libsql) + +```sql +CREATE TABLE messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + telegram_message_id INTEGER NOT NULL, + chat_id INTEGER NOT NULL, + sender_type TEXT NOT NULL, -- 'user', 'homelab_bot', 'mcp_bot', 'unknown' + sender_id INTEGER, + sender_name TEXT, + content TEXT, -- message text (nullable for media-only) + reply_to_message_id INTEGER, -- telegram reply reference + has_attachment INTEGER DEFAULT 0, + created_at TEXT NOT NULL, -- ISO 8601 + UNIQUE(chat_id, telegram_message_id) +); + +CREATE TABLE attachments ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + message_id INTEGER NOT NULL REFERENCES messages(id), + file_type TEXT NOT NULL, -- 'photo', 'document', 'video', 'voice', 'sticker' + file_id TEXT NOT NULL, -- telegram file_id + file_unique_id TEXT NOT NULL, -- telegram file_unique_id + file_name TEXT, -- original filename + mime_type TEXT, + file_size INTEGER, + local_path TEXT, -- path under media/ + caption TEXT, + created_at TEXT NOT NULL +); + +CREATE TABLE outbound_queue ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + chat_id INTEGER NOT NULL, + content TEXT NOT NULL, + attribution TEXT DEFAULT 'claude.ai', -- prefix for the message + status TEXT DEFAULT 'pending', -- 'pending', 'sent', 'failed' + created_at TEXT NOT NULL, + sent_at TEXT +); + +-- Index for delta queries +CREATE INDEX idx_messages_created_at ON messages(created_at); +CREATE INDEX idx_messages_chat_id ON messages(chat_id); +CREATE INDEX idx_outbound_status ON outbound_queue(status); +``` + +--- + +## MCP Tools + +### send_message +``` +Send a message to the homelab group chat, attributed as [claude.ai]. + +Input: { "message": "Fix the nexus.mg DNS record to 100.79.65.206" } +Output: { "sent": true, "id": 42 } +``` + +### pull_updates +``` +Get conversation messages since a cursor (message ID or timestamp). +Returns messages from all participants with attachment metadata. 
+ +Input: { "since_id": 150 } or { "since": "2026-03-30T01:00:00Z" } or {} +Output: { + "messages": [ + { + "id": 151, + "sender": "mikkel", + "sender_type": "user", + "content": "Can you check the DNS?", + "attachments": [], + "created_at": "2026-03-30T01:02:15Z" + }, + { + "id": 152, + "sender": "homelab_bot", + "sender_type": "homelab_bot", + "content": "Checking Technitium... record found.", + "attachments": [{"file_type": "photo", "file_name": "dns-screenshot.png"}], + "created_at": "2026-03-30T01:02:45Z" + } + ], + "cursor": 152 +} +``` + +### queue_status +``` +Current state summary. + +Input: {} +Output: { + "total_messages": 152, + "last_message_at": "2026-03-30T01:02:45Z", + "last_sender": "homelab_bot", + "pending_outbound": 0 +} +``` + +--- + +## Telegram Bot Behavior + +- **Joins group chat** with Mikkel + existing homelab bot +- **Logs everything**: text, photos, documents, voice, stickers, replies +- **Downloads attachments** to `media//_` +- **Sends outbound** messages prefixed with `[claude.ai]` in bold +- **No commands** — this bot is a silent logger + relay, not interactive +- **Ignores private messages** — only operates in the configured group + +--- + +## File Structure + +``` +~/repos/telegram-bot-mcp/ +├── docs/ # Specs and design docs +├── mcp_bridge/ +│ ├── __init__.py +│ ├── __main__.py # Entry point: runs both bot + MCP server +│ ├── config.py # Configuration (env vars, paths) +│ ├── db.py # libsql database layer +│ ├── telegram_bot.py # Telegram bot (polling, logging, sending) +│ ├── mcp_server.py # FastMCP tool definitions +│ └── models.py # Shared data models +├── media/ # Downloaded attachments +├── data/ # libsql database file +├── credentials # BOT_TOKEN, GROUP_CHAT_ID (created during setup) +├── requirements.txt +├── heartbeat.sh +└── README.md +``` + +--- + +## Configuration + +Environment/file based: +- `credentials` file: `MCP_BOT_TOKEN=...`, `GROUP_CHAT_ID=...`, `HOMELAB_BOT_ID=...` +- MCP server binds to `0.0.0.0:8321` 
(accessible via NetBird at `mgmt.mg:8321`) +- Database at `data/bridge.db` +- Media at `media/` + +--- + +## Deployment + +- **systemd user service** (`mcp-bridge.service`) +- Uses project-local venv at `.venv/` +- `Restart=always`, `RestartSec=5` +- Bind to `0.0.0.0:8321` (NetBird interface) + +--- + +## Setup Steps (for user) + +1. Create new Telegram bot via BotFather → get token +2. Create Telegram group → add Mikkel + homelab bot + MCP bot +3. Get group chat ID (bot will log it on first message) +4. Fill in `credentials` file +5. `systemctl --user enable --now mcp-bridge` +6. Add MCP URL `http://mgmt.mg:8321/mcp` in claude.ai settings + +--- + +## Future Extensions + +- Multi-group support (per-project chats with `target` parameter) +- Session tracking (detect agent restarts) +- File content search across attachments +- Message threading/reply chain reconstruction diff --git a/mcp-bridge.service b/mcp-bridge.service new file mode 100644 index 0000000..8d444bf --- /dev/null +++ b/mcp-bridge.service @@ -0,0 +1,19 @@ +[Unit] +Description=Nexus MCP Bridge (Telegram + MCP Server) +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +WorkingDirectory=/home/mikkel/repos/telegram-bot-mcp +ExecStart=/home/mikkel/repos/telegram-bot-mcp/.venv/bin/python -m mcp_bridge +Restart=always +RestartSec=5 +KillMode=mixed +KillSignal=SIGTERM +TimeoutStopSec=15 + +Environment=PATH=/home/mikkel/.local/bin:/home/mikkel/bin:/usr/local/bin:/usr/bin:/bin + +[Install] +WantedBy=default.target diff --git a/mcp_bridge/__init__.py b/mcp_bridge/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mcp_bridge/__main__.py b/mcp_bridge/__main__.py new file mode 100644 index 0000000..b592c10 --- /dev/null +++ b/mcp_bridge/__main__.py @@ -0,0 +1,84 @@ +"""Entry point: runs both Telegram bot and MCP server in one process.""" + +import asyncio +import logging +import signal + +from .config import MCP_HOST, MCP_PORT, MEDIA_DIR, DB_PATH +from .db import 
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("mcp_bridge")


async def run_telegram_bot(bot: "BridgeBot"):
    """Run the Telegram bot until this task is cancelled.

    Builds the application from ``bot``, initializes and starts it, then
    starts the updater's polling with ``drop_pending_updates=True``.  On
    cancellation the updater and application are stopped and shut down and
    the coroutine returns normally (the ``CancelledError`` is absorbed here
    so the teardown can complete).

    Args:
        bot: Bridge bot whose ``build_application()`` provides the
            application to run.
    """
    app = bot.build_application()
    await app.initialize()
    await app.start()
    updater = app.updater
    await updater.start_polling(drop_pending_updates=True)
    logger.info("Telegram bot polling started")

    try:
        # start_polling() has already returned; this loop only keeps the
        # task alive until someone cancels it.
        while True:
            await asyncio.sleep(3600)
    except asyncio.CancelledError:
        logger.info("Telegram bot shutting down...")
        await updater.stop()
        await app.stop()
        await app.shutdown()


async def run_mcp_server():
    """Run the FastMCP HTTP server on ``MCP_HOST:MCP_PORT`` until it exits."""
    logger.info("MCP server starting on %s:%s", MCP_HOST, MCP_PORT)
    await mcp.run_http_async(
        host=MCP_HOST,
        port=MCP_PORT,
        log_level="info",
        show_banner=False,
    )


async def main():
    """Start the Telegram bot and the MCP server and supervise both.

    Creates the media and database directories, opens the shared database,
    hands it to the MCP server module and the bot, then runs both services
    as tasks.  SIGINT/SIGTERM cancel both tasks.  If either service ends on
    its own (cleanly or with an error) the other one is cancelled as well,
    so the process never keeps running with only half of the bridge alive.

    Raises:
        Exception: The first non-cancellation error raised by a service,
            re-raised after both services have finished.
    """
    MEDIA_DIR.mkdir(parents=True, exist_ok=True)
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)

    db = Database()
    logger.info("Database initialized at %s", DB_PATH)

    init_mcp(db)

    bot = BridgeBot(db)

    telegram_task = asyncio.create_task(run_telegram_bot(bot))
    mcp_task = asyncio.create_task(run_mcp_server())
    services = (telegram_task, mcp_task)
    shutdown_requested = False

    def handle_signal():
        nonlocal shutdown_requested
        logger.info("Received shutdown signal")
        shutdown_requested = True
        for task in services:
            task.cancel()

    # We are inside a coroutine, so the running loop is the one to use.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, handle_signal)

    try:
        await asyncio.wait(services, return_when=asyncio.FIRST_COMPLETED)
    except asyncio.CancelledError:
        # main() itself was cancelled; fall through and stop the services.
        pass

    if not shutdown_requested:
        # One service ended by itself: stop the survivor.  Skipped after a
        # signal because both tasks were already cancelled and a second
        # cancel() would interrupt the bot's teardown.
        for task in services:
            if not task.done():
                task.cancel()

    # Wait for both services (including teardown) without letting one
    # failure hide the other's completion.
    results = await asyncio.gather(*services, return_exceptions=True)

    logger.info("MCP Bridge stopped")

    for result in results:
        # CancelledError derives from BaseException, so only real failures
        # match here and get propagated to the caller / process exit code.
        if isinstance(result, Exception):
            raise result


if __name__ == "__main__":
    asyncio.run(main())
@@ +"""Configuration loader for MCP Bridge.""" + +import os +from pathlib import Path + +BASE_DIR = Path(__file__).resolve().parent.parent +CREDENTIALS_FILE = BASE_DIR / "credentials" +DB_PATH = BASE_DIR / "data" / "bridge.db" +MEDIA_DIR = BASE_DIR / "media" + +# MCP server settings +MCP_HOST = "0.0.0.0" +MCP_PORT = 8321 + + +def load_credentials() -> dict[str, str]: + """Load credentials from KEY=VALUE file.""" + config = {} + if CREDENTIALS_FILE.exists(): + with open(CREDENTIALS_FILE) as f: + for line in f: + line = line.strip() + if "=" in line and not line.startswith("#"): + key, value = line.split("=", 1) + config[key.strip()] = value.strip() + return config + + +def get_bot_token() -> str: + creds = load_credentials() + token = creds.get("MCP_BOT_TOKEN", "") + if not token: + raise RuntimeError("MCP_BOT_TOKEN not set in credentials file") + return token + + +def get_group_chat_id() -> int: + creds = load_credentials() + chat_id = creds.get("GROUP_CHAT_ID", "") + if not chat_id: + raise RuntimeError("GROUP_CHAT_ID not set in credentials file") + return int(chat_id) + + +def get_homelab_bot_id() -> int | None: + """Optional: ID of the existing homelab bot for sender_type classification.""" + creds = load_credentials() + bot_id = creds.get("HOMELAB_BOT_ID", "") + return int(bot_id) if bot_id else None diff --git a/mcp_bridge/db.py b/mcp_bridge/db.py new file mode 100644 index 0000000..194a936 --- /dev/null +++ b/mcp_bridge/db.py @@ -0,0 +1,241 @@ +"""Database layer using libsql (embedded).""" + +import libsql_experimental as libsql +from datetime import datetime, timezone +from pathlib import Path + +from .config import DB_PATH + + +SCHEMA = """ +CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + telegram_message_id INTEGER NOT NULL, + chat_id INTEGER NOT NULL, + sender_type TEXT NOT NULL, + sender_id INTEGER, + sender_name TEXT, + content TEXT, + reply_to_message_id INTEGER, + has_attachment INTEGER DEFAULT 0, + created_at TEXT NOT 
NULL, + UNIQUE(chat_id, telegram_message_id) +); + +CREATE TABLE IF NOT EXISTS attachments ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + message_id INTEGER NOT NULL REFERENCES messages(id), + file_type TEXT NOT NULL, + file_id TEXT NOT NULL, + file_unique_id TEXT NOT NULL, + file_name TEXT, + mime_type TEXT, + file_size INTEGER, + local_path TEXT, + caption TEXT, + created_at TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS outbound_queue ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + chat_id INTEGER NOT NULL, + content TEXT NOT NULL, + attribution TEXT DEFAULT 'claude.ai', + status TEXT DEFAULT 'pending', + created_at TEXT NOT NULL, + sent_at TEXT +); + +CREATE INDEX IF NOT EXISTS idx_messages_created_at ON messages(created_at); +CREATE INDEX IF NOT EXISTS idx_messages_chat_id ON messages(chat_id); +CREATE INDEX IF NOT EXISTS idx_outbound_status ON outbound_queue(status); +""" + + +class Database: + def __init__(self, db_path: Path | None = None): + self.db_path = db_path or DB_PATH + self.db_path.parent.mkdir(parents=True, exist_ok=True) + self.conn = libsql.connect(str(self.db_path)) + self._init_schema() + + def _init_schema(self): + for statement in SCHEMA.split(";"): + statement = statement.strip() + if statement: + self.conn.execute(statement) + self.conn.commit() + + def insert_message( + self, + telegram_message_id: int, + chat_id: int, + sender_type: str, + sender_id: int | None, + sender_name: str | None, + content: str | None, + reply_to_message_id: int | None, + has_attachment: bool, + created_at: str, + ) -> int | None: + """Insert a message. 
Returns row id, or None if duplicate.""" + try: + cursor = self.conn.execute( + """INSERT INTO messages + (telegram_message_id, chat_id, sender_type, sender_id, + sender_name, content, reply_to_message_id, has_attachment, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", + ( + telegram_message_id, chat_id, sender_type, sender_id, + sender_name, content, reply_to_message_id, + 1 if has_attachment else 0, created_at, + ), + ) + self.conn.commit() + return cursor.lastrowid + except Exception as e: + if "UNIQUE" in str(e): + return None + raise + + def insert_attachment( + self, + message_id: int, + file_type: str, + file_id: str, + file_unique_id: str, + file_name: str | None, + mime_type: str | None, + file_size: int | None, + local_path: str | None, + caption: str | None, + ) -> int: + cursor = self.conn.execute( + """INSERT INTO attachments + (message_id, file_type, file_id, file_unique_id, file_name, + mime_type, file_size, local_path, caption, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + ( + message_id, file_type, file_id, file_unique_id, file_name, + mime_type, file_size, local_path, caption, + datetime.now(timezone.utc).isoformat(), + ), + ) + self.conn.commit() + return cursor.lastrowid + + def queue_outbound(self, chat_id: int, content: str, attribution: str = "claude.ai") -> int: + cursor = self.conn.execute( + """INSERT INTO outbound_queue (chat_id, content, attribution, status, created_at) + VALUES (?, ?, ?, 'pending', ?)""", + (chat_id, content, attribution, datetime.now(timezone.utc).isoformat()), + ) + self.conn.commit() + return cursor.lastrowid + + def get_pending_outbound(self) -> list[dict]: + rows = self.conn.execute( + "SELECT id, chat_id, content, attribution FROM outbound_queue WHERE status = 'pending' ORDER BY id" + ).fetchall() + return [ + {"id": r[0], "chat_id": r[1], "content": r[2], "attribution": r[3]} + for r in rows + ] + + def mark_outbound_sent(self, outbound_id: int): + self.conn.execute( + "UPDATE outbound_queue SET 
status = 'sent', sent_at = ? WHERE id = ?", + (datetime.now(timezone.utc).isoformat(), outbound_id), + ) + self.conn.commit() + + def mark_outbound_failed(self, outbound_id: int): + self.conn.execute( + "UPDATE outbound_queue SET status = 'failed' WHERE id = ?", + (outbound_id,), + ) + self.conn.commit() + + def get_messages_since_id(self, since_id: int = 0, limit: int = 100) -> list[dict]: + rows = self.conn.execute( + """SELECT m.id, m.telegram_message_id, m.sender_type, m.sender_name, + m.content, m.has_attachment, m.created_at, m.reply_to_message_id + FROM messages m + WHERE m.id > ? + ORDER BY m.id ASC + LIMIT ?""", + (since_id, limit), + ).fetchall() + return [ + { + "id": r[0], + "telegram_message_id": r[1], + "sender_type": r[2], + "sender": r[3] or r[2], + "content": r[4], + "has_attachment": bool(r[5]), + "created_at": r[6], + "reply_to_message_id": r[7], + } + for r in rows + ] + + def get_messages_since_timestamp(self, since: str, limit: int = 100) -> list[dict]: + rows = self.conn.execute( + """SELECT m.id, m.telegram_message_id, m.sender_type, m.sender_name, + m.content, m.has_attachment, m.created_at, m.reply_to_message_id + FROM messages m + WHERE m.created_at > ? 
+ ORDER BY m.id ASC + LIMIT ?""", + (since, limit), + ).fetchall() + return [ + { + "id": r[0], + "telegram_message_id": r[1], + "sender_type": r[2], + "sender": r[3] or r[2], + "content": r[4], + "has_attachment": bool(r[5]), + "created_at": r[6], + "reply_to_message_id": r[7], + } + for r in rows + ] + + def get_attachments_for_message(self, message_id: int) -> list[dict]: + rows = self.conn.execute( + """SELECT file_type, file_id, file_unique_id, file_name, + mime_type, file_size, local_path, caption + FROM attachments WHERE message_id = ?""", + (message_id,), + ).fetchall() + return [ + { + "file_type": r[0], + "file_id": r[1], + "file_unique_id": r[2], + "file_name": r[3], + "mime_type": r[4], + "file_size": r[5], + "local_path": r[6], + "caption": r[7], + } + for r in rows + ] + + def get_status(self) -> dict: + total = self.conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0] + last_row = self.conn.execute( + "SELECT created_at, sender_name, sender_type FROM messages ORDER BY id DESC LIMIT 1" + ).fetchone() + pending = self.conn.execute( + "SELECT COUNT(*) FROM outbound_queue WHERE status = 'pending'" + ).fetchone()[0] + return { + "total_messages": total, + "last_message_at": last_row[0] if last_row else None, + "last_sender": last_row[1] or last_row[2] if last_row else None, + "pending_outbound": pending, + } diff --git a/mcp_bridge/mcp_server.py b/mcp_bridge/mcp_server.py new file mode 100644 index 0000000..f95fa0c --- /dev/null +++ b/mcp_bridge/mcp_server.py @@ -0,0 +1,90 @@ +"""FastMCP server exposing bridge tools to claude.ai.""" + +import json +from datetime import datetime, timezone + +from fastmcp import FastMCP + +from .db import Database +from .config import get_group_chat_id + +# Will be initialized in __main__ with shared db instance +db: Database | None = None + +mcp = FastMCP( + name="homelab-bridge", + instructions=( + "This MCP server bridges claude.ai to a homelab Telegram group chat. 
# Will be initialized in __main__ with shared db instance
db: "Database | None" = None

mcp = FastMCP(
    name="homelab-bridge",
    instructions=(
        "This MCP server bridges claude.ai to a homelab Telegram group chat. "
        "Use pull_updates to read conversation history (supports cursor-based pagination). "
        "Use send_message to post messages to the group (attributed as [claude.ai]). "
        "Use queue_status for a quick summary."
    ),
)

# Upper bound on rows returned by a single pull_updates call.
_MAX_PULL_LIMIT = 200


def init(database: "Database"):
    """Set the shared database instance."""
    global db
    db = database


def _database() -> "Database":
    """Return the shared database, failing clearly if init() was not called."""
    if db is None:
        raise RuntimeError("MCP server used before init() provided a database")
    return db


# NOTE: the docstrings of the @mcp.tool() functions below double as the tool
# descriptions, so they are written for the MCP client, not for maintainers.


@mcp.tool()
def send_message(message: str) -> str:
    """Send a message to the homelab Telegram group chat.

    The message will be posted with [claude.ai] attribution so participants
    know the message came from claude.ai.

    Args:
        message: The text to send to the group chat.
    """
    if not message or not message.strip():
        # Nothing to deliver; report it instead of queueing a message that
        # can only fail when it is sent.
        return json.dumps({"sent": False, "error": "message is empty"})

    chat_id = get_group_chat_id()
    outbound_id = _database().queue_outbound(chat_id, message)
    return json.dumps({"sent": True, "id": outbound_id})


@mcp.tool()
def pull_updates(since_id: int = 0, since: str | None = None, limit: int = 50) -> str:
    """Pull conversation messages from the Telegram group.

    Returns messages from all participants (Mikkel, homelab bot, MCP bot).
    Supports cursor-based pagination: use the returned 'cursor' value as
    'since_id' in the next call to get only new messages.

    Args:
        since_id: Return messages with id > this value. Use cursor from previous response.
        since: ISO 8601 timestamp. Alternative to since_id — returns messages after this time.
        limit: Maximum number of messages to return (default 50, max 200).
    """
    database = _database()
    # Clamp on both sides: SQLite treats a negative LIMIT as "no limit",
    # which would bypass the documented maximum.
    limit = max(0, min(limit, _MAX_PULL_LIMIT))

    if since:
        messages = database.get_messages_since_timestamp(since, limit)
    else:
        messages = database.get_messages_since_id(since_id, limit)

    # Enrich with attachment info
    for msg in messages:
        has_attachment = msg.pop("has_attachment")
        msg["attachments"] = (
            database.get_attachments_for_message(msg["id"]) if has_attachment else []
        )

    # With no new rows the caller's cursor is handed back unchanged.
    cursor = messages[-1]["id"] if messages else since_id

    return json.dumps({
        "messages": messages,
        "cursor": cursor,
        "count": len(messages),
    })


@mcp.tool()
def queue_status() -> str:
    """Get current status of the bridge.

    Returns message counts, last activity, and pending outbound messages.
    """
    return json.dumps(_database().get_status())
+ + Returns message counts, last activity, and pending outbound messages. + """ + status = db.get_status() + return json.dumps(status) diff --git a/mcp_bridge/models.py b/mcp_bridge/models.py new file mode 100644 index 0000000..60ac385 --- /dev/null +++ b/mcp_bridge/models.py @@ -0,0 +1,43 @@ +"""Shared data models.""" + +from dataclasses import dataclass, field + + +@dataclass +class MessageRecord: + id: int + telegram_message_id: int + chat_id: int + sender_type: str + sender_id: int | None + sender_name: str | None + content: str | None + reply_to_message_id: int | None + has_attachment: bool + created_at: str + + +@dataclass +class AttachmentRecord: + id: int + message_id: int + file_type: str + file_id: str + file_unique_id: str + file_name: str | None + mime_type: str | None + file_size: int | None + local_path: str | None + caption: str | None + created_at: str + + +@dataclass +class OutboundMessage: + id: int + chat_id: int + content: str + attribution: str + status: str + created_at: str + sent_at: str | None diff --git a/mcp_bridge/telegram_bot.py b/mcp_bridge/telegram_bot.py new file mode 100644 index 0000000..22f0d4f --- /dev/null +++ b/mcp_bridge/telegram_bot.py @@ -0,0 +1,297 @@ +"""Telegram bot: logs group messages to libsql, sends outbound messages.""" + +import asyncio +import logging +from datetime import datetime, timezone +from pathlib import Path + +from telegram import Bot, Update +from telegram.ext import ( + Application, + MessageHandler, + filters, + ContextTypes, +) + +from .config import get_bot_token, get_group_chat_id, get_homelab_bot_id, MEDIA_DIR +from .db import Database + +logger = logging.getLogger(__name__) + + +class BridgeBot: + def __init__(self, db: Database): + self.db = db + self.token = get_bot_token() + self.group_chat_id = get_group_chat_id() + self.homelab_bot_id = get_homelab_bot_id() + self.bot: Bot | None = None + self.app: Application | None = None + self._outbound_task: asyncio.Task | None = None + self._my_bot_id: 
int | None = None + + def _classify_sender(self, user_id: int | None) -> str: + """Classify who sent a message.""" + if user_id is None: + return "unknown" + if user_id == self._my_bot_id: + return "mcp_bot" + if self.homelab_bot_id and user_id == self.homelab_bot_id: + return "homelab_bot" + # Check if user is a bot (we'll handle this in the handler with full user info) + return "user" + + async def _log_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE): + """Log any group message to the database.""" + message = update.effective_message + if not message or message.chat_id != self.group_chat_id: + return + + user = message.from_user + user_id = user.id if user else None + sender_name = None + if user: + if user.is_bot: + sender_name = user.first_name or user.username + else: + parts = [user.first_name or "", user.last_name or ""] + sender_name = " ".join(p for p in parts if p) or user.username + + sender_type = self._classify_sender(user_id) + # Refine: if it's a bot we don't know, label it + if sender_type == "user" and user and user.is_bot: + sender_type = "bot" + + reply_to_id = None + if message.reply_to_message: + reply_to_id = message.reply_to_message.message_id + + # Determine if there are attachments + has_attachment = bool( + message.photo or message.document or message.video + or message.voice or message.audio or message.sticker + or message.video_note or message.animation + ) + + # Get text content + content = message.text or message.caption or None + + # Parse message date + msg_date = message.date + if msg_date: + created_at = msg_date.astimezone(timezone.utc).isoformat() + else: + created_at = datetime.now(timezone.utc).isoformat() + + # Insert message + msg_id = self.db.insert_message( + telegram_message_id=message.message_id, + chat_id=message.chat_id, + sender_type=sender_type, + sender_id=user_id, + sender_name=sender_name, + content=content, + reply_to_message_id=reply_to_id, + has_attachment=has_attachment, + 
created_at=created_at, + ) + + if msg_id is None: + logger.debug(f"Duplicate message {message.message_id}, skipping") + return + + logger.info( + f"Logged message {message.message_id} from {sender_name} ({sender_type})" + ) + + # Process attachments + if has_attachment: + await self._process_attachments(message, msg_id) + + async def _process_attachments(self, message, db_message_id: int): + """Download and log attachments.""" + attachments = [] + + if message.photo: + # Get highest resolution photo + photo = message.photo[-1] + attachments.append(("photo", photo.file_id, photo.file_unique_id, + None, "image/jpeg", photo.file_size, message.caption)) + + if message.document: + doc = message.document + attachments.append(("document", doc.file_id, doc.file_unique_id, + doc.file_name, doc.mime_type, doc.file_size, message.caption)) + + if message.video: + vid = message.video + attachments.append(("video", vid.file_id, vid.file_unique_id, + vid.file_name, vid.mime_type, vid.file_size, message.caption)) + + if message.voice: + voice = message.voice + attachments.append(("voice", voice.file_id, voice.file_unique_id, + None, voice.mime_type, voice.file_size, None)) + + if message.audio: + audio = message.audio + attachments.append(("audio", audio.file_id, audio.file_unique_id, + audio.file_name, audio.mime_type, audio.file_size, None)) + + if message.sticker: + sticker = message.sticker + attachments.append(("sticker", sticker.file_id, sticker.file_unique_id, + None, None, sticker.file_size, None)) + + if message.animation: + anim = message.animation + attachments.append(("animation", anim.file_id, anim.file_unique_id, + anim.file_name, anim.mime_type, anim.file_size, message.caption)) + + for file_type, file_id, file_unique_id, file_name, mime_type, file_size, caption in attachments: + # Download file + local_path = await self._download_file(file_id, file_unique_id, file_name, file_type) + + self.db.insert_attachment( + message_id=db_message_id, + file_type=file_type, + 
file_id=file_id, + file_unique_id=file_unique_id, + file_name=file_name, + mime_type=mime_type, + file_size=file_size, + local_path=str(local_path) if local_path else None, + caption=caption, + ) + logger.info(f"Saved attachment: {file_type} -> {local_path}") + + async def _download_file( + self, file_id: str, file_unique_id: str, file_name: str | None, file_type: str + ) -> Path | None: + """Download a file from Telegram to local media directory.""" + try: + tg_file = await self.bot.get_file(file_id) + + # Organize by date + date_dir = MEDIA_DIR / datetime.now(timezone.utc).strftime("%Y-%m-%d") + date_dir.mkdir(parents=True, exist_ok=True) + + # Build filename + if file_name: + local_name = f"{file_unique_id}_{file_name}" + else: + ext_map = { + "photo": ".jpg", "voice": ".ogg", "sticker": ".webp", + "video": ".mp4", "animation": ".mp4", "audio": ".mp3", + } + ext = ext_map.get(file_type, "") + local_name = f"{file_unique_id}{ext}" + + local_path = date_dir / local_name + await tg_file.download_to_drive(str(local_path)) + return local_path + + except Exception as e: + logger.error(f"Failed to download file {file_id}: {e}") + return None + + async def send_to_group(self, text: str, attribution: str = "claude.ai"): + """Send an attributed message to the group chat.""" + formatted = f"*\\[{attribution}\\]* {self._escape_markdown(text)}" + try: + await self.bot.send_message( + chat_id=self.group_chat_id, + text=formatted, + parse_mode="MarkdownV2", + ) + except Exception: + # Fallback to plain text if markdown fails + plain = f"[{attribution}] {text}" + await self.bot.send_message( + chat_id=self.group_chat_id, + text=plain, + ) + + @staticmethod + def _escape_markdown(text: str) -> str: + """Escape MarkdownV2 special characters, preserving code blocks.""" + special = r"_*[]()~`>#+-=|{}.!" 
+ result = [] + i = 0 + in_code_block = False + in_inline_code = False + + while i < len(text): + # Check for code block + if text[i:i+3] == "```": + in_code_block = not in_code_block + result.append("```") + i += 3 + continue + # Check for inline code + if text[i] == "`" and not in_code_block: + in_inline_code = not in_inline_code + result.append("`") + i += 1 + continue + + if not in_code_block and not in_inline_code and text[i] in special: + result.append(f"\\{text[i]}") + else: + result.append(text[i]) + i += 1 + + return "".join(result) + + async def _outbound_loop(self): + """Poll outbound queue and send messages.""" + while True: + try: + pending = self.db.get_pending_outbound() + for msg in pending: + try: + await self.send_to_group(msg["content"], msg["attribution"]) + self.db.mark_outbound_sent(msg["id"]) + logger.info(f"Sent outbound message {msg['id']}") + except Exception as e: + logger.error(f"Failed to send outbound {msg['id']}: {e}") + self.db.mark_outbound_failed(msg["id"]) + except Exception as e: + logger.error(f"Outbound loop error: {e}") + + await asyncio.sleep(2) + + async def _post_init(self, application: Application): + """Called after bot initialization.""" + self.bot = application.bot + me = await self.bot.get_me() + self._my_bot_id = me.id + logger.info(f"MCP Bridge bot started as @{me.username} (ID: {me.id})") + logger.info(f"Monitoring group chat: {self.group_chat_id}") + + # Start outbound message loop + self._outbound_task = asyncio.create_task(self._outbound_loop()) + + async def _post_shutdown(self, application: Application): + """Cleanup on shutdown.""" + if self._outbound_task: + self._outbound_task.cancel() + try: + await self._outbound_task + except asyncio.CancelledError: + pass + + def build_application(self) -> Application: + """Build the telegram Application (but don't start it yet).""" + builder = Application.builder().token(self.token) + self.app = builder.build() + + # Log ALL messages in the group (text, photos, 
documents, etc.) + self.app.add_handler( + MessageHandler(filters.ALL, self._log_message) + ) + + self.app.post_init = self._post_init + self.app.post_shutdown = self._post_shutdown + + return self.app diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..b1c82bf --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +fastmcp>=2.0.0 +python-telegram-bot>=21.0 +libsql-experimental>=0.0.50 +aiohttp>=3.9.0