---
phase: 01-session-process-foundation
plan: 02
type: execute
wave: 1
depends_on: []
files_modified:
  - telegram/claude_subprocess.py
autonomous: true

must_haves:
  truths:
    - "ClaudeSubprocess spawns Claude Code CLI in a given session directory using asyncio.create_subprocess_exec"
    - "Stdout and stderr are read concurrently via asyncio.gather -- no pipe deadlock occurs"
    - "Process termination uses terminate() + wait_for() with timeout fallback to kill() -- no zombies"
    - "Messages queued while Claude is processing are sent after current response completes"
    - "If Claude Code crashes, it auto-restarts with --continue flag and a notification callback fires"
    - "Stream-json output is parsed line-by-line, routing assistant/result/system events to callbacks"
  artifacts:
    - path: "telegram/claude_subprocess.py"
      provides: "Claude Code subprocess lifecycle management"
      min_lines: 120
      contains: "class ClaudeSubprocess"
  key_links:
    - from: "telegram/claude_subprocess.py"
      to: "claude CLI"
      via: "asyncio.create_subprocess_exec with PIPE"
      pattern: "create_subprocess_exec.*claude"
    - from: "telegram/claude_subprocess.py"
      to: "asyncio.gather"
      via: "concurrent stdout/stderr reading"
      pattern: "asyncio\\.gather"
    - from: "telegram/claude_subprocess.py"
      to: "process cleanup"
      via: "terminate + wait_for + kill fallback"
      pattern: "terminate.*wait_for|kill.*wait"
---

Create the Claude Code subprocess engine that safely spawns, communicates with, and manages Claude Code CLI processes using asyncio.

Purpose: This module is the I/O bridge between session management and Claude Code. It handles the dangerous parts: pipe management without deadlocks, process lifecycle without zombies, message queueing during processing, and crash recovery with session resumption. The research (01-RESEARCH.md) has validated that pipes + stream-json is the correct approach over PTY.

Output: `telegram/claude_subprocess.py` module with `ClaudeSubprocess` class.
@/home/mikkel/.claude/get-shit-done/workflows/execute-plan.md
@/home/mikkel/.claude/get-shit-done/templates/summary.md

@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/01-session-process-foundation/01-CONTEXT.md
@.planning/phases/01-session-process-foundation/01-RESEARCH.md

Task 1: Create ClaudeSubprocess module with spawn, I/O, and lifecycle management

telegram/claude_subprocess.py

Create `telegram/claude_subprocess.py` with a `ClaudeSubprocess` class that manages a single Claude Code CLI subprocess.

**Class design:**

```python
from pathlib import Path
from typing import Callable, Optional

class ClaudeSubprocess:
    def __init__(self, session_dir: Path, persona: Optional[dict] = None,
                 on_output: Optional[Callable[[str], None]] = None,
                 on_error: Optional[Callable[[str], None]] = None,
                 on_complete: Optional[Callable[[], None]] = None,
                 on_status: Optional[Callable[[str], None]] = None):
        """
        Args:
            session_dir: Path to session directory (cwd for subprocess)
            persona: Persona dict with system_prompt and settings
            on_output: Callback(text: str) for assistant text output
            on_error: Callback(error: str) for error messages
            on_complete: Callback() when a turn completes
            on_status: Callback(status: str) for status updates (e.g. "Claude restarted")
        """
```

**Required methods:**

1. `async send_message(message: str) -> None`
   - If no process is running: spawn one with this message
   - If process IS running and BUSY: queue message (append to internal asyncio.Queue)
   - If process IS running and IDLE: send as new turn (spawn new `claude -p` invocation)
   - Track processing state (busy vs idle) to know when to queue

2. `async _spawn(message: str) -> None`
   - Build Claude Code command:
     ```
     claude -p "" --output-format stream-json --verbose --max-turns --model
     ```
   - If persona has system_prompt, add: `--system-prompt ""`
   - If `.claude/` exists in session_dir (prior session): add `--continue` flag for history
   - Spawn with `asyncio.create_subprocess_exec()`:
     - stdout=PIPE, stderr=PIPE
     - cwd=str(session_dir)
     - env: inherit current env, ensure PATH includes ~/bin and ~/.local/bin
   - Store process reference in `self._process`
   - Store PID for metadata updates
   - Set `self._busy = True`
   - Launch concurrent stream readers via `asyncio.create_task(self._read_streams())`

3. `async _read_streams() -> None`
   - Use `asyncio.gather()` to read stdout and stderr concurrently (CRITICAL for deadlock prevention)
   - stdout handler: `self._handle_stdout_line(line)`
   - stderr handler: `self._handle_stderr_line(line)`
   - After both streams end: `await self._process.wait()`
   - Set `self._busy = False`
   - Call `self.on_complete()` callback
   - Process queued messages: if `self._message_queue` is not empty, dequeue the next message and `await self.send_message(msg)`

4. `_handle_stdout_line(line: str) -> None`
   - Parse as JSON (stream-json format, one JSON object per line)
   - Route by event type:
     - `"assistant"`: Extract text blocks from `event["message"]["content"]`, call `self.on_output(text)` for each text block
     - `"result"`: Turn complete. If `event.get("is_error")`, call `self.on_error(...)`. Log session_id if present.
     - `"system"`: Log system events. If subtype is error, call `self.on_error(...)`.
   - On `json.JSONDecodeError`: log warning, skip line (Claude Code may emit non-JSON lines)

5. `_handle_stderr_line(line: str) -> None`
   - Log as warning (stderr from Claude Code is usually diagnostics, not errors)
   - If line contains "error" (case-insensitive), also call `self.on_error(line)`

6. `async terminate(timeout: int = 10) -> None`
   - If no process or already terminated (`returncode is not None`): return
   - Call `self._process.terminate()` (SIGTERM)
   - `await asyncio.wait_for(self._process.wait(), timeout=timeout)`
   - On TimeoutError: `self._process.kill()` then `await self._process.wait()` (CRITICAL: always reap)
   - Clear `self._process` reference
   - Set `self._busy = False`

7. `async _handle_crash() -> None`
   - Called when the process exits unexpectedly with a non-zero return code
   - Call `self.on_status("Claude crashed, restarting with context preserved...")` if callback set
   - Wait 1 second (backoff)
   - Respawn with `--continue` flag (loads most recent session from .claude/ in session_dir)
   - If respawn fails 3 times: call `self.on_error("Claude failed to restart after 3 attempts")`

8. `@property is_busy -> bool`
   - Return whether subprocess is currently processing a message

9. `@property is_alive -> bool`
   - Return whether the subprocess is running (process exists and returncode is None)

**Internal state:**

- `self._process: asyncio.subprocess.Process | None`
- `self._busy: bool`
- `self._message_queue: asyncio.Queue`
- `self._reader_task: asyncio.Task | None`
- `self._crash_count: int` (reset on successful completion)
- `self._session_dir: Path`
- `self._persona: dict | None`

**Implementation notes:**

- Use `asyncio.create_subprocess_exec` (NOT `shell=True` -- avoid shell injection)
- For the env, ensure PATH includes `/home/mikkel/bin:/home/mikkel/.local/bin`
- Add `logging` with `logging.getLogger(__name__)`
- Include type hints for all methods
- Add module docstring explaining the subprocess interaction model
- The `_read_streams` method must handle the case where stdout/stderr complete at different times
- Use `async for line in stream` pattern or `readline()` loop for line-by-line reading

**Read 01-RESEARCH.md** for verified code patterns (Pattern 1: Concurrent Stream Reading, Pattern 3: Stream-JSON Event Handling, Pattern 4: Process Lifecycle
Management).

**DO NOT:**

- Import session_manager.py (that module manages metadata; this module manages processes)
- Add Telegram-specific imports (that's Plan 03)
- Implement idle timeout (that's Phase 3)
- Use PTY (research confirms pipes are correct for Claude Code CLI)

```bash
cd ~/homelab
source ~/venv/bin/activate

# Verify module loads and class structure is correct
python3 -c "
import asyncio
from pathlib import Path
from telegram.claude_subprocess import ClaudeSubprocess

# Verify class exists and has required methods
sub = ClaudeSubprocess(
    session_dir=Path('/tmp/test-claude-session'),
    on_output=lambda text: print(f'OUTPUT: {text}'),
    on_error=lambda err: print(f'ERROR: {err}'),
    on_complete=lambda: print('COMPLETE'),
    on_status=lambda s: print(f'STATUS: {s}')
)
assert hasattr(sub, 'send_message'), 'missing send_message'
assert hasattr(sub, 'terminate'), 'missing terminate'
assert hasattr(sub, 'is_busy'), 'missing is_busy'
assert hasattr(sub, 'is_alive'), 'missing is_alive'
assert not sub.is_busy, 'should start not busy'
assert not sub.is_alive, 'should start not alive'
print('ClaudeSubprocess class structure verified!')
"

# Verify concurrent stream reading implementation exists
python3 -c "
import inspect
from telegram.claude_subprocess import ClaudeSubprocess

source = inspect.getsource(ClaudeSubprocess)
assert 'asyncio.gather' in source, 'Missing asyncio.gather for concurrent stream reading'
assert 'create_subprocess_exec' in source, 'Missing create_subprocess_exec'
assert 'stream-json' in source, 'Missing stream-json output format'
assert 'terminate' in source, 'Missing terminate method'
assert 'wait_for' in source or 'wait(' in source, 'Missing process wait'
print('Implementation patterns verified!')
"
```

ClaudeSubprocess class spawns Claude Code CLI with stream-json output in session directories, reads stdout/stderr concurrently via asyncio.gather, handles process lifecycle with clean termination (no zombies), queues messages during processing, and auto-restarts on crash with --continue flag.

1. `telegram/claude_subprocess.py` exists with `ClaudeSubprocess` class
2. Class uses `asyncio.create_subprocess_exec` (not shell=True)
3. Stdout and stderr reading uses `asyncio.gather` for concurrent draining
4. Process termination implements terminate -> wait_for -> kill -> wait pattern
5. Message queue uses `asyncio.Queue` for coroutine-safe queueing within the event loop (`asyncio.Queue` is not thread-safe)
6. Crash recovery attempts respawn with `--continue` flag, max 3 retries
7. Stream-json parsing handles assistant, result, and system event types
8. No imports from session_manager or telegram bot modules

- ClaudeSubprocess module loads without import errors
- Class has all required methods and properties
- Implementation uses asyncio.gather for concurrent stream reading (verified via source inspection)
- Process lifecycle follows terminate -> wait pattern (verified via source inspection)
- Module is self-contained with callback-based communication (no tight coupling to session manager or bot)

After completion, create `.planning/phases/01-session-process-foundation/01-02-SUMMARY.md`
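
For reference, the stream-json routing required of `_handle_stdout_line` in Task 1 might look roughly like the sketch below. The function name `route_stream_json_line` is hypothetical, and several details are assumptions rather than confirmed CLI behavior: that text blocks have the shape `{"type": "text", "text": ...}`, that an error result carries its message in a `result` field, and that a system error carries it in a `message` field. Check these against the event samples in 01-RESEARCH.md before relying on them.

```python
import json
import logging
from typing import Callable, Optional

logger = logging.getLogger(__name__)


def route_stream_json_line(
    line: str,
    on_output: Optional[Callable[[str], None]] = None,
    on_error: Optional[Callable[[str], None]] = None,
) -> Optional[str]:
    """Parse one stream-json line and dispatch it; return the event type, or None if skipped."""
    line = line.strip()
    if not line:
        return None
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        # The plan expects occasional non-JSON lines: warn and move on.
        logger.warning("Skipping non-JSON line: %r", line[:200])
        return None
    if not isinstance(event, dict):
        return None

    event_type = event.get("type")
    if event_type == "assistant":
        # Assumed block shape: {"type": "text", "text": "..."}
        for block in event.get("message", {}).get("content", []):
            if isinstance(block, dict) and block.get("type") == "text" and on_output:
                on_output(block.get("text", ""))
    elif event_type == "result":
        # Assumption: the error text lives in the "result" field.
        if event.get("is_error") and on_error:
            on_error(str(event.get("result", "unknown error")))
        if "session_id" in event:
            logger.info("Turn complete, session_id=%s", event["session_id"])
    elif event_type == "system":
        logger.info("System event: %s", event.get("subtype"))
        # Assumption: the error text lives in the "message" field.
        if event.get("subtype") == "error" and on_error:
            on_error(str(event.get("message", "system error")))
    return event_type
```

Keeping the router a plain function that takes callbacks makes it testable without spawning a process, which matches the plan's callback-based, decoupled design.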