From 6ebdb4a55555d6bd84acd635ac3960217108e889 Mon Sep 17 00:00:00 2001 From: Mikkel Georgsen Date: Wed, 4 Feb 2026 23:36:43 +0000 Subject: [PATCH] feat(03-02): wire suspend/resume lifecycle with race locks, startup cleanup, and graceful shutdown - Added suspend_session() callback for idle timer (terminates subprocess, updates metadata to suspended, silent) - Added get_subprocess_lock() helper to prevent race between timeout and user message - Updated handle_message/handle_photo/handle_document with resume logic (detects suspended, shows 'Resuming session...', spawns with --continue) - Added idle timer reset in on_complete callback (timer only starts after Claude finishes) - Added idle timer reset on user activity (message/photo/document) - Updated new_session() to initialize idle timer after subprocess creation - Updated switch_session_cmd() to initialize idle timer when auto-spawning subprocess - Updated archive_session_cmd() to cancel idle timer and remove subprocess lock - Updated model_cmd() to cancel idle timer when terminating subprocess - Added cleanup_orphaned_subprocesses() to kill orphaned PIDs verified via /proc/cmdline at startup - Added post_init() callback for startup cleanup - Added post_shutdown() callback for graceful shutdown (terminates all subprocesses, cancels all timers) - Updated help text with new commands Co-Authored-By: Claude Opus 4.5 --- telegram/bot.py | 386 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 323 insertions(+), 63 deletions(-) diff --git a/telegram/bot.py b/telegram/bot.py index 9b5be09..0f7dc51 100755 --- a/telegram/bot.py +++ b/telegram/bot.py @@ -7,9 +7,10 @@ Two-way interactive bot for homelab management and notifications. import asyncio import logging import os +import signal import subprocess import time -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path from telegram import Update @@ -18,6 +19,7 @@ from session_manager import SessionManager from claude_subprocess import ClaudeSubprocess from telegram_utils import split_message_smart, escape_markdown_v2, typing_indicator_loop from message_batcher import MessageBatcher +from idle_timer import SessionIdleTimer # Setup logging logging.basicConfig( @@ -48,6 +50,8 @@ session_manager = SessionManager() subprocesses: dict[str, ClaudeSubprocess] = {} # Persistent subprocess per session batchers: dict[str, MessageBatcher] = {} # Message batcher per session typing_tasks: dict[str, tuple[asyncio.Task, asyncio.Event]] = {} # Typing indicator per session +idle_timers: dict[str, SessionIdleTimer] = {} # Idle timer per session +subprocess_locks: dict[str, asyncio.Lock] = {} # Lock per session to prevent races def get_authorized_users(): """Load authorized user IDs.""" @@ -68,6 +72,50 @@ def is_authorized(user_id: int) -> bool: """Check if user is authorized.""" return user_id in get_authorized_users() +def get_subprocess_lock(session_name: str) -> asyncio.Lock: + """Get or create subprocess lock for a session.""" + return subprocess_locks.setdefault(session_name, asyncio.Lock()) + +async def suspend_session(session_name: str): + """Suspend a session after idle timeout (callback for idle timer).""" + async with get_subprocess_lock(session_name): + # Check if subprocess exists and is alive + if session_name not in subprocesses or not subprocesses[session_name].is_alive: + logger.debug(f"Session '{session_name}' already not running, updating metadata only") + session_manager.update_session(session_name, status='suspended', pid=None) + return + + # Check if subprocess is busy (Claude is mid-processing) + if subprocesses[session_name].is_busy: + logger.info(f"Session '{session_name}' is busy, deferring suspension") + # Reset timer to try again later + if session_name in idle_timers: + idle_timers[session_name].reset() + return + + # Store PID for logging + pid = subprocesses[session_name].pid + logger.info(f"Suspending session '{session_name}' (PID {pid}) after idle timeout") + + # Terminate subprocess + await subprocesses[session_name].terminate() + del subprocesses[session_name] + + # Flush and remove batcher if exists + if session_name in batchers: + await batchers[session_name].flush_immediately() + del batchers[session_name] + + # Update session metadata + session_manager.update_session(session_name, status='suspended', pid=None) + + # Cancel and remove idle timer + if session_name in idle_timers: + idle_timers[session_name].cancel() + del idle_timers[session_name] + + logger.info(f"Session '{session_name}' suspended after idle timeout") + MODEL_ALIASES = { "sonnet": "claude-sonnet-4-5-20250929", "opus": "claude-opus-4-5-20251101", @@ -132,6 +180,10 @@ def make_callbacks(bot, chat_id, session_name: str): # Stop typing indicator on completion _stop_typing() + # Reset idle timer (only start counting AFTER Claude finishes) + if session_name in idle_timers: + idle_timers[session_name].reset() + async def on_status(status): await bot.send_message(chat_id=chat_id, text=f"[{status}]") @@ -233,7 +285,9 @@ async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE): *Claude Sessions:* /new [persona] - Create new Claude session /session - Switch to a session +/sessions - List all sessions with status /model - Switch model (sonnet/opus/haiku) +/timeout - Set idle timeout (1-120) /archive - Archive and remove a session *Status & Monitoring:* @@ -417,6 +471,10 @@ async def new_session(update: Update, context: ContextTypes.DEFAULT_TYPE): on_tool_use=callbacks['on_tool_use'], ) + # Initialize idle timer for the new session + timeout_secs = session_manager.get_session_timeout(name) + idle_timers[name] = SessionIdleTimer(name, timeout_secs, on_timeout=suspend_session) + logger.info(f"Created session '{name}' with persona '{persona or 'default'}'") except ValueError as e: @@ -499,6 +557,11 @@ async def switch_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) on_tool_use=callbacks['on_tool_use'], ) + # Initialize idle timer for the switched-to session + timeout_secs = session_manager.get_session_timeout(name) + if name not in idle_timers: + idle_timers[name] = SessionIdleTimer(name, timeout_secs, on_timeout=suspend_session) + logger.info(f"Auto-spawned subprocess for session '{name}'") # Get persona for reply @@ -549,6 +612,14 @@ async def archive_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE await subprocesses[name].terminate() del subprocesses[name] + # Cancel idle timer if exists + if name in idle_timers: + idle_timers[name].cancel() + del idle_timers[name] + + # Remove subprocess lock if exists + subprocess_locks.pop(name, None) + # Archive the session archive_path = session_manager.archive_session(name) size_mb = archive_path.stat().st_size / (1024 * 1024) @@ -583,6 +654,10 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): logger.info(f"[TIMING] Message received: session='{active_session}', age={msg_age:.1f}s, text={message[:50]}...") try: + # Reset idle timer on user activity (even before processing message) + if active_session in idle_timers: + idle_timers[active_session].reset() + # Clean up stale typing task (from previous message completion) if active_session in typing_tasks: old_task, old_event = typing_tasks[active_session] @@ -597,40 +672,75 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): ) typing_tasks[active_session] = (typing_task, stop_typing) - # Get or create subprocess for active session (avoid double-start) - already_alive = active_session in subprocesses and subprocesses[active_session].is_alive - if not already_alive: - session_dir = session_manager.get_session_dir(active_session) - persona_data = load_persona_for_session(active_session) + # Acquire lock to prevent race with suspend_session + lock = get_subprocess_lock(active_session) + async with lock: + # Get or create subprocess for active session (avoid double-start) + already_alive = active_session in subprocesses and subprocesses[active_session].is_alive + if not already_alive: + session_dir = session_manager.get_session_dir(active_session) + persona_data = load_persona_for_session(active_session) - # Create callbacks bound to this chat (typing looked up dynamically) - callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session) + # Check if this is a resume (has .claude/ dir with history) + is_resume = (session_dir / ".claude").exists() + if is_resume: + # Calculate idle duration + session_data = session_manager.get_session(active_session) + last_active_str = session_data.get('last_active') + if last_active_str: + last_active = datetime.fromisoformat(last_active_str) + idle_duration = (datetime.now(timezone.utc) - last_active).total_seconds() + if idle_duration > 60: + # Show idle duration if >1 min + idle_minutes = int(idle_duration / 60) + await update.message.reply_text(f"Resuming session (idle for {idle_minutes} min)...") + else: + await update.message.reply_text("Resuming session...") + else: + await update.message.reply_text("Resuming session...") - # Create subprocess - subprocess_inst = ClaudeSubprocess( - session_dir=session_dir, - persona=persona_data, - on_output=callbacks['on_output'], - on_error=callbacks['on_error'], - on_complete=callbacks['on_complete'], - on_status=callbacks['on_status'], - on_tool_use=callbacks['on_tool_use'], - ) - await subprocess_inst.start() - subprocesses[active_session] = subprocess_inst - else: - subprocess_inst = subprocesses[active_session] + # Create callbacks bound to this chat (typing looked up dynamically) + callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session) - subprocess_state = "reused" if already_alive else "cold-start" - logger.info(f"[TIMING] Subprocess: {subprocess_state}, busy={subprocess_inst.is_busy}") + # Create subprocess + subprocess_inst = ClaudeSubprocess( + session_dir=session_dir, + persona=persona_data, + on_output=callbacks['on_output'], + on_error=callbacks['on_error'], + on_complete=callbacks['on_complete'], + on_status=callbacks['on_status'], + on_tool_use=callbacks['on_tool_use'], + ) + await subprocess_inst.start() + subprocesses[active_session] = subprocess_inst - # Get or create message batcher for this session - if active_session not in batchers: - # Batcher callback sends message to subprocess - batchers[active_session] = MessageBatcher( - callback=subprocess_inst.send_message, - debounce_seconds=2.0 - ) + # Update metadata with PID and status + now_iso = datetime.now(timezone.utc).isoformat() + session_manager.update_session( + active_session, + status='active', + last_active=now_iso, + pid=subprocess_inst.pid + ) + else: + subprocess_inst = subprocesses[active_session] + + subprocess_state = "reused" if already_alive else "cold-start" + logger.info(f"[TIMING] Subprocess: {subprocess_state}, busy={subprocess_inst.is_busy}") + + # Get or create message batcher for this session + if active_session not in batchers: + # Batcher callback sends message to subprocess + batchers[active_session] = MessageBatcher( + callback=subprocess_inst.send_message, + debounce_seconds=2.0 + ) + + # Create/reset idle timer for the session (outside lock) + timeout_secs = session_manager.get_session_timeout(active_session) + if active_session not in idle_timers: + idle_timers[active_session] = SessionIdleTimer(active_session, timeout_secs, on_timeout=suspend_session) # Add message to batcher (typing indicator already running) await batchers[active_session].add_message(message) @@ -676,6 +786,10 @@ async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): # Get caption if any caption = update.message.caption or "" + # Reset idle timer on user activity + if active_session in idle_timers: + idle_timers[active_session].reset() + # Clean up stale typing task if active_session in typing_tasks: old_task, old_event = typing_tasks[active_session] @@ -696,23 +810,56 @@ async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): else: analysis_message = f"I've attached a photo: {filename}. Please describe what you see." - # Get or create subprocess - if active_session not in subprocesses or not subprocesses[active_session].is_alive: - persona_data = load_persona_for_session(active_session) + # Acquire lock to prevent race with suspend_session + lock = get_subprocess_lock(active_session) + async with lock: + # Get or create subprocess + if active_session not in subprocesses or not subprocesses[active_session].is_alive: + # Check if this is a resume + is_resume = (session_dir / ".claude").exists() + if is_resume: + session_data = session_manager.get_session(active_session) + last_active_str = session_data.get('last_active') + if last_active_str: + last_active = datetime.fromisoformat(last_active_str) + idle_duration = (datetime.now(timezone.utc) - last_active).total_seconds() + if idle_duration > 60: + idle_minutes = int(idle_duration / 60) + await update.message.reply_text(f"Resuming session (idle for {idle_minutes} min)...") + else: + await update.message.reply_text("Resuming session...") + else: + await update.message.reply_text("Resuming session...") - callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session) + persona_data = load_persona_for_session(active_session) - subprocess_inst = ClaudeSubprocess( - session_dir=session_dir, - persona=persona_data, - on_output=callbacks['on_output'], - on_error=callbacks['on_error'], - on_complete=callbacks['on_complete'], - on_status=callbacks['on_status'], - on_tool_use=callbacks['on_tool_use'], - ) - await subprocess_inst.start() - subprocesses[active_session] = subprocess_inst + callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session) + + subprocess_inst = ClaudeSubprocess( + session_dir=session_dir, + persona=persona_data, + on_output=callbacks['on_output'], + on_error=callbacks['on_error'], + on_complete=callbacks['on_complete'], + on_status=callbacks['on_status'], + on_tool_use=callbacks['on_tool_use'], + ) + await subprocess_inst.start() + subprocesses[active_session] = subprocess_inst + + # Update metadata with PID and status + now_iso = datetime.now(timezone.utc).isoformat() + session_manager.update_session( + active_session, + status='active', + last_active=now_iso, + pid=subprocess_inst.pid + ) + + # Create/reset idle timer (outside lock) + timeout_secs = session_manager.get_session_timeout(active_session) + if active_session not in idle_timers: + idle_timers[active_session] = SessionIdleTimer(active_session, timeout_secs, on_timeout=suspend_session) # Send analysis message directly (not batched) await subprocesses[active_session].send_message(analysis_message) @@ -751,6 +898,10 @@ async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE): # Get caption if any caption = update.message.caption or "" + # Reset idle timer on user activity + if active_session in idle_timers: + idle_timers[active_session].reset() + # Clean up stale typing task if active_session in typing_tasks: old_task, old_event = typing_tasks[active_session] @@ -771,23 +922,56 @@ async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE): else: notify_message = f"User uploaded file: {filename}" - # Get or create subprocess - if active_session not in subprocesses or not subprocesses[active_session].is_alive: - persona_data = load_persona_for_session(active_session) + # Acquire lock to prevent race with suspend_session + lock = get_subprocess_lock(active_session) + async with lock: + # Get or create subprocess + if active_session not in subprocesses or not subprocesses[active_session].is_alive: + # Check if this is a resume + is_resume = (session_dir / ".claude").exists() + if is_resume: + session_data = session_manager.get_session(active_session) + last_active_str = session_data.get('last_active') + if last_active_str: + last_active = datetime.fromisoformat(last_active_str) + idle_duration = (datetime.now(timezone.utc) - last_active).total_seconds() + if idle_duration > 60: + idle_minutes = int(idle_duration / 60) + await update.message.reply_text(f"Resuming session (idle for {idle_minutes} min)...") + else: + await update.message.reply_text("Resuming session...") + else: + await update.message.reply_text("Resuming session...") - callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session) + persona_data = load_persona_for_session(active_session) - subprocess_inst = ClaudeSubprocess( - session_dir=session_dir, - persona=persona_data, - on_output=callbacks['on_output'], - on_error=callbacks['on_error'], - on_complete=callbacks['on_complete'], - on_status=callbacks['on_status'], - on_tool_use=callbacks['on_tool_use'], - ) - await subprocess_inst.start() - subprocesses[active_session] = subprocess_inst + callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session) + + subprocess_inst = ClaudeSubprocess( + session_dir=session_dir, + persona=persona_data, + on_output=callbacks['on_output'], + on_error=callbacks['on_error'], + on_complete=callbacks['on_complete'], + on_status=callbacks['on_status'], + on_tool_use=callbacks['on_tool_use'], + ) + await subprocess_inst.start() + subprocesses[active_session] = subprocess_inst + + # Update metadata with PID and status + now_iso = datetime.now(timezone.utc).isoformat() + session_manager.update_session( + active_session, + status='active', + last_active=now_iso, + pid=subprocess_inst.pid + ) + + # Create/reset idle timer (outside lock) + timeout_secs = session_manager.get_session_timeout(active_session) + if active_session not in idle_timers: + idle_timers[active_session] = SessionIdleTimer(active_session, timeout_secs, on_timeout=suspend_session) # Send notification directly (not batched) await subprocesses[active_session].send_message(notify_message) @@ -833,6 +1017,11 @@ async def model_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE): await batchers[active_session].flush_immediately() del batchers[active_session] + # Cancel idle timer (will be recreated on next message) + if active_session in idle_timers: + idle_timers[active_session].cancel() + del idle_timers[active_session] + await update.message.reply_text(f"Model set to {resolved} for session '{active_session}'.") logger.info(f"Model changed to {resolved} for session '{active_session}'") @@ -842,10 +1031,81 @@ async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE): return await update.message.reply_text("Unknown command. Use /help for available commands.") +async def cleanup_orphaned_subprocesses(): + """Clean up orphaned subprocesses at bot startup.""" + orphan_count = 0 + sessions = session_manager.list_sessions() + + for session in sessions: + name = session['name'] + pid = session.get('pid') + + if pid is not None: + # Check if process exists + try: + os.kill(pid, 0) + # Process exists - verify it's a claude process + try: + with open(f"/proc/{pid}/cmdline", "r") as f: + cmdline = f.read() + if "claude" in cmdline: + # It's a claude process - kill it + logger.info(f"Killing orphaned claude process for session '{name}': PID {pid}") + try: + os.kill(pid, signal.SIGTERM) + await asyncio.sleep(2) + try: + os.kill(pid, signal.SIGKILL) + except ProcessLookupError: + pass # Already dead + except ProcessLookupError: + pass # Process died between checks + orphan_count += 1 + else: + logger.warning(f"PID {pid} for session '{name}' exists but is not a claude process") + except (FileNotFoundError, PermissionError): + # Can't read cmdline - assume it's not our process + logger.warning(f"Cannot verify PID {pid} for session '{name}'") + except ProcessLookupError: + # Process doesn't exist + pass + + # Update metadata - clear PID and set status to suspended + session_manager.update_session(name, pid=None, status='suspended') + + # Also suspend any sessions that are marked active but have no PID + elif session.get('status') == 'active': + session_manager.update_session(name, status='suspended') + + if orphan_count > 0: + logger.info(f"Cleaned up {orphan_count} orphaned subprocess(es)") + else: + logger.info("No orphaned subprocesses found") + +async def post_init(application): + """Run startup cleanup.""" + await cleanup_orphaned_subprocesses() + +async def post_shutdown(application): + """Clean up subprocesses and timers on bot shutdown.""" + logger.info("Bot shutting down, cleaning up...") + + # Cancel all idle timers + for name, timer in idle_timers.items(): + timer.cancel() + + # Terminate all subprocesses + for name, proc in list(subprocesses.items()): + if proc.is_alive: + logger.info(f"Terminating subprocess for '{name}'") + await proc.terminate() + + logger.info("Cleanup complete") + def main(): """Start the bot.""" - # Create application - app = Application.builder().token(TOKEN).build() + # Create application with lifecycle callbacks + app = Application.builder().token(TOKEN).post_init(post_init).post_shutdown(post_shutdown).build() # Add handlers app.add_handler(CommandHandler("start", start))