"""Database layer using libsql (embedded).""" import libsql_experimental as libsql from datetime import datetime, timezone from pathlib import Path from .config import DB_PATH SCHEMA = """ CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, telegram_message_id INTEGER NOT NULL, chat_id INTEGER NOT NULL, sender_type TEXT NOT NULL, sender_id INTEGER, sender_name TEXT, content TEXT, reply_to_message_id INTEGER, has_attachment INTEGER DEFAULT 0, created_at TEXT NOT NULL, UNIQUE(chat_id, telegram_message_id) ); CREATE TABLE IF NOT EXISTS attachments ( id INTEGER PRIMARY KEY AUTOINCREMENT, message_id INTEGER NOT NULL REFERENCES messages(id), file_type TEXT NOT NULL, file_id TEXT NOT NULL, file_unique_id TEXT NOT NULL, file_name TEXT, mime_type TEXT, file_size INTEGER, local_path TEXT, caption TEXT, created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS outbound_queue ( id INTEGER PRIMARY KEY AUTOINCREMENT, chat_id INTEGER NOT NULL, content TEXT NOT NULL, attribution TEXT DEFAULT 'claude.ai', status TEXT DEFAULT 'pending', created_at TEXT NOT NULL, sent_at TEXT ); CREATE INDEX IF NOT EXISTS idx_messages_created_at ON messages(created_at); CREATE INDEX IF NOT EXISTS idx_messages_chat_id ON messages(chat_id); CREATE INDEX IF NOT EXISTS idx_outbound_status ON outbound_queue(status); """ class Database: def __init__(self, db_path: Path | None = None): self.db_path = db_path or DB_PATH self.db_path.parent.mkdir(parents=True, exist_ok=True) self.conn = libsql.connect(str(self.db_path)) self._init_schema() def _init_schema(self): for statement in SCHEMA.split(";"): statement = statement.strip() if statement: self.conn.execute(statement) self.conn.commit() def insert_message( self, telegram_message_id: int, chat_id: int, sender_type: str, sender_id: int | None, sender_name: str | None, content: str | None, reply_to_message_id: int | None, has_attachment: bool, created_at: str, ) -> int | None: """Insert a message. Returns row id, or None if duplicate.""" try: cursor = self.conn.execute( """INSERT INTO messages (telegram_message_id, chat_id, sender_type, sender_id, sender_name, content, reply_to_message_id, has_attachment, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( telegram_message_id, chat_id, sender_type, sender_id, sender_name, content, reply_to_message_id, 1 if has_attachment else 0, created_at, ), ) self.conn.commit() return cursor.lastrowid except Exception as e: if "UNIQUE" in str(e): return None raise def insert_attachment( self, message_id: int, file_type: str, file_id: str, file_unique_id: str, file_name: str | None, mime_type: str | None, file_size: int | None, local_path: str | None, caption: str | None, ) -> int: cursor = self.conn.execute( """INSERT INTO attachments (message_id, file_type, file_id, file_unique_id, file_name, mime_type, file_size, local_path, caption, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( message_id, file_type, file_id, file_unique_id, file_name, mime_type, file_size, local_path, caption, datetime.now(timezone.utc).isoformat(), ), ) self.conn.commit() return cursor.lastrowid def queue_outbound(self, chat_id: int, content: str, attribution: str = "claude.ai") -> int: cursor = self.conn.execute( """INSERT INTO outbound_queue (chat_id, content, attribution, status, created_at) VALUES (?, ?, ?, 'pending', ?)""", (chat_id, content, attribution, datetime.now(timezone.utc).isoformat()), ) self.conn.commit() return cursor.lastrowid def get_pending_outbound(self) -> list[dict]: rows = self.conn.execute( "SELECT id, chat_id, content, attribution FROM outbound_queue WHERE status = 'pending' ORDER BY id" ).fetchall() return [ {"id": r[0], "chat_id": r[1], "content": r[2], "attribution": r[3]} for r in rows ] def mark_outbound_sent(self, outbound_id: int): self.conn.execute( "UPDATE outbound_queue SET status = 'sent', sent_at = ? WHERE id = ?", (datetime.now(timezone.utc).isoformat(), outbound_id), ) self.conn.commit() def mark_outbound_failed(self, outbound_id: int): self.conn.execute( "UPDATE outbound_queue SET status = 'failed' WHERE id = ?", (outbound_id,), ) self.conn.commit() def get_messages_since_id(self, since_id: int = 0, limit: int = 100) -> list[dict]: rows = self.conn.execute( """SELECT m.id, m.telegram_message_id, m.sender_type, m.sender_name, m.content, m.has_attachment, m.created_at, m.reply_to_message_id FROM messages m WHERE m.id > ? ORDER BY m.id ASC LIMIT ?""", (since_id, limit), ).fetchall() return [ { "id": r[0], "telegram_message_id": r[1], "sender_type": r[2], "sender": r[3] or r[2], "content": r[4], "has_attachment": bool(r[5]), "created_at": r[6], "reply_to_message_id": r[7], } for r in rows ] def get_messages_since_timestamp(self, since: str, limit: int = 100) -> list[dict]: rows = self.conn.execute( """SELECT m.id, m.telegram_message_id, m.sender_type, m.sender_name, m.content, m.has_attachment, m.created_at, m.reply_to_message_id FROM messages m WHERE m.created_at > ? ORDER BY m.id ASC LIMIT ?""", (since, limit), ).fetchall() return [ { "id": r[0], "telegram_message_id": r[1], "sender_type": r[2], "sender": r[3] or r[2], "content": r[4], "has_attachment": bool(r[5]), "created_at": r[6], "reply_to_message_id": r[7], } for r in rows ] def get_attachments_for_message(self, message_id: int) -> list[dict]: rows = self.conn.execute( """SELECT file_type, file_id, file_unique_id, file_name, mime_type, file_size, local_path, caption FROM attachments WHERE message_id = ?""", (message_id,), ).fetchall() return [ { "file_type": r[0], "file_id": r[1], "file_unique_id": r[2], "file_name": r[3], "mime_type": r[4], "file_size": r[5], "local_path": r[6], "caption": r[7], } for r in rows ] def get_status(self) -> dict: total = self.conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0] last_row = self.conn.execute( "SELECT created_at, sender_name, sender_type FROM messages ORDER BY id DESC LIMIT 1" ).fetchone() pending = self.conn.execute( "SELECT COUNT(*) FROM outbound_queue WHERE status = 'pending'" ).fetchone()[0] return { "total_messages": total, "last_message_at": last_row[0] if last_row else None, "last_sender": last_row[1] or last_row[2] if last_row else None, "pending_outbound": pending, }