Initial spec
This commit is contained in:
parent
257bb36782
commit
c339980411
1 changed files with 436 additions and 0 deletions
436
docs/nexus-mcp-bridge-spec.md
Normal file
436
docs/nexus-mcp-bridge-spec.md
Normal file
|
|
@ -0,0 +1,436 @@
|
|||
# Nexus MCP Bridge: claude.ai ↔ Homelab Agent
|
||||
|
||||
**Author:** Mikkel Georgsen (msgeorgsen@gmail.com)
|
||||
**Date:** 2026-03-30
|
||||
**Language:** Python
|
||||
**Runs on:** LXC 102 (mgmt)
|
||||
**Transport:** Telegram bot (existing) or direct HTTP over NetBird
|
||||
|
||||
---
|
||||
|
||||
## 1. Purpose
|
||||
|
||||
Let Claude (in claude.ai) dispatch tasks to and pull updates from the
|
||||
homelab management agent (Claude Code on LXC 102) without Mikkel being
|
||||
the copy-paste middleman.
|
||||
|
||||
---
|
||||
|
||||
## 2. Architecture
|
||||
|
||||
```
|
||||
┌──────────────┐ ┌──────────────────┐ ┌──────────────────┐
|
||||
│ claude.ai │ │ MCP Bridge │ │ Homelab Agent │
|
||||
│ (this chat) │────▶│ (Python on mgmt)│────▶│ (Claude Code) │
|
||||
│ │◀────│ │◀────│ │
|
||||
│ MCP client │ │ Queue + Dedup │ │ Telegram bot │
|
||||
└──────────────┘ └──────────────────┘ └──────────────────┘
|
||||
│
|
||||
Hosted on LXC 102
|
||||
Accessible via NetBird mesh
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 3. Queue Model
|
||||
|
||||
The queue accepts multiple messages but deduplicates identical ones.
|
||||
The agent triages incoming messages and announces which one it's
|
||||
responding to. No locking — the agent handles interrupts per its
|
||||
system prompt rules.
|
||||
|
||||
```python
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
import hashlib
|
||||
|
||||
|
||||
class MessageStatus(Enum):
|
||||
PENDING = "pending" # In queue, not yet picked up
|
||||
ACKNOWLEDGED = "acknowledged" # Agent has seen it
|
||||
IN_PROGRESS = "in_progress" # Agent is actively working on it
|
||||
COMPLETE = "complete" # Done
|
||||
ERROR = "error" # Failed
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueueMessage:
|
||||
id: str # Auto-generated UUID
|
||||
content: str # The task/message text
|
||||
content_hash: str # SHA256 of content for dedup
|
||||
status: MessageStatus = MessageStatus.PENDING
|
||||
created_at: datetime = field(default_factory=datetime.utcnow)
|
||||
acknowledged_at: datetime | None = None
|
||||
completed_at: datetime | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class AgentUpdate:
|
||||
id: str # Auto-generated UUID
|
||||
message_id: str | None # Which queued message this relates to
|
||||
content: str # The status update text
|
||||
created_at: datetime = field(default_factory=datetime.utcnow)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Queue:
|
||||
messages: list[QueueMessage] = field(default_factory=list)
|
||||
updates: list[AgentUpdate] = field(default_factory=list)
|
||||
|
||||
def enqueue(self, content: str) -> QueueMessage | None:
|
||||
"""Add a message. Returns None if duplicate exists."""
|
||||
content_hash = hashlib.sha256(content.encode()).hexdigest()
|
||||
|
||||
# Check for duplicate among unprocessed messages
|
||||
for msg in self.messages:
|
||||
if (msg.content_hash == content_hash
|
||||
and msg.status in (MessageStatus.PENDING,
|
||||
MessageStatus.ACKNOWLEDGED,
|
||||
MessageStatus.IN_PROGRESS)):
|
||||
return None # Duplicate — reject
|
||||
|
||||
msg = QueueMessage(
|
||||
id=str(uuid4()),
|
||||
content=content,
|
||||
content_hash=content_hash,
|
||||
)
|
||||
self.messages.append(msg)
|
||||
return msg
|
||||
|
||||
def pull_updates(self, since: datetime | None = None) -> list[AgentUpdate]:
|
||||
"""Get all updates since a timestamp."""
|
||||
if since is None:
|
||||
return self.updates
|
||||
return [u for u in self.updates if u.created_at > since]
|
||||
|
||||
def status(self) -> dict:
|
||||
"""Current queue state."""
|
||||
return {
|
||||
"pending": len([m for m in self.messages
|
||||
if m.status == MessageStatus.PENDING]),
|
||||
"in_progress": len([m for m in self.messages
|
||||
if m.status == MessageStatus.IN_PROGRESS]),
|
||||
"total_updates": len(self.updates),
|
||||
"current_task": next(
|
||||
(m.content for m in self.messages
|
||||
if m.status == MessageStatus.IN_PROGRESS),
|
||||
None
|
||||
),
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 4. MCP Server Tools
|
||||
|
||||
Three tools exposed to claude.ai:
|
||||
|
||||
### send_message
|
||||
```
|
||||
Send a task/message to the homelab agent.
|
||||
Rejected if an identical message is already pending or in progress.
|
||||
|
||||
Input: { "message": "Fix the nexus.mg DNS record to 100.79.65.206" }
|
||||
Output: { "queued": true, "id": "abc-123" }
|
||||
or: { "queued": false, "reason": "duplicate" }
|
||||
```
|
||||
|
||||
### pull_updates
|
||||
```
|
||||
Get all status updates from the agent since last pull.
|
||||
Returns step-by-step reports, completions, errors.
|
||||
|
||||
Input: {} (or { "since": "2026-03-30T01:00:00Z" })
|
||||
Output: {
|
||||
"updates": [
|
||||
{
|
||||
"message_id": "abc-123",
|
||||
"content": "[Step 1] Checking Technitium... ✅ Record found",
|
||||
"created_at": "2026-03-30T01:02:15Z"
|
||||
},
|
||||
...
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### queue_status
|
||||
```
|
||||
Check the current state of the queue.
|
||||
|
||||
Input: {}
|
||||
Output: {
|
||||
"pending": 1,
|
||||
"in_progress": 1,
|
||||
"total_updates": 5,
|
||||
"current_task": "Fix the nexus.mg DNS record to 100.79.65.206"
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 5. MCP Server Implementation
|
||||
|
||||
```python
|
||||
# mcp_bridge/server.py
|
||||
import json
|
||||
import asyncio
|
||||
from datetime import datetime
|
||||
from mcp.server import Server
|
||||
from mcp.types import Tool, TextContent
|
||||
|
||||
from .queue import Queue, MessageStatus
|
||||
from .telegram_transport import TelegramTransport
|
||||
|
||||
|
||||
app = Server("homelab-bridge")
|
||||
queue = Queue()
|
||||
transport = TelegramTransport() # Wraps existing ~/bin/telegram
|
||||
|
||||
|
||||
@app.tool()
|
||||
async def send_message(message: str) -> str:
|
||||
"""Send a task to the homelab agent. Rejects duplicates."""
|
||||
msg = queue.enqueue(message)
|
||||
if msg is None:
|
||||
return json.dumps({"queued": False, "reason": "duplicate"})
|
||||
|
||||
# Dispatch to agent via Telegram
|
||||
await transport.send(message)
|
||||
|
||||
return json.dumps({"queued": True, "id": msg.id})
|
||||
|
||||
|
||||
@app.tool()
|
||||
async def pull_updates(since: str | None = None) -> str:
|
||||
"""Pull status updates from the agent since a given timestamp."""
|
||||
since_dt = datetime.fromisoformat(since) if since else None
|
||||
updates = queue.pull_updates(since_dt)
|
||||
|
||||
return json.dumps({
|
||||
"updates": [
|
||||
{
|
||||
"message_id": u.message_id,
|
||||
"content": u.content,
|
||||
"created_at": u.created_at.isoformat(),
|
||||
}
|
||||
for u in updates
|
||||
]
|
||||
})
|
||||
|
||||
|
||||
@app.tool()
|
||||
async def queue_status() -> str:
|
||||
"""Check current queue state."""
|
||||
return json.dumps(queue.status())
|
||||
|
||||
|
||||
# Background task: poll Telegram for agent responses
|
||||
async def poll_agent_responses():
|
||||
"""Watch for incoming Telegram messages from the agent."""
|
||||
async for message in transport.receive():
|
||||
# Try to match to a queued task
|
||||
current = next(
|
||||
(m for m in queue.messages
|
||||
if m.status == MessageStatus.IN_PROGRESS),
|
||||
None
|
||||
)
|
||||
queue.updates.append(AgentUpdate(
|
||||
id=str(uuid4()),
|
||||
message_id=current.id if current else None,
|
||||
content=message,
|
||||
))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import asyncio
|
||||
from mcp.server.stdio import stdio_server
|
||||
|
||||
async def main():
|
||||
asyncio.create_task(poll_agent_responses())
|
||||
async with stdio_server() as (read, write):
|
||||
await app.run(read, write)
|
||||
|
||||
asyncio.run(main())
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 6. Telegram Transport
|
||||
|
||||
Wraps the existing `~/bin/telegram` script or hits the Telegram Bot API
|
||||
directly.
|
||||
|
||||
```python
|
||||
# mcp_bridge/telegram_transport.py
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import os
|
||||
|
||||
|
||||
class TelegramTransport:
|
||||
def __init__(self):
|
||||
self.bot_token = self._load_token()
|
||||
self.chat_id = self._load_chat_id() # The agent's Telegram chat
|
||||
self.base_url = f"https://api.telegram.org/bot{self.bot_token}"
|
||||
self.last_update_id = 0
|
||||
|
||||
def _load_token(self) -> str:
|
||||
creds_path = os.path.expanduser(
|
||||
"~/homelab/telegram/credentials"
|
||||
)
|
||||
with open(creds_path) as f:
|
||||
# Parse token from credentials file
|
||||
for line in f:
|
||||
if "token" in line.lower() or "bot" in line.lower():
|
||||
return line.split("=", 1)[1].strip()
|
||||
return f.read().strip()
|
||||
|
||||
def _load_chat_id(self) -> str:
|
||||
creds_path = os.path.expanduser(
|
||||
"~/homelab/telegram/credentials"
|
||||
)
|
||||
with open(creds_path) as f:
|
||||
for line in f:
|
||||
if "chat_id" in line.lower():
|
||||
return line.split("=", 1)[1].strip()
|
||||
return ""
|
||||
|
||||
async def send(self, message: str):
|
||||
"""Send a message to the agent via Telegram."""
|
||||
async with aiohttp.ClientSession() as session:
|
||||
await session.post(
|
||||
f"{self.base_url}/sendMessage",
|
||||
json={
|
||||
"chat_id": self.chat_id,
|
||||
"text": message,
|
||||
"parse_mode": "Markdown",
|
||||
},
|
||||
)
|
||||
|
||||
async def receive(self):
|
||||
"""Async generator that yields incoming messages."""
|
||||
async with aiohttp.ClientSession() as session:
|
||||
while True:
|
||||
resp = await session.get(
|
||||
f"{self.base_url}/getUpdates",
|
||||
params={
|
||||
"offset": self.last_update_id + 1,
|
||||
"timeout": 30, # Long polling
|
||||
},
|
||||
)
|
||||
data = await resp.json()
|
||||
|
||||
for update in data.get("result", []):
|
||||
self.last_update_id = update["update_id"]
|
||||
msg = update.get("message", {})
|
||||
text = msg.get("text", "")
|
||||
if text:
|
||||
yield text
|
||||
|
||||
await asyncio.sleep(1)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 7. Agent-Side Integration
|
||||
|
||||
The agent's system prompt (CLAUDE.md) includes awareness of the queue:
|
||||
|
||||
```markdown
|
||||
## Message Queue
|
||||
|
||||
You may receive messages from multiple sources:
|
||||
- Direct Telegram messages from Mikkel
|
||||
- Queued tasks from claude.ai via the MCP bridge
|
||||
|
||||
When you receive a new message while working on a task:
|
||||
1. Announce: "[New message received: <summary>]"
|
||||
2. Assess urgency — if it says URGENT or STOP, halt immediately
|
||||
3. If non-urgent, acknowledge and continue current task
|
||||
4. Address queued messages in order after current task completes
|
||||
|
||||
Always announce which message you're responding to:
|
||||
"[Responding to: <first line of message>]"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 8. Deployment
|
||||
|
||||
```bash
|
||||
# On LXC 102 (mgmt)
|
||||
cd ~/homelab
|
||||
mkdir -p mcp-bridge
|
||||
cd mcp-bridge
|
||||
|
||||
# Create venv
|
||||
python3 -m venv .venv
|
||||
source .venv/bin/activate
|
||||
|
||||
# Install dependencies
|
||||
pip install mcp aiohttp
|
||||
|
||||
# Copy the server files
|
||||
# (server.py, queue.py, telegram_transport.py)
|
||||
|
||||
# Test locally
|
||||
python -m mcp_bridge.server
|
||||
```
|
||||
|
||||
### As a systemd service:
|
||||
|
||||
```ini
|
||||
[Unit]
|
||||
Description=Nexus MCP Bridge
|
||||
After=network.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
User=mikkel
|
||||
WorkingDirectory=/home/mikkel/homelab/mcp-bridge
|
||||
ExecStart=/home/mikkel/homelab/mcp-bridge/.venv/bin/python -m mcp_bridge.server
|
||||
Restart=always
|
||||
RestartSec=5
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
```
|
||||
|
||||
### MCP registration in claude.ai:
|
||||
|
||||
The MCP server needs to be accessible from claude.ai. Options:
|
||||
1. **HTTP transport** — Expose via NPM at mcp.georgsen.dk with SSE
|
||||
2. **Direct registration** — If claude.ai supports remote MCP URLs
|
||||
|
||||
The HTTP/SSE transport is the most practical since claude.ai's MCP
|
||||
connections expect a URL endpoint.
|
||||
|
||||
---
|
||||
|
||||
## 9. Future: Sub-Agent Dispatch
|
||||
|
||||
Once the bridge works for the homelab agent, extend it to dispatch
|
||||
to Nexus project agents. The queue becomes multi-target:
|
||||
|
||||
```
|
||||
send_message(target="homelab", message="...")
|
||||
send_message(target="nexus:felt", message="...")
|
||||
send_message(target="nexus:sentry", message="...")
|
||||
```
|
||||
|
||||
Each target has its own queue and its own agent on the other end.
|
||||
This is where Nexus's project management layer comes in — the PM
|
||||
agent decides how to route work to specialist agents.
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 10. Why Python
|
||||
|
||||
- MCP SDK has first-class Python support
|
||||
- Telegram bot integration already Python on mgmt (~/venv)
|
||||
- Thin glue layer, not performance-critical
|
||||
- ~50 messages/day between three participants
|
||||
- Fastest to iterate on while figuring out queue semantics
|
||||
- Agent already has Python set up and ready
|
||||
Loading…
Add table
Reference in a new issue