- Added timeout_cmd() to set per-session idle timeout (1-120 min range) - Shows current timeout when called without args - Updates session metadata and existing idle timer - Added sessions_cmd() to list all sessions with status - Shows LIVE (subprocess running) or IDLE (suspended) status - Displays persona and relative last-active time (e.g., '2m ago') - Marks current active session with arrow - Registered both commands in main() - Commands already added to help text in Task 1 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1239 lines
47 KiB
Python
Executable file
1239 lines
47 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Homelab Telegram Bot
|
|
Two-way interactive bot for homelab management and notifications.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from telegram import Update
|
|
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
|
|
from session_manager import SessionManager
|
|
from claude_subprocess import ClaudeSubprocess
|
|
from telegram_utils import split_message_smart, escape_markdown_v2, typing_indicator_loop
|
|
from message_batcher import MessageBatcher
|
|
from idle_timer import SessionIdleTimer
|
|
|
|
# Configure logging once at import time: timestamped records at INFO and above.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
# Module-level logger used by every handler in this file.
logger = logging.getLogger(__name__)
|
|
|
|
# Load credentials from the KEY=VALUE file that sits next to this script.
CREDENTIALS_FILE = Path(__file__).parent / 'credentials'
config = {}
with open(CREDENTIALS_FILE, encoding='utf-8') as f:
    for line in f:
        entry = line.strip()
        # Skip blank lines and '#' comments so an annotated credentials file
        # cannot register bogus keys.
        if not entry or entry.startswith('#'):
            continue
        key, separator, value = entry.partition('=')
        if not separator:
            continue
        # Tolerate "KEY = value" by trimming whitespace around both parts.
        config[key.strip()] = value.strip()

# A missing token raises KeyError here, so the bot fails fast at startup.
TOKEN = config['TELEGRAM_BOT_TOKEN']
|
|
|
|
# File holding the chat/user IDs that are allowed to use the bot.
AUTHORIZED_FILE = Path(__file__).with_name('authorized_users')

# Inbox file for messages to Claude.
INBOX_FILE = Path(__file__).with_name('inbox')

# Runtime state. Every dict below is keyed by session name.
session_manager = SessionManager()
subprocesses: dict[str, ClaudeSubprocess] = {}  # persistent subprocess per session
batchers: dict[str, MessageBatcher] = {}  # message batcher per session
typing_tasks: dict[str, tuple[asyncio.Task, asyncio.Event]] = {}  # (typing task, stop event) per session
idle_timers: dict[str, SessionIdleTimer] = {}  # idle timer per session
subprocess_locks: dict[str, asyncio.Lock] = {}  # lock per session to prevent races
|
|
|
|
def get_authorized_users():
    """Return the set of authorized user IDs stored in AUTHORIZED_FILE.

    Every non-blank line is parsed with ``int``; a missing file yields an
    empty set.
    """
    if not AUTHORIZED_FILE.exists():
        return set()

    with open(AUTHORIZED_FILE) as handle:
        return {int(entry.strip()) for entry in handle if entry.strip()}
|
|
|
|
def add_authorized_user(user_id: int):
    """Add *user_id* to the authorized list and rewrite the whole file."""
    known_ids = get_authorized_users()
    known_ids.add(user_id)

    # The file is rewritten from scratch, one ID per line.
    with open(AUTHORIZED_FILE, 'w') as handle:
        handle.writelines(f"{known_id}\n" for known_id in known_ids)
|
|
|
|
def is_authorized(user_id: int) -> bool:
    """Return True when *user_id* is in the authorized list.

    The list is re-read from disk on every call.
    """
    allowed_ids = get_authorized_users()
    return user_id in allowed_ids
|
|
|
|
def get_subprocess_lock(session_name: str) -> asyncio.Lock:
    """Return the lock for *session_name*, creating it on first use."""
    try:
        return subprocess_locks[session_name]
    except KeyError:
        session_lock = subprocess_locks[session_name] = asyncio.Lock()
        return session_lock
|
|
|
|
async def suspend_session(session_name: str):
    """Idle-timer callback: stop a session's subprocess and mark it suspended.

    Runs under the session's subprocess lock. If no live subprocess exists,
    only the metadata is updated. If the subprocess is busy, the idle timer
    is reset and suspension is deferred. Otherwise the subprocess is
    terminated, the batcher flushed and dropped, the metadata updated and
    the idle timer cancelled.
    """
    async with get_subprocess_lock(session_name):
        proc = subprocesses.get(session_name)

        # Nothing is running: only the stored metadata needs correcting.
        if proc is None or not proc.is_alive:
            logger.debug(f"Session '{session_name}' already not running, updating metadata only")
            session_manager.update_session(session_name, status='suspended', pid=None)
            return

        # Claude is mid-processing: restart the countdown and try again later.
        if proc.is_busy:
            logger.info(f"Session '{session_name}' is busy, deferring suspension")
            pending_timer = idle_timers.get(session_name)
            if pending_timer is not None:
                pending_timer.reset()
            return

        logger.info(f"Suspending session '{session_name}' (PID {proc.pid}) after idle timeout")

        await proc.terminate()
        del subprocesses[session_name]

        # Push out anything still queued, then forget the batcher.
        batcher = batchers.get(session_name)
        if batcher is not None:
            await batcher.flush_immediately()
            del batchers[session_name]

        session_manager.update_session(session_name, status='suspended', pid=None)

        timer = idle_timers.get(session_name)
        if timer is not None:
            timer.cancel()
            del idle_timers[session_name]

        logger.info(f"Session '{session_name}' suspended after idle timeout")
|
|
|
|
# Short model alias -> full model identifier.
MODEL_ALIASES = dict(
    sonnet="claude-sonnet-4-5-20250929",
    opus="claude-opus-4-5-20251101",
    haiku="claude-haiku-4-5-20251001",
)
|
|
|
|
def load_persona_for_session(session_name: str) -> dict:
    """Return the session's persona with its model override applied, if any.

    The persona name comes from the session metadata (default ``'default'``).
    When the metadata has a truthy ``model_override``, it is written to
    ``persona['settings']['model']``.
    """
    session_meta = session_manager.get_session(session_name)
    persona = session_manager.load_persona(session_meta.get('persona', 'default'))

    override = session_meta.get('model_override')
    if not override:
        return persona

    # NOTE(review): this mutates the dict returned by load_persona — confirm
    # it is a fresh copy and not a shared/cached object.
    persona.setdefault('settings', {})['model'] = override
    return persona
