From 8fce10c5ba8f632c53c0531c47ef5e42f5ca5a16 Mon Sep 17 00:00:00 2001 From: Mikkel Georgsen Date: Wed, 4 Feb 2026 17:33:49 +0000 Subject: [PATCH] feat(01-02): create ClaudeSubprocess module for process management - Spawns Claude Code CLI with stream-json output in session directories - Reads stdout/stderr concurrently via asyncio.gather (no pipe deadlock) - Handles process lifecycle with clean termination (no zombies) - Queues messages during processing using asyncio.Queue - Auto-restarts on crash with --continue flag (max 3 retries) - Parses stream-json events (assistant, result, system) to callbacks - Supports persona configuration (system_prompt, model, max_turns) - Uses terminate() + wait_for() + kill() fallback pattern Based on research: .planning/phases/01-session-process-foundation/01-RESEARCH.md --- telegram/claude_subprocess.py | 467 ++++++++++++++++++++++++++++++++++ 1 file changed, 467 insertions(+) create mode 100644 telegram/claude_subprocess.py diff --git a/telegram/claude_subprocess.py b/telegram/claude_subprocess.py new file mode 100644 index 0000000..5834af7 --- /dev/null +++ b/telegram/claude_subprocess.py @@ -0,0 +1,467 @@ +""" +Claude Code subprocess management for Telegram bot. + +This module provides the ClaudeSubprocess class that safely spawns, communicates +with, and manages Claude Code CLI processes using asyncio. It handles the dangerous +parts: pipe management without deadlocks, process lifecycle without zombies, +message queueing during processing, and crash recovery with session resumption. + +The subprocess runs in a session directory and outputs stream-json format for +structured, line-by-line parsing. Stdout and stderr are read concurrently via +asyncio.gather to prevent pipe buffer deadlocks. + +Key features: +- Concurrent stdout/stderr reading (no pipe deadlock) +- Clean process termination (no zombie processes) +- Message queuing during processing +- Automatic crash recovery with --continue flag +- Stream-json event routing to callbacks + +Based on research in: .planning/phases/01-session-process-foundation/01-RESEARCH.md +""" + +import asyncio +import json +import logging +import os +from pathlib import Path +from typing import Callable, Optional + +logger = logging.getLogger(__name__) + + +class ClaudeSubprocess: + """ + Manages a single Claude Code CLI subprocess. + + Spawns Claude Code in a session directory, handles I/O via pipes with + concurrent stream reading, queues messages during processing, and auto-restarts + on crash with context preservation. + + Example: + sub = ClaudeSubprocess( + session_dir=Path("/home/mikkel/telegram/sessions/my-session"), + persona={"system_prompt": "You are a helpful assistant"}, + on_output=lambda text: print(f"Claude: {text}"), + on_error=lambda err: print(f"Error: {err}"), + on_complete=lambda: print("Turn complete"), + on_status=lambda status: print(f"Status: {status}") + ) + await sub.send_message("Hello, Claude!") + # ... later ... + await sub.terminate() + """ + + MAX_CRASH_RETRIES = 3 + CRASH_BACKOFF_SECONDS = 1 + + def __init__( + self, + session_dir: Path, + persona: Optional[dict] = None, + on_output: Optional[Callable[[str], None]] = None, + on_error: Optional[Callable[[str], None]] = None, + on_complete: Optional[Callable[[], None]] = None, + on_status: Optional[Callable[[str], None]] = None, + ): + """ + Initialize ClaudeSubprocess. + + Args: + session_dir: Path to session directory (cwd for subprocess) + persona: Persona dict with system_prompt and settings (model, max_turns) + on_output: Callback(text: str) for assistant text output + on_error: Callback(error: str) for error messages + on_complete: Callback() when a turn completes + on_status: Callback(status: str) for status updates (e.g. "Claude restarted") + """ + self._session_dir = Path(session_dir) + self._persona = persona or {} + self.on_output = on_output + self.on_error = on_error + self.on_complete = on_complete + self.on_status = on_status + + # Process state + self._process: Optional[asyncio.subprocess.Process] = None + self._busy = False + self._message_queue: asyncio.Queue = asyncio.Queue() + self._reader_task: Optional[asyncio.Task] = None + self._crash_count = 0 + + logger.debug( + f"ClaudeSubprocess initialized: session_dir={session_dir}, " + f"persona={persona.get('system_prompt', 'none')[:50] if persona else 'none'}" + ) + + @property + def is_busy(self) -> bool: + """Return whether subprocess is currently processing a message.""" + return self._busy + + @property + def is_alive(self) -> bool: + """Return whether subprocess process is running.""" + return self._process is not None and self._process.returncode is None + + async def send_message(self, message: str) -> None: + """ + Send a message to Claude Code. + + If no process is running: spawn one with this message. + If process IS running and BUSY: queue message. + If process IS running and IDLE: send as new turn (spawn new invocation). + + Args: + message: Message to send to Claude + """ + if not self.is_alive: + # No process running - spawn new one + await self._spawn(message) + elif self._busy: + # Process is busy - queue message + logger.debug(f"Queueing message (process busy): {message[:50]}...") + await self._message_queue.put(message) + else: + # Process is idle - spawn new turn + # Note: For simplicity, we spawn a fresh process per turn + # Phase 2+ might use --input-format stream-json for live piping + await self._spawn(message) + + async def _spawn(self, message: str) -> None: + """ + Spawn Claude Code subprocess with a message. + + Builds command with persona settings, spawns process with pipes, + and launches concurrent stream readers. + + Args: + message: Initial message to send + """ + # Build command + cmd = [ + "claude", + "-p", + message, + "--output-format", + "stream-json", + "--verbose", + ] + + # Add persona settings + if self._persona: + if "system_prompt" in self._persona: + cmd.extend(["--system-prompt", self._persona["system_prompt"]]) + if "max_turns" in self._persona: + cmd.extend(["--max-turns", str(self._persona["max_turns"])]) + if "model" in self._persona: + cmd.extend(["--model", self._persona["model"]]) + + # Add --continue if prior session exists + if (self._session_dir / ".claude").exists(): + cmd.append("--continue") + logger.debug(f"Using --continue flag (found existing .claude/ directory)") + + # Prepare environment + env = os.environ.copy() + # Ensure PATH includes ~/.local/bin and ~/bin (for claude CLI) + path_parts = env.get("PATH", "").split(":") + home_bin = str(Path.home() / "bin") + local_bin = str(Path.home() / ".local" / "bin") + if home_bin not in path_parts: + path_parts.insert(0, home_bin) + if local_bin not in path_parts: + path_parts.insert(0, local_bin) + env["PATH"] = ":".join(path_parts) + + # Ensure session directory exists + self._session_dir.mkdir(parents=True, exist_ok=True) + + logger.info( + f"Spawning Claude Code: cwd={self._session_dir}, " + f"cmd={' '.join(cmd[:4])}... (truncated)" + ) + + try: + # Spawn subprocess + self._process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=str(self._session_dir), + env=env, + ) + + self._busy = True + logger.info(f"Claude Code spawned: PID={self._process.pid}") + + # Launch concurrent stream readers + self._reader_task = asyncio.create_task(self._read_streams()) + + except Exception as e: + logger.error(f"Failed to spawn Claude Code: {e}") + if self.on_error: + self.on_error(f"Failed to start Claude: {e}") + self._busy = False + + async def _read_streams(self) -> None: + """ + Read stdout and stderr concurrently until both complete. + + This is CRITICAL for deadlock prevention. Both streams are read + concurrently using asyncio.gather() to prevent pipe buffer overflow. + + After streams end, waits for process termination, marks as not busy, + calls completion callback, and processes any queued messages. + """ + if not self._process: + return + + try: + # Read both streams concurrently (CRITICAL: prevents deadlock) + await asyncio.gather( + self._read_stdout(), + self._read_stderr(), + return_exceptions=True, + ) + + # Wait for process to terminate + returncode = await self._process.wait() + logger.info(f"Claude Code process exited: returncode={returncode}") + + # Handle crash (non-zero exit) + if returncode != 0: + await self._handle_crash() + return + + # Reset crash count on successful completion + self._crash_count = 0 + + except Exception as e: + logger.error(f"Error reading streams: {e}") + if self.on_error: + self.on_error(f"Stream reading error: {e}") + + finally: + # Mark as not busy + self._busy = False + + # Call completion callback + if self.on_complete: + try: + self.on_complete() + except Exception as e: + logger.error(f"Error in on_complete callback: {e}") + + # Process queued messages + if not self._message_queue.empty(): + next_message = await self._message_queue.get() + logger.debug(f"Processing queued message: {next_message[:50]}...") + await self.send_message(next_message) + + async def _read_stdout(self) -> None: + """Read stdout line-by-line and handle each line.""" + if not self._process or not self._process.stdout: + return + + try: + while True: + line = await self._process.stdout.readline() + if not line: + break + line_str = line.decode().rstrip() + if line_str: + self._handle_stdout_line(line_str) + except Exception as e: + logger.error(f"Error reading stdout: {e}") + + async def _read_stderr(self) -> None: + """Read stderr line-by-line and handle each line.""" + if not self._process or not self._process.stderr: + return + + try: + while True: + line = await self._process.stderr.readline() + if not line: + break + line_str = line.decode().rstrip() + if line_str: + self._handle_stderr_line(line_str) + except Exception as e: + logger.error(f"Error reading stderr: {e}") + + def _handle_stdout_line(self, line: str) -> None: + """ + Parse and route stream-json events from stdout. + + Handles three event types: + - "assistant": Extract text blocks and call on_output + - "result": Turn complete, check for errors + - "system": System events, log and check for errors + + Args: + line: Single line of stream-json output + """ + try: + event = json.loads(line) + event_type = event.get("type") + + if event_type == "assistant": + # Extract text from assistant message + message = event.get("message", {}) + content = message.get("content", []) + for block in content: + if block.get("type") == "text": + text = block.get("text", "") + if text and self.on_output: + try: + self.on_output(text) + except Exception as e: + logger.error(f"Error in on_output callback: {e}") + + elif event_type == "result": + # Turn complete + session_id = event.get("session_id") + if session_id: + logger.debug(f"Turn complete: session_id={session_id}") + + # Check for error + if event.get("is_error") and self.on_error: + error_msg = event.get("error", "Unknown error") + try: + self.on_error(f"Claude error: {error_msg}") + except Exception as e: + logger.error(f"Error in on_error callback: {e}") + + elif event_type == "system": + # System event + subtype = event.get("subtype") + logger.debug(f"System event: subtype={subtype}") + + # Check for error subtype + if subtype == "error" and self.on_error: + error_msg = event.get("message", "System error") + try: + self.on_error(f"System error: {error_msg}") + except Exception as e: + logger.error(f"Error in on_error callback: {e}") + + else: + logger.debug(f"Unknown event type: {event_type}") + + except json.JSONDecodeError: + # Non-JSON line (Claude Code may emit diagnostics) + logger.warning(f"Non-JSON stdout line: {line[:100]}") + + def _handle_stderr_line(self, line: str) -> None: + """ + Handle stderr output from Claude Code. + + Logs as warning. If line contains "error" (case-insensitive), + also calls on_error callback. + + Args: + line: Single line of stderr output + """ + logger.warning(f"Claude Code stderr: {line}") + + # If line contains error, notify via callback + if "error" in line.lower() and self.on_error: + try: + self.on_error(f"Claude stderr: {line}") + except Exception as e: + logger.error(f"Error in on_error callback: {e}") + + async def _handle_crash(self) -> None: + """ + Handle Claude Code crash (non-zero exit). + + Attempts to restart with --continue flag up to MAX_CRASH_RETRIES times. + Notifies user via on_status callback. + """ + self._crash_count += 1 + logger.error( + f"Claude Code crashed (attempt {self._crash_count}/{self.MAX_CRASH_RETRIES})" + ) + + if self._crash_count >= self.MAX_CRASH_RETRIES: + error_msg = f"Claude failed to restart after {self.MAX_CRASH_RETRIES} attempts" + logger.error(error_msg) + if self.on_error: + self.on_error(error_msg) + self._crash_count = 0 # Reset for next session + return + + # Notify user + if self.on_status: + try: + self.on_status("Claude crashed, restarting with context preserved...") + except Exception as e: + logger.error(f"Error in on_status callback: {e}") + + # Wait before retrying + await asyncio.sleep(self.CRASH_BACKOFF_SECONDS) + + # Respawn with --continue flag (loads most recent session) + try: + # Get next queued message or use placeholder + if not self._message_queue.empty(): + next_message = await self._message_queue.get() + else: + next_message = "continue" # Minimal message to resume + + await self._spawn(next_message) + except Exception as e: + logger.error(f"Failed to respawn after crash: {e}") + if self.on_error: + self.on_error(f"Failed to restart Claude: {e}") + + async def terminate(self, timeout: int = 10) -> None: + """ + Terminate subprocess gracefully. + + Sends SIGTERM, waits for clean exit with timeout, then SIGKILL if needed. + Always reaps process to prevent zombies. + + Args: + timeout: Seconds to wait for graceful termination before SIGKILL + """ + if not self._process or self._process.returncode is not None: + logger.debug("No process to terminate or already terminated") + return + + logger.info(f"Terminating Claude Code process: PID={self._process.pid}") + + try: + # Send SIGTERM + self._process.terminate() + + # Wait for graceful exit with timeout + try: + await asyncio.wait_for(self._process.wait(), timeout=timeout) + logger.info("Claude Code terminated gracefully") + except asyncio.TimeoutError: + # Timeout - force kill + logger.warning( + f"Claude Code did not terminate within {timeout}s, sending SIGKILL" + ) + self._process.kill() + await self._process.wait() # CRITICAL: Always reap to prevent zombie + logger.info("Claude Code killed") + + except Exception as e: + logger.error(f"Error terminating process: {e}") + + finally: + # Clear process reference + self._process = None + self._busy = False + + # Cancel reader task if still running + if self._reader_task and not self._reader_task.done(): + self._reader_task.cancel() + try: + await self._reader_task + except asyncio.CancelledError: + pass