From 494bb510d355608d433f1f56b9b8925ef439597f Mon Sep 17 00:00:00 2001 From: Mikkel Georgsen Date: Mon, 30 Mar 2026 08:39:44 +0000 Subject: [PATCH] feat: add ingest API + health endpoint, fix bot-to-bot logging Telegram bots can't see messages from other bots in groups. Added: - POST /api/ingest - local services log messages into bridge DB - GET /api/health - status check endpoint - Fixed post_init not running (manual init lifecycle) Co-Authored-By: Claude Opus 4.6 (1M context) --- docs/nexus-mcp-bridge-addendum.md | 219 ++++++++++++++++++++++++++++++ mcp_bridge/__main__.py | 23 ++-- mcp_bridge/mcp_server.py | 70 ++++++++++ 3 files changed, 303 insertions(+), 9 deletions(-) create mode 100644 docs/nexus-mcp-bridge-addendum.md diff --git a/docs/nexus-mcp-bridge-addendum.md b/docs/nexus-mcp-bridge-addendum.md new file mode 100644 index 0000000..3c91562 --- /dev/null +++ b/docs/nexus-mcp-bridge-addendum.md @@ -0,0 +1,219 @@ +# MCP Bridge Spec — Addendum: Session Tracking & Per-Chat Context + +Add these sections to the existing nexus-mcp-bridge-spec.md + +--- + +## 11. Session Tracking + +Claude Code sessions are ephemeral. If a session dies and restarts, +context is lost. The bridge must track session state to detect this. + +### Agent-side: session file + +On startup, each Claude Code session writes its session ID to a +known file in its working directory: + +```python +# Agent writes on boot (in CLAUDE.md startup hook or .bashrc) +import uuid, json +from datetime import datetime + +session = { + "id": str(uuid.uuid4()), + "started_at": datetime.utcnow().isoformat(), + "chat_id": TELEGRAM_CHAT_ID, # Which chat this session serves + "project": "homelab", # or "felt", "sentry", etc. +} + +with open(os.path.expanduser("~/.claude-session"), "w") as f: + json.dump(session, f) +``` + +### Bridge-side: session awareness + +The bridge reads the session file on every poll. If the session ID +changes between polls, it means the agent restarted: + +```python +@dataclass +class SessionState: + session_id: str + started_at: datetime + chat_id: str + project: str + last_seen_id: str | None = None # Previous session for comparison + +def check_session(self) -> tuple[SessionState, bool]: + """Returns (current_session, has_restarted).""" + with open(os.path.expanduser("~/.claude-session")) as f: + data = json.load(f) + + current = SessionState(**data) + restarted = (self.last_seen_id is not None + and self.last_seen_id != current.session_id) + self.last_seen_id = current.session_id + return current, restarted +``` + +### Restart notification + +When a restart is detected, pull_updates includes a warning: + +```json +{ + "updates": [ + { + "message_id": null, + "content": "⚠️ Agent session restarted. Previous context lost. Session: abc-123 → def-456", + "created_at": "2026-03-30T08:15:00Z", + "type": "session_restart" + } + ] +} +``` + +### Persistent task state + +The agent persists its current task state to disk so it can resume +after a restart instead of starting blind: + +``` +~/.claude-session-state.json +{ + "session_id": "abc-123", + "current_task": "Fix nexus.mg DNS record", + "current_task_id": "msg-456", + "last_completed_step": 2, + "pending_messages": ["msg-789"], + "updated_at": "2026-03-30T01:15:00Z" +} +``` + +On restart, the agent reads this file, announces what it was doing, +and asks the owner whether to resume or start fresh. + +--- + +## 12. Per-Chat Context Isolation + +Each Telegram group chat maps to a separate Claude Code session with +its own context window, working directory, skills, and knowledge files. + +### Chat → Session mapping + +```python +CHAT_SESSIONS = { + "homelab": { + "chat_id": "", + "workdir": "/home/mikkel/homelab", + "claude_md": "/home/mikkel/homelab/.claude/CLAUDE.md", + "session_file": "/home/mikkel/homelab/.claude-session", + }, + "felt": { + "chat_id": "", + "workdir": "/home/mikkel/felt", + "claude_md": "/home/mikkel/felt/.claude/CLAUDE.md", + "session_file": "/home/mikkel/felt/.claude-session", + }, + "sentry": { + "chat_id": "", + "workdir": "/home/mikkel/sentry", + "claude_md": "/home/mikkel/sentry/.claude/CLAUDE.md", + "session_file": "/home/mikkel/sentry/.claude-session", + }, +} +``` + +### Queue is per-chat + +Each chat has its own queue. Messages dispatched to "homelab" don't +appear in the "felt" queue. The MCP tools gain a `target` parameter: + +``` +send_message(target="homelab", message="Fix the DNS record") +send_message(target="felt", message="Update the Go backend") +pull_updates(target="homelab") +pull_updates(target="felt") +queue_status() # Returns status for ALL targets +``` + +### Session lifecycle per chat + +Each Claude Code session is started with its project-specific context: + +```bash +# Homelab session +cd ~/homelab && claude --channels plugin:telegram@claude-plugins-official + +# Felt session +cd ~/felt && claude --channels plugin:telegram@claude-plugins-official + +# Sentry session +cd ~/sentry && claude --channels plugin:telegram@claude-plugins-official +``` + +Each session reads its own CLAUDE.md, has its own skills directory, +and only sees messages from its own Telegram group. Context stays +clean and focused. + +### The bridge routes by target + +```python +async def send_message(target: str, message: str) -> str: + session_config = CHAT_SESSIONS.get(target) + if not session_config: + return json.dumps({"queued": False, "reason": f"unknown target: {target}"}) + + msg = queues[target].enqueue(message) + if msg is None: + return json.dumps({"queued": False, "reason": "duplicate"}) + + await transport.send( + chat_id=session_config["chat_id"], + message=message, + ) + return json.dumps({"queued": True, "id": msg.id, "target": target}) +``` + +--- + +## 13. Updated MCP Tools Summary + +| Tool | Parameters | Returns | +|------|-----------|---------| +| `send_message` | `target`, `message` | `{queued, id, target}` or `{reason}` | +| `pull_updates` | `target`, `since?` | `{updates[], session_restart?}` | +| `queue_status` | — | Per-target: `{pending, in_progress, current_task, session_id}` | + +--- + +## 14. Agent System Prompt Addition (for CLAUDE.md) + +```markdown +## Session Tracking + +On startup: +1. Generate a session UUID +2. Write it to ~/.claude-session with timestamp and project name +3. Read ~/.claude-session-state.json if it exists +4. If resuming from a crash, announce: + "[Session restarted. Previous session was working on: . + Resume or start fresh?]" +5. Persist task state to ~/.claude-session-state.json after every step + +## Message Queue Awareness + +You receive messages from multiple sources: +- Direct Telegram messages from Mikkel +- Queued tasks from claude.ai via the MCP bridge + +When you receive a new message while working on a task: +1. Announce: "[New message received: ]" +2. Assess urgency — URGENT or STOP = halt immediately +3. If non-urgent, acknowledge and continue current task +4. Address queued messages in order after current task completes + +Always announce which message you're responding to: +"[Responding to: ]" +``` diff --git a/mcp_bridge/__main__.py b/mcp_bridge/__main__.py index b592c10..76bc366 100644 --- a/mcp_bridge/__main__.py +++ b/mcp_bridge/__main__.py @@ -7,7 +7,7 @@ import signal from .config import MCP_HOST, MCP_PORT, MEDIA_DIR, DB_PATH from .db import Database from .telegram_bot import BridgeBot -from .mcp_server import mcp, init as init_mcp +from .mcp_server import mcp, init as init_mcp, custom_routes logging.basicConfig( level=logging.INFO, @@ -21,6 +21,8 @@ async def run_telegram_bot(bot: BridgeBot): """Run the telegram bot polling loop.""" app = bot.build_application() await app.initialize() + # post_init isn't called automatically when we manually start + await bot._post_init(app) await app.start() updater = app.updater await updater.start_polling(drop_pending_updates=True) @@ -37,14 +39,17 @@ async def run_telegram_bot(bot: BridgeBot): async def run_mcp_server(): - """Run the FastMCP HTTP server using its built-in runner.""" + """Run the FastMCP HTTP server with custom API routes.""" + import uvicorn + + # Get the FastMCP Starlette app and add our custom routes to it + mcp_app = mcp.http_app() + mcp_app.routes.extend(custom_routes) + logger.info(f"MCP server starting on {MCP_HOST}:{MCP_PORT}") - await mcp.run_http_async( - host=MCP_HOST, - port=MCP_PORT, - log_level="info", - show_banner=False, - ) + config = uvicorn.Config(mcp_app, host=MCP_HOST, port=MCP_PORT, log_level="info") + server = uvicorn.Server(config) + await server.serve() async def main(): @@ -73,7 +78,7 @@ async def main(): loop.add_signal_handler(sig, handle_signal) try: - await asyncio.gather(telegram_task, mcp_task) + await asyncio.gather(telegram_task, mcp_task, return_exceptions=True) except asyncio.CancelledError: pass diff --git a/mcp_bridge/mcp_server.py b/mcp_bridge/mcp_server.py index f95fa0c..9534f5b 100644 --- a/mcp_bridge/mcp_server.py +++ b/mcp_bridge/mcp_server.py @@ -1,16 +1,86 @@ """FastMCP server exposing bridge tools to claude.ai.""" import json +import logging from datetime import datetime, timezone from fastmcp import FastMCP +from starlette.requests import Request +from starlette.responses import JSONResponse +from starlette.routing import Route from .db import Database from .config import get_group_chat_id +logger = logging.getLogger(__name__) + # Will be initialized in __main__ with shared db instance db: Database | None = None + +async def ingest_message(request: Request) -> JSONResponse: + """HTTP endpoint for local services to log messages into the bridge. + + POST /api/ingest + { + "telegram_message_id": 123, # required + "chat_id": -100..., # required + "sender_type": "homelab_bot", # required + "sender_id": 8521598773, # optional + "sender_name": "Homelab Bot", # optional + "content": "message text", # optional + "reply_to_message_id": null, # optional + "created_at": "ISO8601" # optional (defaults to now) + } + """ + try: + data = await request.json() + except Exception: + return JSONResponse({"error": "invalid JSON"}, status_code=400) + + telegram_message_id = data.get("telegram_message_id") + chat_id = data.get("chat_id") + if not telegram_message_id or not chat_id: + return JSONResponse( + {"error": "telegram_message_id and chat_id are required"}, + status_code=400, + ) + + created_at = data.get("created_at", datetime.now(timezone.utc).isoformat()) + + msg_id = db.insert_message( + telegram_message_id=telegram_message_id, + chat_id=chat_id, + sender_type=data.get("sender_type", "unknown"), + sender_id=data.get("sender_id"), + sender_name=data.get("sender_name"), + content=data.get("content"), + reply_to_message_id=data.get("reply_to_message_id"), + has_attachment=data.get("has_attachment", False), + created_at=created_at, + ) + + if msg_id is None: + return JSONResponse({"ok": True, "duplicate": True}) + + logger.info( + f"Ingested message {telegram_message_id} from {data.get('sender_name', 'unknown')}" + ) + return JSONResponse({"ok": True, "id": msg_id}) + + +async def health(request: Request) -> JSONResponse: + """Health check endpoint.""" + status = db.get_status() + return JSONResponse({"status": "ok", **status}) + + +# Custom routes added to the FastMCP app +custom_routes = [ + Route("/api/ingest", ingest_message, methods=["POST"]), + Route("/api/health", health, methods=["GET"]), +] + mcp = FastMCP( name="homelab-bridge", instructions=(