docs(01): research phase domain

Phase 1: Session & Process Foundation
- Standard stack identified (asyncio, python-telegram-bot 22.5, Claude Code CLI)
- Architecture patterns documented (concurrent stream reading, session isolation)
- Pitfalls catalogued (pipe deadlock, zombie processes, blocking event loop)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Mikkel Georgsen 2026-02-04 14:25:26 +00:00
parent 786468c98b
commit a522a108ca

View file

@ -0,0 +1,565 @@
# Phase 1: Session & Process Foundation - Research
**Researched:** 2026-02-04
**Domain:** Python asyncio subprocess management, Claude Code CLI integration, Telegram bot architecture
**Confidence:** HIGH
## Summary
Phase 1 requires spawning and managing Claude Code CLI subprocesses from a Telegram bot written in Python using python-telegram-bot 22.5 and asyncio. The core technical challenge is safely managing subprocess I/O without deadlocks while handling concurrent Telegram messages.
Research confirms that asyncio provides robust subprocess management primitives, and Claude Code CLI's `--output-format stream-json` provides structured, parseable output ideal for subprocess consumption. The standard pattern is pipes with concurrent stream readers using `asyncio.gather()`, not PTY, as Claude Code doesn't require interactive terminal features for this use case.
Key findings: (1) Always use `communicate()` or concurrent stream readers to avoid pipe deadlocks, (2) Claude Code sessions are directory-based and persistent via `--resume`, (3) python-telegram-bot 22.5 handles async natively but requires careful handler design to avoid blocking, (4) Process cleanup must use `terminate()` + `wait()` to prevent zombie processes.
**Primary recommendation:** Use `asyncio.create_subprocess_exec()` with `PIPE` for stdout/stderr, concurrent `asyncio.gather()` for stream reading, and Claude Code's `--output-format stream-json --verbose` for structured output. Skip PTY complexity unless future phases need interactive features.
## Standard Stack
The established libraries/tools for this domain:
### Core
| Library | Version | Purpose | Why Standard |
|---------|---------|---------|--------------|
| python-telegram-bot | 22.5 | Telegram bot framework | Industry standard for Python Telegram bots, native async/await, comprehensive API coverage |
| asyncio | stdlib (3.12+) | Async subprocess management | Python's official async framework, subprocess primitives prevent deadlocks |
| Claude Code CLI | 2.1.31+ | AI agent subprocess | Official CLI with --resume, session persistence, stream-json output |
### Supporting
| Library | Version | Purpose | When to Use |
|---------|---------|---------|-------------|
| json | stdlib | Parse stream-json output | Every subprocess output line (NDJSON format) |
| pathlib | stdlib | Session directory management | File/directory operations for `~/telegram/sessions/` |
| typing | stdlib | Type hints for session metadata | Code clarity and IDE support |
### Alternatives Considered
| Instead of | Could Use | Tradeoff |
|------------|-----------|----------|
| asyncio.create_subprocess_exec | pty.spawn + asyncio | PTY adds complexity (terminal emulation, signal handling) without benefit for non-interactive CLI |
| python-telegram-bot | aiogram | aiogram is also async but has smaller ecosystem, PTB is more mature |
| Pipes | PTY (pseudo-terminal) | PTY needed only for programs requiring terminal features (color codes, cursor control) - Claude Code works fine with pipes |
**Installation:**
```bash
# Already installed on mgmt container
source ~/venv/bin/activate
pip show python-telegram-bot # Version: 22.5
which claude # /home/mikkel/.local/bin/claude
claude --version # 2.1.31 (Claude Code)
```
## Architecture Patterns
### Recommended Project Structure
```
telegram/
├── bot.py # Existing bot entry point
├── sessions/ # NEW: Session storage
│ ├── <name>/ # Per-session directory
│ │ ├── metadata.json # Session state (PID, timestamps, persona)
│ │ └── .claude/ # Claude Code session data (auto-created)
├── personas/ # NEW: Persona library
│ ├── brainstorm.json # Shared persona templates
│ ├── planner.json
│ └── research.json
├── session_manager.py # NEW: Session lifecycle management
└── claude_subprocess.py # NEW: Subprocess I/O handling
```
### Pattern 1: Concurrent Stream Reading (CRITICAL)
**What:** Read stdout and stderr concurrently using `asyncio.gather()` to prevent pipe buffer overflow
**When to use:** Every subprocess with `PIPE` for stdout/stderr
**Example:**
```python
# Source: https://docs.python.org/3/library/asyncio-subprocess.html
import asyncio
async def read_stream(stream, callback):
"""Read stream line by line, invoke callback for each line."""
while True:
line = await stream.readline()
if not line:
break
callback(line.decode().rstrip())
async def run_claude(session_dir, message):
proc = await asyncio.create_subprocess_exec(
'claude', '-p', message,
'--output-format', 'stream-json',
'--verbose',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=session_dir
)
# Concurrent reading prevents deadlock
await asyncio.gather(
read_stream(proc.stdout, handle_stdout),
read_stream(proc.stderr, handle_stderr)
)
await proc.wait()
```
### Pattern 2: Session Directory Isolation
**What:** Each session gets its own directory; Claude Code automatically manages session state
**When to use:** Every session creation/switch
**Example:**
```python
# Source: Phase context + Claude Code CLI reference
from pathlib import Path
import json
def create_session(name: str, persona: str = None):
"""Create new session with isolated directory."""
session_dir = Path.home() / 'telegram' / 'sessions' / name
session_dir.mkdir(parents=True, exist_ok=True)
metadata = {
'name': name,
'created': datetime.now().isoformat(),
'persona': persona,
'pid': None,
'status': 'idle'
}
# Write metadata
(session_dir / 'metadata.json').write_text(json.dumps(metadata, indent=2))
# Copy persona if specified
if persona:
persona_file = Path.home() / 'telegram' / 'personas' / f'{persona}.json'
if persona_file.exists():
(session_dir / 'persona.json').write_text(persona_file.read_text())
return session_dir
```
### Pattern 3: Stream-JSON Event Handling
**What:** Parse newline-delimited JSON events from Claude Code output
**When to use:** Processing subprocess output in real-time
**Example:**
```python
# Source: https://code.claude.com/docs/en/headless + stream-json research
import json
def handle_stdout(line: str):
"""Parse and route stream-json events."""
try:
event = json.loads(line)
event_type = event.get('type')
if event_type == 'assistant':
# Claude's response
content = event['message']['content']
for block in content:
if block['type'] == 'text':
send_to_telegram(block['text'])
elif event_type == 'result':
# Task complete
session_id = event['session_id']
update_session_state(session_id, 'idle')
elif event_type == 'system':
# System events (hooks, init)
pass
except json.JSONDecodeError:
logger.warning(f"Invalid JSON: {line}")
```
### Pattern 4: Process Lifecycle Management
**What:** Spawn on session switch, suspend (don't kill), rely on Phase 3 timeout for cleanup
**When to use:** Session switching, process termination
**Example:**
```python
# Source: Asyncio subprocess best practices + Phase context decisions
import asyncio
import signal
async def switch_session(new_session: str):
"""Switch to new session, suspend current process."""
current = get_active_session()
# Mark current as suspended (don't kill)
if current and current.proc:
current.status = 'suspended'
save_metadata(current)
# Process stays alive, Phase 3 timeout handles cleanup
# Activate new session
new = load_session(new_session)
if not new.proc or new.proc.returncode is not None:
# No process or dead - spawn new one
new.proc = await spawn_claude(new.session_dir)
set_active_session(new)
async def terminate_gracefully(proc, timeout=10):
"""Terminate subprocess with timeout, prevent zombies."""
# Source: Python asyncio subprocess best practices research
try:
proc.terminate() # Send SIGTERM
await asyncio.wait_for(proc.wait(), timeout=timeout)
except asyncio.TimeoutError:
proc.kill() # Force SIGKILL
await proc.wait() # CRITICAL: Always await to prevent zombies
```
### Pattern 5: Non-Blocking Telegram Handlers
**What:** Use `block=False` for handlers that spawn long-running tasks
**When to use:** Message handlers that interact with Claude Code subprocess
**Example:**
```python
# Source: https://github.com/python-telegram-bot/python-telegram-bot/wiki/Concurrency
from telegram.ext import Application, MessageHandler, filters
async def handle_message(update, context):
"""Handle incoming Telegram messages."""
session = get_active_session()
if not session:
await update.message.reply_text("No active session. Use /new <name>")
return
# Queue message to subprocess (non-blocking)
await session.send_message(update.message.text)
# Register with block=False for concurrency
app.add_handler(MessageHandler(
filters.TEXT & ~filters.COMMAND,
handle_message,
block=False
))
```
### Anti-Patterns to Avoid
- **Direct stream reading without concurrency:** Calling `await proc.stdout.read()` then `await proc.stderr.read()` sequentially will deadlock if stderr fills up first
- **Using `wait()` with pipes:** `await proc.wait()` deadlocks if stdout/stderr buffers fill; always use `communicate()` or concurrent stream readers
- **Killing processes without cleanup:** `proc.kill()` without `await proc.wait()` creates zombie processes
- **PTY for non-interactive programs:** PTY adds signal handling complexity; Claude Code CLI works fine with pipes
## Don't Hand-Roll
Problems that look simple but have existing solutions:
| Problem | Don't Build | Use Instead | Why |
|---------|-------------|-------------|-----|
| Concurrent stream reading | Manual threading or sequential reads | `asyncio.gather()` with StreamReader | Prevents deadlocks, handles backpressure, battle-tested |
| JSON Lines parsing | Custom line-by-line JSON parser | `json.loads()` per line with try/except | Standard library is fast, handles edge cases |
| Session ID generation | Custom UUID logic | `uuid.uuid4()` from stdlib | Cryptographically secure, standard format |
| Process termination | Manual signal handling | `proc.terminate()` + `asyncio.wait_for(proc.wait())` | Handles timeout, cleanup, zombie prevention |
**Key insight:** Asyncio subprocess management has well-documented pitfalls (deadlocks, zombies). Use standard patterns from official docs rather than custom solutions.
## Common Pitfalls
### Pitfall 1: Pipe Deadlock from Sequential Reading
**What goes wrong:** Reading stdout then stderr sequentially causes deadlock if stderr fills buffer first
**Why it happens:** OS pipe buffers are finite (~64KB). If stderr fills while code waits on stdout, child process blocks writing, parent blocks reading - deadlock.
**How to avoid:** Always read stdout and stderr concurrently using `asyncio.gather()`
**Warning signs:** Subprocess hangs indefinitely, no output, high CPU usage from blocked I/O
```python
# WRONG - Sequential reading
stdout_data = await proc.stdout.read() # Blocks forever if stderr fills first
stderr_data = await proc.stderr.read()
# RIGHT - Concurrent reading
async def read_all(stream):
return await stream.read()
stdout_data, stderr_data = await asyncio.gather(
read_all(proc.stdout),
read_all(proc.stderr)
)
```
### Pitfall 2: Zombie Processes from Missing wait()
**What goes wrong:** Process terminates but stays in zombie state (shows as `<defunct>` in ps)
**Why it happens:** Parent must call `wait()` to let OS reclaim process resources. Forgetting this after `terminate()`/`kill()` leaves zombies.
**How to avoid:** ALWAYS `await proc.wait()` after termination, even after `kill()`
**Warning signs:** `ps aux` shows increasing number of `<defunct>` processes, eventual resource exhaustion
```python
# WRONG - Zombie process
proc.terminate()
# Process is now zombie - resources not reclaimed
# RIGHT - Clean termination
proc.terminate()
await proc.wait() # CRITICAL - reaps zombie
```
### Pitfall 3: Blocking Telegram Bot Event Loop
**What goes wrong:** Long-running subprocess operations freeze bot, no messages processed
**Why it happens:** Telegram handlers run on main event loop. Blocking operations (like `communicate()` on long-running process) block all handlers.
**How to avoid:** Use `block=False` in handler registration, or spawn background tasks with `asyncio.create_task()`
**Warning signs:** Bot becomes unresponsive during Claude Code processing, commands queue up
```python
# WRONG - Blocks event loop
async def handle_message(update, context):
stdout, stderr = await proc.communicate() # Blocks for minutes
await update.message.reply_text(stdout)
# RIGHT - Non-blocking handler
app.add_handler(MessageHandler(
filters.TEXT,
handle_message,
block=False # Runs as asyncio.Task
))
```
### Pitfall 4: Assuming Claude Code Session Isolation
**What goes wrong:** Spawning multiple Claude Code processes in same directory causes session conflicts
**Why it happens:** Claude Code manages session state in `.claude/` subdirectory. Multiple processes in same directory share session state, corrupting history.
**How to avoid:** Each session must have its own directory (`~/telegram/sessions/<name>/`). Change `cwd` parameter when spawning subprocess.
**Warning signs:** Session history mixed between conversations, `--resume` loads wrong context
```python
# WRONG - Shared directory
proc = await asyncio.create_subprocess_exec('claude', '-p', msg)
# RIGHT - Isolated directory per session
session_dir = Path.home() / 'telegram' / 'sessions' / session_name
proc = await asyncio.create_subprocess_exec(
'claude', '-p', msg,
cwd=str(session_dir)
)
```
### Pitfall 5: Ignoring stream-json Event Types
**What goes wrong:** Only handling 'assistant' events misses errors, tool confirmations, completion status
**Why it happens:** stream-json emits multiple event types (system, assistant, result). Parsing only one type loses critical information.
**How to avoid:** Handle all event types in stream parser, especially 'result' for completion status and 'system' for errors
**Warning signs:** Missing error notifications, unclear when Claude finishes processing, tool use not tracked
```python
# WRONG - Only handles assistant messages
if event['type'] == 'assistant':
send_to_telegram(event['message'])
# RIGHT - Handle all event types
if event['type'] == 'assistant':
send_to_telegram(event['message'])
elif event['type'] == 'result':
mark_session_complete(event)
elif event['type'] == 'system' and event.get('subtype') == 'error':
notify_user_error(event)
```
## Code Examples
Verified patterns from official sources:
### Creating and Managing Subprocess
```python
# Source: https://docs.python.org/3/library/asyncio-subprocess.html
import asyncio
from pathlib import Path
async def spawn_claude_subprocess(session_dir: Path, initial_message: str):
"""Spawn Claude Code subprocess for session."""
proc = await asyncio.create_subprocess_exec(
'claude',
'-p', initial_message,
'--output-format', 'stream-json',
'--verbose',
'--continue', # Resume session if exists
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(session_dir)
)
return proc
```
### Concurrent Stream Reading
```python
# Source: https://docs.python.org/3/library/asyncio-subprocess.html
async def read_stream(stream, callback):
"""Read stream line-by-line, invoke callback for each line."""
while True:
line = await stream.readline()
if not line:
break
callback(line.decode().rstrip())
async def run_with_stream_handlers(proc, stdout_handler, stderr_handler):
"""Run subprocess with concurrent stdout/stderr reading."""
await asyncio.gather(
read_stream(proc.stdout, stdout_handler),
read_stream(proc.stderr, stderr_handler),
proc.wait()
)
```
### Graceful Process Termination
```python
# Source: Python asyncio subprocess research (multiple sources)
import asyncio
async def terminate_process(proc, timeout: int = 10):
"""Terminate subprocess gracefully, prevent zombie."""
if proc.returncode is not None:
return # Already terminated
try:
proc.terminate() # Send SIGTERM
await asyncio.wait_for(proc.wait(), timeout=timeout)
except asyncio.TimeoutError:
proc.kill() # Force SIGKILL
await proc.wait() # CRITICAL: Always reap zombie
```
### Session Directory Management
```python
# Source: Phase context + research
from pathlib import Path
import json
from datetime import datetime
def create_session_directory(name: str, persona: str = None) -> Path:
"""Create isolated session directory with metadata."""
session_dir = Path.home() / 'telegram' / 'sessions' / name
session_dir.mkdir(parents=True, exist_ok=True)
metadata = {
'name': name,
'created': datetime.now().isoformat(),
'persona': persona,
'pid': None,
'status': 'idle',
'last_active': None
}
metadata_file = session_dir / 'metadata.json'
metadata_file.write_text(json.dumps(metadata, indent=2))
return session_dir
```
### Parsing stream-json Output
```python
# Source: https://code.claude.com/docs/en/headless
import json
import logging
logger = logging.getLogger(__name__)
def parse_stream_json_line(line: str):
"""Parse single line of stream-json output."""
try:
event = json.loads(line)
return event
except json.JSONDecodeError:
logger.warning(f"Invalid JSON line: {line}")
return None
async def handle_claude_output(stream, telegram_chat_id, bot):
"""Handle Claude Code stream-json output."""
while True:
line = await stream.readline()
if not line:
break
event = parse_stream_json_line(line.decode().rstrip())
if not event:
continue
event_type = event.get('type')
if event_type == 'assistant':
# Extract text from assistant message
content = event.get('message', {}).get('content', [])
for block in content:
if block.get('type') == 'text':
text = block.get('text', '')
await bot.send_message(chat_id=telegram_chat_id, text=text)
elif event_type == 'result':
# Task completion
if event.get('is_error'):
await bot.send_message(
chat_id=telegram_chat_id,
text="Claude encountered an error."
)
```
## State of the Art
| Old Approach | Current Approach | When Changed | Impact |
|--------------|------------------|--------------|--------|
| PTY for all subprocess interaction | Pipes with concurrent reading for non-interactive programs | Python 3.6+ asyncio maturity | Simpler code, fewer edge cases, better error handling |
| Sequential stdout/stderr reading | Concurrent `asyncio.gather()` | Python 3.5 async/await | Eliminates deadlocks from buffer overflow |
| Manual signal handling for termination | `terminate()` + `wait_for()` with timeout | Python 3.7+ | Graceful shutdown with fallback to SIGKILL |
| Thread-based Telegram bots | Async python-telegram-bot 20.0+ | v20.0 (2023) | Native async/await, better performance |
| File-based Claude interaction | Stream-json subprocess with live parsing | Claude Code 2.0+ (2024) | Real-time responses, lower latency |
**Deprecated/outdated:**
- **python-telegram-bot sync mode (< v20):** Deprecated, removed in v20. All new code must use async/await.
- **subprocess.PIPE without concurrent reading:** Known deadlock risk since Python 3.4, documented as anti-pattern
- **PTY for Claude Code:** Unnecessary; Claude Code designed for pipe interaction, handles non-TTY gracefully
## Open Questions
Things that couldn't be fully resolved:
1. **Claude Code auto-restart behavior with --resume**
- What we know: `--resume` loads session by ID, `--continue` loads most recent in directory
- What's unclear: If Claude Code crashes mid-response, can we auto-restart with `--continue` and it resumes cleanly? Or do we need to track message history ourselves?
- Recommendation: Test crash recovery behavior. Likely safe to use `--continue` in session directory after crash - Claude Code manages history in `.claude/` subdirectory.
2. **Optimal buffer limit for long-running sessions**
- What we know: `limit` parameter on `create_subprocess_exec()` controls StreamReader buffer size (default 64KB)
- What's unclear: Should we increase for Claude Code's potentially long responses? What's the memory tradeoff?
- Recommendation: Start with default (64KB). Monitor in Phase 4. Claude Code stream-json outputs line-by-line, so readline() should prevent buffer buildup.
3. **Handling concurrent messages during Claude processing**
- What we know: User might send multiple messages while Claude is responding
- What's unclear: Queue to subprocess stdin (if using `--input-format stream-json`)? Or wait for completion and send as new turn?
- Recommendation: Phase context says "queue messages, send after response completes." For Phase 1, buffer messages in Python and send as new `claude -p` invocation after previous completes. Phase 2+ might use `--input-format stream-json` for live piping.
4. **Session metadata beyond basics**
- What we know: Need name, PID, timestamps, persona at minimum
- What's unclear: Should we track message count, last message timestamp, token usage, Claude Code session ID?
- Recommendation: Keep it minimal for Phase 1. Metadata schema:
```json
{
"name": "session-name",
"created": "2026-02-04T14:20:00Z",
"last_active": "2026-02-04T15:30:00Z",
"persona": "brainstorm",
"pid": 12345,
"status": "active|suspended|idle"
}
```
Add fields in later phases as needed (token tracking in Phase 4, etc.)
## Sources
### Primary (HIGH confidence)
- [Python asyncio subprocess documentation](https://docs.python.org/3/library/asyncio-subprocess.html) - Official Python 3.14 docs
- [Claude Code CLI reference](https://code.claude.com/docs/en/cli-reference) - Official Anthropic documentation
- [Claude Code headless mode](https://code.claude.com/docs/en/headless) - Official programmatic usage guide
- [python-telegram-bot Concurrency wiki](https://github.com/python-telegram-bot/python-telegram-bot/wiki/Concurrency) - Official PTB documentation
### Secondary (MEDIUM confidence)
- [Super Fast Python - Asyncio Subprocess](https://superfastpython.com/asyncio-subprocess/) - Practical examples verified against official docs
- [Python asyncio subprocess termination best practices](https://www.slingacademy.com/article/python-asyncio-how-to-stop-kill-a-child-process/) - Community best practices, verified with official docs
- [Claude Code session management guide](https://stevekinney.com/courses/ai-development/claude-code-session-management) - Educational resource on Claude sessions
- [Stream-JSON chaining wiki](https://github.com/ruvnet/claude-flow/wiki/Stream-Chaining) - Community documentation on stream-json format
### Tertiary (LOW confidence)
- WebSearch results on asyncio best practices - Multiple sources, cross-referenced but not deeply verified
## Metadata
**Confidence breakdown:**
- Standard stack: HIGH - All libraries verified in use on mgmt container, versions confirmed
- Architecture: HIGH - Patterns sourced from official Python and Claude Code documentation
- Pitfalls: HIGH - Documented in Python subprocess docs, verified through official warnings
**Research date:** 2026-02-04
**Valid until:** 2026-03-04 (30 days - Python asyncio and Claude Code are stable, slow-moving APIs)