telegram-bot-mcp/docs/nexus-mcp-bridge-spec.md
2026-03-29 23:32:03 +00:00

436 lines
12 KiB
Markdown

# Nexus MCP Bridge: claude.ai ↔ Homelab Agent
**Author:** Mikkel Georgsen (msgeorgsen@gmail.com)
**Date:** 2026-03-30
**Language:** Python
**Runs on:** LXC 102 (mgmt)
**Transport:** Telegram bot (existing) or direct HTTP over NetBird
---
## 1. Purpose
Let Claude (in claude.ai) dispatch tasks to and pull updates from the
homelab management agent (Claude Code on LXC 102) without Mikkel being
the copy-paste middleman.
---
## 2. Architecture
```
┌──────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ claude.ai │ │ MCP Bridge │ │ Homelab Agent │
│ (this chat) │────▶│ (Python on mgmt)│────▶│ (Claude Code) │
│ │◀────│ │◀────│ │
│ MCP client │ │ Queue + Dedup │ │ Telegram bot │
└──────────────┘ └──────────────────┘ └──────────────────┘
Hosted on LXC 102
Accessible via NetBird mesh
```
---
## 3. Queue Model
The queue accepts multiple messages but deduplicates identical ones.
The agent triages incoming messages and announces which one it's
responding to. No locking — the agent handles interrupts per its
system prompt rules.
```python
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import hashlib
class MessageStatus(Enum):
PENDING = "pending" # In queue, not yet picked up
ACKNOWLEDGED = "acknowledged" # Agent has seen it
IN_PROGRESS = "in_progress" # Agent is actively working on it
COMPLETE = "complete" # Done
ERROR = "error" # Failed
@dataclass
class QueueMessage:
id: str # Auto-generated UUID
content: str # The task/message text
content_hash: str # SHA256 of content for dedup
status: MessageStatus = MessageStatus.PENDING
created_at: datetime = field(default_factory=datetime.utcnow)
acknowledged_at: datetime | None = None
completed_at: datetime | None = None
@dataclass
class AgentUpdate:
id: str # Auto-generated UUID
message_id: str | None # Which queued message this relates to
content: str # The status update text
created_at: datetime = field(default_factory=datetime.utcnow)
@dataclass
class Queue:
messages: list[QueueMessage] = field(default_factory=list)
updates: list[AgentUpdate] = field(default_factory=list)
def enqueue(self, content: str) -> QueueMessage | None:
"""Add a message. Returns None if duplicate exists."""
content_hash = hashlib.sha256(content.encode()).hexdigest()
# Check for duplicate among unprocessed messages
for msg in self.messages:
if (msg.content_hash == content_hash
and msg.status in (MessageStatus.PENDING,
MessageStatus.ACKNOWLEDGED,
MessageStatus.IN_PROGRESS)):
return None # Duplicate — reject
msg = QueueMessage(
id=str(uuid4()),
content=content,
content_hash=content_hash,
)
self.messages.append(msg)
return msg
def pull_updates(self, since: datetime | None = None) -> list[AgentUpdate]:
"""Get all updates since a timestamp."""
if since is None:
return self.updates
return [u for u in self.updates if u.created_at > since]
def status(self) -> dict:
"""Current queue state."""
return {
"pending": len([m for m in self.messages
if m.status == MessageStatus.PENDING]),
"in_progress": len([m for m in self.messages
if m.status == MessageStatus.IN_PROGRESS]),
"total_updates": len(self.updates),
"current_task": next(
(m.content for m in self.messages
if m.status == MessageStatus.IN_PROGRESS),
None
),
}
```
---
## 4. MCP Server Tools
Three tools exposed to claude.ai:
### send_message
```
Send a task/message to the homelab agent.
Rejected if an identical message is already pending or in progress.
Input: { "message": "Fix the nexus.mg DNS record to 100.79.65.206" }
Output: { "queued": true, "id": "abc-123" }
or: { "queued": false, "reason": "duplicate" }
```
### pull_updates
```
Get all status updates from the agent since last pull.
Returns step-by-step reports, completions, errors.
Input: {} (or { "since": "2026-03-30T01:00:00Z" })
Output: {
"updates": [
{
"message_id": "abc-123",
"content": "[Step 1] Checking Technitium... ✅ Record found",
"created_at": "2026-03-30T01:02:15Z"
},
...
]
}
```
### queue_status
```
Check the current state of the queue.
Input: {}
Output: {
"pending": 1,
"in_progress": 1,
"total_updates": 5,
"current_task": "Fix the nexus.mg DNS record to 100.79.65.206"
}
```
---
## 5. MCP Server Implementation
```python
# mcp_bridge/server.py
import json
import asyncio
from datetime import datetime
from mcp.server import Server
from mcp.types import Tool, TextContent
from .queue import Queue, MessageStatus
from .telegram_transport import TelegramTransport
app = Server("homelab-bridge")
queue = Queue()
transport = TelegramTransport() # Wraps existing ~/bin/telegram
@app.tool()
async def send_message(message: str) -> str:
"""Send a task to the homelab agent. Rejects duplicates."""
msg = queue.enqueue(message)
if msg is None:
return json.dumps({"queued": False, "reason": "duplicate"})
# Dispatch to agent via Telegram
await transport.send(message)
return json.dumps({"queued": True, "id": msg.id})
@app.tool()
async def pull_updates(since: str | None = None) -> str:
"""Pull status updates from the agent since a given timestamp."""
since_dt = datetime.fromisoformat(since) if since else None
updates = queue.pull_updates(since_dt)
return json.dumps({
"updates": [
{
"message_id": u.message_id,
"content": u.content,
"created_at": u.created_at.isoformat(),
}
for u in updates
]
})
@app.tool()
async def queue_status() -> str:
"""Check current queue state."""
return json.dumps(queue.status())
# Background task: poll Telegram for agent responses
async def poll_agent_responses():
"""Watch for incoming Telegram messages from the agent."""
async for message in transport.receive():
# Try to match to a queued task
current = next(
(m for m in queue.messages
if m.status == MessageStatus.IN_PROGRESS),
None
)
queue.updates.append(AgentUpdate(
id=str(uuid4()),
message_id=current.id if current else None,
content=message,
))
if __name__ == "__main__":
import asyncio
from mcp.server.stdio import stdio_server
async def main():
asyncio.create_task(poll_agent_responses())
async with stdio_server() as (read, write):
await app.run(read, write)
asyncio.run(main())
```
---
## 6. Telegram Transport
Wraps the existing `~/bin/telegram` script or hits the Telegram Bot API
directly.
```python
# mcp_bridge/telegram_transport.py
import asyncio
import aiohttp
import os
class TelegramTransport:
def __init__(self):
self.bot_token = self._load_token()
self.chat_id = self._load_chat_id() # The agent's Telegram chat
self.base_url = f"https://api.telegram.org/bot{self.bot_token}"
self.last_update_id = 0
def _load_token(self) -> str:
creds_path = os.path.expanduser(
"~/homelab/telegram/credentials"
)
with open(creds_path) as f:
# Parse token from credentials file
for line in f:
if "token" in line.lower() or "bot" in line.lower():
return line.split("=", 1)[1].strip()
return f.read().strip()
def _load_chat_id(self) -> str:
creds_path = os.path.expanduser(
"~/homelab/telegram/credentials"
)
with open(creds_path) as f:
for line in f:
if "chat_id" in line.lower():
return line.split("=", 1)[1].strip()
return ""
async def send(self, message: str):
"""Send a message to the agent via Telegram."""
async with aiohttp.ClientSession() as session:
await session.post(
f"{self.base_url}/sendMessage",
json={
"chat_id": self.chat_id,
"text": message,
"parse_mode": "Markdown",
},
)
async def receive(self):
"""Async generator that yields incoming messages."""
async with aiohttp.ClientSession() as session:
while True:
resp = await session.get(
f"{self.base_url}/getUpdates",
params={
"offset": self.last_update_id + 1,
"timeout": 30, # Long polling
},
)
data = await resp.json()
for update in data.get("result", []):
self.last_update_id = update["update_id"]
msg = update.get("message", {})
text = msg.get("text", "")
if text:
yield text
await asyncio.sleep(1)
```
---
## 7. Agent-Side Integration
The agent's system prompt (CLAUDE.md) includes awareness of the queue:
```markdown
## Message Queue
You may receive messages from multiple sources:
- Direct Telegram messages from Mikkel
- Queued tasks from claude.ai via the MCP bridge
When you receive a new message while working on a task:
1. Announce: "[New message received: <summary>]"
2. Assess urgency — if it says URGENT or STOP, halt immediately
3. If non-urgent, acknowledge and continue current task
4. Address queued messages in order after current task completes
Always announce which message you're responding to:
"[Responding to: <first line of message>]"
```
---
## 8. Deployment
```bash
# On LXC 102 (mgmt)
cd ~/homelab
mkdir -p mcp-bridge
cd mcp-bridge
# Create venv
python3 -m venv .venv
source .venv/bin/activate
# Install dependencies
pip install mcp aiohttp
# Copy the server files
# (server.py, queue.py, telegram_transport.py)
# Test locally
python -m mcp_bridge.server
```
### As a systemd service:
```ini
[Unit]
Description=Nexus MCP Bridge
After=network.target
[Service]
Type=simple
User=mikkel
WorkingDirectory=/home/mikkel/homelab/mcp-bridge
ExecStart=/home/mikkel/homelab/mcp-bridge/.venv/bin/python -m mcp_bridge.server
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
```
### MCP registration in claude.ai:
The MCP server needs to be accessible from claude.ai. Options:
1. **HTTP transport** — Expose via NPM at mcp.georgsen.dk with SSE
2. **Direct registration** — If claude.ai supports remote MCP URLs
The HTTP/SSE transport is the most practical since claude.ai's MCP
connections expect a URL endpoint.
---
## 9. Future: Sub-Agent Dispatch
Once the bridge works for the homelab agent, extend it to dispatch
to Nexus project agents. The queue becomes multi-target:
```
send_message(target="homelab", message="...")
send_message(target="nexus:felt", message="...")
send_message(target="nexus:sentry", message="...")
```
Each target has its own queue and its own agent on the other end.
This is where Nexus's project management layer comes in — the PM
agent decides how to route work to specialist agents.
```
---
## 10. Why Python
- MCP SDK has first-class Python support
- Telegram bot integration already Python on mgmt (~/venv)
- Thin glue layer, not performance-critical
- ~50 messages/day between three participants
- Fastest to iterate on while figuring out queue semantics
- Agent already has Python set up and ready