From c339980411ab24cd135c89940e6a0339cfdc7083 Mon Sep 17 00:00:00 2001 From: Mikkel Georgsen Date: Sun, 29 Mar 2026 23:32:03 +0000 Subject: [PATCH] Initial spec --- docs/nexus-mcp-bridge-spec.md | 436 ++++++++++++++++++++++++++++++++++ 1 file changed, 436 insertions(+) create mode 100644 docs/nexus-mcp-bridge-spec.md diff --git a/docs/nexus-mcp-bridge-spec.md b/docs/nexus-mcp-bridge-spec.md new file mode 100644 index 0000000..417a297 --- /dev/null +++ b/docs/nexus-mcp-bridge-spec.md @@ -0,0 +1,436 @@ +# Nexus MCP Bridge: claude.ai ↔ Homelab Agent + +**Author:** Mikkel Georgsen (msgeorgsen@gmail.com) +**Date:** 2026-03-30 +**Language:** Python +**Runs on:** LXC 102 (mgmt) +**Transport:** Telegram bot (existing) or direct HTTP over NetBird + +--- + +## 1. Purpose + +Let Claude (in claude.ai) dispatch tasks to and pull updates from the +homelab management agent (Claude Code on LXC 102) without Mikkel being +the copy-paste middleman. + +--- + +## 2. Architecture + +``` +┌──────────────┐ ┌──────────────────┐ ┌──────────────────┐ +│ claude.ai │ │ MCP Bridge │ │ Homelab Agent │ +│ (this chat) │────▶│ (Python on mgmt)│────▶│ (Claude Code) │ +│ │◀────│ │◀────│ │ +│ MCP client │ │ Queue + Dedup │ │ Telegram bot │ +└──────────────┘ └──────────────────┘ └──────────────────┘ + │ + Hosted on LXC 102 + Accessible via NetBird mesh +``` + +--- + +## 3. Queue Model + +The queue accepts multiple messages but deduplicates identical ones. +The agent triages incoming messages and announces which one it's +responding to. No locking — the agent handles interrupts per its +system prompt rules. + +```python +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +import hashlib + + +class MessageStatus(Enum): + PENDING = "pending" # In queue, not yet picked up + ACKNOWLEDGED = "acknowledged" # Agent has seen it + IN_PROGRESS = "in_progress" # Agent is actively working on it + COMPLETE = "complete" # Done + ERROR = "error" # Failed + + +@dataclass +class QueueMessage: + id: str # Auto-generated UUID + content: str # The task/message text + content_hash: str # SHA256 of content for dedup + status: MessageStatus = MessageStatus.PENDING + created_at: datetime = field(default_factory=datetime.utcnow) + acknowledged_at: datetime | None = None + completed_at: datetime | None = None + + +@dataclass +class AgentUpdate: + id: str # Auto-generated UUID + message_id: str | None # Which queued message this relates to + content: str # The status update text + created_at: datetime = field(default_factory=datetime.utcnow) + + +@dataclass +class Queue: + messages: list[QueueMessage] = field(default_factory=list) + updates: list[AgentUpdate] = field(default_factory=list) + + def enqueue(self, content: str) -> QueueMessage | None: + """Add a message. Returns None if duplicate exists.""" + content_hash = hashlib.sha256(content.encode()).hexdigest() + + # Check for duplicate among unprocessed messages + for msg in self.messages: + if (msg.content_hash == content_hash + and msg.status in (MessageStatus.PENDING, + MessageStatus.ACKNOWLEDGED, + MessageStatus.IN_PROGRESS)): + return None # Duplicate — reject + + msg = QueueMessage( + id=str(uuid4()), + content=content, + content_hash=content_hash, + ) + self.messages.append(msg) + return msg + + def pull_updates(self, since: datetime | None = None) -> list[AgentUpdate]: + """Get all updates since a timestamp.""" + if since is None: + return self.updates + return [u for u in self.updates if u.created_at > since] + + def status(self) -> dict: + """Current queue state.""" + return { + "pending": len([m for m in self.messages + if m.status == MessageStatus.PENDING]), + "in_progress": len([m for m in self.messages + if m.status == MessageStatus.IN_PROGRESS]), + "total_updates": len(self.updates), + "current_task": next( + (m.content for m in self.messages + if m.status == MessageStatus.IN_PROGRESS), + None + ), + } +``` + +--- + +## 4. MCP Server Tools + +Three tools exposed to claude.ai: + +### send_message +``` +Send a task/message to the homelab agent. +Rejected if an identical message is already pending or in progress. + +Input: { "message": "Fix the nexus.mg DNS record to 100.79.65.206" } +Output: { "queued": true, "id": "abc-123" } + or: { "queued": false, "reason": "duplicate" } +``` + +### pull_updates +``` +Get all status updates from the agent since last pull. +Returns step-by-step reports, completions, errors. + +Input: {} (or { "since": "2026-03-30T01:00:00Z" }) +Output: { + "updates": [ + { + "message_id": "abc-123", + "content": "[Step 1] Checking Technitium... ✅ Record found", + "created_at": "2026-03-30T01:02:15Z" + }, + ... + ] +} +``` + +### queue_status +``` +Check the current state of the queue. + +Input: {} +Output: { + "pending": 1, + "in_progress": 1, + "total_updates": 5, + "current_task": "Fix the nexus.mg DNS record to 100.79.65.206" +} +``` + +--- + +## 5. MCP Server Implementation + +```python +# mcp_bridge/server.py +import json +import asyncio +from datetime import datetime +from mcp.server import Server +from mcp.types import Tool, TextContent + +from .queue import Queue, MessageStatus +from .telegram_transport import TelegramTransport + + +app = Server("homelab-bridge") +queue = Queue() +transport = TelegramTransport() # Wraps existing ~/bin/telegram + + +@app.tool() +async def send_message(message: str) -> str: + """Send a task to the homelab agent. Rejects duplicates.""" + msg = queue.enqueue(message) + if msg is None: + return json.dumps({"queued": False, "reason": "duplicate"}) + + # Dispatch to agent via Telegram + await transport.send(message) + + return json.dumps({"queued": True, "id": msg.id}) + + +@app.tool() +async def pull_updates(since: str | None = None) -> str: + """Pull status updates from the agent since a given timestamp.""" + since_dt = datetime.fromisoformat(since) if since else None + updates = queue.pull_updates(since_dt) + + return json.dumps({ + "updates": [ + { + "message_id": u.message_id, + "content": u.content, + "created_at": u.created_at.isoformat(), + } + for u in updates + ] + }) + + +@app.tool() +async def queue_status() -> str: + """Check current queue state.""" + return json.dumps(queue.status()) + + +# Background task: poll Telegram for agent responses +async def poll_agent_responses(): + """Watch for incoming Telegram messages from the agent.""" + async for message in transport.receive(): + # Try to match to a queued task + current = next( + (m for m in queue.messages + if m.status == MessageStatus.IN_PROGRESS), + None + ) + queue.updates.append(AgentUpdate( + id=str(uuid4()), + message_id=current.id if current else None, + content=message, + )) + + +if __name__ == "__main__": + import asyncio + from mcp.server.stdio import stdio_server + + async def main(): + asyncio.create_task(poll_agent_responses()) + async with stdio_server() as (read, write): + await app.run(read, write) + + asyncio.run(main()) +``` + +--- + +## 6. Telegram Transport + +Wraps the existing `~/bin/telegram` script or hits the Telegram Bot API +directly. + +```python +# mcp_bridge/telegram_transport.py +import asyncio +import aiohttp +import os + + +class TelegramTransport: + def __init__(self): + self.bot_token = self._load_token() + self.chat_id = self._load_chat_id() # The agent's Telegram chat + self.base_url = f"https://api.telegram.org/bot{self.bot_token}" + self.last_update_id = 0 + + def _load_token(self) -> str: + creds_path = os.path.expanduser( + "~/homelab/telegram/credentials" + ) + with open(creds_path) as f: + # Parse token from credentials file + for line in f: + if "token" in line.lower() or "bot" in line.lower(): + return line.split("=", 1)[1].strip() + return f.read().strip() + + def _load_chat_id(self) -> str: + creds_path = os.path.expanduser( + "~/homelab/telegram/credentials" + ) + with open(creds_path) as f: + for line in f: + if "chat_id" in line.lower(): + return line.split("=", 1)[1].strip() + return "" + + async def send(self, message: str): + """Send a message to the agent via Telegram.""" + async with aiohttp.ClientSession() as session: + await session.post( + f"{self.base_url}/sendMessage", + json={ + "chat_id": self.chat_id, + "text": message, + "parse_mode": "Markdown", + }, + ) + + async def receive(self): + """Async generator that yields incoming messages.""" + async with aiohttp.ClientSession() as session: + while True: + resp = await session.get( + f"{self.base_url}/getUpdates", + params={ + "offset": self.last_update_id + 1, + "timeout": 30, # Long polling + }, + ) + data = await resp.json() + + for update in data.get("result", []): + self.last_update_id = update["update_id"] + msg = update.get("message", {}) + text = msg.get("text", "") + if text: + yield text + + await asyncio.sleep(1) +``` + +--- + +## 7. Agent-Side Integration + +The agent's system prompt (CLAUDE.md) includes awareness of the queue: + +```markdown +## Message Queue + +You may receive messages from multiple sources: +- Direct Telegram messages from Mikkel +- Queued tasks from claude.ai via the MCP bridge + +When you receive a new message while working on a task: +1. Announce: "[New message received: ]" +2. Assess urgency — if it says URGENT or STOP, halt immediately +3. If non-urgent, acknowledge and continue current task +4. Address queued messages in order after current task completes + +Always announce which message you're responding to: +"[Responding to: ]" +``` + +--- + +## 8. Deployment + +```bash +# On LXC 102 (mgmt) +cd ~/homelab +mkdir -p mcp-bridge +cd mcp-bridge + +# Create venv +python3 -m venv .venv +source .venv/bin/activate + +# Install dependencies +pip install mcp aiohttp + +# Copy the server files +# (server.py, queue.py, telegram_transport.py) + +# Test locally +python -m mcp_bridge.server +``` + +### As a systemd service: + +```ini +[Unit] +Description=Nexus MCP Bridge +After=network.target + +[Service] +Type=simple +User=mikkel +WorkingDirectory=/home/mikkel/homelab/mcp-bridge +ExecStart=/home/mikkel/homelab/mcp-bridge/.venv/bin/python -m mcp_bridge.server +Restart=always +RestartSec=5 + +[Install] +WantedBy=multi-user.target +``` + +### MCP registration in claude.ai: + +The MCP server needs to be accessible from claude.ai. Options: +1. **HTTP transport** — Expose via NPM at mcp.georgsen.dk with SSE +2. **Direct registration** — If claude.ai supports remote MCP URLs + +The HTTP/SSE transport is the most practical since claude.ai's MCP +connections expect a URL endpoint. + +--- + +## 9. Future: Sub-Agent Dispatch + +Once the bridge works for the homelab agent, extend it to dispatch +to Nexus project agents. The queue becomes multi-target: + +``` +send_message(target="homelab", message="...") +send_message(target="nexus:felt", message="...") +send_message(target="nexus:sentry", message="...") +``` + +Each target has its own queue and its own agent on the other end. +This is where Nexus's project management layer comes in — the PM +agent decides how to route work to specialist agents. +``` + +--- + +## 10. Why Python + +- MCP SDK has first-class Python support +- Telegram bot integration already Python on mgmt (~/venv) +- Thin glue layer, not performance-critical +- ~50 messages/day between three participants +- Fastest to iterate on while figuring out queue semantics +- Agent already has Python set up and ready