- Create MessageBatcher class for debounce-based message batching - Update make_callbacks() to include on_tool_use with progress notifications - Add typing indicator support with stop_event control - Implement smart message splitting with MarkdownV2 escaping - Update handle_message() to use typing and batching - Update handle_photo() and handle_document() to save to session directories - Add auto-analysis for photos and file upload notifications - Update session switching and archiving to handle typing and batchers
127 lines
4.1 KiB
Python
127 lines
4.1 KiB
Python
"""
|
|
Message batching with debounce for Telegram bot.
|
|
|
|
Collects rapid sequential messages and combines them into a single prompt
|
|
after a configurable debounce period of silence.
|
|
|
|
Based on research in: .planning/phases/02-telegram-integration/02-RESEARCH.md
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Callable
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MessageBatcher:
|
|
"""
|
|
Batches rapid sequential messages with debounce timer.
|
|
|
|
When messages arrive in quick succession, waits for a period of silence
|
|
(debounce_seconds) before flushing all queued messages as a single batch
|
|
via the callback function.
|
|
|
|
Example:
|
|
async def handle_batch(combined: str):
|
|
await process_message(combined)
|
|
|
|
batcher = MessageBatcher(callback=handle_batch, debounce_seconds=2.0)
|
|
await batcher.add_message("one")
|
|
await batcher.add_message("two") # Resets timer
|
|
await batcher.add_message("three") # Resets timer
|
|
# After 2s of silence, callback receives: "one\n\ntwo\n\nthree"
|
|
"""
|
|
|
|
def __init__(self, callback: Callable[[str], None], debounce_seconds: float = 2.0):
|
|
"""
|
|
Initialize MessageBatcher.
|
|
|
|
Args:
|
|
callback: Async function to call with combined message string
|
|
debounce_seconds: Seconds of silence before flushing batch
|
|
"""
|
|
self._callback = callback
|
|
self._debounce_seconds = debounce_seconds
|
|
self._queue = asyncio.Queue()
|
|
self._timer_task: asyncio.Task | None = None
|
|
self._lock = asyncio.Lock()
|
|
logger.debug(f"MessageBatcher initialized: debounce={debounce_seconds}s")
|
|
|
|
async def add_message(self, message: str) -> None:
|
|
"""
|
|
Add message to batch and reset debounce timer.
|
|
|
|
Args:
|
|
message: Message text to batch
|
|
"""
|
|
async with self._lock:
|
|
# Add message to queue
|
|
await self._queue.put(message)
|
|
logger.debug(f"Message queued (size={self._queue.qsize()}): {message[:50]}...")
|
|
|
|
# Cancel previous timer if running
|
|
if self._timer_task and not self._timer_task.done():
|
|
self._timer_task.cancel()
|
|
try:
|
|
await self._timer_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
# Start new timer
|
|
self._timer_task = asyncio.create_task(self._debounce_timer())
|
|
|
|
async def _debounce_timer(self) -> None:
|
|
"""
|
|
Wait for debounce period, then flush batch.
|
|
|
|
Runs as a task that can be cancelled when new messages arrive.
|
|
"""
|
|
try:
|
|
await asyncio.sleep(self._debounce_seconds)
|
|
await self._flush_batch()
|
|
except asyncio.CancelledError:
|
|
# Timer was cancelled by new message, not an error
|
|
logger.debug("Debounce timer cancelled (new message arrived)")
|
|
|
|
async def _flush_batch(self) -> None:
|
|
"""
|
|
Combine all queued messages and call callback.
|
|
|
|
Joins messages with double newline separator.
|
|
"""
|
|
async with self._lock:
|
|
# Collect all messages from queue
|
|
messages = []
|
|
while not self._queue.empty():
|
|
messages.append(await self._queue.get())
|
|
|
|
if not messages:
|
|
return
|
|
|
|
# Combine with double newline
|
|
combined = "\n\n".join(messages)
|
|
logger.info(f"Flushing batch: {len(messages)} messages -> {len(combined)} chars")
|
|
|
|
# Call callback
|
|
try:
|
|
await self._callback(combined)
|
|
except Exception as e:
|
|
logger.error(f"Error in batch callback: {e}")
|
|
|
|
async def flush_immediately(self) -> None:
|
|
"""
|
|
Flush batch immediately without waiting for debounce timer.
|
|
|
|
Useful when switching sessions or shutting down.
|
|
"""
|
|
# Cancel timer
|
|
if self._timer_task and not self._timer_task.done():
|
|
self._timer_task.cancel()
|
|
try:
|
|
await self._timer_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
# Flush
|
|
await self._flush_batch()
|