feat(03-02): wire suspend/resume lifecycle with race locks, startup cleanup, and graceful shutdown

- Added suspend_session() callback for idle timer (terminates subprocess, updates metadata to suspended, silent)
- Added get_subprocess_lock() helper to prevent race between timeout and user message
- Updated handle_message/handle_photo/handle_document with resume logic (detects suspended, shows 'Resuming session...', spawns with --continue)
- Added idle timer reset in on_complete callback (timer only starts after Claude finishes)
- Added idle timer reset on user activity (message/photo/document)
- Updated new_session() to initialize idle timer after subprocess creation
- Updated switch_session_cmd() to initialize idle timer when auto-spawning subprocess
- Updated archive_session_cmd() to cancel idle timer and remove subprocess lock
- Updated model_cmd() to cancel idle timer when terminating subprocess
- Added cleanup_orphaned_subprocesses() to kill orphaned PIDs verified via /proc/cmdline at startup
- Added post_init() callback for startup cleanup
- Added post_shutdown() callback for graceful shutdown (terminates all subprocesses, cancels all timers)
- Updated help text with new commands

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Mikkel Georgsen 2026-02-04 23:36:43 +00:00
parent a7d0f4515f
commit 6ebdb4a555

View file

@ -7,9 +7,10 @@ Two-way interactive bot for homelab management and notifications.
import asyncio import asyncio
import logging import logging
import os import os
import signal
import subprocess import subprocess
import time import time
from datetime import datetime from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from telegram import Update from telegram import Update
@ -18,6 +19,7 @@ from session_manager import SessionManager
from claude_subprocess import ClaudeSubprocess from claude_subprocess import ClaudeSubprocess
from telegram_utils import split_message_smart, escape_markdown_v2, typing_indicator_loop from telegram_utils import split_message_smart, escape_markdown_v2, typing_indicator_loop
from message_batcher import MessageBatcher from message_batcher import MessageBatcher
from idle_timer import SessionIdleTimer
# Setup logging # Setup logging
logging.basicConfig( logging.basicConfig(
@ -48,6 +50,8 @@ session_manager = SessionManager()
subprocesses: dict[str, ClaudeSubprocess] = {} # Persistent subprocess per session subprocesses: dict[str, ClaudeSubprocess] = {} # Persistent subprocess per session
batchers: dict[str, MessageBatcher] = {} # Message batcher per session batchers: dict[str, MessageBatcher] = {} # Message batcher per session
typing_tasks: dict[str, tuple[asyncio.Task, asyncio.Event]] = {} # Typing indicator per session typing_tasks: dict[str, tuple[asyncio.Task, asyncio.Event]] = {} # Typing indicator per session
idle_timers: dict[str, SessionIdleTimer] = {} # Idle timer per session
subprocess_locks: dict[str, asyncio.Lock] = {} # Lock per session to prevent races
def get_authorized_users(): def get_authorized_users():
"""Load authorized user IDs.""" """Load authorized user IDs."""
@ -68,6 +72,50 @@ def is_authorized(user_id: int) -> bool:
"""Check if user is authorized.""" """Check if user is authorized."""
return user_id in get_authorized_users() return user_id in get_authorized_users()
def get_subprocess_lock(session_name: str) -> asyncio.Lock:
"""Get or create subprocess lock for a session."""
return subprocess_locks.setdefault(session_name, asyncio.Lock())
async def suspend_session(session_name: str):
"""Suspend a session after idle timeout (callback for idle timer)."""
async with get_subprocess_lock(session_name):
# Check if subprocess exists and is alive
if session_name not in subprocesses or not subprocesses[session_name].is_alive:
logger.debug(f"Session '{session_name}' already not running, updating metadata only")
session_manager.update_session(session_name, status='suspended', pid=None)
return
# Check if subprocess is busy (Claude is mid-processing)
if subprocesses[session_name].is_busy:
logger.info(f"Session '{session_name}' is busy, deferring suspension")
# Reset timer to try again later
if session_name in idle_timers:
idle_timers[session_name].reset()
return
# Store PID for logging
pid = subprocesses[session_name].pid
logger.info(f"Suspending session '{session_name}' (PID {pid}) after idle timeout")
# Terminate subprocess
await subprocesses[session_name].terminate()
del subprocesses[session_name]
# Flush and remove batcher if exists
if session_name in batchers:
await batchers[session_name].flush_immediately()
del batchers[session_name]
# Update session metadata
session_manager.update_session(session_name, status='suspended', pid=None)
# Cancel and remove idle timer
if session_name in idle_timers:
idle_timers[session_name].cancel()
del idle_timers[session_name]
logger.info(f"Session '{session_name}' suspended after idle timeout")
MODEL_ALIASES = { MODEL_ALIASES = {
"sonnet": "claude-sonnet-4-5-20250929", "sonnet": "claude-sonnet-4-5-20250929",
"opus": "claude-opus-4-5-20251101", "opus": "claude-opus-4-5-20251101",
@ -132,6 +180,10 @@ def make_callbacks(bot, chat_id, session_name: str):
# Stop typing indicator on completion # Stop typing indicator on completion
_stop_typing() _stop_typing()
# Reset idle timer (only start counting AFTER Claude finishes)
if session_name in idle_timers:
idle_timers[session_name].reset()
async def on_status(status): async def on_status(status):
await bot.send_message(chat_id=chat_id, text=f"[{status}]") await bot.send_message(chat_id=chat_id, text=f"[{status}]")
@ -233,7 +285,9 @@ async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
*Claude Sessions:* *Claude Sessions:*
/new <name> [persona] - Create new Claude session /new <name> [persona] - Create new Claude session
/session <name> - Switch to a session /session <name> - Switch to a session
/sessions - List all sessions with status
/model <name> - Switch model (sonnet/opus/haiku) /model <name> - Switch model (sonnet/opus/haiku)
/timeout <minutes> - Set idle timeout (1-120)
/archive <name> - Archive and remove a session /archive <name> - Archive and remove a session
*Status & Monitoring:* *Status & Monitoring:*
@ -417,6 +471,10 @@ async def new_session(update: Update, context: ContextTypes.DEFAULT_TYPE):
on_tool_use=callbacks['on_tool_use'], on_tool_use=callbacks['on_tool_use'],
) )
# Initialize idle timer for the new session
timeout_secs = session_manager.get_session_timeout(name)
idle_timers[name] = SessionIdleTimer(name, timeout_secs, on_timeout=suspend_session)
logger.info(f"Created session '{name}' with persona '{persona or 'default'}'") logger.info(f"Created session '{name}' with persona '{persona or 'default'}'")
except ValueError as e: except ValueError as e:
@ -499,6 +557,11 @@ async def switch_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE)
on_tool_use=callbacks['on_tool_use'], on_tool_use=callbacks['on_tool_use'],
) )
# Initialize idle timer for the switched-to session
timeout_secs = session_manager.get_session_timeout(name)
if name not in idle_timers:
idle_timers[name] = SessionIdleTimer(name, timeout_secs, on_timeout=suspend_session)
logger.info(f"Auto-spawned subprocess for session '{name}'") logger.info(f"Auto-spawned subprocess for session '{name}'")
# Get persona for reply # Get persona for reply
@ -549,6 +612,14 @@ async def archive_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE
await subprocesses[name].terminate() await subprocesses[name].terminate()
del subprocesses[name] del subprocesses[name]
# Cancel idle timer if exists
if name in idle_timers:
idle_timers[name].cancel()
del idle_timers[name]
# Remove subprocess lock if exists
subprocess_locks.pop(name, None)
# Archive the session # Archive the session
archive_path = session_manager.archive_session(name) archive_path = session_manager.archive_session(name)
size_mb = archive_path.stat().st_size / (1024 * 1024) size_mb = archive_path.stat().st_size / (1024 * 1024)
@ -583,6 +654,10 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
logger.info(f"[TIMING] Message received: session='{active_session}', age={msg_age:.1f}s, text={message[:50]}...") logger.info(f"[TIMING] Message received: session='{active_session}', age={msg_age:.1f}s, text={message[:50]}...")
try: try:
# Reset idle timer on user activity (even before processing message)
if active_session in idle_timers:
idle_timers[active_session].reset()
# Clean up stale typing task (from previous message completion) # Clean up stale typing task (from previous message completion)
if active_session in typing_tasks: if active_session in typing_tasks:
old_task, old_event = typing_tasks[active_session] old_task, old_event = typing_tasks[active_session]
@ -597,40 +672,75 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
) )
typing_tasks[active_session] = (typing_task, stop_typing) typing_tasks[active_session] = (typing_task, stop_typing)
# Get or create subprocess for active session (avoid double-start) # Acquire lock to prevent race with suspend_session
already_alive = active_session in subprocesses and subprocesses[active_session].is_alive lock = get_subprocess_lock(active_session)
if not already_alive: async with lock:
session_dir = session_manager.get_session_dir(active_session) # Get or create subprocess for active session (avoid double-start)
persona_data = load_persona_for_session(active_session) already_alive = active_session in subprocesses and subprocesses[active_session].is_alive
if not already_alive:
session_dir = session_manager.get_session_dir(active_session)
persona_data = load_persona_for_session(active_session)
# Create callbacks bound to this chat (typing looked up dynamically) # Check if this is a resume (has .claude/ dir with history)
callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session) is_resume = (session_dir / ".claude").exists()
if is_resume:
# Calculate idle duration
session_data = session_manager.get_session(active_session)
last_active_str = session_data.get('last_active')
if last_active_str:
last_active = datetime.fromisoformat(last_active_str)
idle_duration = (datetime.now(timezone.utc) - last_active).total_seconds()
if idle_duration > 60:
# Show idle duration if >1 min
idle_minutes = int(idle_duration / 60)
await update.message.reply_text(f"Resuming session (idle for {idle_minutes} min)...")
else:
await update.message.reply_text("Resuming session...")
else:
await update.message.reply_text("Resuming session...")
# Create subprocess # Create callbacks bound to this chat (typing looked up dynamically)
subprocess_inst = ClaudeSubprocess( callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session)
session_dir=session_dir,
persona=persona_data,
on_output=callbacks['on_output'],
on_error=callbacks['on_error'],
on_complete=callbacks['on_complete'],
on_status=callbacks['on_status'],
on_tool_use=callbacks['on_tool_use'],
)
await subprocess_inst.start()
subprocesses[active_session] = subprocess_inst
else:
subprocess_inst = subprocesses[active_session]
subprocess_state = "reused" if already_alive else "cold-start" # Create subprocess
logger.info(f"[TIMING] Subprocess: {subprocess_state}, busy={subprocess_inst.is_busy}") subprocess_inst = ClaudeSubprocess(
session_dir=session_dir,
persona=persona_data,
on_output=callbacks['on_output'],
on_error=callbacks['on_error'],
on_complete=callbacks['on_complete'],
on_status=callbacks['on_status'],
on_tool_use=callbacks['on_tool_use'],
)
await subprocess_inst.start()
subprocesses[active_session] = subprocess_inst
# Get or create message batcher for this session # Update metadata with PID and status
if active_session not in batchers: now_iso = datetime.now(timezone.utc).isoformat()
# Batcher callback sends message to subprocess session_manager.update_session(
batchers[active_session] = MessageBatcher( active_session,
callback=subprocess_inst.send_message, status='active',
debounce_seconds=2.0 last_active=now_iso,
) pid=subprocess_inst.pid
)
else:
subprocess_inst = subprocesses[active_session]
subprocess_state = "reused" if already_alive else "cold-start"
logger.info(f"[TIMING] Subprocess: {subprocess_state}, busy={subprocess_inst.is_busy}")
# Get or create message batcher for this session
if active_session not in batchers:
# Batcher callback sends message to subprocess
batchers[active_session] = MessageBatcher(
callback=subprocess_inst.send_message,
debounce_seconds=2.0
)
# Create/reset idle timer for the session (outside lock)
timeout_secs = session_manager.get_session_timeout(active_session)
if active_session not in idle_timers:
idle_timers[active_session] = SessionIdleTimer(active_session, timeout_secs, on_timeout=suspend_session)
# Add message to batcher (typing indicator already running) # Add message to batcher (typing indicator already running)
await batchers[active_session].add_message(message) await batchers[active_session].add_message(message)
@ -676,6 +786,10 @@ async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
# Get caption if any # Get caption if any
caption = update.message.caption or "" caption = update.message.caption or ""
# Reset idle timer on user activity
if active_session in idle_timers:
idle_timers[active_session].reset()
# Clean up stale typing task # Clean up stale typing task
if active_session in typing_tasks: if active_session in typing_tasks:
old_task, old_event = typing_tasks[active_session] old_task, old_event = typing_tasks[active_session]
@ -696,23 +810,56 @@ async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
else: else:
analysis_message = f"I've attached a photo: {filename}. Please describe what you see." analysis_message = f"I've attached a photo: {filename}. Please describe what you see."
# Get or create subprocess # Acquire lock to prevent race with suspend_session
if active_session not in subprocesses or not subprocesses[active_session].is_alive: lock = get_subprocess_lock(active_session)
persona_data = load_persona_for_session(active_session) async with lock:
# Get or create subprocess
if active_session not in subprocesses or not subprocesses[active_session].is_alive:
# Check if this is a resume
is_resume = (session_dir / ".claude").exists()
if is_resume:
session_data = session_manager.get_session(active_session)
last_active_str = session_data.get('last_active')
if last_active_str:
last_active = datetime.fromisoformat(last_active_str)
idle_duration = (datetime.now(timezone.utc) - last_active).total_seconds()
if idle_duration > 60:
idle_minutes = int(idle_duration / 60)
await update.message.reply_text(f"Resuming session (idle for {idle_minutes} min)...")
else:
await update.message.reply_text("Resuming session...")
else:
await update.message.reply_text("Resuming session...")
callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session) persona_data = load_persona_for_session(active_session)
subprocess_inst = ClaudeSubprocess( callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session)
session_dir=session_dir,
persona=persona_data, subprocess_inst = ClaudeSubprocess(
on_output=callbacks['on_output'], session_dir=session_dir,
on_error=callbacks['on_error'], persona=persona_data,
on_complete=callbacks['on_complete'], on_output=callbacks['on_output'],
on_status=callbacks['on_status'], on_error=callbacks['on_error'],
on_tool_use=callbacks['on_tool_use'], on_complete=callbacks['on_complete'],
) on_status=callbacks['on_status'],
await subprocess_inst.start() on_tool_use=callbacks['on_tool_use'],
subprocesses[active_session] = subprocess_inst )
await subprocess_inst.start()
subprocesses[active_session] = subprocess_inst
# Update metadata with PID and status
now_iso = datetime.now(timezone.utc).isoformat()
session_manager.update_session(
active_session,
status='active',
last_active=now_iso,
pid=subprocess_inst.pid
)
# Create/reset idle timer (outside lock)
timeout_secs = session_manager.get_session_timeout(active_session)
if active_session not in idle_timers:
idle_timers[active_session] = SessionIdleTimer(active_session, timeout_secs, on_timeout=suspend_session)
# Send analysis message directly (not batched) # Send analysis message directly (not batched)
await subprocesses[active_session].send_message(analysis_message) await subprocesses[active_session].send_message(analysis_message)
@ -751,6 +898,10 @@ async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
# Get caption if any # Get caption if any
caption = update.message.caption or "" caption = update.message.caption or ""
# Reset idle timer on user activity
if active_session in idle_timers:
idle_timers[active_session].reset()
# Clean up stale typing task # Clean up stale typing task
if active_session in typing_tasks: if active_session in typing_tasks:
old_task, old_event = typing_tasks[active_session] old_task, old_event = typing_tasks[active_session]
@ -771,23 +922,56 @@ async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
else: else:
notify_message = f"User uploaded file: {filename}" notify_message = f"User uploaded file: {filename}"
# Get or create subprocess # Acquire lock to prevent race with suspend_session
if active_session not in subprocesses or not subprocesses[active_session].is_alive: lock = get_subprocess_lock(active_session)
persona_data = load_persona_for_session(active_session) async with lock:
# Get or create subprocess
if active_session not in subprocesses or not subprocesses[active_session].is_alive:
# Check if this is a resume
is_resume = (session_dir / ".claude").exists()
if is_resume:
session_data = session_manager.get_session(active_session)
last_active_str = session_data.get('last_active')
if last_active_str:
last_active = datetime.fromisoformat(last_active_str)
idle_duration = (datetime.now(timezone.utc) - last_active).total_seconds()
if idle_duration > 60:
idle_minutes = int(idle_duration / 60)
await update.message.reply_text(f"Resuming session (idle for {idle_minutes} min)...")
else:
await update.message.reply_text("Resuming session...")
else:
await update.message.reply_text("Resuming session...")
callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session) persona_data = load_persona_for_session(active_session)
subprocess_inst = ClaudeSubprocess( callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session)
session_dir=session_dir,
persona=persona_data, subprocess_inst = ClaudeSubprocess(
on_output=callbacks['on_output'], session_dir=session_dir,
on_error=callbacks['on_error'], persona=persona_data,
on_complete=callbacks['on_complete'], on_output=callbacks['on_output'],
on_status=callbacks['on_status'], on_error=callbacks['on_error'],
on_tool_use=callbacks['on_tool_use'], on_complete=callbacks['on_complete'],
) on_status=callbacks['on_status'],
await subprocess_inst.start() on_tool_use=callbacks['on_tool_use'],
subprocesses[active_session] = subprocess_inst )
await subprocess_inst.start()
subprocesses[active_session] = subprocess_inst
# Update metadata with PID and status
now_iso = datetime.now(timezone.utc).isoformat()
session_manager.update_session(
active_session,
status='active',
last_active=now_iso,
pid=subprocess_inst.pid
)
# Create/reset idle timer (outside lock)
timeout_secs = session_manager.get_session_timeout(active_session)
if active_session not in idle_timers:
idle_timers[active_session] = SessionIdleTimer(active_session, timeout_secs, on_timeout=suspend_session)
# Send notification directly (not batched) # Send notification directly (not batched)
await subprocesses[active_session].send_message(notify_message) await subprocesses[active_session].send_message(notify_message)
@ -833,6 +1017,11 @@ async def model_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
await batchers[active_session].flush_immediately() await batchers[active_session].flush_immediately()
del batchers[active_session] del batchers[active_session]
# Cancel idle timer (will be recreated on next message)
if active_session in idle_timers:
idle_timers[active_session].cancel()
del idle_timers[active_session]
await update.message.reply_text(f"Model set to {resolved} for session '{active_session}'.") await update.message.reply_text(f"Model set to {resolved} for session '{active_session}'.")
logger.info(f"Model changed to {resolved} for session '{active_session}'") logger.info(f"Model changed to {resolved} for session '{active_session}'")
@ -842,10 +1031,81 @@ async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE):
return return
await update.message.reply_text("Unknown command. Use /help for available commands.") await update.message.reply_text("Unknown command. Use /help for available commands.")
async def cleanup_orphaned_subprocesses():
"""Clean up orphaned subprocesses at bot startup."""
orphan_count = 0
sessions = session_manager.list_sessions()
for session in sessions:
name = session['name']
pid = session.get('pid')
if pid is not None:
# Check if process exists
try:
os.kill(pid, 0)
# Process exists - verify it's a claude process
try:
with open(f"/proc/{pid}/cmdline", "r") as f:
cmdline = f.read()
if "claude" in cmdline:
# It's a claude process - kill it
logger.info(f"Killing orphaned claude process for session '{name}': PID {pid}")
try:
os.kill(pid, signal.SIGTERM)
await asyncio.sleep(2)
try:
os.kill(pid, signal.SIGKILL)
except ProcessLookupError:
pass # Already dead
except ProcessLookupError:
pass # Process died between checks
orphan_count += 1
else:
logger.warning(f"PID {pid} for session '{name}' exists but is not a claude process")
except (FileNotFoundError, PermissionError):
# Can't read cmdline - assume it's not our process
logger.warning(f"Cannot verify PID {pid} for session '{name}'")
except ProcessLookupError:
# Process doesn't exist
pass
# Update metadata - clear PID and set status to suspended
session_manager.update_session(name, pid=None, status='suspended')
# Also suspend any sessions that are marked active but have no PID
elif session.get('status') == 'active':
session_manager.update_session(name, status='suspended')
if orphan_count > 0:
logger.info(f"Cleaned up {orphan_count} orphaned subprocess(es)")
else:
logger.info("No orphaned subprocesses found")
async def post_init(application):
"""Run startup cleanup."""
await cleanup_orphaned_subprocesses()
async def post_shutdown(application):
"""Clean up subprocesses and timers on bot shutdown."""
logger.info("Bot shutting down, cleaning up...")
# Cancel all idle timers
for name, timer in idle_timers.items():
timer.cancel()
# Terminate all subprocesses
for name, proc in list(subprocesses.items()):
if proc.is_alive:
logger.info(f"Terminating subprocess for '{name}'")
await proc.terminate()
logger.info("Cleanup complete")
def main(): def main():
"""Start the bot.""" """Start the bot."""
# Create application # Create application with lifecycle callbacks
app = Application.builder().token(TOKEN).build() app = Application.builder().token(TOKEN).post_init(post_init).post_shutdown(post_shutdown).build()
# Add handlers # Add handlers
app.add_handler(CommandHandler("start", start)) app.add_handler(CommandHandler("start", start))