feat: MCP bridge - Telegram group logger + FastMCP HTTP server

Single-process Python app that:
- Runs a Telegram bot in a group chat, logging all messages/files to libsql
- Exposes send_message, pull_updates, queue_status MCP tools over HTTP
- Downloads and stores file attachments with Telegram file_id + local path
- Accessible via NetBird mesh at mgmt.mg:8321 (no auth needed)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mikkel Georgsen 2026-03-29 23:56:05 +00:00
parent c339980411
commit 1cb16e6e8f
13 changed files with 1249 additions and 1 deletions

8
.gitignore vendored Normal file
View file

@ -0,0 +1,8 @@
.venv/
__pycache__/
*.pyc
data/
media/
credentials
.build-complete
heartbeat.sh

165
README.md
View file

@ -1,2 +1,165 @@
# telegram-bot-mcp
# Nexus MCP Bridge
MCP server that bridges claude.ai to a homelab Telegram group chat. Logs all messages and files to libsql, exposes three MCP tools over HTTP on the NetBird mesh.
## Architecture
```
claude.ai ──HTTP──► MCP Bridge (mgmt.mg:8321) ──Telegram API──► Group Chat
│ │
libsql (local) Mikkel + Homelab Bot + MCP Bot
media/ (files)
```
Single Python process running:
- **Telegram bot** — polls group chat, logs everything to libsql, sends outbound messages
- **FastMCP HTTP server** — exposes `send_message`, `pull_updates`, `queue_status` tools
## Setup Guide
### Step 1: Create a new Telegram bot
1. Open Telegram, message [@BotFather](https://t.me/BotFather)
2. Send `/newbot`
3. Name: `Nexus MCP Bridge` (or whatever you like)
4. Username: `nexus_mcp_bot` (must be unique, pick something available)
5. Copy the **bot token** BotFather gives you
### Step 2: Create the Telegram group
1. In Telegram, create a **new group**
2. Name: `Homelab Bridge` (or your preference)
3. Add members:
- Your existing homelab bot (`@georgsen_homelab_bot`)
- The new MCP bot (search by the username you chose)
4. **Important:** Make both bots **group admins** so they can read all messages
- Tap group name → Edit → Administrators → Add both bots
- Bots need at minimum: "Read messages" permission (enabled by default for admins)
### Step 3: Get the group chat ID
Option A — Send a message in the group, then check:
```bash
curl -s "https://api.telegram.org/bot<MCP_BOT_TOKEN>/getUpdates" | python3 -m json.tool
```
Look for `"chat": {"id": -100XXXXXXXXXX}` — the negative number is the group chat ID.
Option B — The bridge logs the chat ID on startup. You can start it once, send a message in the group, and check the logs.
### Step 4: Configure credentials
```bash
cd ~/repos/telegram-bot-mcp
cp credentials.example credentials
```
Edit `credentials`:
```
MCP_BOT_TOKEN=<token from BotFather>
GROUP_CHAT_ID=<negative number from step 3>
HOMELAB_BOT_ID=8521598773
```
### Step 5: Install and test
```bash
cd ~/repos/telegram-bot-mcp
# Create venv (if not already done)
python3 -m venv .venv
# Install dependencies
.venv/bin/pip install -r requirements.txt
# Test run (Ctrl+C to stop)
.venv/bin/python -m mcp_bridge
```
You should see:
```
Database initialized at /home/mikkel/repos/telegram-bot-mcp/data/bridge.db
MCP server starting on 0.0.0.0:8321
Telegram bot polling started
MCP Bridge bot started as @nexus_mcp_bot (ID: XXXXXXX)
Monitoring group chat: -100XXXXXXXXXX
```
Send a message in the group — you should see it logged.
### Step 6: Install systemd service
```bash
cp mcp-bridge.service ~/.config/systemd/user/
systemctl --user daemon-reload
systemctl --user enable --now mcp-bridge
systemctl --user status mcp-bridge
```
View logs:
```bash
journalctl --user -u mcp-bridge -f
```
### Step 7: Configure claude.ai MCP connection
In claude.ai settings, add a new MCP server:
- **URL:** `http://mgmt.mg:8321/mcp`
- **Transport:** Streamable HTTP
This works because mgmt.mg resolves via NetBird mesh — no public exposure needed.
## MCP Tools
### send_message
Send a message to the group, attributed as `[claude.ai]`.
```json
{"message": "Fix the nexus.mg DNS record to 100.79.65.206"}
```
### pull_updates
Get conversation messages with cursor-based pagination.
```json
{"since_id": 0, "limit": 50}
```
Returns messages from all participants with attachment metadata. Use the `cursor` value from the response as `since_id` in the next call.
### queue_status
Quick summary: total messages, last activity, pending outbound count.
## File Structure
```
├── mcp_bridge/
│ ├── __main__.py # Entry point (bot + MCP server)
│ ├── config.py # Configuration loader
│ ├── db.py # libsql database layer
│ ├── telegram_bot.py # Telegram bot (logging + sending)
│ ├── mcp_server.py # FastMCP tool definitions
│ └── models.py # Data models
├── data/ # libsql database (auto-created)
├── media/ # Downloaded attachments (auto-created)
├── credentials # Bot token + chat ID (not in git)
├── credentials.example # Template
├── requirements.txt
└── mcp-bridge.service # systemd unit file
```
## Troubleshooting
**Bot doesn't see group messages:**
- Ensure bot is a group admin
- Check BotFather: `/mybots` → Bot Settings → Group Privacy → Turn OFF
(bots have "privacy mode" ON by default — they only see commands unless it's disabled)
**Can't get group chat ID:**
- Make sure you sent a message AFTER adding the bot
- For supergroups, the ID starts with `-100`
**MCP server unreachable from claude.ai:**
- Verify NetBird is connected: `netbird status`
- Test locally: `curl http://mgmt.mg:8321/mcp`
- Check firewall: port 8321 must be open

13
credentials.example Normal file
View file

@ -0,0 +1,13 @@
# MCP Bridge Bot credentials
# Copy to 'credentials' and fill in values
# Bot token from BotFather (create a NEW bot for the MCP bridge)
MCP_BOT_TOKEN=
# Telegram group chat ID (negative number for groups)
# Send a message in the group, then check: https://api.telegram.org/bot<TOKEN>/getUpdates
GROUP_CHAT_ID=
# (Optional) Bot ID of the existing homelab bot, for sender classification
# Find it: https://api.telegram.org/bot<HOMELAB_TOKEN>/getMe
HOMELAB_BOT_ID=8521598773

View file

@ -0,0 +1,237 @@
# Nexus MCP Bridge — Design Spec
**Date:** 2026-03-30
**Status:** Approved (user greenlit Approach 1, all design questions resolved)
---
## Summary
A single-process Python application that:
1. Runs a **Telegram bot** in a group chat, logging all messages/files to **libsql**
2. Exposes an **MCP server** (FastMCP over HTTP) on the NetBird mesh for claude.ai to query
This lets claude.ai dispatch tasks to the homelab agent and pull conversation updates — without Mikkel being the copy-paste middleman.
---
## Design Decisions
| Question | Decision | Rationale |
|----------|----------|-----------|
| Telegram conflict | New bot + group chat | Each bot has own token, no polling conflict |
| What to capture | Everything (all participants + files) | Full replay capability |
| File storage | Download to disk + store file_id | Durability + convenience |
| Auth | None (NetBird mesh is trust boundary) | Only accessible from enrolled peers |
| Database | libsql embedded (local file) | Single process, no extra service |
| Scope | Single group chat (homelab) for v1 | Nail core loop, extend later |
| Bot personality | Attributed relay `[claude.ai] ...` | Clear attribution in group |
| Architecture | Single process (FastMCP + telegram bot) | Simple, systemd restart covers failures |
---
## Architecture
```
┌──────────────┐ HTTP (NetBird) ┌─────────────────────────────┐
│ claude.ai │ ◄──────────────────── │ MCP Bridge Process │
│ MCP client │ ────────────────────► │ │
└──────────────┘ │ ┌─────────┐ ┌──────────┐ │
│ │ FastMCP │ │ Telegram │ │
│ │ HTTP │ │ Bot │ │
│ │ Server │ │ (polling)│ │
│ └────┬─────┘ └────┬─────┘ │
│ │ │ │
│ └──────┬───────┘ │
│ │ │
│ ┌─────▼─────┐ │
│ │ libsql │ │
│ │ (embed) │ │
│ └───────────┘ │
│ │
│ media/ (downloaded files) │
└─────────────────────────────┘
Telegram Group Chat
┌─────▼─────┐
│ Mikkel │
│ Homelab ♦ │ (existing bot)
│ MCP ♦ │ (new bot)
└───────────┘
```
---
## Database Schema (libsql)
```sql
CREATE TABLE messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
telegram_message_id INTEGER NOT NULL,
chat_id INTEGER NOT NULL,
sender_type TEXT NOT NULL, -- 'user', 'homelab_bot', 'mcp_bot', 'unknown'
sender_id INTEGER,
sender_name TEXT,
content TEXT, -- message text (nullable for media-only)
reply_to_message_id INTEGER, -- telegram reply reference
has_attachment INTEGER DEFAULT 0,
created_at TEXT NOT NULL, -- ISO 8601
UNIQUE(chat_id, telegram_message_id)
);
CREATE TABLE attachments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_id INTEGER NOT NULL REFERENCES messages(id),
file_type TEXT NOT NULL, -- 'photo', 'document', 'video', 'voice', 'sticker'
file_id TEXT NOT NULL, -- telegram file_id
file_unique_id TEXT NOT NULL, -- telegram file_unique_id
file_name TEXT, -- original filename
mime_type TEXT,
file_size INTEGER,
local_path TEXT, -- path under media/
caption TEXT,
created_at TEXT NOT NULL
);
CREATE TABLE outbound_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL,
content TEXT NOT NULL,
attribution TEXT DEFAULT 'claude.ai', -- prefix for the message
status TEXT DEFAULT 'pending', -- 'pending', 'sent', 'failed'
created_at TEXT NOT NULL,
sent_at TEXT
);
-- Index for delta queries
CREATE INDEX idx_messages_created_at ON messages(created_at);
CREATE INDEX idx_messages_chat_id ON messages(chat_id);
CREATE INDEX idx_outbound_status ON outbound_queue(status);
```
---
## MCP Tools
### send_message
```
Send a message to the homelab group chat, attributed as [claude.ai].
Input: { "message": "Fix the nexus.mg DNS record to 100.79.65.206" }
Output: { "sent": true, "id": 42 }
```
### pull_updates
```
Get conversation messages since a cursor (message ID or timestamp).
Returns messages from all participants with attachment metadata.
Input: { "since_id": 150 } or { "since": "2026-03-30T01:00:00Z" } or {}
Output: {
"messages": [
{
"id": 151,
"sender": "mikkel",
"sender_type": "user",
"content": "Can you check the DNS?",
"attachments": [],
"created_at": "2026-03-30T01:02:15Z"
},
{
"id": 152,
"sender": "homelab_bot",
"sender_type": "homelab_bot",
"content": "Checking Technitium... record found.",
"attachments": [{"file_type": "photo", "file_name": "dns-screenshot.png"}],
"created_at": "2026-03-30T01:02:45Z"
}
],
"cursor": 152
}
```
### queue_status
```
Current state summary.
Input: {}
Output: {
"total_messages": 152,
"last_message_at": "2026-03-30T01:02:45Z",
"last_sender": "homelab_bot",
"pending_outbound": 0
}
```
---
## Telegram Bot Behavior
- **Joins group chat** with Mikkel + existing homelab bot
- **Logs everything**: text, photos, documents, voice, stickers, replies
- **Downloads attachments** to `media/<YYYY-MM-DD>/<file_unique_id>_<filename>`
- **Sends outbound** messages prefixed with `[claude.ai]` in bold
- **No commands** — this bot is a silent logger + relay, not interactive
- **Ignores private messages** — only operates in the configured group
---
## File Structure
```
~/repos/telegram-bot-mcp/
├── docs/ # Specs and design docs
├── mcp_bridge/
│ ├── __init__.py
│ ├── __main__.py # Entry point: runs both bot + MCP server
│ ├── config.py # Configuration (env vars, paths)
│ ├── db.py # libsql database layer
│ ├── telegram_bot.py # Telegram bot (polling, logging, sending)
│ ├── mcp_server.py # FastMCP tool definitions
│ └── models.py # Shared data models
├── media/ # Downloaded attachments
├── data/ # libsql database file
├── credentials # BOT_TOKEN, GROUP_CHAT_ID (created during setup)
├── requirements.txt
├── heartbeat.sh
└── README.md
```
---
## Configuration
Environment/file based:
- `credentials` file: `MCP_BOT_TOKEN=...`, `GROUP_CHAT_ID=...`, `HOMELAB_BOT_ID=...`
- MCP server binds to `0.0.0.0:8321` (accessible via NetBird at `mgmt.mg:8321`)
- Database at `data/bridge.db`
- Media at `media/`
---
## Deployment
- **systemd user service** (`mcp-bridge.service`)
- Uses project-local venv at `.venv/`
- `Restart=always`, `RestartSec=5`
- Bind to `0.0.0.0:8321` (NetBird interface)
---
## Setup Steps (for user)
1. Create new Telegram bot via BotFather → get token
2. Create Telegram group → add Mikkel + homelab bot + MCP bot
3. Get group chat ID (bot will log it on first message)
4. Fill in `credentials` file
5. `systemctl --user enable --now mcp-bridge`
6. Add MCP URL `http://mgmt.mg:8321/mcp` in claude.ai settings
---
## Future Extensions
- Multi-group support (per-project chats with `target` parameter)
- Session tracking (detect agent restarts)
- File content search across attachments
- Message threading/reply chain reconstruction

19
mcp-bridge.service Normal file
View file

@ -0,0 +1,19 @@
[Unit]
Description=Nexus MCP Bridge (Telegram + MCP Server)
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
WorkingDirectory=/home/mikkel/repos/telegram-bot-mcp
ExecStart=/home/mikkel/repos/telegram-bot-mcp/.venv/bin/python -m mcp_bridge
Restart=always
RestartSec=5
KillMode=mixed
KillSignal=SIGTERM
TimeoutStopSec=15
Environment=PATH=/home/mikkel/.local/bin:/home/mikkel/bin:/usr/local/bin:/usr/bin:/bin
[Install]
WantedBy=default.target

0
mcp_bridge/__init__.py Normal file
View file

84
mcp_bridge/__main__.py Normal file
View file

@ -0,0 +1,84 @@
"""Entry point: runs both Telegram bot and MCP server in one process."""
import asyncio
import logging
import signal
from .config import MCP_HOST, MCP_PORT, MEDIA_DIR, DB_PATH
from .db import Database
from .telegram_bot import BridgeBot
from .mcp_server import mcp, init as init_mcp
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("mcp_bridge")
async def run_telegram_bot(bot: BridgeBot):
"""Run the telegram bot polling loop."""
app = bot.build_application()
await app.initialize()
await app.start()
updater = app.updater
await updater.start_polling(drop_pending_updates=True)
logger.info("Telegram bot polling started")
try:
while True:
await asyncio.sleep(3600)
except asyncio.CancelledError:
logger.info("Telegram bot shutting down...")
await updater.stop()
await app.stop()
await app.shutdown()
async def run_mcp_server():
"""Run the FastMCP HTTP server using its built-in runner."""
logger.info(f"MCP server starting on {MCP_HOST}:{MCP_PORT}")
await mcp.run_http_async(
host=MCP_HOST,
port=MCP_PORT,
log_level="info",
show_banner=False,
)
async def main():
"""Start both services."""
MEDIA_DIR.mkdir(parents=True, exist_ok=True)
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
db = Database()
logger.info(f"Database initialized at {DB_PATH}")
init_mcp(db)
bot = BridgeBot(db)
telegram_task = asyncio.create_task(run_telegram_bot(bot))
mcp_task = asyncio.create_task(run_mcp_server())
loop = asyncio.get_event_loop()
def handle_signal():
logger.info("Received shutdown signal")
telegram_task.cancel()
mcp_task.cancel()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, handle_signal)
try:
await asyncio.gather(telegram_task, mcp_task)
except asyncio.CancelledError:
pass
logger.info("MCP Bridge stopped")
if __name__ == "__main__":
asyncio.run(main())

49
mcp_bridge/config.py Normal file
View file

@ -0,0 +1,49 @@
"""Configuration loader for MCP Bridge."""
import os
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent.parent
CREDENTIALS_FILE = BASE_DIR / "credentials"
DB_PATH = BASE_DIR / "data" / "bridge.db"
MEDIA_DIR = BASE_DIR / "media"
# MCP server settings
MCP_HOST = "0.0.0.0"
MCP_PORT = 8321
def load_credentials() -> dict[str, str]:
"""Load credentials from KEY=VALUE file."""
config = {}
if CREDENTIALS_FILE.exists():
with open(CREDENTIALS_FILE) as f:
for line in f:
line = line.strip()
if "=" in line and not line.startswith("#"):
key, value = line.split("=", 1)
config[key.strip()] = value.strip()
return config
def get_bot_token() -> str:
creds = load_credentials()
token = creds.get("MCP_BOT_TOKEN", "")
if not token:
raise RuntimeError("MCP_BOT_TOKEN not set in credentials file")
return token
def get_group_chat_id() -> int:
creds = load_credentials()
chat_id = creds.get("GROUP_CHAT_ID", "")
if not chat_id:
raise RuntimeError("GROUP_CHAT_ID not set in credentials file")
return int(chat_id)
def get_homelab_bot_id() -> int | None:
"""Optional: ID of the existing homelab bot for sender_type classification."""
creds = load_credentials()
bot_id = creds.get("HOMELAB_BOT_ID", "")
return int(bot_id) if bot_id else None

241
mcp_bridge/db.py Normal file
View file

@ -0,0 +1,241 @@
"""Database layer using libsql (embedded)."""
import libsql_experimental as libsql
from datetime import datetime, timezone
from pathlib import Path
from .config import DB_PATH
SCHEMA = """
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
telegram_message_id INTEGER NOT NULL,
chat_id INTEGER NOT NULL,
sender_type TEXT NOT NULL,
sender_id INTEGER,
sender_name TEXT,
content TEXT,
reply_to_message_id INTEGER,
has_attachment INTEGER DEFAULT 0,
created_at TEXT NOT NULL,
UNIQUE(chat_id, telegram_message_id)
);
CREATE TABLE IF NOT EXISTS attachments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_id INTEGER NOT NULL REFERENCES messages(id),
file_type TEXT NOT NULL,
file_id TEXT NOT NULL,
file_unique_id TEXT NOT NULL,
file_name TEXT,
mime_type TEXT,
file_size INTEGER,
local_path TEXT,
caption TEXT,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS outbound_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL,
content TEXT NOT NULL,
attribution TEXT DEFAULT 'claude.ai',
status TEXT DEFAULT 'pending',
created_at TEXT NOT NULL,
sent_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_messages_created_at ON messages(created_at);
CREATE INDEX IF NOT EXISTS idx_messages_chat_id ON messages(chat_id);
CREATE INDEX IF NOT EXISTS idx_outbound_status ON outbound_queue(status);
"""
class Database:
def __init__(self, db_path: Path | None = None):
self.db_path = db_path or DB_PATH
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self.conn = libsql.connect(str(self.db_path))
self._init_schema()
def _init_schema(self):
for statement in SCHEMA.split(";"):
statement = statement.strip()
if statement:
self.conn.execute(statement)
self.conn.commit()
def insert_message(
self,
telegram_message_id: int,
chat_id: int,
sender_type: str,
sender_id: int | None,
sender_name: str | None,
content: str | None,
reply_to_message_id: int | None,
has_attachment: bool,
created_at: str,
) -> int | None:
"""Insert a message. Returns row id, or None if duplicate."""
try:
cursor = self.conn.execute(
"""INSERT INTO messages
(telegram_message_id, chat_id, sender_type, sender_id,
sender_name, content, reply_to_message_id, has_attachment, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
telegram_message_id, chat_id, sender_type, sender_id,
sender_name, content, reply_to_message_id,
1 if has_attachment else 0, created_at,
),
)
self.conn.commit()
return cursor.lastrowid
except Exception as e:
if "UNIQUE" in str(e):
return None
raise
def insert_attachment(
self,
message_id: int,
file_type: str,
file_id: str,
file_unique_id: str,
file_name: str | None,
mime_type: str | None,
file_size: int | None,
local_path: str | None,
caption: str | None,
) -> int:
cursor = self.conn.execute(
"""INSERT INTO attachments
(message_id, file_type, file_id, file_unique_id, file_name,
mime_type, file_size, local_path, caption, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
message_id, file_type, file_id, file_unique_id, file_name,
mime_type, file_size, local_path, caption,
datetime.now(timezone.utc).isoformat(),
),
)
self.conn.commit()
return cursor.lastrowid
def queue_outbound(self, chat_id: int, content: str, attribution: str = "claude.ai") -> int:
cursor = self.conn.execute(
"""INSERT INTO outbound_queue (chat_id, content, attribution, status, created_at)
VALUES (?, ?, ?, 'pending', ?)""",
(chat_id, content, attribution, datetime.now(timezone.utc).isoformat()),
)
self.conn.commit()
return cursor.lastrowid
def get_pending_outbound(self) -> list[dict]:
rows = self.conn.execute(
"SELECT id, chat_id, content, attribution FROM outbound_queue WHERE status = 'pending' ORDER BY id"
).fetchall()
return [
{"id": r[0], "chat_id": r[1], "content": r[2], "attribution": r[3]}
for r in rows
]
def mark_outbound_sent(self, outbound_id: int):
self.conn.execute(
"UPDATE outbound_queue SET status = 'sent', sent_at = ? WHERE id = ?",
(datetime.now(timezone.utc).isoformat(), outbound_id),
)
self.conn.commit()
def mark_outbound_failed(self, outbound_id: int):
self.conn.execute(
"UPDATE outbound_queue SET status = 'failed' WHERE id = ?",
(outbound_id,),
)
self.conn.commit()
def get_messages_since_id(self, since_id: int = 0, limit: int = 100) -> list[dict]:
rows = self.conn.execute(
"""SELECT m.id, m.telegram_message_id, m.sender_type, m.sender_name,
m.content, m.has_attachment, m.created_at, m.reply_to_message_id
FROM messages m
WHERE m.id > ?
ORDER BY m.id ASC
LIMIT ?""",
(since_id, limit),
).fetchall()
return [
{
"id": r[0],
"telegram_message_id": r[1],
"sender_type": r[2],
"sender": r[3] or r[2],
"content": r[4],
"has_attachment": bool(r[5]),
"created_at": r[6],
"reply_to_message_id": r[7],
}
for r in rows
]
def get_messages_since_timestamp(self, since: str, limit: int = 100) -> list[dict]:
rows = self.conn.execute(
"""SELECT m.id, m.telegram_message_id, m.sender_type, m.sender_name,
m.content, m.has_attachment, m.created_at, m.reply_to_message_id
FROM messages m
WHERE m.created_at > ?
ORDER BY m.id ASC
LIMIT ?""",
(since, limit),
).fetchall()
return [
{
"id": r[0],
"telegram_message_id": r[1],
"sender_type": r[2],
"sender": r[3] or r[2],
"content": r[4],
"has_attachment": bool(r[5]),
"created_at": r[6],
"reply_to_message_id": r[7],
}
for r in rows
]
def get_attachments_for_message(self, message_id: int) -> list[dict]:
rows = self.conn.execute(
"""SELECT file_type, file_id, file_unique_id, file_name,
mime_type, file_size, local_path, caption
FROM attachments WHERE message_id = ?""",
(message_id,),
).fetchall()
return [
{
"file_type": r[0],
"file_id": r[1],
"file_unique_id": r[2],
"file_name": r[3],
"mime_type": r[4],
"file_size": r[5],
"local_path": r[6],
"caption": r[7],
}
for r in rows
]
def get_status(self) -> dict:
total = self.conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
last_row = self.conn.execute(
"SELECT created_at, sender_name, sender_type FROM messages ORDER BY id DESC LIMIT 1"
).fetchone()
pending = self.conn.execute(
"SELECT COUNT(*) FROM outbound_queue WHERE status = 'pending'"
).fetchone()[0]
return {
"total_messages": total,
"last_message_at": last_row[0] if last_row else None,
"last_sender": last_row[1] or last_row[2] if last_row else None,
"pending_outbound": pending,
}

90
mcp_bridge/mcp_server.py Normal file
View file

@ -0,0 +1,90 @@
"""FastMCP server exposing bridge tools to claude.ai."""
import json
from datetime import datetime, timezone
from fastmcp import FastMCP
from .db import Database
from .config import get_group_chat_id
# Will be initialized in __main__ with shared db instance
db: Database | None = None
mcp = FastMCP(
name="homelab-bridge",
instructions=(
"This MCP server bridges claude.ai to a homelab Telegram group chat. "
"Use pull_updates to read conversation history (supports cursor-based pagination). "
"Use send_message to post messages to the group (attributed as [claude.ai]). "
"Use queue_status for a quick summary."
),
)
def init(database: Database):
"""Set the shared database instance."""
global db
db = database
@mcp.tool()
def send_message(message: str) -> str:
"""Send a message to the homelab Telegram group chat.
The message will be posted with [claude.ai] attribution so participants
know the message came from claude.ai.
Args:
message: The text to send to the group chat.
"""
chat_id = get_group_chat_id()
outbound_id = db.queue_outbound(chat_id, message)
return json.dumps({"sent": True, "id": outbound_id})
@mcp.tool()
def pull_updates(since_id: int = 0, since: str | None = None, limit: int = 50) -> str:
"""Pull conversation messages from the Telegram group.
Returns messages from all participants (Mikkel, homelab bot, MCP bot).
Supports cursor-based pagination: use the returned 'cursor' value as
'since_id' in the next call to get only new messages.
Args:
since_id: Return messages with id > this value. Use cursor from previous response.
since: ISO 8601 timestamp. Alternative to since_id returns messages after this time.
limit: Maximum number of messages to return (default 50, max 200).
"""
limit = min(limit, 200)
if since:
messages = db.get_messages_since_timestamp(since, limit)
else:
messages = db.get_messages_since_id(since_id, limit)
# Enrich with attachment info
for msg in messages:
if msg["has_attachment"]:
msg["attachments"] = db.get_attachments_for_message(msg["id"])
else:
msg["attachments"] = []
del msg["has_attachment"]
cursor = messages[-1]["id"] if messages else since_id
return json.dumps({
"messages": messages,
"cursor": cursor,
"count": len(messages),
})
@mcp.tool()
def queue_status() -> str:
"""Get current status of the bridge.
Returns message counts, last activity, and pending outbound messages.
"""
status = db.get_status()
return json.dumps(status)

43
mcp_bridge/models.py Normal file
View file

@ -0,0 +1,43 @@
"""Shared data models."""
from dataclasses import dataclass, field
@dataclass
class MessageRecord:
id: int
telegram_message_id: int
chat_id: int
sender_type: str
sender_id: int | None
sender_name: str | None
content: str | None
reply_to_message_id: int | None
has_attachment: bool
created_at: str
@dataclass
class AttachmentRecord:
id: int
message_id: int
file_type: str
file_id: str
file_unique_id: str
file_name: str | None
mime_type: str | None
file_size: int | None
local_path: str | None
caption: str | None
created_at: str
@dataclass
class OutboundMessage:
id: int
chat_id: int
content: str
attribution: str
status: str
created_at: str
sent_at: str | None

297
mcp_bridge/telegram_bot.py Normal file
View file

@ -0,0 +1,297 @@
"""Telegram bot: logs group messages to libsql, sends outbound messages."""
import asyncio
import logging
from datetime import datetime, timezone
from pathlib import Path
from telegram import Bot, Update
from telegram.ext import (
Application,
MessageHandler,
filters,
ContextTypes,
)
from .config import get_bot_token, get_group_chat_id, get_homelab_bot_id, MEDIA_DIR
from .db import Database
logger = logging.getLogger(__name__)
class BridgeBot:
def __init__(self, db: Database):
self.db = db
self.token = get_bot_token()
self.group_chat_id = get_group_chat_id()
self.homelab_bot_id = get_homelab_bot_id()
self.bot: Bot | None = None
self.app: Application | None = None
self._outbound_task: asyncio.Task | None = None
self._my_bot_id: int | None = None
def _classify_sender(self, user_id: int | None) -> str:
"""Classify who sent a message."""
if user_id is None:
return "unknown"
if user_id == self._my_bot_id:
return "mcp_bot"
if self.homelab_bot_id and user_id == self.homelab_bot_id:
return "homelab_bot"
# Check if user is a bot (we'll handle this in the handler with full user info)
return "user"
async def _log_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Log any group message to the database."""
message = update.effective_message
if not message or message.chat_id != self.group_chat_id:
return
user = message.from_user
user_id = user.id if user else None
sender_name = None
if user:
if user.is_bot:
sender_name = user.first_name or user.username
else:
parts = [user.first_name or "", user.last_name or ""]
sender_name = " ".join(p for p in parts if p) or user.username
sender_type = self._classify_sender(user_id)
# Refine: if it's a bot we don't know, label it
if sender_type == "user" and user and user.is_bot:
sender_type = "bot"
reply_to_id = None
if message.reply_to_message:
reply_to_id = message.reply_to_message.message_id
# Determine if there are attachments
has_attachment = bool(
message.photo or message.document or message.video
or message.voice or message.audio or message.sticker
or message.video_note or message.animation
)
# Get text content
content = message.text or message.caption or None
# Parse message date
msg_date = message.date
if msg_date:
created_at = msg_date.astimezone(timezone.utc).isoformat()
else:
created_at = datetime.now(timezone.utc).isoformat()
# Insert message
msg_id = self.db.insert_message(
telegram_message_id=message.message_id,
chat_id=message.chat_id,
sender_type=sender_type,
sender_id=user_id,
sender_name=sender_name,
content=content,
reply_to_message_id=reply_to_id,
has_attachment=has_attachment,
created_at=created_at,
)
if msg_id is None:
logger.debug(f"Duplicate message {message.message_id}, skipping")
return
logger.info(
f"Logged message {message.message_id} from {sender_name} ({sender_type})"
)
# Process attachments
if has_attachment:
await self._process_attachments(message, msg_id)
async def _process_attachments(self, message, db_message_id: int):
"""Download and log attachments."""
attachments = []
if message.photo:
# Get highest resolution photo
photo = message.photo[-1]
attachments.append(("photo", photo.file_id, photo.file_unique_id,
None, "image/jpeg", photo.file_size, message.caption))
if message.document:
doc = message.document
attachments.append(("document", doc.file_id, doc.file_unique_id,
doc.file_name, doc.mime_type, doc.file_size, message.caption))
if message.video:
vid = message.video
attachments.append(("video", vid.file_id, vid.file_unique_id,
vid.file_name, vid.mime_type, vid.file_size, message.caption))
if message.voice:
voice = message.voice
attachments.append(("voice", voice.file_id, voice.file_unique_id,
None, voice.mime_type, voice.file_size, None))
if message.audio:
audio = message.audio
attachments.append(("audio", audio.file_id, audio.file_unique_id,
audio.file_name, audio.mime_type, audio.file_size, None))
if message.sticker:
sticker = message.sticker
attachments.append(("sticker", sticker.file_id, sticker.file_unique_id,
None, None, sticker.file_size, None))
if message.animation:
anim = message.animation
attachments.append(("animation", anim.file_id, anim.file_unique_id,
anim.file_name, anim.mime_type, anim.file_size, message.caption))
for file_type, file_id, file_unique_id, file_name, mime_type, file_size, caption in attachments:
# Download file
local_path = await self._download_file(file_id, file_unique_id, file_name, file_type)
self.db.insert_attachment(
message_id=db_message_id,
file_type=file_type,
file_id=file_id,
file_unique_id=file_unique_id,
file_name=file_name,
mime_type=mime_type,
file_size=file_size,
local_path=str(local_path) if local_path else None,
caption=caption,
)
logger.info(f"Saved attachment: {file_type} -> {local_path}")
async def _download_file(
self, file_id: str, file_unique_id: str, file_name: str | None, file_type: str
) -> Path | None:
"""Download a file from Telegram to local media directory."""
try:
tg_file = await self.bot.get_file(file_id)
# Organize by date
date_dir = MEDIA_DIR / datetime.now(timezone.utc).strftime("%Y-%m-%d")
date_dir.mkdir(parents=True, exist_ok=True)
# Build filename
if file_name:
local_name = f"{file_unique_id}_{file_name}"
else:
ext_map = {
"photo": ".jpg", "voice": ".ogg", "sticker": ".webp",
"video": ".mp4", "animation": ".mp4", "audio": ".mp3",
}
ext = ext_map.get(file_type, "")
local_name = f"{file_unique_id}{ext}"
local_path = date_dir / local_name
await tg_file.download_to_drive(str(local_path))
return local_path
except Exception as e:
logger.error(f"Failed to download file {file_id}: {e}")
return None
async def send_to_group(self, text: str, attribution: str = "claude.ai"):
"""Send an attributed message to the group chat."""
formatted = f"*\\[{attribution}\\]* {self._escape_markdown(text)}"
try:
await self.bot.send_message(
chat_id=self.group_chat_id,
text=formatted,
parse_mode="MarkdownV2",
)
except Exception:
# Fallback to plain text if markdown fails
plain = f"[{attribution}] {text}"
await self.bot.send_message(
chat_id=self.group_chat_id,
text=plain,
)
@staticmethod
def _escape_markdown(text: str) -> str:
"""Escape MarkdownV2 special characters, preserving code blocks."""
special = r"_*[]()~`>#+-=|{}.!"
result = []
i = 0
in_code_block = False
in_inline_code = False
while i < len(text):
# Check for code block
if text[i:i+3] == "```":
in_code_block = not in_code_block
result.append("```")
i += 3
continue
# Check for inline code
if text[i] == "`" and not in_code_block:
in_inline_code = not in_inline_code
result.append("`")
i += 1
continue
if not in_code_block and not in_inline_code and text[i] in special:
result.append(f"\\{text[i]}")
else:
result.append(text[i])
i += 1
return "".join(result)
async def _outbound_loop(self):
"""Poll outbound queue and send messages."""
while True:
try:
pending = self.db.get_pending_outbound()
for msg in pending:
try:
await self.send_to_group(msg["content"], msg["attribution"])
self.db.mark_outbound_sent(msg["id"])
logger.info(f"Sent outbound message {msg['id']}")
except Exception as e:
logger.error(f"Failed to send outbound {msg['id']}: {e}")
self.db.mark_outbound_failed(msg["id"])
except Exception as e:
logger.error(f"Outbound loop error: {e}")
await asyncio.sleep(2)
async def _post_init(self, application: Application):
"""Called after bot initialization."""
self.bot = application.bot
me = await self.bot.get_me()
self._my_bot_id = me.id
logger.info(f"MCP Bridge bot started as @{me.username} (ID: {me.id})")
logger.info(f"Monitoring group chat: {self.group_chat_id}")
# Start outbound message loop
self._outbound_task = asyncio.create_task(self._outbound_loop())
async def _post_shutdown(self, application: Application):
"""Cleanup on shutdown."""
if self._outbound_task:
self._outbound_task.cancel()
try:
await self._outbound_task
except asyncio.CancelledError:
pass
def build_application(self) -> Application:
"""Build the telegram Application (but don't start it yet)."""
builder = Application.builder().token(self.token)
self.app = builder.build()
# Log ALL messages in the group (text, photos, documents, etc.)
self.app.add_handler(
MessageHandler(filters.ALL, self._log_message)
)
self.app.post_init = self._post_init
self.app.post_shutdown = self._post_shutdown
return self.app

4
requirements.txt Normal file
View file

@ -0,0 +1,4 @@
fastmcp>=2.0.0
python-telegram-bot>=21.0
libsql-experimental>=0.0.50
aiohttp>=3.9.0