refactor(02-01): persistent subprocess with stream-json I/O

- Replace fresh-process-per-turn with persistent subprocess model
- Accept NDJSON messages on stdin via --input-format stream-json
- Emit stream-json events on stdout including tool_use events
- Add on_tool_use callback for progress notifications
- Persistent stdout/stderr readers run for process lifetime
- Result events mark not busy but don't exit reader loop
- stdin.write + drain pattern prevents pipe buffer deadlock
- Auto-start process if not running when send_message called
- Crash recovery restarts persistent process with --continue
This commit is contained in:
Mikkel Georgsen 2026-02-04 19:15:46 +00:00
parent 92318e79ab
commit 6a115a4947

View file

@@ -1,23 +1,23 @@
""" """
Claude Code subprocess management for Telegram bot. Claude Code subprocess management for Telegram bot.
This module provides the ClaudeSubprocess class that safely spawns, communicates This module provides the ClaudeSubprocess class that maintains a persistent
with, and manages Claude Code CLI processes using asyncio. It handles the dangerous Claude Code CLI subprocess using stream-json I/O for efficient multi-turn
parts: pipe management without deadlocks, process lifecycle without zombies, conversations without respawning per turn.
message queueing during processing, and crash recovery with session resumption.
The subprocess runs in a session directory and outputs stream-json format for The persistent subprocess accepts NDJSON messages on stdin, emits stream-json
structured, line-by-line parsing. Stdout and stderr are read concurrently via events on stdout, and maintains conversation context across turns. Stdout and
asyncio.gather to prevent pipe buffer deadlocks. stderr are read concurrently via asyncio.gather to prevent pipe buffer deadlocks.
Key features: Key features:
- Persistent subprocess with stream-json stdin/stdout (eliminates ~1s spawn overhead)
- Concurrent stdout/stderr reading (no pipe deadlock) - Concurrent stdout/stderr reading (no pipe deadlock)
- Clean process termination (no zombie processes) - Clean process termination (no zombie processes)
- Message queuing during processing - Message queuing during processing
- Automatic crash recovery with --continue flag - Automatic crash recovery with --continue flag
- Stream-json event routing to callbacks - Stream-json event routing to callbacks (including tool_use events)
Based on research in: .planning/phases/01-session-process-foundation/01-RESEARCH.md Based on research in: .planning/phases/02-telegram-integration/02-RESEARCH.md
""" """
import asyncio import asyncio
@@ -34,11 +34,11 @@ logger = logging.getLogger(__name__)
class ClaudeSubprocess: class ClaudeSubprocess:
""" """
Manages a single Claude Code CLI subprocess. Manages a persistent Claude Code CLI subprocess with stream-json I/O.
Spawns Claude Code in a session directory, handles I/O via pipes with Spawns Claude Code once and maintains the process across multiple turns,
concurrent stream reading, queues messages during processing, and auto-restarts accepting NDJSON messages on stdin. This eliminates ~1s spawn overhead per
on crash with context preservation. message and preserves conversation context.
Example: Example:
sub = ClaudeSubprocess( sub = ClaudeSubprocess(
@@ -47,9 +47,12 @@ class ClaudeSubprocess:
on_output=lambda text: print(f"Claude: {text}"), on_output=lambda text: print(f"Claude: {text}"),
on_error=lambda err: print(f"Error: {err}"), on_error=lambda err: print(f"Error: {err}"),
on_complete=lambda: print("Turn complete"), on_complete=lambda: print("Turn complete"),
on_status=lambda status: print(f"Status: {status}") on_status=lambda status: print(f"Status: {status}"),
on_tool_use=lambda name, inp: print(f"Tool: {name} {inp}")
) )
await sub.start()
await sub.send_message("Hello, Claude!") await sub.send_message("Hello, Claude!")
await sub.send_message("What's the weather?")
# ... later ... # ... later ...
await sub.terminate() await sub.terminate()
""" """
@@ -65,6 +68,7 @@ class ClaudeSubprocess:
on_error: Optional[Callable[[str], None]] = None, on_error: Optional[Callable[[str], None]] = None,
on_complete: Optional[Callable[[], None]] = None, on_complete: Optional[Callable[[], None]] = None,
on_status: Optional[Callable[[str], None]] = None, on_status: Optional[Callable[[str], None]] = None,
on_tool_use: Optional[Callable[[str, dict], None]] = None,
): ):
""" """
Initialize ClaudeSubprocess. Initialize ClaudeSubprocess.
@@ -76,6 +80,7 @@
on_error: Callback(error: str) for error messages on_error: Callback(error: str) for error messages
on_complete: Callback() when a turn completes on_complete: Callback() when a turn completes
on_status: Callback(status: str) for status updates (e.g. "Claude restarted") on_status: Callback(status: str) for status updates (e.g. "Claude restarted")
on_tool_use: Callback(tool_name: str, tool_input: dict) for tool call progress
""" """
self._session_dir = Path(session_dir) self._session_dir = Path(session_dir)
self._persona = persona or {} self._persona = persona or {}
@@ -83,12 +88,14 @@
self.on_error = on_error self.on_error = on_error
self.on_complete = on_complete self.on_complete = on_complete
self.on_status = on_status self.on_status = on_status
self.on_tool_use = on_tool_use
# Process state # Process state
self._process: Optional[asyncio.subprocess.Process] = None self._process: Optional[asyncio.subprocess.Process] = None
self._busy = False self._busy = False
self._message_queue: asyncio.Queue = asyncio.Queue() self._message_queue: asyncio.Queue = asyncio.Queue()
self._reader_task: Optional[asyncio.Task] = None self._stdout_reader_task: Optional[asyncio.Task] = None
self._stderr_reader_task: Optional[asyncio.Task] = None
self._crash_count = 0 self._crash_count = 0
logger.debug( logger.debug(
@@ -106,53 +113,34 @@
"""Return whether subprocess process is running.""" """Return whether subprocess process is running."""
return self._process is not None and self._process.returncode is None return self._process is not None and self._process.returncode is None
async def send_message(self, message: str) -> None: async def start(self) -> None:
""" """
Send a message to Claude Code. Start the persistent Claude Code subprocess.
If no process is running: spawn one with this message. Spawns process with stream-json I/O and launches background readers.
If process IS running and BUSY: queue message. Must be called before send_message().
If process IS running and IDLE: send as new turn (spawn new invocation).
Args:
message: Message to send to Claude
""" """
if not self.is_alive: if self.is_alive:
# No process running - spawn new one logger.warning("Subprocess already running")
await self._spawn(message) return
elif self._busy:
# Process is busy - queue message
logger.debug(f"Queueing message (process busy): {message[:50]}...")
await self._message_queue.put(message)
else:
# Process is idle - spawn new turn
# Note: For simplicity, we spawn a fresh process per turn
# Phase 2+ might use --input-format stream-json for live piping
await self._spawn(message)
async def _spawn(self, message: str) -> None:
"""
Spawn Claude Code subprocess with a message.
Builds command with persona settings, spawns process with pipes,
and launches concurrent stream readers.
Args:
message: Initial message to send
"""
self._spawn_time = time.monotonic() self._spawn_time = time.monotonic()
self._first_output_time = None self._first_output_time = None
# Build command # Build command for persistent process
cmd = [ cmd = [
"claude", "claude",
"-p", "-p",
message, "--input-format", "stream-json",
"--output-format", "--output-format", "stream-json",
"stream-json",
"--verbose", "--verbose",
] ]
# Add --continue if prior session exists
if (self._session_dir / ".claude").exists():
cmd.append("--continue")
logger.debug("Using --continue flag (found existing .claude/ directory)")
# Add persona settings # Add persona settings
if self._persona: if self._persona:
if "system_prompt" in self._persona: if "system_prompt" in self._persona:
@@ -163,11 +151,6 @@
if "model" in settings: if "model" in settings:
cmd.extend(["--model", settings["model"]]) cmd.extend(["--model", settings["model"]])
# Add --continue if prior session exists
if (self._session_dir / ".claude").exists():
cmd.append("--continue")
logger.debug(f"Using --continue flag (found existing .claude/ directory)")
# Prepare environment # Prepare environment
env = os.environ.copy() env = os.environ.copy()
# Ensure PATH includes ~/.local/bin and ~/bin (for claude CLI) # Ensure PATH includes ~/.local/bin and ~/bin (for claude CLI)
@@ -183,99 +166,85 @@
# Ensure session directory exists # Ensure session directory exists
self._session_dir.mkdir(parents=True, exist_ok=True) self._session_dir.mkdir(parents=True, exist_ok=True)
# Log full command (with model) for debugging # Log command
cmd_flags = [c for c in cmd if c.startswith("--")] cmd_flags = [c for c in cmd if c.startswith("--")]
logger.info( logger.info(
f"[TIMING] Spawning: cwd={self._session_dir.name}, " f"[TIMING] Starting persistent subprocess: cwd={self._session_dir.name}, "
f"flags={' '.join(cmd_flags)}, msg={message[:60]}..." f"flags={' '.join(cmd_flags)}"
) )
try: try:
# Spawn subprocess # Spawn subprocess
self._process = await asyncio.create_subprocess_exec( self._process = await asyncio.create_subprocess_exec(
*cmd, *cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
cwd=str(self._session_dir), cwd=str(self._session_dir),
env=env, env=env,
) )
self._busy = True
elapsed = time.monotonic() - self._spawn_time elapsed = time.monotonic() - self._spawn_time
logger.info(f"[TIMING] Process spawned: PID={self._process.pid} (+{elapsed:.3f}s)") logger.info(f"[TIMING] Persistent subprocess started: PID={self._process.pid} (+{elapsed:.3f}s)")
# Launch concurrent stream readers # Launch background readers (persistent, not per-turn)
self._reader_task = asyncio.create_task(self._read_streams()) self._stdout_reader_task = asyncio.create_task(self._read_stdout())
self._stderr_reader_task = asyncio.create_task(self._read_stderr())
except Exception as e: except Exception as e:
logger.error(f"Failed to spawn Claude Code: {e}") logger.error(f"Failed to spawn Claude Code: {e}")
if self.on_error: if self.on_error:
self.on_error(f"Failed to start Claude: {e}") self.on_error(f"Failed to start Claude: {e}")
self._busy = False
async def _read_streams(self) -> None: async def send_message(self, message: str) -> None:
""" """
Read stdout and stderr concurrently until both complete. Send a message to the persistent Claude Code subprocess.
This is CRITICAL for deadlock prevention. Both streams are read Writes NDJSON message to stdin. If process not started, starts it first.
concurrently using asyncio.gather() to prevent pipe buffer overflow. If busy, queues the message.
After streams end, waits for process termination, marks as not busy, Args:
calls completion callback, and processes any queued messages. message: Message to send to Claude
""" """
if not self._process: # Auto-start if not running
if not self.is_alive:
logger.debug("Process not running, starting it first")
await self.start()
if not self._process or not self._process.stdin:
raise RuntimeError("Subprocess not running or stdin not available")
# Queue if busy
if self._busy:
logger.debug(f"Queueing message (process busy): {message[:50]}...")
await self._message_queue.put(message)
return return
# Mark as busy
self._busy = True
self._send_time = time.monotonic()
# Write NDJSON to stdin
try: try:
# Read both streams concurrently (CRITICAL: prevents deadlock) msg_dict = {"type": "user", "content": message}
await asyncio.gather( ndjson_line = json.dumps(msg_dict) + '\n'
self._read_stdout(), self._process.stdin.write(ndjson_line.encode())
self._read_stderr(), await self._process.stdin.drain() # CRITICAL: flush buffer
return_exceptions=True, logger.debug(f"Sent message to stdin: {message[:60]}...")
)
# Wait for process to terminate
returncode = await self._process.wait()
elapsed = time.monotonic() - self._spawn_time
logger.info(f"[TIMING] Process exited: returncode={returncode}, total={elapsed:.3f}s")
# Handle crash (non-zero exit, but not signals from our own termination)
# Negative return codes are signals (e.g., -15 = SIGTERM, -9 = SIGKILL)
# Only auto-restart on genuine crashes (positive non-zero exit codes)
if returncode is not None and returncode > 0:
await self._handle_crash()
return
# Reset crash count on successful completion
self._crash_count = 0
except Exception as e: except Exception as e:
logger.error(f"Error reading streams: {e}") logger.error(f"Failed to send message to stdin: {e}")
if self.on_error:
self.on_error(f"Stream reading error: {e}")
finally:
# Mark as not busy
self._busy = False self._busy = False
if self.on_error:
self.on_error(f"Failed to send message: {e}")
# Call completion callback
if self.on_complete:
try:
if inspect.iscoroutinefunction(self.on_complete):
asyncio.create_task(self.on_complete())
else:
self.on_complete()
except Exception as e:
logger.error(f"Error in on_complete callback: {e}")
# Process queued messages
if not self._message_queue.empty():
next_message = await self._message_queue.get()
logger.debug(f"Processing queued message: {next_message[:50]}...")
await self.send_message(next_message)
async def _read_stdout(self) -> None: async def _read_stdout(self) -> None:
"""Read stdout line-by-line and handle each line.""" """
Read stdout stream-json events persistently.
Runs for the lifetime of the process, not per-turn. Exits only when
process dies (readline returns empty bytes).
"""
if not self._process or not self._process.stdout: if not self._process or not self._process.stdout:
return return
@@ -284,19 +253,34 @@
while True: while True:
line = await self._process.stdout.readline() line = await self._process.stdout.readline()
if not line: if not line:
# Process died
logger.warning("Stdout stream ended (process died)")
break break
line_str = line.decode().rstrip() line_str = line.decode().rstrip()
if line_str: if line_str:
if first_line: if first_line:
elapsed = time.monotonic() - self._spawn_time elapsed = time.monotonic() - self._spawn_time
logger.info(f"[TIMING] First stdout line: +{elapsed:.3f}s") logger.info(f"[TIMING] First stdout line: +{elapsed:.3f}s")
first_line = False first_line = False
self._handle_stdout_line(line_str) await self._handle_stdout_line(line_str)
except Exception as e: except Exception as e:
logger.error(f"Error reading stdout: {e}") logger.error(f"Error reading stdout: {e}")
finally:
# Process died unexpectedly - trigger crash recovery
if self.is_alive and self._process.returncode is None:
# Process still running but stdout closed - unusual
logger.error("Stdout closed but process still running")
elif self._process and self._process.returncode is not None and self._process.returncode > 0:
# Process crashed
await self._handle_crash()
async def _read_stderr(self) -> None: async def _read_stderr(self) -> None:
"""Read stderr line-by-line and handle each line.""" """
Read stderr persistently.
Runs for the lifetime of the process. Exits only when process dies.
"""
if not self._process or not self._process.stderr: if not self._process or not self._process.stderr:
return return
@@ -304,6 +288,7 @@
while True: while True:
line = await self._process.stderr.readline() line = await self._process.stderr.readline()
if not line: if not line:
# Process died
break break
line_str = line.decode().rstrip() line_str = line.decode().rstrip()
if line_str: if line_str:
@@ -311,13 +296,15 @@
except Exception as e: except Exception as e:
logger.error(f"Error reading stderr: {e}") logger.error(f"Error reading stderr: {e}")
def _handle_stdout_line(self, line: str) -> None: async def _handle_stdout_line(self, line: str) -> None:
""" """
Parse and route stream-json events from stdout. Parse and route stream-json events from stdout.
Handles three event types: Handles event types:
- "assistant": Extract text blocks and call on_output - "assistant": Extract text blocks and call on_output
- "result": Turn complete, check for errors - "content_block_start": Extract tool_use events and call on_tool_use
- "content_block_delta": Handle tool input streaming (future)
- "result": Turn complete, mark not busy, call on_complete
- "system": System events, log and check for errors - "system": System events, log and check for errors
Args: Args:
@@ -347,9 +334,33 @@
except Exception as e: except Exception as e:
logger.error(f"Error in on_output callback: {e}") logger.error(f"Error in on_output callback: {e}")
elif event_type == "content_block_start":
# Check for tool_use block
content_block = event.get("content_block", {})
if content_block.get("type") == "tool_use":
tool_name = content_block.get("name", "unknown")
tool_input = content_block.get("input", {})
logger.debug(f"Tool use started: {tool_name} with input {tool_input}")
if self.on_tool_use:
try:
if inspect.iscoroutinefunction(self.on_tool_use):
asyncio.create_task(self.on_tool_use(tool_name, tool_input))
else:
self.on_tool_use(tool_name, tool_input)
except Exception as e:
logger.error(f"Error in on_tool_use callback: {e}")
elif event_type == "content_block_delta":
# Tool input streaming (not needed for initial implementation)
pass
elif event_type == "result": elif event_type == "result":
# Turn complete # Turn complete - mark not busy and call on_complete
elapsed = time.monotonic() - self._spawn_time if hasattr(self, '_send_time'):
elapsed = time.monotonic() - self._send_time
else:
elapsed = time.monotonic() - self._spawn_time
session_id = event.get("session_id") session_id = event.get("session_id")
logger.info(f"[TIMING] Result event: +{elapsed:.3f}s (session={session_id})") logger.info(f"[TIMING] Result event: +{elapsed:.3f}s (session={session_id})")
@@ -364,6 +375,25 @@
except Exception as e: except Exception as e:
logger.error(f"Error in on_error callback: {e}") logger.error(f"Error in on_error callback: {e}")
# Mark not busy
self._busy = False
# Call completion callback
if self.on_complete:
try:
if inspect.iscoroutinefunction(self.on_complete):
asyncio.create_task(self.on_complete())
else:
self.on_complete()
except Exception as e:
logger.error(f"Error in on_complete callback: {e}")
# Process queued messages
if not self._message_queue.empty():
next_message = await self._message_queue.get()
logger.debug(f"Processing queued message: {next_message[:50]}...")
await self.send_message(next_message)
elif event_type == "system": elif event_type == "system":
# System event # System event
subtype = event.get("subtype") subtype = event.get("subtype")
@@ -413,8 +443,8 @@
""" """
Handle Claude Code crash (non-zero exit). Handle Claude Code crash (non-zero exit).
Attempts to restart with --continue flag up to MAX_CRASH_RETRIES times. Attempts to restart persistent process with --continue flag up to
Notifies user via on_status callback. MAX_CRASH_RETRIES times. Notifies user via on_status callback.
""" """
self._crash_count += 1 self._crash_count += 1
logger.error( logger.error(
@@ -430,6 +460,7 @@
else: else:
self.on_error(error_msg) self.on_error(error_msg)
self._crash_count = 0 # Reset for next session self._crash_count = 0 # Reset for next session
self._busy = False
return return
# Notify user # Notify user
@@ -445,26 +476,28 @@
# Wait before retrying # Wait before retrying
await asyncio.sleep(self.CRASH_BACKOFF_SECONDS) await asyncio.sleep(self.CRASH_BACKOFF_SECONDS)
# Respawn with --continue flag (loads most recent session) # Restart persistent process
try: try:
# Get next queued message or use placeholder await self.start()
# Resend queued messages if any
if not self._message_queue.empty(): if not self._message_queue.empty():
next_message = await self._message_queue.get() next_message = await self._message_queue.get()
else: logger.debug(f"Resending queued message after crash: {next_message[:50]}...")
next_message = "continue" # Minimal message to resume await self.send_message(next_message)
await self._spawn(next_message)
except Exception as e: except Exception as e:
logger.error(f"Failed to respawn after crash: {e}") logger.error(f"Failed to restart after crash: {e}")
if self.on_error: if self.on_error:
self.on_error(f"Failed to restart Claude: {e}") self.on_error(f"Failed to restart Claude: {e}")
self._busy = False
async def terminate(self, timeout: int = 10) -> None: async def terminate(self, timeout: int = 10) -> None:
""" """
Terminate subprocess gracefully. Terminate persistent subprocess gracefully.
Sends SIGTERM, waits for clean exit with timeout, then SIGKILL if needed. Closes stdin, sends SIGTERM, waits for clean exit with timeout, then
Always reaps process to prevent zombies. SIGKILL if needed. Always reaps process to prevent zombies.
Args: Args:
timeout: Seconds to wait for graceful termination before SIGKILL timeout: Seconds to wait for graceful termination before SIGKILL
@@ -476,6 +509,11 @@
logger.info(f"Terminating Claude Code process: PID={self._process.pid}") logger.info(f"Terminating Claude Code process: PID={self._process.pid}")
try: try:
# Close stdin to signal end of input
if self._process.stdin:
self._process.stdin.close()
await self._process.stdin.wait_closed()
# Send SIGTERM # Send SIGTERM
self._process.terminate() self._process.terminate()
@@ -500,10 +538,11 @@
self._process = None self._process = None
self._busy = False self._busy = False
# Cancel reader task if still running # Cancel reader tasks if still running
if self._reader_task and not self._reader_task.done(): for task in [self._stdout_reader_task, self._stderr_reader_task]:
self._reader_task.cancel() if task and not task.done():
try: task.cancel()
await self._reader_task try:
except asyncio.CancelledError: await task
pass except asyncio.CancelledError:
pass