|
|
|
|
def make_callbacks(bot, chat_id, session_name: str):
    """Create callbacks for ClaudeSubprocess bound to one chat and session.

    Typing events are looked up in ``typing_tasks`` at call time, so the
    callbacks always act on the CURRENT typing indicator rather than a
    stale one captured at creation time.

    Args:
        bot: Bot object whose ``send_message`` coroutine delivers messages.
        chat_id: Chat that receives every message.
        session_name: Key used to look up ``typing_tasks`` and ``idle_timers``.

    Returns:
        Dict with the coroutine callbacks ``on_output``, ``on_error``,
        ``on_complete``, ``on_status`` and ``on_tool_use``.
    """
    # Tools whose notification shows the file they operate on.
    file_tools = ("Read", "Edit", "Write")
    # Tools whose notification shows the search pattern.
    pattern_tools = ("Grep", "Glob")

    def _stop_typing():
        """Stop the current typing indicator for this session (if any)."""
        entry = typing_tasks.get(session_name)
        if entry is not None:
            _, stop_event = entry
            stop_event.set()

    async def on_output(text):
        """Send Claude's output in chunks, MarkdownV2 first, plain text on failure."""
        t0 = time.monotonic()
        chunks = split_message_smart(text)

        try:
            for chunk in chunks:
                try:
                    escaped = escape_markdown_v2(chunk)
                    await bot.send_message(chat_id=chat_id, text=escaped, parse_mode='MarkdownV2')
                except Exception as e:
                    # Fall back to plain text if MarkdownV2 fails
                    logger.warning(f"MarkdownV2 parse failed, falling back to plain text: {e}")
                    await bot.send_message(chat_id=chat_id, text=chunk)
        finally:
            # Stop the indicator even if the plain-text fallback also fails.
            _stop_typing()

        elapsed = time.monotonic() - t0
        logger.info(f"[TIMING] Telegram send: {elapsed:.3f}s ({len(text)} chars, {len(chunks)} chunks)")

    async def on_error(error):
        """Report an error to the chat and stop the typing indicator."""
        try:
            await bot.send_message(chat_id=chat_id, text=f"Error: {error}")
        finally:
            _stop_typing()

    async def on_complete():
        """Stop typing and restart the idle countdown once Claude finishes."""
        _stop_typing()

        # Reset idle timer (only start counting AFTER Claude finishes)
        if session_name in idle_timers:
            idle_timers[session_name].reset()

    async def on_status(status):
        """Forward a status string to the chat in square brackets."""
        await bot.send_message(chat_id=chat_id, text=f"[{status}]")

    async def on_tool_use(tool_name: str, tool_input: dict):
        """Format and send tool call progress notification."""
        # Extract a short, meaningful target from tool_input.
        if tool_name == "Bash":
            cmd = tool_input.get("command", "")
            target = cmd[:50] + "..." if len(cmd) > 50 else cmd
        elif tool_name in file_tools:
            target = tool_input.get("file_path", "")
        elif tool_name in pattern_tools:
            target = f"pattern: {tool_input.get('pattern', '')}"
        else:
            target = str(tool_input)[:50]

        plain = f"{tool_name}: {target}"

        try:
            # MarkdownV2 rejects unescaped reserved characters ('.', '-', '_',
            # ...), which paths and commands nearly always contain. Escape the
            # text before wrapping it in italics, as on_output does.
            # NOTE(review): assumes escape_markdown_v2 escapes every reserved
            # character — confirm against telegram_utils.
            italic = f"_{escape_markdown_v2(plain)}_"
            await bot.send_message(chat_id=chat_id, text=italic, parse_mode='MarkdownV2')
        except Exception as e:
            # Fall back to plain text
            logger.warning(f"Failed to send tool notification with MarkdownV2: {e}")
            await bot.send_message(chat_id=chat_id, text=plain)

    return {
        'on_output': on_output,
        'on_error': on_error,
        'on_complete': on_complete,
        'on_status': on_status,
        'on_tool_use': on_tool_use,
    }
|
|
|
|
def run_command(cmd: list, timeout: int = 30) -> str:
    """Run *cmd* (an argument list, no shell) and return its output as text.

    ``/home/mikkel/bin`` is prepended to PATH for the child process.

    Args:
        cmd: Program and arguments.
        timeout: Seconds to wait before giving up.

    Returns:
        stdout if non-empty, else stderr, else ``"No output"``, cut to 4000
        characters. Returns ``"Command timed out"`` on timeout and
        ``"Error: ..."`` on any other failure; never raises.
    """
    search_path = f"/home/mikkel/bin:{os.environ.get('PATH', '')}"

    try:
        completed = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            env={**os.environ, 'PATH': search_path},
        )
    except subprocess.TimeoutExpired:
        return "Command timed out"
    except Exception as e:
        return f"Error: {e}"

    output = completed.stdout or completed.stderr or "No output"
    # Telegram has 4096 char limit per message
    if len(output) > 4000:
        return output[:4000] + "\n... (truncated)"
    return output
|
|
|
|
# Command handlers
|
|
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start: greet known users and bootstrap the first admin.

    An authorized user gets a welcome-back message. If nobody is authorized
    yet, the caller is added to the authorized list. Anyone else is told
    they are unauthorized and shown their user ID.
    """
    user = update.effective_user
    chat_id = update.effective_chat.id

    if is_authorized(user.id):
        await update.message.reply_text(
            f"Welcome back {user.first_name}!\n"
            f"Use /help to see available commands."
        )
        return

    if get_authorized_users():
        # The authorized list is not empty, so this caller is turned away.
        await update.message.reply_text(
            f"Unauthorized. Your user ID: {user.id}\n"
            "Contact the admin to get access."
        )
        return

    # First user becomes authorized automatically
    add_authorized_user(user.id)
    await update.message.reply_text(
        f"Welcome {user.first_name}! You're now authorized as the admin.\n"
        f"Your chat ID: {chat_id}\n\n"
        f"Use /help to see available commands."
    )
|
|
|
|
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /help: send the command overview to an authorized user."""
    caller = update.effective_user
    if not is_authorized(caller.id):
        return

    # Sent with parse_mode='Markdown'; underscores in command names are
    # backslash-escaped in the text.
    overview = """
*Homelab Bot Commands*

*Claude Sessions:*
/new <name> [persona] - Create new Claude session
/session <name> - Switch to a session
/sessions - List all sessions with status
/model <name> - Switch model (sonnet/opus/haiku)
/timeout <minutes> - Set idle timeout (1-120)
/archive <name> - Archive and remove a session

*Status & Monitoring:*
/status - Quick service overview
/pbs - PBS backup status
/backups - Last backup per VM/CT
/beszel - Server metrics
/kuma - Uptime Kuma status

*PBS Commands:*
/pbs\\_status - Full PBS overview
/pbs\\_errors - Recent errors
/pbs\\_gc - Garbage collection status

*System:*
/ping <host> - Ping a host
/help - This help message
/chatid - Show your chat ID
"""
    await update.message.reply_text(overview, parse_mode='Markdown')
|
|
|
|
async def chatid(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /chatid: reply with the current chat's ID.

    No authorization check is performed here.
    """
    current_chat = update.effective_chat
    await update.message.reply_text(
        f"Chat ID: `{current_chat.id}`",
        parse_mode='Markdown',
    )
|
|
|
|
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /status: ping a fixed list of services and report up/down.

    Each host gets one ping with a one-second wait. The pings run
    concurrently in worker threads so the blocking ``subprocess.run`` calls
    do not stall the event loop.
    """
    if not is_authorized(update.effective_user.id):
        return

    await update.message.reply_text("Checking services...")

    # Check key services via ping
    services = [
        ("NPM", "10.5.0.1"),
        ("DNS", "10.5.0.2"),
        ("PBS", "10.5.0.6"),
        ("Dockge", "10.5.0.10"),
        ("Forgejo", "10.5.0.14"),
    ]

    def _is_reachable(ip: str) -> bool:
        """Return True when a single ping to *ip* exits with status 0."""
        probe = subprocess.run(
            ["ping", "-c", "1", "-W", "1", ip],
            capture_output=True,
        )
        return probe.returncode == 0

    # gather() returns results in input order, so they line up with services.
    reachable = await asyncio.gather(
        *(asyncio.to_thread(_is_reachable, ip) for _, ip in services)
    )

    output_lines = ["*Homelab Status*\n"]
    for (name, ip), is_up in zip(services, reachable):
        output_lines.append(f"{'✅' if is_up else '❌'} {name} ({ip})")

    await update.message.reply_text("\n".join(output_lines), parse_mode='Markdown')
|
|
|
|
async def pbs(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /pbs: run ``pbs status`` and reply with its output."""
    if not is_authorized(update.effective_user.id):
        return

    await update.message.reply_text("Fetching PBS status...")
    # run_command blocks until the command exits or times out; run it in a
    # worker thread so the event loop stays responsive.
    output = await asyncio.to_thread(run_command, ["/home/mikkel/bin/pbs", "status"])
    await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
|
|
|
|
async def pbs_status(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /pbs_status: run ``pbs status`` and reply with its output.

    NOTE(review): this runs exactly the same command as the /pbs handler —
    confirm whether a more detailed subcommand was intended.
    """
    if not is_authorized(update.effective_user.id):
        return

    await update.message.reply_text("Fetching PBS status...")
    # run_command is synchronous; run it in a worker thread so the event
    # loop is not blocked while the command runs.
    output = await asyncio.to_thread(run_command, ["/home/mikkel/bin/pbs", "status"])
    await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
|
|
|
|
async def backups(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /backups: run ``pbs backups`` (60 s timeout) and reply with its output."""
    if not is_authorized(update.effective_user.id):
        return

    await update.message.reply_text("Fetching backup status...")
    # This command may run for up to a minute; run it in a worker thread so
    # the event loop is not frozen meanwhile.
    output = await asyncio.to_thread(
        run_command, ["/home/mikkel/bin/pbs", "backups"], timeout=60
    )
    await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
|
|
|
|
async def pbs_errors(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /pbs_errors: run ``pbs errors`` and reply with its output."""
    if not is_authorized(update.effective_user.id):
        return

    # run_command is synchronous; run it in a worker thread so the event
    # loop is not blocked.
    output = await asyncio.to_thread(run_command, ["/home/mikkel/bin/pbs", "errors"])
    await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
|
|
|
|
async def pbs_gc(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /pbs_gc: run ``pbs gc`` and reply with its output."""
    if not is_authorized(update.effective_user.id):
        return

    # run_command is synchronous; run it in a worker thread so the event
    # loop is not blocked.
    output = await asyncio.to_thread(run_command, ["/home/mikkel/bin/pbs", "gc"])
    await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
|
|
|
|
async def beszel(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /beszel: run ``beszel status`` and reply with its output."""
    if not is_authorized(update.effective_user.id):
        return

    await update.message.reply_text("Fetching server metrics...")
    # run_command is synchronous; run it in a worker thread so the event
    # loop is not blocked.
    output = await asyncio.to_thread(run_command, ["/home/mikkel/bin/beszel", "status"])
    await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
|
|
|
|
async def kuma(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /kuma: run ``kuma list`` and reply with its output."""
    if not is_authorized(update.effective_user.id):
        return

    # run_command is synchronous; run it in a worker thread so the event
    # loop is not blocked.
    output = await asyncio.to_thread(run_command, ["/home/mikkel/bin/kuma", "list"])
    await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
|
|
|
|
async def ping_host(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /ping <host>: send three pings and reply with ping's output.

    The host may contain only alphanumeric characters, dots and hyphens,
    and must not start with a hyphen. The ping runs in a worker thread with
    a 10-second overall limit; a timeout is reported to the user.
    """
    if not is_authorized(update.effective_user.id):
        return

    if not context.args:
        await update.message.reply_text("Usage: /ping <host>")
        return

    host = context.args[0]
    # Basic validation. A leading '-' is rejected because ping would parse
    # the value as a command-line option instead of a host.
    if host.startswith('-') or not host.replace('.', '').replace('-', '').isalnum():
        await update.message.reply_text("Invalid hostname")
        return

    def _ping() -> str:
        """Run ping synchronously and return stdout, or stderr if stdout is empty."""
        result = subprocess.run(
            ["ping", "-c", "3", "-W", "2", host],
            capture_output=True,
            text=True,
            timeout=10,
        )
        return result.stdout or result.stderr

    try:
        # Run the blocking call in a worker thread so the event loop stays free.
        output = await asyncio.to_thread(_ping)
    except subprocess.TimeoutExpired:
        output = "Ping timed out"

    await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
|
|
|
|
async def new_session(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /new <name> [persona]: create a session and switch to it.

    After confirming to the user, a ClaudeSubprocess object and an idle
    timer are registered for the session; the subprocess is not started
    here. A ValueError is reported as "already exists", a FileNotFoundError
    as a persona error, and anything else as a generic error.
    """
    if not is_authorized(update.effective_user.id):
        return

    if not context.args:
        await update.message.reply_text("Usage: /new <name> [persona]")
        return

    name, *extra_args = context.args
    persona = extra_args[0] if extra_args else None

    try:
        session_manager.create_session(name, persona=persona)

        # The new session becomes the active one right away.
        session_manager.switch_session(name)

        reply = (
            f"Session '{name}' created with persona '{persona}'."
            if persona
            else f"Session '{name}' created."
        )
        await update.message.reply_text(reply)

        # Prepare the subprocess object for the new session (no message is
        # sent to it yet).
        session_dir = session_manager.get_session_dir(name)
        persona_data = load_persona_for_session(name)

        # Callbacks are bound to this chat; their keys match the keyword
        # names ClaudeSubprocess is given.
        callbacks = make_callbacks(context.bot, update.effective_chat.id, name)
        subprocesses[name] = ClaudeSubprocess(
            session_dir=session_dir,
            persona=persona_data,
            **callbacks,
        )

        timeout_secs = session_manager.get_session_timeout(name)
        idle_timers[name] = SessionIdleTimer(name, timeout_secs, on_timeout=suspend_session)

        logger.info(f"Created session '{name}' with persona '{persona or 'default'}'")

    except ValueError:
        # Session already exists
        await update.message.reply_text(
            f"Session '{name}' already exists. Use /session <name> to switch to it."
        )
    except FileNotFoundError as e:
        # Persona not found
        await update.message.reply_text(f"Error: {e}")
    except Exception as e:
        logger.error(f"Error creating session: {e}")
        await update.message.reply_text(f"Error creating session: {e}")
|
|
|
|
async def switch_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /session [name]: switch the active Claude session.

    Without an argument, replies with usage and the list of sessions.
    Otherwise the previous session's typing indicator is stopped and its
    batcher flushed, the session is switched, and a ClaudeSubprocess object
    plus idle timer are registered if no live subprocess exists.
    """
    if not is_authorized(update.effective_user.id):
        return

    if not context.args:
        known_sessions = session_manager.list_sessions()
        if not known_sessions:
            await update.message.reply_text("No sessions found. Use /new <name> to create one.")
            return

        rows = []
        for entry in known_sessions:
            rows.append(
                f"- {entry['name']} ({entry['status']}) - {entry.get('persona', 'default')}"
            )
        session_list = "\n".join(rows)
        await update.message.reply_text(
            f"Usage: /session <name>\n\nAvailable sessions:\n{session_list}"
        )
        return

    name = context.args[0]

    try:
        if not session_manager.session_exists(name):
            await update.message.reply_text(
                f"Session '{name}' doesn't exist. Use /new <name> to create it."
            )
            return

        prev_session = session_manager.get_active_session()

        # Stop the typing indicator of the session being left, if one runs.
        typing_entry = typing_tasks.get(prev_session) if prev_session else None
        if typing_entry is not None:
            typing_task, stop_event = typing_entry
            stop_event.set()
            try:
                await typing_task
            except Exception:
                # Best effort: a failed typing task must not block the switch.
                pass
            del typing_tasks[prev_session]

        # Deliver anything still queued for the previous session.
        if prev_session and prev_session in batchers:
            await batchers[prev_session].flush_immediately()

        session_manager.switch_session(name)

        current = subprocesses.get(name)
        if current is None or not current.is_alive:
            session_dir = session_manager.get_session_dir(name)
            persona_data = load_persona_for_session(name)

            # Callbacks are bound to this chat; their keys match the keyword
            # names ClaudeSubprocess is given.
            callbacks = make_callbacks(context.bot, update.effective_chat.id, name)
            subprocesses[name] = ClaudeSubprocess(
                session_dir=session_dir,
                persona=persona_data,
                **callbacks,
            )

            timeout_secs = session_manager.get_session_timeout(name)
            if name not in idle_timers:
                idle_timers[name] = SessionIdleTimer(name, timeout_secs, on_timeout=suspend_session)

            logger.info(f"Auto-spawned subprocess for session '{name}'")

        persona_name = session_manager.get_session(name).get('persona', 'default')
        if persona_name and persona_name != 'default':
            reply = f"Switched to session '{name}' (persona: {persona_name})."
        else:
            reply = f"Switched to session '{name}'."

        await update.message.reply_text(reply)

    except Exception as e:
        logger.error(f"Error switching session: {e}")
        await update.message.reply_text(f"Error switching session: {e}")
|
|
|
|
async def archive_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /archive <name>: drop a session's runtime state and archive it.

    The typing indicator, batcher, subprocess, idle timer and lock for the
    session are removed first. Then the session manager archives the
    session and the archive's size and file name are reported.
    """
    if not is_authorized(update.effective_user.id):
        return

    if not context.args:
        await update.message.reply_text("Usage: /archive <name>")
        return

    name = context.args[0]

    try:
        typing_entry = typing_tasks.get(name)
        if typing_entry is not None:
            typing_task, stop_event = typing_entry
            stop_event.set()
            try:
                await typing_task
            except Exception:
                # Best effort: a failed typing task must not block archiving.
                pass
            del typing_tasks[name]

        # Flush pending messages before the batcher is discarded.
        batcher = batchers.get(name)
        if batcher is not None:
            await batcher.flush_immediately()
            del batchers[name]

        proc = subprocesses.get(name)
        if proc is not None:
            if proc.is_alive:
                await proc.terminate()
            del subprocesses[name]

        timer = idle_timers.get(name)
        if timer is not None:
            timer.cancel()
            del idle_timers[name]

        subprocess_locks.pop(name, None)

        archive_path = session_manager.archive_session(name)
        size_mb = archive_path.stat().st_size / (1024 * 1024)
        await update.message.reply_text(
            f"Session '{name}' archived ({size_mb:.1f} MB)\n{archive_path.name}"
        )
        logger.info(f"Archived session '{name}' to {archive_path}")

    except ValueError as e:
        await update.message.reply_text(str(e))
    except Exception as e:
        logger.error(f"Error archiving session: {e}")
        await update.message.reply_text(f"Error archiving session: {e}")
|
|
|
|
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle free text messages - route to active Claude session with batching.

    Steps: reset the idle timer, start a typing indicator, start the
    session's subprocess under its lock if it is not alive (telling the user
    when an existing session is resumed), then queue the text in the
    session's message batcher. Any failure stops the typing indicator and
    is reported to the user.
    """
    if not is_authorized(update.effective_user.id):
        return

    # Check for active session
    active_session = session_manager.get_active_session()
    if not active_session:
        await update.message.reply_text(
            "No active session. Use /new <name> to start one."
        )
        return

    message = update.message.text

    # Telegram message timestamp vs now = delivery delay. Compare aware
    # datetimes so a non-UTC tzinfo on the message is handled correctly; a
    # naive timestamp is treated as UTC.
    sent_at = update.message.date
    if sent_at.tzinfo is None:
        sent_at = sent_at.replace(tzinfo=timezone.utc)
    msg_age = (datetime.now(timezone.utc) - sent_at).total_seconds()
    logger.info(f"[TIMING] Message received: session='{active_session}', age={msg_age:.1f}s, text={message[:50]}...")

    try:
        # Reset idle timer on user activity (even before processing message)
        if active_session in idle_timers:
            idle_timers[active_session].reset()

        # Clean up stale typing task (from previous message completion)
        if active_session in typing_tasks:
            old_task, _ = typing_tasks[active_session]
            if old_task.done():
                del typing_tasks[active_session]

        # Start fresh typing indicator for this message
        if active_session not in typing_tasks:
            stop_typing = asyncio.Event()
            typing_task = asyncio.create_task(
                typing_indicator_loop(context.bot, update.effective_chat.id, stop_typing)
            )
            typing_tasks[active_session] = (typing_task, stop_typing)

        # Acquire lock to prevent race with suspend_session
        async with get_subprocess_lock(active_session):
            already_alive = active_session in subprocesses and subprocesses[active_session].is_alive
            if already_alive:
                subprocess_inst = subprocesses[active_session]
            else:
                session_dir = session_manager.get_session_dir(active_session)
                persona_data = load_persona_for_session(active_session)

                # A .claude/ directory means the session has history: this
                # is a resume, so tell the user (with idle time if > 1 min).
                if (session_dir / ".claude").exists():
                    notice = "Resuming session..."
                    last_active_str = session_manager.get_session(active_session).get('last_active')
                    if last_active_str:
                        last_active = datetime.fromisoformat(last_active_str)
                        if last_active.tzinfo is None:
                            # Treat a naive timestamp as UTC; subtracting it
                            # from an aware datetime would raise TypeError.
                            last_active = last_active.replace(tzinfo=timezone.utc)
                        idle_duration = (datetime.now(timezone.utc) - last_active).total_seconds()
                        if idle_duration > 60:
                            notice = f"Resuming session (idle for {int(idle_duration / 60)} min)..."
                    await update.message.reply_text(notice)

                # Callbacks are bound to this chat; their keys match the
                # keyword names ClaudeSubprocess is given.
                callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session)
                subprocess_inst = ClaudeSubprocess(
                    session_dir=session_dir,
                    persona=persona_data,
                    **callbacks,
                )
                await subprocess_inst.start()
                subprocesses[active_session] = subprocess_inst

                # A batcher left over from a subprocess that died is still
                # bound to that dead instance; drop it so the one created
                # below targets the new subprocess.
                batchers.pop(active_session, None)

                # Update metadata with PID and status
                session_manager.update_session(
                    active_session,
                    status='active',
                    last_active=datetime.now(timezone.utc).isoformat(),
                    pid=subprocess_inst.pid,
                )

            subprocess_state = "reused" if already_alive else "cold-start"
            logger.info(f"[TIMING] Subprocess: {subprocess_state}, busy={subprocess_inst.is_busy}")

            # Get or create message batcher for this session; its callback
            # sends the batched text to the subprocess.
            if active_session not in batchers:
                batchers[active_session] = MessageBatcher(
                    callback=subprocess_inst.send_message,
                    debounce_seconds=2.0,
                )

        # Create idle timer for the session if it has none (outside lock)
        timeout_secs = session_manager.get_session_timeout(active_session)
        if active_session not in idle_timers:
            idle_timers[active_session] = SessionIdleTimer(active_session, timeout_secs, on_timeout=suspend_session)

        # Add message to batcher (typing indicator already running)
        await batchers[active_session].add_message(message)

    except Exception as e:
        logger.exception(f"Error handling message: {e}")
        # Stop the typing indicator first so it cannot be left running if
        # the error reply itself fails.
        typing_entry = typing_tasks.get(active_session)
        if typing_entry is not None:
            typing_entry[1].set()
        await update.message.reply_text(f"Error: {e}")
|
|
|
|
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle photo messages - save to session directory and auto-analyze.

    The last entry of ``update.message.photo`` is downloaded into the active
    session's directory as ``photo_<timestamp>.jpg``. Claude is then sent a
    message naming the file, with the caption if there is one and a request
    to describe the photo otherwise. Any failure stops the typing indicator
    and is reported to the user, matching the text-message handler.
    """
    if not is_authorized(update.effective_user.id):
        return

    # Check for active session
    active_session = session_manager.get_active_session()
    if not active_session:
        await update.message.reply_text(
            "No active session. Use /new <name> to start one, then send the photo again."
        )
        return

    # NOTE(review): second-resolution local timestamp — two photos received
    # within the same second get the same file name.
    file_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    try:
        session_dir = session_manager.get_session_dir(active_session)

        # Get the largest photo (best quality)
        photo = update.message.photo[-1]
        tg_file = await context.bot.get_file(photo.file_id)

        # Download to session directory
        filename = f"photo_{file_timestamp}.jpg"
        filepath = session_dir / filename
        await tg_file.download_to_drive(filepath)

        logger.info(f"Photo saved to session '{active_session}': {filepath}")

        caption = update.message.caption or ""

        # Reset idle timer on user activity
        if active_session in idle_timers:
            idle_timers[active_session].reset()

        # Clean up stale typing task
        if active_session in typing_tasks:
            old_task, _ = typing_tasks[active_session]
            if old_task.done():
                del typing_tasks[active_session]

        # Start typing indicator
        if active_session not in typing_tasks:
            stop_typing = asyncio.Event()
            typing_task = asyncio.create_task(
                typing_indicator_loop(context.bot, update.effective_chat.id, stop_typing)
            )
            typing_tasks[active_session] = (typing_task, stop_typing)

        # Auto-analyze: build the message for Claude
        if caption:
            analysis_message = f"I've attached a photo: {filename}. {caption}"
        else:
            analysis_message = f"I've attached a photo: {filename}. Please describe what you see."

        # Acquire lock to prevent race with suspend_session
        async with get_subprocess_lock(active_session):
            if active_session in subprocesses and subprocesses[active_session].is_alive:
                subprocess_inst = subprocesses[active_session]
            else:
                # A .claude/ directory means the session has history: this
                # is a resume, so tell the user (with idle time if > 1 min).
                if (session_dir / ".claude").exists():
                    notice = "Resuming session..."
                    last_active_str = session_manager.get_session(active_session).get('last_active')
                    if last_active_str:
                        last_active = datetime.fromisoformat(last_active_str)
                        if last_active.tzinfo is None:
                            # Treat a naive timestamp as UTC; subtracting it
                            # from an aware datetime would raise TypeError.
                            last_active = last_active.replace(tzinfo=timezone.utc)
                        idle_duration = (datetime.now(timezone.utc) - last_active).total_seconds()
                        if idle_duration > 60:
                            notice = f"Resuming session (idle for {int(idle_duration / 60)} min)..."
                    await update.message.reply_text(notice)

                persona_data = load_persona_for_session(active_session)

                # Callbacks are bound to this chat; their keys match the
                # keyword names ClaudeSubprocess is given.
                callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session)
                subprocess_inst = ClaudeSubprocess(
                    session_dir=session_dir,
                    persona=persona_data,
                    **callbacks,
                )
                await subprocess_inst.start()
                subprocesses[active_session] = subprocess_inst

                # A batcher left over from a subprocess that died is still
                # bound to that dead instance; drop it so a later text
                # message creates one for the new subprocess.
                batchers.pop(active_session, None)

                # Update metadata with PID and status
                session_manager.update_session(
                    active_session,
                    status='active',
                    last_active=datetime.now(timezone.utc).isoformat(),
                    pid=subprocess_inst.pid,
                )

        # Create idle timer for the session if it has none (outside lock)
        timeout_secs = session_manager.get_session_timeout(active_session)
        if active_session not in idle_timers:
            idle_timers[active_session] = SessionIdleTimer(active_session, timeout_secs, on_timeout=suspend_session)

        # Send analysis message directly (not batched). The instance captured
        # under the lock is used, so a concurrent suspension cannot turn this
        # into a KeyError.
        await subprocess_inst.send_message(analysis_message)

    except Exception as e:
        logger.exception(f"Error handling photo: {e}")
        # Stop the typing indicator first so it cannot be left running if
        # the error reply itself fails.
        typing_entry = typing_tasks.get(active_session)
        if typing_entry is not None:
            typing_entry[1].set()
        await update.message.reply_text(f"Error: {e}")
|
|
|
|
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle document/file messages - save to session directory and notify Claude.

    The uploaded file is written into the active session's directory under a
    timestamp-prefixed name. If the session has no live Claude subprocess one
    is started (with a "Resuming session..." reply when the session directory
    already contains a ``.claude`` entry), and a short notification, including
    the caption if there is one, is sent to it directly (not batched).

    Args:
        update: Incoming Telegram update carrying the document.
        context: Handler context; its bot fetches the file and is handed to
            the subprocess callbacks.
    """
    if not is_authorized(update.effective_user.id):
        return

    # Check for active session
    active_session = session_manager.get_active_session()
    if not active_session:
        await update.message.reply_text(
            "No active session. Use /new <name> to start one, then send the file again."
        )
        return

    file_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    session_dir = session_manager.get_session_dir(active_session)

    doc = update.message.document
    # The file name is chosen by the sender. Keep only its final path
    # component so separators or ".." segments cannot steer the download
    # outside the session directory.
    original_name = Path(doc.file_name or "").name or "unknown"
    tg_file = await context.bot.get_file(doc.file_id)

    # Timestamp prefix avoids collisions between uploads with the same name
    filename = f"{file_timestamp}_{original_name}"
    filepath = session_dir / filename
    await tg_file.download_to_drive(filepath)

    logger.info(f"Document saved to session '{active_session}': {filepath}")

    caption = update.message.caption or ""

    # Reset idle timer on user activity
    if active_session in idle_timers:
        idle_timers[active_session].reset()

    # Drop a finished typing task so a fresh indicator can be started below
    stale = typing_tasks.get(active_session)
    if stale is not None and stale[0].done():
        del typing_tasks[active_session]

    # Start typing indicator
    if active_session not in typing_tasks:
        stop_typing = asyncio.Event()
        typing_task = asyncio.create_task(
            typing_indicator_loop(context.bot, update.effective_chat.id, stop_typing)
        )
        typing_tasks[active_session] = (typing_task, stop_typing)

    # Text that tells Claude about the upload
    if caption:
        notify_message = f"{caption}\nThe file {filename} has been saved to your session."
    else:
        notify_message = f"User uploaded file: {filename}"

    # Acquire lock to prevent race with suspend_session
    lock = get_subprocess_lock(active_session)
    async with lock:
        # Get or create subprocess
        if active_session not in subprocesses or not subprocesses[active_session].is_alive:
            # An existing .claude entry means this session has run before
            if (session_dir / ".claude").exists():
                resume_notice = "Resuming session..."
                session_data = session_manager.get_session(active_session)
                last_active_str = session_data.get('last_active')
                if last_active_str:
                    last_active = datetime.fromisoformat(last_active_str)
                    idle_duration = (datetime.now(timezone.utc) - last_active).total_seconds()
                    # Only mention the idle time once it exceeds a minute
                    if idle_duration > 60:
                        idle_minutes = int(idle_duration / 60)
                        resume_notice = f"Resuming session (idle for {idle_minutes} min)..."
                await update.message.reply_text(resume_notice)

            persona_data = load_persona_for_session(active_session)
            callbacks = make_callbacks(context.bot, update.effective_chat.id, active_session)

            subprocess_inst = ClaudeSubprocess(
                session_dir=session_dir,
                persona=persona_data,
                on_output=callbacks['on_output'],
                on_error=callbacks['on_error'],
                on_complete=callbacks['on_complete'],
                on_status=callbacks['on_status'],
                on_tool_use=callbacks['on_tool_use'],
            )
            await subprocess_inst.start()
            subprocesses[active_session] = subprocess_inst

            # Update metadata with PID and status
            now_iso = datetime.now(timezone.utc).isoformat()
            session_manager.update_session(
                active_session,
                status='active',
                last_active=now_iso,
                pid=subprocess_inst.pid
            )

    # Create the idle timer if the session has none yet (outside lock)
    timeout_secs = session_manager.get_session_timeout(active_session)
    if active_session not in idle_timers:
        idle_timers[active_session] = SessionIdleTimer(active_session, timeout_secs, on_timeout=suspend_session)

    # Send notification directly (not batched)
    await subprocesses[active_session].send_message(notify_message)
|
|
|
|
async def model_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Show or change the model used by the active session.

    Without arguments, replies with the model currently in effect (the
    session's ``model_override`` if set, otherwise the persona's configured
    model) together with the known aliases. With an argument, stores the
    alias-resolved model as the session's ``model_override`` and tears down
    the session's subprocess, batcher and idle timer.
    """
    if not is_authorized(update.effective_user.id):
        return

    session_name = session_manager.get_active_session()
    if not session_name:
        await update.message.reply_text("No active session. Use /new <name> to start one.")
        return

    if not context.args:
        # No argument: report the effective model and list the aliases
        meta = session_manager.get_session(session_name)
        override = meta.get('model_override')
        persona_cfg = session_manager.load_persona(meta.get('persona', 'default'))
        effective = override or persona_cfg.get('settings', {}).get('model', 'default')
        alias_lines = [f" {alias} → {target}" for alias, target in MODEL_ALIASES.items()]
        alias_text = "\n".join(alias_lines)
        await update.message.reply_text(
            f"Current model: {effective}\n\nUsage: /model <name>\n\nAliases:\n{alias_text}"
        )
        return

    requested = context.args[0]
    # A known alias maps to its full model name; anything else is used verbatim
    resolved = MODEL_ALIASES.get(requested, requested)

    # Persist the choice in the session metadata
    session_manager.update_session(session_name, model_override=resolved)

    # Stop the running subprocess so the next message spawns with the new model
    if session_name in subprocesses:
        running = subprocesses[session_name]
        if running.is_alive:
            await running.terminate()
        del subprocesses[session_name]

    # Flush and discard the session's batcher as well
    if session_name in batchers:
        batcher = batchers[session_name]
        await batcher.flush_immediately()
        del batchers[session_name]

    # Drop the idle timer; it is recreated on the next message
    if session_name in idle_timers:
        timer = idle_timers[session_name]
        timer.cancel()
        del idle_timers[session_name]

    await update.message.reply_text(f"Model set to {resolved} for session '{session_name}'.")
    logger.info(f"Model changed to {resolved} for session '{session_name}'")
|
|
|
|
async def timeout_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Show or set the idle timeout of the active session.

    Without arguments, replies with the current timeout in whole minutes.
    With an argument, expects an integer number of minutes between 1 and 120,
    stores it (in seconds) as the session's ``idle_timeout`` and, if the
    session already has an idle timer, applies the new value and resets it.
    """
    if not is_authorized(update.effective_user.id):
        return

    session_name = session_manager.get_active_session()
    if not session_name:
        await update.message.reply_text("No active session. Use /new <name> to start one.")
        return

    if not context.args:
        # No argument: just report the current setting
        current_minutes = session_manager.get_session_timeout(session_name) // 60
        await update.message.reply_text(
            f"Idle timeout: {current_minutes} minutes\n\nUsage: /timeout <minutes> (1-120)"
        )
        return

    # Parse the requested value and reject anything outside 1..120
    try:
        minutes = int(context.args[0])
        if not 1 <= minutes <= 120:
            await update.message.reply_text("Timeout must be between 1 and 120 minutes")
            return
    except ValueError:
        await update.message.reply_text("Invalid number. Usage: /timeout <minutes>")
        return

    # The timeout is stored in seconds
    new_timeout = minutes * 60
    session_manager.update_session(session_name, idle_timeout=new_timeout)

    # Apply the new value to a timer that already exists
    if session_name in idle_timers:
        timer = idle_timers[session_name]
        timer.timeout_seconds = new_timeout
        timer.reset()

    await update.message.reply_text(
        f"Idle timeout set to {minutes} minutes for session '{session_name}'."
    )
    logger.info(f"Idle timeout set to {minutes} minutes for session '{session_name}'")
|
|
|
|
async def sessions_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """List all sessions with status.

    Each session gets one line: an arrow marker if it is the active session,
    its status (LIVE when a subprocess for it is alive, IDLE when its stored
    status is ``suspended``, otherwise the stored status upper-cased), its
    name, its persona and how long ago it was last active. The reply is sent
    with Telegram's legacy Markdown parse mode, so free-text fields are
    escaped before being interpolated.
    """
    if not is_authorized(update.effective_user.id):
        return

    sessions = session_manager.list_sessions()
    if not sessions:
        await update.message.reply_text("No sessions. Use /new <name> to create one.")
        return

    active_session = session_manager.get_active_session()
    now = datetime.now(timezone.utc)

    # Characters that open an entity in legacy Markdown. Left unescaped, a
    # persona such as "code_reviewer" makes Telegram reject the whole message.
    markdown_escapes = str.maketrans({'_': r'\_', '*': r'\*', '`': r'\`', '[': r'\['})

    def format_relative_time(iso_str):
        """Format ISO timestamp as relative time (e.g., '2m ago')."""
        try:
            dt = datetime.fromisoformat(iso_str)
        except (TypeError, ValueError):
            # One unreadable timestamp must not break the whole listing
            return "unknown"
        if dt.tzinfo is None:
            # NOTE(review): naive timestamps are assumed to be UTC, matching
            # how this file writes last_active - confirm against session_manager.
            dt = dt.replace(tzinfo=timezone.utc)
        secs = (now - dt).total_seconds()
        if secs < 60:
            return "just now"
        if secs < 3600:
            return f"{int(secs/60)}m ago"
        if secs < 86400:
            return f"{int(secs/3600)}h ago"
        return f"{int(secs/86400)}d ago"

    lines = []
    for s in sessions:
        name = s['name']
        persona = str(s.get('persona', 'default')).translate(markdown_escapes)

        # Determine status
        if name in subprocesses and subprocesses[name].is_alive:
            status = "🟢 LIVE"
        elif s.get('status') == 'suspended':
            status = "⚪ IDLE"
        else:
            # `or` also covers a status stored as an explicit None
            status = str(s.get('status') or 'unknown').upper().translate(markdown_escapes)

        # Format last active time
        last_active = s.get('last_active', '')
        rel_time = format_relative_time(last_active) if last_active else "unknown"

        # Mark current active session
        marker = "→ " if name == active_session else " "

        lines.append(f"{marker}{status} `{name}` ({persona}) - {rel_time}")

    await update.message.reply_text("\n".join(lines), parse_mode='Markdown')
|
|
|
|
async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Point the user at /help when a command is not recognised.

    Unauthorized users receive no reply.
    """
    if is_authorized(update.effective_user.id):
        await update.message.reply_text("Unknown command. Use /help for available commands.")
|
|
|
|
async def cleanup_orphaned_subprocesses():
    """Clean up orphaned subprocesses at bot startup.

    For every session that still records a PID, the process is killed if it
    is alive and its command line contains "claude"; the session's PID is
    then cleared and its status set to ``suspended`` regardless of the
    outcome. Sessions without a PID that are still marked ``active`` are set
    to ``suspended`` as well. The number of processes identified as orphans
    is logged at the end.
    """
    orphan_count = 0

    for session in session_manager.list_sessions():
        name = session['name']
        pid = session.get('pid')

        if pid is None:
            # No recorded process: only correct a status that claims otherwise
            if session.get('status') == 'active':
                session_manager.update_session(name, status='suspended')
            continue

        if await _kill_if_orphaned_claude(name, pid):
            orphan_count += 1

        # Update metadata - clear PID and set status to suspended
        session_manager.update_session(name, pid=None, status='suspended')

    if orphan_count > 0:
        logger.info(f"Cleaned up {orphan_count} orphaned subprocess(es)")
    else:
        logger.info("No orphaned subprocesses found")


async def _kill_if_orphaned_claude(name, pid):
    """Kill `pid` if it is a live claude process; return True if it was identified as one.

    Args:
        name: Session name, used only in log messages.
        pid: PID recorded in the session metadata.

    Returns:
        True when the process existed and its command line contained
        "claude" (whether or not the signals could be delivered), otherwise
        False.
    """
    # os.kill treats 0 and negative PIDs as process groups, and a recycled
    # PID could be the bot itself - never signal any of those.
    if not isinstance(pid, int) or pid <= 0 or pid == os.getpid():
        logger.warning(f"Cannot verify PID {pid} for session '{name}'")
        return False

    try:
        # Signal 0 only probes for existence. It raises PermissionError when
        # the process exists but belongs to another user.
        os.kill(pid, 0)
        # Read as bytes: the NUL-separated command line need not be valid UTF-8
        with open(f"/proc/{pid}/cmdline", "rb") as f:
            cmdline = f.read()
    except ProcessLookupError:
        return False  # Process doesn't exist
    except (FileNotFoundError, PermissionError):
        # Can't read cmdline - assume it's not our process
        logger.warning(f"Cannot verify PID {pid} for session '{name}'")
        return False

    if b"claude" not in cmdline:
        logger.warning(f"PID {pid} for session '{name}' exists but is not a claude process")
        return False

    logger.info(f"Killing orphaned claude process for session '{name}': PID {pid}")
    try:
        os.kill(pid, signal.SIGTERM)
        # Give the process a moment to exit before forcing it
        await asyncio.sleep(2)
        os.kill(pid, signal.SIGKILL)
    except ProcessLookupError:
        pass  # Already gone, either before SIGTERM or after it
    except PermissionError:
        logger.warning(f"Not permitted to signal PID {pid} for session '{name}'")
    return True
|
|
|
|
async def post_init(application):
    """Startup hook: run the orphaned-subprocess cleanup once.

    ``application`` is accepted because the hook is called with it, but it is
    not used here.
    """
    await cleanup_orphaned_subprocesses()
|
|
|
|
async def post_shutdown(application):
    """Shutdown hook: cancel every idle timer and terminate live subprocesses."""
    logger.info("Bot shutting down, cleaning up...")

    # Cancel the idle timers first
    for timer in idle_timers.values():
        timer.cancel()

    # Iterate over a snapshot of the mapping while awaiting each terminate()
    for session_name, proc in tuple(subprocesses.items()):
        if not proc.is_alive:
            continue
        logger.info(f"Terminating subprocess for '{session_name}'")
        await proc.terminate()

    logger.info("Cleanup complete")
|
|
|
|
def main():
    """Build the Telegram application, register all handlers and start polling."""
    # Application with the startup/shutdown lifecycle hooks attached
    builder = Application.builder().token(TOKEN)
    app = builder.post_init(post_init).post_shutdown(post_shutdown).build()

    # Slash commands, registered in this order
    command_table = (
        ("start", start),
        ("help", help_command),
        ("chatid", chatid),
        ("new", new_session),
        ("session", switch_session_cmd),
        ("sessions", sessions_cmd),
        ("archive", archive_session_cmd),
        ("model", model_cmd),
        ("timeout", timeout_cmd),
        ("status", status),
        ("pbs", pbs),
        ("pbs_status", pbs_status),
        ("backups", backups),
        ("pbs_errors", pbs_errors),
        ("pbs_gc", pbs_gc),
        ("beszel", beszel),
        ("kuma", kuma),
        ("ping", ping_host),
    )
    for command, callback in command_table:
        app.add_handler(CommandHandler(command, callback))

    # Message handlers, added after all commands above:
    # unknown commands, free text for Claude, photos, then documents/files.
    message_handlers = (
        MessageHandler(filters.COMMAND, unknown),
        MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message, block=False),
        MessageHandler(filters.PHOTO, handle_photo),
        MessageHandler(filters.Document.ALL, handle_document),
    )
    for handler in message_handlers:
        app.add_handler(handler)

    # Start polling
    logger.info("Starting Homelab Bot...")
    app.run_polling(allowed_updates=Update.ALL_TYPES)


if __name__ == '__main__':
    main()
|