homelab/.planning/phases/01-session-process-foundation/01-02-PLAN.md
Mikkel Georgsen 0baaeb26b5 docs(01): create phase plan
Phase 01: Session & Process Foundation
- 3 plan(s) in 2 wave(s)
- 2 parallel (wave 1), 1 sequential (wave 2)
- Ready for execution

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 14:32:10 +00:00


---
phase: 01-session-process-foundation
plan: 02
type: execute
wave: 1
depends_on: []
files_modified:
  - telegram/claude_subprocess.py
autonomous: true
must_haves:
  truths:
    - "ClaudeSubprocess spawns Claude Code CLI in a given session directory using asyncio.create_subprocess_exec"
    - "Stdout and stderr are read concurrently via asyncio.gather -- no pipe deadlock occurs"
    - "Process termination uses terminate() + wait_for() with timeout fallback to kill() -- no zombies"
    - "Messages queued while Claude is processing are sent after current response completes"
    - "If Claude Code crashes, it auto-restarts with --continue flag and a notification callback fires"
    - "Stream-json output is parsed line-by-line, routing assistant/result/system events to callbacks"
  artifacts:
    - path: "telegram/claude_subprocess.py"
      provides: "Claude Code subprocess lifecycle management"
      min_lines: 120
      contains: "class ClaudeSubprocess"
  key_links:
    - from: "telegram/claude_subprocess.py"
      to: "claude CLI"
      via: "asyncio.create_subprocess_exec with PIPE"
      pattern: "create_subprocess_exec.*claude"
    - from: "telegram/claude_subprocess.py"
      to: "asyncio.gather"
      via: "concurrent stdout/stderr reading"
      pattern: "asyncio.gather"
    - from: "telegram/claude_subprocess.py"
      to: "process cleanup"
      via: "terminate + wait_for + kill fallback"
      pattern: "terminate.*wait_for|kill.*wait"
---
Create the Claude Code subprocess engine that safely spawns, communicates with, and manages Claude Code CLI processes using asyncio.

Purpose: This module is the I/O bridge between session management and Claude Code. It handles the dangerous parts: pipe management without deadlocks, process lifecycle without zombies, message queueing during processing, and crash recovery with session resumption. The research (01-RESEARCH.md) validated pipes with stream-json output, rather than a PTY, as the correct approach.

Output: telegram/claude_subprocess.py module with ClaudeSubprocess class.

<execution_context> @/home/mikkel/.claude/get-shit-done/workflows/execute-plan.md @/home/mikkel/.claude/get-shit-done/templates/summary.md </execution_context>

@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/01-session-process-foundation/01-CONTEXT.md
@.planning/phases/01-session-process-foundation/01-RESEARCH.md

Task 1: Create ClaudeSubprocess module with spawn, I/O, and lifecycle management

Files: telegram/claude_subprocess.py

Create `telegram/claude_subprocess.py` with a `ClaudeSubprocess` class that manages a single Claude Code CLI subprocess.

Class design:

from pathlib import Path
from typing import Callable

class ClaudeSubprocess:
    def __init__(self, session_dir: Path, persona: dict | None = None,
                 on_output: Callable[[str], None] | None = None,
                 on_error: Callable[[str], None] | None = None,
                 on_complete: Callable[[], None] | None = None,
                 on_status: Callable[[str], None] | None = None) -> None:
        """
        Args:
            session_dir: Path to session directory (cwd for subprocess)
            persona: Persona dict with system_prompt and settings
            on_output: Callback(text: str) for assistant text output
            on_error: Callback(error: str) for error messages
            on_complete: Callback() when a turn completes
            on_status: Callback(status: str) for status updates (e.g. "Claude restarted")
        """
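One way the persona dict could drive the command that _spawn builds (item 2 under Required methods) is sketched here as a standalone function. The helper name `build_claude_argv` and the `persona["settings"]` keys `max_turns` and `model` are assumptions for illustration. The flags and defaults are the ones this plan lists.

```python
from __future__ import annotations

from pathlib import Path

# Defaults come from the plan; the persona["settings"] key names
# ("max_turns", "model") are assumptions for this sketch.
DEFAULT_MAX_TURNS = 25
DEFAULT_MODEL = "claude-sonnet-4-20250514"


def build_claude_argv(message: str, session_dir: Path,
                      persona: dict | None = None) -> list[str]:
    """Hypothetical helper: argv for one `claude -p` turn, for create_subprocess_exec."""
    persona = persona or {}
    settings = persona.get("settings") or {}
    argv = [
        "claude", "-p", message,  # one argv element: no shell quoting or escaping
        "--output-format", "stream-json",
        "--verbose",
        "--max-turns", str(settings.get("max_turns", DEFAULT_MAX_TURNS)),
        "--model", str(settings.get("model", DEFAULT_MODEL)),
    ]
    if persona.get("system_prompt"):
        argv += ["--system-prompt", persona["system_prompt"]]
    # Per the plan, an existing .claude/ directory signals a prior session to resume.
    if (session_dir / ".claude").is_dir():
        argv.append("--continue")
    return argv
```

Because the message is passed to create_subprocess_exec as a single argv element, shell metacharacters in it are harmless. A message that begins with "-" could still be read as an option by the CLI's own argument parser, so that case is worth checking against the CLI.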

Required methods:

  1. async send_message(message: str) -> None

    • If no process is running: spawn one with this message
    • If process IS running and BUSY: queue the message (put it on the internal asyncio.Queue, self._message_queue)
    • If process IS running and IDLE: send as new turn (spawn new claude -p invocation)
    • Track processing state (busy vs idle) to know when to queue
  2. async _spawn(message: str) -> None

    • Build Claude Code command:
      claude -p "<message>"
        --output-format stream-json
        --verbose
        --max-turns <from persona settings, default 25>
        --model <from persona settings, default claude-sonnet-4-20250514>
      
    • If persona has system_prompt, add: --system-prompt "<system_prompt>"
    • If .claude/ exists in session_dir (prior session): add --continue flag for history
    • Spawn with asyncio.create_subprocess_exec():
      • stdout=PIPE, stderr=PIPE
      • cwd=str(session_dir)
      • env: inherit current env, ensure PATH includes ~/bin and ~/.local/bin
    • Store process reference in self._process
    • Store PID for metadata updates
    • Set self._busy = True
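    • Launch concurrent stream readers via asyncio.create_task(self._read_streams()) and keep the task in self._reader_task (the event loop holds only weak references to tasks, so an unreferenced task can be garbage-collected mid-run)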
  3. async _read_streams() -> None

    • Use asyncio.gather() to read stdout and stderr concurrently (CRITICAL for deadlock prevention)
    • stdout handler: self._handle_stdout_line(line)
    • stderr handler: self._handle_stderr_line(line)
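    • After both streams end: await self._process.wait(). If the return code is non-zero and the exit was not requested via terminate(), hand off to await self._handle_crash()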
    • Set self._busy = False
    • Call self.on_complete() callback
    • Process queued messages: if self._message_queue is not empty, take the next one with get_nowait() and await self.send_message(msg)
  4. _handle_stdout_line(line: str) -> None

    • Parse as JSON (stream-json format, one JSON object per line)
    • Route by event type:
      • "assistant": Extract text blocks from event["message"]["content"], call self.on_output(text) for each text block
      • "result": Turn complete. If event.get("is_error"), call self.on_error(...). Log session_id if present.
      • "system": Log system events. If subtype is error, call self.on_error(...).
    • On json.JSONDecodeError: log warning, skip line (Claude Code may emit non-JSON lines)
  5. _handle_stderr_line(line: str) -> None

    • Log as warning (stderr from Claude Code is usually diagnostics, not errors)
    • If line contains "error" (case-insensitive), also call self.on_error(line)
  6. async terminate(timeout: int = 10) -> None

    • If no process or already terminated (returncode is not None): return
    • Call self._process.terminate() (SIGTERM)
    • await asyncio.wait_for(self._process.wait(), timeout=timeout)
    • On asyncio.TimeoutError: self._process.kill() then await self._process.wait() (CRITICAL: always reap)
    • Clear self._process reference
    • Set self._busy = False
  7. async _handle_crash() -> None

    • Called when process exits with non-zero return code unexpectedly
    • Call self.on_status("Claude crashed, restarting with context preserved...") if callback set
    • Wait 1 second (backoff)
    • Respawn with --continue flag (loads most recent session from .claude/ in session_dir)
    • Track attempts in self._crash_count; once 3 consecutive restarts have failed, stop retrying and call self.on_error("Claude failed to restart after 3 attempts")
  8. @property is_busy -> bool

    • Return whether subprocess is currently processing a message
  9. @property is_alive -> bool

    • Return whether subprocess process is running (process exists and returncode is None)
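Here is a minimal sketch of the busy/idle handoff between send_message and _read_streams. The real subprocess is replaced by a hypothetical `run_turn` coroutine, so the queueing can be exercised without the claude binary. `TurnQueue`, `_drive`, and `demo` are illustrative names, not part of the planned API.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable


class TurnQueue:
    """Hypothetical stand-in for ClaudeSubprocess's busy/idle queueing.

    run_turn replaces _spawn + _read_streams: it handles one message and
    returns once that turn's output has been fully drained.
    """

    def __init__(self, run_turn: Callable[[str], Awaitable[None]]) -> None:
        self._run_turn = run_turn
        self._busy = False
        self._message_queue: asyncio.Queue[str] = asyncio.Queue()
        self._reader_task: asyncio.Task[None] | None = None

    @property
    def is_busy(self) -> bool:
        return self._busy

    async def send_message(self, message: str) -> None:
        if self._busy:
            # A turn is in flight: park the message until it completes.
            self._message_queue.put_nowait(message)
            return
        self._busy = True
        # Like _spawn: start the turn in the background and keep a task
        # reference so it is not garbage-collected mid-run.
        self._reader_task = asyncio.create_task(self._drive(message))

    async def _drive(self, message: str) -> None:
        # Like the tail of _read_streams: finish the turn, go idle, then
        # dispatch the next queued message (which starts a fresh turn).
        try:
            await self._run_turn(message)
        finally:
            self._busy = False
        if not self._message_queue.empty():
            await self.send_message(self._message_queue.get_nowait())


async def demo() -> list[str]:
    events: list[str] = []

    async def fake_turn(msg: str) -> None:
        events.append(f"start {msg}")
        await asyncio.sleep(0.01)
        events.append(f"end {msg}")

    q = TurnQueue(fake_turn)
    for msg in ("a", "b", "c"):  # "b" and "c" arrive while "a" is running
        await q.send_message(msg)
    while q.is_busy:
        await asyncio.sleep(0.005)
    return events
```

Clearing the busy flag and dispatching the next queued message happen with no `await` in between. A new send_message call therefore cannot slip in and start a second concurrent turn.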

Internal state:

  • self._process: asyncio.subprocess.Process | None
  • self._busy: bool
  • self._message_queue: asyncio.Queue
  • self._reader_task: asyncio.Task | None
  • self._crash_count: int (reset on successful completion)
  • self._session_dir: Path
  • self._persona: dict | None
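This sketch shows how _handle_crash could use the `_crash_count` field. It is written as a small standalone class so it can run without a real process. `CrashRecovery` is hypothetical, as is its `respawn` callable, which stands in for re-running _spawn with --continue. The 3-attempt budget, the 1-second delay, and the status/error messages come from the plan. Treating an `OSError` raised during the spawn as a failed attempt is an assumption.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable


class CrashRecovery:
    """Hypothetical stand-in for _handle_crash and the _crash_count field."""

    def __init__(self, respawn: Callable[[], Awaitable[None]],
                 on_status: Callable[[str], None] | None = None,
                 on_error: Callable[[str], None] | None = None,
                 max_restarts: int = 3, delay: float = 1.0) -> None:
        self._respawn = respawn  # assumed: re-runs _spawn with --continue
        self._on_status = on_status
        self._on_error = on_error
        self._max_restarts = max_restarts
        self._delay = delay
        self._crash_count = 0

    def turn_completed(self) -> None:
        # A turn finished cleanly, so the next crash starts a fresh count.
        self._crash_count = 0

    async def handle_crash(self) -> bool:
        """Try to respawn; return False once the restart budget is spent."""
        while self._crash_count < self._max_restarts:
            self._crash_count += 1
            if self._on_status:
                self._on_status("Claude crashed, restarting with context preserved...")
            await asyncio.sleep(self._delay)
            try:
                await self._respawn()
                return True
            except OSError:
                # The spawn itself failed (e.g. binary missing): spend another attempt.
                continue
        if self._on_error:
            self._on_error(f"Claude failed to restart after {self._max_restarts} attempts")
        return False
```

Calling turn_completed() when a turn finishes cleanly matches the plan's "reset on successful completion". The budget therefore limits consecutive crashes, not crashes over the object's lifetime.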

Implementation notes:

  • Use asyncio.create_subprocess_exec with an argument list (NOT asyncio.create_subprocess_shell or shell=True -- avoid shell injection)
  • For the env, ensure PATH includes /home/mikkel/bin:/home/mikkel/.local/bin
  • Add logging with logging.getLogger(__name__)
  • Include type hints for all methods
  • Add module docstring explaining the subprocess interaction model
  • The _read_streams method must handle the case where stdout/stderr complete at different times
  • Read line by line, using either the `async for line in stream` pattern or a `readline()` loop
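Below is a minimal standalone form of the concurrent-read pattern these notes describe. It spawns without a shell, drains stdout and stderr together with asyncio.gather, then reaps the process. The function names and the 1 MiB `STREAM_LIMIT` are assumptions. The limit matters because asyncio stream readers default to a 64 KiB line limit, and readline() raises ValueError on a longer line. Building PATH from Path.home() instead of the hard-coded /home/mikkel paths is also a choice made for this sketch.

```python
from __future__ import annotations

import asyncio
import os
from pathlib import Path
from typing import Callable

# Assumption: raise the per-line buffer limit to 1 MiB. asyncio's default
# StreamReader limit is 64 KiB, and one stream-json line can exceed it.
STREAM_LIMIT = 1024 * 1024


def child_env() -> dict[str, str]:
    """Inherit the current environment, with ~/bin and ~/.local/bin first on PATH."""
    env = os.environ.copy()
    home = Path.home()
    parts = [str(home / "bin"), str(home / ".local" / "bin")]
    if env.get("PATH"):
        parts.append(env["PATH"])
    env["PATH"] = os.pathsep.join(parts)
    return env


async def pump_lines(stream: asyncio.StreamReader,
                     handle: Callable[[str], None]) -> None:
    # async-for calls readline() until EOF, so each pipe is drained on its
    # own schedule; either one may close first.
    async for raw in stream:
        handle(raw.decode("utf-8", errors="replace").rstrip("\r\n"))


async def run_and_drain(argv: list[str], cwd: Path,
                        on_stdout: Callable[[str], None],
                        on_stderr: Callable[[str], None]) -> int:
    """Spawn argv without a shell, drain both pipes concurrently, then reap."""
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=str(cwd),
        env=child_env(),
        limit=STREAM_LIMIT,
    )
    assert proc.stdout is not None and proc.stderr is not None
    # Reading both pipes at once means a child blocked on a full stderr
    # buffer can never stall while we wait on stdout (or the reverse).
    await asyncio.gather(pump_lines(proc.stdout, on_stdout),
                         pump_lines(proc.stderr, on_stderr))
    return await proc.wait()  # reap only after both streams hit EOF
```

Consider a child that writes a 200 KB line to stderr. It would stall a reader that drains stdout first, and its line would overrun the default 64 KiB limit.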

Read 01-RESEARCH.md for verified code patterns (Pattern 1: Concurrent Stream Reading, Pattern 3: Stream-JSON Event Handling, Pattern 4: Process Lifecycle Management).
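Independently of the research doc's Pattern 3, _handle_stdout_line could take the shape of a pure function that parses one line and dispatches it to the callbacks. The field names used here (`type`, `message.content`, `is_error`, `result`, `session_id`, `subtype`) follow the event types this plan lists. They are assumptions about the CLI's stream-json output and should be checked against 01-RESEARCH.md. The function name and its session_id return value are illustrative.

```python
from __future__ import annotations

import json
import logging
from typing import Callable

logger = logging.getLogger(__name__)


def route_stream_json_line(line: str,
                           on_output: Callable[[str], None],
                           on_error: Callable[[str], None]) -> str | None:
    """Dispatch one stream-json line to callbacks.

    Returns the session_id carried by a "result" event (so the caller can
    log it), otherwise None.
    """
    line = line.strip()
    if not line:
        return None
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        logger.warning("Skipping non-JSON line: %.200s", line)
        return None
    if not isinstance(event, dict):
        return None

    kind = event.get("type")
    if kind == "assistant":
        # Assistant messages carry a list of content blocks; only text
        # blocks are forwarded (tool_use blocks etc. are skipped).
        for block in event.get("message", {}).get("content", []):
            if isinstance(block, dict) and block.get("type") == "text":
                on_output(block.get("text", ""))
    elif kind == "result":
        if event.get("is_error"):
            on_error(str(event.get("result", "Claude reported an error")))
        return event.get("session_id")
    elif kind == "system":
        logger.info("system event: %s", event.get("subtype"))
        if event.get("subtype") == "error":
            on_error(f"Claude system error: {line[:200]}")
    return None
```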

DO NOT:

  • Import session_manager.py (that module manages metadata; this module manages processes)
  • Add Telegram-specific imports (that's Plan 03)
  • Implement idle timeout (that's Phase 3)
  • Use PTY (research confirms pipes are correct for Claude Code CLI)
Verify:

cd ~/homelab
source ~/venv/bin/activate

# Verify module loads and class structure is correct
python3 -c "
import asyncio
from pathlib import Path
from telegram.claude_subprocess import ClaudeSubprocess

# Verify class exists and has required methods
sub = ClaudeSubprocess(
    session_dir=Path('/tmp/test-claude-session'),
    on_output=lambda text: print(f'OUTPUT: {text}'),
    on_error=lambda err: print(f'ERROR: {err}'),
    on_complete=lambda: print('COMPLETE'),
    on_status=lambda s: print(f'STATUS: {s}')
)

assert hasattr(sub, 'send_message'), 'missing send_message'
assert hasattr(sub, 'terminate'), 'missing terminate'
assert hasattr(sub, 'is_busy'), 'missing is_busy'
assert hasattr(sub, 'is_alive'), 'missing is_alive'
assert not sub.is_busy, 'should start not busy'
assert not sub.is_alive, 'should start not alive'

print('ClaudeSubprocess class structure verified!')
"

# Verify concurrent stream reading implementation exists
python3 -c "
import inspect
from telegram.claude_subprocess import ClaudeSubprocess
source = inspect.getsource(ClaudeSubprocess)
assert 'asyncio.gather' in source, 'Missing asyncio.gather for concurrent stream reading'
assert 'create_subprocess_exec' in source, 'Missing create_subprocess_exec'
assert 'stream-json' in source, 'Missing stream-json output format'
assert 'terminate' in source, 'Missing terminate method'
assert 'wait_for' in source or 'wait(' in source, 'Missing process wait'
print('Implementation patterns verified!')
"
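The commands above only inspect structure. A behavioral check of the terminate -> wait_for -> kill -> wait sequence can run against stand-in child processes instead of the claude binary. This sketch is POSIX-only, and `terminate_process`, `POLITE`, `STUBBORN`, and `returncode_after_terminate` are hypothetical names.

```python
from __future__ import annotations

import asyncio
import sys


async def terminate_process(proc: asyncio.subprocess.Process,
                            timeout: float = 10) -> None:
    """SIGTERM, wait up to `timeout` seconds, then SIGKILL; always reap."""
    if proc.returncode is not None:
        return  # already exited and reaped
    try:
        proc.terminate()
    except ProcessLookupError:
        pass  # exited between the returncode check and the signal
    try:
        await asyncio.wait_for(proc.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()  # reap the killed child so no zombie remains


# Stand-in children: one exits on SIGTERM, one ignores it.
POLITE = "import time\nprint('ready', flush=True)\ntime.sleep(60)"
STUBBORN = ("import signal, time\n"
            "signal.signal(signal.SIGTERM, signal.SIG_IGN)\n"
            "print('ready', flush=True)\ntime.sleep(60)")


async def returncode_after_terminate(code: str, timeout: float) -> int | None:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", code, stdout=asyncio.subprocess.PIPE)
    assert proc.stdout is not None
    await proc.stdout.readline()  # wait until the child is fully set up
    await terminate_process(proc, timeout=timeout)
    return proc.returncode
```

On Linux, a negative returncode -N means the child died from signal N. The polite child should therefore report -15 (SIGTERM), and the stubborn one -9 (SIGKILL after the timeout).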
Done: The ClaudeSubprocess class spawns Claude Code CLI with stream-json output in session directories, reads stdout/stderr concurrently via asyncio.gather, handles process lifecycle with clean termination (no zombies), queues messages during processing, and auto-restarts on crash with the --continue flag.

Verification:

1. `telegram/claude_subprocess.py` exists with `ClaudeSubprocess` class
2. Class uses `asyncio.create_subprocess_exec` (not `create_subprocess_shell` / shell=True)
3. Stdout and stderr reading uses `asyncio.gather` for concurrent draining
4. Process termination implements terminate -> wait_for -> kill -> wait pattern
5. Message queue uses `asyncio.Queue` for queueing within the event loop (asyncio.Queue is not thread-safe)
6. Crash recovery attempts respawn with `--continue` flag, max 3 retries
7. Stream-json parsing handles assistant, result, and system event types
8. No imports from session_manager or telegram bot modules

<success_criteria>

  • ClaudeSubprocess module loads without import errors
  • Class has all required methods and properties
  • Implementation uses asyncio.gather for concurrent stream reading (verified via source inspection)
  • Process lifecycle follows terminate -> wait pattern (verified via source inspection)
  • Module is self-contained with callback-based communication (no tight coupling to session manager or bot) </success_criteria>
After completion, create `.planning/phases/01-session-process-foundation/01-02-SUMMARY.md`