From 6a115a4947939f4a512b11e0741938db4efff98f Mon Sep 17 00:00:00 2001 From: Mikkel Georgsen Date: Wed, 4 Feb 2026 19:15:46 +0000 Subject: [PATCH] refactor(02-01): persistent subprocess with stream-json I/O - Replace fresh-process-per-turn with persistent subprocess model - Accept NDJSON messages on stdin via --input-format stream-json - Emit stream-json events on stdout including tool_use events - Add on_tool_use callback for progress notifications - Persistent stdout/stderr readers run for process lifetime - Result events mark not busy but don't exit reader loop - stdin.write + drain pattern prevents pipe buffer deadlock - Auto-start process if not running when send_message called - Crash recovery restarts persistent process with --continue --- telegram/claude_subprocess.py | 315 +++++++++++++++++++--------------- 1 file changed, 177 insertions(+), 138 deletions(-) diff --git a/telegram/claude_subprocess.py b/telegram/claude_subprocess.py index 4edf804..66225cd 100644 --- a/telegram/claude_subprocess.py +++ b/telegram/claude_subprocess.py @@ -1,23 +1,23 @@ """ Claude Code subprocess management for Telegram bot. -This module provides the ClaudeSubprocess class that safely spawns, communicates -with, and manages Claude Code CLI processes using asyncio. It handles the dangerous -parts: pipe management without deadlocks, process lifecycle without zombies, -message queueing during processing, and crash recovery with session resumption. +This module provides the ClaudeSubprocess class that maintains a persistent +Claude Code CLI subprocess using stream-json I/O for efficient multi-turn +conversations without respawning per turn. -The subprocess runs in a session directory and outputs stream-json format for -structured, line-by-line parsing. Stdout and stderr are read concurrently via -asyncio.gather to prevent pipe buffer deadlocks. 
+The persistent subprocess accepts NDJSON messages on stdin, emits stream-json +events on stdout, and maintains conversation context across turns. Stdout and +stderr are read concurrently by two background asyncio tasks to prevent pipe buffer deadlocks. Key features: +- Persistent subprocess with stream-json stdin/stdout (eliminates ~1s spawn overhead) - Concurrent stdout/stderr reading (no pipe deadlock) - Clean process termination (no zombie processes) - Message queuing during processing - Automatic crash recovery with --continue flag -- Stream-json event routing to callbacks +- Stream-json event routing to callbacks (including tool_use events) -Based on research in: .planning/phases/01-session-process-foundation/01-RESEARCH.md +Based on research in: .planning/phases/02-telegram-integration/02-RESEARCH.md """ import asyncio @@ -34,11 +34,11 @@ logger = logging.getLogger(__name__) class ClaudeSubprocess: """ - Manages a single Claude Code CLI subprocess. + Manages a persistent Claude Code CLI subprocess with stream-json I/O. - Spawns Claude Code in a session directory, handles I/O via pipes with - concurrent stream reading, queues messages during processing, and auto-restarts - on crash with context preservation. + Spawns Claude Code once and maintains the process across multiple turns, + accepting NDJSON messages on stdin. This eliminates ~1s spawn overhead per + message and preserves conversation context. Example: sub = ClaudeSubprocess( @@ -47,9 +47,12 @@ class ClaudeSubprocess: on_output=lambda text: print(f"Claude: {text}"), on_error=lambda err: print(f"Error: {err}"), on_complete=lambda: print("Turn complete"), - on_status=lambda status: print(f"Status: {status}") + on_status=lambda status: print(f"Status: {status}"), + on_tool_use=lambda name, inp: print(f"Tool: {name} {inp}") ) + await sub.start() await sub.send_message("Hello, Claude!") + await sub.send_message("What's the weather?") # ... later ... 
await sub.terminate() """ @@ -65,6 +68,7 @@ class ClaudeSubprocess: on_error: Optional[Callable[[str], None]] = None, on_complete: Optional[Callable[[], None]] = None, on_status: Optional[Callable[[str], None]] = None, + on_tool_use: Optional[Callable[[str, dict], None]] = None, ): """ Initialize ClaudeSubprocess. @@ -76,6 +80,7 @@ class ClaudeSubprocess: on_error: Callback(error: str) for error messages on_complete: Callback() when a turn completes on_status: Callback(status: str) for status updates (e.g. "Claude restarted") + on_tool_use: Callback(tool_name: str, tool_input: dict) for tool call progress """ self._session_dir = Path(session_dir) self._persona = persona or {} @@ -83,12 +88,14 @@ class ClaudeSubprocess: self.on_error = on_error self.on_complete = on_complete self.on_status = on_status + self.on_tool_use = on_tool_use # Process state self._process: Optional[asyncio.subprocess.Process] = None self._busy = False self._message_queue: asyncio.Queue = asyncio.Queue() - self._reader_task: Optional[asyncio.Task] = None + self._stdout_reader_task: Optional[asyncio.Task] = None + self._stderr_reader_task: Optional[asyncio.Task] = None self._crash_count = 0 logger.debug( @@ -106,53 +113,34 @@ class ClaudeSubprocess: """Return whether subprocess process is running.""" return self._process is not None and self._process.returncode is None - async def send_message(self, message: str) -> None: + async def start(self) -> None: """ - Send a message to Claude Code. + Start the persistent Claude Code subprocess. - If no process is running: spawn one with this message. - If process IS running and BUSY: queue message. - If process IS running and IDLE: send as new turn (spawn new invocation). - - Args: - message: Message to send to Claude + Spawns process with stream-json I/O and launches background readers. + Must be called before send_message(). 
""" - if not self.is_alive: - # No process running - spawn new one - await self._spawn(message) - elif self._busy: - # Process is busy - queue message - logger.debug(f"Queueing message (process busy): {message[:50]}...") - await self._message_queue.put(message) - else: - # Process is idle - spawn new turn - # Note: For simplicity, we spawn a fresh process per turn - # Phase 2+ might use --input-format stream-json for live piping - await self._spawn(message) + if self.is_alive: + logger.warning("Subprocess already running") + return - async def _spawn(self, message: str) -> None: - """ - Spawn Claude Code subprocess with a message. - - Builds command with persona settings, spawns process with pipes, - and launches concurrent stream readers. - - Args: - message: Initial message to send - """ self._spawn_time = time.monotonic() self._first_output_time = None - # Build command + # Build command for persistent process cmd = [ "claude", "-p", - message, - "--output-format", - "stream-json", + "--input-format", "stream-json", + "--output-format", "stream-json", "--verbose", ] + # Add --continue if prior session exists + if (self._session_dir / ".claude").exists(): + cmd.append("--continue") + logger.debug("Using --continue flag (found existing .claude/ directory)") + # Add persona settings if self._persona: if "system_prompt" in self._persona: @@ -163,11 +151,6 @@ class ClaudeSubprocess: if "model" in settings: cmd.extend(["--model", settings["model"]]) - # Add --continue if prior session exists - if (self._session_dir / ".claude").exists(): - cmd.append("--continue") - logger.debug(f"Using --continue flag (found existing .claude/ directory)") - # Prepare environment env = os.environ.copy() # Ensure PATH includes ~/.local/bin and ~/bin (for claude CLI) @@ -183,99 +166,85 @@ class ClaudeSubprocess: # Ensure session directory exists self._session_dir.mkdir(parents=True, exist_ok=True) - # Log full command (with model) for debugging + # Log command cmd_flags = [c for c in 
cmd if c.startswith("--")] logger.info( - f"[TIMING] Spawning: cwd={self._session_dir.name}, " - f"flags={' '.join(cmd_flags)}, msg={message[:60]}..." + f"[TIMING] Starting persistent subprocess: cwd={self._session_dir.name}, " + f"flags={' '.join(cmd_flags)}" ) try: # Spawn subprocess self._process = await asyncio.create_subprocess_exec( *cmd, + stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(self._session_dir), env=env, ) - self._busy = True elapsed = time.monotonic() - self._spawn_time - logger.info(f"[TIMING] Process spawned: PID={self._process.pid} (+{elapsed:.3f}s)") + logger.info(f"[TIMING] Persistent subprocess started: PID={self._process.pid} (+{elapsed:.3f}s)") - # Launch concurrent stream readers - self._reader_task = asyncio.create_task(self._read_streams()) + # Launch background readers (persistent, not per-turn) + self._stdout_reader_task = asyncio.create_task(self._read_stdout()) + self._stderr_reader_task = asyncio.create_task(self._read_stderr()) except Exception as e: logger.error(f"Failed to spawn Claude Code: {e}") if self.on_error: self.on_error(f"Failed to start Claude: {e}") - self._busy = False - async def _read_streams(self) -> None: + async def send_message(self, message: str) -> None: """ - Read stdout and stderr concurrently until both complete. + Send a message to the persistent Claude Code subprocess. - This is CRITICAL for deadlock prevention. Both streams are read - concurrently using asyncio.gather() to prevent pipe buffer overflow. + Writes NDJSON message to stdin. If process not started, starts it first. + If busy, queues the message. - After streams end, waits for process termination, marks as not busy, - calls completion callback, and processes any queued messages. 
+ Args: + message: Message to send to Claude """ - if not self._process: + # Auto-start if not running + if not self.is_alive: + logger.debug("Process not running, starting it first") + await self.start() + + if not self._process or not self._process.stdin: + raise RuntimeError("Subprocess not running or stdin not available") + + # Queue if busy + if self._busy: + logger.debug(f"Queueing message (process busy): {message[:50]}...") + await self._message_queue.put(message) return + # Mark as busy + self._busy = True + self._send_time = time.monotonic() + + # Write NDJSON to stdin try: - # Read both streams concurrently (CRITICAL: prevents deadlock) - await asyncio.gather( - self._read_stdout(), - self._read_stderr(), - return_exceptions=True, - ) - - # Wait for process to terminate - returncode = await self._process.wait() - elapsed = time.monotonic() - self._spawn_time - logger.info(f"[TIMING] Process exited: returncode={returncode}, total={elapsed:.3f}s") - - # Handle crash (non-zero exit, but not signals from our own termination) - # Negative return codes are signals (e.g., -15 = SIGTERM, -9 = SIGKILL) - # Only auto-restart on genuine crashes (positive non-zero exit codes) - if returncode is not None and returncode > 0: - await self._handle_crash() - return - - # Reset crash count on successful completion - self._crash_count = 0 - + msg_dict = {"type": "user", "content": message} + ndjson_line = json.dumps(msg_dict) + '\n' + self._process.stdin.write(ndjson_line.encode()) + await self._process.stdin.drain() # CRITICAL: flush buffer + logger.debug(f"Sent message to stdin: {message[:60]}...") except Exception as e: - logger.error(f"Error reading streams: {e}") - if self.on_error: - self.on_error(f"Stream reading error: {e}") - - finally: - # Mark as not busy + logger.error(f"Failed to send message to stdin: {e}") self._busy = False + if self.on_error: + self.on_error(f"Failed to send message: {e}") - # Call completion callback - if self.on_complete: - try: - if 
inspect.iscoroutinefunction(self.on_complete): - asyncio.create_task(self.on_complete()) - else: - self.on_complete() - except Exception as e: - logger.error(f"Error in on_complete callback: {e}") - - # Process queued messages - if not self._message_queue.empty(): - next_message = await self._message_queue.get() - logger.debug(f"Processing queued message: {next_message[:50]}...") - await self.send_message(next_message) async def _read_stdout(self) -> None: - """Read stdout line-by-line and handle each line.""" + """ + Read stdout stream-json events persistently. + + Runs for the lifetime of the process, not per-turn. Exits only when + process dies (readline returns empty bytes). + """ if not self._process or not self._process.stdout: return @@ -284,19 +253,34 @@ class ClaudeSubprocess: while True: line = await self._process.stdout.readline() if not line: + # Process died + logger.warning("Stdout stream ended (process died)") break + line_str = line.decode().rstrip() if line_str: if first_line: elapsed = time.monotonic() - self._spawn_time logger.info(f"[TIMING] First stdout line: +{elapsed:.3f}s") first_line = False - self._handle_stdout_line(line_str) + await self._handle_stdout_line(line_str) except Exception as e: logger.error(f"Error reading stdout: {e}") + finally: + # Process died unexpectedly - trigger crash recovery + if self.is_alive and self._process.returncode is None: + # Process still running but stdout closed - unusual + logger.error("Stdout closed but process still running") + elif self._process and self._process.returncode is not None and self._process.returncode > 0: + # Process crashed + await self._handle_crash() async def _read_stderr(self) -> None: - """Read stderr line-by-line and handle each line.""" + """ + Read stderr persistently. + + Runs for the lifetime of the process. Exits only when process dies. 
+ """ if not self._process or not self._process.stderr: return @@ -304,6 +288,7 @@ class ClaudeSubprocess: while True: line = await self._process.stderr.readline() if not line: + # Process died break line_str = line.decode().rstrip() if line_str: @@ -311,13 +296,15 @@ class ClaudeSubprocess: except Exception as e: logger.error(f"Error reading stderr: {e}") - def _handle_stdout_line(self, line: str) -> None: + async def _handle_stdout_line(self, line: str) -> None: """ Parse and route stream-json events from stdout. - Handles three event types: + Handles event types: - "assistant": Extract text blocks and call on_output - - "result": Turn complete, check for errors + - "content_block_start": Extract tool_use events and call on_tool_use + - "content_block_delta": Handle tool input streaming (future) + - "result": Turn complete, mark not busy, call on_complete - "system": System events, log and check for errors Args: @@ -347,9 +334,33 @@ class ClaudeSubprocess: except Exception as e: logger.error(f"Error in on_output callback: {e}") + elif event_type == "content_block_start": + # Check for tool_use block + content_block = event.get("content_block", {}) + if content_block.get("type") == "tool_use": + tool_name = content_block.get("name", "unknown") + tool_input = content_block.get("input", {}) + logger.debug(f"Tool use started: {tool_name} with input {tool_input}") + + if self.on_tool_use: + try: + if inspect.iscoroutinefunction(self.on_tool_use): + asyncio.create_task(self.on_tool_use(tool_name, tool_input)) + else: + self.on_tool_use(tool_name, tool_input) + except Exception as e: + logger.error(f"Error in on_tool_use callback: {e}") + + elif event_type == "content_block_delta": + # Tool input streaming (not needed for initial implementation) + pass + elif event_type == "result": - # Turn complete - elapsed = time.monotonic() - self._spawn_time + # Turn complete - mark not busy and call on_complete + if hasattr(self, '_send_time'): + elapsed = time.monotonic() - 
self._send_time + else: + elapsed = time.monotonic() - self._spawn_time session_id = event.get("session_id") logger.info(f"[TIMING] Result event: +{elapsed:.3f}s (session={session_id})") @@ -364,6 +375,25 @@ class ClaudeSubprocess: except Exception as e: logger.error(f"Error in on_error callback: {e}") + # Mark not busy + self._busy = False + + # Call completion callback + if self.on_complete: + try: + if inspect.iscoroutinefunction(self.on_complete): + asyncio.create_task(self.on_complete()) + else: + self.on_complete() + except Exception as e: + logger.error(f"Error in on_complete callback: {e}") + + # Process queued messages + if not self._message_queue.empty(): + next_message = await self._message_queue.get() + logger.debug(f"Processing queued message: {next_message[:50]}...") + await self.send_message(next_message) + elif event_type == "system": # System event subtype = event.get("subtype") @@ -413,8 +443,8 @@ class ClaudeSubprocess: """ Handle Claude Code crash (non-zero exit). - Attempts to restart with --continue flag up to MAX_CRASH_RETRIES times. - Notifies user via on_status callback. + Attempts to restart persistent process with --continue flag up to + MAX_CRASH_RETRIES times. Notifies user via on_status callback. 
""" self._crash_count += 1 logger.error( @@ -430,6 +460,7 @@ class ClaudeSubprocess: else: self.on_error(error_msg) self._crash_count = 0 # Reset for next session + self._busy = False return # Notify user @@ -445,26 +476,28 @@ class ClaudeSubprocess: # Wait before retrying await asyncio.sleep(self.CRASH_BACKOFF_SECONDS) - # Respawn with --continue flag (loads most recent session) + # Restart persistent process try: - # Get next queued message or use placeholder + await self.start() + + # Resend queued messages if any if not self._message_queue.empty(): next_message = await self._message_queue.get() - else: - next_message = "continue" # Minimal message to resume + logger.debug(f"Resending queued message after crash: {next_message[:50]}...") + await self.send_message(next_message) - await self._spawn(next_message) except Exception as e: - logger.error(f"Failed to respawn after crash: {e}") + logger.error(f"Failed to restart after crash: {e}") if self.on_error: self.on_error(f"Failed to restart Claude: {e}") + self._busy = False async def terminate(self, timeout: int = 10) -> None: """ - Terminate subprocess gracefully. + Terminate persistent subprocess gracefully. - Sends SIGTERM, waits for clean exit with timeout, then SIGKILL if needed. - Always reaps process to prevent zombies. + Closes stdin, sends SIGTERM, waits for clean exit with timeout, then + SIGKILL if needed. Always reaps process to prevent zombies. 
Args: timeout: Seconds to wait for graceful termination before SIGKILL @@ -476,6 +509,11 @@ class ClaudeSubprocess: logger.info(f"Terminating Claude Code process: PID={self._process.pid}") try: + # Close stdin to signal end of input + if self._process.stdin: + self._process.stdin.close() + await self._process.stdin.wait_closed() + # Send SIGTERM self._process.terminate() @@ -500,10 +538,11 @@ class ClaudeSubprocess: self._process = None self._busy = False - # Cancel reader task if still running - if self._reader_task and not self._reader_task.done(): - self._reader_task.cancel() - try: - await self._reader_task - except asyncio.CancelledError: - pass + # Cancel reader tasks if still running + for task in [self._stdout_reader_task, self._stderr_reader_task]: + if task and not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass