homelab/telegram/claude_subprocess.py
Mikkel Georgsen 2302b75e14 fix(01-02): read model/max_turns from persona settings object
Persona JSON nests model and max_turns under "settings" but the
subprocess was looking for them at the top level, so --model and
--max-turns were never passed to claude CLI.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 18:18:38 +00:00

492 lines
18 KiB
Python

"""
Claude Code subprocess management for Telegram bot.
This module provides the ClaudeSubprocess class that safely spawns, communicates
with, and manages Claude Code CLI processes using asyncio. It handles the dangerous
parts: pipe management without deadlocks, process lifecycle without zombies,
message queueing during processing, and crash recovery with session resumption.
The subprocess runs in a session directory and outputs stream-json format for
structured, line-by-line parsing. Stdout and stderr are read concurrently via
asyncio.gather to prevent pipe buffer deadlocks.
Key features:
- Concurrent stdout/stderr reading (no pipe deadlock)
- Clean process termination (no zombie processes)
- Message queuing during processing
- Automatic crash recovery with --continue flag
- Stream-json event routing to callbacks
Based on research in: .planning/phases/01-session-process-foundation/01-RESEARCH.md
"""
import asyncio
import inspect
import json
import logging
import os
from pathlib import Path
from typing import Callable, Optional
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ClaudeSubprocess:
    """
    Manages a single Claude Code CLI subprocess.

    Spawns Claude Code in a session directory, handles I/O via pipes with
    concurrent stream reading, queues messages during processing, and
    auto-restarts on crash with context preservation.

    All callbacks may be plain callables or coroutine functions. Coroutine
    callbacks are scheduled as tasks on the running event loop; exceptions
    raised by a callback are logged and never propagate into the I/O loop.

    Example:
        sub = ClaudeSubprocess(
            session_dir=Path("/home/mikkel/telegram/sessions/my-session"),
            persona={"system_prompt": "You are a helpful assistant"},
            on_output=lambda text: print(f"Claude: {text}"),
            on_error=lambda err: print(f"Error: {err}"),
            on_complete=lambda: print("Turn complete"),
            on_status=lambda status: print(f"Status: {status}")
        )
        await sub.send_message("Hello, Claude!")
        # ... later ...
        await sub.terminate()
    """

    MAX_CRASH_RETRIES = 3
    CRASH_BACKOFF_SECONDS = 1
    # Per-line buffer limit for the child's stdout/stderr StreamReaders.
    # asyncio's default is 64 KiB and readline() raises ValueError beyond it,
    # which is easy to hit with long stream-json lines. 16 MiB is an
    # arbitrary, generous ceiling.
    STREAM_LIMIT_BYTES = 16 * 1024 * 1024

    def __init__(
        self,
        session_dir: Path,
        persona: Optional[dict] = None,
        on_output: Optional[Callable[[str], None]] = None,
        on_error: Optional[Callable[[str], None]] = None,
        on_complete: Optional[Callable[[], None]] = None,
        on_status: Optional[Callable[[str], None]] = None,
    ):
        """
        Initialize ClaudeSubprocess.

        Args:
            session_dir: Path to session directory (cwd for subprocess)
            persona: Persona dict with system_prompt and settings (model, max_turns)
            on_output: Callback(text: str) for assistant text output
            on_error: Callback(error: str) for error messages
            on_complete: Callback() when a turn completes
            on_status: Callback(status: str) for status updates (e.g. "Claude restarted")
        """
        self._session_dir = Path(session_dir)
        self._persona = persona or {}
        self.on_output = on_output
        self.on_error = on_error
        self.on_complete = on_complete
        self.on_status = on_status

        # Process state
        self._process: Optional[asyncio.subprocess.Process] = None
        self._busy = False
        self._message_queue: asyncio.Queue = asyncio.Queue()
        self._reader_task: Optional[asyncio.Task] = None
        self._crash_count = 0
        # True while terminate() is running; suppresses restarts/queue drain.
        self._terminating = False
        # Strong references to in-flight async callback tasks. The event loop
        # only keeps weak references, so unreferenced tasks may be collected
        # before they finish.
        self._callback_tasks: set = set()

        prompt_preview = (
            str(self._persona.get("system_prompt", "none"))[:50]
            if self._persona
            else "none"
        )
        logger.debug(
            "ClaudeSubprocess initialized: session_dir=%s, persona=%s",
            session_dir,
            prompt_preview,
        )

    @property
    def is_busy(self) -> bool:
        """Return whether subprocess is currently processing a message."""
        return self._busy

    @property
    def is_alive(self) -> bool:
        """Return whether subprocess process is running."""
        return self._process is not None and self._process.returncode is None

    # ------------------------------------------------------------------
    # Callback dispatch
    # ------------------------------------------------------------------

    def _invoke_callback(self, callback, *args, name: str) -> None:
        """
        Invoke a user callback, supporting both sync and async callables.

        Coroutine functions are scheduled with asyncio.create_task and the
        task is retained until done. Any exception raised while invoking or
        scheduling the callback is logged and swallowed so that a faulty
        callback cannot break stream reading or process management.

        Args:
            callback: The callable to invoke, or None (no-op).
            *args: Positional arguments forwarded to the callback.
            name: Attribute name of the callback, used in log messages.
        """
        if callback is None:
            return
        try:
            if inspect.iscoroutinefunction(callback):
                coro = callback(*args)
                try:
                    task = asyncio.create_task(coro)
                except RuntimeError:
                    # No running event loop: close the coroutine so it does
                    # not trigger a "never awaited" warning, then report.
                    coro.close()
                    raise
                self._callback_tasks.add(task)
                task.add_done_callback(self._on_callback_task_done)
            else:
                callback(*args)
        except Exception as e:
            logger.error("Error in %s callback: %s", name, e)

    def _on_callback_task_done(self, task) -> None:
        """Drop a finished callback task and log its exception, if any."""
        self._callback_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error("Error in async callback: %s", exc)

    # ------------------------------------------------------------------
    # Sending / spawning
    # ------------------------------------------------------------------

    async def send_message(self, message: str) -> None:
        """
        Send a message to Claude Code.

        If a turn is in progress (including while a process is being spawned
        or restarted after a crash): queue the message; it is sent when the
        current turn finishes.
        Otherwise: spawn a fresh Claude Code invocation for this message.

        Args:
            message: Message to send to Claude
        """
        # Check _busy rather than process liveness: _busy is also set while
        # a spawn or crash-restart is in flight, when no live process exists
        # yet. Spawning in that window would start two processes at once.
        if self._busy:
            logger.debug("Queueing message (process busy): %s...", message[:50])
            await self._message_queue.put(message)
            return

        # Note: For simplicity, we spawn a fresh process per turn
        # Phase 2+ might use --input-format stream-json for live piping
        await self._spawn(message)

    def _build_command(self, message: str) -> list:
        """Build the claude CLI argument list for one turn."""
        # NOTE(review): `message` comes from the chat user and is passed as a
        # bare positional argument. A message starting with "-" may be parsed
        # by the CLI as an option — confirm how the CLI handles this.
        cmd = [
            "claude",
            "-p",
            message,
            "--output-format",
            "stream-json",
            "--verbose",
        ]

        persona = self._persona
        if persona.get("system_prompt") is not None:
            cmd.extend(["--system-prompt", str(persona["system_prompt"])])

        # "settings" may be absent or explicitly null in the persona JSON.
        settings = persona.get("settings") or {}
        if settings.get("max_turns") is not None:
            cmd.extend(["--max-turns", str(settings["max_turns"])])
        if settings.get("model") is not None:
            cmd.extend(["--model", str(settings["model"])])

        # Add --continue if prior session exists
        if (self._session_dir / ".claude").exists():
            cmd.append("--continue")
            logger.debug("Using --continue flag (found existing .claude/ directory)")

        return cmd

    @staticmethod
    def _build_env() -> dict:
        """Return a copy of os.environ with ~/.local/bin and ~/bin on PATH."""
        env = os.environ.copy()
        raw_path = env.get("PATH", "")
        # Avoid producing a stray empty entry (which means "current
        # directory") when PATH is unset or empty.
        path_parts = raw_path.split(os.pathsep) if raw_path else []

        home = Path.home()
        # Inserted in this order so ~/.local/bin ends up ahead of ~/bin.
        for candidate in (home / "bin", home / ".local" / "bin"):
            entry = str(candidate)
            if entry not in path_parts:
                path_parts.insert(0, entry)

        env["PATH"] = os.pathsep.join(path_parts)
        return env

    async def _spawn(self, message: str) -> None:
        """
        Spawn Claude Code subprocess with a message.

        Builds command with persona settings, spawns process with pipes,
        and launches concurrent stream readers. Spawn failures are reported
        through on_error rather than raised.

        Args:
            message: Initial message to send
        """
        cmd = self._build_command(message)
        env = self._build_env()

        # Ensure session directory exists
        self._session_dir.mkdir(parents=True, exist_ok=True)

        logger.info(
            "Spawning Claude Code: cwd=%s, cmd=%s... (truncated)",
            self._session_dir,
            " ".join(cmd[:4]),
        )

        # Claim the busy slot BEFORE the first await so that send_message()
        # calls arriving while the process is being created are queued.
        self._busy = True
        try:
            self._process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=str(self._session_dir),
                env=env,
                limit=self.STREAM_LIMIT_BYTES,
            )
            logger.info("Claude Code spawned: PID=%s", self._process.pid)

            # Launch concurrent stream readers
            self._reader_task = asyncio.create_task(self._read_streams())
        except Exception as e:
            logger.error("Failed to spawn Claude Code: %s", e)
            self._busy = False
            self._invoke_callback(
                self.on_error, f"Failed to start Claude: {e}", name="on_error"
            )

    # ------------------------------------------------------------------
    # Stream reading
    # ------------------------------------------------------------------

    async def _read_streams(self) -> None:
        """
        Read stdout and stderr concurrently until both complete.

        This is CRITICAL for deadlock prevention. Both streams are read
        concurrently using asyncio.gather() to prevent pipe buffer overflow.

        After streams end, waits for process termination. On a crash that
        leads to a successful restart, ownership of the busy flag, the
        completion callback and the queue passes to the new process's reader.
        Otherwise this marks the instance as not busy, calls the completion
        callback, and (unless terminate() is running) sends the next queued
        message.
        """
        process = self._process
        if process is None:
            return

        # Set when a crash restart produced a new process whose own reader
        # task is now responsible for end-of-turn bookkeeping.
        handed_over = False
        try:
            # Read both streams concurrently (CRITICAL: prevents deadlock)
            await asyncio.gather(
                self._read_stdout(),
                self._read_stderr(),
                return_exceptions=True,
            )

            # Wait for process to terminate
            returncode = await process.wait()
            logger.info("Claude Code process exited: returncode=%s", returncode)

            # Negative return codes are signals (e.g., -15 = SIGTERM,
            # -9 = SIGKILL); only positive exit codes count as crashes.
            if returncode is not None and returncode > 0:
                # A positive exit code while terminate() is running is the
                # result of our own shutdown request, not a crash.
                if not self._terminating:
                    await self._handle_crash()
                    handed_over = (
                        self._process is not None and self._process is not process
                    )
            else:
                # Reset crash count on successful completion
                self._crash_count = 0
        except Exception as e:
            logger.error("Error reading streams: %s", e)
            self._invoke_callback(
                self.on_error, f"Stream reading error: {e}", name="on_error"
            )
        finally:
            if not handed_over:
                self._busy = False
                self._invoke_callback(self.on_complete, name="on_complete")

                # Process queued messages, but never respawn in the middle
                # of terminate().
                if not self._terminating and not self._message_queue.empty():
                    next_message = self._message_queue.get_nowait()
                    logger.debug(
                        "Processing queued message: %s...", next_message[:50]
                    )
                    await self.send_message(next_message)

    async def _pump_lines(self, stream, handler, label: str) -> None:
        """
        Read `stream` line by line until EOF, passing each line to `handler`.

        The loop keeps draining the pipe even when a single line cannot be
        decoded, exceeds the buffer limit, or makes the handler raise, since
        an undrained pipe can block the child process.

        Args:
            stream: asyncio StreamReader for the pipe, or None (no-op).
            handler: Callable taking one decoded, right-stripped line.
            label: "stdout" or "stderr", used in log messages.
        """
        if stream is None:
            return
        while True:
            try:
                raw = await stream.readline()
            except ValueError as e:
                # Line exceeded the StreamReader limit; skip it and keep
                # draining the pipe.
                logger.error("Skipping over-long %s line: %s", label, e)
                continue
            except Exception as e:
                logger.error("Error reading %s: %s", label, e)
                break
            if not raw:
                break  # EOF

            text = raw.decode("utf-8", errors="replace").rstrip()
            if not text:
                continue
            try:
                handler(text)
            except Exception as e:
                logger.error("Error handling %s line: %s", label, e)

    async def _read_stdout(self) -> None:
        """Read stdout line-by-line and handle each line."""
        process = self._process
        if process is None:
            return
        await self._pump_lines(process.stdout, self._handle_stdout_line, "stdout")

    async def _read_stderr(self) -> None:
        """Read stderr line-by-line and handle each line."""
        process = self._process
        if process is None:
            return
        await self._pump_lines(process.stderr, self._handle_stderr_line, "stderr")

    # ------------------------------------------------------------------
    # Event routing
    # ------------------------------------------------------------------

    def _handle_stdout_line(self, line: str) -> None:
        """
        Parse and route stream-json events from stdout.

        Handles three event types:
        - "assistant": Extract text blocks and call on_output
        - "result": Turn complete, check for errors
        - "system": System events, log and check for errors

        Lines that are not JSON, or are JSON but not an object, are logged
        and ignored.

        Args:
            line: Single line of stream-json output
        """
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            # Non-JSON line (Claude Code may emit diagnostics)
            logger.warning("Non-JSON stdout line: %s", line[:100])
            return

        if not isinstance(event, dict):
            # Valid JSON, but not an event object (e.g. a bare number).
            logger.warning("Ignoring non-object stdout line: %s", line[:100])
            return

        event_type = event.get("type")

        if event_type == "assistant":
            message = event.get("message")
            content = message.get("content") if isinstance(message, dict) else None
            if not isinstance(content, list):
                return
            for block in content:
                if not isinstance(block, dict) or block.get("type") != "text":
                    continue
                text = block.get("text", "")
                if text:
                    self._invoke_callback(self.on_output, text, name="on_output")

        elif event_type == "result":
            # Turn complete
            session_id = event.get("session_id")
            if session_id:
                logger.debug("Turn complete: session_id=%s", session_id)

            if event.get("is_error"):
                # NOTE(review): falls back to "result"/"subtype" when no
                # "error" key is present — confirm against the CLI's
                # result-event schema.
                error_msg = (
                    event.get("error")
                    or event.get("result")
                    or event.get("subtype")
                    or "Unknown error"
                )
                self._invoke_callback(
                    self.on_error, f"Claude error: {error_msg}", name="on_error"
                )

        elif event_type == "system":
            subtype = event.get("subtype")
            logger.debug("System event: subtype=%s", subtype)

            if subtype == "error":
                error_msg = event.get("message", "System error")
                self._invoke_callback(
                    self.on_error, f"System error: {error_msg}", name="on_error"
                )

        else:
            logger.debug("Unknown event type: %s", event_type)

    def _handle_stderr_line(self, line: str) -> None:
        """
        Handle stderr output from Claude Code.

        Logs as warning. If line contains "error" (case-insensitive),
        also calls on_error callback.

        Args:
            line: Single line of stderr output
        """
        logger.warning("Claude Code stderr: %s", line)

        if "error" in line.lower():
            self._invoke_callback(
                self.on_error, f"Claude stderr: {line}", name="on_error"
            )

    # ------------------------------------------------------------------
    # Crash recovery / shutdown
    # ------------------------------------------------------------------

    async def _handle_crash(self) -> None:
        """
        Handle Claude Code crash (non-zero exit).

        Counts the crash; once the count reaches MAX_CRASH_RETRIES, reports
        via on_error, resets the count and gives up. Otherwise notifies via
        on_status, waits CRASH_BACKOFF_SECONDS and respawns, using the next
        queued message if there is one or a minimal "continue" prompt.
        """
        self._crash_count += 1
        logger.error(
            "Claude Code crashed (attempt %s/%s)",
            self._crash_count,
            self.MAX_CRASH_RETRIES,
        )

        if self._crash_count >= self.MAX_CRASH_RETRIES:
            error_msg = (
                f"Claude failed to restart after {self.MAX_CRASH_RETRIES} attempts"
            )
            logger.error(error_msg)
            self._invoke_callback(self.on_error, error_msg, name="on_error")
            self._crash_count = 0  # Reset for next session
            return

        # Notify user
        self._invoke_callback(
            self.on_status,
            "Claude crashed, restarting with context preserved...",
            name="on_status",
        )

        # Wait before retrying
        await asyncio.sleep(self.CRASH_BACKOFF_SECONDS)

        # Respawn; _spawn adds --continue when a prior session exists.
        try:
            if self._message_queue.empty():
                next_message = "continue"  # Minimal message to resume
            else:
                next_message = self._message_queue.get_nowait()
            await self._spawn(next_message)
        except Exception as e:
            logger.error("Failed to respawn after crash: %s", e)
            self._invoke_callback(
                self.on_error, f"Failed to restart Claude: {e}", name="on_error"
            )

    async def terminate(self, timeout: int = 10) -> None:
        """
        Terminate subprocess gracefully.

        Sends SIGTERM, waits for clean exit with timeout, then SIGKILL if
        needed. Always reaps process to prevent zombies. While this runs,
        the reader task neither restarts the process nor sends queued
        messages; queued messages stay in the queue.

        Args:
            timeout: Seconds to wait for graceful termination before SIGKILL
        """
        process = self._process
        if process is None or process.returncode is not None:
            logger.debug("No process to terminate or already terminated")
            return

        logger.info("Terminating Claude Code process: PID=%s", process.pid)
        self._terminating = True

        try:
            # Send SIGTERM
            process.terminate()

            try:
                await asyncio.wait_for(process.wait(), timeout=timeout)
                logger.info("Claude Code terminated gracefully")
            except asyncio.TimeoutError:
                logger.warning(
                    "Claude Code did not terminate within %ss, sending SIGKILL",
                    timeout,
                )
                process.kill()
                await process.wait()  # CRITICAL: Always reap to prevent zombie
                logger.info("Claude Code killed")
        except Exception as e:
            logger.error("Error terminating process: %s", e)
        finally:
            try:
                # Clear process reference
                self._process = None
                self._busy = False

                # Cancel reader task if still running
                reader = self._reader_task
                if reader and not reader.done():
                    reader.cancel()
                    try:
                        await reader
                    except asyncio.CancelledError:
                        pass
            finally:
                # Reset only after the reader has finished, so it cannot
                # respawn behind our back.
                self._terminating = False