Archives session directory with tar+pigz to sessions_archive/, terminates any running subprocess first, clears active session if needed. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
599 lines
21 KiB
Python
Executable file
599 lines
21 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Homelab Telegram Bot
|
|
Two-way interactive bot for homelab management and notifications.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from telegram import Update
|
|
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
|
|
from session_manager import SessionManager
|
|
from claude_subprocess import ClaudeSubprocess
|
|
|
|
# Setup logging
|
|
logging.basicConfig(
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
level=logging.INFO
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Load credentials
|
|
CREDENTIALS_FILE = Path(__file__).parent / 'credentials'
|
|
config = {}
|
|
with open(CREDENTIALS_FILE) as f:
|
|
for line in f:
|
|
if '=' in line:
|
|
key, value = line.strip().split('=', 1)
|
|
config[key] = value
|
|
|
|
TOKEN = config['TELEGRAM_BOT_TOKEN']
|
|
|
|
# Authorized users file (stores chat IDs that are allowed to use the bot)
|
|
AUTHORIZED_FILE = Path(__file__).parent / 'authorized_users'
|
|
|
|
# Inbox file for messages to Claude
|
|
INBOX_FILE = Path(__file__).parent / 'inbox'
|
|
|
|
# Session manager and subprocess tracking
|
|
session_manager = SessionManager()
|
|
subprocesses: dict[str, ClaudeSubprocess] = {} # Persistent subprocess per session
|
|
|
|
def get_authorized_users():
|
|
"""Load authorized user IDs."""
|
|
if AUTHORIZED_FILE.exists():
|
|
with open(AUTHORIZED_FILE) as f:
|
|
return set(int(line.strip()) for line in f if line.strip())
|
|
return set()
|
|
|
|
def add_authorized_user(user_id: int):
|
|
"""Add a user to authorized list."""
|
|
users = get_authorized_users()
|
|
users.add(user_id)
|
|
with open(AUTHORIZED_FILE, 'w') as f:
|
|
for uid in users:
|
|
f.write(f"{uid}\n")
|
|
|
|
def is_authorized(user_id: int) -> bool:
|
|
"""Check if user is authorized."""
|
|
return user_id in get_authorized_users()
|
|
|
|
def make_callbacks(bot, chat_id):
|
|
"""Create callbacks for ClaudeSubprocess bound to specific chat."""
|
|
async def on_output(text):
|
|
# Truncate to Telegram limit
|
|
if len(text) > 4000:
|
|
text = text[:4000] + "\n... (truncated)"
|
|
await bot.send_message(chat_id=chat_id, text=text)
|
|
|
|
async def on_error(error):
|
|
await bot.send_message(chat_id=chat_id, text=f"Error: {error}")
|
|
|
|
async def on_complete():
|
|
pass # Phase 2 will add typing indicator cleanup
|
|
|
|
async def on_status(status):
|
|
await bot.send_message(chat_id=chat_id, text=f"[{status}]")
|
|
|
|
return on_output, on_error, on_complete, on_status
|
|
|
|
def run_command(cmd: list, timeout: int = 30) -> str:
|
|
"""Run a shell command and return output."""
|
|
try:
|
|
result = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=timeout,
|
|
env={**os.environ, 'PATH': f"/home/mikkel/bin:{os.environ.get('PATH', '')}"}
|
|
)
|
|
output = result.stdout or result.stderr or "No output"
|
|
# Telegram has 4096 char limit per message
|
|
if len(output) > 4000:
|
|
output = output[:4000] + "\n... (truncated)"
|
|
return output
|
|
except subprocess.TimeoutExpired:
|
|
return "Command timed out"
|
|
except Exception as e:
|
|
return f"Error: {e}"
|
|
|
|
# Command handlers
|
|
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""Handle /start command - first contact with bot."""
|
|
user = update.effective_user
|
|
chat_id = update.effective_chat.id
|
|
|
|
if not is_authorized(user.id):
|
|
# First user becomes authorized automatically
|
|
if not get_authorized_users():
|
|
add_authorized_user(user.id)
|
|
await update.message.reply_text(
|
|
f"Welcome {user.first_name}! You're now authorized as the admin.\n"
|
|
f"Your chat ID: {chat_id}\n\n"
|
|
f"Use /help to see available commands."
|
|
)
|
|
else:
|
|
await update.message.reply_text(
|
|
f"Unauthorized. Your user ID: {user.id}\n"
|
|
"Contact the admin to get access."
|
|
)
|
|
return
|
|
|
|
await update.message.reply_text(
|
|
f"Welcome back {user.first_name}!\n"
|
|
f"Use /help to see available commands."
|
|
)
|
|
|
|
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""Handle /help command."""
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
help_text = """
|
|
*Homelab Bot Commands*
|
|
|
|
*Claude Sessions:*
|
|
/new <name> [persona] - Create new Claude session
|
|
/session <name> - Switch to a session
|
|
/archive <name> - Archive and remove a session
|
|
|
|
*Status & Monitoring:*
|
|
/status - Quick service overview
|
|
/pbs - PBS backup status
|
|
/backups - Last backup per VM/CT
|
|
/beszel - Server metrics
|
|
/kuma - Uptime Kuma status
|
|
|
|
*PBS Commands:*
|
|
/pbs\\_status - Full PBS overview
|
|
/pbs\\_errors - Recent errors
|
|
/pbs\\_gc - Garbage collection status
|
|
|
|
*System:*
|
|
/ping <host> - Ping a host
|
|
/help - This help message
|
|
/chatid - Show your chat ID
|
|
"""
|
|
await update.message.reply_text(help_text, parse_mode='Markdown')
|
|
|
|
async def chatid(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""Show chat ID (useful for notifications)."""
|
|
await update.message.reply_text(f"Chat ID: `{update.effective_chat.id}`", parse_mode='Markdown')
|
|
|
|
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""Quick status overview."""
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
await update.message.reply_text("Checking services...")
|
|
|
|
# Quick checks
|
|
output_lines = ["*Homelab Status*\n"]
|
|
|
|
# Check key services via ping
|
|
services = [
|
|
("NPM", "10.5.0.1"),
|
|
("DNS", "10.5.0.2"),
|
|
("PBS", "10.5.0.6"),
|
|
("Dockge", "10.5.0.10"),
|
|
("Forgejo", "10.5.0.14"),
|
|
]
|
|
|
|
for name, ip in services:
|
|
result = subprocess.run(
|
|
["ping", "-c", "1", "-W", "1", ip],
|
|
capture_output=True
|
|
)
|
|
status = "up" if result.returncode == 0 else "down"
|
|
output_lines.append(f"{'✅' if status == 'up' else '❌'} {name} ({ip})")
|
|
|
|
await update.message.reply_text("\n".join(output_lines), parse_mode='Markdown')
|
|
|
|
async def pbs(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""PBS quick status."""
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
await update.message.reply_text("Fetching PBS status...")
|
|
output = run_command(["/home/mikkel/bin/pbs", "status"])
|
|
await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
|
|
|
|
async def pbs_status(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""PBS full status."""
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
await update.message.reply_text("Fetching PBS status...")
|
|
output = run_command(["/home/mikkel/bin/pbs", "status"])
|
|
await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
|
|
|
|
async def backups(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""PBS backups per VM/CT."""
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
await update.message.reply_text("Fetching backup status...")
|
|
output = run_command(["/home/mikkel/bin/pbs", "backups"], timeout=60)
|
|
await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
|
|
|
|
async def pbs_errors(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""PBS errors."""
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
output = run_command(["/home/mikkel/bin/pbs", "errors"])
|
|
await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
|
|
|
|
async def pbs_gc(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""PBS garbage collection status."""
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
output = run_command(["/home/mikkel/bin/pbs", "gc"])
|
|
await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
|
|
|
|
async def beszel(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""Beszel server metrics."""
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
await update.message.reply_text("Fetching server metrics...")
|
|
output = run_command(["/home/mikkel/bin/beszel", "status"])
|
|
await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
|
|
|
|
async def kuma(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""Uptime Kuma status."""
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
output = run_command(["/home/mikkel/bin/kuma", "list"])
|
|
await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
|
|
|
|
async def ping_host(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""Ping a host."""
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
if not context.args:
|
|
await update.message.reply_text("Usage: /ping <host>")
|
|
return
|
|
|
|
host = context.args[0]
|
|
# Basic validation
|
|
if not host.replace('.', '').replace('-', '').isalnum():
|
|
await update.message.reply_text("Invalid hostname")
|
|
return
|
|
|
|
result = subprocess.run(
|
|
["ping", "-c", "3", "-W", "2", host],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10
|
|
)
|
|
output = result.stdout or result.stderr
|
|
await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
|
|
|
|
async def new_session(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""Create a new Claude session."""
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
if not context.args:
|
|
await update.message.reply_text("Usage: /new <name> [persona]")
|
|
return
|
|
|
|
name = context.args[0]
|
|
persona = context.args[1] if len(context.args) > 1 else None
|
|
|
|
try:
|
|
# Create session
|
|
session_manager.create_session(name, persona=persona)
|
|
|
|
# Auto-switch to new session
|
|
session_manager.switch_session(name)
|
|
|
|
# Prepare reply
|
|
if persona:
|
|
reply = f"Session '{name}' created with persona '{persona}'."
|
|
else:
|
|
reply = f"Session '{name}' created."
|
|
|
|
await update.message.reply_text(reply)
|
|
|
|
# Spawn subprocess for the new session (but don't send message yet)
|
|
session_dir = session_manager.get_session_dir(name)
|
|
persona_config = session_manager.get_session(name)
|
|
persona_data = session_manager.load_persona(persona or 'default')
|
|
|
|
# Create callbacks bound to this chat
|
|
callbacks = make_callbacks(context.bot, update.effective_chat.id)
|
|
|
|
# Create subprocess
|
|
subprocesses[name] = ClaudeSubprocess(
|
|
session_dir=session_dir,
|
|
persona=persona_data,
|
|
on_output=callbacks[0],
|
|
on_error=callbacks[1],
|
|
on_complete=callbacks[2],
|
|
on_status=callbacks[3],
|
|
)
|
|
|
|
logger.info(f"Created session '{name}' with persona '{persona or 'default'}'")
|
|
|
|
except ValueError as e:
|
|
# Session already exists
|
|
await update.message.reply_text(
|
|
f"Session '{name}' already exists. Use /session <name> to switch to it."
|
|
)
|
|
except FileNotFoundError as e:
|
|
# Persona not found
|
|
await update.message.reply_text(f"Error: {e}")
|
|
except Exception as e:
|
|
logger.error(f"Error creating session: {e}")
|
|
await update.message.reply_text(f"Error creating session: {e}")
|
|
|
|
async def switch_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""Switch to a different Claude session."""
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
if not context.args:
|
|
# Show usage and list available sessions
|
|
sessions = session_manager.list_sessions()
|
|
if not sessions:
|
|
await update.message.reply_text("No sessions found. Use /new <name> to create one.")
|
|
return
|
|
|
|
session_list = "\n".join(
|
|
f"- {s['name']} ({s['status']}) - {s.get('persona', 'default')}"
|
|
for s in sessions
|
|
)
|
|
await update.message.reply_text(
|
|
f"Usage: /session <name>\n\nAvailable sessions:\n{session_list}"
|
|
)
|
|
return
|
|
|
|
name = context.args[0]
|
|
|
|
try:
|
|
# Check if session exists
|
|
if not session_manager.session_exists(name):
|
|
await update.message.reply_text(
|
|
f"Session '{name}' doesn't exist. Use /new <name> to create it."
|
|
)
|
|
return
|
|
|
|
# Switch session
|
|
session_manager.switch_session(name)
|
|
|
|
# Auto-spawn subprocess if not alive
|
|
if name not in subprocesses or not subprocesses[name].is_alive:
|
|
session_dir = session_manager.get_session_dir(name)
|
|
session_data = session_manager.get_session(name)
|
|
persona_name = session_data.get('persona', 'default')
|
|
persona_data = session_manager.load_persona(persona_name)
|
|
|
|
# Create callbacks bound to this chat
|
|
callbacks = make_callbacks(context.bot, update.effective_chat.id)
|
|
|
|
# Create subprocess
|
|
subprocesses[name] = ClaudeSubprocess(
|
|
session_dir=session_dir,
|
|
persona=persona_data,
|
|
on_output=callbacks[0],
|
|
on_error=callbacks[1],
|
|
on_complete=callbacks[2],
|
|
on_status=callbacks[3],
|
|
)
|
|
|
|
logger.info(f"Auto-spawned subprocess for session '{name}'")
|
|
|
|
# Get persona for reply
|
|
session_data = session_manager.get_session(name)
|
|
persona_name = session_data.get('persona', 'default')
|
|
|
|
if persona_name and persona_name != 'default':
|
|
reply = f"Switched to session '{name}' (persona: {persona_name})."
|
|
else:
|
|
reply = f"Switched to session '{name}'."
|
|
|
|
await update.message.reply_text(reply)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error switching session: {e}")
|
|
await update.message.reply_text(f"Error switching session: {e}")
|
|
|
|
async def archive_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""Archive a Claude session (compress and remove)."""
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
if not context.args:
|
|
await update.message.reply_text("Usage: /archive <name>")
|
|
return
|
|
|
|
name = context.args[0]
|
|
|
|
try:
|
|
# Terminate subprocess if running
|
|
if name in subprocesses:
|
|
if subprocesses[name].is_alive:
|
|
await subprocesses[name].terminate()
|
|
del subprocesses[name]
|
|
|
|
# Archive the session
|
|
archive_path = session_manager.archive_session(name)
|
|
size_mb = archive_path.stat().st_size / (1024 * 1024)
|
|
await update.message.reply_text(
|
|
f"Session '{name}' archived ({size_mb:.1f} MB)\n{archive_path.name}"
|
|
)
|
|
logger.info(f"Archived session '{name}' to {archive_path}")
|
|
|
|
except ValueError as e:
|
|
await update.message.reply_text(str(e))
|
|
except Exception as e:
|
|
logger.error(f"Error archiving session: {e}")
|
|
await update.message.reply_text(f"Error archiving session: {e}")
|
|
|
|
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""Handle free text messages - route to active Claude session."""
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
# Check for active session
|
|
active_session = session_manager.get_active_session()
|
|
if not active_session:
|
|
await update.message.reply_text(
|
|
"No active session. Use /new <name> to start one."
|
|
)
|
|
return
|
|
|
|
message = update.message.text
|
|
logger.info(f"Routing message to session '{active_session}': {message[:50]}...")
|
|
|
|
try:
|
|
# Get or create subprocess for active session
|
|
if active_session not in subprocesses or not subprocesses[active_session].is_alive:
|
|
session_dir = session_manager.get_session_dir(active_session)
|
|
session_data = session_manager.get_session(active_session)
|
|
persona_name = session_data.get('persona', 'default')
|
|
persona_data = session_manager.load_persona(persona_name)
|
|
|
|
# Create callbacks bound to this chat
|
|
callbacks = make_callbacks(context.bot, update.effective_chat.id)
|
|
|
|
# Create subprocess
|
|
subprocesses[active_session] = ClaudeSubprocess(
|
|
session_dir=session_dir,
|
|
persona=persona_data,
|
|
on_output=callbacks[0],
|
|
on_error=callbacks[1],
|
|
on_complete=callbacks[2],
|
|
on_status=callbacks[3],
|
|
)
|
|
|
|
logger.info(f"Auto-spawned subprocess for active session '{active_session}'")
|
|
|
|
# Send message to Claude
|
|
await subprocesses[active_session].send_message(message)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error handling message: {e}")
|
|
await update.message.reply_text(f"Error: {e}")
|
|
|
|
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""Handle photo messages - download and save for Claude."""
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
user = update.effective_user
|
|
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
file_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
|
|
# Create images directory
|
|
images_dir = Path(__file__).parent / 'images'
|
|
images_dir.mkdir(exist_ok=True)
|
|
|
|
# Get the largest photo (best quality)
|
|
photo = update.message.photo[-1]
|
|
file = await context.bot.get_file(photo.file_id)
|
|
|
|
# Download the image
|
|
filename = f"{file_timestamp}.jpg"
|
|
filepath = images_dir / filename
|
|
await file.download_to_drive(filepath)
|
|
|
|
# Get caption if any
|
|
caption = update.message.caption or ""
|
|
caption_text = f" - \"{caption}\"" if caption else ""
|
|
|
|
# Log to inbox
|
|
with open(INBOX_FILE, 'a') as f:
|
|
f.write(f"[{timestamp}] {user.first_name}: [IMAGE: {filepath}]{caption_text}\n")
|
|
|
|
logger.info(f"Photo saved from {user.first_name}: {filepath}")
|
|
|
|
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""Handle document/file messages - download and save for Claude."""
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
|
|
user = update.effective_user
|
|
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
file_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
|
|
# Create files directory
|
|
files_dir = Path(__file__).parent / 'files'
|
|
files_dir.mkdir(exist_ok=True)
|
|
|
|
# Get document info
|
|
doc = update.message.document
|
|
original_name = doc.file_name or "unknown"
|
|
file = await context.bot.get_file(doc.file_id)
|
|
|
|
# Download with timestamp prefix to avoid collisions
|
|
filename = f"{file_timestamp}_{original_name}"
|
|
filepath = files_dir / filename
|
|
await file.download_to_drive(filepath)
|
|
|
|
# Get caption if any
|
|
caption = update.message.caption or ""
|
|
caption_text = f" - \"{caption}\"" if caption else ""
|
|
|
|
# Log to inbox
|
|
with open(INBOX_FILE, 'a') as f:
|
|
f.write(f"[{timestamp}] {user.first_name}: [FILE: {filepath}]{caption_text}\n")
|
|
|
|
logger.info(f"Document saved from {user.first_name}: {filepath}")
|
|
|
|
async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|
"""Handle unknown commands."""
|
|
if not is_authorized(update.effective_user.id):
|
|
return
|
|
await update.message.reply_text("Unknown command. Use /help for available commands.")
|
|
|
|
def main():
|
|
"""Start the bot."""
|
|
# Create application
|
|
app = Application.builder().token(TOKEN).build()
|
|
|
|
# Add handlers
|
|
app.add_handler(CommandHandler("start", start))
|
|
app.add_handler(CommandHandler("help", help_command))
|
|
app.add_handler(CommandHandler("chatid", chatid))
|
|
app.add_handler(CommandHandler("new", new_session))
|
|
app.add_handler(CommandHandler("session", switch_session_cmd))
|
|
app.add_handler(CommandHandler("archive", archive_session_cmd))
|
|
app.add_handler(CommandHandler("status", status))
|
|
app.add_handler(CommandHandler("pbs", pbs))
|
|
app.add_handler(CommandHandler("pbs_status", pbs_status))
|
|
app.add_handler(CommandHandler("backups", backups))
|
|
app.add_handler(CommandHandler("pbs_errors", pbs_errors))
|
|
app.add_handler(CommandHandler("pbs_gc", pbs_gc))
|
|
app.add_handler(CommandHandler("beszel", beszel))
|
|
app.add_handler(CommandHandler("kuma", kuma))
|
|
app.add_handler(CommandHandler("ping", ping_host))
|
|
|
|
# Unknown command handler
|
|
app.add_handler(MessageHandler(filters.COMMAND, unknown))
|
|
|
|
# Free text message handler (for messages to Claude)
|
|
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message, block=False))
|
|
|
|
# Photo handler
|
|
app.add_handler(MessageHandler(filters.PHOTO, handle_photo))
|
|
|
|
# Document/file handler
|
|
app.add_handler(MessageHandler(filters.Document.ALL, handle_document))
|
|
|
|
# Start polling
|
|
logger.info("Starting Homelab Bot...")
|
|
app.run_polling(allowed_updates=Update.ALL_TYPES)
|
|
|
|
if __name__ == '__main__':
|
|
main()
|