From 3a62e01f6f28a72db72007ff70f3b5c476af7e67 Mon Sep 17 00:00:00 2001 From: Mikkel Georgsen Date: Wed, 4 Feb 2026 17:40:43 +0000 Subject: [PATCH] feat(01-03): wire session manager and Claude subprocess into Telegram bot - Add /new and /session commands to create and switch sessions - Route plain text messages to active session's Claude subprocess - Auto-spawn subprocess when switching to session with no process - Update help text with session commands - Handle async callbacks in ClaudeSubprocess (inspect.iscoroutinefunction) - Preserve all existing bot commands (/status, /pbs, etc.) - Use block=False for non-blocking message handling Co-Authored-By: Claude Opus 4.5 --- telegram/bot.py | 211 ++++++++++++++++++++++++++++++++-- telegram/claude_subprocess.py | 31 ++++- 2 files changed, 226 insertions(+), 16 deletions(-) diff --git a/telegram/bot.py b/telegram/bot.py index 7f29f5d..d516b7d 100755 --- a/telegram/bot.py +++ b/telegram/bot.py @@ -13,6 +13,8 @@ from pathlib import Path from telegram import Update from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters +from telegram.session_manager import SessionManager +from telegram.claude_subprocess import ClaudeSubprocess # Setup logging logging.basicConfig( @@ -38,6 +40,10 @@ AUTHORIZED_FILE = Path(__file__).parent / 'authorized_users' # Inbox file for messages to Claude INBOX_FILE = Path(__file__).parent / 'inbox' +# Session manager and subprocess tracking +session_manager = SessionManager() +subprocesses: dict[str, ClaudeSubprocess] = {} # Persistent subprocess per session + def get_authorized_users(): """Load authorized user IDs.""" if AUTHORIZED_FILE.exists(): @@ -57,6 +63,25 @@ def is_authorized(user_id: int) -> bool: """Check if user is authorized.""" return user_id in get_authorized_users() +def make_callbacks(bot, chat_id): + """Create callbacks for ClaudeSubprocess bound to specific chat.""" + async def on_output(text): + # Truncate to Telegram limit + if len(text) > 4000: + text = text[:4000] + "\n... (truncated)" + await bot.send_message(chat_id=chat_id, text=text) + + async def on_error(error): + await bot.send_message(chat_id=chat_id, text=f"Error: {error}") + + async def on_complete(): + pass # Phase 2 will add typing indicator cleanup + + async def on_status(status): + await bot.send_message(chat_id=chat_id, text=f"[{status}]") + + return on_output, on_error, on_complete, on_status + def run_command(cmd: list, timeout: int = 30) -> str: """Run a shell command and return output.""" try: @@ -112,6 +137,10 @@ async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE): help_text = """ *Homelab Bot Commands* +*Claude Sessions:* +/new [persona] - Create new Claude session +/session - Switch to a session + *Status & Monitoring:* /status - Quick service overview /pbs - PBS backup status @@ -248,21 +277,181 @@ async def ping_host(update: Update, context: ContextTypes.DEFAULT_TYPE): output = result.stdout or result.stderr await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown') -async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): - """Handle free text messages - save to inbox for Claude.""" +async def new_session(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Create a new Claude session.""" if not is_authorized(update.effective_user.id): return - user = update.effective_user + if not context.args: + await update.message.reply_text("Usage: /new [persona]") + return + + name = context.args[0] + persona = context.args[1] if len(context.args) > 1 else None + + try: + # Create session + session_manager.create_session(name, persona=persona) + + # Auto-switch to new session + session_manager.switch_session(name) + + # Prepare reply + if persona: + reply = f"Session '{name}' created with persona '{persona}'." + else: + reply = f"Session '{name}' created." + + await update.message.reply_text(reply) + + # Spawn subprocess for the new session (but don't send message yet) + session_dir = session_manager.get_session_dir(name) + persona_config = session_manager.get_session(name) + persona_data = session_manager.load_persona(persona or 'default') + + # Create callbacks bound to this chat + callbacks = make_callbacks(context.bot, update.effective_chat.id) + + # Create subprocess + subprocesses[name] = ClaudeSubprocess( + session_dir=session_dir, + persona=persona_data, + on_output=callbacks[0], + on_error=callbacks[1], + on_complete=callbacks[2], + on_status=callbacks[3], + ) + + logger.info(f"Created session '{name}' with persona '{persona or 'default'}'") + + except ValueError as e: + # Session already exists + await update.message.reply_text( + f"Session '{name}' already exists. Use /session to switch to it." + ) + except FileNotFoundError as e: + # Persona not found + await update.message.reply_text(f"Error: {e}") + except Exception as e: + logger.error(f"Error creating session: {e}") + await update.message.reply_text(f"Error creating session: {e}") + +async def switch_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Switch to a different Claude session.""" + if not is_authorized(update.effective_user.id): + return + + if not context.args: + # Show usage and list available sessions + sessions = session_manager.list_sessions() + if not sessions: + await update.message.reply_text("No sessions found. Use /new to create one.") + return + + session_list = "\n".join( + f"- {s['name']} ({s['status']}) - {s.get('persona', 'default')}" + for s in sessions + ) + await update.message.reply_text( + f"Usage: /session \n\nAvailable sessions:\n{session_list}" + ) + return + + name = context.args[0] + + try: + # Check if session exists + if not session_manager.session_exists(name): + await update.message.reply_text( + f"Session '{name}' doesn't exist. Use /new to create it." + ) + return + + # Switch session + session_manager.switch_session(name) + + # Auto-spawn subprocess if not alive + if name not in subprocesses or not subprocesses[name].is_alive: + session_dir = session_manager.get_session_dir(name) + session_data = session_manager.get_session(name) + persona_name = session_data.get('persona', 'default') + persona_data = session_manager.load_persona(persona_name) + + # Create callbacks bound to this chat + callbacks = make_callbacks(context.bot, update.effective_chat.id) + + # Create subprocess + subprocesses[name] = ClaudeSubprocess( + session_dir=session_dir, + persona=persona_data, + on_output=callbacks[0], + on_error=callbacks[1], + on_complete=callbacks[2], + on_status=callbacks[3], + ) + + logger.info(f"Auto-spawned subprocess for session '{name}'") + + # Get persona for reply + session_data = session_manager.get_session(name) + persona_name = session_data.get('persona', 'default') + + if persona_name and persona_name != 'default': + reply = f"Switched to session '{name}' (persona: {persona_name})." + else: + reply = f"Switched to session '{name}'." + + await update.message.reply_text(reply) + + except Exception as e: + logger.error(f"Error switching session: {e}") + await update.message.reply_text(f"Error switching session: {e}") + +async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Handle free text messages - route to active Claude session.""" + if not is_authorized(update.effective_user.id): + return + + # Check for active session + active_session = session_manager.get_active_session() + if not active_session: + await update.message.reply_text( + "No active session. Use /new to start one." + ) + return + message = update.message.text - timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + logger.info(f"Routing message to session '{active_session}': {message[:50]}...") - # Append to inbox file - with open(INBOX_FILE, 'a') as f: - f.write(f"[{timestamp}] {user.first_name}: {message}\n") + try: + # Get or create subprocess for active session + if active_session not in subprocesses or not subprocesses[active_session].is_alive: + session_dir = session_manager.get_session_dir(active_session) + session_data = session_manager.get_session(active_session) + persona_name = session_data.get('persona', 'default') + persona_data = session_manager.load_persona(persona_name) - # Silent save - no reply needed - logger.info(f"Inbox message from {user.first_name}: {message[:50]}...") + # Create callbacks bound to this chat + callbacks = make_callbacks(context.bot, update.effective_chat.id) + + # Create subprocess + subprocesses[active_session] = ClaudeSubprocess( + session_dir=session_dir, + persona=persona_data, + on_output=callbacks[0], + on_error=callbacks[1], + on_complete=callbacks[2], + on_status=callbacks[3], + ) + + logger.info(f"Auto-spawned subprocess for active session '{active_session}'") + + # Send message to Claude + await subprocesses[active_session].send_message(message) + + except Exception as e: + logger.error(f"Error handling message: {e}") + await update.message.reply_text(f"Error: {e}") async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle photo messages - download and save for Claude.""" @@ -344,6 +533,8 @@ def main(): app.add_handler(CommandHandler("start", start)) app.add_handler(CommandHandler("help", help_command)) app.add_handler(CommandHandler("chatid", chatid)) + app.add_handler(CommandHandler("new", new_session)) + app.add_handler(CommandHandler("session", switch_session_cmd)) app.add_handler(CommandHandler("status", status)) app.add_handler(CommandHandler("pbs", pbs)) app.add_handler(CommandHandler("pbs_status", pbs_status)) @@ -358,7 +549,7 @@ def main(): app.add_handler(MessageHandler(filters.COMMAND, unknown)) # Free text message handler (for messages to Claude) - app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) + app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message, block=False)) # Photo handler app.add_handler(MessageHandler(filters.PHOTO, handle_photo)) diff --git a/telegram/claude_subprocess.py b/telegram/claude_subprocess.py index 5834af7..8ec40f3 100644 --- a/telegram/claude_subprocess.py +++ b/telegram/claude_subprocess.py @@ -21,6 +21,7 @@ Based on research in: .planning/phases/01-session-process-foundation/01-RESEARCH """ import asyncio +import inspect import json import logging import os @@ -249,7 +250,10 @@ class ClaudeSubprocess: # Call completion callback if self.on_complete: try: - self.on_complete() + if inspect.iscoroutinefunction(self.on_complete): + asyncio.create_task(self.on_complete()) + else: + self.on_complete() except Exception as e: logger.error(f"Error in on_complete callback: {e}") @@ -316,7 +320,10 @@ class ClaudeSubprocess: text = block.get("text", "") if text and self.on_output: try: - self.on_output(text) + if inspect.iscoroutinefunction(self.on_output): + asyncio.create_task(self.on_output(text)) + else: + self.on_output(text) except Exception as e: logger.error(f"Error in on_output callback: {e}") @@ -330,7 +337,10 @@ class ClaudeSubprocess: if event.get("is_error") and self.on_error: error_msg = event.get("error", "Unknown error") try: - self.on_error(f"Claude error: {error_msg}") + if inspect.iscoroutinefunction(self.on_error): + asyncio.create_task(self.on_error(f"Claude error: {error_msg}")) + else: + self.on_error(f"Claude error: {error_msg}") except Exception as e: logger.error(f"Error in on_error callback: {e}") @@ -343,7 +353,10 @@ class ClaudeSubprocess: if subtype == "error" and self.on_error: error_msg = event.get("message", "System error") try: - self.on_error(f"System error: {error_msg}") + if inspect.iscoroutinefunction(self.on_error): + asyncio.create_task(self.on_error(f"System error: {error_msg}")) + else: + self.on_error(f"System error: {error_msg}") except Exception as e: logger.error(f"Error in on_error callback: {e}") @@ -369,7 +382,10 @@ class ClaudeSubprocess: # If line contains error, notify via callback if "error" in line.lower() and self.on_error: try: - self.on_error(f"Claude stderr: {line}") + if inspect.iscoroutinefunction(self.on_error): + asyncio.create_task(self.on_error(f"Claude stderr: {line}")) + else: + self.on_error(f"Claude stderr: {line}") except Exception as e: logger.error(f"Error in on_error callback: {e}") @@ -396,7 +412,10 @@ class ClaudeSubprocess: # Notify user if self.on_status: try: - self.on_status("Claude crashed, restarting with context preserved...") + if inspect.iscoroutinefunction(self.on_status): + asyncio.create_task(self.on_status("Claude crashed, restarting with context preserved...")) + else: + self.on_status("Claude crashed, restarting with context preserved...") except Exception as e: logger.error(f"Error in on_status callback: {e}")