homelab/telegram/claude_subprocess.py
Mikkel Georgsen 0bca340920 feat(02-02): add /model command and fix stdout buffer overflow
- Add /model command to switch models per-session (persisted in metadata)
- Support aliases: sonnet, opus, haiku → full model IDs
- Add load_persona_for_session() helper that applies model override
- Increase asyncio subprocess stdout buffer to 10MB (fixes crash on
  large stream-json lines from image tool results)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 22:42:23 +00:00

551 lines
22 KiB
Python

"""
Claude Code subprocess management for Telegram bot.
This module provides the ClaudeSubprocess class that maintains a persistent
Claude Code CLI subprocess using stream-json I/O for efficient multi-turn
conversations without respawning per turn.
The persistent subprocess accepts NDJSON messages on stdin, emits stream-json
events on stdout, and maintains conversation context across turns. Stdout and
stderr are read concurrently via asyncio.gather to prevent pipe buffer deadlocks.
Key features:
- Persistent subprocess with stream-json stdin/stdout (eliminates ~1s spawn overhead)
- Concurrent stdout/stderr reading (no pipe deadlock)
- Clean process termination (no zombie processes)
- Message queuing during processing
- Automatic crash recovery with --continue flag
- Stream-json event routing to callbacks (including tool_use events)
Based on research in: .planning/phases/02-telegram-integration/02-RESEARCH.md
"""
import asyncio
import inspect
import json
import logging
import os
import time
from pathlib import Path
from typing import Callable, Optional
logger = logging.getLogger(__name__)
class ClaudeSubprocess:
"""
Manages a persistent Claude Code CLI subprocess with stream-json I/O.
Spawns Claude Code once and maintains the process across multiple turns,
accepting NDJSON messages on stdin. This eliminates ~1s spawn overhead per
message and preserves conversation context.
Example:
sub = ClaudeSubprocess(
session_dir=Path("/home/mikkel/telegram/sessions/my-session"),
persona={"system_prompt": "You are a helpful assistant"},
on_output=lambda text: print(f"Claude: {text}"),
on_error=lambda err: print(f"Error: {err}"),
on_complete=lambda: print("Turn complete"),
on_status=lambda status: print(f"Status: {status}"),
on_tool_use=lambda name, inp: print(f"Tool: {name} {inp}")
)
await sub.start()
await sub.send_message("Hello, Claude!")
await sub.send_message("What's the weather?")
# ... later ...
await sub.terminate()
"""
MAX_CRASH_RETRIES = 3
CRASH_BACKOFF_SECONDS = 1
def __init__(
self,
session_dir: Path,
persona: Optional[dict] = None,
on_output: Optional[Callable[[str], None]] = None,
on_error: Optional[Callable[[str], None]] = None,
on_complete: Optional[Callable[[], None]] = None,
on_status: Optional[Callable[[str], None]] = None,
on_tool_use: Optional[Callable[[str, dict], None]] = None,
):
"""
Initialize ClaudeSubprocess.
Args:
session_dir: Path to session directory (cwd for subprocess)
persona: Persona dict with system_prompt and settings (model, max_turns)
on_output: Callback(text: str) for assistant text output
on_error: Callback(error: str) for error messages
on_complete: Callback() when a turn completes
on_status: Callback(status: str) for status updates (e.g. "Claude restarted")
on_tool_use: Callback(tool_name: str, tool_input: dict) for tool call progress
"""
self._session_dir = Path(session_dir)
self._persona = persona or {}
self.on_output = on_output
self.on_error = on_error
self.on_complete = on_complete
self.on_status = on_status
self.on_tool_use = on_tool_use
# Process state
self._process: Optional[asyncio.subprocess.Process] = None
self._busy = False
self._message_queue: asyncio.Queue = asyncio.Queue()
self._stdout_reader_task: Optional[asyncio.Task] = None
self._stderr_reader_task: Optional[asyncio.Task] = None
self._crash_count = 0
logger.debug(
f"ClaudeSubprocess initialized: session_dir={session_dir}, "
f"persona={persona.get('system_prompt', 'none')[:50] if persona else 'none'}"
)
@property
def is_busy(self) -> bool:
"""Return whether subprocess is currently processing a message."""
return self._busy
@property
def is_alive(self) -> bool:
"""Return whether subprocess process is running."""
return self._process is not None and self._process.returncode is None
async def start(self) -> None:
    """
    Start the persistent Claude Code subprocess.

    Spawns the process with stream-json I/O and launches the background
    stdout/stderr reader tasks. Must run before send_message() (which also
    auto-starts if needed). No-op if the process is already alive.
    """
    if self.is_alive:
        logger.warning("Subprocess already running")
        return
    self._spawn_time = time.monotonic()
    self._first_output_time = None
    # Build command for the persistent process
    cmd = [
        "claude",
        "-p",
        "--input-format", "stream-json",
        "--output-format", "stream-json",
        "--verbose",
        "--dangerously-skip-permissions",
    ]
    # Resume prior conversation context if a session already exists
    if (self._session_dir / ".claude").exists():
        cmd.append("--continue")
        logger.debug("Using --continue flag (found existing .claude/ directory)")
    # Apply persona settings (model FIRST, then system prompt)
    if self._persona:
        settings = self._persona.get("settings", {})
        if "model" in settings:
            cmd.extend(["--model", settings["model"]])
        if "max_turns" in settings:
            cmd.extend(["--max-turns", str(settings["max_turns"])])
        if "system_prompt" in self._persona:
            cmd.extend(["--append-system-prompt", self._persona["system_prompt"]])
    # Prepare environment; make sure ~/.local/bin and ~/bin are on PATH
    # (location of the claude CLI).
    env = os.environ.copy()
    path_parts = env.get("PATH", "").split(":")
    home_bin = str(Path.home() / "bin")
    local_bin = str(Path.home() / ".local" / "bin")
    if home_bin not in path_parts:
        path_parts.insert(0, home_bin)
    if local_bin not in path_parts:
        path_parts.insert(0, local_bin)
    env["PATH"] = ":".join(path_parts)
    # Ensure session directory exists (it is the subprocess cwd)
    self._session_dir.mkdir(parents=True, exist_ok=True)
    logger.info(
        f"[TIMING] Starting persistent subprocess: cwd={self._session_dir.name}, "
        f"cmd={' '.join(cmd)}"
    )
    try:
        # 10MB stdout limit: single stream-json lines can be very large
        # (e.g. image tool results).
        self._process = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=str(self._session_dir),
            env=env,
            limit=10 * 1024 * 1024,
        )
        elapsed = time.monotonic() - self._spawn_time
        logger.info(f"[TIMING] Persistent subprocess started: PID={self._process.pid} (+{elapsed:.3f}s)")
        # Launch persistent background readers (process lifetime, not per-turn)
        self._stdout_reader_task = asyncio.create_task(self._read_stdout())
        self._stderr_reader_task = asyncio.create_task(self._read_stderr())
    except Exception as e:
        logger.error(f"Failed to spawn Claude Code: {e}")
        # Fix: dispatch on_error like every other callback site in this
        # class. A bare call on an async callback would create an
        # un-awaited coroutine that never runs.
        if self.on_error:
            try:
                if inspect.iscoroutinefunction(self.on_error):
                    asyncio.create_task(self.on_error(f"Failed to start Claude: {e}"))
                else:
                    self.on_error(f"Failed to start Claude: {e}")
            except Exception as cb_err:
                logger.error(f"Error in on_error callback: {cb_err}")
