homelab/telegram/bot.py
Mikkel Georgsen 06c52466f2 feat(03-02): add /timeout and /sessions commands
- Added timeout_cmd() to set per-session idle timeout (1-120 min range)
  - Shows current timeout when called without args
  - Updates session metadata and existing idle timer
- Added sessions_cmd() to list all sessions with status
  - Shows LIVE (subprocess running) or IDLE (suspended) status
  - Displays persona and relative last-active time (e.g., '2m ago')
  - Marks current active session with arrow
- Registered both commands in main()
- Commands already added to help text in Task 1

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 23:37:30 +00:00

1239 lines
47 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Homelab Telegram Bot
Two-way interactive bot for homelab management and notifications.
"""
import asyncio
import logging
import os
import signal
import subprocess
import time
from datetime import datetime, timezone
from pathlib import Path
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
from session_manager import SessionManager
from claude_subprocess import ClaudeSubprocess
from telegram_utils import split_message_smart, escape_markdown_v2, typing_indicator_loop
from message_batcher import MessageBatcher
from idle_timer import SessionIdleTimer
# Process-wide logging: timestamped records at INFO level
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Credentials live next to this script as KEY=VALUE lines; lines without
# an '=' are ignored, and only the first '=' splits key from value.
CREDENTIALS_FILE = Path(__file__).parent / 'credentials'
with open(CREDENTIALS_FILE) as f:
    config = dict(
        line.strip().split('=', 1)
        for line in f
        if '=' in line
    )
TOKEN = config['TELEGRAM_BOT_TOKEN']

# One Telegram user ID per line; only these users may use the bot
AUTHORIZED_FILE = Path(__file__).parent / 'authorized_users'

# Inbox file for messages to Claude
INBOX_FILE = Path(__file__).parent / 'inbox'

# Session manager plus per-session runtime registries, all keyed by session name
session_manager = SessionManager()
subprocesses: dict[str, ClaudeSubprocess] = {}  # subprocess object per session
batchers: dict[str, MessageBatcher] = {}  # message batcher per session
typing_tasks: dict[str, tuple[asyncio.Task, asyncio.Event]] = {}  # (task, stop event) of the typing indicator
idle_timers: dict[str, SessionIdleTimer] = {}  # idle timer per session
subprocess_locks: dict[str, asyncio.Lock] = {}  # serializes subprocess start/stop per session
def get_authorized_users():
    """Return the set of authorized user IDs read from AUTHORIZED_FILE.

    Blank lines are skipped; every other line is parsed with ``int``.
    An empty set is returned when the file does not exist.
    """
    if not AUTHORIZED_FILE.exists():
        return set()
    with open(AUTHORIZED_FILE) as handle:
        return {int(entry.strip()) for entry in handle if entry.strip()}
def add_authorized_user(user_id: int):
    """Add ``user_id`` to the authorized set and rewrite AUTHORIZED_FILE."""
    known = get_authorized_users()
    known.add(user_id)
    # The whole file is rewritten, one ID per line
    with open(AUTHORIZED_FILE, 'w') as handle:
        handle.writelines(f"{uid}\n" for uid in known)
def is_authorized(user_id: int) -> bool:
    """Return True when ``user_id`` is in the authorized-users file."""
    allowed = get_authorized_users()
    return user_id in allowed
def get_subprocess_lock(session_name: str) -> asyncio.Lock:
    """Get or create the subprocess lock for a session.

    The lock is only constructed when the session has none yet, so repeated
    lookups return the existing lock without allocating a throwaway one.
    """
    lock = subprocess_locks.get(session_name)
    if lock is None:
        lock = subprocess_locks[session_name] = asyncio.Lock()
    return lock
async def suspend_session(session_name: str):
    """Idle-timeout callback: stop a session's subprocess and mark it suspended.

    Runs under the session's subprocess lock. If no live subprocess exists,
    only the metadata is updated. If the subprocess reports busy, the idle
    timer is reset and suspension is deferred.
    """
    async with get_subprocess_lock(session_name):
        proc = subprocesses.get(session_name)

        # Nothing running: just record the suspended state
        if proc is None or not proc.is_alive:
            logger.debug(f"Session '{session_name}' already not running, updating metadata only")
            session_manager.update_session(session_name, status='suspended', pid=None)
            return

        # Mid-processing: re-arm the timer and try again later
        if proc.is_busy:
            logger.info(f"Session '{session_name}' is busy, deferring suspension")
            timer = idle_timers.get(session_name)
            if timer is not None:
                timer.reset()
            return

        # Capture the PID before termination, for the log line only
        pid = proc.pid
        logger.info(f"Suspending session '{session_name}' (PID {pid}) after idle timeout")

        await proc.terminate()
        del subprocesses[session_name]

        # Flush and drop the batcher that belonged to this subprocess
        batcher = batchers.get(session_name)
        if batcher is not None:
            await batcher.flush_immediately()
            del batchers[session_name]

        session_manager.update_session(session_name, status='suspended', pid=None)

        # The timer has done its job; cancel and forget it
        timer = idle_timers.get(session_name)
        if timer is not None:
            timer.cancel()
            del idle_timers[session_name]

        logger.info(f"Session '{session_name}' suspended after idle timeout")
# Short names accepted by /model, mapped to the full model identifiers.
# Names not listed here are passed through unchanged by model_cmd().
MODEL_ALIASES = {
    "sonnet": "claude-sonnet-4-5-20250929",
    "opus": "claude-opus-4-5-20251101",
    "haiku": "claude-haiku-4-5-20251001",
}
def load_persona_for_session(session_name: str) -> dict:
    """Load the session's persona and apply its model override, if any.

    The persona name comes from the session metadata (falling back to
    'default'). When the metadata carries a truthy 'model_override', it is
    written to ``persona['settings']['model']`` on the loaded persona dict.
    """
    meta = session_manager.get_session(session_name)
    persona = session_manager.load_persona(meta.get('persona', 'default'))

    override = meta.get('model_override')
    if not override:
        return persona

    # Create the settings section on demand, then pin the model
    persona.setdefault('settings', {})['model'] = override
    return persona
def make_callbacks(bot, chat_id, session_name: str):
    """Create callbacks for ClaudeSubprocess bound to a specific chat.

    Typing events are looked up dynamically from the typing_tasks dict so the
    callbacks always reference the CURRENT typing indicator, not a stale one
    captured at creation time.

    Args:
        bot: Object exposing an awaitable ``send_message(chat_id=..., text=..., ...)``.
        chat_id: Chat that receives every message sent by the callbacks.
        session_name: Key used to look up typing_tasks and idle_timers.

    Returns:
        Dict with the keys 'on_output', 'on_error', 'on_complete',
        'on_status' and 'on_tool_use', each mapping to an async callable.
    """
    def _stop_typing():
        """Stop the current typing indicator for this session (if any)."""
        if session_name in typing_tasks:
            _, stop_event = typing_tasks[session_name]
            stop_event.set()

    async def on_output(text):
        t0 = time.monotonic()
        # Split message using smart splitting
        chunks = split_message_smart(text)
        for chunk in chunks:
            try:
                # Try sending with MarkdownV2
                escaped = escape_markdown_v2(chunk)
                await bot.send_message(chat_id=chat_id, text=escaped, parse_mode='MarkdownV2')
            except Exception as e:
                # Fall back to plain text if MarkdownV2 fails
                logger.warning(f"MarkdownV2 parse failed, falling back to plain text: {e}")
                await bot.send_message(chat_id=chat_id, text=chunk)
        # Stop typing indicator (dynamic lookup)
        _stop_typing()
        elapsed = time.monotonic() - t0
        logger.info(f"[TIMING] Telegram send: {elapsed:.3f}s ({len(text)} chars, {len(chunks)} chunks)")

    async def on_error(error):
        await bot.send_message(chat_id=chat_id, text=f"Error: {error}")
        # Stop typing indicator on error
        _stop_typing()

    async def on_complete():
        # Stop typing indicator on completion
        _stop_typing()
        # Reset idle timer (only start counting AFTER Claude finishes)
        if session_name in idle_timers:
            idle_timers[session_name].reset()

    async def on_status(status):
        await bot.send_message(chat_id=chat_id, text=f"[{status}]")

    async def on_tool_use(tool_name: str, tool_input: dict):
        """Format and send tool call progress notification."""
        # Extract a short, meaningful target from tool_input
        if tool_name == "Bash":
            cmd = tool_input.get("command", "")
            target = cmd[:50] + "..." if len(cmd) > 50 else cmd
        elif tool_name in ("Read", "Edit", "Write"):
            target = tool_input.get("file_path", "")
        elif tool_name in ("Grep", "Glob"):
            target = f"pattern: {tool_input.get('pattern', '')}"
        else:
            target = str(tool_input)[:50]

        plain = f"{tool_name}: {target}"
        try:
            # MarkdownV2 rejects unescaped reserved characters ('.', '-', '/', ...),
            # which paths and shell commands almost always contain; escape the
            # text before wrapping it in italic markers.
            await bot.send_message(
                chat_id=chat_id,
                text=f"_{escape_markdown_v2(plain)}_",
                parse_mode='MarkdownV2',
            )
        except Exception as e:
            # Fall back to plain text
            logger.warning(f"Failed to send tool notification with MarkdownV2: {e}")
            await bot.send_message(chat_id=chat_id, text=plain)

    return {
        'on_output': on_output,
        'on_error': on_error,
        'on_complete': on_complete,
        'on_status': on_status,
        'on_tool_use': on_tool_use,
    }
def run_command(cmd: list, timeout: int = 30) -> str:
    """Run ``cmd`` (argument list, no shell) and return its textual output.

    /home/mikkel/bin is prepended to PATH for the child. The result is
    stdout, else stderr, else "No output", cut to 4000 characters so it fits
    in a single Telegram message. Failures are returned as strings
    ("Command timed out" or "Error: ...") rather than raised.
    """
    search_path = f"/home/mikkel/bin:{os.environ.get('PATH', '')}"
    child_env = dict(os.environ, PATH=search_path)
    try:
        completed = subprocess.run(
            cmd,
            env=child_env,
            timeout=timeout,
            text=True,
            capture_output=True,
        )
    except subprocess.TimeoutExpired:
        return "Command timed out"
    except Exception as exc:
        return f"Error: {exc}"

    text = completed.stdout or completed.stderr or "No output"
    # Telegram has 4096 char limit per message
    if len(text) > 4000:
        return text[:4000] + "\n... (truncated)"
    return text
# Command handlers
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start: greet known users and bootstrap the first admin.

    When nobody is authorized yet, the caller is added to the authorized
    list. Otherwise an unknown caller is told their user ID and refused.
    """
    user = update.effective_user
    chat_id = update.effective_chat.id

    if is_authorized(user.id):
        await update.message.reply_text(
            f"Welcome back {user.first_name}!\n"
            f"Use /help to see available commands."
        )
        return

    if get_authorized_users():
        # Someone else already owns the bot
        await update.message.reply_text(
            f"Unauthorized. Your user ID: {user.id}\n"
            "Contact the admin to get access."
        )
        return

    # First user becomes authorized automatically
    add_authorized_user(user.id)
    await update.message.reply_text(
        f"Welcome {user.first_name}! You're now authorized as the admin.\n"
        f"Your chat ID: {chat_id}\n\n"
        f"Use /help to see available commands."
    )
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /help: send the command overview to authorized users."""
    caller = update.effective_user
    if not is_authorized(caller.id):
        return
    # Sent with Markdown parsing, hence the escaped underscores in /pbs\_* names
    overview = """
