#!/usr/bin/env python3 """ Homelab Telegram Bot Two-way interactive bot for homelab management and notifications. """ import asyncio import logging import os import subprocess from datetime import datetime from pathlib import Path from telegram import Update from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters from session_manager import SessionManager from claude_subprocess import ClaudeSubprocess # Setup logging logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO ) logger = logging.getLogger(__name__) # Load credentials CREDENTIALS_FILE = Path(__file__).parent / 'credentials' config = {} with open(CREDENTIALS_FILE) as f: for line in f: if '=' in line: key, value = line.strip().split('=', 1) config[key] = value TOKEN = config['TELEGRAM_BOT_TOKEN'] # Authorized users file (stores chat IDs that are allowed to use the bot) AUTHORIZED_FILE = Path(__file__).parent / 'authorized_users' # Inbox file for messages to Claude INBOX_FILE = Path(__file__).parent / 'inbox' # Session manager and subprocess tracking session_manager = SessionManager() subprocesses: dict[str, ClaudeSubprocess] = {} # Persistent subprocess per session def get_authorized_users(): """Load authorized user IDs.""" if AUTHORIZED_FILE.exists(): with open(AUTHORIZED_FILE) as f: return set(int(line.strip()) for line in f if line.strip()) return set() def add_authorized_user(user_id: int): """Add a user to authorized list.""" users = get_authorized_users() users.add(user_id) with open(AUTHORIZED_FILE, 'w') as f: for uid in users: f.write(f"{uid}\n") def is_authorized(user_id: int) -> bool: """Check if user is authorized.""" return user_id in get_authorized_users() def make_callbacks(bot, chat_id): """Create callbacks for ClaudeSubprocess bound to specific chat.""" async def on_output(text): # Truncate to Telegram limit if len(text) > 4000: text = text[:4000] + "\n... (truncated)" await bot.send_message(chat_id=chat_id, text=text) async def on_error(error): await bot.send_message(chat_id=chat_id, text=f"Error: {error}") async def on_complete(): pass # Phase 2 will add typing indicator cleanup async def on_status(status): await bot.send_message(chat_id=chat_id, text=f"[{status}]") return on_output, on_error, on_complete, on_status def run_command(cmd: list, timeout: int = 30) -> str: """Run a shell command and return output.""" try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, env={**os.environ, 'PATH': f"/home/mikkel/bin:{os.environ.get('PATH', '')}"} ) output = result.stdout or result.stderr or "No output" # Telegram has 4096 char limit per message if len(output) > 4000: output = output[:4000] + "\n... (truncated)" return output except subprocess.TimeoutExpired: return "Command timed out" except Exception as e: return f"Error: {e}" # Command handlers async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle /start command - first contact with bot.""" user = update.effective_user chat_id = update.effective_chat.id if not is_authorized(user.id): # First user becomes authorized automatically if not get_authorized_users(): add_authorized_user(user.id) await update.message.reply_text( f"Welcome {user.first_name}! You're now authorized as the admin.\n" f"Your chat ID: {chat_id}\n\n" f"Use /help to see available commands." ) else: await update.message.reply_text( f"Unauthorized. Your user ID: {user.id}\n" "Contact the admin to get access." ) return await update.message.reply_text( f"Welcome back {user.first_name}!\n" f"Use /help to see available commands." ) async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle /help command.""" if not is_authorized(update.effective_user.id): return help_text = """ *Homelab Bot Commands* *Claude Sessions:* /new [persona] - Create new Claude session /session - Switch to a session /archive - Archive and remove a session *Status & Monitoring:* /status - Quick service overview /pbs - PBS backup status /backups - Last backup per VM/CT /beszel - Server metrics /kuma - Uptime Kuma status *PBS Commands:* /pbs\\_status - Full PBS overview /pbs\\_errors - Recent errors /pbs\\_gc - Garbage collection status *System:* /ping - Ping a host /help - This help message /chatid - Show your chat ID """ await update.message.reply_text(help_text, parse_mode='Markdown') async def chatid(update: Update, context: ContextTypes.DEFAULT_TYPE): """Show chat ID (useful for notifications).""" await update.message.reply_text(f"Chat ID: `{update.effective_chat.id}`", parse_mode='Markdown') async def status(update: Update, context: ContextTypes.DEFAULT_TYPE): """Quick status overview.""" if not is_authorized(update.effective_user.id): return await update.message.reply_text("Checking services...") # Quick checks output_lines = ["*Homelab Status*\n"] # Check key services via ping services = [ ("NPM", "10.5.0.1"), ("DNS", "10.5.0.2"), ("PBS", "10.5.0.6"), ("Dockge", "10.5.0.10"), ("Forgejo", "10.5.0.14"), ] for name, ip in services: result = subprocess.run( ["ping", "-c", "1", "-W", "1", ip], capture_output=True ) status = "up" if result.returncode == 0 else "down" output_lines.append(f"{'✅' if status == 'up' else '❌'} {name} ({ip})") await update.message.reply_text("\n".join(output_lines), parse_mode='Markdown') async def pbs(update: Update, context: ContextTypes.DEFAULT_TYPE): """PBS quick status.""" if not is_authorized(update.effective_user.id): return await update.message.reply_text("Fetching PBS status...") output = run_command(["/home/mikkel/bin/pbs", "status"]) await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown') async def pbs_status(update: Update, context: ContextTypes.DEFAULT_TYPE): """PBS full status.""" if not is_authorized(update.effective_user.id): return await update.message.reply_text("Fetching PBS status...") output = run_command(["/home/mikkel/bin/pbs", "status"]) await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown') async def backups(update: Update, context: ContextTypes.DEFAULT_TYPE): """PBS backups per VM/CT.""" if not is_authorized(update.effective_user.id): return await update.message.reply_text("Fetching backup status...") output = run_command(["/home/mikkel/bin/pbs", "backups"], timeout=60) await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown') async def pbs_errors(update: Update, context: ContextTypes.DEFAULT_TYPE): """PBS errors.""" if not is_authorized(update.effective_user.id): return output = run_command(["/home/mikkel/bin/pbs", "errors"]) await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown') async def pbs_gc(update: Update, context: ContextTypes.DEFAULT_TYPE): """PBS garbage collection status.""" if not is_authorized(update.effective_user.id): return output = run_command(["/home/mikkel/bin/pbs", "gc"]) await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown') async def beszel(update: Update, context: ContextTypes.DEFAULT_TYPE): """Beszel server metrics.""" if not is_authorized(update.effective_user.id): return await update.message.reply_text("Fetching server metrics...") output = run_command(["/home/mikkel/bin/beszel", "status"]) await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown') async def kuma(update: Update, context: ContextTypes.DEFAULT_TYPE): """Uptime Kuma status.""" if not is_authorized(update.effective_user.id): return output = run_command(["/home/mikkel/bin/kuma", "list"]) await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown') async def ping_host(update: Update, context: ContextTypes.DEFAULT_TYPE): """Ping a host.""" if not is_authorized(update.effective_user.id): return if not context.args: await update.message.reply_text("Usage: /ping ") return host = context.args[0] # Basic validation if not host.replace('.', '').replace('-', '').isalnum(): await update.message.reply_text("Invalid hostname") return result = subprocess.run( ["ping", "-c", "3", "-W", "2", host], capture_output=True, text=True, timeout=10 ) output = result.stdout or result.stderr await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown') async def new_session(update: Update, context: ContextTypes.DEFAULT_TYPE): """Create a new Claude session.""" if not is_authorized(update.effective_user.id): return if not context.args: await update.message.reply_text("Usage: /new [persona]") return name = context.args[0] persona = context.args[1] if len(context.args) > 1 else None try: # Create session session_manager.create_session(name, persona=persona) # Auto-switch to new session session_manager.switch_session(name) # Prepare reply if persona: reply = f"Session '{name}' created with persona '{persona}'." else: reply = f"Session '{name}' created." await update.message.reply_text(reply) # Spawn subprocess for the new session (but don't send message yet) session_dir = session_manager.get_session_dir(name) persona_config = session_manager.get_session(name) persona_data = session_manager.load_persona(persona or 'default') # Create callbacks bound to this chat callbacks = make_callbacks(context.bot, update.effective_chat.id) # Create subprocess subprocesses[name] = ClaudeSubprocess( session_dir=session_dir, persona=persona_data, on_output=callbacks[0], on_error=callbacks[1], on_complete=callbacks[2], on_status=callbacks[3], ) logger.info(f"Created session '{name}' with persona '{persona or 'default'}'") except ValueError as e: # Session already exists await update.message.reply_text( f"Session '{name}' already exists. Use /session to switch to it." ) except FileNotFoundError as e: # Persona not found await update.message.reply_text(f"Error: {e}") except Exception as e: logger.error(f"Error creating session: {e}") await update.message.reply_text(f"Error creating session: {e}") async def switch_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE): """Switch to a different Claude session.""" if not is_authorized(update.effective_user.id): return if not context.args: # Show usage and list available sessions sessions = session_manager.list_sessions() if not sessions: await update.message.reply_text("No sessions found. Use /new to create one.") return session_list = "\n".join( f"- {s['name']} ({s['status']}) - {s.get('persona', 'default')}" for s in sessions ) await update.message.reply_text( f"Usage: /session \n\nAvailable sessions:\n{session_list}" ) return name = context.args[0] try: # Check if session exists if not session_manager.session_exists(name): await update.message.reply_text( f"Session '{name}' doesn't exist. Use /new to create it." ) return # Switch session session_manager.switch_session(name) # Auto-spawn subprocess if not alive if name not in subprocesses or not subprocesses[name].is_alive: session_dir = session_manager.get_session_dir(name) session_data = session_manager.get_session(name) persona_name = session_data.get('persona', 'default') persona_data = session_manager.load_persona(persona_name) # Create callbacks bound to this chat callbacks = make_callbacks(context.bot, update.effective_chat.id) # Create subprocess subprocesses[name] = ClaudeSubprocess( session_dir=session_dir, persona=persona_data, on_output=callbacks[0], on_error=callbacks[1], on_complete=callbacks[2], on_status=callbacks[3], ) logger.info(f"Auto-spawned subprocess for session '{name}'") # Get persona for reply session_data = session_manager.get_session(name) persona_name = session_data.get('persona', 'default') if persona_name and persona_name != 'default': reply = f"Switched to session '{name}' (persona: {persona_name})." else: reply = f"Switched to session '{name}'." await update.message.reply_text(reply) except Exception as e: logger.error(f"Error switching session: {e}") await update.message.reply_text(f"Error switching session: {e}") async def archive_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE): """Archive a Claude session (compress and remove).""" if not is_authorized(update.effective_user.id): return if not context.args: await update.message.reply_text("Usage: /archive ") return name = context.args[0] try: # Terminate subprocess if running if name in subprocesses: if subprocesses[name].is_alive: await subprocesses[name].terminate() del subprocesses[name] # Archive the session archive_path = session_manager.archive_session(name) size_mb = archive_path.stat().st_size / (1024 * 1024) await update.message.reply_text( f"Session '{name}' archived ({size_mb:.1f} MB)\n{archive_path.name}" ) logger.info(f"Archived session '{name}' to {archive_path}") except ValueError as e: await update.message.reply_text(str(e)) except Exception as e: logger.error(f"Error archiving session: {e}") await update.message.reply_text(f"Error archiving session: {e}") async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle free text messages - route to active Claude session.""" if not is_authorized(update.effective_user.id): return # Check for active session active_session = session_manager.get_active_session() if not active_session: await update.message.reply_text( "No active session. Use /new to start one." ) return message = update.message.text logger.info(f"Routing message to session '{active_session}': {message[:50]}...") try: # Get or create subprocess for active session if active_session not in subprocesses or not subprocesses[active_session].is_alive: session_dir = session_manager.get_session_dir(active_session) session_data = session_manager.get_session(active_session) persona_name = session_data.get('persona', 'default') persona_data = session_manager.load_persona(persona_name) # Create callbacks bound to this chat callbacks = make_callbacks(context.bot, update.effective_chat.id) # Create subprocess subprocesses[active_session] = ClaudeSubprocess( session_dir=session_dir, persona=persona_data, on_output=callbacks[0], on_error=callbacks[1], on_complete=callbacks[2], on_status=callbacks[3], ) logger.info(f"Auto-spawned subprocess for active session '{active_session}'") # Send message to Claude await subprocesses[active_session].send_message(message) except Exception as e: logger.error(f"Error handling message: {e}") await update.message.reply_text(f"Error: {e}") async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle photo messages - download and save for Claude.""" if not is_authorized(update.effective_user.id): return user = update.effective_user timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') file_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') # Create images directory images_dir = Path(__file__).parent / 'images' images_dir.mkdir(exist_ok=True) # Get the largest photo (best quality) photo = update.message.photo[-1] file = await context.bot.get_file(photo.file_id) # Download the image filename = f"{file_timestamp}.jpg" filepath = images_dir / filename await file.download_to_drive(filepath) # Get caption if any caption = update.message.caption or "" caption_text = f" - \"{caption}\"" if caption else "" # Log to inbox with open(INBOX_FILE, 'a') as f: f.write(f"[{timestamp}] {user.first_name}: [IMAGE: {filepath}]{caption_text}\n") logger.info(f"Photo saved from {user.first_name}: {filepath}") async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle document/file messages - download and save for Claude.""" if not is_authorized(update.effective_user.id): return user = update.effective_user timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') file_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') # Create files directory files_dir = Path(__file__).parent / 'files' files_dir.mkdir(exist_ok=True) # Get document info doc = update.message.document original_name = doc.file_name or "unknown" file = await context.bot.get_file(doc.file_id) # Download with timestamp prefix to avoid collisions filename = f"{file_timestamp}_{original_name}" filepath = files_dir / filename await file.download_to_drive(filepath) # Get caption if any caption = update.message.caption or "" caption_text = f" - \"{caption}\"" if caption else "" # Log to inbox with open(INBOX_FILE, 'a') as f: f.write(f"[{timestamp}] {user.first_name}: [FILE: {filepath}]{caption_text}\n") logger.info(f"Document saved from {user.first_name}: {filepath}") async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle unknown commands.""" if not is_authorized(update.effective_user.id): return await update.message.reply_text("Unknown command. Use /help for available commands.") def main(): """Start the bot.""" # Create application app = Application.builder().token(TOKEN).build() # Add handlers app.add_handler(CommandHandler("start", start)) app.add_handler(CommandHandler("help", help_command)) app.add_handler(CommandHandler("chatid", chatid)) app.add_handler(CommandHandler("new", new_session)) app.add_handler(CommandHandler("session", switch_session_cmd)) app.add_handler(CommandHandler("archive", archive_session_cmd)) app.add_handler(CommandHandler("status", status)) app.add_handler(CommandHandler("pbs", pbs)) app.add_handler(CommandHandler("pbs_status", pbs_status)) app.add_handler(CommandHandler("backups", backups)) app.add_handler(CommandHandler("pbs_errors", pbs_errors)) app.add_handler(CommandHandler("pbs_gc", pbs_gc)) app.add_handler(CommandHandler("beszel", beszel)) app.add_handler(CommandHandler("kuma", kuma)) app.add_handler(CommandHandler("ping", ping_host)) # Unknown command handler app.add_handler(MessageHandler(filters.COMMAND, unknown)) # Free text message handler (for messages to Claude) app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message, block=False)) # Photo handler app.add_handler(MessageHandler(filters.PHOTO, handle_photo)) # Document/file handler app.add_handler(MessageHandler(filters.Document.ALL, handle_document)) # Start polling logger.info("Starting Homelab Bot...") app.run_polling(allowed_updates=Update.ALL_TYPES) if __name__ == '__main__': main()