""" Claude Code subprocess management for Telegram bot. This module provides the ClaudeSubprocess class that safely spawns, communicates with, and manages Claude Code CLI processes using asyncio. It handles the dangerous parts: pipe management without deadlocks, process lifecycle without zombies, message queueing during processing, and crash recovery with session resumption. The subprocess runs in a session directory and outputs stream-json format for structured, line-by-line parsing. Stdout and stderr are read concurrently via asyncio.gather to prevent pipe buffer deadlocks. Key features: - Concurrent stdout/stderr reading (no pipe deadlock) - Clean process termination (no zombie processes) - Message queuing during processing - Automatic crash recovery with --continue flag - Stream-json event routing to callbacks Based on research in: .planning/phases/01-session-process-foundation/01-RESEARCH.md """ import asyncio import inspect import json import logging import os from pathlib import Path from typing import Callable, Optional logger = logging.getLogger(__name__) class ClaudeSubprocess: """ Manages a single Claude Code CLI subprocess. Spawns Claude Code in a session directory, handles I/O via pipes with concurrent stream reading, queues messages during processing, and auto-restarts on crash with context preservation. Example: sub = ClaudeSubprocess( session_dir=Path("/home/mikkel/telegram/sessions/my-session"), persona={"system_prompt": "You are a helpful assistant"}, on_output=lambda text: print(f"Claude: {text}"), on_error=lambda err: print(f"Error: {err}"), on_complete=lambda: print("Turn complete"), on_status=lambda status: print(f"Status: {status}") ) await sub.send_message("Hello, Claude!") # ... later ... await sub.terminate() """ MAX_CRASH_RETRIES = 3 CRASH_BACKOFF_SECONDS = 1 def __init__( self, session_dir: Path, persona: Optional[dict] = None, on_output: Optional[Callable[[str], None]] = None, on_error: Optional[Callable[[str], None]] = None, on_complete: Optional[Callable[[], None]] = None, on_status: Optional[Callable[[str], None]] = None, ): """ Initialize ClaudeSubprocess. Args: session_dir: Path to session directory (cwd for subprocess) persona: Persona dict with system_prompt and settings (model, max_turns) on_output: Callback(text: str) for assistant text output on_error: Callback(error: str) for error messages on_complete: Callback() when a turn completes on_status: Callback(status: str) for status updates (e.g. "Claude restarted") """ self._session_dir = Path(session_dir) self._persona = persona or {} self.on_output = on_output self.on_error = on_error self.on_complete = on_complete self.on_status = on_status # Process state self._process: Optional[asyncio.subprocess.Process] = None self._busy = False self._message_queue: asyncio.Queue = asyncio.Queue() self._reader_task: Optional[asyncio.Task] = None self._crash_count = 0 logger.debug( f"ClaudeSubprocess initialized: session_dir={session_dir}, " f"persona={persona.get('system_prompt', 'none')[:50] if persona else 'none'}" ) @property def is_busy(self) -> bool: """Return whether subprocess is currently processing a message.""" return self._busy @property def is_alive(self) -> bool: """Return whether subprocess process is running.""" return self._process is not None and self._process.returncode is None async def send_message(self, message: str) -> None: """ Send a message to Claude Code. If no process is running: spawn one with this message. If process IS running and BUSY: queue message. If process IS running and IDLE: send as new turn (spawn new invocation). Args: message: Message to send to Claude """ if not self.is_alive: # No process running - spawn new one await self._spawn(message) elif self._busy: # Process is busy - queue message logger.debug(f"Queueing message (process busy): {message[:50]}...") await self._message_queue.put(message) else: # Process is idle - spawn new turn # Note: For simplicity, we spawn a fresh process per turn # Phase 2+ might use --input-format stream-json for live piping await self._spawn(message) async def _spawn(self, message: str) -> None: """ Spawn Claude Code subprocess with a message. Builds command with persona settings, spawns process with pipes, and launches concurrent stream readers. Args: message: Initial message to send """ # Build command cmd = [ "claude", "-p", message, "--output-format", "stream-json", "--verbose", ] # Add persona settings if self._persona: if "system_prompt" in self._persona: cmd.extend(["--system-prompt", self._persona["system_prompt"]]) if "max_turns" in self._persona: cmd.extend(["--max-turns", str(self._persona["max_turns"])]) if "model" in self._persona: cmd.extend(["--model", self._persona["model"]]) # Add --continue if prior session exists if (self._session_dir / ".claude").exists(): cmd.append("--continue") logger.debug(f"Using --continue flag (found existing .claude/ directory)") # Prepare environment env = os.environ.copy() # Ensure PATH includes ~/.local/bin and ~/bin (for claude CLI) path_parts = env.get("PATH", "").split(":") home_bin = str(Path.home() / "bin") local_bin = str(Path.home() / ".local" / "bin") if home_bin not in path_parts: path_parts.insert(0, home_bin) if local_bin not in path_parts: path_parts.insert(0, local_bin) env["PATH"] = ":".join(path_parts) # Ensure session directory exists self._session_dir.mkdir(parents=True, exist_ok=True) logger.info( f"Spawning Claude Code: cwd={self._session_dir}, " f"cmd={' '.join(cmd[:4])}... (truncated)" ) try: # Spawn subprocess self._process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(self._session_dir), env=env, ) self._busy = True logger.info(f"Claude Code spawned: PID={self._process.pid}") # Launch concurrent stream readers self._reader_task = asyncio.create_task(self._read_streams()) except Exception as e: logger.error(f"Failed to spawn Claude Code: {e}") if self.on_error: self.on_error(f"Failed to start Claude: {e}") self._busy = False async def _read_streams(self) -> None: """ Read stdout and stderr concurrently until both complete. This is CRITICAL for deadlock prevention. Both streams are read concurrently using asyncio.gather() to prevent pipe buffer overflow. After streams end, waits for process termination, marks as not busy, calls completion callback, and processes any queued messages. """ if not self._process: return try: # Read both streams concurrently (CRITICAL: prevents deadlock) await asyncio.gather( self._read_stdout(), self._read_stderr(), return_exceptions=True, ) # Wait for process to terminate returncode = await self._process.wait() logger.info(f"Claude Code process exited: returncode={returncode}") # Handle crash (non-zero exit) if returncode != 0: await self._handle_crash() return # Reset crash count on successful completion self._crash_count = 0 except Exception as e: logger.error(f"Error reading streams: {e}") if self.on_error: self.on_error(f"Stream reading error: {e}") finally: # Mark as not busy self._busy = False # Call completion callback if self.on_complete: try: if inspect.iscoroutinefunction(self.on_complete): asyncio.create_task(self.on_complete()) else: self.on_complete() except Exception as e: logger.error(f"Error in on_complete callback: {e}") # Process queued messages if not self._message_queue.empty(): next_message = await self._message_queue.get() logger.debug(f"Processing queued message: {next_message[:50]}...") await self.send_message(next_message) async def _read_stdout(self) -> None: """Read stdout line-by-line and handle each line.""" if not self._process or not self._process.stdout: return try: while True: line = await self._process.stdout.readline() if not line: break line_str = line.decode().rstrip() if line_str: self._handle_stdout_line(line_str) except Exception as e: logger.error(f"Error reading stdout: {e}") async def _read_stderr(self) -> None: """Read stderr line-by-line and handle each line.""" if not self._process or not self._process.stderr: return try: while True: line = await self._process.stderr.readline() if not line: break line_str = line.decode().rstrip() if line_str: self._handle_stderr_line(line_str) except Exception as e: logger.error(f"Error reading stderr: {e}") def _handle_stdout_line(self, line: str) -> None: """ Parse and route stream-json events from stdout. Handles three event types: - "assistant": Extract text blocks and call on_output - "result": Turn complete, check for errors - "system": System events, log and check for errors Args: line: Single line of stream-json output """ try: event = json.loads(line) event_type = event.get("type") if event_type == "assistant": # Extract text from assistant message message = event.get("message", {}) content = message.get("content", []) for block in content: if block.get("type") == "text": text = block.get("text", "") if text and self.on_output: try: if inspect.iscoroutinefunction(self.on_output): asyncio.create_task(self.on_output(text)) else: self.on_output(text) except Exception as e: logger.error(f"Error in on_output callback: {e}") elif event_type == "result": # Turn complete session_id = event.get("session_id") if session_id: logger.debug(f"Turn complete: session_id={session_id}") # Check for error if event.get("is_error") and self.on_error: error_msg = event.get("error", "Unknown error") try: if inspect.iscoroutinefunction(self.on_error): asyncio.create_task(self.on_error(f"Claude error: {error_msg}")) else: self.on_error(f"Claude error: {error_msg}") except Exception as e: logger.error(f"Error in on_error callback: {e}") elif event_type == "system": # System event subtype = event.get("subtype") logger.debug(f"System event: subtype={subtype}") # Check for error subtype if subtype == "error" and self.on_error: error_msg = event.get("message", "System error") try: if inspect.iscoroutinefunction(self.on_error): asyncio.create_task(self.on_error(f"System error: {error_msg}")) else: self.on_error(f"System error: {error_msg}") except Exception as e: logger.error(f"Error in on_error callback: {e}") else: logger.debug(f"Unknown event type: {event_type}") except json.JSONDecodeError: # Non-JSON line (Claude Code may emit diagnostics) logger.warning(f"Non-JSON stdout line: {line[:100]}") def _handle_stderr_line(self, line: str) -> None: """ Handle stderr output from Claude Code. Logs as warning. If line contains "error" (case-insensitive), also calls on_error callback. Args: line: Single line of stderr output """ logger.warning(f"Claude Code stderr: {line}") # If line contains error, notify via callback if "error" in line.lower() and self.on_error: try: if inspect.iscoroutinefunction(self.on_error): asyncio.create_task(self.on_error(f"Claude stderr: {line}")) else: self.on_error(f"Claude stderr: {line}") except Exception as e: logger.error(f"Error in on_error callback: {e}") async def _handle_crash(self) -> None: """ Handle Claude Code crash (non-zero exit). Attempts to restart with --continue flag up to MAX_CRASH_RETRIES times. Notifies user via on_status callback. """ self._crash_count += 1 logger.error( f"Claude Code crashed (attempt {self._crash_count}/{self.MAX_CRASH_RETRIES})" ) if self._crash_count >= self.MAX_CRASH_RETRIES: error_msg = f"Claude failed to restart after {self.MAX_CRASH_RETRIES} attempts" logger.error(error_msg) if self.on_error: self.on_error(error_msg) self._crash_count = 0 # Reset for next session return # Notify user if self.on_status: try: if inspect.iscoroutinefunction(self.on_status): asyncio.create_task(self.on_status("Claude crashed, restarting with context preserved...")) else: self.on_status("Claude crashed, restarting with context preserved...") except Exception as e: logger.error(f"Error in on_status callback: {e}") # Wait before retrying await asyncio.sleep(self.CRASH_BACKOFF_SECONDS) # Respawn with --continue flag (loads most recent session) try: # Get next queued message or use placeholder if not self._message_queue.empty(): next_message = await self._message_queue.get() else: next_message = "continue" # Minimal message to resume await self._spawn(next_message) except Exception as e: logger.error(f"Failed to respawn after crash: {e}") if self.on_error: self.on_error(f"Failed to restart Claude: {e}") async def terminate(self, timeout: int = 10) -> None: """ Terminate subprocess gracefully. Sends SIGTERM, waits for clean exit with timeout, then SIGKILL if needed. Always reaps process to prevent zombies. Args: timeout: Seconds to wait for graceful termination before SIGKILL """ if not self._process or self._process.returncode is not None: logger.debug("No process to terminate or already terminated") return logger.info(f"Terminating Claude Code process: PID={self._process.pid}") try: # Send SIGTERM self._process.terminate() # Wait for graceful exit with timeout try: await asyncio.wait_for(self._process.wait(), timeout=timeout) logger.info("Claude Code terminated gracefully") except asyncio.TimeoutError: # Timeout - force kill logger.warning( f"Claude Code did not terminate within {timeout}s, sending SIGKILL" ) self._process.kill() await self._process.wait() # CRITICAL: Always reap to prevent zombie logger.info("Claude Code killed") except Exception as e: logger.error(f"Error terminating process: {e}") finally: # Clear process reference self._process = None self._busy = False # Cancel reader task if still running if self._reader_task and not self._reader_task.done(): self._reader_task.cancel() try: await self._reader_task except asyncio.CancelledError: pass