From f246d18fa00837cf2e2a4bca0d8d97a2ec176147 Mon Sep 17 00:00:00 2001 From: Mikkel Georgsen Date: Wed, 4 Feb 2026 19:24:09 +0000 Subject: [PATCH] feat(02-02): integrate typing indicators, batching, and file handling - Create MessageBatcher class for debounce-based message batching - Update make_callbacks() to include on_tool_use with progress notifications - Add typing indicator support with stop_event control - Implement smart message splitting with MarkdownV2 escaping - Update handle_message() to use typing and batching - Update handle_photo() and handle_document() to save to session directories - Add auto-analysis for photos and file upload notifications - Update session switching and archiving to handle typing and batchers --- telegram/bot.py | 314 +++++++++++++++++++++++++++++------- telegram/message_batcher.py | 127 +++++++++++++++ 2 files changed, 385 insertions(+), 56 deletions(-) create mode 100644 telegram/message_batcher.py diff --git a/telegram/bot.py b/telegram/bot.py index 1c8458d..5b1abf9 100755 --- a/telegram/bot.py +++ b/telegram/bot.py @@ -16,6 +16,8 @@ from telegram import Update from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters from session_manager import SessionManager from claude_subprocess import ClaudeSubprocess +from telegram_utils import split_message_smart, escape_markdown_v2, typing_indicator_loop +from message_batcher import MessageBatcher # Setup logging logging.basicConfig( @@ -41,9 +43,11 @@ AUTHORIZED_FILE = Path(__file__).parent / 'authorized_users' # Inbox file for messages to Claude INBOX_FILE = Path(__file__).parent / 'inbox' -# Session manager and subprocess tracking +# Session manager, subprocess tracking, and message batchers session_manager = SessionManager() subprocesses: dict[str, ClaudeSubprocess] = {} # Persistent subprocess per session +batchers: dict[str, MessageBatcher] = {} # Message batcher per session +typing_tasks: dict[str, tuple[asyncio.Task, asyncio.Event]] = {} # Typing indicator per session def get_authorized_users(): """Load authorized user IDs.""" @@ -64,27 +68,81 @@ def is_authorized(user_id: int) -> bool: """Check if user is authorized.""" return user_id in get_authorized_users() -def make_callbacks(bot, chat_id): - """Create callbacks for ClaudeSubprocess bound to specific chat.""" +def make_callbacks(bot, chat_id, stop_typing_event: asyncio.Event): + """Create callbacks for ClaudeSubprocess bound to specific chat with typing control.""" async def on_output(text): t0 = time.monotonic() - # Truncate to Telegram limit - if len(text) > 4000: - text = text[:4000] + "\n... (truncated)" - await bot.send_message(chat_id=chat_id, text=text) + + # Split message using smart splitting + chunks = split_message_smart(text) + + for chunk in chunks: + try: + # Try sending with MarkdownV2 + escaped = escape_markdown_v2(chunk) + await bot.send_message(chat_id=chat_id, text=escaped, parse_mode='MarkdownV2') + except Exception as e: + # Fall back to plain text if MarkdownV2 fails + logger.warning(f"MarkdownV2 parse failed, falling back to plain text: {e}") + await bot.send_message(chat_id=chat_id, text=chunk) + + # Stop typing indicator + stop_typing_event.set() + elapsed = time.monotonic() - t0 - logger.info(f"[TIMING] Telegram send: {elapsed:.3f}s ({len(text)} chars)") + logger.info(f"[TIMING] Telegram send: {elapsed:.3f}s ({len(text)} chars, {len(chunks)} chunks)") async def on_error(error): await bot.send_message(chat_id=chat_id, text=f"Error: {error}") + # Stop typing indicator on error + stop_typing_event.set() async def on_complete(): - pass # Phase 2 will add typing indicator cleanup + # Stop typing indicator on completion + stop_typing_event.set() async def on_status(status): await bot.send_message(chat_id=chat_id, text=f"[{status}]") - return on_output, on_error, on_complete, on_status + async def on_tool_use(tool_name: str, tool_input: dict): + """Format and send tool call progress notification.""" + # Extract meaningful target from tool_input + target = "" + if tool_name == "Bash": + cmd = tool_input.get("command", "") + target = cmd[:50] + "..." if len(cmd) > 50 else cmd + elif tool_name == "Read": + target = tool_input.get("file_path", "") + elif tool_name == "Edit": + target = tool_input.get("file_path", "") + elif tool_name == "Write": + target = tool_input.get("file_path", "") + elif tool_name == "Grep": + pattern = tool_input.get("pattern", "") + target = f"pattern: {pattern}" + elif tool_name == "Glob": + pattern = tool_input.get("pattern", "") + target = f"pattern: {pattern}" + else: + target = str(tool_input)[:50] + + # Format as italic message + message = f"_{tool_name}: {target}_" + + try: + await bot.send_message(chat_id=chat_id, text=message, parse_mode='MarkdownV2') + except Exception as e: + # Fall back to plain text + logger.warning(f"Failed to send tool notification with MarkdownV2: {e}") + await bot.send_message(chat_id=chat_id, text=f"{tool_name}: {target}") + + return { + 'on_output': on_output, + 'on_error': on_error, + 'on_complete': on_complete, + 'on_status': on_status, + 'on_tool_use': on_tool_use, + } def run_command(cmd: list, timeout: int = 30) -> str: """Run a shell command and return output.""" @@ -314,17 +372,21 @@ async def new_session(update: Update, context: ContextTypes.DEFAULT_TYPE): persona_config = session_manager.get_session(name) persona_data = session_manager.load_persona(persona or 'default') + # Create stop typing event for this session + stop_typing = asyncio.Event() + # Create callbacks bound to this chat - callbacks = make_callbacks(context.bot, update.effective_chat.id) + callbacks = make_callbacks(context.bot, update.effective_chat.id, stop_typing) # Create subprocess subprocesses[name] = ClaudeSubprocess( session_dir=session_dir, persona=persona_data, - on_output=callbacks[0], - on_error=callbacks[1], - on_complete=callbacks[2], - on_status=callbacks[3], + on_output=callbacks['on_output'], + on_error=callbacks['on_error'], + on_complete=callbacks['on_complete'], + on_status=callbacks['on_status'], + on_tool_use=callbacks['on_tool_use'], ) logger.info(f"Created session '{name}' with persona '{persona or 'default'}'") @@ -372,6 +434,21 @@ async def switch_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) ) return + # Stop typing indicator for previous session if running + prev_session = session_manager.get_active_session() + if prev_session and prev_session in typing_tasks: + task, stop_event = typing_tasks[prev_session] + stop_event.set() + try: + await task + except Exception: + pass + del typing_tasks[prev_session] + + # Flush batcher for previous session immediately + if prev_session and prev_session in batchers: + await batchers[prev_session].flush_immediately() + # Switch session session_manager.switch_session(name) @@ -382,17 +459,21 @@ async def switch_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) persona_name = session_data.get('persona', 'default') persona_data = session_manager.load_persona(persona_name) + # Create stop typing event for this session + stop_typing = asyncio.Event() + # Create callbacks bound to this chat - callbacks = make_callbacks(context.bot, update.effective_chat.id) + callbacks = make_callbacks(context.bot, update.effective_chat.id, stop_typing) # Create subprocess subprocesses[name] = ClaudeSubprocess( session_dir=session_dir, persona=persona_data, - on_output=callbacks[0], - on_error=callbacks[1], - on_complete=callbacks[2], - on_status=callbacks[3], + on_output=callbacks['on_output'], + on_error=callbacks['on_error'], + on_complete=callbacks['on_complete'], + on_status=callbacks['on_status'], + on_tool_use=callbacks['on_tool_use'], ) logger.info(f"Auto-spawned subprocess for session '{name}'") @@ -424,6 +505,21 @@ async def archive_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE name = context.args[0] try: + # Stop typing indicator if running + if name in typing_tasks: + task, stop_event = typing_tasks[name] + stop_event.set() + try: + await task + except Exception: + pass + del typing_tasks[name] + + # Flush and remove batcher + if name in batchers: + await batchers[name].flush_immediately() + del batchers[name] + # Terminate subprocess if running if name in subprocesses: if subprocesses[name].is_alive: @@ -445,7 +541,7 @@ async def archive_session_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE await update.message.reply_text(f"Error archiving session: {e}") async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): - """Handle free text messages - route to active Claude session.""" + """Handle free text messages - route to active Claude session with batching.""" if not is_authorized(update.effective_user.id): return @@ -464,7 +560,18 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): logger.info(f"[TIMING] Message received: session='{active_session}', age={msg_age:.1f}s, text={message[:50]}...") try: - # Get or create subprocess for active session + # Start typing indicator immediately (or reuse existing) + if active_session not in typing_tasks: + stop_typing = asyncio.Event() + typing_task = asyncio.create_task( + typing_indicator_loop(context.bot, update.effective_chat.id, stop_typing) + ) + typing_tasks[active_session] = (typing_task, stop_typing) + else: + # Reuse existing typing indicator + typing_task, stop_typing = typing_tasks[active_session] + + # Get or create subprocess for active session (avoid double-start) already_alive = active_session in subprocesses and subprocesses[active_session].is_alive if not already_alive: session_dir = session_manager.get_session_dir(active_session) @@ -472,73 +579,135 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): persona_name = session_data.get('persona', 'default') persona_data = session_manager.load_persona(persona_name) - # Create callbacks bound to this chat - callbacks = make_callbacks(context.bot, update.effective_chat.id) + # Create callbacks bound to this chat with stop_typing event + callbacks = make_callbacks(context.bot, update.effective_chat.id, stop_typing) # Create subprocess - subprocesses[active_session] = ClaudeSubprocess( + subprocess_inst = ClaudeSubprocess( session_dir=session_dir, persona=persona_data, - on_output=callbacks[0], - on_error=callbacks[1], - on_complete=callbacks[2], - on_status=callbacks[3], + on_output=callbacks['on_output'], + on_error=callbacks['on_error'], + on_complete=callbacks['on_complete'], + on_status=callbacks['on_status'], + on_tool_use=callbacks['on_tool_use'], ) + await subprocess_inst.start() + subprocesses[active_session] = subprocess_inst + else: + subprocess_inst = subprocesses[active_session] subprocess_state = "reused" if already_alive else "cold-start" - logger.info(f"[TIMING] Subprocess: {subprocess_state}, busy={subprocesses[active_session].is_busy}") + logger.info(f"[TIMING] Subprocess: {subprocess_state}, busy={subprocess_inst.is_busy}") - # Send message to Claude - await subprocesses[active_session].send_message(message) + # Get or create message batcher for this session + if active_session not in batchers: + # Batcher callback sends message to subprocess + batchers[active_session] = MessageBatcher( + callback=subprocess_inst.send_message, + debounce_seconds=2.0 + ) + + # Add message to batcher (typing indicator already running) + await batchers[active_session].add_message(message) except Exception as e: logger.error(f"Error handling message: {e}") await update.message.reply_text(f"Error: {e}") + # Stop typing indicator on error + if active_session in typing_tasks: + task, stop_event = typing_tasks[active_session] + stop_event.set() async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): - """Handle photo messages - download and save for Claude.""" + """Handle photo messages - save to session directory and auto-analyze.""" if not is_authorized(update.effective_user.id): return + # Check for active session + active_session = session_manager.get_active_session() + if not active_session: + await update.message.reply_text( + "No active session. Use /new to start one, then send the photo again." + ) + return + user = update.effective_user - timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') file_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') - # Create images directory - images_dir = Path(__file__).parent / 'images' - images_dir.mkdir(exist_ok=True) + # Get session directory + session_dir = session_manager.get_session_dir(active_session) # Get the largest photo (best quality) photo = update.message.photo[-1] file = await context.bot.get_file(photo.file_id) - # Download the image - filename = f"{file_timestamp}.jpg" - filepath = images_dir / filename + # Download to session directory + filename = f"photo_{file_timestamp}.jpg" + filepath = session_dir / filename await file.download_to_drive(filepath) + logger.info(f"Photo saved to session '{active_session}': {filepath}") + # Get caption if any caption = update.message.caption or "" - caption_text = f" - \"{caption}\"" if caption else "" - # Log to inbox - with open(INBOX_FILE, 'a') as f: - f.write(f"[{timestamp}] {user.first_name}: [IMAGE: {filepath}]{caption_text}\n") + # Start typing indicator + if active_session not in typing_tasks: + stop_typing = asyncio.Event() + typing_task = asyncio.create_task( + typing_indicator_loop(context.bot, update.effective_chat.id, stop_typing) + ) + typing_tasks[active_session] = (typing_task, stop_typing) - logger.info(f"Photo saved from {user.first_name}: {filepath}") + # Auto-analyze: send message to Claude + if caption: + analysis_message = f"I've attached a photo: {filename}. {caption}" + else: + analysis_message = f"I've attached a photo: {filename}. Please describe what you see." + + # Get or create subprocess + if active_session not in subprocesses or not subprocesses[active_session].is_alive: + session_data = session_manager.get_session(active_session) + persona_name = session_data.get('persona', 'default') + persona_data = session_manager.load_persona(persona_name) + + stop_typing = typing_tasks[active_session][1] + callbacks = make_callbacks(context.bot, update.effective_chat.id, stop_typing) + + subprocess_inst = ClaudeSubprocess( + session_dir=session_dir, + persona=persona_data, + on_output=callbacks['on_output'], + on_error=callbacks['on_error'], + on_complete=callbacks['on_complete'], + on_status=callbacks['on_status'], + on_tool_use=callbacks['on_tool_use'], + ) + await subprocess_inst.start() + subprocesses[active_session] = subprocess_inst + + # Send analysis message directly (not batched) + await subprocesses[active_session].send_message(analysis_message) async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE): - """Handle document/file messages - download and save for Claude.""" + """Handle document/file messages - save to session directory and notify Claude.""" if not is_authorized(update.effective_user.id): return + # Check for active session + active_session = session_manager.get_active_session() + if not active_session: + await update.message.reply_text( + "No active session. Use /new to start one, then send the file again." + ) + return + user = update.effective_user - timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') file_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') - # Create files directory - files_dir = Path(__file__).parent / 'files' - files_dir.mkdir(exist_ok=True) + # Get session directory + session_dir = session_manager.get_session_dir(active_session) # Get document info doc = update.message.document @@ -547,18 +716,51 @@ async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE): # Download with timestamp prefix to avoid collisions filename = f"{file_timestamp}_{original_name}" - filepath = files_dir / filename + filepath = session_dir / filename await file.download_to_drive(filepath) + logger.info(f"Document saved to session '{active_session}': {filepath}") + # Get caption if any caption = update.message.caption or "" - caption_text = f" - \"{caption}\"" if caption else "" - # Log to inbox - with open(INBOX_FILE, 'a') as f: - f.write(f"[{timestamp}] {user.first_name}: [FILE: {filepath}]{caption_text}\n") + # Start typing indicator + if active_session not in typing_tasks: + stop_typing = asyncio.Event() + typing_task = asyncio.create_task( + typing_indicator_loop(context.bot, update.effective_chat.id, stop_typing) + ) + typing_tasks[active_session] = (typing_task, stop_typing) - logger.info(f"Document saved from {user.first_name}: {filepath}") + # Notify Claude + if caption: + notify_message = f"{caption}\nThe file {filename} has been saved to your session." + else: + notify_message = f"User uploaded file: {filename}" + + # Get or create subprocess + if active_session not in subprocesses or not subprocesses[active_session].is_alive: + session_data = session_manager.get_session(active_session) + persona_name = session_data.get('persona', 'default') + persona_data = session_manager.load_persona(persona_name) + + stop_typing = typing_tasks[active_session][1] + callbacks = make_callbacks(context.bot, update.effective_chat.id, stop_typing) + + subprocess_inst = ClaudeSubprocess( + session_dir=session_dir, + persona=persona_data, + on_output=callbacks['on_output'], + on_error=callbacks['on_error'], + on_complete=callbacks['on_complete'], + on_status=callbacks['on_status'], + on_tool_use=callbacks['on_tool_use'], + ) + await subprocess_inst.start() + subprocesses[active_session] = subprocess_inst + + # Send notification directly (not batched) + await subprocesses[active_session].send_message(notify_message) async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle unknown commands.""" diff --git a/telegram/message_batcher.py b/telegram/message_batcher.py new file mode 100644 index 0000000..bd8ff4a --- /dev/null +++ b/telegram/message_batcher.py @@ -0,0 +1,127 @@ +""" +Message batching with debounce for Telegram bot. + +Collects rapid sequential messages and combines them into a single prompt +after a configurable debounce period of silence. + +Based on research in: .planning/phases/02-telegram-integration/02-RESEARCH.md +""" + +import asyncio +import logging +from typing import Callable + +logger = logging.getLogger(__name__) + + +class MessageBatcher: + """ + Batches rapid sequential messages with debounce timer. + + When messages arrive in quick succession, waits for a period of silence + (debounce_seconds) before flushing all queued messages as a single batch + via the callback function. + + Example: + async def handle_batch(combined: str): + await process_message(combined) + + batcher = MessageBatcher(callback=handle_batch, debounce_seconds=2.0) + await batcher.add_message("one") + await batcher.add_message("two") # Resets timer + await batcher.add_message("three") # Resets timer + # After 2s of silence, callback receives: "one\n\ntwo\n\nthree" + """ + + def __init__(self, callback: Callable[[str], None], debounce_seconds: float = 2.0): + """ + Initialize MessageBatcher. + + Args: + callback: Async function to call with combined message string + debounce_seconds: Seconds of silence before flushing batch + """ + self._callback = callback + self._debounce_seconds = debounce_seconds + self._queue = asyncio.Queue() + self._timer_task: asyncio.Task | None = None + self._lock = asyncio.Lock() + logger.debug(f"MessageBatcher initialized: debounce={debounce_seconds}s") + + async def add_message(self, message: str) -> None: + """ + Add message to batch and reset debounce timer. + + Args: + message: Message text to batch + """ + async with self._lock: + # Add message to queue + await self._queue.put(message) + logger.debug(f"Message queued (size={self._queue.qsize()}): {message[:50]}...") + + # Cancel previous timer if running + if self._timer_task and not self._timer_task.done(): + self._timer_task.cancel() + try: + await self._timer_task + except asyncio.CancelledError: + pass + + # Start new timer + self._timer_task = asyncio.create_task(self._debounce_timer()) + + async def _debounce_timer(self) -> None: + """ + Wait for debounce period, then flush batch. + + Runs as a task that can be cancelled when new messages arrive. + """ + try: + await asyncio.sleep(self._debounce_seconds) + await self._flush_batch() + except asyncio.CancelledError: + # Timer was cancelled by new message, not an error + logger.debug("Debounce timer cancelled (new message arrived)") + + async def _flush_batch(self) -> None: + """ + Combine all queued messages and call callback. + + Joins messages with double newline separator. + """ + async with self._lock: + # Collect all messages from queue + messages = [] + while not self._queue.empty(): + messages.append(await self._queue.get()) + + if not messages: + return + + # Combine with double newline + combined = "\n\n".join(messages) + logger.info(f"Flushing batch: {len(messages)} messages -> {len(combined)} chars") + + # Call callback + try: + await self._callback(combined) + except Exception as e: + logger.error(f"Error in batch callback: {e}") + + async def flush_immediately(self) -> None: + """ + Flush batch immediately without waiting for debounce timer. + + Useful when switching sessions or shutting down. + """ + # Cancel timer + if self._timer_task and not self._timer_task.done(): + self._timer_task.cancel() + try: + await self._timer_task + except asyncio.CancelledError: + pass + + # Flush + await self._flush_batch()