# Architecture Research

**Domain:** Telegram Bot with Claude Code CLI Session Management
**Researched:** 2026-02-04
**Confidence:** HIGH

## Standard Architecture

### System Overview

```
┌──────────────────────────────────────────────────────────┐
│                 Telegram API (External)                  │
└────────────────────────────┬─────────────────────────────┘
                             │ (webhooks or polling)
                             ↓
┌──────────────────────────────────────────────────────────┐
│                 Bot Event Loop (asyncio)                 │
│                                                          │
│  ┌───────────┐   ┌───────────┐   ┌───────────┐           │
│  │  Message  │   │   Photo   │   │ Document  │           │
│  │  Handler  │   │  Handler  │   │  Handler  │           │
│  └─────┬─────┘   └─────┬─────┘   └─────┬─────┘           │
│        └───────────────┼───────────────┘                 │
│                        ↓                                 │
│              ┌───────────────────┐                       │
│              │ Route to Session  │                       │
│              │   (path-based)    │                       │
│              └─────────┬─────────┘                       │
└────────────────────────┼─────────────────────────────────┘
                         ↓
┌──────────────────────────────────────────────────────────┐
│                     Session Manager                      │
│                                                          │
│  ~/telegram/sessions/<session_name>/                     │
│  ├── metadata.json       (state, timestamps, config)     │
│  ├── conversation.jsonl  (message history)               │
│  ├── images/             (attachments)                   │
│  ├── files/              (documents)                     │
│  └── .claude_session_id  (Claude session ID, --resume)   │
│                                                          │
│  Session states:                                         │
│  [IDLE] → [SPAWNING] → [ACTIVE] ⇄ [PROCESSING]           │
│                                                          │
│  Idle timeout: 10 minutes of inactivity →                │
│  graceful suspend → [SUSPENDED]                          │
└────────────────────────────┬─────────────────────────────┘
                             ↓
┌──────────────────────────────────────────────────────────┐
│               Process Manager (per session)              │
│                                                          │
│  Claude Code CLI process (subprocess):                   │
│                                                          │
│    claude --resume <session_id> \                        │
│           --model <model> \                              │
│           --output-format stream-json \                  │
│           --input-format stream-json \                   │
│           --no-interactive \                             │
│           --dangerously-skip-permissions                 │
│                                                          │
│    stdin  ←── message queue (async)                      │
│    stdout ──→ response buffer (async readline)           │
│    stderr ──→ error logger                               │
│                                                          │
│  State: RUNNING | PROCESSING | IDLE | TERMINATED         │
│                                                          │
│  Process lifecycle:                                      │
│    1. create_subprocess_exec() with PIPE streams         │
│    2. asyncio tasks for stdout reader + stderr reader    │
│    3. Message queue feeds stdin writer                   │
│    4. Idle timeout monitor (background task)             │
│    5. Graceful shutdown: close stdin,                    │
│       await process.wait()                               │
└────────────────────────────┬─────────────────────────────┘
                             ↓
┌──────────────────────────────────────────────────────────┐
│                     Response Router                      │
│                                                          │
│  Parses Claude Code --output-format stream-json:         │
│    {"type": "text", "content": "..."}                    │
│    {"type": "tool_use", "name": "Read", "input": {...}}  │
│    {"type": "tool_result", "tool_use_id": "...",         │
│     "content": "..."}                                    │
│                                                          │
│  Routes output back to Telegram:                         │
│    - Buffers text chunks until the message is complete   │
│    - Formats code blocks with Markdown                   │
│    - Splits long messages (4096-char Telegram limit)     │
│    - Sends images via bot.send_photo() when Claude       │
│      generates image files                               │
└──────────────────────────────────────────────────────────┘
```

### Component Responsibilities

| Component | Responsibility | Typical Implementation |
|-----------|----------------|------------------------|
| **Bot Event Loop** | Receives Telegram updates (messages, photos, documents), dispatches to handlers | `python-telegram-bot` Application with async handlers |
| **Message Router** | Maps Telegram chat_id to a session path, creates the session if needed, loads/saves metadata | Path-based directory structure: `~/telegram/sessions/<session_name>/` |
| **Session Manager** | Owns the session lifecycle: create, load, update metadata, check idle timeout, suspend/resume | Python class with async methods; uses file locks for concurrency safety |
| **Process Manager** | Spawns/manages the Claude Code CLI subprocess per session, handles stdin/stdout/stderr streams | `asyncio.create_subprocess_exec()` with PIPE streams, background reader tasks |
| **Message Queue** | Buffers incoming messages from Telegram, feeds them to Claude stdin as stream-json | `asyncio.Queue` per session, async writer task |
| **Response Buffer** | Reads stdout line by line, parses stream-json, accumulates text chunks | Async reader task with `process.stdout.readline()`, JSON parsing |
| **Response Router** | Formats Claude output for Telegram (Markdown, code blocks, chunking), sends via bot API | Telegram formatting helpers, message-splitting logic |
| **Idle Monitor** | Tracks the last-activity timestamp per session, triggers graceful shutdown after timeout | Background `asyncio.Task` checking timestamps, calls suspend on timeout |
| **Cost Monitor** | Routes monitoring commands (/status, /pbs) to Haiku, switches to Opus for conversational messages | Model selection logic based on message type (command vs. text) |

## Recommended Project Structure

```
telegram/
├── bot.py                  # Main entry point (systemd service)
├── credentials             # Bot token (existing)
├── authorized_users        # Allowed chat IDs (existing)
├── inbox                   # Old single-session inbox (deprecated, remove after migration)
├── images/                 # Old images dir (deprecated)
├── files/                  # Old files dir (deprecated)
│
├── sessions/               # NEW: Multi-session storage
│   ├── main/               # Default session
│   │   ├── metadata.json
│   │   ├── conversation.jsonl
│   │   ├── images/
│   │   ├── files/
│   │   └── .claude_session_id
│   │
│   ├── homelab/            # Path-based session example
│   │   └── ...
│   │
│   └── dev/                # Another session
│       └── ...
│
└── lib/                    # NEW: Modularized code
    ├── __init__.py
    ├── router.py             # Message routing logic (chat_id → session)
    ├── session.py            # Session class (metadata, state, paths)
    ├── process_manager.py    # ProcessManager class (spawn, communicate, monitor)
    ├── stream_parser.py      # Claude stream-json parser
    ├── telegram_formatter.py # Telegram response formatting
    ├── idle_monitor.py       # Idle timeout background task
    └── cost_optimizer.py     # Model selection (Haiku vs. Opus)
```

### Structure Rationale

- **sessions/ directory:** Path-based isolation, one directory per conversation context. Allows multiple simultaneous sessions without state bleeding. Each session directory is self-contained for easy inspection, backup, and debugging.
- **lib/ modularization:** The current bot.py is 375 lines of single-session logic. Multi-session support with subprocess management will easily exceed 1,000 lines. Breaking it into modules improves testability and readability and allows incremental development.
- **Metadata files:** `metadata.json` stores the session state (IDLE/ACTIVE/SUSPENDED), the last-activity timestamp, the Claude session ID, and configuration (model choice, custom prompts). `conversation.jsonl` is an append-only message log (one JSON object per line) for audit trail and potential Claude context replay.
- **Separation of concerns:** Each module has one job. The router doesn't know about processes. The ProcessManager doesn't know about Telegram. The Session class is a pure data structure. This enables testing each component in isolation.

## Architectural Patterns

### Pattern 1: Path-Based Session Routing

**What:** Map a Telegram chat_id to a filesystem path `~/telegram/sessions/<session_name>/` to isolate conversation contexts. The session name is derived from an explicit user command (`/session <name>`) or defaults to "main".

**When to use:** When a single bot needs to maintain multiple independent conversation contexts for the same user (e.g., "homelab" for infrastructure work, "dev" for coding, "personal" for notes).
**Trade-offs:**

- **Pro:** The filesystem provides natural isolation; sessions are easy to inspect, back up, and delete; no database needed
- **Pro:** Path-based routing is conceptually simple and debuggable
- **Con:** File locks are needed for concurrent access (though Telegram updates are sequential per chat_id)
- **Con:** A large number of sessions (1,000+) could strain the filesystem if poorly managed

**Example:**

```python
# router.py
from pathlib import Path

class SessionRouter:
    def __init__(self, base_path: Path):
        self.base_path = base_path
        self.chat_sessions = {}  # chat_id → current session_name

    def get_session_path(self, chat_id: int) -> Path:
        """Get the current session path for chat_id."""
        session_name = self.chat_sessions.get(chat_id, "main")
        path = self.base_path / session_name
        path.mkdir(parents=True, exist_ok=True)
        return path

    def switch_session(self, chat_id: int, session_name: str):
        """Switch chat_id to a different session."""
        self.chat_sessions[chat_id] = session_name
```

### Pattern 2: Async Subprocess with Bidirectional Streams

**What:** Use `asyncio.create_subprocess_exec()` with PIPE streams for stdin/stdout/stderr. Launch separate async tasks for reading stdout and stderr to avoid deadlocks. Feed stdin via an async queue.

**When to use:** When you need to interact with a long-running interactive CLI tool (like Claude Code) that reads from stdin and writes to stdout continuously.
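The mechanics can be demonstrated without Claude itself. The sketch below keeps a single subprocess alive across multiple writes, using `cat` (assumed available, as on any POSIX system) as a stand-in for an interactive CLI that echoes stdin back to stdout:

```python
import asyncio

async def demo() -> list[str]:
    """Send several messages to one long-lived subprocess, reading each reply."""
    proc = await asyncio.create_subprocess_exec(
        "cat",  # stand-in for an interactive CLI: echoes each stdin line
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    replies = []
    for msg in ("first", "second"):
        proc.stdin.write((msg + "\n").encode())
        await proc.stdin.drain()
        line = await proc.stdout.readline()  # awaits without blocking the loop
        replies.append(line.decode().strip())
    proc.stdin.close()  # signal EOF; cat exits cleanly
    await proc.wait()
    return replies

print(asyncio.run(demo()))  # -> ['first', 'second']
```

The key property: the process survives between messages, unlike `communicate()`, which closes stdin after one exchange.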
**Trade-offs:**

- **Pro:** Python's asyncio subprocess module handles the complex stream management
- **Pro:** Non-blocking I/O keeps the bot responsive while Claude processes a message
- **Pro:** Separate reader tasks prevent buffer-full deadlocks
- **Con:** More complex than a simple `subprocess.run()` or `communicate()`
- **Con:** The process lifecycle (startup, shutdown, crashes) must be managed manually

**Example:**

```python
# process_manager.py
import asyncio
import json
import logging

logger = logging.getLogger(__name__)

class ProcessManager:
    def __init__(self):
        self.process = None
        self.output_queue = asyncio.Queue()
        self.state = "IDLE"

    async def spawn_claude(self, session_id: str, model: str = "haiku"):
        """Spawn the Claude Code CLI subprocess."""
        self.process = await asyncio.create_subprocess_exec(
            "claude",
            "--resume", session_id,
            "--model", model,
            "--output-format", "stream-json",
            "--input-format", "stream-json",
            "--no-interactive",
            "--dangerously-skip-permissions",
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        # Launch reader tasks
        self.stdout_task = asyncio.create_task(self._read_stdout())
        self.stderr_task = asyncio.create_task(self._read_stderr())
        self.state = "RUNNING"

    async def _read_stdout(self):
        """Read stdout line by line, parse stream-json."""
        while True:
            line = await self.process.stdout.readline()
            if not line:
                break  # EOF
            try:
                event = json.loads(line.decode())
                await self.output_queue.put(event)
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse Claude output: {e}")

    async def _read_stderr(self):
        """Log stderr output."""
        while True:
            line = await self.process.stderr.readline()
            if not line:
                break
            logger.warning(f"Claude stderr: {line.decode().strip()}")

    async def send_message(self, message: str):
        """Send a message to Claude stdin as stream-json."""
        event = {"type": "message", "content": message}
        json_line = json.dumps(event) + "\n"
        self.process.stdin.write(json_line.encode())
        await self.process.stdin.drain()
```

### Pattern 3: State Machine for Session Lifecycle

**What:** Define explicit states for each session (IDLE, SPAWNING, ACTIVE, PROCESSING, SUSPENDED) with transitions based on events (message_received, response_sent, timeout_reached, user_command).

**When to use:** When managing a complex lifecycle with timeouts, retries, and graceful shutdowns. A state machine makes transitions explicit and debuggable.

**Trade-offs:**

- **Pro:** Clear semantics for what can happen in each state
- **Pro:** Easier to add new states (e.g., PAUSED, ERROR) without breaking existing logic
- **Pro:** Testable: state transitions can be unit-tested independently
- **Con:** Overhead for simple cases (but this is not a simple case)
- **Con:** Requires discipline to update state consistently

**Example:**

```python
# session.py
import logging
from datetime import datetime
from enum import Enum
from pathlib import Path

logger = logging.getLogger(__name__)

class SessionState(Enum):
    IDLE = "idle"              # No process running, session directory exists
    SPAWNING = "spawning"      # Process being created
    ACTIVE = "active"          # Process running, waiting for input
    PROCESSING = "processing"  # Process running, handling a message
    SUSPENDED = "suspended"    # Timed out, process terminated, state saved

class Session:
    def __init__(self, path: Path):
        self.path = path
        self.state = SessionState.IDLE
        self.last_activity = datetime.now()
        self.process_manager = None
        self.claude_session_id = self._load_claude_session_id()

    async def transition(self, new_state: SessionState):
        """Transition to a new state with logging."""
        logger.info(f"Session {self.path.name}: {self.state.value} → {new_state.value}")
        self.state = new_state
        self._save_metadata()

    async def handle_message(self, message: str):
        """Main message handling logic."""
        self.last_activity = datetime.now()
        if self.state == SessionState.IDLE:
            await self.transition(SessionState.SPAWNING)
            await self._spawn_process()
            await self.transition(SessionState.ACTIVE)
        if self.state == SessionState.ACTIVE:
            await self.transition(SessionState.PROCESSING)
            await self.process_manager.send_message(message)
            # Wait for response, transition back to ACTIVE when done

    async def check_idle_timeout(self, timeout_seconds: int = 600):
        """Check whether the session should be suspended."""
        if self.state in [SessionState.ACTIVE, SessionState.PROCESSING]:
            idle_time = (datetime.now() - self.last_activity).total_seconds()
            if idle_time > timeout_seconds:
                await self.suspend()

    async def suspend(self):
        """Gracefully shut down the process, save state."""
        if self.process_manager:
            await self.process_manager.shutdown()
        await self.transition(SessionState.SUSPENDED)
```

### Pattern 4: Cost Optimization with Model Switching

**What:** Use Haiku (cheap, fast) for monitoring commands that invoke helper scripts (`/status`, `/pbs`, `/beszel`). Switch to Opus (expensive, smart) for open-ended conversational messages.

**When to use:** When cost is a concern and some tasks don't need the most capable model.

**Trade-offs:**

- **Pro:** Significant cost savings (Haiku is roughly 20x cheaper than Opus per million tokens)
- **Pro:** Faster responses for simple monitoring queries
- **Con:** Routing logic for which messages use which model must be maintained
- **Con:** Risk of using the wrong model if classification is incorrect

**Example:**

```python
# cost_optimizer.py
import logging

from .session import Session

logger = logging.getLogger(__name__)

class ModelSelector:
    MONITORING_COMMANDS = {"/status", "/pbs", "/backups", "/beszel", "/kuma", "/ping"}

    @staticmethod
    def select_model(message: str) -> str:
        """Choose a model based on message type."""
        # Monitoring command messages use Haiku
        if message.strip().startswith("/") and message.split()[0] in ModelSelector.MONITORING_COMMANDS:
            return "haiku"
        # Conversational messages use Opus
        return "opus"

    @staticmethod
    async def spawn_with_model(session: Session, message: str):
        """Spawn a Claude process with the appropriate model."""
        model = ModelSelector.select_model(message)
        logger.info(f"Spawning Claude with model: {model}")
        await session.process_manager.spawn_claude(
            session_id=session.claude_session_id,
            model=model
        )
```

## Data Flow

### Request Flow

```
[User sends message in Telegram]
        ↓
[Bot receives Update via polling]
        ↓
[MessageHandler extracts text, chat_id]
        ↓
[SessionRouter maps chat_id → session_path]
        ↓
[Load Session from filesystem (metadata.json)]
        ↓
[Check session state]
        ↓
┌───────────────────────────────────────────┐
│ State: IDLE or SUSPENDED                  │
│   ↓                                       │
│ ModelSelector chooses Haiku or Opus       │
│   ↓                                       │
│ ProcessManager spawns Claude CLI:         │
│   claude --resume <session_id> \          │
│          --model <model> \                │
│          --output-format stream-json      │
│   ↓                                       │
│ Session transitions to ACTIVE             │
└───────────────────────────────────────────┘
        ↓
[Format message as stream-json]
        ↓
[Write to process.stdin, drain buffer]
        ↓
[Session transitions to PROCESSING]
        ↓
[Claude processes request...]
```

### Response Flow

```
[Claude writes to stdout (stream-json events)]
        ↓
[asyncio reader task reads line by line]
        ↓
[Parse JSON: {"type": "text", "content": "..."}]
        ↓
[StreamParser accumulates text chunks]
        ↓
[Detect end-of-response marker]
        ↓
[ResponseFormatter applies Markdown, splits long messages]
        ↓
[Send to Telegram via bot.send_message()]
        ↓
[Session transitions to ACTIVE]
        ↓
[Update last_activity timestamp]
        ↓
[IdleMonitor background task checks timeout]
        ↓
┌───────────────────────────────────────────┐
│ If idle > 10 minutes:                     │
│   ↓                                       │
│ Session.suspend()                         │
│   ↓                                       │
│ ProcessManager.shutdown():                │
│   - close stdin                           │
│   - await process.wait(timeout=5s)        │
│   - force kill if still running           │
│   ↓                                       │
│ Session transitions to SUSPENDED          │
│   ↓                                       │
│ Save metadata (state, timestamp)          │
└───────────────────────────────────────────┘
```

### Key Data Flows

1. **Message ingestion:** Telegram Update → Handler → Router → Session → ProcessManager → Claude stdin
   - Async all the way, no blocking calls
   - Each session has an independent queue to avoid cross-session interference
2. **Response streaming:** Claude stdout → Reader task → StreamParser → Formatter → Telegram API
   - Line-by-line reading prevents memory issues with large responses
   - Chunking respects Telegram's 4096-character limit per message
3. **File attachments:** Telegram photo/document → Download to `sessions/<session_name>/images/` or `files/` → Log to conversation.jsonl → Available to Claude via file path
   - When the user sends a photo, log its path to the conversation so the next message can reference it
   - Claude can read images via the Read tool if the path is mentioned
4. **Idle timeout:** Background task checks `last_activity` every 60 seconds → If >10 min idle → Trigger graceful shutdown
   - Prevents zombie processes from accumulating and consuming resources
   - Session state is saved to disk and resumes transparently when the user returns

## Scaling Considerations

| Scale | Architecture Adjustments |
|-------|--------------------------|
| 1-5 users (current) | Single LXC container, filesystem-based sessions, no database needed. Idle timeout prevents resource exhaustion. |
| 5-20 users | Add a session cleanup job (delete sessions inactive >30 days). Monitor disk space for the sessions/ directory. Consider Redis for the chat_id → session_name mapping if the bot restarts frequently. |
| 20-100 users | Move session storage to a separate ZFS dataset with a quota. Add metrics (Prometheus) for session count, process count, API cost. Implement per-user rate limiting. Consider a dedicated container for the bot. |
| 100+ users | Multi-bot deployment (shard by chat_id). Centralized session storage (S3/MinIO). Queue-based architecture (RabbitMQ) to decouple Telegram polling from processing. Separate Claude API keys per bot instance to avoid rate limits. |

### Scaling Priorities

1. **First bottleneck:** Disk I/O from many sessions writing conversation logs concurrently
   - **Fix:** Use ZFS with compression; optimize writes (batch metadata updates, async file I/O)
2. **Second bottleneck:** Claude API rate limits (multiple users sending messages simultaneously)
   - **Fix:** Queue messages per user, implement retry with exponential backoff, surface an "API busy" message to the user
3. **Third bottleneck:** Memory usage from many concurrent Claude processes (each ~100-200 MB)
   - **Fix:** More aggressive idle timeout (reduce from 10 min to 5 min), cap max concurrent sessions, queue requests when too many processes are running

## Anti-Patterns

### Anti-Pattern 1: Blocking I/O in an Async Context

**What people do:** Call blocking `subprocess.run()` or `open().read()` directly in async handlers, blocking the entire event loop.

**Why it's wrong:** The Telegram bot runs on an async event loop. A blocking call freezes all handlers until it completes, making the bot unresponsive to other users.

**Do this instead:** Use `asyncio.create_subprocess_exec()` for subprocesses, `aiofiles` for file I/O, or wrap blocking calls in `asyncio.to_thread()` (Python 3.9+).

```python
import asyncio
import subprocess

# ❌ BAD: Blocks the event loop
async def handle_message(update, context):
    result = subprocess.run(["long-command"], capture_output=True)  # Blocks!
    await update.message.reply_text(result.stdout)

# ✅ GOOD: Non-blocking async subprocess
async def handle_message(update, context):
    process = await asyncio.create_subprocess_exec(
        "long-command", stdout=asyncio.subprocess.PIPE
    )
    stdout, _ = await process.communicate()
    await update.message.reply_text(stdout.decode())
```

### Anti-Pattern 2: Using communicate() for Interactive Processes

**What people do:** Spawn a subprocess and call `await process.communicate(input=message)` for every message, expecting bidirectional interaction.

**Why it's wrong:** `communicate()` sends input, closes stdin, and waits for the process to exit. It's designed for one-shot commands, not interactive sessions. The process exits after the first response.

**Do this instead:** Keep the process alive and manually manage the stdin/stdout streams with separate reader/writer tasks. Never call `communicate()` on a long-running process.
```python
# ❌ BAD: Process exits after the first message
async def send_message(self, message):
    stdout, stderr = await self.process.communicate(input=message.encode())
    # Process is now dead; must spawn again for the next message

# ✅ GOOD: Keep the process alive
async def send_message(self, message):
    self.process.stdin.write(message.encode() + b"\n")
    await self.process.stdin.drain()
    # Process still running; can send more messages
```

### Anti-Pattern 3: Ignoring Idle Processes

**What people do:** Spawn a subprocess when the user sends a message and never clean up when the user goes idle. Processes accumulate indefinitely.

**Why it's wrong:** Each Claude process consumes memory (~100-200 MB). With 20 users, that's 4 GB of RAM wasted on idle sessions. The container OOM-kills the bot.

**Do this instead:** Implement an idle timeout monitor. Track `last_activity` per session. A background task checks every 60 s and suspends sessions idle for more than 10 min.

```python
# ✅ GOOD: Idle monitoring
import asyncio
import logging
from datetime import datetime

from .session import Session, SessionState

logger = logging.getLogger(__name__)

class IdleMonitor:
    async def monitor_loop(self, sessions: dict[str, Session]):
        """Background task to check idle timeouts."""
        while True:
            await asyncio.sleep(60)  # Check every minute
            for session in sessions.values():
                if session.state in [SessionState.ACTIVE, SessionState.PROCESSING]:
                    idle_time = (datetime.now() - session.last_activity).total_seconds()
                    if idle_time > 600:  # 10 minutes
                        logger.info(f"Suspending idle session: {session.path.name}")
                        await session.suspend()
```

### Anti-Pattern 4: Mixing Session State Across Chats

**What people do:** Use a single global conversation history for all chats, or use chat_id as the session identifier without allowing multiple sessions per user.

**Why it's wrong:** The user can't maintain separate contexts (e.g., a "homelab" session for infra, a "dev" session for coding). All conversations bleed together, and Claude gets confused by the mixed context.

**Do this instead:** Implement path-based routing with explicit session names. Allow the user to switch sessions with a `/session <name>` command.
Each session has an independent filesystem directory and Claude session ID.

```python
# ✅ GOOD: Path-based session isolation
from pathlib import Path

from .session import Session

class SessionRouter:
    def __init__(self, base_path: Path):
        self.base_path = base_path
        self.active_sessions = {}  # "chat_id:session_name" → Session

    def get_or_create_session(self, chat_id: int, session_name: str = "main") -> Session:
        """Get a session by chat_id and name."""
        key = f"{chat_id}:{session_name}"
        if key not in self.active_sessions:
            path = self.base_path / str(chat_id) / session_name
            self.active_sessions[key] = Session(path)
        return self.active_sessions[key]
```

## Integration Points

### External Services

| Service | Integration Pattern | Notes |
|---------|---------------------|-------|
| **Telegram Bot API** | Polling via `Application.run_polling()`; async handlers receive `Update` objects | Rate limit: 30 messages/second per bot. Use `python-telegram-bot` v21.8+ for native asyncio support. |
| **Claude Code CLI** | Subprocess invocation with `--output-format stream-json`, bidirectional stdin/stdout communication | Must use the `--no-interactive` flag for programmatic usage. `--dangerously-skip-permissions` is required to avoid prompts blocking stdin. |
| **Homelab Helper Scripts** | Called via subprocess by Claude when responding to monitoring commands (`/status` → `~/bin/pbs status`) | Claude has access via the Bash tool. Output is captured on stdout and returned to the user. |
| **Filesystem (Sessions)** | Direct file I/O for metadata, conversation logs, attachments. Use `aiofiles` for async file operations. | Append-only `conversation.jsonl` provides an audit trail and potential replay capability. |

### Internal Boundaries

| Boundary | Communication | Notes |
|----------|---------------|-------|
| **Bot ↔ SessionRouter** | Function calls: `router.get_session(chat_id)` returns a `Session` object | Router owns the chat_id → session mapping. Stateless; can be rebuilt from the filesystem. |
| **SessionRouter ↔ Session** | Function calls: `session.handle_message(text)` async method | Session encapsulates the state machine and owns the ProcessManager. |
| **Session ↔ ProcessManager** | Function calls: `process_manager.spawn_claude()`, `send_message()`, `shutdown()` async methods | ProcessManager owns the subprocess lifecycle. Session doesn't know about asyncio streams. |
| **ProcessManager ↔ Claude CLI** | OS pipes: stdin (write), stdout (read), stderr (read) | Never use `communicate()` for interactive processes. Manual stream management is required. |
| **StreamParser ↔ ResponseFormatter** | Function calls: `parser.accumulate(event)` returns buffered text; `formatter.format_for_telegram(text)` returns a list of message chunks | Parser handles the stream-json protocol; Formatter handles Telegram-specific quirks (Markdown escaping, 4096-char limit). |
| **IdleMonitor ↔ Session** | Background task calls `session.check_idle_timeout()` periodically | Monitor is a global background task that iterates over all active sessions. |

## Build Order and Dependencies

Based on the architecture, here's the suggested build order with dependency reasoning:

### Phase 1: Foundation (Sessions & Routing)

**Goal:** Establish the multi-session filesystem structure, without subprocess management yet.

1. **Session class** (`lib/session.py`)
   - Implement the metadata file format (JSON schema for state, timestamps, config)
   - Implement path-based directory creation
   - Add the state enum and state machine skeleton (transitions without actions)
   - Add conversation.jsonl append logging
   - **No dependencies** - pure data structure
2. **SessionRouter** (`lib/router.py`)
   - Implement the chat_id → session_name mapping
   - Implement session creation/loading
   - Add command parsing for `/session <name>` to switch sessions
   - **Depends on:** Session class
3. **Update bot.py**
   - Integrate SessionRouter into the existing handlers
   - Route all messages through the router to a session
   - Add a `/session` command handler
   - **Depends on:** SessionRouter
   - **Testing:** Routing can be tested without Claude integration by just logging messages to conversation.jsonl

### Phase 2: Process Management (Claude CLI Integration)

**Goal:** Spawn and communicate with the Claude Code subprocess.

4. **StreamParser** (`lib/stream_parser.py`)
   - Implement stream-json parsing (line-by-line JSON objects)
   - Handle `{"type": "text", "content": "..."}` events
   - Accumulate text chunks into complete messages
   - **No dependencies** - pure parser
5. **ProcessManager** (`lib/process_manager.py`)
   - Implement `spawn_claude()` with `asyncio.create_subprocess_exec()`
   - Implement the async stdout reader task using StreamParser
   - Implement the async stderr reader task for logging
   - Implement `send_message()` to write to stdin
   - Implement graceful `shutdown()` (close stdin, wait, force kill if hung)
   - **Depends on:** StreamParser
6. **Integrate ProcessManager into Session**
   - Update the state machine to spawn a process on the first message (IDLE → SPAWNING → ACTIVE)
   - Implement `handle_message()` to pipe to the ProcessManager
   - Add response buffering and state transitions (PROCESSING → ACTIVE)
   - **Depends on:** ProcessManager
   - **Testing:** Send a message to a session, verify Claude responds, check the process terminates on shutdown

### Phase 3: Response Formatting & Telegram Integration

**Goal:** Format Claude output for Telegram and handle attachments.

7. **TelegramFormatter** (`lib/telegram_formatter.py`)
   - Implement Markdown escaping for the Telegram Bot API
   - Implement message chunking (4096-char limit)
   - Implement code block detection and formatting
   - **No dependencies** - pure formatter
8. **Update Session to use the formatter**
   - Pipe ProcessManager output through TelegramFormatter
   - Send formatted chunks to Telegram via the bot API
   - **Depends on:** TelegramFormatter
9. **File attachment handling**
   - Update photo/document handlers to save to session-specific paths
   - Log file paths to conversation.jsonl
   - Mention the file path in the next message to Claude stdin (so Claude can read it)
   - **Depends on:** Session with path structure

### Phase 4: Cost Optimization & Monitoring

**Goal:** Implement model selection and idle timeout.

10. **ModelSelector** (`lib/cost_optimizer.py`)
    - Implement command detection logic
    - Implement model selection (Haiku for commands, Opus for conversation)
    - **No dependencies** - pure routing logic
11. **Update Session to use ModelSelector**
    - Call ModelSelector before spawning a process
    - Pass the selected model to `spawn_claude(model=...)`
    - **Depends on:** ModelSelector
12. **IdleMonitor** (`lib/idle_monitor.py`)
    - Implement a background task to check last_activity timestamps
    - Call `session.suspend()` on timeout
    - **Depends on:** Session with a suspend() method
13. **Integrate IdleMonitor into bot.py**
    - Launch the monitor as a background task on bot startup
    - Pass the sessions dict to the monitor
    - **Depends on:** IdleMonitor
    - **Testing:** Send a message, wait >10 min (or reduce the timeout for testing), verify the process terminates

### Phase 5: Production Hardening

**Goal:** Error handling, logging, recovery.

14. **Error handling**
    - Add try/except around all async operations
    - Implement retry logic for Claude spawn failures
    - Handle Claude process crashes (respawn on the next message)
    - Log all errors in a structured format (JSON logs for parsing)
15. **Session recovery**
    - On bot startup, scan the sessions/ directory
    - Load all ACTIVE sessions and transition them to SUSPENDED (their processes are dead)
    - The user's next message will respawn the process transparently
16. **Monitoring & metrics**
    - Add a `/sessions` command to list active sessions
    - Add `/session_stats` to show process count and memory usage
    - Log session lifecycle events (spawn, suspend, terminate) for analysis

### Dependencies Summary

```
Phase 1 (Foundation):
  Session (no deps)
    ↓
  SessionRouter (→ Session)
    ↓
  bot.py integration (→ SessionRouter)

Phase 2 (Process Management):
  StreamParser (no deps)
    ↓
  ProcessManager (→ StreamParser)
    ↓
  Session integration (→ ProcessManager)

Phase 3 (Formatting):
  TelegramFormatter (no deps)
    ↓
  Session integration (→ TelegramFormatter)
    ↓
  File handling (→ Session paths)

Phase 4 (Optimization):
  ModelSelector (no deps) → Session integration
  IdleMonitor (→ Session) → bot.py integration

Phase 5 (Hardening):
  Error handling (all components)
  Session recovery (→ Session, SessionRouter)
  Monitoring (→ all components)
```

## Critical Design Decisions

### 1. Why Not Use `communicate()` for Interactive Sessions?

The `asyncio` documentation is clear: `communicate()` is designed for one-shot commands. It sends input, **closes stdin**, reads output, and waits for the process to exit. For interactive sessions where we need to send multiple messages without restarting the process, we must manually manage the streams with separate reader/writer tasks.

**Source:** [Python asyncio subprocess documentation](https://docs.python.org/3/library/asyncio-subprocess.html)

### 2. Why Path-Based Sessions Instead of a Database?

At this scale (1-20 users), the filesystem is simpler:

- **Inspection:** `ls sessions/` shows all sessions; `cat sessions/main/metadata.json` shows state
- **Backup:** `tar -czf sessions.tar.gz sessions/` is trivial
- **Debugging:** Files are human-readable JSON/JSONL
- **No dependencies:** No database server to run and maintain

At 100+ users, reconsider. But for the homelab use case, the filesystem wins on simplicity.

### 3. Why Separate Sessions Instead of a Single Conversation?

The user explicitly requested "path-based session management" in the project context.
The use case: keeping a "homelab" context separate from a "dev" context. A single conversation would mix contexts and confuse Claude. Sessions provide clean isolation.

### 4. Why an Idle Timeout Instead of Keeping Processes Forever?

Each Claude process consumes ~100-200 MB of RAM. On an LXC container with limited resources, 10 idle processes means 1-2 GB wasted. The idle timeout ensures resources are freed when not in use, and the process transparently respawns on the next message.

### 5. Why Haiku for Monitoring Commands?

Monitoring commands (`/status`, `/pbs`) invoke helper scripts that return structured data. Claude's role is minimal (format the output, maybe add an explanation). Haiku is sufficient and roughly 20x cheaper per token. Save Opus for complex analysis and conversation.

**Cost reference:** As of 2026, Claude 4.5 Haiku costs $0.80/$4.00 per million tokens (input/output), while Opus costs $15/$75.

## Sources

### High Confidence (Official Documentation)

- [Python asyncio subprocess documentation](https://docs.python.org/3/library/asyncio-subprocess.html) - Process class methods, create_subprocess_exec, deadlock warnings
- [Claude Code CLI reference](https://code.claude.com/docs/en/cli-reference) - All CLI flags, --resume usage, --output-format stream-json, --no-interactive mode
- [python-telegram-bot documentation](https://docs.python-telegram-bot.org/) - Application class, async handlers, ConversationHandler for state management

### Medium Confidence (Implementation Guides & Community)

- [Python subprocess bidirectional communication patterns](https://pymotw.com/3/asyncio/subprocesses.html) - Practical examples of PIPE usage
- [Streaming subprocess stdin/stdout with asyncio](https://kevinmccarthy.org/2016/07/25/streaming-subprocess-stdin-and-stdout-with-asyncio-in-python/) - Async stream management patterns
- [Session management in Telegram bots](https://macaron.im/blog/openclaw-telegram-bot-setup) - Path-based routing, session key patterns
- [Claude Code session management guide](https://stevekinney.com/courses/ai-development/claude-code-session-management) - --resume usage, session continuity
- [Python multiprocessing best practices 2026](https://copyprogramming.com/howto/python-python-multiprocessing-process-terminate-code-example) - Process lifecycle, graceful shutdown

### Key Takeaways from Research

1. **Asyncio subprocesses require manual stream management** - Never use `communicate()` for interactive processes; read stdout/stderr in separate tasks to avoid deadlocks
2. **Claude Code CLI supports programmatic usage** - `--output-format stream-json` + `--input-format stream-json` + `--no-interactive` enables subprocess integration
3. **Session isolation is a standard pattern** - Path-based or key-based routing prevents context bleeding across conversations
4. **Idle timeout is essential** - Without cleanup, processes accumulate indefinitely and exhaust resources
5. **State machines make the lifecycle explicit** - IDLE → SPAWNING → ACTIVE → PROCESSING → SUSPENDED transitions prevent race conditions and clarify behavior

---

*Architecture research for: Telegram-to-Claude Code Bridge*
*Researched: 2026-02-04*