homelab/telegram/bot.py
Mikkel Georgsen c50c348004 Add Telegram bot and shared storage documentation
- Telegram bot (@georgsen_homelab_bot) for two-way communication
  - Commands: /status, /pbs, /backups, /beszel, /kuma, /ping
  - Photos and files saved to inbox for Claude to read
  - Runs as systemd user service
- Shared storage via ZFS bind mounts
  - rpool/shared/mikkel on PVE host
  - Mounted to ~/stuff in mgmt, dev, general containers
  - SMB access via \\mgmt\stuff (Tailscale MagicDNS)
- Updated helper scripts list in CLAUDE.md

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-16 14:21:51 +00:00

374 lines
12 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Homelab Telegram Bot
Two-way interactive bot for homelab management and notifications.
"""
import asyncio
import logging
import os
import subprocess
from datetime import datetime
from pathlib import Path
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
# Setup logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Load credentials
CREDENTIALS_FILE = Path(__file__).parent / 'credentials'
config = {}
with open(CREDENTIALS_FILE) as f:
for line in f:
if '=' in line:
key, value = line.strip().split('=', 1)
config[key] = value
TOKEN = config['TELEGRAM_BOT_TOKEN']
# Authorized users file (stores chat IDs that are allowed to use the bot)
AUTHORIZED_FILE = Path(__file__).parent / 'authorized_users'
# Inbox file for messages to Claude
INBOX_FILE = Path(__file__).parent / 'inbox'
def get_authorized_users():
"""Load authorized user IDs."""
if AUTHORIZED_FILE.exists():
with open(AUTHORIZED_FILE) as f:
return set(int(line.strip()) for line in f if line.strip())
return set()
def add_authorized_user(user_id: int):
"""Add a user to authorized list."""
users = get_authorized_users()
users.add(user_id)
with open(AUTHORIZED_FILE, 'w') as f:
for uid in users:
f.write(f"{uid}\n")
def is_authorized(user_id: int) -> bool:
"""Check if user is authorized."""
return user_id in get_authorized_users()
def run_command(cmd: list, timeout: int = 30) -> str:
"""Run a shell command and return output."""
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
env={**os.environ, 'PATH': f"/home/mikkel/bin:{os.environ.get('PATH', '')}"}
)
output = result.stdout or result.stderr or "No output"
# Telegram has 4096 char limit per message
if len(output) > 4000:
output = output[:4000] + "\n... (truncated)"
return output
except subprocess.TimeoutExpired:
return "Command timed out"
except Exception as e:
return f"Error: {e}"
# Command handlers
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /start command - first contact with bot."""
user = update.effective_user
chat_id = update.effective_chat.id
if not is_authorized(user.id):
# First user becomes authorized automatically
if not get_authorized_users():
add_authorized_user(user.id)
await update.message.reply_text(
f"Welcome {user.first_name}! You're now authorized as the admin.\n"
f"Your chat ID: {chat_id}\n\n"
f"Use /help to see available commands."
)
else:
await update.message.reply_text(
f"Unauthorized. Your user ID: {user.id}\n"
"Contact the admin to get access."
)
return
await update.message.reply_text(
f"Welcome back {user.first_name}!\n"
f"Use /help to see available commands."
)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle /help command."""
if not is_authorized(update.effective_user.id):
return
help_text = """
*Homelab Bot Commands*
*Status & Monitoring:*
/status - Quick service overview
/pbs - PBS backup status
/backups - Last backup per VM/CT
/beszel - Server metrics
/kuma - Uptime Kuma status
*PBS Commands:*
/pbs\\_status - Full PBS overview
/pbs\\_errors - Recent errors
/pbs\\_gc - Garbage collection status
*System:*
/ping <host> - Ping a host
/help - This help message
/chatid - Show your chat ID
"""
await update.message.reply_text(help_text, parse_mode='Markdown')
async def chatid(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Show chat ID (useful for notifications)."""
await update.message.reply_text(f"Chat ID: `{update.effective_chat.id}`", parse_mode='Markdown')
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Quick status overview."""
if not is_authorized(update.effective_user.id):
return
await update.message.reply_text("Checking services...")
# Quick checks
output_lines = ["*Homelab Status*\n"]
# Check key services via ping
services = [
("NPM", "10.5.0.1"),
("DNS", "10.5.0.2"),
("PBS", "10.5.0.6"),
("Dockge", "10.5.0.10"),
("Forgejo", "10.5.0.14"),
]
for name, ip in services:
result = subprocess.run(
["ping", "-c", "1", "-W", "1", ip],
capture_output=True
)
status = "up" if result.returncode == 0 else "down"
output_lines.append(f"{'' if status == 'up' else ''} {name} ({ip})")
await update.message.reply_text("\n".join(output_lines), parse_mode='Markdown')
async def pbs(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""PBS quick status."""
if not is_authorized(update.effective_user.id):
return
await update.message.reply_text("Fetching PBS status...")
output = run_command(["/home/mikkel/bin/pbs", "status"])
await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
async def pbs_status(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""PBS full status."""
if not is_authorized(update.effective_user.id):
return
await update.message.reply_text("Fetching PBS status...")
output = run_command(["/home/mikkel/bin/pbs", "status"])
await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
async def backups(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""PBS backups per VM/CT."""
if not is_authorized(update.effective_user.id):
return
await update.message.reply_text("Fetching backup status...")
output = run_command(["/home/mikkel/bin/pbs", "backups"], timeout=60)
await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
async def pbs_errors(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""PBS errors."""
if not is_authorized(update.effective_user.id):
return
output = run_command(["/home/mikkel/bin/pbs", "errors"])
await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
async def pbs_gc(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""PBS garbage collection status."""
if not is_authorized(update.effective_user.id):
return
output = run_command(["/home/mikkel/bin/pbs", "gc"])
await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
async def beszel(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Beszel server metrics."""
if not is_authorized(update.effective_user.id):
return
await update.message.reply_text("Fetching server metrics...")
output = run_command(["/home/mikkel/bin/beszel", "status"])
await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
async def kuma(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Uptime Kuma status."""
if not is_authorized(update.effective_user.id):
return
output = run_command(["/home/mikkel/bin/kuma", "list"])
await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
async def ping_host(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Ping a host."""
if not is_authorized(update.effective_user.id):
return
if not context.args:
await update.message.reply_text("Usage: /ping <host>")
return
host = context.args[0]
# Basic validation
if not host.replace('.', '').replace('-', '').isalnum():
await update.message.reply_text("Invalid hostname")
return
result = subprocess.run(
["ping", "-c", "3", "-W", "2", host],
capture_output=True,
text=True,
timeout=10
)
output = result.stdout or result.stderr
await update.message.reply_text(f"```\n{output}\n```", parse_mode='Markdown')
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle free text messages - save to inbox for Claude."""
if not is_authorized(update.effective_user.id):
return
user = update.effective_user
message = update.message.text
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Append to inbox file
with open(INBOX_FILE, 'a') as f:
f.write(f"[{timestamp}] {user.first_name}: {message}\n")
# Silent save - no reply needed
logger.info(f"Inbox message from {user.first_name}: {message[:50]}...")
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle photo messages - download and save for Claude."""
if not is_authorized(update.effective_user.id):
return
user = update.effective_user
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
file_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
# Create images directory
images_dir = Path(__file__).parent / 'images'
images_dir.mkdir(exist_ok=True)
# Get the largest photo (best quality)
photo = update.message.photo[-1]
file = await context.bot.get_file(photo.file_id)
# Download the image
filename = f"{file_timestamp}.jpg"
filepath = images_dir / filename
await file.download_to_drive(filepath)
# Get caption if any
caption = update.message.caption or ""
caption_text = f" - \"{caption}\"" if caption else ""
# Log to inbox
with open(INBOX_FILE, 'a') as f:
f.write(f"[{timestamp}] {user.first_name}: [IMAGE: {filepath}]{caption_text}\n")
logger.info(f"Photo saved from {user.first_name}: {filepath}")
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle document/file messages - download and save for Claude."""
if not is_authorized(update.effective_user.id):
return
user = update.effective_user
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
file_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
# Create files directory
files_dir = Path(__file__).parent / 'files'
files_dir.mkdir(exist_ok=True)
# Get document info
doc = update.message.document
original_name = doc.file_name or "unknown"
file = await context.bot.get_file(doc.file_id)
# Download with timestamp prefix to avoid collisions
filename = f"{file_timestamp}_{original_name}"
filepath = files_dir / filename
await file.download_to_drive(filepath)
# Get caption if any
caption = update.message.caption or ""
caption_text = f" - \"{caption}\"" if caption else ""
# Log to inbox
with open(INBOX_FILE, 'a') as f:
f.write(f"[{timestamp}] {user.first_name}: [FILE: {filepath}]{caption_text}\n")
logger.info(f"Document saved from {user.first_name}: {filepath}")
async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle unknown commands."""
if not is_authorized(update.effective_user.id):
return
await update.message.reply_text("Unknown command. Use /help for available commands.")
def main():
"""Start the bot."""
# Create application
app = Application.builder().token(TOKEN).build()
# Add handlers
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("help", help_command))
app.add_handler(CommandHandler("chatid", chatid))
app.add_handler(CommandHandler("status", status))
app.add_handler(CommandHandler("pbs", pbs))
app.add_handler(CommandHandler("pbs_status", pbs_status))
app.add_handler(CommandHandler("backups", backups))
app.add_handler(CommandHandler("pbs_errors", pbs_errors))
app.add_handler(CommandHandler("pbs_gc", pbs_gc))
app.add_handler(CommandHandler("beszel", beszel))
app.add_handler(CommandHandler("kuma", kuma))
app.add_handler(CommandHandler("ping", ping_host))
# Unknown command handler
app.add_handler(MessageHandler(filters.COMMAND, unknown))
# Free text message handler (for messages to Claude)
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
# Photo handler
app.add_handler(MessageHandler(filters.PHOTO, handle_photo))
# Document/file handler
app.add_handler(MessageHandler(filters.Document.ALL, handle_document))
# Start polling
logger.info("Starting Homelab Bot...")
app.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == '__main__':
main()