*Homelab Bot Commands*
*Claude Sessions:*
/new <name> [persona] - Create new Claude session
/session <name> - Switch to a session
/sessions - List all sessions with status
/model <name> - Switch model (sonnet/opus/haiku)
/timeout <minutes> - Set idle timeout (1-120)
/archive <name> - Archive and remove a session
*Status & Monitoring:*
/status - Quick service overview
/pbs - PBS backup status
/backups - Last backup per VM/CT
/beszel - Server metrics
/kuma - Uptime Kuma status
*PBS Commands:*
/pbs\\_status - Full PBS overview
/pbs\\_errors - Recent errors
/pbs\\_gc - Garbage collection status
*System:*
/ping <host> - Ping a host
/help - This help message
/chatid - Show your chat ID
"""
    await update.message.reply_text(overview, parse_mode='Markdown')
async def chatid(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the ID of the current chat (no authorization check)."""
    current_chat = update.effective_chat
    reply = f"Chat ID: `{current_chat.id}`"
    await update.message.reply_text(reply, parse_mode='Markdown')
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Quick status overview: ping the key services and report up/down.

    Each host is pinged once with a one-second wait. The pings run in worker
    threads, concurrently, so the bot's event loop is not blocked while
    waiting for unreachable hosts.
    """
    if not is_authorized(update.effective_user.id):
        return
    await update.message.reply_text("Checking services...")

    # Key services checked via ping
    services = [
        ("NPM", "10.5.0.1"),
        ("DNS", "10.5.0.2"),
        ("PBS", "10.5.0.6"),
        ("Dockge", "10.5.0.10"),
        ("Forgejo", "10.5.0.14"),
    ]

    def _is_reachable(ip: str) -> bool:
        """Return True when a single ping to ``ip`` exits with code 0."""
        result = subprocess.run(
            ["ping", "-c", "1", "-W", "1", ip],
            capture_output=True
        )
        return result.returncode == 0

    reachable = await asyncio.gather(
        *(asyncio.to_thread(_is_reachable, ip) for _, ip in services)
    )

    output_lines = ["*Homelab Status*\n"]
    for (name, ip), is_up in zip(services, reachable):
        # Visible marker per service; previously both branches rendered as
        # an empty string, so up and down looked identical.
        marker = "✅" if is_up else "❌"
        output_lines.append(f"{marker} {name} ({ip})")
    await update.message.reply_text("\n".join(output_lines), parse_mode='Markdown')
async def pbs(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /pbs: reply with the output of ``pbs status`` in a code block."""
    caller = update.effective_user
    if not is_authorized(caller.id):
        return
    await update.message.reply_text("Fetching PBS status...")
    report = run_command(["/home/mikkel/bin/pbs", "status"])
    reply = f"```\n{report}\n```"
    await update.message.reply_text(reply, parse_mode='Markdown')
