#!/usr/bin/env python3 """ Homelab Telegram Bot Two-way interactive bot for homelab management and notifications. """ import asyncio import logging import os import subprocess import time from datetime import datetime from pathlib import Path from telegram import Update from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters from session_manager import SessionManager from claude_subprocess import ClaudeSubprocess from telegram_utils import split_message_smart, escape_markdown_v2, typing_indicator_loop from message_batcher import MessageBatcher # Setup logging logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO ) logger = logging.getLogger(__name__) # Load credentials CREDENTIALS_FILE = Path(__file__).parent / 'credentials' config = {} with open(CREDENTIALS_FILE) as f: for line in f: if '=' in line: key, value = line.strip().split('=', 1) config[key] = value TOKEN = config['TELEGRAM_BOT_TOKEN'] # Authorized users file (stores chat IDs that are allowed to use the bot) AUTHORIZED_FILE = Path(__file__).parent / 'authorized_users' # Inbox file for messages to Claude INBOX_FILE = Path(__file__).parent / 'inbox' # Session manager, subprocess tracking, and message batchers session_manager = SessionManager() subprocesses: dict[str, ClaudeSubprocess] = {} # Persistent subprocess per session batchers: dict[str, MessageBatcher] = {} # Message batcher per session typing_tasks: dict[str, tuple[asyncio.Task, asyncio.Event]] = {} # Typing indicator per session def get_authorized_users(): """Load authorized user IDs.""" if AUTHORIZED_FILE.exists(): with open(AUTHORIZED_FILE) as f: return set(int(line.strip()) for line in f if line.strip()) return set() def add_authorized_user(user_id: int): """Add a user to authorized list.""" users = get_authorized_users() users.add(user_id) with open(AUTHORIZED_FILE, 'w') as f: for uid in users: f.write(f"{uid}\n") def is_authorized(user_id: int) -> bool: """Check if user is authorized.""" return user_id in get_authorized_users() MODEL_ALIASES = { "sonnet": "claude-sonnet-4-5-20250929", "opus": "claude-opus-4-5-20251101", "haiku": "claude-haiku-4-5-20251001", } def load_persona_for_session(session_name: str) -> dict: """Load persona with session-level model override applied.""" session_data = session_manager.get_session(session_name) persona_name = session_data.get('persona', 'default') persona = session_manager.load_persona(persona_name) # Apply session model override if set model_override = session_data.get('model_override') if model_override: if 'settings' not in persona: persona['settings'] = {} persona['settings']['model'] = model_override return persona def make_callbacks(bot, chat_id, session_name: str): """Create callbacks for ClaudeSubprocess bound to specific chat with dynamic typing control. Typing events are looked up dynamically from typing_tasks dict so callbacks always reference the CURRENT typing indicator, not a stale one from creation time. """ def _stop_typing(): """Stop the current typing indicator for this session (if any).""" if session_name in typing_tasks: _, stop_event = typing_tasks[session_name] stop_event.set() async def on_output(text): t0 = time.monotonic() # Split message using smart splitting chunks = split_message_smart(text) for chunk in chunks: try: # Try sending with MarkdownV2 escaped = escape_markdown_v2(chunk) await bot.send_message(chat_id=chat_id, text=escaped, parse_mode='MarkdownV2') except Exception as e: # Fall back to plain text if MarkdownV2 fails logger.warning(f"MarkdownV2 parse failed, falling back to plain text: {e}") await bot.send_message(chat_id=chat_id, text=chunk) # Stop typing indicator (dynamic lookup) _stop_typing() elapsed = time.monotonic() - t0 logger.info(f"[TIMING] Telegram send: {elapsed:.3f}s ({len(text)} chars, {len(chunks)} chunks)") async def on_error(error): await bot.send_message(chat_id=chat_id, text=f"Error: {error}") # Stop typing indicator on error _stop_typing() async def on_complete(): # Stop typing indicator on completion _stop_typing() async def on_status(status): await bot.send_message(chat_id=chat_id, text=f"[{status}]") async def on_tool_use(tool_name: str, tool_input: dict): """Format and send tool call progress notification.""" # Extract meaningful target from tool_input target = "" if tool_name == "Bash": cmd = tool_input.get("command", "") target = cmd[:50] + "..." if len(cmd) > 50 else cmd elif tool_name == "Read": target = tool_input.get("file_path", "") elif tool_name == "Edit": target = tool_input.get("file_path", "") elif tool_name == "Write": target = tool_input.get("file_path", "") elif tool_name == "Grep": pattern = tool_input.get("pattern", "") target = f"pattern: {pattern}" elif tool_name == "Glob": pattern = tool_input.get("pattern", "") target = f"pattern: {pattern}" else: target = str(tool_input)[:50] # Format as italic message message = f"_{tool_name}: {target}_" try: await bot.send_message(chat_id=chat_id, text=message, parse_mode='MarkdownV2') except Exception as e: # Fall back to plain text logger.warning(f"Failed to send tool notification with MarkdownV2: {e}") await bot.send_message(chat_id=chat_id, text=f"{tool_name}: {target}") return { 'on_output': on_output, 'on_error': on_error, 'on_complete': on_complete, 'on_status': on_status, 'on_tool_use': on_tool_use, } def run_command(cmd: list, timeout: int = 30) -> str: """Run a shell command and return output.""" try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, env={**os.environ, 'PATH': f"/home/mikkel/bin:{os.environ.get('PATH', '')}"} ) output = result.stdout or result.stderr or "No output" # Telegram has 4096 char limit per message if len(output) > 4000: output = output[:4000] + "\n... (truncated)" return output except subprocess.TimeoutExpired: return "Command timed out" except Exception as e: return f"Error: {e}" # Command handlers async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle /start command - first contact with bot.""" user = update.effective_user chat_id = update.effective_chat.id if not is_authorized(user.id): # First user becomes authorized automatically if not get_authorized_users(): add_authorized_user(user.id) await update.message.reply_text( f"Welcome {user.first_name}! You're now authorized as the admin.\n" f"Your chat ID: {chat_id}\n\n" f"Use /help to see available commands." ) else: await update.message.reply_text( f"Unauthorized. Your user ID: {user.id}\n" "Contact the admin to get access." ) return await update.message.reply_text( f"Welcome back {user.first_name}!\n" f"Use /help to see available commands." ) async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle /help command.""" if not is_authorized(update.effective_user.id): return help_text = """ *Homelab Bot Commands* *Claude Sessions:* /new [persona] - Create new Claude session /session - Switch to a session /model - Switch model (sonnet/opus/haiku) /archive - Archive and remove a session *Status & Monitoring:* /status - Quick service overview /pbs - PBS backup status /backups - Last backup per VM/CT /beszel - Server metrics /kuma - Uptime Kuma status *PBS Commands:* /pbs\\_status - Full PBS overview /pbs\\_errors - Recent errors /pbs\\_gc - Garbage collection status *System:* /ping - Ping a host /help - This help message /chatid - Show your chat ID """ await update.message.reply_text(help_text, parse_mode='Markdown') async def chatid(update: Update, context: ContextTypes.DEFAULT_TYPE): """Show chat ID (useful for notifications).""" await update.message.reply_text(f"Chat ID: `{update.effective_chat.id}`", parse_mode='Markdown') async def status(update: Update, context: ContextTypes.DEFAULT_TYPE): """Quick status overview.""" if not is_authorized(update.effective_user.id): return await update.message.reply_text("Checking services...") # Quick checks output_lines = ["*Homelab Status*\n"] # Check key services via ping services = [ ("NPM", "10.5.0.1"), ("DNS", "10.5.0.2"), ("PBS", "10.5.0.6"), ("Dockge", "10.5.0.10"), ("Forgejo", "10.5.0.14"), ] for name, ip in services: result = subprocess.run( ["ping", "-c", "1", "-W", "1", ip], capture_output=True ) status = "up" if result.returncode == 0 else "down" output_lines.append(f"{'✅' if status == 'up' else '❌'} {name} ({ip})") await update.message.reply_text("\n".join(output_lines), parse_mode='Markdown') async def pbs(update: Update, context: ContextTypes.DEFAULT_TYPE): """PBS quick status.""" if not is_authorized(update.effective_user.id): return await update.message.reply_text("Fetching PBS status...") output = run_command(["/home/mikkel/bin/pbs", "status"]) await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown') async def pbs_status(update: Update, context: ContextTypes.DEFAULT_TYPE): """PBS full status.""" if not is_authorized(update.effective_user.id): return await update.message.reply_text("Fetching PBS status...") output = run_command(["/home/mikkel/bin/pbs", "status"]) await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown') async def backups(update: Update, context: ContextTypes.DEFAULT_TYPE): """PBS backups per VM/CT.""" if not is_authorized(update.effective_user.id): return await update.message.reply_text("Fetching backup status...") output = run_command(["/home/mikkel/bin/pbs", "backups"], timeout=60) await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown') async def pbs_errors(update: Update, context: ContextTypes.DEFAULT_TYPE): """PBS errors.""" if not is_authorized(update.effective_user.id): return output = run_command(["/home/mikkel/bin/pbs", "errors"]) await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown') async def pbs_gc(update: Update, context: ContextTypes.DEFAULT_TYPE): """PBS garbage collection status.""" if not is_authorized(update.effective_user.id): return output = run_command(["/home/mikkel/bin/pbs", "gc"]) await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown') async def beszel(update: Update, context: ContextTypes.DEFAULT_TYPE): """Beszel server metrics.""" if not is_authorized(update.effective_user.id): return await update.message.reply_text("Fetching server metrics...") output = run_command(["/home/mikkel/bin/beszel", "status"]) await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown') async def kuma(update: Update, context: ContextTypes.DEFAULT_TYPE): """Uptime Kuma status.""" if not is_authorized(update.effective_user.id): return output = run_command(["/home/mikkel/bin/kuma", "list"]) await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown') async def ping_host(update: Update, context: ContextTypes.DEFAULT_TYPE): """Ping a host.""" if not is_authorized(update.effective_user.id): return if not context.args: await update.message.reply_text("Usage: /ping ") return host = context.args[0] # Basic validation if not host.replace('.', '').replace('-', '').isalnum(): await update.message.reply_text("Invalid hostname") return result = subprocess.run( ["ping", "-c", "3", "-W", "2", host], capture_output=True, text=True, timeout=10 ) output = result.stdout or result.stderr await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown') async def new_session(update: Update, context: ContextTypes.DEFAULT_TYPE): """Create a new Claude session.""" if not is_authorized(update.effective_user.id): return if not context.args: await update.message.reply_text("Usage: /new [persona]") return name = context.args[0] persona = context.args[1] if len(context.args) > 1 else None try: # Create session session_manager.create_session(name, persona=persona) # Auto-switch to new session session_manager.switch_session(name) # Prepare reply if persona: reply = f"Session '{name}' created with persona '{persona}'." else: reply = f"Session '{name}' created." await update.message.reply_text(reply) # Spawn subprocess for the new session (but don't send message yet) session_dir = session_manager.get_session_dir(name) persona_data = load_persona_for_session(name) # Create callbacks bound to this chat (typing looked up dynamically) callbacks = make_callbacks(context.bot, update.effective_chat.id, name) # Create subprocess subprocesses[name] = ClaudeSubprocess( session_dir=session_dir, persona=persona_data, on_output=callbacks['on_output'], on_error=callbacks['on_error'], on_complete=callbacks['on_complete'], on_status=callbacks['on_status'], on_tool_use=callbacks['on_tool_use'], ) logger.info(f"Created session '{name}' with persona '{persona or 'default'}'") except ValueError as e: # Session already exists await update.message.reply_text( f"Session '{name}' already exists. Use /session to switch to it." ) except FileNotFoundError as e: # Persona not found await update.message.reply_text(f"Error: {e}") except Exception as e: logger.error(f"Error creating session: {e}") await update.message.reply_text(f"Error creating session: {e}") async def switch_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE): """Switch to a different Claude session.""" if not is_authorized(update.effective_user.id): return if not context.args: # Show usage and list available sessions sessions = session_manager.list_sessions() if not sessions: await update.message.reply_text("No sessions found. Use /new to create one.") return session_list = "\n".join( f"- {s['name']} ({s['status']}) - {s.get('persona', 'default')}" for s in sessions ) await update.message.reply_text( f"Usage: /session \n\nAvailable sessions:\n{session_list}" ) return name = context.args[0] try: # Check if session exists if not session_manager.session_exists(name): await update.message.reply_text( f"Session '{name}' doesn't exist. Use /new to create it." ) return # Stop typing indicator for previous session if running prev_session = session_manager.get_active_session() if prev_session and prev_session in typing_tasks: task, stop_event = typing_tasks[prev_session] stop_event.set() try: await task except Exception: pass del typing_tasks[prev_session] # Flush batcher for previous session immediately if prev_session and prev_session in batchers: await batchers[prev_session].flush_immediately() # Switch session session_manager.switch_session(name) # Auto-spawn subprocess if not alive if name not in subprocesses or not subprocesses[name].is_alive: session_dir = session_manager.get_session_dir(name) persona_data = load_persona_for_session(name) # Create callbacks bound to this chat (typing looked up dynamically) callbacks = make_callbacks(context.bot, update.effective_chat.id, name) # Create subprocess subprocesses[name] = ClaudeSubprocess( session_dir=session_dir, persona=persona_data, on_output=callbacks['on_output'], on_error=callbacks['on_error'], on_complete=callbacks['on_complete'], on_status=callbacks['on_status'], on_tool_use=callbacks['on_tool_use'], ) logger.info(f"Auto-spawned subprocess for session '{name}'") # Get persona for reply session_data = session_manager.get_session(name) persona_name = session_data.get('persona', 'default') if persona_name and persona_name != 'default': reply = f"Switched to session '{name}' (persona: {persona_name})." else: reply = f"Switched to session '{name}'." await update.message.reply_text(reply) except Exception as e: logger.error(f"Error switching session: {e}") await update.message.reply_text(f"Error switching session: {e}") async def archive_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE): """Archive a Claude session (compress and remove).""" if not is_authorized(update.effective_user.id): return if not context.args: await update.message.reply_text("Usage: /archive ") return name = context.args[0] try: # Stop typing indicator if running if name in typing_tasks: task, stop_event = typing_tasks[name] stop_event.set() try: await task except Exception: pass del typing_tasks[name] # Flush and remove batcher if name in batchers: await batchers[name].flush_immediately() del batchers[name] # Terminate subprocess if running if name in subprocesses: if subprocesses[name].is_alive: await subprocesses[name].terminate() del subprocesses[name] # Archive the session archive_path = session_manager.archive_session(name) size_mb = archive_path.stat().st_size / (1024 * 1024) await update.message.reply_text( f"Session '{name}' archived ({size_mb:.1f} MB)\n{archive_path.name}" ) logger.info(f"Archived session '{name}' to {archive_path}") except ValueError as e: await update.message.reply_text(str(e)) except Exception as e: logger.error(f"Error archiving session: {e}") await update.message.reply_text(f"Error archiving session: {e}") async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle free text messages - route to active Claude session with batching.""" if not is_authorized(update.effective_user.id): return # Check for active session active_session = session_manager.get_active_session() if not active_session: await update.message.reply_text( "No active session. Use /new to start one." ) return message = update.message.text t_start = time.monotonic() # Telegram message timestamp vs now = delivery delay msg_age = (datetime.utcnow() - update.message.date.replace(tzinfo=None)).total_seconds() logger.info(f"[TIMING] Message received: session='{active_session}', age={msg_age:.1f}s, text={message[:50]}...") try: # Clean up stale typing task (from previous message completion) if active_session in typing_tasks: old_task, old_event = typing_tasks[active_session] if old_task.done(): del typing_tasks[active_session] # Start fresh typing indicator for this message if active_session not in typing_tasks: stop_typing = asyncio.Event() typing_task = asyncio.create_task( typing_indicator_loop(context.bot, update.effective_chat.id, stop_typing) ) typing_tasks[active_session] = (typing_task, stop_typing) # Get or create subprocess for active session (avoid double-start) already_alive = active_session in subprocesses and subprocesses[active_session].is_alive if not already_alive: session_dir = session_manager.get_session_dir(active_session) persona_data = load_persona_for_session(active_session) # Create callbacks bound to this chat (typing looked up dynamically) callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session) # Create subprocess subprocess_inst = ClaudeSubprocess( session_dir=session_dir, persona=persona_data, on_output=callbacks['on_output'], on_error=callbacks['on_error'], on_complete=callbacks['on_complete'], on_status=callbacks['on_status'], on_tool_use=callbacks['on_tool_use'], ) await subprocess_inst.start() subprocesses[active_session] = subprocess_inst else: subprocess_inst = subprocesses[active_session] subprocess_state = "reused" if already_alive else "cold-start" logger.info(f"[TIMING] Subprocess: {subprocess_state}, busy={subprocess_inst.is_busy}") # Get or create message batcher for this session if active_session not in batchers: # Batcher callback sends message to subprocess batchers[active_session] = MessageBatcher( callback=subprocess_inst.send_message, debounce_seconds=2.0 ) # Add message to batcher (typing indicator already running) await batchers[active_session].add_message(message) except Exception as e: logger.error(f"Error handling message: {e}") await update.message.reply_text(f"Error: {e}") # Stop typing indicator on error if active_session in typing_tasks: task, stop_event = typing_tasks[active_session] stop_event.set() async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle photo messages - save to session directory and auto-analyze.""" if not is_authorized(update.effective_user.id): return # Check for active session active_session = session_manager.get_active_session() if not active_session: await update.message.reply_text( "No active session. Use /new to start one, then send the photo again." ) return user = update.effective_user file_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') # Get session directory session_dir = session_manager.get_session_dir(active_session) # Get the largest photo (best quality) photo = update.message.photo[-1] file = await context.bot.get_file(photo.file_id) # Download to session directory filename = f"photo_{file_timestamp}.jpg" filepath = session_dir / filename await file.download_to_drive(filepath) logger.info(f"Photo saved to session '{active_session}': {filepath}") # Get caption if any caption = update.message.caption or "" # Clean up stale typing task if active_session in typing_tasks: old_task, old_event = typing_tasks[active_session] if old_task.done(): del typing_tasks[active_session] # Start typing indicator if active_session not in typing_tasks: stop_typing = asyncio.Event() typing_task = asyncio.create_task( typing_indicator_loop(context.bot, update.effective_chat.id, stop_typing) ) typing_tasks[active_session] = (typing_task, stop_typing) # Auto-analyze: send message to Claude if caption: analysis_message = f"I've attached a photo: {filename}. {caption}" else: analysis_message = f"I've attached a photo: {filename}. Please describe what you see." # Get or create subprocess if active_session not in subprocesses or not subprocesses[active_session].is_alive: persona_data = load_persona_for_session(active_session) callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session) subprocess_inst = ClaudeSubprocess( session_dir=session_dir, persona=persona_data, on_output=callbacks['on_output'], on_error=callbacks['on_error'], on_complete=callbacks['on_complete'], on_status=callbacks['on_status'], on_tool_use=callbacks['on_tool_use'], ) await subprocess_inst.start() subprocesses[active_session] = subprocess_inst # Send analysis message directly (not batched) await subprocesses[active_session].send_message(analysis_message) async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle document/file messages - save to session directory and notify Claude.""" if not is_authorized(update.effective_user.id): return # Check for active session active_session = session_manager.get_active_session() if not active_session: await update.message.reply_text( "No active session. Use /new to start one, then send the file again." ) return user = update.effective_user file_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') # Get session directory session_dir = session_manager.get_session_dir(active_session) # Get document info doc = update.message.document original_name = doc.file_name or "unknown" file = await context.bot.get_file(doc.file_id) # Download with timestamp prefix to avoid collisions filename = f"{file_timestamp}_{original_name}" filepath = session_dir / filename await file.download_to_drive(filepath) logger.info(f"Document saved to session '{active_session}': {filepath}") # Get caption if any caption = update.message.caption or "" # Clean up stale typing task if active_session in typing_tasks: old_task, old_event = typing_tasks[active_session] if old_task.done(): del typing_tasks[active_session] # Start typing indicator if active_session not in typing_tasks: stop_typing = asyncio.Event() typing_task = asyncio.create_task( typing_indicator_loop(context.bot, update.effective_chat.id, stop_typing) ) typing_tasks[active_session] = (typing_task, stop_typing) # Notify Claude if caption: notify_message = f"{caption}\nThe file {filename} has been saved to your session." else: notify_message = f"User uploaded file: {filename}" # Get or create subprocess if active_session not in subprocesses or not subprocesses[active_session].is_alive: persona_data = load_persona_for_session(active_session) callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session) subprocess_inst = ClaudeSubprocess( session_dir=session_dir, persona=persona_data, on_output=callbacks['on_output'], on_error=callbacks['on_error'], on_complete=callbacks['on_complete'], on_status=callbacks['on_status'], on_tool_use=callbacks['on_tool_use'], ) await subprocess_inst.start() subprocesses[active_session] = subprocess_inst # Send notification directly (not batched) await subprocesses[active_session].send_message(notify_message) async def model_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE): """Switch model for current session. Persists across session switches.""" if not is_authorized(update.effective_user.id): return active_session = session_manager.get_active_session() if not active_session: await update.message.reply_text("No active session. Use /new to start one.") return if not context.args: # Show current model session_data = session_manager.get_session(active_session) model_override = session_data.get('model_override') persona_name = session_data.get('persona', 'default') persona = session_manager.load_persona(persona_name) current = model_override or persona.get('settings', {}).get('model', 'default') aliases = "\n".join(f" {k} → {v}" for k, v in MODEL_ALIASES.items()) await update.message.reply_text( f"Current model: {current}\n\nUsage: /model \n\nAliases:\n{aliases}" ) return model = context.args[0] # Resolve alias resolved = MODEL_ALIASES.get(model, model) # Persist to session metadata session_manager.update_session(active_session, model_override=resolved) # Terminate current subprocess so next message spawns with new model if active_session in subprocesses: if subprocesses[active_session].is_alive: await subprocesses[active_session].terminate() del subprocesses[active_session] # Clean up batcher too if active_session in batchers: await batchers[active_session].flush_immediately() del batchers[active_session] await update.message.reply_text(f"Model set to {resolved} for session '{active_session}'.") logger.info(f"Model changed to {resolved} for session '{active_session}'") async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle unknown commands.""" if not is_authorized(update.effective_user.id): return await update.message.reply_text("Unknown command. Use /help for available commands.") def main(): """Start the bot.""" # Create application app = Application.builder().token(TOKEN).build() # Add handlers app.add_handler(CommandHandler("start", start)) app.add_handler(CommandHandler("help", help_command)) app.add_handler(CommandHandler("chatid", chatid)) app.add_handler(CommandHandler("new", new_session)) app.add_handler(CommandHandler("session", switch_session_cmd)) app.add_handler(CommandHandler("archive", archive_session_cmd)) app.add_handler(CommandHandler("model", model_cmd)) app.add_handler(CommandHandler("status", status)) app.add_handler(CommandHandler("pbs", pbs)) app.add_handler(CommandHandler("pbs_status", pbs_status)) app.add_handler(CommandHandler("backups", backups)) app.add_handler(CommandHandler("pbs_errors", pbs_errors)) app.add_handler(CommandHandler("pbs_gc", pbs_gc)) app.add_handler(CommandHandler("beszel", beszel)) app.add_handler(CommandHandler("kuma", kuma)) app.add_handler(CommandHandler("ping", ping_host)) # Unknown command handler app.add_handler(MessageHandler(filters.COMMAND, unknown)) # Free text message handler (for messages to Claude) app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message, block=False)) # Photo handler app.add_handler(MessageHandler(filters.PHOTO, handle_photo)) # Document/file handler app.add_handler(MessageHandler(filters.Document.ALL, handle_document)) # Start polling logger.info("Starting Homelab Bot...") app.run_polling(allowed_updates=Update.ALL_TYPES) if __name__ == '__main__': main()