async def send_message(self, message: str) -> None:
"""
Send a message to the persistent Claude Code subprocess.
Writes NDJSON message to stdin. If process not started, starts it first.
If busy, queues the message.
Args:
message: Message to send to Claude
"""
# Auto-start if not running
if not self.is_alive:
logger.debug("Process not running, starting it first")
await self.start()
if not self._process or not self._process.stdin:
raise RuntimeError("Subprocess not running or stdin not available")
# Queue if busy
if self._busy:
logger.debug(f"Queueing message (process busy): {message[:50]}...")
await self._message_queue.put(message)
return
# Mark as busy
self._busy = True
self._send_time = time.monotonic()
# Write NDJSON to stdin
try:
msg_dict = {"type": "user", "message": {"role": "user", "content": message}}
ndjson_line = json.dumps(msg_dict) + '\n'
self._process.stdin.write(ndjson_line.encode())
await self._process.stdin.drain() # CRITICAL: flush buffer
logger.debug(f"Sent message to stdin: {message[:60]}...")
except Exception as e:
logger.error(f"Failed to send message to stdin: {e}")
self._busy = False
if self.on_error:
self.on_error(f"Failed to send message: {e}")
async def _read_stdout(self) -> None:
    """
    Read stdout stream-json events persistently.

    Runs for the lifetime of the process, not per-turn. The loop exits only
    when readline() returns empty bytes (process closed stdout, i.e. died)
    or on a read error. In the finally block, a *positive* exit code
    triggers crash recovery; negative return codes (death by signal, e.g.
    SIGTERM from terminate()) do NOT trigger a restart — presumably so a
    graceful shutdown is not treated as a crash (TODO confirm intent).
    """
    if not self._process or not self._process.stdout:
        return
    first_line = True
    try:
        while True:
            line = await self._process.stdout.readline()
            if not line:
                # EOF: process closed stdout (it died)
                logger.warning("Stdout stream ended (process died)")
                break
            line_str = line.decode().rstrip()
            if line_str:
                if first_line:
                    # One-shot startup-latency measurement
                    elapsed = time.monotonic() - self._spawn_time
                    logger.info(f"[TIMING] First stdout line: +{elapsed:.3f}s")
                    first_line = False
                await self._handle_stdout_line(line_str)
    except Exception as e:
        logger.error(f"Error reading stdout: {e}")
    finally:
        # Decide whether the loop ended because of a crash. NOTE(review):
        # self._process is re-read here, so after a crash+restart this may
        # refer to the NEW process — statement order matters.
        if self.is_alive and self._process.returncode is None:
            # Stdout closed while the process is still running - unusual
            logger.error("Stdout closed but process still running")
        elif self._process and self._process.returncode is not None and self._process.returncode > 0:
            # Positive exit code = crash; attempt restart with --continue
            await self._handle_crash()
