436 lines
12 KiB
Markdown
436 lines
12 KiB
Markdown
# Nexus MCP Bridge: claude.ai ↔ Homelab Agent
|
|
|
|
**Author:** Mikkel Georgsen (msgeorgsen@gmail.com)
|
|
**Date:** 2026-03-30
|
|
**Language:** Python
|
|
**Runs on:** LXC 102 (mgmt)
|
|
**Transport:** Telegram bot (existing) or direct HTTP over NetBird
|
|
|
|
---
|
|
|
|
## 1. Purpose
|
|
|
|
Let Claude (in claude.ai) dispatch tasks to and pull updates from the
|
|
homelab management agent (Claude Code on LXC 102) without Mikkel being
|
|
the copy-paste middleman.
|
|
|
|
---
|
|
|
|
## 2. Architecture
|
|
|
|
```
|
|
┌──────────────┐ ┌──────────────────┐ ┌──────────────────┐
|
|
│ claude.ai │ │ MCP Bridge │ │ Homelab Agent │
|
|
│ (this chat) │────▶│ (Python on mgmt)│────▶│ (Claude Code) │
|
|
│ │◀────│ │◀────│ │
|
|
│ MCP client │ │ Queue + Dedup │ │ Telegram bot │
|
|
└──────────────┘ └──────────────────┘ └──────────────────┘
|
|
│
|
|
Hosted on LXC 102
|
|
Accessible via NetBird mesh
|
|
```
|
|
|
|
---
|
|
|
|
## 3. Queue Model
|
|
|
|
The queue accepts multiple messages but deduplicates identical ones.
|
|
The agent triages incoming messages and announces which one it's
|
|
responding to. No locking — the agent handles interrupts per its
|
|
system prompt rules.
|
|
|
|
```python
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
import hashlib
|
|
|
|
|
|
class MessageStatus(Enum):
|
|
PENDING = "pending" # In queue, not yet picked up
|
|
ACKNOWLEDGED = "acknowledged" # Agent has seen it
|
|
IN_PROGRESS = "in_progress" # Agent is actively working on it
|
|
COMPLETE = "complete" # Done
|
|
ERROR = "error" # Failed
|
|
|
|
|
|
@dataclass
|
|
class QueueMessage:
|
|
id: str # Auto-generated UUID
|
|
content: str # The task/message text
|
|
content_hash: str # SHA256 of content for dedup
|
|
status: MessageStatus = MessageStatus.PENDING
|
|
created_at: datetime = field(default_factory=datetime.utcnow)
|
|
acknowledged_at: datetime | None = None
|
|
completed_at: datetime | None = None
|
|
|
|
|
|
@dataclass
|
|
class AgentUpdate:
|
|
id: str # Auto-generated UUID
|
|
message_id: str | None # Which queued message this relates to
|
|
content: str # The status update text
|
|
created_at: datetime = field(default_factory=datetime.utcnow)
|
|
|
|
|
|
@dataclass
|
|
class Queue:
|
|
messages: list[QueueMessage] = field(default_factory=list)
|
|
updates: list[AgentUpdate] = field(default_factory=list)
|
|
|
|
def enqueue(self, content: str) -> QueueMessage | None:
|
|
"""Add a message. Returns None if duplicate exists."""
|
|
content_hash = hashlib.sha256(content.encode()).hexdigest()
|
|
|
|
# Check for duplicate among unprocessed messages
|
|
for msg in self.messages:
|
|
if (msg.content_hash == content_hash
|
|
and msg.status in (MessageStatus.PENDING,
|
|
MessageStatus.ACKNOWLEDGED,
|
|
MessageStatus.IN_PROGRESS)):
|
|
return None # Duplicate — reject
|
|
|
|
msg = QueueMessage(
|
|
id=str(uuid4()),
|
|
content=content,
|
|
content_hash=content_hash,
|
|
)
|
|
self.messages.append(msg)
|
|
return msg
|
|
|
|
def pull_updates(self, since: datetime | None = None) -> list[AgentUpdate]:
|
|
"""Get all updates since a timestamp."""
|
|
if since is None:
|
|
return self.updates
|
|
return [u for u in self.updates if u.created_at > since]
|
|
|
|
def status(self) -> dict:
|
|
"""Current queue state."""
|
|
return {
|
|
"pending": len([m for m in self.messages
|
|
if m.status == MessageStatus.PENDING]),
|
|
"in_progress": len([m for m in self.messages
|
|
if m.status == MessageStatus.IN_PROGRESS]),
|
|
"total_updates": len(self.updates),
|
|
"current_task": next(
|
|
(m.content for m in self.messages
|
|
if m.status == MessageStatus.IN_PROGRESS),
|
|
None
|
|
),
|
|
}
|
|
```
|
|
|
|
---
|
|
|
|
## 4. MCP Server Tools
|
|
|
|
Three tools exposed to claude.ai:
|
|
|
|
### send_message
|
|
```
|
|
Send a task/message to the homelab agent.
|
|
Rejected if an identical message is already pending or in progress.
|
|
|
|
Input: { "message": "Fix the nexus.mg DNS record to 100.79.65.206" }
|
|
Output: { "queued": true, "id": "abc-123" }
|
|
or: { "queued": false, "reason": "duplicate" }
|
|
```
|
|
|
|
### pull_updates
|
|
```
|
|
Get all status updates from the agent since last pull.
|
|
Returns step-by-step reports, completions, errors.
|
|
|
|
Input: {} (or { "since": "2026-03-30T01:00:00Z" })
|
|
Output: {
|
|
"updates": [
|
|
{
|
|
"message_id": "abc-123",
|
|
"content": "[Step 1] Checking Technitium... ✅ Record found",
|
|
"created_at": "2026-03-30T01:02:15Z"
|
|
},
|
|
...
|
|
]
|
|
}
|
|
```
|
|
|
|
### queue_status
|
|
```
|
|
Check the current state of the queue.
|
|
|
|
Input: {}
|
|
Output: {
|
|
"pending": 1,
|
|
"in_progress": 1,
|
|
"total_updates": 5,
|
|
"current_task": "Fix the nexus.mg DNS record to 100.79.65.206"
|
|
}
|
|
```
|
|
|
|
---
|
|
|
|
## 5. MCP Server Implementation
|
|
|
|
```python
|
|
# mcp_bridge/server.py
|
|
import json
|
|
import asyncio
|
|
from datetime import datetime
|
|
from mcp.server import Server
|
|
from mcp.types import Tool, TextContent
|
|
|
|
from .queue import Queue, MessageStatus
|
|
from .telegram_transport import TelegramTransport
|
|
|
|
|
|
app = Server("homelab-bridge")
|
|
queue = Queue()
|
|
transport = TelegramTransport() # Wraps existing ~/bin/telegram
|
|
|
|
|
|
@app.tool()
|
|
async def send_message(message: str) -> str:
|
|
"""Send a task to the homelab agent. Rejects duplicates."""
|
|
msg = queue.enqueue(message)
|
|
if msg is None:
|
|
return json.dumps({"queued": False, "reason": "duplicate"})
|
|
|
|
# Dispatch to agent via Telegram
|
|
await transport.send(message)
|
|
|
|
return json.dumps({"queued": True, "id": msg.id})
|
|
|
|
|
|
@app.tool()
|
|
async def pull_updates(since: str | None = None) -> str:
|
|
"""Pull status updates from the agent since a given timestamp."""
|
|
since_dt = datetime.fromisoformat(since) if since else None
|
|
updates = queue.pull_updates(since_dt)
|
|
|
|
return json.dumps({
|
|
"updates": [
|
|
{
|
|
"message_id": u.message_id,
|
|
"content": u.content,
|
|
"created_at": u.created_at.isoformat(),
|
|
}
|
|
for u in updates
|
|
]
|
|
})
|
|
|
|
|
|
@app.tool()
|
|
async def queue_status() -> str:
|
|
"""Check current queue state."""
|
|
return json.dumps(queue.status())
|
|
|
|
|
|
# Background task: poll Telegram for agent responses
|
|
async def poll_agent_responses():
|
|
"""Watch for incoming Telegram messages from the agent."""
|
|
async for message in transport.receive():
|
|
# Try to match to a queued task
|
|
current = next(
|
|
(m for m in queue.messages
|
|
if m.status == MessageStatus.IN_PROGRESS),
|
|
None
|
|
)
|
|
queue.updates.append(AgentUpdate(
|
|
id=str(uuid4()),
|
|
message_id=current.id if current else None,
|
|
content=message,
|
|
))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import asyncio
|
|
from mcp.server.stdio import stdio_server
|
|
|
|
async def main():
|
|
asyncio.create_task(poll_agent_responses())
|
|
async with stdio_server() as (read, write):
|
|
await app.run(read, write)
|
|
|
|
asyncio.run(main())
|
|
```
|
|
|
|
---
|
|
|
|
## 6. Telegram Transport
|
|
|
|
Wraps the existing `~/bin/telegram` script or hits the Telegram Bot API
|
|
directly.
|
|
|
|
```python
|
|
# mcp_bridge/telegram_transport.py
|
|
import asyncio
|
|
import aiohttp
|
|
import os
|
|
|
|
|
|
class TelegramTransport:
|
|
def __init__(self):
|
|
self.bot_token = self._load_token()
|
|
self.chat_id = self._load_chat_id() # The agent's Telegram chat
|
|
self.base_url = f"https://api.telegram.org/bot{self.bot_token}"
|
|
self.last_update_id = 0
|
|
|
|
def _load_token(self) -> str:
|
|
creds_path = os.path.expanduser(
|
|
"~/homelab/telegram/credentials"
|
|
)
|
|
with open(creds_path) as f:
|
|
# Parse token from credentials file
|
|
for line in f:
|
|
if "token" in line.lower() or "bot" in line.lower():
|
|
return line.split("=", 1)[1].strip()
|
|
return f.read().strip()
|
|
|
|
def _load_chat_id(self) -> str:
|
|
creds_path = os.path.expanduser(
|
|
"~/homelab/telegram/credentials"
|
|
)
|
|
with open(creds_path) as f:
|
|
for line in f:
|
|
if "chat_id" in line.lower():
|
|
return line.split("=", 1)[1].strip()
|
|
return ""
|
|
|
|
async def send(self, message: str):
|
|
"""Send a message to the agent via Telegram."""
|
|
async with aiohttp.ClientSession() as session:
|
|
await session.post(
|
|
f"{self.base_url}/sendMessage",
|
|
json={
|
|
"chat_id": self.chat_id,
|
|
"text": message,
|
|
"parse_mode": "Markdown",
|
|
},
|
|
)
|
|
|
|
async def receive(self):
|
|
"""Async generator that yields incoming messages."""
|
|
async with aiohttp.ClientSession() as session:
|
|
while True:
|
|
resp = await session.get(
|
|
f"{self.base_url}/getUpdates",
|
|
params={
|
|
"offset": self.last_update_id + 1,
|
|
"timeout": 30, # Long polling
|
|
},
|
|
)
|
|
data = await resp.json()
|
|
|
|
for update in data.get("result", []):
|
|
self.last_update_id = update["update_id"]
|
|
msg = update.get("message", {})
|
|
text = msg.get("text", "")
|
|
if text:
|
|
yield text
|
|
|
|
await asyncio.sleep(1)
|
|
```
|
|
|
|
---
|
|
|
|
## 7. Agent-Side Integration
|
|
|
|
The agent's system prompt (CLAUDE.md) includes awareness of the queue:
|
|
|
|
```markdown
|
|
## Message Queue
|
|
|
|
You may receive messages from multiple sources:
|
|
- Direct Telegram messages from Mikkel
|
|
- Queued tasks from claude.ai via the MCP bridge
|
|
|
|
When you receive a new message while working on a task:
|
|
1. Announce: "[New message received: <summary>]"
|
|
2. Assess urgency — if it says URGENT or STOP, halt immediately
|
|
3. If non-urgent, acknowledge and continue current task
|
|
4. Address queued messages in order after current task completes
|
|
|
|
Always announce which message you're responding to:
|
|
"[Responding to: <first line of message>]"
|
|
```
|
|
|
|
---
|
|
|
|
## 8. Deployment
|
|
|
|
```bash
|
|
# On LXC 102 (mgmt)
|
|
cd ~/homelab
|
|
mkdir -p mcp-bridge
|
|
cd mcp-bridge
|
|
|
|
# Create venv
|
|
python3 -m venv .venv
|
|
source .venv/bin/activate
|
|
|
|
# Install dependencies
|
|
pip install mcp aiohttp
|
|
|
|
# Copy the server files
|
|
# (server.py, queue.py, telegram_transport.py)
|
|
|
|
# Test locally
|
|
python -m mcp_bridge.server
|
|
```
|
|
|
|
### As a systemd service:
|
|
|
|
```ini
|
|
[Unit]
|
|
Description=Nexus MCP Bridge
|
|
After=network.target
|
|
|
|
[Service]
|
|
Type=simple
|
|
User=mikkel
|
|
WorkingDirectory=/home/mikkel/homelab/mcp-bridge
|
|
ExecStart=/home/mikkel/homelab/mcp-bridge/.venv/bin/python -m mcp_bridge.server
|
|
Restart=always
|
|
RestartSec=5
|
|
|
|
[Install]
|
|
WantedBy=multi-user.target
|
|
```
|
|
|
|
### MCP registration in claude.ai:
|
|
|
|
The MCP server needs to be accessible from claude.ai. Options:
|
|
1. **HTTP transport** — Expose via NPM at mcp.georgsen.dk with SSE
|
|
2. **Direct registration** — If claude.ai supports remote MCP URLs
|
|
|
|
The HTTP/SSE transport is the most practical since claude.ai's MCP
|
|
connections expect a URL endpoint.
|
|
|
|
---
|
|
|
|
## 9. Future: Sub-Agent Dispatch
|
|
|
|
Once the bridge works for the homelab agent, extend it to dispatch
|
|
to Nexus project agents. The queue becomes multi-target:
|
|
|
|
```
|
|
send_message(target="homelab", message="...")
|
|
send_message(target="nexus:felt", message="...")
|
|
send_message(target="nexus:sentry", message="...")
|
|
```
|
|
|
|
Each target has its own queue and its own agent on the other end.
|
|
This is where Nexus's project management layer comes in — the PM
|
|
agent decides how to route work to specialist agents.
|
|
```
|
|
|
|
---
|
|
|
|
## 10. Why Python
|
|
|
|
- MCP SDK has first-class Python support
|
|
- Telegram bot integration already Python on mgmt (~/venv)
|
|
- Thin glue layer, not performance-critical
|
|
- ~50 messages/day between three participants
|
|
- Fastest to iterate on while figuring out queue semantics
|
|
- Agent already has Python set up and ready
|