async def pbs_status(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /pbs_status: reply with the output of ``pbs status`` in a code block."""
    caller = update.effective_user
    if not is_authorized(caller.id):
        return
    await update.message.reply_text("Fetching PBS status...")
    # Runs the same helper command as the /pbs handler
    report = run_command(["/home/mikkel/bin/pbs", "status"])
    reply = f"```\n{report}\n```"
    await update.message.reply_text(reply, parse_mode='Markdown')
async def backups(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /backups: reply with the output of ``pbs backups`` (60 s timeout)."""
    caller = update.effective_user
    if not is_authorized(caller.id):
        return
    await update.message.reply_text("Fetching backup status...")
    report = run_command(["/home/mikkel/bin/pbs", "backups"], timeout=60)
    reply = f"```\n{report}\n```"
    await update.message.reply_text(reply, parse_mode='Markdown')
async def pbs_errors(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /pbs_errors: reply with the output of ``pbs errors`` in a code block."""
    caller = update.effective_user
    if not is_authorized(caller.id):
        return
    report = run_command(["/home/mikkel/bin/pbs", "errors"])
    reply = f"```\n{report}\n```"
    await update.message.reply_text(reply, parse_mode='Markdown')
async def pbs_gc(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /pbs_gc: reply with the output of ``pbs gc`` in a code block."""
    caller = update.effective_user
    if not is_authorized(caller.id):
        return
    report = run_command(["/home/mikkel/bin/pbs", "gc"])
    reply = f"```\n{report}\n```"
    await update.message.reply_text(reply, parse_mode='Markdown')
async def beszel(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /beszel: reply with the output of ``beszel status`` in a code block."""
    caller = update.effective_user
    if not is_authorized(caller.id):
        return
    await update.message.reply_text("Fetching server metrics...")
    report = run_command(["/home/mikkel/bin/beszel", "status"])
    reply = f"```\n{report}\n```"
    await update.message.reply_text(reply, parse_mode='Markdown')
async def kuma(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /kuma: reply with the output of ``kuma list`` in a code block."""
    caller = update.effective_user
    if not is_authorized(caller.id):
        return
    report = run_command(["/home/mikkel/bin/kuma", "list"])
    reply = f"```\n{report}\n```"
    await update.message.reply_text(reply, parse_mode='Markdown')
async def ping_host(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /ping <host>: ping the host three times and reply with the output.

    The host must consist of alphanumerics, dots and hyphens and must not
    start with a hyphen, so it can never be parsed by ping as an option.
    The ping runs in a worker thread so the event loop stays responsive,
    and a timeout is reported to the user instead of raising.
    """
    if not is_authorized(update.effective_user.id):
        return
    if not context.args:
        await update.message.reply_text("Usage: /ping <host>")
        return
    host = context.args[0]

    # Basic validation. A leading '-' would pass the character check
    # (e.g. "-f") but be interpreted by ping as a command-line option.
    if host.startswith('-') or not host.replace('.', '').replace('-', '').isalnum():
        await update.message.reply_text("Invalid hostname")
        return

    try:
        result = await asyncio.to_thread(
            subprocess.run,
            ["ping", "-c", "3", "-W", "2", host],
            capture_output=True,
            text=True,
            timeout=10
        )
    except subprocess.TimeoutExpired:
        await update.message.reply_text("Ping timed out")
        return

    output = result.stdout or result.stderr or "No output"
    await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
async def new_session(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /new <name> [persona]: create a session and make it active.

    After confirming to the user, a ClaudeSubprocess object and an idle timer
    are registered for the session; the subprocess is not started here.
    A ValueError is reported as "already exists", a FileNotFoundError as a
    plain error, and anything else is logged and reported.
    """
    if not is_authorized(update.effective_user.id):
        return
    if not context.args:
        await update.message.reply_text("Usage: /new <name> [persona]")
        return

    name, *extra = context.args
    persona = extra[0] if extra else None

    try:
        # Create the session and switch to it right away
        session_manager.create_session(name, persona=persona)
        session_manager.switch_session(name)

        reply = (
            f"Session '{name}' created with persona '{persona}'."
            if persona
            else f"Session '{name}' created."
        )
        await update.message.reply_text(reply)

        # Prepare the subprocess object for the new session (no message sent yet)
        session_dir = session_manager.get_session_dir(name)
        persona_data = load_persona_for_session(name)
        # Callbacks are bound to this chat; typing state is looked up dynamically
        callbacks = make_callbacks(context.bot, update.effective_chat.id, name)
        subprocesses[name] = ClaudeSubprocess(
            session_dir=session_dir,
            persona=persona_data,
            **callbacks,
        )

        # Idle timer for the new session
        idle_timers[name] = SessionIdleTimer(
            name,
            session_manager.get_session_timeout(name),
            on_timeout=suspend_session,
        )
        logger.info(f"Created session '{name}' with persona '{persona or 'default'}'")
    except ValueError:
        # Session already exists
        await update.message.reply_text(
            f"Session '{name}' already exists. Use /session <name> to switch to it."
        )
    except FileNotFoundError as missing:
        # Persona not found
        await update.message.reply_text(f"Error: {missing}")
    except Exception as exc:
        logger.error(f"Error creating session: {exc}")
        await update.message.reply_text(f"Error creating session: {exc}")
async def switch_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /session <name>: make another Claude session the active one.

    Without arguments, usage plus the list of known sessions is shown.
    Before switching, the previous session's typing indicator is stopped and
    its batcher flushed. If the target has no live subprocess, a subprocess
    object (and an idle timer, if missing) is registered for it.
    """
    if not is_authorized(update.effective_user.id):
        return

    if not context.args:
        # Show usage and list available sessions
        known = session_manager.list_sessions()
        if not known:
            await update.message.reply_text("No sessions found. Use /new <name> to create one.")
            return
        rows = [
            f"- {s['name']} ({s['status']}) - {s.get('persona', 'default')}"
            for s in known
        ]
        session_list = "\n".join(rows)
        await update.message.reply_text(
            f"Usage: /session <name>\n\nAvailable sessions:\n{session_list}"
        )
        return

    name = context.args[0]
    try:
        if not session_manager.session_exists(name):
            await update.message.reply_text(
                f"Session '{name}' doesn't exist. Use /new <name> to create it."
            )
            return

        prev_session = session_manager.get_active_session()
        if prev_session:
            # Stop typing indicator for previous session if running
            if prev_session in typing_tasks:
                typing_task, stop_event = typing_tasks[prev_session]
                stop_event.set()
                try:
                    await typing_task
                except Exception:
                    pass
                del typing_tasks[prev_session]
            # Flush batcher for previous session immediately
            if prev_session in batchers:
                await batchers[prev_session].flush_immediately()

        session_manager.switch_session(name)

        # Auto-spawn subprocess if not alive
        has_live_proc = name in subprocesses and subprocesses[name].is_alive
        if not has_live_proc:
            session_dir = session_manager.get_session_dir(name)
            persona_data = load_persona_for_session(name)
            # Callbacks are bound to this chat; typing state is looked up dynamically
            callbacks = make_callbacks(context.bot, update.effective_chat.id, name)
            subprocesses[name] = ClaudeSubprocess(
                session_dir=session_dir,
                persona=persona_data,
                **callbacks,
            )
            # Idle timer for the switched-to session (kept if one already exists)
            timeout_secs = session_manager.get_session_timeout(name)
            if name not in idle_timers:
                idle_timers[name] = SessionIdleTimer(name, timeout_secs, on_timeout=suspend_session)
            logger.info(f"Auto-spawned subprocess for session '{name}'")

        # Mention the persona only when it is not the default one
        persona_name = session_manager.get_session(name).get('persona', 'default')
        if persona_name and persona_name != 'default':
            reply = f"Switched to session '{name}' (persona: {persona_name})."
        else:
            reply = f"Switched to session '{name}'."
        await update.message.reply_text(reply)
    except Exception as exc:
        logger.error(f"Error switching session: {exc}")
        await update.message.reply_text(f"Error switching session: {exc}")
async def archive_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /archive <name>: tear down a session's runtime state and archive it.

    Stops the typing indicator, flushes and drops the batcher, terminates the
    subprocess, cancels the idle timer and removes the lock, then asks the
    session manager to archive the session and reports the archive size.
    """
    if not is_authorized(update.effective_user.id):
        return
    if not context.args:
        await update.message.reply_text("Usage: /archive <name>")
        return

    name = context.args[0]
    try:
        # Stop typing indicator if running
        if name in typing_tasks:
            typing_task, stop_event = typing_tasks[name]
            stop_event.set()
            try:
                await typing_task
            except Exception:
                pass
            del typing_tasks[name]

        # Flush and remove batcher
        if name in batchers:
            await batchers[name].flush_immediately()
            del batchers[name]

        # Terminate subprocess if running
        if name in subprocesses:
            proc = subprocesses[name]
            if proc.is_alive:
                await proc.terminate()
            del subprocesses[name]

        # Cancel idle timer if exists
        if name in idle_timers:
            idle_timers[name].cancel()
            del idle_timers[name]

        # Remove subprocess lock if exists
        subprocess_locks.pop(name, None)

        # Archive the session and report the size in MiB
        archive_path = session_manager.archive_session(name)
        size_mb = archive_path.stat().st_size / (1024 * 1024)
        await update.message.reply_text(
            f"Session '{name}' archived ({size_mb:.1f} MB)\n{archive_path.name}"
        )
        logger.info(f"Archived session '{name}' to {archive_path}")
    except ValueError as problem:
        await update.message.reply_text(str(problem))
    except Exception as exc:
        logger.error(f"Error archiving session: {exc}")
        await update.message.reply_text(f"Error archiving session: {exc}")
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle free text messages - route to active Claude session with batching.

    Flow: reset the idle timer, start a typing indicator, make sure a live
    subprocess exists (cold-starting one under the session lock if needed),
    then hand the text to the session's message batcher. Any failure is
    logged, reported to the user, and stops the typing indicator.
    """
    if not is_authorized(update.effective_user.id):
        return
    # Check for active session
    active_session = session_manager.get_active_session()
    if not active_session:
        await update.message.reply_text(
            "No active session. Use /new <name> to start one."
        )
        return
    message = update.message.text

    # Telegram message timestamp vs now = delivery delay. Compare
    # timezone-aware values; datetime.utcnow() is deprecated and naive.
    sent_at = update.message.date
    if sent_at.tzinfo is None:
        # NOTE(review): a naive timestamp is assumed to be UTC - confirm
        sent_at = sent_at.replace(tzinfo=timezone.utc)
    msg_age = (datetime.now(timezone.utc) - sent_at).total_seconds()
    logger.info(f"[TIMING] Message received: session='{active_session}', age={msg_age:.1f}s, text={message[:50]}...")

    try:
        # Reset idle timer on user activity (even before processing message)
        if active_session in idle_timers:
            idle_timers[active_session].reset()

        # Clean up stale typing task (from previous message completion)
        if active_session in typing_tasks:
            old_task, old_event = typing_tasks[active_session]
            if old_task.done():
                del typing_tasks[active_session]

        # Start fresh typing indicator for this message
        if active_session not in typing_tasks:
            stop_typing = asyncio.Event()
            typing_task = asyncio.create_task(
                typing_indicator_loop(context.bot, update.effective_chat.id, stop_typing)
            )
            typing_tasks[active_session] = (typing_task, stop_typing)

        # Acquire lock to prevent race with suspend_session
        lock = get_subprocess_lock(active_session)
        async with lock:
            # Get or create subprocess for active session (avoid double-start)
            already_alive = active_session in subprocesses and subprocesses[active_session].is_alive
            if not already_alive:
                session_dir = session_manager.get_session_dir(active_session)
                persona_data = load_persona_for_session(active_session)

                # Check if this is a resume (has .claude/ dir with history)
                is_resume = (session_dir / ".claude").exists()
                if is_resume:
                    # Calculate idle duration
                    session_data = session_manager.get_session(active_session)
                    last_active_str = session_data.get('last_active')
                    if last_active_str:
                        last_active = datetime.fromisoformat(last_active_str)
                        if last_active.tzinfo is None:
                            # NOTE(review): naive stored timestamps are assumed UTC - confirm
                            last_active = last_active.replace(tzinfo=timezone.utc)
                        idle_duration = (datetime.now(timezone.utc) - last_active).total_seconds()
                        if idle_duration > 60:
                            # Show idle duration if >1 min
                            idle_minutes = int(idle_duration / 60)
                            await update.message.reply_text(f"Resuming session (idle for {idle_minutes} min)...")
                        else:
                            await update.message.reply_text("Resuming session...")
                    else:
                        await update.message.reply_text("Resuming session...")

                # Create callbacks bound to this chat (typing looked up dynamically)
                callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session)

                # Create subprocess
                subprocess_inst = ClaudeSubprocess(
                    session_dir=session_dir,
                    persona=persona_data,
                    on_output=callbacks['on_output'],
                    on_error=callbacks['on_error'],
                    on_complete=callbacks['on_complete'],
                    on_status=callbacks['on_status'],
                    on_tool_use=callbacks['on_tool_use'],
                )
                await subprocess_inst.start()
                subprocesses[active_session] = subprocess_inst

                # A batcher left over from the previous subprocess still has
                # that instance's send_message as its callback; drop it so a
                # fresh one is bound to the new subprocess below.
                batchers.pop(active_session, None)

                # Update metadata with PID and status
                now_iso = datetime.now(timezone.utc).isoformat()
                session_manager.update_session(
                    active_session,
                    status='active',
                    last_active=now_iso,
                    pid=subprocess_inst.pid
                )
            else:
                subprocess_inst = subprocesses[active_session]

            subprocess_state = "reused" if already_alive else "cold-start"
            logger.info(f"[TIMING] Subprocess: {subprocess_state}, busy={subprocess_inst.is_busy}")

            # Get or create message batcher for this session
            if active_session not in batchers:
                # Batcher callback sends message to subprocess
                batchers[active_session] = MessageBatcher(
                    callback=subprocess_inst.send_message,
                    debounce_seconds=2.0
                )

        # Create idle timer for the session if missing (outside lock)
        timeout_secs = session_manager.get_session_timeout(active_session)
        if active_session not in idle_timers:
            idle_timers[active_session] = SessionIdleTimer(active_session, timeout_secs, on_timeout=suspend_session)

        # Add message to batcher (typing indicator already running)
        await batchers[active_session].add_message(message)
    except Exception as e:
        logger.error(f"Error handling message: {e}")
        await update.message.reply_text(f"Error: {e}")
        # Stop typing indicator on error
        if active_session in typing_tasks:
            task, stop_event = typing_tasks[active_session]
            stop_event.set()
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle photo messages - save to session directory and auto-analyze.

    The last entry of the message's photo list is downloaded into the active
    session's directory, then Claude is told about it (with the caption, or a
    default "describe" request). Once the typing indicator is running, any
    failure is logged, reported to the user, and stops the indicator, the
    same way handle_message treats errors.
    """
    if not is_authorized(update.effective_user.id):
        return
    # Check for active session
    active_session = session_manager.get_active_session()
    if not active_session:
        await update.message.reply_text(
            "No active session. Use /new <name> to start one, then send the photo again."
        )
        return

    file_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    # Get session directory
    session_dir = session_manager.get_session_dir(active_session)

    # Get the largest photo (best quality)
    photo = update.message.photo[-1]
    file = await context.bot.get_file(photo.file_id)

    # Download to session directory
    filename = f"photo_{file_timestamp}.jpg"
    filepath = session_dir / filename
    await file.download_to_drive(filepath)
    logger.info(f"Photo saved to session '{active_session}': {filepath}")

    # Get caption if any
    caption = update.message.caption or ""

    # Reset idle timer on user activity
    if active_session in idle_timers:
        idle_timers[active_session].reset()

    # Clean up stale typing task
    if active_session in typing_tasks:
        old_task, old_event = typing_tasks[active_session]
        if old_task.done():
            del typing_tasks[active_session]

    # Start typing indicator
    if active_session not in typing_tasks:
        stop_typing = asyncio.Event()
        typing_task = asyncio.create_task(
            typing_indicator_loop(context.bot, update.effective_chat.id, stop_typing)
        )
        typing_tasks[active_session] = (typing_task, stop_typing)

    # Auto-analyze: send message to Claude
    if caption:
        analysis_message = f"I've attached a photo: {filename}. {caption}"
    else:
        analysis_message = f"I've attached a photo: {filename}. Please describe what you see."

    try:
        # Acquire lock to prevent race with suspend_session
        lock = get_subprocess_lock(active_session)
        async with lock:
            # Get or create subprocess
            if active_session not in subprocesses or not subprocesses[active_session].is_alive:
                # Check if this is a resume
                is_resume = (session_dir / ".claude").exists()
                if is_resume:
                    session_data = session_manager.get_session(active_session)
                    last_active_str = session_data.get('last_active')
                    if last_active_str:
                        last_active = datetime.fromisoformat(last_active_str)
                        if last_active.tzinfo is None:
                            # NOTE(review): naive stored timestamps are assumed UTC - confirm
                            last_active = last_active.replace(tzinfo=timezone.utc)
                        idle_duration = (datetime.now(timezone.utc) - last_active).total_seconds()
                        if idle_duration > 60:
                            idle_minutes = int(idle_duration / 60)
                            await update.message.reply_text(f"Resuming session (idle for {idle_minutes} min)...")
                        else:
                            await update.message.reply_text("Resuming session...")
                    else:
                        await update.message.reply_text("Resuming session...")

                persona_data = load_persona_for_session(active_session)
                callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session)
                subprocess_inst = ClaudeSubprocess(
                    session_dir=session_dir,
                    persona=persona_data,
                    on_output=callbacks['on_output'],
                    on_error=callbacks['on_error'],
                    on_complete=callbacks['on_complete'],
                    on_status=callbacks['on_status'],
                    on_tool_use=callbacks['on_tool_use'],
                )
                await subprocess_inst.start()
                subprocesses[active_session] = subprocess_inst

                # Update metadata with PID and status
                now_iso = datetime.now(timezone.utc).isoformat()
                session_manager.update_session(
                    active_session,
                    status='active',
                    last_active=now_iso,
                    pid=subprocess_inst.pid
                )

        # Create idle timer if missing (outside lock)
        timeout_secs = session_manager.get_session_timeout(active_session)
        if active_session not in idle_timers:
            idle_timers[active_session] = SessionIdleTimer(active_session, timeout_secs, on_timeout=suspend_session)

        # Send analysis message directly (not batched)
        await subprocesses[active_session].send_message(analysis_message)
    except Exception as e:
        logger.error(f"Error handling photo: {e}")
        await update.message.reply_text(f"Error: {e}")
        # Stop typing indicator on error so it does not keep running
        if active_session in typing_tasks:
            task, stop_event = typing_tasks[active_session]
            stop_event.set()
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle document/file messages - save to session directory and notify Claude.

    The file is stored under the active session's directory with a timestamp
    prefix. Only the final path component of the sender-supplied file name is
    used, so separators in the name cannot redirect or break the write. Once
    the typing indicator is running, any failure is logged, reported to the
    user, and stops the indicator, the same way handle_message treats errors.
    """
    if not is_authorized(update.effective_user.id):
        return
    # Check for active session
    active_session = session_manager.get_active_session()
    if not active_session:
        await update.message.reply_text(
            "No active session. Use /new <name> to start one, then send the file again."
        )
        return

    file_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    # Get session directory
    session_dir = session_manager.get_session_dir(active_session)

    # Get document info
    doc = update.message.document
    original_name = doc.file_name or "unknown"
    # The name is chosen by the sender; keep only its basename
    original_name = Path(original_name).name or "unknown"
    file = await context.bot.get_file(doc.file_id)

    # Download with timestamp prefix to avoid collisions
    filename = f"{file_timestamp}_{original_name}"
    filepath = session_dir / filename
    await file.download_to_drive(filepath)
    logger.info(f"Document saved to session '{active_session}': {filepath}")

    # Get caption if any
    caption = update.message.caption or ""

    # Reset idle timer on user activity
    if active_session in idle_timers:
        idle_timers[active_session].reset()

    # Clean up stale typing task
    if active_session in typing_tasks:
        old_task, old_event = typing_tasks[active_session]
        if old_task.done():
            del typing_tasks[active_session]

    # Start typing indicator
    if active_session not in typing_tasks:
        stop_typing = asyncio.Event()
        typing_task = asyncio.create_task(
            typing_indicator_loop(context.bot, update.effective_chat.id, stop_typing)
        )
        typing_tasks[active_session] = (typing_task, stop_typing)

    # Notify Claude
    if caption:
        notify_message = f"{caption}\nThe file {filename} has been saved to your session."
    else:
        notify_message = f"User uploaded file: {filename}"

    try:
        # Acquire lock to prevent race with suspend_session
        lock = get_subprocess_lock(active_session)
        async with lock:
            # Get or create subprocess
            if active_session not in subprocesses or not subprocesses[active_session].is_alive:
                # Check if this is a resume
                is_resume = (session_dir / ".claude").exists()
                if is_resume:
                    session_data = session_manager.get_session(active_session)
                    last_active_str = session_data.get('last_active')
                    if last_active_str:
                        last_active = datetime.fromisoformat(last_active_str)
                        if last_active.tzinfo is None:
                            # NOTE(review): naive stored timestamps are assumed UTC - confirm
                            last_active = last_active.replace(tzinfo=timezone.utc)
                        idle_duration = (datetime.now(timezone.utc) - last_active).total_seconds()
                        if idle_duration > 60:
                            idle_minutes = int(idle_duration / 60)
                            await update.message.reply_text(f"Resuming session (idle for {idle_minutes} min)...")
                        else:
                            await update.message.reply_text("Resuming session...")
                    else:
                        await update.message.reply_text("Resuming session...")

                persona_data = load_persona_for_session(active_session)
                callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session)
                subprocess_inst = ClaudeSubprocess(
                    session_dir=session_dir,
                    persona=persona_data,
                    on_output=callbacks['on_output'],
                    on_error=callbacks['on_error'],
                    on_complete=callbacks['on_complete'],
                    on_status=callbacks['on_status'],
                    on_tool_use=callbacks['on_tool_use'],
                )
                await subprocess_inst.start()
                subprocesses[active_session] = subprocess_inst

                # Update metadata with PID and status
                now_iso = datetime.now(timezone.utc).isoformat()
                session_manager.update_session(
                    active_session,
                    status='active',
                    last_active=now_iso,
                    pid=subprocess_inst.pid
                )

        # Create idle timer if missing (outside lock)
        timeout_secs = session_manager.get_session_timeout(active_session)
        if active_session not in idle_timers:
            idle_timers[active_session] = SessionIdleTimer(active_session, timeout_secs, on_timeout=suspend_session)

        # Send notification directly (not batched)
        await subprocesses[active_session].send_message(notify_message)
    except Exception as e:
        logger.error(f"Error handling document: {e}")
        await update.message.reply_text(f"Error: {e}")
        # Stop typing indicator on error so it does not keep running
        if active_session in typing_tasks:
            task, stop_event = typing_tasks[active_session]
            stop_event.set()
async def model_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Show or switch the model used by the active session.

    Without arguments, replies with the model currently in effect (the
    session's ``model_override`` if set, otherwise the persona's
    ``settings.model``, otherwise ``default``) followed by the known
    aliases. With an argument, resolves it through ``MODEL_ALIASES``, stores
    the result as the session's ``model_override`` and tears down the running
    subprocess, batcher and idle timer for that session.

    Unauthorized users are ignored silently.
    """
    if not is_authorized(update.effective_user.id):
        return

    active_session = session_manager.get_active_session()
    if not active_session:
        await update.message.reply_text("No active session. Use /new <name> to start one.")
        return

    if not context.args:
        # Show current model
        session_data = session_manager.get_session(active_session)
        model_override = session_data.get('model_override')
        persona_name = session_data.get('persona', 'default')
        persona = session_manager.load_persona(persona_name)
        current = model_override or persona.get('settings', {}).get('model', 'default')
        # Separate alias and target so the listing stays readable.
        aliases = "\n".join(f"  {k} \u2192 {v}" for k, v in MODEL_ALIASES.items())
        await update.message.reply_text(
            f"Current model: {current}\n\nUsage: /model <name>\n\nAliases:\n{aliases}"
        )
        return

    # Resolve alias; unknown names are passed through unchanged.
    resolved = MODEL_ALIASES.get(context.args[0], context.args[0])

    # Persist to session metadata
    session_manager.update_session(active_session, model_override=resolved)

    # Terminate current subprocess so next message spawns with new model.
    # The per-session lock is held while doing so, as elsewhere in this file
    # when a subprocess is created, so the teardown cannot interleave with a
    # concurrent spawn or suspend of the same session.
    async with get_subprocess_lock(active_session):
        proc = subprocesses.get(active_session)
        if proc is not None:
            if proc.is_alive:
                await proc.terminate()
            # pop() instead of del: the entry may have been removed by
            # another task while terminate() was awaited.
            subprocesses.pop(active_session, None)

    # Clean up batcher too
    if active_session in batchers:
        await batchers[active_session].flush_immediately()
        batchers.pop(active_session, None)

    # Cancel idle timer (will be recreated on next message)
    timer = idle_timers.pop(active_session, None)
    if timer is not None:
        timer.cancel()

    await update.message.reply_text(f"Model set to {resolved} for session '{active_session}'.")
    logger.info(f"Model changed to {resolved} for session '{active_session}'")
async def timeout_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Show or change the idle timeout of the active session.

    Called without arguments, replies with the current timeout in whole
    minutes. Called with a number of minutes in the range 1-120, stores the
    value (converted to seconds) in the session metadata and, when an idle
    timer exists for the session, applies the new value to it and resets it.

    Unauthorized users are ignored silently.
    """
    if not is_authorized(update.effective_user.id):
        return

    session_name = session_manager.get_active_session()
    if not session_name:
        await update.message.reply_text("No active session. Use /new <name> to start one.")
        return

    args = context.args
    if not args:
        # No argument: report the current setting.
        current_minutes = session_manager.get_session_timeout(session_name) // 60
        await update.message.reply_text(
            f"Idle timeout: {current_minutes} minutes\n\nUsage: /timeout <minutes> (1-120)"
        )
        return

    # Parse first, then range-check.
    try:
        requested = int(args[0])
    except ValueError:
        await update.message.reply_text("Invalid number. Usage: /timeout <minutes>")
        return

    if not 1 <= requested <= 120:
        await update.message.reply_text("Timeout must be between 1 and 120 minutes")
        return

    # Metadata stores the timeout in seconds.
    new_timeout = requested * 60
    session_manager.update_session(session_name, idle_timeout=new_timeout)

    # Apply the new value to an existing timer, if any.
    if session_name in idle_timers:
        timer = idle_timers[session_name]
        timer.timeout_seconds = new_timeout
        timer.reset()

    await update.message.reply_text(
        f"Idle timeout set to {requested} minutes for session '{session_name}'."
    )
    logger.info(f"Idle timeout set to {requested} minutes for session '{session_name}'")
def _format_relative_time(iso_str, now=None):
    """Format an ISO-8601 timestamp as a relative time such as '2m ago'.

    Returns "unknown" when *iso_str* cannot be parsed. A timestamp without
    timezone information is interpreted as UTC so it can be compared with
    the timezone-aware current time. *now* defaults to the current UTC time.
    """
    try:
        then = datetime.fromisoformat(iso_str)
    except (TypeError, ValueError):
        return "unknown"
    if then.tzinfo is None:
        # Aware and naive datetimes cannot be subtracted from each other.
        then = then.replace(tzinfo=timezone.utc)
    if now is None:
        now = datetime.now(timezone.utc)

    secs = (now - then).total_seconds()
    if secs < 60:
        return "just now"
    if secs < 3600:
        return f"{int(secs / 60)}m ago"
    if secs < 86400:
        return f"{int(secs / 3600)}h ago"
    return f"{int(secs / 86400)}d ago"


async def sessions_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """List all sessions with status, persona and last-active time.

    A session whose subprocess is tracked and alive is shown as LIVE, one
    whose stored status is 'suspended' as IDLE, anything else with its
    stored status upper-cased. The active session is prefixed with an
    arrow. The reply is sent with legacy Markdown formatting.

    Unauthorized users are ignored silently.
    """
    # Local import: the helper ships with python-telegram-bot, which this
    # file already depends on.
    from telegram.helpers import escape_markdown

    if not is_authorized(update.effective_user.id):
        return

    sessions = session_manager.list_sessions()
    if not sessions:
        await update.message.reply_text("No sessions. Use /new <name> to create one.")
        return

    active_session = session_manager.get_active_session()

    lines = []
    for s in sessions:
        name = s['name']
        persona = s.get('persona', 'default')

        # Determine status
        if name in subprocesses and subprocesses[name].is_alive:
            status = "🟢 LIVE"
        elif s.get('status') == 'suspended':
            status = "⚪ IDLE"
        else:
            # `or` also covers a status stored explicitly as None.
            status = escape_markdown(str(s.get('status') or 'unknown').upper())

        # Format last active time
        last_active = s.get('last_active', '')
        rel_time = _format_relative_time(last_active) if last_active else "unknown"

        # Mark current active session; pad the others to the same width.
        marker = "\u2192 " if name == active_session else "  "

        # Persona is free text outside any entity, so Markdown specials in it
        # must be escaped or Telegram rejects the message. The name sits
        # inside a code span, where escaping is not applied.
        lines.append(
            f"{marker}{status} `{name}` ({escape_markdown(str(persona))}) - {rel_time}"
        )

    await update.message.reply_text("\n".join(lines), parse_mode='Markdown')
async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to an unrecognized command with a pointer to /help.

    Unauthorized users get no reply.
    """
    sender_id = update.effective_user.id
    if is_authorized(sender_id):
        await update.message.reply_text("Unknown command. Use /help for available commands.")
async def _kill_orphaned_claude(name, pid):
    """Signal *pid* if it is a live process whose command line mentions claude.

    Sends SIGTERM, waits two seconds, then sends SIGKILL. Returns True when
    the process was identified as a claude process and signalled (even if it
    exited in between), False when it does not exist, cannot be verified, or
    is some other program.
    """
    if not isinstance(pid, int) or pid <= 0:
        # Non-positive values address process groups in os.kill(); never
        # pass them on.
        logger.warning(f"Cannot verify PID {pid} for session '{name}'")
        return False

    # Check if process exists
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The PID exists but belongs to another user, so it is not ours.
        logger.warning(f"Cannot verify PID {pid} for session '{name}'")
        return False

    # Process exists - verify it's a claude process. Read bytes: a command
    # line is not guaranteed to be valid UTF-8.
    try:
        with open(f"/proc/{pid}/cmdline", "rb") as f:
            cmdline = f.read()
    except ProcessLookupError:
        return False  # Process died between checks
    except (FileNotFoundError, PermissionError):
        # Can't read cmdline - assume it's not our process
        logger.warning(f"Cannot verify PID {pid} for session '{name}'")
        return False

    if b"claude" not in cmdline:
        logger.warning(f"PID {pid} for session '{name}' exists but is not a claude process")
        return False

    logger.info(f"Killing orphaned claude process for session '{name}': PID {pid}")
    try:
        os.kill(pid, signal.SIGTERM)
        await asyncio.sleep(2)
        os.kill(pid, signal.SIGKILL)
    except ProcessLookupError:
        pass  # Already dead
    except PermissionError:
        logger.warning(f"Cannot verify PID {pid} for session '{name}'")
        return False
    return True


async def cleanup_orphaned_subprocesses():
    """Clean up orphaned subprocesses at bot startup.

    For every session that has a PID recorded, kills the process if it is
    still a claude process, then clears the PID and marks the session
    suspended. Sessions marked active without a PID are marked suspended as
    well. Logs how many orphans were cleaned up.
    """
    orphan_count = 0
    own_pid = os.getpid()

    for session in session_manager.list_sessions():
        name = session['name']
        pid = session.get('pid')

        if pid is None:
            # Also suspend any sessions that are marked active but have no PID
            if session.get('status') == 'active':
                session_manager.update_session(name, status='suspended')
            continue

        if pid == own_pid:
            # A recycled PID may now be this very process; never signal it.
            logger.warning(f"Cannot verify PID {pid} for session '{name}'")
        elif await _kill_orphaned_claude(name, pid):
            orphan_count += 1

        # Update metadata - clear PID and set status to suspended
        session_manager.update_session(name, pid=None, status='suspended')

    if orphan_count > 0:
        logger.info(f"Cleaned up {orphan_count} orphaned subprocess(es)")
    else:
        logger.info("No orphaned subprocesses found")
async def post_init(application):
    """Run startup cleanup.

    Awaits cleanup_orphaned_subprocesses(). main() registers this coroutine
    through the application builder's post_init hook; the *application*
    argument is not used here.
    """
    await cleanup_orphaned_subprocesses()
async def post_shutdown(application):
    """Clean up subprocesses and timers on bot shutdown.

    Cancels every idle timer, then terminates every subprocess that is still
    alive. A failure to terminate one subprocess is logged and does not stop
    the remaining ones from being terminated. The *application* argument is
    not used.
    """
    logger.info("Bot shutting down, cleaning up...")

    # Cancel all idle timers (iterate a snapshot in case cancel() mutates
    # the registry).
    for timer in list(idle_timers.values()):
        timer.cancel()

    # Terminate all subprocesses
    for name, proc in list(subprocesses.items()):
        if not proc.is_alive:
            continue
        logger.info(f"Terminating subprocess for '{name}'")
        try:
            await proc.terminate()
        except Exception:
            # Best effort: keep going so one failure doesn't leave the other
            # subprocesses running after the bot exits.
            logger.exception(f"Failed to terminate subprocess for '{name}'")

    logger.info("Cleanup complete")
def main():
    """Build the application, register all handlers and start polling.

    Handlers are registered in a fixed order: the named commands first, then
    the catch-all for unrecognized commands, then the free-text, photo and
    document handlers.
    """
    # Application with startup/shutdown lifecycle hooks.
    builder = Application.builder().token(TOKEN)
    builder = builder.post_init(post_init).post_shutdown(post_shutdown)
    app = builder.build()

    # Command name -> callback, in registration order.
    command_table = (
        ("start", start),
        ("help", help_command),
        ("chatid", chatid),
        ("new", new_session),
        ("session", switch_session_cmd),
        ("sessions", sessions_cmd),
        ("archive", archive_session_cmd),
        ("model", model_cmd),
        ("timeout", timeout_cmd),
        ("status", status),
        ("pbs", pbs),
        ("pbs_status", pbs_status),
        ("backups", backups),
        ("pbs_errors", pbs_errors),
        ("pbs_gc", pbs_gc),
        ("beszel", beszel),
        ("kuma", kuma),
        ("ping", ping_host),
    )
    for command_name, callback in command_table:
        app.add_handler(CommandHandler(command_name, callback))

    message_handlers = (
        # Any command not matched above.
        MessageHandler(filters.COMMAND, unknown),
        # Free text (messages to Claude); registered as non-blocking.
        MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message, block=False),
        # Photos.
        MessageHandler(filters.PHOTO, handle_photo),
        # Documents/files.
        MessageHandler(filters.Document.ALL, handle_document),
    )
    for handler in message_handlers:
        app.add_handler(handler)

    logger.info("Starting Homelab Bot...")
    app.run_polling(allowed_updates=Update.ALL_TYPES)
# Start the bot only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()