Phase 1: Session & Process Foundation - Standard stack identified (asyncio, python-telegram-bot 22.5, Claude Code CLI) - Architecture patterns documented (concurrent stream reading, session isolation) - Pitfalls catalogued (pipe deadlock, zombie processes, blocking event loop) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Phase 1: Session & Process Foundation - Research
Researched: 2026-02-04
Domain: Python asyncio subprocess management, Claude Code CLI integration, Telegram bot architecture
Confidence: HIGH
Summary
Phase 1 requires spawning and managing Claude Code CLI subprocesses from a Telegram bot written in Python using python-telegram-bot 22.5 and asyncio. The core technical challenge is safely managing subprocess I/O without deadlocks while handling concurrent Telegram messages.
Research confirms that asyncio provides robust subprocess management primitives, and Claude Code CLI's --output-format stream-json provides structured, parseable output ideal for subprocess consumption. The standard pattern is pipes with concurrent stream readers using asyncio.gather(), not PTY, as Claude Code doesn't require interactive terminal features for this use case.
Key findings: (1) Always use communicate() or concurrent stream readers to avoid pipe deadlocks, (2) Claude Code conversations persist across invocations, are tracked per working directory, and can be resumed with --resume <session-id> or --continue, (3) python-telegram-bot 22.5 handles async natively but requires careful handler design to avoid blocking, (4) Process cleanup must use terminate() + wait() to prevent zombie processes.
Primary recommendation: Use asyncio.create_subprocess_exec() with PIPE for stdout/stderr, concurrent asyncio.gather() for stream reading, and Claude Code's --output-format stream-json --verbose for structured output. Skip PTY complexity unless future phases need interactive features.
Standard Stack
The established libraries/tools for this domain:
Core
| Library | Version | Purpose | Why Standard |
|---|---|---|---|
| python-telegram-bot | 22.5 | Telegram bot framework | Industry standard for Python Telegram bots, native async/await, comprehensive API coverage |
| asyncio | stdlib (3.12+) | Async subprocess management | Python's official async framework; its subprocess primitives (communicate(), StreamReader) avoid pipe deadlocks when used correctly |
| Claude Code CLI | 2.1.31+ | AI agent subprocess | Official CLI with --resume, session persistence, stream-json output |
Supporting
| Library | Version | Purpose | When to Use |
|---|---|---|---|
| json | stdlib | Parse stream-json output | Every subprocess output line (NDJSON format) |
| pathlib | stdlib | Session directory management | File/directory operations for ~/telegram/sessions/ |
| typing | stdlib | Type hints for session metadata | Code clarity and IDE support |
Alternatives Considered
| Instead of | Could Use | Tradeoff |
|---|---|---|
| asyncio.create_subprocess_exec | pty.spawn + asyncio | PTY adds complexity (terminal emulation, signal handling) without benefit for non-interactive CLI |
| python-telegram-bot | aiogram | aiogram is also async but has smaller ecosystem, PTB is more mature |
| Pipes | PTY (pseudo-terminal) | PTY needed only for programs requiring terminal features (color codes, cursor control) - Claude Code works fine with pipes |
Installation:
```bash
# Already installed on mgmt container
source ~/venv/bin/activate
pip show python-telegram-bot   # Version: 22.5
which claude                   # /home/mikkel/.local/bin/claude
claude --version               # 2.1.31 (Claude Code)
```
Architecture Patterns
Recommended Project Structure
```text
telegram/
├── bot.py                  # Existing bot entry point
├── sessions/               # NEW: Session storage
│   ├── <name>/             # Per-session directory
│   │   ├── metadata.json   # Session state (PID, timestamps, persona)
│   │   ├── persona.json    # Copied persona template (only if a persona was chosen)
│   │   └── .claude/        # Claude Code project settings, if any (transcripts live under ~/.claude/projects/)
├── personas/               # NEW: Persona library
│   ├── brainstorm.json     # Shared persona templates
│   ├── planner.json
│   └── research.json
├── session_manager.py      # NEW: Session lifecycle management
└── claude_subprocess.py    # NEW: Subprocess I/O handling
```
Pattern 1: Concurrent Stream Reading (CRITICAL)
What: Read stdout and stderr concurrently using asyncio.gather() so that neither pipe buffer fills up and blocks the child process
When to use: Every subprocess with PIPE for stdout/stderr
Example:
```python
# Source: https://docs.python.org/3/library/asyncio-subprocess.html
import asyncio


async def read_stream(stream, callback):
    """Read stream line by line, invoke callback for each line."""
    while True:
        line = await stream.readline()
        if not line:
            break
        callback(line.decode().rstrip())


async def run_claude(session_dir, message):
    proc = await asyncio.create_subprocess_exec(
        'claude', '-p', message,
        '--output-format', 'stream-json',
        '--verbose',
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=session_dir
    )
    # Concurrent reading prevents deadlock.
    # handle_stdout / handle_stderr are per-line callbacks defined elsewhere
    # (handle_stdout is sketched in Pattern 3).
    await asyncio.gather(
        read_stream(proc.stdout, handle_stdout),
        read_stream(proc.stderr, handle_stderr)
    )
    await proc.wait()
```
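To see why concurrent draining matters, the sketch below swaps claude for a stand-in child (the current Python interpreter) that writes about 200 KB to stderr, more than a typical 64 KiB pipe buffer, before printing to stdout. Reading stdout to EOF first would hang; draining both pipes with asyncio.gather() completes. The child script, run_child and the list-based read_stream variant are illustrative only. limit is raised because the single stderr line is longer than the default 64 KiB that readline() accepts.

```python
import asyncio
import sys

# Stand-in child: one ~200 KB line on stderr (more than a 64 KiB pipe buffer),
# then three short lines on stdout.
CHILD = (
    "import sys\n"
    "sys.stderr.write('e' * 200_000 + '\\n')\n"
    "sys.stderr.flush()\n"
    "for i in range(3):\n"
    "    print(f'line {i}')\n"
)


async def read_stream(stream: asyncio.StreamReader, sink: list[str]) -> None:
    """Append each decoded line from stream to sink until EOF."""
    while True:
        line = await stream.readline()
        if not line:
            break
        sink.append(line.decode().rstrip())


async def run_child() -> tuple[list[str], list[str], int]:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        limit=1024 * 1024,  # readline() raises ValueError on lines longer than limit
    )
    out: list[str] = []
    err: list[str] = []
    # Drain both pipes at once so the child never blocks on a full buffer.
    await asyncio.gather(read_stream(proc.stdout, out), read_stream(proc.stderr, err))
    return out, err, await proc.wait()


if __name__ == "__main__":
    out, err, rc = asyncio.run(run_child())
    print(out, len(err[0]), rc)  # ['line 0', 'line 1', 'line 2'] 200000 0
```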
Pattern 2: Session Directory Isolation
What: Each session gets its own directory; Claude Code automatically manages session state
When to use: Every session creation/switch
Example:
```python
# Source: Phase context + Claude Code CLI reference
import json
from datetime import datetime
from pathlib import Path


def create_session(name: str, persona: str | None = None) -> Path:
    """Create new session with isolated directory."""
    session_dir = Path.home() / 'telegram' / 'sessions' / name
    session_dir.mkdir(parents=True, exist_ok=True)
    metadata = {
        'name': name,
        'created': datetime.now().isoformat(),
        'persona': persona,
        'pid': None,
        'status': 'idle'
    }
    # Write metadata
    (session_dir / 'metadata.json').write_text(json.dumps(metadata, indent=2))
    # Copy persona if specified
    if persona:
        persona_file = Path.home() / 'telegram' / 'personas' / f'{persona}.json'
        if persona_file.exists():
            (session_dir / 'persona.json').write_text(persona_file.read_text())
    return session_dir
```
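Metadata is rewritten on every status change (idle, active, suspended), and a crash mid-write would leave a truncated metadata.json. A common remedy, sketched here as an assumption rather than part of the Phase 1 design, is to write a temporary file in the same directory and swap it in with os.replace(), which renames atomically on POSIX filesystems. update_metadata is a hypothetical helper name.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any


def update_metadata(session_dir: Path, **changes: Any) -> dict[str, Any]:
    """Merge `changes` into session_dir/metadata.json, replacing the file atomically."""
    path = session_dir / "metadata.json"
    metadata: dict[str, Any] = json.loads(path.read_text()) if path.exists() else {}
    metadata.update(changes)

    # Temp file in the same directory (same filesystem), then swap it in.
    fd, tmp_name = tempfile.mkstemp(dir=session_dir, prefix=".metadata-", suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            json.dump(metadata, tmp, indent=2)
            tmp.flush()
            os.fsync(tmp.fileno())  # make the new contents durable before the rename
        os.replace(tmp_name, path)  # atomic rename on POSIX
    except BaseException:
        os.unlink(tmp_name)  # never leave a stray temp file behind
        raise
    return metadata
```

Readers of metadata.json then see either the old or the new contents, never a half-written file.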
Pattern 3: Stream-JSON Event Handling
What: Parse newline-delimited JSON events from Claude Code output
When to use: Processing subprocess output in real-time
Example:
```python
# Source: https://code.claude.com/docs/en/headless + stream-json research
import json
import logging

logger = logging.getLogger(__name__)


def handle_stdout(line: str):
    """Parse and route stream-json events."""
    # send_to_telegram and update_session_state are app-level helpers (not shown).
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        logger.warning(f"Invalid JSON: {line}")
        return
    event_type = event.get('type')
    if event_type == 'assistant':
        # Claude's response: a list of content blocks (text, tool_use, ...)
        content = event.get('message', {}).get('content', [])
        for block in content:
            if block.get('type') == 'text':
                send_to_telegram(block['text'])
    elif event_type == 'result':
        # Task complete; session_id is Claude Code's own ID (usable with --resume)
        session_id = event.get('session_id')
        update_session_state(session_id, 'idle')
    elif event_type == 'system':
        # System events (hooks, init)
        pass
```
Pattern 4: Process Lifecycle Management
What: Spawn on session switch, suspend (don't kill), rely on Phase 3 timeout for cleanup
When to use: Session switching, process termination
Example:
```python
# Source: Asyncio subprocess best practices + Phase context decisions
import asyncio

# get_active_session, save_metadata, load_session, spawn_claude and
# set_active_session are session-manager helpers (not shown).


async def switch_session(new_session: str):
    """Switch to new session, suspend current process."""
    current = get_active_session()
    # Mark current as suspended (don't kill)
    if current and current.proc:
        current.status = 'suspended'
        save_metadata(current)
        # Process stays alive, Phase 3 timeout handles cleanup
    # Activate new session
    new = load_session(new_session)
    if not new.proc or new.proc.returncode is not None:
        # No process or dead - spawn new one
        new.proc = await spawn_claude(new.session_dir)
    set_active_session(new)


async def terminate_gracefully(proc, timeout=10):
    """Terminate subprocess with timeout, prevent zombies."""
    # Source: Python asyncio subprocess best practices research
    if proc.returncode is not None:
        return  # Already exited; nothing to signal
    try:
        proc.terminate()  # Send SIGTERM
        await asyncio.wait_for(proc.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()  # Force SIGKILL
        await proc.wait()  # CRITICAL: Always await to prevent zombies
```
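The escalation path (SIGTERM, bounded wait, then SIGKILL) can be exercised against a child that ignores SIGTERM. This sketch is POSIX-only and uses the Python interpreter as a stand-in for claude; the child prints ready only after installing its handler, so the parent never signals too early. The variant below also returns the exit code and tolerates a process that exits between the returncode check and the signal.

```python
import asyncio
import signal
import sys

# Stand-in for claude: ignores SIGTERM, reports readiness, then sleeps.
STUBBORN_CHILD = (
    "import signal, time\n"
    "signal.signal(signal.SIGTERM, signal.SIG_IGN)\n"
    "print('ready', flush=True)\n"
    "time.sleep(60)\n"
)


async def terminate_gracefully(proc: asyncio.subprocess.Process, timeout: float = 10) -> int:
    """Send SIGTERM, wait up to `timeout` seconds, then SIGKILL; always reap."""
    if proc.returncode is not None:
        return proc.returncode  # already exited
    try:
        proc.terminate()  # SIGTERM
    except ProcessLookupError:
        return await proc.wait()  # exited between the check and the signal
    try:
        return await asyncio.wait_for(proc.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()  # SIGKILL cannot be caught or ignored
        return await proc.wait()


async def demo() -> int:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", STUBBORN_CHILD, stdout=asyncio.subprocess.PIPE
    )
    await proc.stdout.readline()  # SIGTERM handler is installed once 'ready' arrives
    return await terminate_gracefully(proc, timeout=0.5)


if __name__ == "__main__":
    rc = asyncio.run(demo())
    print(rc, rc == -signal.SIGKILL)  # -9 True
```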
Pattern 5: Non-Blocking Telegram Handlers
What: Use block=False for handlers that spawn long-running tasks
When to use: Message handlers that interact with Claude Code subprocess
Example:
```python
# Source: https://github.com/python-telegram-bot/python-telegram-bot/wiki/Concurrency
from telegram.ext import Application, MessageHandler, filters

app = Application.builder().token("BOT_TOKEN").build()  # placeholder token


async def handle_message(update, context):
    """Handle incoming Telegram messages."""
    session = get_active_session()  # session-manager helper (not shown)
    if not session:
        await update.message.reply_text("No active session. Use /new <name>")
        return
    # Queue message to subprocess (non-blocking)
    await session.send_message(update.message.text)


# Register with block=False for concurrency
app.add_handler(MessageHandler(
    filters.TEXT & ~filters.COMMAND,
    handle_message,
    block=False
))
```
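Because block=False lets several handle_message calls run at once, two quick messages could otherwise start two claude -p turns in the same session directory. One possible implementation of the session.send_message call, following the phase decision to queue messages and send them after the current response completes, is a per-session asyncio.Queue drained by a single worker task. SessionQueue and run_turn are hypothetical names; run_turn stands in for whatever spawns claude and streams its output.

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable

logger = logging.getLogger(__name__)


class SessionQueue:
    """Run one turn at a time for a session; later messages wait in a queue."""

    def __init__(self, run_turn: Callable[[str], Awaitable[None]]) -> None:
        # run_turn: hypothetical coroutine that performs one `claude -p` turn.
        self._run_turn = run_turn
        self._queue: asyncio.Queue[str] = asyncio.Queue()
        self._worker: asyncio.Task[None] | None = None

    async def send_message(self, text: str) -> None:
        """Enqueue text; start a worker if none is currently draining the queue."""
        await self._queue.put(text)
        if self._worker is None or self._worker.done():
            self._worker = asyncio.create_task(self._drain())

    async def _drain(self) -> None:
        # Single consumer: turns run strictly one after another, in arrival order.
        while not self._queue.empty():
            text = self._queue.get_nowait()
            try:
                await self._run_turn(text)
            except Exception:
                logger.exception("turn failed for message %r", text)
            finally:
                self._queue.task_done()

    async def join(self) -> None:
        """Wait until every queued message has been processed."""
        await self._queue.join()
```

A failed turn is logged rather than re-raised, so one bad turn does not strand the messages queued behind it.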
Anti-Patterns to Avoid
- Direct stream reading without concurrency: Calling await proc.stdout.read() then await proc.stderr.read() sequentially will deadlock if stderr fills up first
- Using wait() with unread pipes: await proc.wait() deadlocks if the stdout/stderr buffers fill; always use communicate() or concurrent stream readers
- Killing processes without cleanup: proc.kill() without await proc.wait() creates zombie processes
- PTY for non-interactive programs: PTY adds signal handling complexity; Claude Code CLI works fine with pipes
Don't Hand-Roll
Problems that look simple but have existing solutions:
| Problem | Don't Build | Use Instead | Why |
|---|---|---|---|
| Concurrent stream reading | Manual threading or sequential reads | asyncio.gather() with StreamReader | Prevents deadlocks, handles backpressure, battle-tested |
| JSON Lines parsing | Custom line-by-line JSON parser | json.loads() per line with try/except | Standard library is fast, handles edge cases |
| Session ID generation | Custom UUID logic | uuid.uuid4() from stdlib | Cryptographically secure, standard format |
| Process termination | Manual signal handling | proc.terminate() + asyncio.wait_for(proc.wait()) | Handles timeout, cleanup, zombie prevention |
Key insight: Asyncio subprocess management has well-documented pitfalls (deadlocks, zombies). Use standard patterns from official docs rather than custom solutions.
Common Pitfalls
Pitfall 1: Pipe Deadlock from Sequential Reading
What goes wrong: Reading stdout then stderr sequentially causes deadlock if stderr fills buffer first
Why it happens: OS pipe buffers are finite (~64KB). If stderr fills while code waits on stdout, child process blocks writing, parent blocks reading - deadlock.
How to avoid: Always read stdout and stderr concurrently using asyncio.gather()
Warning signs: Subprocess hangs indefinitely with no output, while both processes sit at near-zero CPU (each is blocked on I/O)
```python
# WRONG - Sequential reading
stdout_data = await proc.stdout.read()  # Blocks forever if stderr fills first
stderr_data = await proc.stderr.read()

# RIGHT - Concurrent reading
async def read_all(stream):
    return await stream.read()

stdout_data, stderr_data = await asyncio.gather(
    read_all(proc.stdout),
    read_all(proc.stderr)
)
```
Pitfall 2: Zombie Processes from Missing wait()
What goes wrong: Process terminates but stays in zombie state (shows as <defunct> in ps)
Why it happens: Parent must call wait() to let OS reclaim process resources. Forgetting this after terminate()/kill() leaves zombies.
How to avoid: ALWAYS await proc.wait() after termination, even after kill()
Warning signs: ps aux shows increasing number of <defunct> processes, eventual resource exhaustion
```python
# WRONG - Zombie process
proc.terminate()
# Process is now zombie - resources not reclaimed

# RIGHT - Clean termination
proc.terminate()
await proc.wait()  # CRITICAL - reaps zombie
```
Pitfall 3: Blocking Telegram Bot Event Loop
What goes wrong: Long-running subprocess operations freeze bot, no messages processed
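Why it happens: By default python-telegram-bot processes updates one at a time and waits for each handler registered with block=True (the default) to finish before dispatching the next update. A handler that awaits something slow (like communicate() on a long-running process) therefore stalls every other handler, even though the event loop itself keeps running. Truly synchronous calls (e.g. subprocess.run()) are worse: they freeze the event loop itself.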
How to avoid: Use block=False in handler registration, or spawn background tasks with asyncio.create_task()
Warning signs: Bot becomes unresponsive during Claude Code processing, commands queue up
```python
# WRONG - Stalls update processing until Claude finishes
async def handle_message(update, context):
    stdout, stderr = await proc.communicate()  # Blocks for minutes
    await update.message.reply_text(stdout.decode())

# RIGHT - Non-blocking handler
app.add_handler(MessageHandler(
    filters.TEXT,
    handle_message,
    block=False  # Runs as asyncio.Task
))
```
Pitfall 4: Assuming Claude Code Session Isolation
What goes wrong: Spawning multiple Claude Code processes in same directory causes session conflicts
Why it happens: Claude Code ties conversations to the working directory, and --continue resumes the most recent conversation for the current directory. Processes that share a directory therefore pick up each other's history instead of keeping separate contexts.
How to avoid: Each session must have its own directory (~/telegram/sessions/<name>/). Set the cwd parameter when spawning the subprocess.
Warning signs: Session history mixed between conversations, --continue loads the wrong context
```python
# WRONG - Shared directory
proc = await asyncio.create_subprocess_exec('claude', '-p', msg)

# RIGHT - Isolated directory per session
session_dir = Path.home() / 'telegram' / 'sessions' / session_name
proc = await asyncio.create_subprocess_exec(
    'claude', '-p', msg,
    cwd=str(session_dir)
)
```
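Since the session name arrives from a Telegram command (/new <name>) and becomes a path component, a name such as ../.. would place the "session" outside ~/telegram/sessions/. Below is a sketch of a guard, assuming a naming rule of 1 to 32 letters, digits, - or _ that starts with a letter or digit (the rule itself is an assumption, not a phase decision). session_dir_for is a hypothetical helper.

```python
import re
from pathlib import Path

SESSIONS_ROOT = Path.home() / "telegram" / "sessions"

# Assumed naming rule: 1-32 chars of letters, digits, '-' or '_', starting with
# a letter or digit (so '.', '..' and hidden names cannot match).
_SESSION_NAME = re.compile(r"[A-Za-z0-9][A-Za-z0-9_-]{0,31}")


def session_dir_for(name: str, root: Path = SESSIONS_ROOT) -> Path:
    """Map a user-supplied session name to its directory, rejecting unsafe names."""
    if not _SESSION_NAME.fullmatch(name):
        raise ValueError(f"invalid session name: {name!r}")
    path = (root / name).resolve()
    # Defence in depth: the resolved path must sit directly under root.
    if path.parent != root.resolve():
        raise ValueError(f"session path escapes {root}: {path}")
    return path
```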
Pitfall 5: Ignoring stream-json Event Types
What goes wrong: Only handling 'assistant' events misses errors, tool confirmations, completion status
Why it happens: stream-json emits multiple event types (system, assistant, result). Parsing only one type loses critical information.
How to avoid: Handle all event types in stream parser, especially 'result' for completion status and 'system' for errors
Warning signs: Missing error notifications, unclear when Claude finishes processing, tool use not tracked
```python
# WRONG - Only handles assistant messages
if event['type'] == 'assistant':
    send_to_telegram(event['message'])

# RIGHT - Handle all event types
if event['type'] == 'assistant':
    send_to_telegram(event['message'])  # extract the text blocks first (see Pattern 3)
elif event['type'] == 'result':
    mark_session_complete(event)  # result events also carry an is_error flag
elif event['type'] == 'system' and event.get('subtype') == 'error':
    notify_user_error(event)  # subtype name assumed; verify against real output
```
Code Examples
Verified patterns from official sources:
Creating and Managing Subprocess
```python
# Source: https://docs.python.org/3/library/asyncio-subprocess.html
import asyncio
from pathlib import Path


async def spawn_claude_subprocess(session_dir: Path, initial_message: str):
    """Spawn Claude Code subprocess for session."""
    proc = await asyncio.create_subprocess_exec(
        'claude',
        '-p', initial_message,
        '--output-format', 'stream-json',
        '--verbose',
        '--continue',  # Resume session if exists
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        cwd=str(session_dir)
    )
    return proc
```
Concurrent Stream Reading
```python
# Source: https://docs.python.org/3/library/asyncio-subprocess.html
import asyncio


async def read_stream(stream, callback):
    """Read stream line-by-line, invoke callback for each line."""
    while True:
        line = await stream.readline()
        if not line:
            break
        callback(line.decode().rstrip())


async def run_with_stream_handlers(proc, stdout_handler, stderr_handler):
    """Run subprocess with concurrent stdout/stderr reading."""
    await asyncio.gather(
        read_stream(proc.stdout, stdout_handler),
        read_stream(proc.stderr, stderr_handler),
        proc.wait()
    )
```
Graceful Process Termination
```python
# Source: Python asyncio subprocess research (multiple sources)
import asyncio


async def terminate_process(proc, timeout: int = 10):
    """Terminate subprocess gracefully, prevent zombie."""
    if proc.returncode is not None:
        return  # Already terminated
    try:
        proc.terminate()  # Send SIGTERM
        await asyncio.wait_for(proc.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()  # Force SIGKILL
        await proc.wait()  # CRITICAL: Always reap zombie
```
Session Directory Management
```python
# Source: Phase context + research
from pathlib import Path
import json
from datetime import datetime


def create_session_directory(name: str, persona: str | None = None) -> Path:
    """Create isolated session directory with metadata."""
    session_dir = Path.home() / 'telegram' / 'sessions' / name
    session_dir.mkdir(parents=True, exist_ok=True)
    metadata = {
        'name': name,
        'created': datetime.now().isoformat(),
        'persona': persona,
        'pid': None,
        'status': 'idle',
        'last_active': None
    }
    metadata_file = session_dir / 'metadata.json'
    metadata_file.write_text(json.dumps(metadata, indent=2))
    return session_dir
```
Parsing stream-json Output
```python
# Source: https://code.claude.com/docs/en/headless
import json
import logging

logger = logging.getLogger(__name__)


def parse_stream_json_line(line: str):
    """Parse single line of stream-json output."""
    try:
        event = json.loads(line)
        return event
    except json.JSONDecodeError:
        logger.warning(f"Invalid JSON line: {line}")
        return None


async def handle_claude_output(stream, telegram_chat_id, bot):
    """Handle Claude Code stream-json output."""
    while True:
        line = await stream.readline()
        if not line:
            break
        event = parse_stream_json_line(line.decode().rstrip())
        if not event:
            continue
        event_type = event.get('type')
        if event_type == 'assistant':
            # Extract text from assistant message
            content = event.get('message', {}).get('content', [])
            for block in content:
                if block.get('type') == 'text':
                    text = block.get('text', '')
                    if text:  # Telegram rejects empty message text
                        await bot.send_message(chat_id=telegram_chat_id, text=text)
        elif event_type == 'result':
            # Task completion
            if event.get('is_error'):
                await bot.send_message(
                    chat_id=telegram_chat_id,
                    text="Claude encountered an error."
                )
```
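One practical gap when forwarding assistant text: the Telegram Bot API limits message text to 4096 characters (python-telegram-bot exposes this as telegram.constants.MessageLimit.MAX_TEXT_LENGTH), and a longer reply sent in a single send_message call is rejected. Below is a splitting sketch that prefers newline boundaries. split_for_telegram is a hypothetical helper; it counts Python characters, which may not match Telegram's counting for every character, so passing a smaller limit leaves headroom.

```python
TELEGRAM_MAX_TEXT = 4096  # Bot API limit on message text length


def split_for_telegram(text: str, limit: int = TELEGRAM_MAX_TEXT) -> list[str]:
    """Split text into chunks of at most `limit` characters.

    Breaks after the last newline inside each window when there is one,
    otherwise cuts hard at `limit` (e.g. for one very long line).
    """
    chunks: list[str] = []
    while len(text) > limit:
        cut = text.rfind("\n", 0, limit) + 1  # 0 when the window has no newline
        if cut == 0:
            cut = limit
        chunks.append(text[:cut])
        text = text[cut:]
    if text:
        chunks.append(text)
    return chunks
```

In handle_claude_output, the single send_message call would become a loop over split_for_telegram(text).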
State of the Art
| Old Approach | Current Approach | When Changed | Impact |
|---|---|---|---|
| PTY for all subprocess interaction | Pipes with concurrent reading for non-interactive programs | Python 3.6+ asyncio maturity | Simpler code, fewer edge cases, better error handling |
| Sequential stdout/stderr reading | Concurrent asyncio.gather() | Python 3.5 async/await | Eliminates deadlocks from buffer overflow |
| Manual signal handling for termination | terminate() + wait_for() with timeout | Python 3.7+ | Graceful shutdown with fallback to SIGKILL |
| Thread-based Telegram bots | Async python-telegram-bot 20.0+ | v20.0 (2023) | Native async/await, better performance |
| File-based Claude interaction | Stream-json subprocess with live parsing | Claude Code CLI (2025) | Real-time responses, lower latency |
Deprecated/outdated:
- python-telegram-bot sync mode (< v20): Deprecated, removed in v20. All new code must use async/await.
- subprocess.PIPE without concurrent reading: Known deadlock risk since Python 3.4, documented as anti-pattern
- PTY for Claude Code: Unnecessary; Claude Code designed for pipe interaction, handles non-TTY gracefully
Open Questions
Things that couldn't be fully resolved:
1. Claude Code auto-restart behavior with --resume
   - What we know: --resume loads a session by ID; --continue loads the most recent conversation in the current directory.
   - What's unclear: If Claude Code crashes mid-response, can we auto-restart with --continue and have it resume cleanly? Or do we need to track message history ourselves?
   - Recommendation: Test crash recovery behavior. Running --continue in the session directory after a crash is likely safe, since Claude Code persists conversation history itself.
2. Optimal buffer limit for long-running sessions
   - What we know: The limit parameter on create_subprocess_exec() controls the StreamReader buffer size (default 64 KiB).
   - What's unclear: Should we increase it for Claude Code's potentially long responses? What's the memory tradeoff?
   - Recommendation: Start with the default (64 KiB) and monitor in Phase 4. Reading with readline() keeps memory bounded, but readline() raises ValueError when a single line exceeds the limit, and one stream-json event (for example a large tool result) can be longer than 64 KiB. If such lines appear, raise the limit.
3. Handling concurrent messages during Claude processing
   - What we know: The user might send multiple messages while Claude is responding.
   - What's unclear: Queue them to the subprocess's stdin (if using --input-format stream-json)? Or wait for completion and send them as a new turn?
   - Recommendation: Phase context says "queue messages, send after response completes." For Phase 1, buffer messages in Python and send each as a new claude -p invocation after the previous one completes. Phase 2+ might use --input-format stream-json for live piping.
4. Session metadata beyond basics
   - What we know: We need name, PID, timestamps and persona at minimum.
   - What's unclear: Should we track message count, last message timestamp, token usage, or the Claude Code session ID?
   - Recommendation: Keep it minimal for Phase 1. Metadata schema:
     ```json
     {
       "name": "session-name",
       "created": "2026-02-04T14:20:00Z",
       "last_active": "2026-02-04T15:30:00Z",
       "persona": "brainstorm",
       "pid": 12345,
       "status": "active|suspended|idle"
     }
     ```
     Add fields in later phases as needed (token tracking in Phase 4, etc.).
Sources
Primary (HIGH confidence)
- Python asyncio subprocess documentation - Official Python 3.14 docs
- Claude Code CLI reference - Official Anthropic documentation
- Claude Code headless mode - Official programmatic usage guide
- python-telegram-bot Concurrency wiki - Official PTB documentation
Secondary (MEDIUM confidence)
- Super Fast Python - Asyncio Subprocess - Practical examples verified against official docs
- Python asyncio subprocess termination best practices - Community best practices, verified with official docs
- Claude Code session management guide - Educational resource on Claude sessions
- Stream-JSON chaining wiki - Community documentation on stream-json format
Tertiary (LOW confidence)
- WebSearch results on asyncio best practices - Multiple sources, cross-referenced but not deeply verified
Metadata
Confidence breakdown:
- Standard stack: HIGH - All libraries verified in use on mgmt container, versions confirmed
- Architecture: HIGH - Patterns sourced from official Python and Claude Code documentation
- Pitfalls: HIGH - Documented in Python subprocess docs, verified through official warnings
Research date: 2026-02-04
Valid until: 2026-03-04 (30 days - Python asyncio and Claude Code are stable, slow-moving APIs)