""" Message batching with debounce for Telegram bot. Collects rapid sequential messages and combines them into a single prompt after a configurable debounce period of silence. Based on research in: .planning/phases/02-telegram-integration/02-RESEARCH.md """ import asyncio import logging from typing import Callable logger = logging.getLogger(__name__) class MessageBatcher: """ Batches rapid sequential messages with debounce timer. When messages arrive in quick succession, waits for a period of silence (debounce_seconds) before flushing all queued messages as a single batch via the callback function. Example: async def handle_batch(combined: str): await process_message(combined) batcher = MessageBatcher(callback=handle_batch, debounce_seconds=2.0) await batcher.add_message("one") await batcher.add_message("two") # Resets timer await batcher.add_message("three") # Resets timer # After 2s of silence, callback receives: "one\n\ntwo\n\nthree" """ def __init__(self, callback: Callable[[str], None], debounce_seconds: float = 2.0): """ Initialize MessageBatcher. Args: callback: Async function to call with combined message string debounce_seconds: Seconds of silence before flushing batch """ self._callback = callback self._debounce_seconds = debounce_seconds self._queue = asyncio.Queue() self._timer_task: asyncio.Task | None = None self._lock = asyncio.Lock() logger.debug(f"MessageBatcher initialized: debounce={debounce_seconds}s") async def add_message(self, message: str) -> None: """ Add message to batch and reset debounce timer. Args: message: Message text to batch """ async with self._lock: # Add message to queue await self._queue.put(message) logger.debug(f"Message queued (size={self._queue.qsize()}): {message[:50]}...") # Cancel previous timer if running if self._timer_task and not self._timer_task.done(): self._timer_task.cancel() try: await self._timer_task except asyncio.CancelledError: pass # Start new timer self._timer_task = asyncio.create_task(self._debounce_timer()) async def _debounce_timer(self) -> None: """ Wait for debounce period, then flush batch. Runs as a task that can be cancelled when new messages arrive. """ try: await asyncio.sleep(self._debounce_seconds) await self._flush_batch() except asyncio.CancelledError: # Timer was cancelled by new message, not an error logger.debug("Debounce timer cancelled (new message arrived)") async def _flush_batch(self) -> None: """ Combine all queued messages and call callback. Joins messages with double newline separator. """ async with self._lock: # Collect all messages from queue messages = [] while not self._queue.empty(): messages.append(await self._queue.get()) if not messages: return # Combine with double newline combined = "\n\n".join(messages) logger.info(f"Flushing batch: {len(messages)} messages -> {len(combined)} chars") # Call callback try: await self._callback(combined) except Exception as e: logger.error(f"Error in batch callback: {e}") async def flush_immediately(self) -> None: """ Flush batch immediately without waiting for debounce timer. Useful when switching sessions or shutting down. """ # Cancel timer if self._timer_task and not self._timer_task.done(): self._timer_task.cancel() try: await self._timer_task except asyncio.CancelledError: pass # Flush await self._flush_batch()