homelab/telegram/message_batcher.py
Mikkel Georgsen f246d18fa0 feat(02-02): integrate typing indicators, batching, and file handling
- Create MessageBatcher class for debounce-based message batching
- Update make_callbacks() to include on_tool_use with progress notifications
- Add typing indicator support with stop_event control
- Implement smart message splitting with MarkdownV2 escaping
- Update handle_message() to use typing and batching
- Update handle_photo() and handle_document() to save to session directories
- Add auto-analysis for photos and file upload notifications
- Update session switching and archiving to handle typing and batchers
2026-02-04 19:24:09 +00:00

127 lines
4.1 KiB
Python

"""
Message batching with debounce for Telegram bot.
Collects rapid sequential messages and combines them into a single prompt
after a configurable debounce period of silence.
Based on research in: .planning/phases/02-telegram-integration/02-RESEARCH.md
"""
import asyncio
import logging
from typing import Callable
logger = logging.getLogger(__name__)
class MessageBatcher:
"""
Batches rapid sequential messages with debounce timer.
When messages arrive in quick succession, waits for a period of silence
(debounce_seconds) before flushing all queued messages as a single batch
via the callback function.
Example:
async def handle_batch(combined: str):
await process_message(combined)
batcher = MessageBatcher(callback=handle_batch, debounce_seconds=2.0)
await batcher.add_message("one")
await batcher.add_message("two") # Resets timer
await batcher.add_message("three") # Resets timer
# After 2s of silence, callback receives: "one\n\ntwo\n\nthree"
"""
def __init__(self, callback: Callable[[str], None], debounce_seconds: float = 2.0):
"""
Initialize MessageBatcher.
Args:
callback: Async function to call with combined message string
debounce_seconds: Seconds of silence before flushing batch
"""
self._callback = callback
self._debounce_seconds = debounce_seconds
self._queue = asyncio.Queue()
self._timer_task: asyncio.Task | None = None
self._lock = asyncio.Lock()
logger.debug(f"MessageBatcher initialized: debounce={debounce_seconds}s")
async def add_message(self, message: str) -> None:
"""
Add message to batch and reset debounce timer.
Args:
message: Message text to batch
"""
async with self._lock:
# Add message to queue
await self._queue.put(message)
logger.debug(f"Message queued (size={self._queue.qsize()}): {message[:50]}...")
# Cancel previous timer if running
if self._timer_task and not self._timer_task.done():
self._timer_task.cancel()
try:
await self._timer_task
except asyncio.CancelledError:
pass
# Start new timer
self._timer_task = asyncio.create_task(self._debounce_timer())
async def _debounce_timer(self) -> None:
"""
Wait for debounce period, then flush batch.
Runs as a task that can be cancelled when new messages arrive.
"""
try:
await asyncio.sleep(self._debounce_seconds)
await self._flush_batch()
except asyncio.CancelledError:
# Timer was cancelled by new message, not an error
logger.debug("Debounce timer cancelled (new message arrived)")
async def _flush_batch(self) -> None:
"""
Combine all queued messages and call callback.
Joins messages with double newline separator.
"""
async with self._lock:
# Collect all messages from queue
messages = []
while not self._queue.empty():
messages.append(await self._queue.get())
if not messages:
return
# Combine with double newline
combined = "\n\n".join(messages)
logger.info(f"Flushing batch: {len(messages)} messages -> {len(combined)} chars")
# Call callback
try:
await self._callback(combined)
except Exception as e:
logger.error(f"Error in batch callback: {e}")
async def flush_immediately(self) -> None:
"""
Flush batch immediately without waiting for debounce timer.
Useful when switching sessions or shutting down.
"""
# Cancel timer
if self._timer_task and not self._timer_task.done():
self._timer_task.cancel()
try:
await self._timer_task
except asyncio.CancelledError:
pass
# Flush
await self._flush_batch()