async def _read_stderr(self) -> None:
    """
    Persistently drain stderr from the subprocess.

    Runs for the lifetime of the process; returns when readline() yields
    empty bytes (process exited) or when reading fails.
    """
    if not self._process or not self._process.stderr:
        return
    try:
        # Empty bytes from readline() means EOF, i.e. the process died.
        while chunk := await self._process.stderr.readline():
            decoded = chunk.decode().rstrip()
            if decoded:
                self._handle_stderr_line(decoded)
    except Exception as exc:
        logger.error(f"Error reading stderr: {exc}")
async def _handle_stdout_line(self, line: str) -> None:
"""
Parse and route stream-json events from stdout.
Handles event types:
- "assistant": Extract text blocks and call on_output
- "content_block_start": Extract tool_use events and call on_tool_use
- "content_block_delta": Handle tool input streaming (future)
- "result": Turn complete, mark not busy, call on_complete
- "system": System events, log and check for errors
Args:
line: Single line of stream-json output
"""
try:
event = json.loads(line)
event_type = event.get("type")
if event_type == "assistant":
# Extract text from assistant message
message = event.get("message", {})
model = message.get("model", "unknown")
logger.info(f"Assistant response model: {model}")
content = message.get("content", [])
for block in content:
if block.get("type") == "text":
text = block.get("text", "")
if text and self.on_output:
if not self._first_output_time:
self._first_output_time = time.monotonic()
elapsed = self._first_output_time - self._spawn_time
logger.info(f"[TIMING] First assistant text: +{elapsed:.3f}s ({len(text)} chars)")
try:
if inspect.iscoroutinefunction(self.on_output):
asyncio.create_task(self.on_output(text))
else:
self.on_output(text)
except Exception as e:
logger.error(f"Error in on_output callback: {e}")
elif event_type == "content_block_start":
# Check for tool_use block
content_block = event.get("content_block", {})
if content_block.get("type") == "tool_use":
tool_name = content_block.get("name", "unknown")
tool_input = content_block.get("input", {})
logger.debug(f"Tool use started: {tool_name} with input {tool_input}")
if self.on_tool_use:
try:
if inspect.iscoroutinefunction(self.on_tool_use):
asyncio.create_task(self.on_tool_use(tool_name, tool_input))
else:
self.on_tool_use(tool_name, tool_input)
except Exception as e:
logger.error(f"Error in on_tool_use callback: {e}")
elif event_type == "content_block_delta":
# Tool input streaming (not needed for initial implementation)
pass
elif event_type == "result":
# Turn complete - mark not busy and call on_complete
if hasattr(self, '_send_time'):
elapsed = time.monotonic() - self._send_time
else:
elapsed = time.monotonic() - self._spawn_time
session_id = event.get("session_id")
logger.info(f"[TIMING] Result event: +{elapsed:.3f}s (session={session_id})")
# Check for error
if event.get("is_error") and self.on_error:
error_msg = event.get("error", "Unknown error")
try:
if inspect.iscoroutinefunction(self.on_error):
asyncio.create_task(self.on_error(f"Claude error: {error_msg}"))
else:
self.on_error(f"Claude error: {error_msg}")
except Exception as e:
logger.error(f"Error in on_error callback: {e}")
# Mark not busy
self._busy = False
# Call completion callback
if self.on_complete:
try:
if inspect.iscoroutinefunction(self.on_complete):
asyncio.create_task(self.on_complete())
else:
self.on_complete()
except Exception as e:
logger.error(f"Error in on_complete callback: {e}")
# Process queued messages
if not self._message_queue.empty():
next_message = await self._message_queue.get()
logger.debug(f"Processing queued message: {next_message[:50]}...")
await self.send_message(next_message)
elif event_type == "system":
# System event
subtype = event.get("subtype")
logger.debug(f"System event: subtype={subtype}")
# Check for error subtype
if subtype == "error" and self.on_error:
error_msg = event.get("message", "System error")
try:
if inspect.iscoroutinefunction(self.on_error):
asyncio.create_task(self.on_error(f"System error: {error_msg}"))
else:
self.on_error(f"System error: {error_msg}")
except Exception as e:
logger.error(f"Error in on_error callback: {e}")
else:
logger.debug(f"Unknown event type: {event_type}")
except json.JSONDecodeError:
# Non-JSON line (Claude Code may emit diagnostics)
logger.warning(f"Non-JSON stdout line: {line[:100]}")
def _handle_stderr_line(self, line: str) -> None:
"""
Handle stderr output from Claude Code.
Logs as warning. If line contains "error" (case-insensitive),
also calls on_error callback.
Args:
line: Single line of stderr output
"""
logger.warning(f"Claude Code stderr: {line}")
# If line contains error, notify via callback
if "error" in line.lower() and self.on_error:
try:
if inspect.iscoroutinefunction(self.on_error):
asyncio.create_task(self.on_error(f"Claude stderr: {line}"))
else:
self.on_error(f"Claude stderr: {line}")
except Exception as e:
logger.error(f"Error in on_error callback: {e}")
async def _handle_crash(self) -> None:
"""
Handle Claude Code crash (non-zero exit).
Attempts to restart persistent process with --continue flag up to
MAX_CRASH_RETRIES times. Notifies user via on_status callback.
"""
self._crash_count += 1
logger.error(
f"Claude Code crashed (attempt {self._crash_count}/{self.MAX_CRASH_RETRIES})"
)
if self._crash_count >= self.MAX_CRASH_RETRIES:
error_msg = f"Claude failed to restart after {self.MAX_CRASH_RETRIES} attempts"
logger.error(error_msg)
if self.on_error:
if inspect.iscoroutinefunction(self.on_error):
asyncio.create_task(self.on_error(error_msg))
else:
self.on_error(error_msg)
self._crash_count = 0 # Reset for next session
self._busy = False
return
# Notify user
if self.on_status:
try:
if inspect.iscoroutinefunction(self.on_status):
asyncio.create_task(self.on_status("Claude crashed, restarting with context preserved..."))
else:
self.on_status("Claude crashed, restarting with context preserved...")
except Exception as e:
logger.error(f"Error in on_status callback: {e}")
# Wait before retrying
await asyncio.sleep(self.CRASH_BACKOFF_SECONDS)
# Restart persistent process
try:
await self.start()
# Resend queued messages if any
if not self._message_queue.empty():
next_message = await self._message_queue.get()
logger.debug(f"Resending queued message after crash: {next_message[:50]}...")
await self.send_message(next_message)
except Exception as e:
logger.error(f"Failed to restart after crash: {e}")
if self.on_error:
self.on_error(f"Failed to restart Claude: {e}")
self._busy = False
async def terminate(self, timeout: int = 10) -> None:
    """
    Terminate the persistent subprocess gracefully.

    Sequence: close stdin (signals end of input), send SIGTERM, wait up to
    `timeout` seconds for a clean exit, then SIGKILL and reap. The process
    is always wait()ed on to avoid leaving a zombie. Finally, process state
    is cleared and the background reader tasks are cancelled and awaited.

    Args:
        timeout: Seconds to wait for graceful termination before SIGKILL
    """
    if not self._process or self._process.returncode is not None:
        logger.debug("No process to terminate or already terminated")
        return
    logger.info(f"Terminating Claude Code process: PID={self._process.pid}")
    try:
        # Close stdin to signal end of input
        if self._process.stdin:
            self._process.stdin.close()
            await self._process.stdin.wait_closed()
        # Send SIGTERM
        self._process.terminate()
        # Wait for graceful exit with timeout
        try:
            await asyncio.wait_for(self._process.wait(), timeout=timeout)
            logger.info("Claude Code terminated gracefully")
        except asyncio.TimeoutError:
            # Timeout - force kill
            logger.warning(
                f"Claude Code did not terminate within {timeout}s, sending SIGKILL"
            )
            self._process.kill()
            await self._process.wait()  # CRITICAL: Always reap to prevent zombie
            logger.info("Claude Code killed")
    except Exception as e:
        logger.error(f"Error terminating process: {e}")
    finally:
        # Clear process reference BEFORE cancelling readers — presumably so
        # a reader waking up here sees _process gone and cannot start crash
        # recovery during shutdown (TODO confirm).
        self._process = None
        self._busy = False
        # Cancel reader tasks if still running; await each so cancellation
        # completes before returning (CancelledError is expected).
        for task in [self._stdout_reader_task, self._stderr_reader_task]:
            if task and not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass