telegram-bot-mcp/mcp_bridge/db.py
Mikkel Georgsen 1cb16e6e8f feat: MCP bridge - Telegram group logger + FastMCP HTTP server
Single-process Python app that:
- Runs a Telegram bot in a group chat, logging all messages/files to libsql
- Exposes send_message, pull_updates, queue_status MCP tools over HTTP
- Downloads and stores file attachments with Telegram file_id + local path
- Accessible via NetBird mesh at mgmt.mg:8321 (no auth needed)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 23:56:05 +00:00

241 lines
8.1 KiB
Python

"""Database layer using libsql (embedded)."""
import libsql_experimental as libsql
from datetime import datetime, timezone
from pathlib import Path
from .config import DB_PATH
# DDL for the three tables used by the bridge, plus their indexes.
#
# The statements are separated by ";" and every one uses IF NOT EXISTS, so the
# script can be executed statement-by-statement on every start-up.  Because the
# script is split on ";" before execution, no statement (and no SQL comment)
# may contain a literal ";".
#
# idx_attachments_message_id backs the "WHERE message_id = ?" lookup used when
# attachments are fetched for a message; without it that lookup scans the
# whole attachments table.
SCHEMA = """
CREATE TABLE IF NOT EXISTS messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    telegram_message_id INTEGER NOT NULL,
    chat_id INTEGER NOT NULL,
    sender_type TEXT NOT NULL,
    sender_id INTEGER,
    sender_name TEXT,
    content TEXT,
    reply_to_message_id INTEGER,
    has_attachment INTEGER DEFAULT 0,
    created_at TEXT NOT NULL,
    UNIQUE(chat_id, telegram_message_id)
);
CREATE TABLE IF NOT EXISTS attachments (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    message_id INTEGER NOT NULL REFERENCES messages(id),
    file_type TEXT NOT NULL,
    file_id TEXT NOT NULL,
    file_unique_id TEXT NOT NULL,
    file_name TEXT,
    mime_type TEXT,
    file_size INTEGER,
    local_path TEXT,
    caption TEXT,
    created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS outbound_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    chat_id INTEGER NOT NULL,
    content TEXT NOT NULL,
    attribution TEXT DEFAULT 'claude.ai',
    status TEXT DEFAULT 'pending',
    created_at TEXT NOT NULL,
    sent_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_messages_created_at ON messages(created_at);
CREATE INDEX IF NOT EXISTS idx_messages_chat_id ON messages(chat_id);
CREATE INDEX IF NOT EXISTS idx_attachments_message_id ON attachments(message_id);
CREATE INDEX IF NOT EXISTS idx_outbound_status ON outbound_queue(status);
"""
class Database:
def __init__(self, db_path: Path | None = None):
self.db_path = db_path or DB_PATH
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self.conn = libsql.connect(str(self.db_path))
self._init_schema()
def _init_schema(self):
for statement in SCHEMA.split(";"):
statement = statement.strip()
if statement:
self.conn.execute(statement)
self.conn.commit()
def insert_message(
self,
telegram_message_id: int,
chat_id: int,
sender_type: str,
sender_id: int | None,
sender_name: str | None,
content: str | None,
reply_to_message_id: int | None,
has_attachment: bool,
created_at: str,
) -> int | None:
"""Insert a message. Returns row id, or None if duplicate."""
try:
cursor = self.conn.execute(
"""INSERT INTO messages
(telegram_message_id, chat_id, sender_type, sender_id,
sender_name, content, reply_to_message_id, has_attachment, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
telegram_message_id, chat_id, sender_type, sender_id,
sender_name, content, reply_to_message_id,
1 if has_attachment else 0, created_at,
),
)
self.conn.commit()
return cursor.lastrowid
except Exception as e:
if "UNIQUE" in str(e):
return None
raise
def insert_attachment(
self,
message_id: int,
file_type: str,
file_id: str,
file_unique_id: str,
file_name: str | None,
mime_type: str | None,
file_size: int | None,
local_path: str | None,
caption: str | None,
) -> int:
cursor = self.conn.execute(
"""INSERT INTO attachments
(message_id, file_type, file_id, file_unique_id, file_name,
mime_type, file_size, local_path, caption, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
message_id, file_type, file_id, file_unique_id, file_name,
mime_type, file_size, local_path, caption,
datetime.now(timezone.utc).isoformat(),
),
)
self.conn.commit()
return cursor.lastrowid
def queue_outbound(self, chat_id: int, content: str, attribution: str = "claude.ai") -> int:
cursor = self.conn.execute(
"""INSERT INTO outbound_queue (chat_id, content, attribution, status, created_at)
VALUES (?, ?, ?, 'pending', ?)""",
(chat_id, content, attribution, datetime.now(timezone.utc).isoformat()),
)
self.conn.commit()
return cursor.lastrowid
def get_pending_outbound(self) -> list[dict]:
rows = self.conn.execute(
"SELECT id, chat_id, content, attribution FROM outbound_queue WHERE status = 'pending' ORDER BY id"
).fetchall()
return [
{"id": r[0], "chat_id": r[1], "content": r[2], "attribution": r[3]}
for r in rows
]
def mark_outbound_sent(self, outbound_id: int):
self.conn.execute(
"UPDATE outbound_queue SET status = 'sent', sent_at = ? WHERE id = ?",
(datetime.now(timezone.utc).isoformat(), outbound_id),
)
self.conn.commit()
def mark_outbound_failed(self, outbound_id: int):
self.conn.execute(
"UPDATE outbound_queue SET status = 'failed' WHERE id = ?",
(outbound_id,),
)
self.conn.commit()
def get_messages_since_id(self, since_id: int = 0, limit: int = 100) -> list[dict]:
rows = self.conn.execute(
"""SELECT m.id, m.telegram_message_id, m.sender_type, m.sender_name,
m.content, m.has_attachment, m.created_at, m.reply_to_message_id
FROM messages m
WHERE m.id > ?
ORDER BY m.id ASC
LIMIT ?""",
(since_id, limit),
).fetchall()
return [
{
"id": r[0],
"telegram_message_id": r[1],
"sender_type": r[2],
"sender": r[3] or r[2],
"content": r[4],
"has_attachment": bool(r[5]),
"created_at": r[6],
"reply_to_message_id": r[7],
}
for r in rows
]
def get_messages_since_timestamp(self, since: str, limit: int = 100) -> list[dict]:
rows = self.conn.execute(
"""SELECT m.id, m.telegram_message_id, m.sender_type, m.sender_name,
m.content, m.has_attachment, m.created_at, m.reply_to_message_id
FROM messages m
WHERE m.created_at > ?
ORDER BY m.id ASC
LIMIT ?""",
(since, limit),
).fetchall()
return [
{
"id": r[0],
"telegram_message_id": r[1],
"sender_type": r[2],
"sender": r[3] or r[2],
"content": r[4],
"has_attachment": bool(r[5]),
"created_at": r[6],
"reply_to_message_id": r[7],
}
for r in rows
]
def get_attachments_for_message(self, message_id: int) -> list[dict]:
rows = self.conn.execute(
"""SELECT file_type, file_id, file_unique_id, file_name,
mime_type, file_size, local_path, caption
FROM attachments WHERE message_id = ?""",
(message_id,),
).fetchall()
return [
{
"file_type": r[0],
"file_id": r[1],
"file_unique_id": r[2],
"file_name": r[3],
"mime_type": r[4],
"file_size": r[5],
"local_path": r[6],
"caption": r[7],
}
for r in rows
]
def get_status(self) -> dict:
total = self.conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
last_row = self.conn.execute(
"SELECT created_at, sender_name, sender_type FROM messages ORDER BY id DESC LIMIT 1"
).fetchone()
pending = self.conn.execute(
"SELECT COUNT(*) FROM outbound_queue WHERE status = 'pending'"
).fetchone()[0]
return {
"total_messages": total,
"last_message_at": last_row[0] if last_row else None,
"last_sender": last_row[1] or last_row[2] if last_row else None,
"pending_outbound": pending,
}