Phase 01: Session & Process Foundation - 3 plan(s) in 2 wave(s) - 2 parallel (wave 1), 1 sequential (wave 2) - Ready for execution Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
---
phase: 01-session-process-foundation
plan: 02
type: execute
wave: 1
depends_on: []
files_modified:
  - telegram/claude_subprocess.py
autonomous: true

must_haves:
  truths:
    - "ClaudeSubprocess spawns Claude Code CLI in a given session directory using asyncio.create_subprocess_exec"
    - "Stdout and stderr are read concurrently via asyncio.gather -- no pipe deadlock occurs"
    - "Process termination uses terminate() + wait_for() with timeout fallback to kill() -- no zombies"
    - "Messages queued while Claude is processing are sent after current response completes"
    - "If Claude Code crashes, it auto-restarts with --continue flag and a notification callback fires"
    - "Stream-json output is parsed line-by-line, routing assistant/result/system events to callbacks"
  artifacts:
    - path: "telegram/claude_subprocess.py"
      provides: "Claude Code subprocess lifecycle management"
      min_lines: 120
      contains: "class ClaudeSubprocess"
  key_links:
    - from: "telegram/claude_subprocess.py"
      to: "claude CLI"
      via: "asyncio.create_subprocess_exec with PIPE"
      pattern: "create_subprocess_exec.*claude"
    - from: "telegram/claude_subprocess.py"
      to: "asyncio.gather"
      via: "concurrent stdout/stderr reading"
      pattern: "asyncio\\.gather"
    - from: "telegram/claude_subprocess.py"
      to: "process cleanup"
      via: "terminate + wait_for + kill fallback"
      pattern: "terminate.*wait_for|kill.*wait"
---

<objective>
Create the Claude Code subprocess engine that safely spawns, communicates with, and manages Claude Code CLI processes using asyncio.

Purpose: This module is the I/O bridge between session management and Claude Code. It handles the dangerous parts: pipe management without deadlocks, process lifecycle without zombies, message queueing during processing, and crash recovery with session resumption. The research (01-RESEARCH.md) has validated that pipes + stream-json is the correct approach over PTY.

Output: `telegram/claude_subprocess.py` module with `ClaudeSubprocess` class.
</objective>

<execution_context>
@/home/mikkel/.claude/get-shit-done/workflows/execute-plan.md
@/home/mikkel/.claude/get-shit-done/templates/summary.md
</execution_context>

<context>
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/01-session-process-foundation/01-CONTEXT.md
@.planning/phases/01-session-process-foundation/01-RESEARCH.md
</context>

<tasks>

<task type="auto">
<name>Task 1: Create ClaudeSubprocess module with spawn, I/O, and lifecycle management</name>
<files>telegram/claude_subprocess.py</files>
<action>
Create `telegram/claude_subprocess.py` with a `ClaudeSubprocess` class that manages a single Claude Code CLI subprocess.

**Class design:**

```python
from pathlib import Path
from typing import Callable, Optional


class ClaudeSubprocess:
    def __init__(self, session_dir: Path, persona: Optional[dict] = None,
                 on_output: Optional[Callable[[str], None]] = None,
                 on_error: Optional[Callable[[str], None]] = None,
                 on_complete: Optional[Callable[[], None]] = None,
                 on_status: Optional[Callable[[str], None]] = None):
        """
        Args:
            session_dir: Path to session directory (cwd for subprocess)
            persona: Persona dict with system_prompt and settings
            on_output: Callback(text: str) for assistant text output
            on_error: Callback(error: str) for error messages
            on_complete: Callback() when a turn completes
            on_status: Callback(status: str) for status updates (e.g. "Claude restarted")
        """
```

**Required methods:**

1. `async send_message(message: str) -> None`
   - If no process is running: spawn one with this message
   - If process IS running and BUSY: queue message (append to internal asyncio.Queue)
   - If process IS running and IDLE: send as new turn (spawn new `claude -p` invocation)
   - Track processing state (busy vs idle) to know when to queue

2. `async _spawn(message: str) -> None`
   - Build Claude Code command:
     ```
     claude -p "<message>"
       --output-format stream-json
       --verbose
       --max-turns <from persona settings, default 25>
       --model <from persona settings, default claude-sonnet-4-20250514>
     ```
   - If persona has system_prompt, add: `--system-prompt "<system_prompt>"`
   - If `.claude/` exists in session_dir (prior session): add `--continue` flag for history
   - Spawn with `asyncio.create_subprocess_exec()`:
     - stdout=PIPE, stderr=PIPE
     - cwd=str(session_dir)
     - env: inherit current env, ensure PATH includes ~/bin and ~/.local/bin
   - Store process reference in `self._process`
   - Store PID for metadata updates
   - Set `self._busy = True`
   - Launch concurrent stream readers via `asyncio.create_task(self._read_streams())`

3. `async _read_streams() -> None`
   - Use `asyncio.gather()` to read stdout and stderr concurrently (CRITICAL for deadlock prevention)
   - stdout handler: `self._handle_stdout_line(line)`
   - stderr handler: `self._handle_stderr_line(line)`
   - After both streams end: `await self._process.wait()`
   - Set `self._busy = False`
   - Call `self.on_complete()` callback
   - Process queued messages: if `self._message_queue` not empty, pop and `await self.send_message(msg)`

4. `_handle_stdout_line(line: str) -> None`
   - Parse as JSON (stream-json format, one JSON object per line)
   - Route by event type:
     - `"assistant"`: Extract text blocks from `event["message"]["content"]`, call `self.on_output(text)` for each text block
     - `"result"`: Turn complete. If `event.get("is_error")`, call `self.on_error(...)`. Log session_id if present.
     - `"system"`: Log system events. If subtype is error, call `self.on_error(...)`.
   - On `json.JSONDecodeError`: log warning, skip line (Claude Code may emit non-JSON lines)

5. `_handle_stderr_line(line: str) -> None`
   - Log as warning (stderr from Claude Code is usually diagnostics, not errors)
   - If line contains "error" (case-insensitive), also call `self.on_error(line)`

6. `async terminate(timeout: int = 10) -> None`
   - If no process or already terminated (`returncode is not None`): return
   - Call `self._process.terminate()` (SIGTERM)
   - `await asyncio.wait_for(self._process.wait(), timeout=timeout)`
   - On TimeoutError: `self._process.kill()` then `await self._process.wait()` (CRITICAL: always reap)
   - Clear `self._process` reference
   - Set `self._busy = False`

7. `async _handle_crash() -> None`
   - Called when process exits with non-zero return code unexpectedly
   - Call `self.on_status("Claude crashed, restarting with context preserved...")` if callback set
   - Wait 1 second (backoff)
   - Respawn with `--continue` flag (loads most recent session from .claude/ in session_dir)
   - If respawn fails 3 times: call `self.on_error("Claude failed to restart after 3 attempts")`

8. `@property is_busy -> bool`
   - Return whether subprocess is currently processing a message

9. `@property is_alive -> bool`
   - Return whether subprocess process is running (process exists and returncode is None)
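
The two patterns marked CRITICAL in methods 3 and 6 can be sketched in isolation as follows. This is a minimal illustration, not the contents of 01-RESEARCH.md: the helper names `drain`, `run_and_drain`, and `stop` are hypothetical, and the sketch runs an arbitrary command rather than the `claude` CLI.

```python
import asyncio
import sys
from typing import List, Tuple


async def drain(stream: asyncio.StreamReader, sink: List[str]) -> None:
    """Read one pipe line by line until EOF so the child never blocks on a full pipe."""
    while True:
        raw = await stream.readline()
        if not raw:  # empty bytes signals EOF
            break
        sink.append(raw.decode(errors="replace").rstrip())


async def run_and_drain(*argv: str) -> Tuple[int, List[str], List[str]]:
    """Spawn argv, drain stdout and stderr concurrently, then reap the child."""
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out: List[str] = []
    err: List[str] = []
    # Both readers run at once; reading only one pipe could deadlock
    # if the child fills the other pipe's buffer.
    await asyncio.gather(drain(proc.stdout, out), drain(proc.stderr, err))
    returncode = await proc.wait()  # reap after both streams hit EOF
    return returncode, out, err


async def stop(proc: asyncio.subprocess.Process, timeout: float = 10) -> int:
    """terminate -> wait_for -> kill -> wait, so the child is always reaped."""
    if proc.returncode is not None:
        return proc.returncode  # already exited
    proc.terminate()
    try:
        await asyncio.wait_for(proc.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()  # reap even after a forced kill
    return proc.returncode
```

A quick manual check is `asyncio.run(run_and_drain(sys.executable, "-c", "print('hi')"))`. In the real class, each drained line would go to `_handle_stdout_line` or `_handle_stderr_line` instead of a list.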

**Internal state:**
- `self._process: asyncio.subprocess.Process | None`
- `self._busy: bool`
- `self._message_queue: asyncio.Queue`
- `self._reader_task: asyncio.Task | None`
- `self._crash_count: int` (reset on successful completion)
- `self._session_dir: Path`
- `self._persona: dict | None`
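
How `_busy` and `_message_queue` interact can be sketched with a stand-in for the subprocess. `TurnQueue` and its `run_turn` parameter are hypothetical names used only for illustration. The sketch also simplifies the plan: the first caller awaits the whole turn and drains the queue itself, whereas the plan drains the queue from `_read_streams`.

```python
import asyncio
from typing import Awaitable, Callable


class TurnQueue:
    """Hypothetical helper: runs one turn at a time, queueing messages that arrive mid-turn."""

    def __init__(self, run_turn: Callable[[str], Awaitable[None]]) -> None:
        self._run_turn = run_turn  # stand-in for spawning `claude -p` and reading its output
        self._busy = False
        self._message_queue: asyncio.Queue = asyncio.Queue()

    @property
    def is_busy(self) -> bool:
        return self._busy

    async def send_message(self, message: str) -> None:
        if self._busy:
            # A turn is in flight: queue and return immediately.
            await self._message_queue.put(message)
            return
        self._busy = True
        try:
            await self._run_turn(message)
            # Drain anything that arrived while the turn was running, in order.
            while not self._message_queue.empty():
                await self._run_turn(self._message_queue.get_nowait())
        finally:
            self._busy = False
```

Because every access happens on one event loop, no lock is needed; the busy flag is checked and set without an intervening `await`.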

**Implementation notes:**
- Use `asyncio.create_subprocess_exec` (NOT `shell=True` -- avoid shell injection)
- For the env, ensure PATH includes `/home/mikkel/bin:/home/mikkel/.local/bin`
- Add `logging` with `logging.getLogger(__name__)`
- Include type hints for all methods
- Add module docstring explaining the subprocess interaction model
- The `_read_streams` method must handle the case where stdout/stderr complete at different times
- Use `async for line in stream` pattern or `readline()` loop for line-by-line reading
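
The command and environment described for `_spawn` could be assembled roughly as below. This is a sketch under assumptions: the function names `build_command` and `build_env` are hypothetical, and so are the persona keys `settings`, `max_turns`, and `model`, since the plan only says the persona dict carries a system prompt and settings.

```python
import os
from pathlib import Path
from typing import Dict, List, Optional

DEFAULT_MAX_TURNS = 25
DEFAULT_MODEL = "claude-sonnet-4-20250514"
EXTRA_PATH_DIRS = ("/home/mikkel/bin", "/home/mikkel/.local/bin")


def build_command(message: str, session_dir: Path,
                  persona: Optional[dict] = None) -> List[str]:
    """Build an argv list; passing a list to exec means no shell ever parses the message."""
    persona = persona or {}
    settings = persona.get("settings", {})  # assumed key layout
    cmd = [
        "claude", "-p", message,
        "--output-format", "stream-json",
        "--verbose",
        "--max-turns", str(settings.get("max_turns", DEFAULT_MAX_TURNS)),
        "--model", settings.get("model", DEFAULT_MODEL),
    ]
    if persona.get("system_prompt"):
        cmd += ["--system-prompt", persona["system_prompt"]]
    if (session_dir / ".claude").is_dir():
        cmd.append("--continue")  # prior session exists, so resume its history
    return cmd


def build_env(base: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Copy the environment and append the extra PATH entries if they are missing."""
    env = dict(os.environ if base is None else base)
    parts = [p for p in env.get("PATH", "").split(os.pathsep) if p]
    for extra in EXTRA_PATH_DIRS:
        if extra not in parts:
            parts.append(extra)
    env["PATH"] = os.pathsep.join(parts)
    return env
```

The result would be passed as `asyncio.create_subprocess_exec(*cmd, cwd=str(session_dir), env=env, ...)`.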

**Read 01-RESEARCH.md** for verified code patterns (Pattern 1: Concurrent Stream Reading, Pattern 3: Stream-JSON Event Handling, Pattern 4: Process Lifecycle Management).
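
The research file is not reproduced here, so the following is only a rough sketch of the event routing described for `_handle_stdout_line`, not the verified Pattern 3. It assumes each event carries its kind under a `"type"` key, that text blocks look like `{"type": "text", "text": ...}`, and that an error result carries its message under `"result"`. The function name `route_event` is hypothetical.

```python
import json
import logging
from typing import Callable, Optional

logger = logging.getLogger(__name__)


def route_event(line: str,
                on_output: Optional[Callable[[str], None]] = None,
                on_error: Optional[Callable[[str], None]] = None) -> Optional[str]:
    """Parse one stdout line and dispatch it; return the event type, or None if skipped."""
    line = line.strip()
    if not line:
        return None
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        logger.warning("Skipping non-JSON line: %r", line[:200])
        return None
    if not isinstance(event, dict):
        return None

    kind = event.get("type")  # assumed discriminator key
    if kind == "assistant":
        for block in event.get("message", {}).get("content", []):
            # Only text blocks are forwarded; other block kinds are ignored here.
            if isinstance(block, dict) and block.get("type") == "text" and on_output:
                on_output(block.get("text", ""))
    elif kind == "result":
        if event.get("session_id"):
            logger.info("Turn finished for session %s", event["session_id"])
        if event.get("is_error") and on_error:
            on_error(str(event.get("result", "Claude reported an error")))
    elif kind == "system":
        logger.info("System event: %s", event.get("subtype"))
        if event.get("subtype") == "error" and on_error:
            on_error(json.dumps(event))
    return kind
```

Returning the event type keeps the function easy to test without a running subprocess.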

**DO NOT:**
- Import session_manager.py (that module manages metadata; this module manages processes)
- Add Telegram-specific imports (that's Plan 03)
- Implement idle timeout (that's Phase 3)
- Use PTY (research confirms pipes are correct for Claude Code CLI)
</action>
<verify>
```bash
cd ~/homelab
source ~/venv/bin/activate

# Verify module loads and class structure is correct
python3 -c "
import asyncio
from pathlib import Path
from telegram.claude_subprocess import ClaudeSubprocess

# Verify class exists and has required methods
sub = ClaudeSubprocess(
    session_dir=Path('/tmp/test-claude-session'),
    on_output=lambda text: print(f'OUTPUT: {text}'),
    on_error=lambda err: print(f'ERROR: {err}'),
    on_complete=lambda: print('COMPLETE'),
    on_status=lambda s: print(f'STATUS: {s}')
)

assert hasattr(sub, 'send_message'), 'missing send_message'
assert hasattr(sub, 'terminate'), 'missing terminate'
assert hasattr(sub, 'is_busy'), 'missing is_busy'
assert hasattr(sub, 'is_alive'), 'missing is_alive'
assert not sub.is_busy, 'should start not busy'
assert not sub.is_alive, 'should start not alive'

print('ClaudeSubprocess class structure verified!')
"

# Verify concurrent stream reading implementation exists
python3 -c "
import inspect
from telegram.claude_subprocess import ClaudeSubprocess
source = inspect.getsource(ClaudeSubprocess)
assert 'asyncio.gather' in source, 'Missing asyncio.gather for concurrent stream reading'
assert 'create_subprocess_exec' in source, 'Missing create_subprocess_exec'
assert 'stream-json' in source, 'Missing stream-json output format'
assert 'terminate' in source, 'Missing terminate method'
assert 'wait_for' in source or 'wait(' in source, 'Missing process wait'
print('Implementation patterns verified!')
"
```
</verify>
<done>ClaudeSubprocess class spawns Claude Code CLI with stream-json output in session directories, reads stdout/stderr concurrently via asyncio.gather, handles process lifecycle with clean termination (no zombies), queues messages during processing, and auto-restarts on crash with --continue flag.</done>
</task>

</tasks>

<verification>
1. `telegram/claude_subprocess.py` exists with `ClaudeSubprocess` class
2. Class uses `asyncio.create_subprocess_exec` (not shell=True)
3. Stdout and stderr reading uses `asyncio.gather` for concurrent draining
4. Process termination implements terminate -> wait_for -> kill -> wait pattern
5. Message queue uses `asyncio.Queue`, which is safe across coroutines on one event loop but is not thread-safe
6. Crash recovery attempts respawn with `--continue` flag, max 3 retries
7. Stream-json parsing handles assistant, result, and system event types
8. No imports from session_manager or telegram bot modules
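
The bounded crash recovery in item 6 could look roughly like this. It is a sketch under assumptions: `restart_with_backoff` and its `respawn` parameter are hypothetical names, a failed respawn is assumed to surface as a raised exception, and the status callback fires once rather than on every attempt.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Optional

logger = logging.getLogger(__name__)
MAX_RESTARTS = 3


async def restart_with_backoff(
    respawn: Callable[[], Awaitable[None]],
    on_status: Optional[Callable[[str], None]] = None,
    on_error: Optional[Callable[[str], None]] = None,
    delay: float = 1.0,
) -> bool:
    """Try to respawn up to MAX_RESTARTS times; return True on the first success."""
    if on_status:
        on_status("Claude crashed, restarting with context preserved...")
    for attempt in range(1, MAX_RESTARTS + 1):
        await asyncio.sleep(delay)  # fixed backoff before each attempt
        try:
            await respawn()  # expected to spawn with the --continue flag
            return True
        except Exception:
            logger.warning("Restart attempt %d of %d failed", attempt, MAX_RESTARTS,
                           exc_info=True)
    if on_error:
        on_error("Claude failed to restart after 3 attempts")
    return False
```

Taking `respawn` as a parameter lets the retry logic be exercised with a fake coroutine, without launching the CLI.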
</verification>

<success_criteria>
- ClaudeSubprocess module loads without import errors
- Class has all required methods and properties
- Implementation uses asyncio.gather for concurrent stream reading (verified via source inspection)
- Process lifecycle follows terminate -> wait pattern (verified via source inspection)
- Module is self-contained with callback-based communication (no tight coupling to session manager or bot)
</success_criteria>

<output>
After completion, create `.planning/phases/01-session-process-foundation/01-02-SUMMARY.md`
</output>