"""
Claude Code subprocess management for Telegram bot.

This module provides the ClaudeSubprocess class that maintains a persistent
Claude Code CLI subprocess using stream-json I/O for efficient multi-turn
conversations without respawning per turn.

The persistent subprocess accepts NDJSON messages on stdin, emits stream-json
events on stdout, and maintains conversation context across turns. Stdout and
stderr are drained concurrently by two independent background reader tasks so
that neither pipe can fill up and deadlock the child.

Key features:
- Persistent subprocess with stream-json stdin/stdout (eliminates ~1s spawn overhead)
- Concurrent stdout/stderr reading (no pipe deadlock)
- Clean process termination (no zombie processes)
- Message queuing during processing
- Automatic crash recovery with --continue flag
- Stream-json event routing to callbacks (including tool_use events)

Based on research in: .planning/phases/02-telegram-integration/02-RESEARCH.md
"""

import asyncio
import inspect
import json
import logging
import os
import time
from pathlib import Path
from typing import Callable, Optional

logger = logging.getLogger(__name__)


class ClaudeSubprocess:
    """
    Manages a persistent Claude Code CLI subprocess with stream-json I/O.

    Spawns Claude Code once and maintains the process across multiple turns,
    accepting NDJSON messages on stdin. This eliminates ~1s spawn overhead per
    message and preserves conversation context.

    Callbacks may be plain functions or coroutine functions. Anything awaitable
    a callback returns is scheduled as a background task (a reference is kept
    until it finishes).

    Example:
        sub = ClaudeSubprocess(
            session_dir=Path("/home/mikkel/telegram/sessions/my-session"),
            persona={"system_prompt": "You are a helpful assistant"},
            on_output=lambda text: print(f"Claude: {text}"),
            on_error=lambda err: print(f"Error: {err}"),
            on_complete=lambda: print("Turn complete"),
            on_status=lambda status: print(f"Status: {status}"),
            on_tool_use=lambda name, inp: print(f"Tool: {name} {inp}")
        )
        await sub.start()
        await sub.send_message("Hello, Claude!")
        await sub.send_message("What's the weather?")
        # ... later ...
        await sub.terminate()
    """

    # Number of automatic restarts attempted after consecutive crashes
    # (the counter resets whenever a turn produces a "result" event).
    MAX_CRASH_RETRIES = 3
    CRASH_BACKOFF_SECONDS = 1
    # How long to wait for the child to exit after its stdout hits EOF before
    # deciding whether it crashed.
    EXIT_WAIT_SECONDS = 5

    def __init__(
        self,
        session_dir: Path,
        persona: Optional[dict] = None,
        on_output: Optional[Callable[[str], None]] = None,
        on_error: Optional[Callable[[str], None]] = None,
        on_complete: Optional[Callable[[], None]] = None,
        on_status: Optional[Callable[[str], None]] = None,
        on_tool_use: Optional[Callable[[str, dict], None]] = None,
    ):
        """
        Initialize ClaudeSubprocess.

        Args:
            session_dir: Path to session directory (cwd for subprocess)
            persona: Persona dict with system_prompt and settings (model, max_turns)
            on_output: Callback(text: str) for assistant text output
            on_error: Callback(error: str) for error messages
            on_complete: Callback() when a turn completes
            on_status: Callback(status: str) for status updates (e.g. "Claude restarted")
            on_tool_use: Callback(tool_name: str, tool_input: dict) for tool call progress
        """
        self._session_dir = Path(session_dir)
        self._persona = persona or {}
        self.on_output = on_output
        self.on_error = on_error
        self.on_complete = on_complete
        self.on_status = on_status
        self.on_tool_use = on_tool_use

        # Process state
        self._process: Optional[asyncio.subprocess.Process] = None
        self._busy = False
        self._message_queue: asyncio.Queue = asyncio.Queue()
        self._stdout_reader_task: Optional[asyncio.Task] = None
        self._stderr_reader_task: Optional[asyncio.Task] = None
        self._crash_count = 0
        # Set while terminate() runs so an intentional shutdown is never
        # mistaken for a crash by the stdout reader.
        self._terminating = False
        # Strong references to fire-and-forget callback tasks; the event loop
        # only keeps weak references, so unreferenced tasks may be collected.
        self._background_tasks: set = set()

        # Timing state (overwritten by start() / send_message()).
        self._spawn_time: float = time.monotonic()
        self._first_output_time: Optional[float] = None
        self._send_time: Optional[float] = None

        prompt_preview = str(persona.get("system_prompt", "none"))[:50] if persona else "none"
        logger.debug(
            f"ClaudeSubprocess initialized: session_dir={session_dir}, "
            f"persona={prompt_preview}"
        )

    @property
    def is_busy(self) -> bool:
        """Return whether subprocess is currently processing a message."""
        return self._busy

    @property
    def is_alive(self) -> bool:
        """Return whether subprocess process is running."""
        return self._process is not None and self._process.returncode is None

    @property
    def pid(self) -> Optional[int]:
        """
        Return process ID of running subprocess.

        Returns:
            PID if process is running, None otherwise
        """
        return self._process.pid if self._process and self._process.returncode is None else None

    # ------------------------------------------------------------------
    # Callback plumbing
    # ------------------------------------------------------------------

    def _invoke_callback(self, name: str, callback: Optional[Callable], *args) -> None:
        """
        Call a user callback without letting it break the reader loops.

        Sync callbacks run inline. If the call returns an awaitable (coroutine
        functions, or sync wrappers returning a coroutine), it is scheduled as a
        tracked background task. Exceptions raised synchronously are logged.

        Args:
            name: Callback name used in log messages (e.g. "on_output")
            callback: The callback, or None (then nothing happens)
            *args: Positional arguments passed to the callback
        """
        if callback is None:
            return
        try:
            result = callback(*args)
            if inspect.isawaitable(result):
                task = asyncio.ensure_future(result)
                self._background_tasks.add(task)
                task.add_done_callback(self._on_background_task_done)
        except Exception as e:
            logger.error(f"Error in {name} callback: {e}")

    def _on_background_task_done(self, task: asyncio.Future) -> None:
        """Drop the task reference and log any exception an async callback raised."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error(f"Error in async callback: {exc}")

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def start(self) -> None:
        """
        Start the persistent Claude Code subprocess.

        Spawns process with stream-json I/O and launches background readers.
        Must be called before send_message(). Spawn failures are reported via
        on_error rather than raised.
        """
        if self.is_alive:
            logger.warning("Subprocess already running")
            return

        self._spawn_time = time.monotonic()
        self._first_output_time = None

        # Build command for persistent process
        cmd = [
            "claude",
            "-p",
            "--input-format", "stream-json",
            "--output-format", "stream-json",
            "--verbose",
            "--dangerously-skip-permissions",
        ]

        # Add --continue if prior session exists
        if (self._session_dir / ".claude").exists():
            cmd.append("--continue")
            logger.debug("Using --continue flag (found existing .claude/ directory)")

        # Add persona settings (model FIRST, then system prompt)
        if self._persona:
            settings = self._persona.get("settings") or {}
            if "model" in settings:
                cmd.extend(["--model", settings["model"]])
            if "max_turns" in settings:
                cmd.extend(["--max-turns", str(settings["max_turns"])])
            if "system_prompt" in self._persona:
                cmd.extend(["--append-system-prompt", self._persona["system_prompt"]])

        # Prepare environment: ensure ~/.local/bin and ~/bin are on PATH (for
        # the claude CLI). Empty entries are dropped because an empty PATH
        # element means "current directory" on POSIX.
        env = os.environ.copy()
        path_parts = [p for p in env.get("PATH", "").split(os.pathsep) if p]
        for extra_dir in (str(Path.home() / "bin"), str(Path.home() / ".local" / "bin")):
            if extra_dir not in path_parts:
                path_parts.insert(0, extra_dir)
        env["PATH"] = os.pathsep.join(path_parts)

        # Ensure session directory exists
        self._session_dir.mkdir(parents=True, exist_ok=True)

        logger.info(
            f"[TIMING] Starting persistent subprocess: cwd={self._session_dir.name}, "
            f"cmd={' '.join(cmd)}"
        )

        try:
            # Spawn subprocess (10MB stdout limit for large stream-json lines e.g. image tool results)
            self._process = await asyncio.create_subprocess_exec(
                *cmd,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=str(self._session_dir),
                env=env,
                limit=10 * 1024 * 1024,
            )

            elapsed = time.monotonic() - self._spawn_time
            logger.info(f"[TIMING] Persistent subprocess started: PID={self._process.pid} (+{elapsed:.3f}s)")

            # Launch background readers (persistent, not per-turn)
            self._stdout_reader_task = asyncio.create_task(self._read_stdout())
            self._stderr_reader_task = asyncio.create_task(self._read_stderr())

        except Exception as e:
            logger.error(f"Failed to spawn Claude Code: {e}")
            self._invoke_callback("on_error", self.on_error, f"Failed to start Claude: {e}")

    async def send_message(self, message: str) -> None:
        """
        Send a message to the persistent Claude Code subprocess.

        Writes an NDJSON user message to stdin. If the process is not running,
        starts it first. If a turn is in progress, queues the message; queued
        messages are sent one at a time as turns complete.

        Args:
            message: Message to send to Claude

        Raises:
            RuntimeError: If no process with a stdin pipe is available after
                attempting to start one.
        """
        # Auto-start if not running
        if not self.is_alive:
            logger.debug("Process not running, starting it first")
            await self.start()

        if not self._process or not self._process.stdin:
            raise RuntimeError("Subprocess not running or stdin not available")

        # Queue if busy
        if self._busy:
            logger.debug(f"Queueing message (process busy): {message[:50]}...")
            await self._message_queue.put(message)
            return

        self._busy = True
        self._send_time = time.monotonic()

        try:
            msg_dict = {"type": "user", "message": {"role": "user", "content": message}}
            ndjson_line = json.dumps(msg_dict) + '\n'
            self._process.stdin.write(ndjson_line.encode())
            await self._process.stdin.drain()  # CRITICAL: flush buffer
            logger.debug(f"Sent message to stdin: {message[:60]}...")
        except Exception as e:
            logger.error(f"Failed to send message to stdin: {e}")
            self._busy = False
            self._invoke_callback("on_error", self.on_error, f"Failed to send message: {e}")

    async def _send_next_queued(self, reason: str) -> None:
        """Send the oldest queued message, if any, logging `reason` as context."""
        try:
            next_message = self._message_queue.get_nowait()
        except asyncio.QueueEmpty:
            return
        logger.debug(f"{reason}: {next_message[:50]}...")
        await self.send_message(next_message)

    # ------------------------------------------------------------------
    # Readers
    # ------------------------------------------------------------------

    async def _read_stdout(self) -> None:
        """
        Read stdout stream-json events persistently.

        Runs for the lifetime of the process it was started for, not per-turn.
        When stdout reaches EOF, waits for that process to exit and triggers
        crash recovery on a non-zero exit code.
        """
        # Bind to the process this reader was started for; self._process is
        # replaced on restart and cleared by terminate().
        proc = self._process
        if proc is None or proc.stdout is None:
            return

        first_line = True
        try:
            while True:
                try:
                    line = await proc.stdout.readline()
                except ValueError as e:
                    # Line exceeded the StreamReader limit. asyncio discards the
                    # buffered data in this case, so skip it and keep reading.
                    logger.error(f"Oversized stdout line skipped: {e}")
                    continue
                if not line:
                    logger.warning("Stdout stream ended (process died)")
                    break

                line_str = line.decode(errors="replace").rstrip()
                if not line_str:
                    continue
                if first_line:
                    elapsed = time.monotonic() - self._spawn_time
                    logger.info(f"[TIMING] First stdout line: +{elapsed:.3f}s")
                    first_line = False
                try:
                    await self._handle_stdout_line(line_str)
                except Exception as e:
                    # One bad event must not kill the reader for the whole session.
                    logger.error(f"Error handling stdout line: {e}")
        except Exception as e:
            logger.error(f"Error reading stdout: {e}")

        # Not in a `finally`: on cancellation (terminate()) we must not try to recover.
        await self._handle_stdout_closed(proc)

    async def _handle_stdout_closed(self, proc: asyncio.subprocess.Process) -> None:
        """
        Decide what to do after `proc`'s stdout reached EOF.

        Waits (up to EXIT_WAIT_SECONDS) for the process to exit, because
        returncode is usually still None at the moment stdout closes. Non-zero
        exits (including death by signal) trigger crash recovery; a clean exit
        clears the busy flag so the next message restarts the process.
        """
        if self._terminating or self._process is not proc:
            return  # intentional shutdown, or process already replaced

        try:
            returncode = await asyncio.wait_for(proc.wait(), timeout=self.EXIT_WAIT_SECONDS)
        except asyncio.TimeoutError:
            logger.error("Stdout closed but process still running")
            return

        if self._terminating or self._process is not proc:
            return

        if returncode != 0:
            await self._handle_crash()
        else:
            logger.warning("Claude Code exited cleanly; it will be restarted on the next message")
            self._busy = False
            await self._send_next_queued("Processing queued message")

    async def _read_stderr(self) -> None:
        """
        Read stderr persistently.

        Runs for the lifetime of the process it was started for and exits at
        EOF. Keeping stderr drained prevents the child from blocking on a full
        pipe.
        """
        proc = self._process
        if proc is None or proc.stderr is None:
            return

        try:
            while True:
                try:
                    line = await proc.stderr.readline()
                except ValueError as e:
                    logger.error(f"Oversized stderr line skipped: {e}")
                    continue
                if not line:
                    break
                line_str = line.decode(errors="replace").rstrip()
                if line_str:
                    self._handle_stderr_line(line_str)
        except Exception as e:
            logger.error(f"Error reading stderr: {e}")

    # ------------------------------------------------------------------
    # Event routing
    # ------------------------------------------------------------------

    async def _handle_stdout_line(self, line: str) -> None:
        """
        Parse and route stream-json events from stdout.

        Handles event types:
        - "assistant": text blocks -> on_output, tool_use blocks -> on_tool_use
        - "content_block_start": tool_use blocks -> on_tool_use
        - "content_block_delta": tool input streaming (ignored for now)
        - "result": turn complete, mark not busy, call on_complete, send next queued
        - "system": system events, log and report error subtypes

        Args:
            line: Single line of stream-json output
        """
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            # Non-JSON line (Claude Code may emit diagnostics)
            logger.warning(f"Non-JSON stdout line: {line[:100]}")
            return

        if not isinstance(event, dict):
            logger.warning(f"Non-object stdout event: {line[:100]}")
            return

        event_type = event.get("type")

        if event_type == "assistant":
            self._handle_assistant_event(event)
        elif event_type == "content_block_start":
            content_block = event.get("content_block")
            if isinstance(content_block, dict) and content_block.get("type") == "tool_use":
                self._emit_tool_use(content_block)
        elif event_type == "content_block_delta":
            # Tool input streaming (not needed for initial implementation)
            pass
        elif event_type == "result":
            await self._handle_result_event(event)
        elif event_type == "system":
            self._handle_system_event(event)
        else:
            logger.debug(f"Unknown event type: {event_type}")

    def _handle_assistant_event(self, event: dict) -> None:
        """Route the text and tool_use content blocks of an "assistant" event."""
        message = event.get("message")
        if not isinstance(message, dict):
            message = {}
        model = message.get("model", "unknown")
        logger.info(f"Assistant response model: {model}")

        for block in message.get("content") or []:
            if not isinstance(block, dict):
                continue
            block_type = block.get("type")
            if block_type == "text":
                text = block.get("text", "")
                if text and self.on_output:
                    if not self._first_output_time:
                        self._first_output_time = time.monotonic()
                        elapsed = self._first_output_time - self._spawn_time
                        logger.info(f"[TIMING] First assistant text: +{elapsed:.3f}s ({len(text)} chars)")
                    self._invoke_callback("on_output", self.on_output, text)
            elif block_type == "tool_use":
                # Complete assistant messages carry tool calls as content blocks.
                self._emit_tool_use(block)

    def _emit_tool_use(self, block: dict) -> None:
        """Report a tool_use content block via on_tool_use."""
        tool_name = block.get("name", "unknown")
        tool_input = block.get("input", {})
        logger.debug(f"Tool use started: {tool_name} with input {tool_input}")
        self._invoke_callback("on_tool_use", self.on_tool_use, tool_name, tool_input)

    async def _handle_result_event(self, event: dict) -> None:
        """Finish the current turn: report errors, clear busy, notify, send next queued."""
        start_ts = self._send_time if self._send_time is not None else self._spawn_time
        elapsed = time.monotonic() - start_ts
        session_id = event.get("session_id")
        logger.info(f"[TIMING] Result event: +{elapsed:.3f}s (session={session_id})")

        # A result means the process is healthy; crashes are counted consecutively.
        self._crash_count = 0

        if event.get("is_error"):
            error_msg = (
                event.get("error")
                or event.get("result")
                or event.get("subtype")
                or "Unknown error"
            )
            self._invoke_callback("on_error", self.on_error, f"Claude error: {error_msg}")

        self._busy = False
        self._invoke_callback("on_complete", self.on_complete)
        await self._send_next_queued("Processing queued message")

    def _handle_system_event(self, event: dict) -> None:
        """Log a "system" event and report it via on_error when subtype is "error"."""
        subtype = event.get("subtype")
        logger.debug(f"System event: subtype={subtype}")
        if subtype == "error":
            error_msg = event.get("message", "System error")
            self._invoke_callback("on_error", self.on_error, f"System error: {error_msg}")

    def _handle_stderr_line(self, line: str) -> None:
        """
        Handle stderr output from Claude Code.

        Logs as warning. If line contains "error" (case-insensitive), also
        calls on_error callback.

        Args:
            line: Single line of stderr output
        """
        logger.warning(f"Claude Code stderr: {line}")
        if "error" in line.lower():
            self._invoke_callback("on_error", self.on_error, f"Claude stderr: {line}")

    # ------------------------------------------------------------------
    # Recovery and shutdown
    # ------------------------------------------------------------------

    async def _handle_crash(self) -> None:
        """
        Handle Claude Code crash (non-zero exit).

        Restarts the persistent process (start() adds --continue when a prior
        session exists) up to MAX_CRASH_RETRIES consecutive times, notifying via
        on_status. The in-flight turn is lost with the process, so the busy
        flag is cleared and the next queued message (if any) is resent.
        """
        self._crash_count += 1
        # The turn in progress died with the process; leaving _busy set would
        # make send_message queue every future message forever.
        self._busy = False

        if self._crash_count > self.MAX_CRASH_RETRIES:
            error_msg = f"Claude failed to restart after {self.MAX_CRASH_RETRIES} attempts"
            logger.error(error_msg)
            self._invoke_callback("on_error", self.on_error, error_msg)
            self._crash_count = 0  # Reset for next session
            return

        logger.error(
            f"Claude Code crashed (attempt {self._crash_count}/{self.MAX_CRASH_RETRIES})"
        )
        self._invoke_callback(
            "on_status", self.on_status, "Claude crashed, restarting with context preserved..."
        )

        await asyncio.sleep(self.CRASH_BACKOFF_SECONDS)
        if self._terminating:
            return  # shutdown started during backoff; don't spawn a new process

        try:
            await self.start()
            await self._send_next_queued("Resending queued message after crash")
        except Exception as e:
            logger.error(f"Failed to restart after crash: {e}")
            self._invoke_callback("on_error", self.on_error, f"Failed to restart Claude: {e}")
            self._busy = False

    async def terminate(self, timeout: int = 10) -> None:
        """
        Terminate persistent subprocess gracefully.

        Closes stdin, sends SIGTERM, waits for clean exit with timeout, then
        SIGKILL if needed. Always reaps the process to prevent zombies and
        cancels the reader tasks. Crash recovery is suppressed while this runs.

        Args:
            timeout: Seconds to wait for graceful termination before SIGKILL
        """
        proc = self._process
        if proc is None or proc.returncode is not None:
            logger.debug("No process to terminate or already terminated")
            return

        logger.info(f"Terminating Claude Code process: PID={proc.pid}")
        self._terminating = True

        try:
            # Close stdin to signal end of input. Best effort: a broken pipe
            # must not prevent the SIGTERM below.
            if proc.stdin:
                try:
                    proc.stdin.close()
                    await proc.stdin.wait_closed()
                except OSError as e:
                    logger.debug(f"Ignoring error while closing stdin: {e}")

            try:
                proc.terminate()
            except ProcessLookupError:
                pass  # already exited; wait() below still reaps it

            try:
                await asyncio.wait_for(proc.wait(), timeout=timeout)
                logger.info("Claude Code terminated gracefully")
            except asyncio.TimeoutError:
                logger.warning(
                    f"Claude Code did not terminate within {timeout}s, sending SIGKILL"
                )
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
                await proc.wait()  # CRITICAL: Always reap to prevent zombie
                logger.info("Claude Code killed")

        except Exception as e:
            logger.error(f"Error terminating process: {e}")

        finally:
            self._process = None
            self._busy = False

            for task in (self._stdout_reader_task, self._stderr_reader_task):
                if task and not task.done():
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        pass

            self._terminating = False