homelab/.planning/phases/02-telegram-integration/02-01-PLAN.md
Mikkel Georgsen 36fabb41a6 docs(02): create phase plan
Phase 02: Telegram Integration
- 2 plan(s) in 2 wave(s)
- Wave 1: persistent subprocess + message utilities (02-01)
- Wave 2: bot integration + batching + file handling + systemd (02-02)
- Ready for execution

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 19:03:40 +00:00

---
phase: 02-telegram-integration
plan: 01
type: execute
wave: 1
depends_on: []
files_modified:
  - telegram/claude_subprocess.py
  - telegram/telegram_utils.py
autonomous: true
must_haves:
  truths:
    - "Persistent Claude Code subprocess accepts multiple messages without respawning"
    - "Stream-json events from Claude include tool_use events parseable for progress notifications"
    - "Long messages are split at smart boundaries without breaking MarkdownV2 code blocks"
    - "MarkdownV2 special characters are properly escaped before sending to Telegram"
    - "Typing indicator can be maintained for arbitrarily long operations via re-send loop"
  artifacts:
    - path: telegram/claude_subprocess.py
      provides: "Persistent subprocess with stream-json stdin/stdout"
      contains: "input-format.*stream-json"
    - path: telegram/telegram_utils.py
      provides: "Message splitting, MarkdownV2 escaping, typing indicator loop"
      exports:
        - split_message_smart
        - escape_markdown_v2
        - typing_indicator_loop
  key_links:
    - from: telegram/claude_subprocess.py
      to: claude CLI
      via: "--input-format stream-json stdin piping"
      pattern: "input-format.*stream-json"
    - from: telegram/claude_subprocess.py
      to: callbacks
      via: "on_output/on_error/on_complete/on_status/on_tool_use"
      pattern: "on_tool_use"
---

Refactor ClaudeSubprocess from fresh-process-per-turn to persistent process with stream-json I/O, and create Telegram utility functions for message formatting and typing indicators.

Purpose: The persistent process model eliminates ~1s spawn overhead per message, maintains conversation context across turns, and enables real-time tool call notifications. The utility functions provide safe MarkdownV2 formatting and smart message splitting that the bot integration (Plan 02) will consume.

Output: Refactored claude_subprocess.py with persistent subprocess, new telegram_utils.py with message splitting/escaping/typing utilities.

<execution_context> @/home/mikkel/.claude/get-shit-done/workflows/execute-plan.md @/home/mikkel/.claude/get-shit-done/templates/summary.md </execution_context>

@.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/02-telegram-integration/02-RESEARCH.md @.planning/phases/02-telegram-integration/02-CONTEXT.md @.planning/phases/01-session-process-foundation/01-02-SUMMARY.md @telegram/claude_subprocess.py

Task 1: Refactor ClaudeSubprocess to persistent process with stream-json I/O

Files: telegram/claude_subprocess.py

Refactor ClaudeSubprocess to maintain a single long-lived process per session using `--input-format stream-json --output-format stream-json` instead of spawning a fresh `claude -p` process per turn.

Key changes:

  1. New start() method: Spawns persistent subprocess with:

    claude -p --input-format stream-json --output-format stream-json --verbose
    

    Plus persona flags (--system-prompt, --model, --max-turns) and --continue if .claude/ exists. Launches background tasks for _read_stdout() and _read_stderr() (concurrent readers from Phase 1 pattern).

  2. Refactor send_message(): Instead of spawning a new process, writes NDJSON to stdin:

    msg = json.dumps({"type": "user", "content": message}) + '\n'
    self._process.stdin.write(msg.encode())
    await self._process.stdin.drain()  # CRITICAL: prevent buffer deadlock
    

    Sets _busy = True. If process not started, calls start() first. If already busy, queues the message (existing queue behavior preserved).

  3. Add on_tool_use callback: New callback on_tool_use(tool_name: str, tool_input: dict) for progress notifications. In _handle_stdout_line(), parse content_block_start events with type: "tool_use" to extract tool name and input. Stream-json emits these as separate events during assistant turns.

  4. Update _handle_stdout_line(): Handle the stream-json event types from persistent mode:

    • assistant: Extract text blocks, call on_output with final text
    • result: Turn complete, set _busy = False, call on_complete, process queue
    • content_block_start / content_block_delta with tool_use type: Extract tool name + target, call on_tool_use
    • system: System events, log and handle errors
  5. Update _read_streams() / _read_stdout(): Since the process is persistent, the stdout reader must NOT exit when a turn completes. It stays alive reading events indefinitely. Remove the "mark as not busy" logic from _read_streams and move it to the result event handler in _handle_stdout_line instead. The reader only exits when process dies (readline returns empty bytes).

  6. Process lifecycle:

    • start() — spawns process, starts readers
    • send_message() — writes to stdin (auto-starts if needed)
    • terminate() — closes stdin, sends SIGTERM, waits, SIGKILL fallback (existing pattern)
    • is_alive — check process.returncode is None
    • is_busy — check _busy flag
  7. Crash recovery: Keep existing crash recovery logic but adapt: when process dies unexpectedly, restart with start() and resend any queued messages. The --continue flag in start() ensures session context is preserved.

  8. Remove _spawn() method: Replace with start() for process lifecycle and send_message() for message delivery. The old pattern of passing message to _spawn() is no longer needed since messages go to stdin.
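The start / send_message / reader split described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the refactored class: `PersistentProcess`, `ECHO_CHILD`, `close()` and `demo()` are hypothetical names, a small Python echo child stands in for the `claude` CLI so the example runs anywhere, and the message shape is the untested candidate from this plan.

```python
import asyncio
import json
import sys

# Stand-in child (assumption): echoes each NDJSON line back as a "result" event.
# The real command would be the claude -p ... stream-json invocation from step 1.
ECHO_CHILD = (
    "import sys, json\n"
    "for line in sys.stdin:\n"
    "    msg = json.loads(line)\n"
    "    print(json.dumps({'type': 'result', 'echo': msg['content']}), flush=True)\n"
)


class PersistentProcess:
    """Hypothetical minimal wrapper: one child process serving many turns."""

    def __init__(self, argv, on_event):
        self._argv = argv
        self._on_event = on_event
        self._process = None
        self._reader = None

    async def start(self):
        self._process = await asyncio.create_subprocess_exec(
            *self._argv,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
        )
        # The reader task lives as long as the process, not per turn.
        self._reader = asyncio.create_task(self._read_stdout())

    async def send_message(self, message):
        if self._process is None:  # auto-start on first message
            await self.start()
        # Message shape is the plan's first candidate and still needs testing.
        line = json.dumps({"type": "user", "content": message}) + "\n"
        self._process.stdin.write(line.encode())
        await self._process.stdin.drain()  # flush before returning

    async def _read_stdout(self):
        while True:
            raw = await self._process.stdout.readline()
            if not raw:  # empty bytes: the process closed stdout
                break
            self._on_event(json.loads(raw))

    async def close(self):
        self._process.stdin.close()  # EOF lets the child exit on its own
        await self._process.wait()
        await self._reader


async def demo():
    events = []
    proc = PersistentProcess([sys.executable, "-c", ECHO_CHILD], events.append)
    await proc.send_message("first")
    await proc.send_message("second")  # same process, no respawn
    await proc.close()
    return events
```

Running `asyncio.run(demo())` sends two messages through one child process and collects one event per message. The busy flag, queueing and crash recovery are omitted here.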

Preserve from Phase 1:

  • Callback architecture (on_output, on_error, on_complete, on_status + new on_tool_use)
  • Message queue (asyncio.Queue)
  • Graceful termination (SIGTERM → wait_for → SIGKILL → wait)
  • Crash retry logic (MAX_CRASH_RETRIES, backoff)
  • PATH environment setup
  • Session directory as cwd
  • Timing/logging instrumentation
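For reference, the graceful termination sequence (SIGTERM → wait_for → SIGKILL → wait) could look roughly like this once stdin is also closed first. It is a sketch only: `terminate_gracefully`, `demo` and the 5 second default timeout are assumptions, not the existing Phase 1 code.

```python
import asyncio
import sys


async def terminate_gracefully(process, timeout=5.0):
    """Close stdin, send SIGTERM, wait up to `timeout`, then fall back to SIGKILL."""
    if process.returncode is not None:
        return process.returncode  # already exited
    if process.stdin is not None:
        process.stdin.close()
    process.terminate()  # SIGTERM on POSIX
    try:
        await asyncio.wait_for(process.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        process.kill()  # SIGKILL fallback
        await process.wait()
    return process.returncode


async def demo():
    # A child that would otherwise sleep for a minute.
    process = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(60)",
        stdin=asyncio.subprocess.PIPE,
    )
    return await terminate_gracefully(process, timeout=5.0)
```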

Important implementation notes:

  • The stream-json input format message structure: Test with {"type": "user", "content": "message text"}. If that fails, try simpler {"content": "message text"}. Check claude --help or test interactively.
  • Always await proc.stdin.drain() after writing to prevent pipe buffer deadlock
  • The stdout reader task must run for the lifetime of the process, not per-turn
  • Track busy state via result events, not process completion
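The last note, tracking busy state from result events, can be illustrated with a small dispatcher. The event shapes below are assumptions that need to be confirmed against real CLI output: text is assumed to arrive under `message.content` in assistant events, and tool calls under `content_block` in content_block_start events. `TurnTracker` is a hypothetical name.

```python
import json


class TurnTracker:
    """Hypothetical dispatcher: routes stream-json lines and tracks busy state."""

    def __init__(self, on_output, on_tool_use, on_complete):
        self.on_output = on_output
        self.on_tool_use = on_tool_use
        self.on_complete = on_complete
        self.busy = False

    def handle_line(self, line):
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            return  # ignore non-JSON noise on stdout
        etype = event.get("type")
        if etype == "assistant":
            # Assumed shape: {"type": "assistant", "message": {"content": [...]}}
            for block in event.get("message", {}).get("content", []):
                if block.get("type") == "text":
                    self.on_output(block["text"])
        elif etype == "content_block_start":
            # Assumed shape: {"content_block": {"type": "tool_use", "name": ..., "input": ...}}
            block = event.get("content_block", {})
            if block.get("type") == "tool_use":
                self.on_tool_use(block.get("name", ""), block.get("input", {}))
        elif etype == "result":
            # The turn is over, but the process and its reader stay alive.
            self.busy = False
            self.on_complete()
```

The point of the design is that only the result branch clears the busy flag, so the reader loop never needs to know where a turn ends.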

Verify:

  1. Run python -c "import telegram.claude_subprocess; print('import OK')" from ~/homelab (or an equivalent sibling import test)
  2. Verify the class has start(), send_message(), and terminate() methods
  3. Verify the on_tool_use callback parameter exists in __init__
  4. Verify --input-format stream-json appears in the command construction
  5. Verify the stdin.write + drain pattern in send_message
  6. Verify the stdout reader does NOT exit on result events (it stays alive for the persistent process)

Done: ClaudeSubprocess maintains a persistent process that accepts NDJSON messages on stdin, emits stream-json events on stdout, routes tool_use events to the on_tool_use callback, and tracks busy state via result events instead of process completion. No fresh process is spawned per turn.

Task 2: Create telegram_utils.py with message formatting and typing indicator

Files: telegram/telegram_utils.py

Create `telegram/telegram_utils.py` with three utility functions consumed by the bot integration in Plan 02.

1. Smart message splitting — split_message_smart(text: str, max_length: int = 4000) -> list[str]:

  • Split long messages respecting MarkdownV2 code block boundaries
  • Never split inside triple-backtick code blocks
  • Prefer paragraph breaks (\n\n), then line breaks (\n), then hard character split as last resort
  • Use 4000 as default max (not 4096) to leave room for MarkdownV2 escape character expansion
  • Track in_code_block state by counting triple-backtick lines
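  • If a single code block exceeds max_length on its own, it cannot be sent as one message, because the Bot API rejects text longer than 4096 characters: either split it by closing and reopening the fence at the boundary, or truncate it with a "... (truncated)" marker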
  • Algorithm from research: iterate lines, track code block state, split only when NOT in code block and would exceed limit
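A line-based sketch of that algorithm is shown below, under some simplifying assumptions: it splits only at line breaks (the paragraph-break preference is left out), it detects fences by lines that start with a triple backtick, and it keeps an oversized code block whole, so the caller would still have to truncate or re-fence such a chunk before sending. `FENCE` is a helper name introduced for this example.

```python
FENCE = "`" * 3  # triple backtick, built indirectly so this example stays fence-safe


def split_message_smart(text: str, max_length: int = 4000) -> list[str]:
    chunks: list[str] = []
    current: list[str] = []  # lines of the chunk being built
    current_len = 0
    in_code_block = False

    def flush() -> None:
        nonlocal current, current_len
        if current:
            chunks.append("\n".join(current))
            current, current_len = [], 0

    for line in text.split("\n"):
        # Last resort: hard-split a single over-long line outside code blocks.
        while not in_code_block and len(line) > max_length:
            flush()
            chunks.append(line[:max_length])
            line = line[max_length:]
        added = len(line) + (1 if current else 0)  # +1 for the joining newline
        # Start a new chunk only when we are NOT inside a code block.
        if not in_code_block and current and current_len + added > max_length:
            flush()
            added = len(line)
        current.append(line)
        current_len += added
        if line.strip().startswith(FENCE):
            in_code_block = not in_code_block  # opening or closing fence
    flush()
    return chunks
```

For example, `split_message_smart("a" * 5000)` falls through to the hard split and yields two chunks, while a message whose code block pushes past the limit is cut only after the closing fence.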

2. MarkdownV2 escaping — escape_markdown_v2(text: str) -> str:

  • Escape the 18 MarkdownV2 special characters: _ * [ ] ( ) ~ ` > # + - = | { } . ! (and escape a literal backslash \ as well)
  • BUT do NOT escape inside code blocks (text between triple backticks or single backticks)
  • Strategy: Split text by code regions, escape only non-code regions, rejoin
  • For inline code (single backtick): don't escape content between backticks
  • For code blocks (triple backtick): don't escape content between triple backticks
  • Use regex to identify code regions: find ```...``` and `...` blocks, escape everything else
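The split-escape-rejoin strategy could be sketched like this. The two regexes are one possible choice rather than the pattern from the research notes, and `FENCE`, `CODE_REGION` and `SPECIAL` are names made up for the example. Backslashes are included in the escaped set.

```python
import re

FENCE = "`" * 3  # triple backtick, built indirectly so this example stays fence-safe

# A fenced block (tried first) or an inline code span; the capture group makes
# re.split keep the code regions in the result.
CODE_REGION = re.compile("(" + FENCE + r".*?" + FENCE + r"|`[^`\n]*`)", re.DOTALL)

# MarkdownV2 special characters plus the backslash itself.
SPECIAL = re.compile(r"([_*\[\]()~`>#+\-=|{}.!\\])")


def escape_markdown_v2(text: str) -> str:
    parts = CODE_REGION.split(text)
    # With one capture group, even indices are plain text and odd indices are code.
    for i in range(0, len(parts), 2):
        parts[i] = SPECIAL.sub(r"\\\1", parts[i])
    return "".join(parts)
```

Unpaired backticks end up in the plain-text parts and get escaped like any other special character. Telegram also expects backticks and backslashes inside code entities to be escaped, which this sketch does not attempt.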

3. Typing indicator loop — async def typing_indicator_loop(bot, chat_id: int, stop_event: asyncio.Event):

  • Send ChatAction.TYPING every 4 seconds until stop_event is set
  • Use asyncio.wait_for(stop_event.wait(), timeout=4.0) pattern from research
  • Catch exceptions from send_chat_action (network errors) and log warning, continue loop
  • Clean exit when stop_event is set
  • Import ChatAction from telegram.constants (python-telegram-bot v20+; v13 exposed it as telegram.ChatAction)
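A minimal sketch of the loop follows. To stay runnable without python-telegram-bot installed, it passes the literal string "typing" (the value behind ChatAction.TYPING) and is exercised with a stand-in bot. The `interval` parameter, `FakeBot` and `demo` are assumptions added for the example and are not part of the planned signature.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def typing_indicator_loop(bot, chat_id, stop_event, interval=4.0):
    """Re-send the typing action every `interval` seconds until stop_event is set."""
    while not stop_event.is_set():
        try:
            # Real code would pass ChatAction.TYPING here.
            await bot.send_chat_action(chat_id=chat_id, action="typing")
        except Exception as exc:  # network errors must not kill the loop
            logger.warning("send_chat_action failed: %s", exc)
        try:
            # Sleeps for `interval`, but wakes immediately when stop_event is set.
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # interval elapsed, send the action again


class FakeBot:
    """Stand-in for the Telegram bot; fails once to exercise the error path."""

    def __init__(self):
        self.calls = 0

    async def send_chat_action(self, chat_id, action):
        self.calls += 1
        if self.calls == 1:
            raise ConnectionError("simulated network error")


async def demo():
    bot = FakeBot()
    stop_event = asyncio.Event()
    task = asyncio.create_task(
        typing_indicator_loop(bot, 123, stop_event, interval=0.01)
    )
    await asyncio.sleep(0.1)
    stop_event.set()  # clean cancellation
    await asyncio.wait_for(task, timeout=1.0)
    return bot.calls
```

Waiting on the event with a timeout, rather than sleeping, is what lets the loop stop immediately when the response is ready.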

Module structure:

"""
Telegram message formatting and UX utilities.

Provides smart message splitting, MarkdownV2 escaping, and typing indicator
management for the Telegram Claude Code bridge.
"""
import asyncio
import logging
import re
from telegram.constants import ChatAction  # python-telegram-bot v20+; in v13 this was telegram.ChatAction

logger = logging.getLogger(__name__)

TELEGRAM_MAX_LENGTH = 4096
SAFE_LENGTH = 4000

def split_message_smart(text: str, max_length: int = SAFE_LENGTH) -> list[str]:
    ...

def escape_markdown_v2(text: str) -> str:
    ...

async def typing_indicator_loop(bot, chat_id: int, stop_event: asyncio.Event):
    ...

Verify:

  1. Run `python -c "from telegram_utils import split_message_smart, escape_markdown_v2, typing_indicator_loop; print('imports OK')"` from ~/homelab/telegram/
  2. Test split_message_smart: `split_message_smart("a" * 5000)` returns a list with 2+ chunks, each <= 4000 chars
  3. Test split_message_smart with a code block: a message containing a ``` block is not split inside the block
  4. Test escape_markdown_v2: `escape_markdown_v2("hello_world")` returns `"hello\_world"`
  5. Test escape_markdown_v2 preserves code: text inside backticks is NOT escaped

Done: telegram_utils.py exists with split_message_smart (code-block-aware splitting), escape_markdown_v2 (context-sensitive escaping), and typing_indicator_loop (4s re-send with asyncio.Event cancellation). All functions are importable and tested with basic cases.

Verification:

  1. `python -c "from telegram.claude_subprocess import ClaudeSubprocess"` succeeds (or sibling import equivalent)
  2. `python -c "from telegram_utils import split_message_smart, escape_markdown_v2, typing_indicator_loop"` from telegram/ succeeds
  3. ClaudeSubprocess.__init__ accepts the on_tool_use callback
  4. ClaudeSubprocess has start(), send_message(), terminate() methods
  5. send_message() writes NDJSON to stdin (not spawning a new process)
  6. split_message_smart handles code blocks without breaking them
  7. escape_markdown_v2 escapes outside code blocks only

<success_criteria>

  • ClaudeSubprocess uses persistent process with --input-format stream-json
  • Messages sent via stdin NDJSON, not fresh process spawn
  • Tool use events parsed and routed to on_tool_use callback
  • Smart message splitting respects code block boundaries
  • MarkdownV2 escaping handles all 18 special characters with code block awareness
  • Typing indicator loop re-sends every 4 seconds with clean cancellation

</success_criteria>

After completion, create `.planning/phases/02-telegram-integration/02-01-SUMMARY.md`